chaturbate-dvr/manager/manager.go
2025-07-28 21:33:38 +02:00

201 lines
4.7 KiB
Go

package manager
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
"sort"
"strings"
"sync"
"github.com/r3labs/sse/v2"
"github.com/teacat/chaturbate-dvr/channel"
"github.com/teacat/chaturbate-dvr/entity"
"github.com/teacat/chaturbate-dvr/router/view"
)
// Manager is responsible for managing channels and their states.
type Manager struct {
Channels sync.Map
SSE *sse.Server
}
// New initializes a new Manager instance with an SSE server.
func New() (*Manager, error) {
server := sse.New()
server.SplitData = true
updateStream := server.CreateStream("updates")
updateStream.AutoReplay = false
return &Manager{
SSE: server,
}, nil
}
// SaveConfig saves the current channels and state to a JSON file.
func (m *Manager) SaveConfig() error {
var config []*entity.ChannelConfig
m.Channels.Range(func(key, value any) bool {
config = append(config, value.(*channel.Channel).Config)
return true
})
b, err := json.MarshalIndent(config, "", " ")
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
if err := os.MkdirAll("./conf", 0777); err != nil {
return fmt.Errorf("mkdir all conf: %w", err)
}
if err := os.WriteFile("./conf/channels.json", b, 0777); err != nil {
return fmt.Errorf("write file: %w", err)
}
return nil
}
// LoadConfig loads the channels from JSON and starts them.
func (m *Manager) LoadConfig() error {
b, err := os.ReadFile("./conf/channels.json")
if os.IsNotExist(err) {
return nil
}
if err != nil {
return fmt.Errorf("read file: %w", err)
}
var config []*entity.ChannelConfig
if err := json.Unmarshal(b, &config); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
for i, conf := range config {
ch := channel.New(conf)
m.Channels.Store(conf.Username, ch)
if ch.Config.IsPaused {
ch.Info("channel was paused, waiting for resume")
continue
}
go ch.Resume(i)
}
return nil
}
// CreateChannel starts monitoring an M3U8 stream
func (m *Manager) CreateChannel(conf *entity.ChannelConfig, shouldSave bool) error {
conf.Sanitize()
ch := channel.New(conf)
// prevent duplicate channels
_, ok := m.Channels.Load(conf.Username)
if ok {
return fmt.Errorf("channel %s already exists", conf.Username)
}
m.Channels.Store(conf.Username, ch)
go ch.Resume(0)
if shouldSave {
if err := m.SaveConfig(); err != nil {
return fmt.Errorf("save config: %w", err)
}
}
return nil
}
// StopChannel stops the channel.
func (m *Manager) StopChannel(username string) error {
thing, ok := m.Channels.Load(username)
if !ok {
return nil
}
thing.(*channel.Channel).Stop()
m.Channels.Delete(username)
if err := m.SaveConfig(); err != nil {
return fmt.Errorf("save config: %w", err)
}
return nil
}
// PauseChannel pauses the channel.
func (m *Manager) PauseChannel(username string) error {
thing, ok := m.Channels.Load(username)
if !ok {
return nil
}
thing.(*channel.Channel).Pause()
if err := m.SaveConfig(); err != nil {
return fmt.Errorf("save config: %w", err)
}
return nil
}
// ResumeChannel resumes the channel.
func (m *Manager) ResumeChannel(username string) error {
thing, ok := m.Channels.Load(username)
if !ok {
return nil
}
thing.(*channel.Channel).Resume(0)
if err := m.SaveConfig(); err != nil {
return fmt.Errorf("save config: %w", err)
}
return nil
}
// ChannelInfo returns a list of channel information for the web UI.
func (m *Manager) ChannelInfo() []*entity.ChannelInfo {
var channels []*entity.ChannelInfo
// Iterate over the channels and append their information to the slice
m.Channels.Range(func(key, value any) bool {
channels = append(channels, value.(*channel.Channel).ExportInfo())
return true
})
sort.Slice(channels, func(i, j int) bool {
// First priority: Online channels
if channels[i].IsOnline != channels[j].IsOnline {
return channels[i].IsOnline
}
// Second priority: Alphabetical order by username
return strings.ToLower(channels[i].Username) < strings.ToLower(channels[j].Username)
})
return channels
}
// Publish sends an SSE event to the specified channel.
func (m *Manager) Publish(evt entity.Event, info *entity.ChannelInfo) {
switch evt {
case entity.EventUpdate:
var b bytes.Buffer
if err := view.InfoTpl.ExecuteTemplate(&b, "channel_info", info); err != nil {
fmt.Println("Error executing template:", err)
return
}
m.SSE.Publish("updates", &sse.Event{
Event: []byte(info.Username + "-info"),
Data: b.Bytes(),
})
case entity.EventLog:
m.SSE.Publish("updates", &sse.Event{
Event: []byte(info.Username + "-log"),
Data: []byte(strings.Join(info.Logs, "\n")),
})
}
}
// Subscriber handles SSE subscriptions for the specified channel.
func (m *Manager) Subscriber(w http.ResponseWriter, r *http.Request) {
m.SSE.ServeHTTP(w, r)
}