Files
chaturbate-dvr/channel/channel_record.go
2025-06-25 06:04:49 +08:00

135 lines
4.0 KiB
Go

package channel
import (
"context"
"errors"
"fmt"
"time"
"github.com/avast/retry-go/v4"
"github.com/teacat/chaturbate-dvr/chaturbate"
"github.com/teacat/chaturbate-dvr/internal"
"github.com/teacat/chaturbate-dvr/server"
)
// Monitor starts monitoring the channel for live streams and records them.
func (ch *Channel) Monitor() {
client := chaturbate.NewClient()
ch.Info("starting to record `%s`", ch.Config.Username)
// Create a new context with a cancel function,
// the CancelFunc will be stored in the channel's CancelFunc field
// and will be called by `Pause` or `Stop` functions
ctx, _ := ch.WithCancel(context.Background())
var err error
for {
if err = ctx.Err(); err != nil {
break
}
pipeline := func() error {
return ch.RecordStream(ctx, client)
}
onRetry := func(_ uint, err error) {
ch.UpdateOnlineStatus(false)
if errors.Is(err, internal.ErrChannelOffline) || errors.Is(err, internal.ErrPrivateStream) {
ch.Info("channel is offline or private, try again in %d min(s)", server.Config.Interval)
} else if errors.Is(err, internal.ErrCloudflareBlocked) {
ch.Info("channel was blocked by Cloudflare; try with `-cookies` and `-user-agent`? try again in %d min(s)", server.Config.Interval)
} else if errors.Is(err, context.Canceled) {
// ...
} else {
ch.Error("on retry: %s: retrying in %d min(s)", err.Error(), server.Config.Interval)
}
}
if err = retry.Do(
pipeline,
retry.Context(ctx),
retry.Attempts(0),
retry.Delay(time.Duration(server.Config.Interval)*time.Minute),
retry.DelayType(retry.FixedDelay),
retry.OnRetry(onRetry),
); err != nil {
break
}
}
// Always cleanup when monitor exits, regardless of error
if err := ch.Cleanup(); err != nil {
ch.Error("cleanup on monitor exit: %s", err.Error())
}
// Log error if it's not a context cancellation
if err != nil && !errors.Is(err, context.Canceled) {
ch.Error("record stream: %s", err.Error())
}
}
// Update sends an update signal to the channel's update channel.
// This notifies the Server-sent Event to boradcast the channel information to the client.
func (ch *Channel) Update() {
ch.UpdateCh <- true
}
// RecordStream records the stream of the channel using the provided client.
// It retrieves the stream information and starts watching the segments.
func (ch *Channel) RecordStream(ctx context.Context, client *chaturbate.Client) error {
stream, err := client.GetStream(ctx, ch.Config.Username)
if err != nil {
return fmt.Errorf("get stream: %w", err)
}
ch.StreamedAt = time.Now().Unix()
ch.Sequence = 0
if err := ch.NextFile(); err != nil {
return fmt.Errorf("next file: %w", err)
}
// Ensure file is cleaned up when this function exits in any case
defer func() {
if err := ch.Cleanup(); err != nil {
ch.Error("cleanup on record stream exit: %s", err.Error())
}
}()
playlist, err := stream.GetPlaylist(ctx, ch.Config.Resolution, ch.Config.Framerate)
if err != nil {
return fmt.Errorf("get playlist: %w", err)
}
ch.UpdateOnlineStatus(true) // Update online status after `GetPlaylist` is OK
ch.Info("stream quality - resolution %dp (target: %dp), framerate %dfps (target: %dfps)", playlist.Resolution, ch.Config.Resolution, playlist.Framerate, ch.Config.Framerate)
return playlist.WatchSegments(ctx, ch.HandleSegment)
}
// HandleSegment processes and writes segment data to a file.
func (ch *Channel) HandleSegment(b []byte, duration float64) error {
if ch.Config.IsPaused {
return retry.Unrecoverable(internal.ErrPaused)
}
n, err := ch.File.Write(b)
if err != nil {
return fmt.Errorf("write file: %w", err)
}
ch.Filesize += n
ch.Duration += duration
ch.Info("duration: %s, filesize: %s", internal.FormatDuration(ch.Duration), internal.FormatFilesize(ch.Filesize))
// Send an SSE update to update the view
ch.Update()
if ch.ShouldSwitchFile() {
if err := ch.NextFile(); err != nil {
return fmt.Errorf("next file: %w", err)
}
ch.Info("max filesize or duration exceeded, new file created: %s", ch.File.Name())
return nil
}
return nil
}