diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4ebf2f8 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +./videos \ No newline at end of file diff --git a/.old/servicex/chaturbate.go b/.old/servicex/chaturbate.go deleted file mode 100644 index 7825c4c..0000000 --- a/.old/servicex/chaturbate.go +++ /dev/null @@ -1,67 +0,0 @@ -package service - -import "errors" - -var ( - ErrChannelNotFound = errors.New("channel not found") - ErrChannelExists = errors.New("channel already exists") - ErrChannelNotPaused = errors.New("channel not paused") - ErrChannelIsPaused = errors.New("channel is paused") - ErrListenNotFound = errors.New("listen not found") -) - -const ( - ResolutionFallbackUpscale = "up" - ResolutionFallbackDownscale = "down" -) - -type Chaturbate interface { - GetChannel(username string) (ChaturbateChannel, error) - CreateChannel(config *ChaturbateConfig) error - DeleteChannel(username string) error - PauseChannel(username string) error - ResumeChannel(username string) error - ListChannels() ([]ChaturbateChannel, error) - ListenUpdate() (<-chan *Update, string) - StopListenUpdate(id string) error -} - -type ChaturbateChannel interface { - Username() string - ChannelURL() string - FilenamePattern() string - Framerate() int - Resolution() int - ResolutionFallback() string - LastStreamedAt() string - Filename() string - SegmentDuration() int - SplitDuration() int - SegmentFilesize() int - SplitFilesize() int - IsOnline() bool - IsPaused() bool - Logs() []string -} - -type ChaturbateConfig struct { - Username string - FilenamePattern string - Framerate int - Resolution int - ResolutionFallback string - SplitDuration int - SplitFilesize int -} - -type Update struct { - Username string `json:"username"` - Log string `json:"log"` - IsPaused bool `json:"is_paused"` - IsOnline bool `json:"is_online"` - IsStopped bool `json:"is_stopped"` - Filename string `json:"filename"` - LastStreamedAt string `json:"last_streamed_at"` - SegmentDuration string `json:"segment_duration"` - SegmentFilesize string `json:"segment_filesize"` -} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3e61240 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:latest + +WORKDIR /usr/src/app + +COPY go.mod go.sum ./ +RUN go mod download && go mod verify + +COPY . . +RUN go build + +CMD [ "sh", "-c", "./chaturbate-dvr -u $USERNAME -ui no start" ] \ No newline at end of file diff --git a/README.md b/README.md index f8d3c76..4877a4c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,12 @@ -# +# Chaturbate DVR + +The program watches a specified Chaturbate channel and save the stream in real-time when the channel goes online. + +**Warning**: The streaming content on Chaturbate is copyrighted, you should not copy, share, distribute the content. (for more information, check [DMCA](https://www.dmca.com/)) + +## Usage + +The program works for 64-bit macOS, Linux, Windows (or ARM). Just get in the `/bin` folder and find your operating system then execute the program in terminal. ## 📺 Framerate & Resolution / Fallback diff --git a/chaturbate-dvr b/chaturbate-dvr index 3f07b73..d21e399 100644 Binary files a/chaturbate-dvr and b/chaturbate-dvr differ diff --git a/chaturbate/channel.go b/chaturbate/channel.go index fccaf37..b2dc118 100644 --- a/chaturbate/channel.go +++ b/chaturbate/channel.go @@ -1,22 +1,11 @@ package chaturbate import ( - "bytes" - "crypto/tls" - "encoding/json" - "fmt" - "io" - "log" - "net/http" "os" "regexp" - "strconv" "strings" "sync" "time" - - "github.com/grafov/m3u8" - "github.com/samber/lo" ) var ( @@ -40,14 +29,15 @@ type Channel struct { Framerate int Resolution int ResolutionFallback string - SegmentDuration int - SplitDuration int - SegmentFilesize int - SplitFilesize int + SegmentDuration int // Seconds + SplitDuration int // Minutes + SegmentFilesize int // Bytes + SplitFilesize int // MB IsOnline bool IsPaused bool isStopped bool Logs []string + logType logType bufferLock sync.Mutex buffer map[int][]byte @@ -62,400 +52,86 @@ type Channel struct { sessionPattern map[string]any splitIndex int + PauseChannel chan bool UpdateChannel chan *Update ResumeChannel chan bool } // Run func (w *Channel) Run() { + if w.Username == "" { + w.log(logTypeError, "username is empty, use `-u USERNAME` to specify") + return + } + for { if w.IsPaused { + w.log(logTypeInfo, "channel is paused") <-w.ResumeChannel // blocking + w.log(logTypeInfo, "channel is resumed") } if w.isStopped { + w.log(logTypeInfo, "channel is stopped") break } body, err := w.requestChannelBody() if err != nil { - w.log("Error occurred while requesting channel body: %w", err) + w.log(logTypeError, "body request error: %w", err) } if strings.Contains(body, "playlist.m3u8") { w.IsOnline = true w.LastStreamedAt = time.Now().Format("2006-01-02 15:04:05") - w.log("Channel is online.") + w.log(logTypeInfo, "channel is online, start fetching...") if err := w.record(body); err != nil { // blocking - w.log("Error occurred when start recording: %w", err) + w.log(logTypeError, "record error: %w", err) } continue // this excutes when recording is over/interrupted } w.IsOnline = false - w.log("Channel is offline.") + + w.log(logTypeInfo, "channel is offline, check again 1 min later") <-time.After(1 * time.Minute) // 1 minute cooldown to check online status } } -// requestChannelBody -func (w *Channel) requestChannelBody() (string, error) { - transport := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: transport} - - resp, err := client.Get(w.ChannelURL) - if err != nil { - return "", fmt.Errorf("client get: %w", err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", fmt.Errorf("read body: %w", err) - } - - return string(body), nil -} - -// record -func (w *Channel) record(body string) error { - w.resetSession() - - if err := w.newFile(); err != nil { - return fmt.Errorf("new file: %w", err) - } - - rootURL, sourceURL, err := w.resolveSource(body) - if err != nil { - return fmt.Errorf("request hls: %w", err) - } - w.rootURL = rootURL - w.sourceURL = sourceURL - - go w.mergeSegments() - w.fetchSegments() // blocking - - return nil -} - -func (w *Channel) resetSession() { - w.buffer = make(map[int][]byte) - w.bufferLock = sync.Mutex{} - w.bufferIndex = 0 - w.segmentIndex = 0 - w.segmentUseds = []string{} - w.rootURL = "" - w.sourceURL = "" - w.retries = 0 - w.SegmentFilesize = 0 - w.SegmentDuration = 0 - w.splitIndex = 0 - w.sessionPattern = nil -} - -func (w *Channel) resolveSource(body string) (string, string, error) { - // Find the room dossier. - matches := regexpRoomDossier.FindAllStringSubmatch(body, -1) - - // Get the HLS source from the room dossier. - var roomData roomDossier - data, err := strconv.Unquote(strings.Replace(strconv.Quote(string(matches[0][1])), `\\u`, `\u`, -1)) - if err != nil { - return "", "", fmt.Errorf("unquote unicode: %w", err) - } - if err := json.Unmarshal([]byte(data), &roomData); err != nil { - return "", "", fmt.Errorf("unmarshal json: %w", err) - } - - // Get the HLS source. - transport := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: transport} - - resp, err := client.Get(roomData.HLSSource) - if err != nil { - return "", "", fmt.Errorf("client get: %w", err) - } - if resp.StatusCode != http.StatusOK { - switch resp.StatusCode { - case http.StatusForbidden: - return "", "", fmt.Errorf("received status code %d, the stream is private?", resp.StatusCode) - default: - return "", "", fmt.Errorf("received status code %d", resp.StatusCode) - } - } - defer resp.Body.Close() - - m3u8Body, err := io.ReadAll(resp.Body) - if err != nil { - return "", "", fmt.Errorf("read body: %w", err) - } - - // Decode the m3u8 file. - p, _, err := m3u8.DecodeFrom(bytes.NewReader(m3u8Body), true) - if err != nil { - return "", "", fmt.Errorf("decode m3u8: %w", err) - } - playlist, ok := p.(*m3u8.MasterPlaylist) - if !ok { - return "", "", fmt.Errorf("cast to master playlist") - } - - var resolutions []*resolution - for _, v := range playlist.Variants { - width := strings.Split(v.Resolution, "x")[1] // 1920x1080 -> 1080 - fps := 30 - if strings.Contains(v.Name, "FPS:60.0") { - fps = 60 - } - variant, ok := lo.Find(resolutions, func(v *resolution) bool { - return strconv.Itoa(v.width) == width - }) - if ok { - variant.framerate[fps] = v.URI - continue - } - widthInt, err := strconv.Atoi(width) - if err != nil { - return "", "", fmt.Errorf("convert width string to int: %w", err) - } - resolutions = append(resolutions, &resolution{ - framerate: map[int]string{fps: v.URI}, - width: widthInt, - }) - } - - variant, ok := lo.Find(resolutions, func(v *resolution) bool { - return v.width == w.Resolution - }) - // Fallback to the nearest resolution if the preferred resolution is not found. - if !ok { - switch w.ResolutionFallback { - case ResolutionFallbackDownscale: - variant = lo.MaxBy(lo.Filter(resolutions, func(v *resolution, _ int) bool { - log.Println(v.width, w.Resolution) - return v.width < w.Resolution - }), func(v, max *resolution) bool { - return v.width > max.width - }) - case ResolutionFallbackUpscale: - variant = lo.MinBy(lo.Filter(resolutions, func(v *resolution, _ int) bool { - return v.width > w.Resolution - }), func(v, min *resolution) bool { - return v.width < min.width - }) - } - } - if variant == nil { - return "", "", fmt.Errorf("no available variant") - } - - url, ok := variant.framerate[w.Framerate] - // If the framerate is not found, fallback to the first found framerate, this block pretends there're only 30 and 60 fps. - // no complex logic here, im lazy. - if ok { - w.log("Framerate %d is used.", w.Framerate) - } else { - for k, v := range variant.framerate { - url = v - w.log("Framerate %d is not found, fallback to %d.", w.Framerate, k) - break - } - } - - rootURL := strings.TrimSuffix(roomData.HLSSource, "playlist.m3u8") - sourceURL := rootURL + url - return rootURL, sourceURL, nil -} - -func (w *Channel) mergeSegments() { - var segmentRetries int - - for { - if w.IsPaused || w.isStopped { - break - } - if segmentRetries > 5 { - w.log("Segment #%d error, the segment has been skipped.", w.bufferIndex) - w.bufferIndex++ - segmentRetries = 0 - continue - } - if len(w.buffer) == 0 { - <-time.After(1 * time.Second) - continue - } - buf, ok := w.buffer[w.bufferIndex] - if !ok { - segmentRetries++ - <-time.After(time.Duration(segmentRetries) * time.Second) - continue - } - lens, err := w.file.Write(buf) - if err != nil { - w.log("Error occurred while writing segment #%d to file: %v", w.bufferIndex, err) - w.retries++ - continue - } - w.log("Segment #%d written to file.", w.bufferIndex) - - w.SegmentFilesize += lens - segmentRetries = 0 - - if w.SplitFilesize > 0 && w.SegmentFilesize >= w.SplitFilesize*1024*1024 { - w.log("File size has exceeded, creating new file.") - - if err := w.nextFile(); err != nil { - w.log("Error occurred while creating file for next part: %v", err) - break - } - } - - if w.SplitDuration > 0 && w.SegmentDuration >= w.SplitDuration*60 { - w.log("Duration has exceeded, creating new file.") - - if err := w.nextFile(); err != nil { - w.log("Error occurred while creating file for next part: %v", err) - break - } - } - - w.bufferLock.Lock() - delete(w.buffer, w.bufferIndex) - w.bufferLock.Unlock() - - w.bufferIndex++ - } -} - -func (w *Channel) requestChunks() ([]*m3u8.MediaSegment, float64, error) { - transport := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: transport} - - resp, err := client.Get(w.sourceURL) - if err != nil { - return nil, 3, fmt.Errorf("client get: %w", err) - } - if resp.StatusCode != http.StatusOK { - switch resp.StatusCode { - case http.StatusForbidden: - return nil, 3, fmt.Errorf("received status code %d, the stream is private?", resp.StatusCode) - default: - return nil, 3, fmt.Errorf("received status code %d", resp.StatusCode) - } - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, 3, fmt.Errorf("read body: %w", err) - } - - p, _, err := m3u8.DecodeFrom(bytes.NewReader(body), true) - if err != nil { - return nil, 3, fmt.Errorf("decode m3u8: %w", err) - } - playlist, ok := p.(*m3u8.MediaPlaylist) - if !ok { - return nil, 3, fmt.Errorf("cast to media playlist") - } - chunks := lo.Filter(playlist.Segments, func(v *m3u8.MediaSegment, _ int) bool { - return v != nil - }) - - //log.Println(playlist.TargetDuration) - - return chunks, 1, nil -} - -// fetchSegments -func (w *Channel) fetchSegments() { - var disconnectRetries int - - for { - if w.IsPaused || w.isStopped { - break - } - - chunks, wait, err := w.requestChunks() - if err != nil { - if disconnectRetries > 10 { - w.IsOnline = false - break - } - - w.log("Error occurred while parsing m3u8: %v", err) - disconnectRetries++ - - <-time.After(time.Duration(wait) * time.Second) - continue - } - - if disconnectRetries > 0 { - w.log("Stream is online") - w.IsOnline = true - disconnectRetries = 0 - } - - for _, v := range chunks { - if w.isSegmentFetched(v.URI) { - continue - } - - go w.requestSegment(v.URI, w.segmentIndex) - w.SegmentDuration += int(v.Duration) - w.segmentIndex++ - } - <-time.After(time.Duration(wait) * time.Second) - } -} - -func (w *Channel) requestSegment(url string, index int) error { - transport := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: transport} - - resp, err := client.Get(w.rootURL + url) - if err != nil { - return fmt.Errorf("client get: %w", err) - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("received status code %d", resp.StatusCode) - } - - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("read body: %w", err) - } - - w.log("Segment #%d fetched.", index) - - w.bufferLock.Lock() - w.buffer[index] = body - w.bufferLock.Unlock() - - return nil -} - func (w *Channel) Pause() { w.IsPaused = true w.resetSession() - w.log("Channel was paused.") } func (w *Channel) Resume() { w.IsPaused = false - w.ResumeChannel <- true //BUG: - w.log("Channel was resumed.") + select { + case w.ResumeChannel <- true: + default: + } } func (w *Channel) Stop() { w.isStopped = true - w.log("Channel was stopped.") +} + +func (w *Channel) SegmentDurationStr() string { + return DurationStr(w.SegmentDuration) +} + +func (w *Channel) SplitDurationStr() string { + return DurationStr(w.SplitDuration * 60) +} + +func (w *Channel) SegmentFilesizeStr() string { + return ByteStr(w.SegmentFilesize) +} + +func (w *Channel) SplitFilesizeStr() string { + return MBStr(w.SplitFilesize) +} + +func (w *Channel) Filename() string { + if w.file == nil { + return "" + } + return w.file.Name() } diff --git a/chaturbate/channel_file.go b/chaturbate/channel_file.go index 191e1c4..8176edf 100644 --- a/chaturbate/channel_file.go +++ b/chaturbate/channel_file.go @@ -3,9 +3,9 @@ package chaturbate import ( "bytes" "fmt" - "html/template" "os" "path/filepath" + "text/template" "time" ) @@ -27,17 +27,14 @@ func (w *Channel) filename() (string, error) { } else { data["Sequence"] = w.splitIndex } - t, err := template.New("filename").Parse(w.filenamePattern) if err != nil { return "", err } - var buf bytes.Buffer if err := t.Execute(&buf, data); err != nil { return "", err } - return buf.String(), nil } @@ -45,7 +42,7 @@ func (w *Channel) filename() (string, error) { func (w *Channel) newFile() error { filename, err := w.filename() if err != nil { - return fmt.Errorf("error occurred while parsing filename pattern: %w", err) + return fmt.Errorf("filename pattern error: %w", err) } if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil { return fmt.Errorf("create folder: %w", err) @@ -54,7 +51,7 @@ func (w *Channel) newFile() error { if err != nil { return fmt.Errorf("cannot open file: %s: %w", filename, err) } - w.log("The video will be saved as %s.ts", filename) + w.log(logTypeInfo, "the stream will be saved as %s.ts", filename) w.file = file return nil } @@ -67,10 +64,3 @@ func (w *Channel) nextFile() error { return w.newFile() } - -func (w *Channel) Filename() string { - if w.file == nil { - return "" - } - return w.file.Name() -} diff --git a/chaturbate/channel_internal.go b/chaturbate/channel_internal.go new file mode 100644 index 0000000..f985727 --- /dev/null +++ b/chaturbate/channel_internal.go @@ -0,0 +1,387 @@ +package chaturbate + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/grafov/m3u8" + "github.com/samber/lo" +) + +// requestChannelBody requests the channel page and returns the body. +func (w *Channel) requestChannelBody() (string, error) { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: transport} + + resp, err := client.Get(w.ChannelURL) + if err != nil { + return "", fmt.Errorf("client get: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("read body: %w", err) + } + + return string(body), nil +} + +// record starts the recording process, +// this function get called when the channel is online and back online from offline status. +// +// this is a blocking function until fetching segments gone wrong (or nothing to fetch, aka offline). +func (w *Channel) record(body string) error { + w.resetSession() + + if err := w.newFile(); err != nil { + return fmt.Errorf("new file: %w", err) + } + + rootURL, sourceURL, err := w.resolveSource(body) + if err != nil { + return fmt.Errorf("request hls: %w", err) + } + w.rootURL = rootURL + w.sourceURL = sourceURL + + go w.mergeSegments() + w.fetchSegments() // blocking + + return nil +} + +// resetSession resets the session data, +// usually called when the channel is online or paused to resumed. +func (w *Channel) resetSession() { + w.buffer = make(map[int][]byte) + w.bufferLock = sync.Mutex{} + w.bufferIndex = 0 + w.segmentIndex = 0 + w.segmentUseds = []string{} + w.rootURL = "" + w.sourceURL = "" + w.retries = 0 + w.SegmentFilesize = 0 + w.SegmentDuration = 0 + w.splitIndex = 0 + w.sessionPattern = nil +} + +// resolveSource resolves the HLS source from the channel page. +// the HLS Source is a list that contains all the available resolutions and framerates. +func (w *Channel) resolveSource(body string) (string, string, error) { + // Find the room dossier. + matches := regexpRoomDossier.FindAllStringSubmatch(body, -1) + + // Get the HLS source from the room dossier. + var roomData roomDossier + data, err := strconv.Unquote(strings.Replace(strconv.Quote(string(matches[0][1])), `\\u`, `\u`, -1)) + if err != nil { + return "", "", fmt.Errorf("unquote unicode: %w", err) + } + if err := json.Unmarshal([]byte(data), &roomData); err != nil { + return "", "", fmt.Errorf("unmarshal json: %w", err) + } + + // Get the HLS source. + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: transport} + + resp, err := client.Get(roomData.HLSSource) + if err != nil { + return "", "", fmt.Errorf("client get: %w", err) + } + if resp.StatusCode != http.StatusOK { + switch resp.StatusCode { + case http.StatusForbidden: + return "", "", fmt.Errorf("ticket/private stream?") + default: + return "", "", fmt.Errorf("status code %d", resp.StatusCode) + } + } + defer resp.Body.Close() + + m3u8Body, err := io.ReadAll(resp.Body) + if err != nil { + return "", "", fmt.Errorf("read body: %w", err) + } + + // Decode the m3u8 file. + p, _, err := m3u8.DecodeFrom(bytes.NewReader(m3u8Body), true) + if err != nil { + return "", "", fmt.Errorf("decode m3u8: %w", err) + } + playlist, ok := p.(*m3u8.MasterPlaylist) + if !ok { + return "", "", fmt.Errorf("cast to master playlist") + } + + var resolutions []*resolution + for _, v := range playlist.Variants { + width := strings.Split(v.Resolution, "x")[1] // 1920x1080 -> 1080 + fps := 30 + if strings.Contains(v.Name, "FPS:60.0") { + fps = 60 + } + variant, ok := lo.Find(resolutions, func(v *resolution) bool { + return strconv.Itoa(v.width) == width + }) + if ok { + variant.framerate[fps] = v.URI + continue + } + widthInt, err := strconv.Atoi(width) + if err != nil { + return "", "", fmt.Errorf("convert width string to int: %w", err) + } + resolutions = append(resolutions, &resolution{ + framerate: map[int]string{fps: v.URI}, + width: widthInt, + }) + } + variant, ok := lo.Find(resolutions, func(v *resolution) bool { + return v.width == w.Resolution + }) + // Fallback to the nearest resolution if the preferred resolution is not found. + if !ok { + switch w.ResolutionFallback { + case ResolutionFallbackDownscale: + variant = lo.MaxBy(lo.Filter(resolutions, func(v *resolution, _ int) bool { + return v.width < w.Resolution + }), func(v, max *resolution) bool { + return v.width > max.width + }) + case ResolutionFallbackUpscale: + variant = lo.MinBy(lo.Filter(resolutions, func(v *resolution, _ int) bool { + return v.width > w.Resolution + }), func(v, min *resolution) bool { + return v.width < min.width + }) + } + } + if variant == nil { + return "", "", fmt.Errorf("no available resolution") + } + w.log(logTypeInfo, "resolution %dp is used", variant.width) + + url, ok := variant.framerate[w.Framerate] + // If the framerate is not found, fallback to the first found framerate, this block pretends there're only 30 and 60 fps. + // no complex logic here, im lazy. + if ok { + w.log(logTypeInfo, "framerate %dfps is used", w.Framerate) + } else { + for k, v := range variant.framerate { + url = v + w.log(logTypeWarning, "framerate %dfps not found, fallback to %dfps", w.Framerate, k) + w.Framerate = k + break + } + } + + rootURL := strings.TrimSuffix(roomData.HLSSource, "playlist.m3u8") + sourceURL := rootURL + url + return rootURL, sourceURL, nil +} + +// mergeSegments is a async function that runs in background for the channel, +// and it merges the segments from buffer to the file. +func (w *Channel) mergeSegments() { + var segmentRetries int + + for { + if w.IsPaused || w.isStopped { + break + } + if segmentRetries > 5 { + w.log(logTypeWarning, "segment #%d not found in buffer, skipped", w.bufferIndex) + w.bufferIndex++ + segmentRetries = 0 + continue + } + if len(w.buffer) == 0 { + <-time.After(1 * time.Second) + continue + } + buf, ok := w.buffer[w.bufferIndex] + if !ok { + segmentRetries++ + <-time.After(time.Duration(segmentRetries) * time.Second) + continue + } + lens, err := w.file.Write(buf) + if err != nil { + w.log(logTypeError, "segment #%d written error: %v", w.bufferIndex, err) + w.retries++ + continue + } + w.log(logTypeInfo, "segment #%d written", w.bufferIndex) + w.log(logTypeDebug, "duration: %s, size: %s", DurationStr(w.SegmentDuration), ByteStr(w.SegmentFilesize)) + + w.SegmentFilesize += lens + segmentRetries = 0 + + if w.SplitFilesize > 0 && w.SegmentFilesize >= w.SplitFilesize*1024*1024 { + w.log(logTypeInfo, "filesize exceeded, creating new file") + + if err := w.nextFile(); err != nil { + w.log(logTypeError, "next file error: %v", err) + break + } + } else if w.SplitDuration > 0 && w.SegmentDuration >= w.SplitDuration*60 { + w.log(logTypeInfo, "duration exceeded, creating new file") + + if err := w.nextFile(); err != nil { + w.log(logTypeError, "next file error: %v", err) + break + } + } + + w.bufferLock.Lock() + delete(w.buffer, w.bufferIndex) + w.bufferLock.Unlock() + + w.bufferIndex++ + } +} + +// fetchSegments is a blocking function, +// it will keep asking the segment list for the latest segments. +func (w *Channel) fetchSegments() { + var disconnectRetries int + + for { + if w.IsPaused || w.isStopped { + break + } + + chunks, wait, err := w.requestChunks() + if err != nil { + if disconnectRetries > 10 { + w.IsOnline = false + break + } + + w.log(logTypeError, "segment list error, will try again [%d/10]: %v", disconnectRetries, err) + disconnectRetries++ + + <-time.After(time.Duration(wait) * time.Second) + continue + } + + if disconnectRetries > 0 { + w.log(logTypeInfo, "channel is back online!") + w.IsOnline = true + disconnectRetries = 0 + } + + for _, v := range chunks { + if w.isSegmentFetched(v.URI) { + continue + } + + go func(index int) { + if err := w.requestSegment(v.URI, index); err != nil { + w.log(logTypeError, "segment #%d request error, ignored: %v", index, err) + return + } + }(w.segmentIndex) + w.SegmentDuration += int(v.Duration) + w.segmentIndex++ + } + <-time.After(time.Duration(wait) * time.Second) + } +} + +// requestChunks requests the segment list from the HLS source, +// the same segment list will be updated every few seconds from chaturbate. +func (w *Channel) requestChunks() ([]*m3u8.MediaSegment, float64, error) { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: transport} + + if w.sourceURL == "" { + return nil, 0, fmt.Errorf("channel seems to be paused?") + } + + resp, err := client.Get(w.sourceURL) + if err != nil { + return nil, 3, fmt.Errorf("client get: %w", err) + } + if resp.StatusCode != http.StatusOK { + switch resp.StatusCode { + case http.StatusForbidden: + return nil, 3, fmt.Errorf("ticket/private stream?") + default: + return nil, 3, fmt.Errorf("status code %d", resp.StatusCode) + } + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, 3, fmt.Errorf("read body: %w", err) + } + + p, _, err := m3u8.DecodeFrom(bytes.NewReader(body), true) + if err != nil { + return nil, 3, fmt.Errorf("decode m3u8: %w", err) + } + playlist, ok := p.(*m3u8.MediaPlaylist) + if !ok { + return nil, 3, fmt.Errorf("cast to media playlist") + } + chunks := lo.Filter(playlist.Segments, func(v *m3u8.MediaSegment, _ int) bool { + return v != nil + }) + return chunks, 1, nil +} + +// requestSegment requests the specific single segment and put it into the buffer. +// the mergeSegments function will merge the segment from buffer to the file in the backgrond. +func (w *Channel) requestSegment(url string, index int) error { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: transport} + + if w.rootURL == "" { + return fmt.Errorf("channel seems to be paused?") + } + + resp, err := client.Get(w.rootURL + url) + if err != nil { + return fmt.Errorf("client get: %w", err) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("received status code %d", resp.StatusCode) + } + + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read body: %w", err) + } + + w.log(logTypeDebug, "segment #%d fetched", index) + + w.bufferLock.Lock() + w.buffer[index] = body + w.bufferLock.Unlock() + + return nil +} diff --git a/chaturbate/channel_update.go b/chaturbate/channel_update.go new file mode 100644 index 0000000..462d2a4 --- /dev/null +++ b/chaturbate/channel_update.go @@ -0,0 +1,21 @@ +package chaturbate + +type Update struct { + Username string `json:"username"` + Log string `json:"log"` + IsPaused bool `json:"is_paused"` + IsOnline bool `json:"is_online"` + IsStopped bool `json:"is_stopped"` + Filename string `json:"filename"` + LastStreamedAt string `json:"last_streamed_at"` + SegmentDuration int `json:"segment_duration"` + SegmentFilesize int `json:"segment_filesize"` +} + +func (u *Update) SegmentDurationStr() string { + return DurationStr(u.SegmentDuration) +} + +func (u *Update) SegmentFilesizeStr() string { + return ByteStr(u.SegmentFilesize) +} diff --git a/chaturbate/channel_util.go b/chaturbate/channel_util.go index 34688bf..d44ca47 100644 --- a/chaturbate/channel_util.go +++ b/chaturbate/channel_util.go @@ -5,10 +5,34 @@ import ( "time" ) +type logType string + +const ( + logTypeDebug logType = "DEBUG" + logTypeInfo logType = "INFO" + logTypeWarning logType = "WARN" + logTypeError logType = "ERROR" +) + // log -func (w *Channel) log(message string, v ...interface{}) { - updateLog := fmt.Sprintf("[%s] %s", time.Now().Format("2006-01-02 15:04:05"), fmt.Errorf(message, v...)) - consoleLog := fmt.Sprintf("[%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), w.Username, fmt.Errorf(message, v...)) +func (w *Channel) log(typ logType, message string, v ...interface{}) { + switch w.logType { + case logTypeInfo: + if typ == logTypeDebug { + return + } + case logTypeWarning: + if typ == logTypeDebug || typ == logTypeInfo { + return + } + case logTypeError: + if typ == logTypeDebug || typ == logTypeInfo || typ == logTypeWarning { + return + } + } + + updateLog := fmt.Sprintf("[%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, fmt.Errorf(message, v...)) + consoleLog := fmt.Sprintf("[%s] [%s] [%s] %s", time.Now().Format("2006-01-02 15:04:05"), typ, w.Username, fmt.Errorf(message, v...)) update := &Update{ Username: w.Username, @@ -23,7 +47,10 @@ func (w *Channel) log(message string, v ...interface{}) { update.Filename = w.file.Name() } - w.UpdateChannel <- update + select { + case w.UpdateChannel <- update: + default: + } fmt.Println(consoleLog) @@ -35,13 +62,32 @@ func (w *Channel) log(message string, v ...interface{}) { } } -// isDuplicateSegment returns true if the segment is already been fetched. +// isSegmentFetched returns true if the segment has been fetched. func (w *Channel) isSegmentFetched(url string) bool { for _, v := range w.segmentUseds { - if url[len(url)-10:] == v { + if url == v { return true } } - w.segmentUseds = append(w.segmentUseds, url[len(url)-10:]) + if len(w.segmentUseds) > 100 { + w.segmentUseds = w.segmentUseds[len(w.segmentUseds)-30:] + } + w.segmentUseds = append(w.segmentUseds, url) return false } + +func DurationStr(seconds int) string { + hours := seconds / 3600 + seconds %= 3600 + minutes := seconds / 60 + seconds %= 60 + return fmt.Sprintf("%02d:%02d:%02d", hours, minutes, seconds) +} + +func MBStr(mibs int) string { + return fmt.Sprintf("%.2f MiB", float64(mibs)) +} + +func ByteStr(bytes int) string { + return fmt.Sprintf("%.2f MiB", float64(bytes)/1024/1024) +} diff --git a/chaturbate/chaturbate.go b/chaturbate/manager.go similarity index 86% rename from chaturbate/chaturbate.go rename to chaturbate/manager.go index 36d9ca0..18c31ad 100644 --- a/chaturbate/chaturbate.go +++ b/chaturbate/manager.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/google/uuid" + "github.com/urfave/cli/v2" ) const ( @@ -11,18 +12,6 @@ const ( ResolutionFallbackDownscale = "down" ) -type Update struct { - Username string `json:"username"` - Log string `json:"log"` - IsPaused bool `json:"is_paused"` - IsOnline bool `json:"is_online"` - IsStopped bool `json:"is_stopped"` - Filename string `json:"filename"` - LastStreamedAt string `json:"last_streamed_at"` - SegmentDuration int `json:"segment_duration"` - SegmentFilesize int `json:"segment_filesize"` -} - var ( ErrChannelNotFound = errors.New("channel not found") ErrChannelExists = errors.New("channel already exists") @@ -31,6 +20,7 @@ var ( ErrListenNotFound = errors.New("listen not found") ) +// Config type Config struct { Username string FilenamePattern string @@ -43,16 +33,18 @@ type Config struct { // Manager type Manager struct { + cli *cli.Context Channels map[string]*Channel Updates map[string]chan *Update } // NewManager -func NewManager() (*Manager, error) { +func NewManager(c *cli.Context) *Manager { return &Manager{ + cli: c, Channels: map[string]*Channel{}, Updates: map[string]chan *Update{}, - }, nil + } } // PauseChannel @@ -116,6 +108,7 @@ func (m *Manager) CreateChannel(conf *Config) error { Logs: []string{}, UpdateChannel: make(chan *Update), ResumeChannel: make(chan bool), + logType: logType(m.cli.String("log-level")), } go func() { for update := range c.UpdateChannel { @@ -127,7 +120,7 @@ func (m *Manager) CreateChannel(conf *Config) error { } }() m.Channels[conf.Username] = c - c.log("Channel created") + c.log(logTypeInfo, "channel created") go c.Run() return nil } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..bcdc9fb --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,9 @@ +version: "3.0" + +services: + chaturbate-dvr: + build: . + environment: + - USERNAME=my_lovely_channel_name + volumes: + - ./video/my_lovely_channel_name:/usr/src/app/video diff --git a/handler/get_channel.go b/handler/get_channel.go index 23a9855..271006e 100644 --- a/handler/get_channel.go +++ b/handler/get_channel.go @@ -63,10 +63,10 @@ func (h *GetChannelHandler) Handle(c *gin.Context) { ChannelURL: channel.ChannelURL, Filename: channel.Filename(), LastStreamedAt: channel.LastStreamedAt, - SegmentDuration: DurationStr(channel.SegmentDuration), - SplitDuration: DurationStr(channel.SplitDuration), - SegmentFilesize: ByteStr(channel.SegmentFilesize), - SplitFilesize: MBStr(channel.SplitFilesize), + SegmentDuration: channel.SegmentDurationStr(), + SplitDuration: channel.SplitDurationStr(), + SegmentFilesize: channel.SegmentFilesizeStr(), + SplitFilesize: channel.SplitFilesizeStr(), IsOnline: channel.IsOnline, IsPaused: channel.IsPaused, Logs: channel.Logs, diff --git a/handler/get_settings.go b/handler/get_settings.go index ee68331..157f157 100644 --- a/handler/get_settings.go +++ b/handler/get_settings.go @@ -15,6 +15,7 @@ type GetSettingsHandlerRequest struct { } type GetSettingsHandlerResponse struct { + Version string `json:"version"` Framerate int `json:"framerate"` Resolution int `json:"resolution"` ResolutionFallback string `json:"resolution_fallback"` @@ -49,6 +50,7 @@ func (h *GetSettingsHandler) Handle(c *gin.Context) { return } c.JSON(http.StatusOK, &GetSettingsHandlerResponse{ + Version: h.cli.App.Version, Framerate: h.cli.Int("framerate"), Resolution: h.cli.Int("resolution"), ResolutionFallback: h.cli.String("resolution-fallback"), diff --git a/handler/list_channels.go b/handler/list_channels.go index 26e4d61..cfb2daf 100644 --- a/handler/list_channels.go +++ b/handler/list_channels.go @@ -70,10 +70,10 @@ func (h *ListChannelsHandler) Handle(c *gin.Context) { ChannelURL: channel.ChannelURL, Filename: channel.Filename(), LastStreamedAt: channel.LastStreamedAt, - SegmentDuration: DurationStr(channel.SegmentDuration), - SplitDuration: DurationStr(channel.SplitDuration), - SegmentFilesize: ByteStr(channel.SegmentFilesize), - SplitFilesize: MBStr(channel.SplitFilesize), + SegmentDuration: channel.SegmentDurationStr(), + SplitDuration: channel.SplitDurationStr(), + SegmentFilesize: channel.SegmentFilesizeStr(), + SplitFilesize: channel.SplitFilesizeStr(), IsOnline: channel.IsOnline, IsPaused: channel.IsPaused, Logs: channel.Logs, diff --git a/handler/listen_update.go b/handler/listen_update.go index 3effe92..290d293 100644 --- a/handler/listen_update.go +++ b/handler/listen_update.go @@ -46,8 +46,9 @@ func (h *ListenUpdateHandler) Handle(c *gin.Context) { "is_paused": update.IsPaused, "is_online": update.IsOnline, "last_streamed_at": update.LastStreamedAt, - "segment_duration": DurationStr(update.SegmentDuration), - "segment_filesize": ByteStr(update.SegmentFilesize), + "segment_duration": update.SegmentDurationStr(), + "segment_filesize": update.SegmentFilesizeStr(), + "filename": update.Filename, }) return true }) diff --git a/handler/terminate_program.go b/handler/terminate_program.go new file mode 100644 index 0000000..b7ab330 --- /dev/null +++ b/handler/terminate_program.go @@ -0,0 +1,46 @@ +package handler + +import ( + "log" + "net/http" + "os" + + "github.com/gin-gonic/gin" + "github.com/urfave/cli/v2" +) + +//======================================================= +// Request & Response +//======================================================= + +type TerminateProgramRequest struct { +} + +type TerminateProgramResponse struct { +} + +//======================================================= +// Factory +//======================================================= + +type TerminateProgramHandler struct { + cli *cli.Context +} + +func NewTerminateProgramHandler(cli *cli.Context) *TerminateProgramHandler { + return &TerminateProgramHandler{cli} +} + +//======================================================= +// Handle +//======================================================= + +func (h *TerminateProgramHandler) Handle(c *gin.Context) { + var req *TerminateProgramRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.AbortWithError(http.StatusBadRequest, err) + return + } + log.Println("program terminated by user request, see ya 👋") + os.Exit(0) +} diff --git a/handler/util.go b/handler/util.go deleted file mode 100644 index fd3a7a7..0000000 --- a/handler/util.go +++ /dev/null @@ -1,23 +0,0 @@ -package handler - -import "fmt" - -func DurationStr(seconds int) string { - hours := seconds / 3600 - seconds %= 3600 - minutes := seconds / 60 - seconds %= 60 - return fmt.Sprintf("%02d:%02d:%02d", hours, minutes, seconds) -} - -func ByteStr(bytes int) string { - return fmt.Sprintf("%.2f MiB", float64(bytes)/1024/1024) -} - -func KBStr(kibs int) string { - return fmt.Sprintf("%.2f MiB", float64(kibs)/1024) -} - -func MBStr(mibs int) string { - return fmt.Sprintf("%.2f MiB", float64(mibs)) -} diff --git a/handler/view/index.html b/handler/view/index.html index 398517c..08e9ca8 100644 --- a/handler/view/index.html +++ b/handler/view/index.html @@ -3,9 +3,9 @@ - - - + + + @@ -16,6 +16,7 @@
+
@@ -26,8 +27,13 @@
+ +
+ +
+
Channel Username
@@ -37,7 +43,9 @@
+ +
Resolution
@@ -76,6 +84,9 @@
+ + +
Framerate
@@ -94,6 +105,9 @@
+ + +
Filename Pattern
@@ -105,9 +119,11 @@
+
+
@@ -135,28 +151,35 @@
+ + +
+ +
+
-
+ +
Chaturbate DVR
-
Version 1.0.0
+
Version
- @@ -173,10 +196,9 @@