./videos/username/.diff --git a/.gitignore b/.old/.gitignore similarity index 100% rename from .gitignore rename to .old/.gitignore diff --git a/DEV.md b/.old/DEV.md similarity index 100% rename from DEV.md rename to .old/DEV.md diff --git a/Dockerfile b/.old/Dockerfile similarity index 94% rename from Dockerfile rename to .old/Dockerfile index f348ac7..e6dee2b 100644 --- a/Dockerfile +++ b/.old/Dockerfile @@ -1,11 +1,11 @@ -FROM golang:latest - -WORKDIR /usr/src/app - -COPY go.mod go.sum ./ -RUN go mod download && go mod verify - -COPY . . -RUN go build - +FROM golang:latest + +WORKDIR /usr/src/app + +COPY go.mod go.sum ./ +RUN go mod download && go mod verify + +COPY . . +RUN go build + CMD [ "sh", "-c", "./chaturbate-dvr -u $USERNAME" ] \ No newline at end of file diff --git a/LICENSE b/.old/LICENSE similarity index 98% rename from LICENSE rename to .old/LICENSE index d92b2a0..b6a3517 100644 --- a/LICENSE +++ b/.old/LICENSE @@ -1,21 +1,21 @@ -MIT License - -Copyright (c) 2022 TeaCat - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +MIT License + +Copyright (c) 2022 TeaCat + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README-tw.md b/.old/README-tw.md similarity index 100% rename from README-tw.md rename to .old/README-tw.md diff --git a/README.md b/.old/README.md similarity index 100% rename from README.md rename to .old/README.md diff --git a/bin/arm64/darwin/chatubrate-dvr b/.old/bin/arm64/darwin/chatubrate-dvr similarity index 100% rename from bin/arm64/darwin/chatubrate-dvr rename to .old/bin/arm64/darwin/chatubrate-dvr diff --git a/bin/arm64/linux/chatubrate-dvr b/.old/bin/arm64/linux/chatubrate-dvr similarity index 100% rename from bin/arm64/linux/chatubrate-dvr rename to .old/bin/arm64/linux/chatubrate-dvr diff --git a/bin/arm64/windows/chatubrate-dvr.exe b/.old/bin/arm64/windows/chatubrate-dvr.exe similarity index 100% rename from bin/arm64/windows/chatubrate-dvr.exe rename to .old/bin/arm64/windows/chatubrate-dvr.exe diff --git a/bin/darwin/chatubrate-dvr b/.old/bin/darwin/chatubrate-dvr similarity index 100% rename from bin/darwin/chatubrate-dvr rename to .old/bin/darwin/chatubrate-dvr diff --git a/bin/linux/chatubrate-dvr b/.old/bin/linux/chatubrate-dvr similarity index 100% rename from bin/linux/chatubrate-dvr rename to .old/bin/linux/chatubrate-dvr diff --git a/bin/windows/chatubrate-dvr.exe b/.old/bin/windows/chatubrate-dvr.exe similarity index 100% rename from bin/windows/chatubrate-dvr.exe rename to .old/bin/windows/chatubrate-dvr.exe diff --git a/chaturbate-dvr b/.old/chaturbate-dvr similarity index 100% rename from chaturbate-dvr rename to .old/chaturbate-dvr diff --git a/docker-compose.yml b/.old/docker-compose.yml similarity index 95% rename from docker-compose.yml rename to .old/docker-compose.yml index 7b253a3..721b562 100644 --- a/docker-compose.yml +++ b/.old/docker-compose.yml @@ -1,9 +1,9 @@ -version: "3.0" - -services: - chaturbate-dvr: - build: . - environment: - - USERNAME=my_lovely_channel_name - volumes: +version: "3.0" + +services: + chaturbate-dvr: + build: . + environment: + - USERNAME=my_lovely_channel_name + volumes: - ./video/my_lovely_channel_name:/usr/src/app/video \ No newline at end of file diff --git a/.old/go.mod b/.old/go.mod new file mode 100644 index 0000000..331c3a7 --- /dev/null +++ b/.old/go.mod @@ -0,0 +1,23 @@ +module github.com/YamiOdymel/chaturbate-dvr + +go 1.19 + +require ( + github.com/TwiN/go-color v1.1.0 + github.com/grafov/m3u8 v0.11.1 + github.com/parnurzeal/gorequest v0.2.16 + github.com/urfave/cli/v2 v2.3.0 +) + +require ( + github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect + github.com/elazarl/goproxy v0.0.0-20210801061803-8e322dfb79c4 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/russross/blackfriday/v2 v2.0.1 // indirect + github.com/samber/lo v1.38.1 // indirect + github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect + github.com/smartystreets/goconvey v1.7.2 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect + golang.org/x/net v0.0.0-20211109214657-ef0fda0de508 // indirect + moul.io/http2curl v1.0.0 // indirect +) diff --git a/.old/go.sum b/.old/go.sum new file mode 100644 index 0000000..1d0535b --- /dev/null +++ b/.old/go.sum @@ -0,0 +1,46 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/TwiN/go-color v1.1.0 h1:yhLAHgjp2iAxmNjDiVb6Z073NE65yoaPlcki1Q22yyQ= +github.com/TwiN/go-color v1.1.0/go.mod h1:aKVf4e1mD4ai2FtPifkDPP5iyoCwiK08YGzGwerjKo0= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/elazarl/goproxy v0.0.0-20210801061803-8e322dfb79c4 h1:lS3P5Nw3oPO05Lk2gFiYUOL3QPaH+fRoI1wFOc4G1UY= +github.com/elazarl/goproxy v0.0.0-20210801061803-8e322dfb79c4/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= +github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/grafov/m3u8 v0.11.1 h1:igZ7EBIB2IAsPPazKwRKdbhxcoBKO3lO1UY57PZDeNA= +github.com/grafov/m3u8 v0.11.1/go.mod h1:nqzOkfBiZJENr52zTVd/Dcl03yzphIMbJqkXGu+u080= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/parnurzeal/gorequest v0.2.16 h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ= +github.com/parnurzeal/gorequest v0.2.16/go.mod h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= +github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= +github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= +github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= +golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20211109214657-ef0fda0de508 h1:v3NKo+t/Kc3EASxaKZ82lwK6mCf4ZeObQBduYFZHo7c= +golang.org/x/net v0.0.0-20211109214657-ef0fda0de508/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8= +moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE= diff --git a/.old/main.go b/.old/main.go new file mode 100644 index 0000000..9052e5c --- /dev/null +++ b/.old/main.go @@ -0,0 +1,510 @@ +package main + +import ( + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "os" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "github.com/TwiN/go-color" + "github.com/samber/lo" + + "github.com/grafov/m3u8" + "github.com/parnurzeal/gorequest" + "github.com/urfave/cli/v2" +) + +// chaturbateURL is the base url of the website. +const chaturbateURL = "https://chaturbate.com/" + +// retriesAfterOnlined tells the retries for stream when disconnected but not really offlined. +var retriesAfterOnlined = 0 + +// temp stores the used segment to prevent fetched the duplicates. +var temp []string + +// segmentIndex is current stored segment index. +var segmentIndex int + +// segmentMap is the map stores temporary video segments, it will be merged into master video file then got deleted. +var segmentMap map[string][]byte = make(map[string][]byte) + +var segmentMapLock sync.Mutex + +// stripLimit reprsents the maximum Bytes sizes to split the video into chunks. +var stripLimit int + +// stripQuota represents how many Bytes left til the next video chunk stripping. +var stripQuota int + +// preferredFPS represents the preferred framerate. +var preferredFPS string + +// preferredResolution represents the preferred resolution, e.g. `240`, `480`, `540`, `720`, `1080`. +var preferredResolution string + +// preferredResolutionFallback represents the preferred resolution fallback, `up`, `down` or `no`. +var preferredResolutionFallback string + +// path save video +const savePath = "video" + +// error/message handler +var ( + errInternal = errors.New("err") + errNoUsername = errors.New("recording: channel username required `-u [USERNAME]` option") + errSegRetFail = color.Colorize(color.Red, ("[FAILED] to fetch the video segments after retried, %s might went offline or is in ticket/privat show.")) + errSegRetFailOnline = color.Colorize(color.Red, ("[FAILED] to fetch the video segments, will try again. [%d/10]")) + infoIsOnline = color.Colorize(color.Green, ("[RECORDING] %s is online! start fetching..")) + infoBackOnline = color.Colorize(color.Green, ("[INFO] %s is back online!")) + infoMergeSegment = color.Colorize(color.Green, ("[INFO] inserting %d segment to the master file. [total: %d]")) + infoSkipped = color.Colorize(color.Blue, ("[INFO] skipped %s due to the empty body!\n")) + infoNotOnline = color.Colorize(color.Gray, ("[INFO] %s is not online, check again in %d minute(s)")) + warningSegment = color.Colorize(color.Yellow, ("[WARNING] cannot find segment %d, will try again. [%d/5]")) +) + +// roomDossier is the struct to parse the HLS source from the content body. +type roomDossier struct { + HLSSource string `json:"hls_source"` +} + +// unescapeUnicode escapes the unicode from the content body. +func unescapeUnicode(raw string) string { + str, err := strconv.Unquote(strings.Replace(strconv.Quote(string(raw)), `\\u`, `\u`, -1)) + if err != nil { + panic(err) + } + return str +} + +// getChannelURL returns the full channel url to the specified user. +func getChannelURL(username string) string { + return fmt.Sprintf("%s%s", chaturbateURL, username) +} + +// getBody gets the channel page content body. +func getBody(username string) string { + resp, body, errs := gorequest.New().TLSClientConfig(&tls.Config{InsecureSkipVerify: true}).Get(getChannelURL(username)).End() + if len(errs) > 0 { + log.Println(color.Colorize(color.Red, errs[0].Error())) + } + if resp == nil || resp.StatusCode != 200 { + return "" + } + return body +} + +// getOnlineStatus check if the user is currently online by checking the playlist exists in the content body or not. +func getOnlineStatus(username string) bool { + return strings.Contains(getBody(username), "playlist.m3u8") +} + +// getHLSSource extracts the playlist url from the room detail page body. +func getHLSSource(body string) (string, string) { + // Get the room data from the page body. + r := regexp.MustCompile(`window\.initialRoomDossier = "(.*?)"`) + matches := r.FindAllStringSubmatch(body, -1) + + // Extract the data and get the HLS source URL. + var roomData roomDossier + data := unescapeUnicode(matches[0][1]) + err := json.Unmarshal([]byte(data), &roomData) + if err != nil { + panic(err) + } + + return roomData.HLSSource, strings.TrimSuffix(roomData.HLSSource, "playlist.m3u8") +} + +// parseHLSSource parses the HLS table and return the maximum resolution m3u8 source. +func parseHLSSource(url string, baseURL string) string { + resp, body, errs := gorequest.New().TLSClientConfig(&tls.Config{InsecureSkipVerify: true}).Get(url).End() + if len(errs) > 0 { + log.Println(color.Colorize(color.Red, errs[0].Error())) + } + if resp == nil || resp.StatusCode == 403 { + return "" + } + p, _, _ := m3u8.DecodeFrom(strings.NewReader(body), true) + master, ok := p.(*m3u8.MasterPlaylist) + if !ok { + return "" + } + + resolutions := make(map[string][]string) + resolutionInts := []string{} + + for _, v := range master.Variants { + resStr := strings.Split(v.Resolution, "x") + resolutionInts = append(resolutionInts, resStr[1]) + // If the resolution exists in local, it might be a higher framerate source, store it for later use + if _, ok := resolutions[resStr[1]]; ok { + resolutions[resStr[1]] = append(resolutions[resStr[1]], v.URI) + continue + } + if strings.Contains(v.Name, "FPS:60.0") { + if _, ok := resolutions[resStr[1]]; !ok { + resolutions[resStr[1]] = []string{"", v.URI} // The video has no 30 FPS, we fill it with an empty URI + } else { + resolutions[resStr[1]] = []string{v.URI} + } + } else { + resolutions[resStr[1]] = []string{v.URI} + } + } + + log.Printf("Found available resolutions: %s", strings.TrimPrefix(lo.Reduce(resolutionInts, func(prev string, cur string, _ int) string { + return fmt.Sprintf("%s, %s", prev, cur) + }, ""), ", ")) + + pickedResolution, ok := resolutions[preferredResolution] + if !ok { + var comparison []string + if preferredResolutionFallback == "down" { + comparison = lo.Reverse(lo.Map(resolutionInts, func(v string, _ int) string { return v })) + } else { + comparison = resolutionInts + } + fallbackResolution, ok := lo.Find(comparison, func(v string) bool { + sizeInt, _ := strconv.Atoi(v) + prefInt, _ := strconv.Atoi(preferredResolution) + // + if preferredResolutionFallback == "down" { + return sizeInt < prefInt + } else { + return sizeInt > prefInt + } + }) + if ok { + pickedResolution = resolutions[fallbackResolution] + log.Printf("Preferred video resolution %sp not found, use %sp instead.", preferredResolution, fallbackResolution) + } else { + if preferredResolutionFallback == "down" { + pickedResolution = resolutions[resolutionInts[0]] + log.Printf("No fallback video resolution was found, use worse quality %sp instead.", resolutionInts[0]) + } else { + pickedResolution = resolutions[resolutionInts[len(resolutionInts)-1]] + log.Printf("No fallback video resolution was found, use best quality %sp instead.", resolutionInts[len(resolutionInts)-1]) + } + } + } else { + log.Printf("Fetching video resolution in %sp.", preferredResolution) + } + + var uri string + + if preferredFPS == "60" && len(pickedResolution) > 1 { + log.Printf("Fetching video in 60 FPS.") + uri = pickedResolution[1] + } else { + log.Printf("Fetching video in 30 FPS.") + uri = pickedResolution[0] + + if uri == "" { + log.Printf("The video has no 30 FPS, use 60 FPS instead.") + uri = pickedResolution[1] + } + }1 + + return fmt.Sprintf("%s%s", baseURL, uri) +} + +// parseM3U8Source gets the current segment list, the channel might goes offline if 403 was returned. +func parseM3U8Source(url string) (chunks []*m3u8.MediaSegment, wait float64, err error) { + resp, body, errs := gorequest.New().TLSClientConfig(&tls.Config{InsecureSkipVerify: true}).Get(url).End() + if len(errs) > 0 { + log.Println(color.Colorize(color.Red, errs[0].Error())) + } + // Retry after 3 seconds if the connection lost or status code returns 403 (the channel might went offline). + if len(errs) > 0 || resp == nil || resp.StatusCode == http.StatusForbidden { + return nil, 3, errInternal + } + + // Decode the segment table. + p, _, err := m3u8.DecodeFrom(strings.NewReader(body), true) + if err != nil { + log.Println(color.Colorize(color.Red, err.Error())) + } + media, ok := p.(*m3u8.MediaPlaylist) + if !ok { + return nil, 3, errInternal + } + wait = media.TargetDuration / 1.5 + + // Ignore the empty segments. + for _, v := range media.Segments { + if v != nil { + chunks = append(chunks, v) + } + } + return +} + +// capture captures the specified channel streaming. +func capture(username string) { + // Define the video filename by current time //04.09.22 added username into filename mK33y. + filename := username + "_" + time.Now().Format("2006-01-02_15-04-05") + var m3u8Source, baseURL, hlsSource string + var tried int + for { + tried++ + // + if tried > 10 { + panic(errors.New("cannot fetch the Playlist correctly after 10 tries")) + } + // Get the channel page content body. + body := getBody(username) + // + if body == "" { + continue + } + // Get the master playlist URL from extracting the channel body. + hlsSource, baseURL = getHLSSource(body) + // Get the best resolution m3u8 by parsing the HLS source table. + m3u8Source = parseHLSSource(hlsSource, baseURL) + // + if m3u8Source != "" { + break + } + <-time.After(time.Millisecond * 500) + } + // Create the master video file. + masterFile, err := os.OpenFile("./"+savePath+"/"+filename+".ts", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0777) + if err != nil { + log.Println(color.Colorize(color.Red, err.Error())) + } + // + log.Printf("the video will be saved as \"./"+savePath+"/%s\".", filename+".ts") + + go combineSegment(masterFile, filename) + watchStream(m3u8Source, username, masterFile, filename, baseURL) +} + +// watchStream watches the stream and ends if the channel went offline. +func watchStream(m3u8Source string, username string, masterFile *os.File, filename string, baseURL string) { + // Keep fetching the stream chunks until the playlist cannot be accessed after retried x times. + for { + // Get the chunks. + chunks, wait, err := parseM3U8Source(m3u8Source) + // Exit the fetching loop if the channel went offline. + if err != nil { + if retriesAfterOnlined > 10 { + log.Printf(errSegRetFail, username) + break + } else { + log.Printf(errSegRetFailOnline, retriesAfterOnlined) + retriesAfterOnlined++ + // Wait to fetch the next playlist. + <-time.After(time.Duration(wait*1000) * time.Millisecond) + continue + } + } + if retriesAfterOnlined != 0 { + log.Printf(infoBackOnline, username) + retriesAfterOnlined = 0 + } + for _, v := range chunks { + // Ignore the duplicated chunks. + if isDuplicateSegment(v.URI) { + continue + } + segmentIndex++ + go fetchSegment(masterFile, v, baseURL, filename, segmentIndex) + } + <-time.After(time.Duration(wait*1000) * time.Millisecond) + } +} + +// isDuplicateSegment returns true if the segment is already been fetched. +func isDuplicateSegment(URI string) bool { + for _, v := range temp { + if URI[len(URI)-10:] == v { + return true + } + } + temp = append(temp, URI[len(URI)-10:]) + return false +} + +// combineSegment combines the segments to the master video file in the background. +// fixed segment problems mK33y. +// still needs some attention here +func combineSegment(master *os.File, filename string) { + index := 1 + stripIndex := 1 + var retry int + <-time.After(4 * time.Second) + + for { + <-time.After(300 * time.Millisecond) + + if index >= segmentIndex { + <-time.After(1 * time.Second) + continue + } + + if _, ok := segmentMap[fmt.Sprintf("./%s/%s~%d.ts", savePath, filename, index)]; !ok { + if retry >= 5 { + index++ + retry = 0 + continue + } + if retry != 0 { + log.Printf(warningSegment, index, retry) + } + retry++ + <-time.After(time.Duration(1*retry) * time.Second) + continue + } + if retry != 0 { + retry = 0 + } + // + b := segmentMap[fmt.Sprintf("./%s/%s~%d.ts", savePath, filename, index)] + // + var err error + if stripLimit != 0 && stripQuota <= 0 { + newMasterFilename := "./" + savePath + "/" + filename + "_" + strconv.Itoa(stripIndex) + ".ts" + master, err = os.OpenFile(newMasterFilename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0777) + if err != nil { + log.Println(color.Colorize(color.Red, err.Error())) + } + log.Printf("exceeded the specified stripping limit, creating new video file. (file: %s)", newMasterFilename) + stripQuota = stripLimit + stripIndex++ + } + master.Write(b) + // + log.Printf(infoMergeSegment, index, segmentIndex) + segmentMapLock.Lock() + delete(segmentMap, fmt.Sprintf("./%s/%s~%d.ts", savePath, filename, index)) + segmentMapLock.Unlock() + index++ + } +} + +// fetchSegment fetches the segment and append to the master file. +func fetchSegment(master *os.File, segment *m3u8.MediaSegment, baseURL string, filename string, index int) { + _, body, _ := gorequest.New().TLSClientConfig(&tls.Config{InsecureSkipVerify: true}).Get(fmt.Sprintf("%s%s", baseURL, segment.URI)).EndBytes() + log.Printf("fetching %s (size: %d)\n", segment.URI, len(body)) + if len(body) == 0 { + log.Printf(infoSkipped, segment.URI) + return + } + stripQuota -= len(body) + segmentMapLock.Lock() + segmentMap[fmt.Sprintf("./%s/%s~%d.ts", savePath, filename, index)] = body + segmentMapLock.Unlock() +} + +// endpoint implements the application main function endpoint. +func endpoint(c *cli.Context) error { + if c.String("username") == "" { + log.Fatal(errNoUsername) + } + // Converts `strip` from MiB to Bytes + stripLimit = c.Int("strip") * 1024 * 1024 + stripQuota = c.Int("strip") * 1024 * 1024 + // + preferredFPS = c.String("fps") + preferredResolution = c.String("resolution") + preferredResolutionFallback = c.String("resolution-fallback") + // + + fmt.Println(" .o88b. db db .d8b. d888888b db db d8888b. d8888b. .d8b. d888888b d88888b") + fmt.Println("d8P Y8 88 88 d8' `8b `~~88~~' 88 88 88 `8D 88 `8D d8' `8b `~~88~~' 88'") + fmt.Println("8P 88ooo88 88ooo88 88 88 88 88oobY' 88oooY' 88ooo88 88 88ooooo") + fmt.Println("8b 88~~~88 88~~~88 88 88 88 88`8b 88~~~b. 88~~~88 88 88~~~~~") + fmt.Println("Y8b d8 88 88 88 88 88 88b d88 88 `88. 88 8D 88 88 88 88.") + fmt.Println(" `Y88P' YP YP YP YP YP ~Y8888P' 88 YD Y8888P' YP YP YP Y88888P") + fmt.Println("d8888b. db db d8888b.") + fmt.Println("88 `8D 88 88 88 `8D") + fmt.Println("88 88 Y8 8P 88oobY'") + fmt.Println("88 88 `8b d8' 88`8b") + fmt.Println("88 .8D `8bd8' 88 `88.") + fmt.Println("Y8888D' YP 88 YD") + fmt.Println("---") + + // Mkdir video folder + if _, err := os.Stat("./" + savePath); os.IsNotExist(err) { + os.Mkdir("./"+savePath, 0777) + } + // + if c.Int("strip") != 0 { + log.Printf("specifying stripping limit as %d MiB(s)", c.Int("strip")) + } + + for { + // Capture the stream if the user is currently online. + if getOnlineStatus(c.String("username")) { + log.Printf(infoIsOnline, c.String("username")) + capture(c.String("username")) + segmentIndex = 0 + temp = []string{} + retriesAfterOnlined = 0 + continue + } + // Otherwise we keep checking the channel status until the user is online. + log.Printf(infoNotOnline, c.String("username"), c.Int("interval")) + <-time.After(time.Minute * time.Duration(c.Int("interval"))) + } +} + +func main() { + app := &cli.App{ + Version: "0.94 Alpha", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "username", + Aliases: []string{"u"}, + Value: "", + Usage: "channel username to watching", + }, + &cli.IntFlag{ + Name: "interval", + Aliases: []string{"i"}, + Value: 1, + Usage: "minutes to check if a channel goes online or not", + }, + &cli.IntFlag{ + Name: "strip", + Aliases: []string{"s"}, + Value: 0, + Usage: "MB sizes to split the video into chunks", + }, + &cli.StringFlag{ + Name: "resolution", + Aliases: []string{"r"}, + Value: "1080", + Usage: "Video resolution, could be `240`, `480`, `540`, `720`, `1080`", + }, + &cli.StringFlag{ + Name: "resolution-fallback", + Aliases: []string{"rf"}, + Value: "down", + Usage: "Looking for larger or smaller resolution (`up` for larger, `down` for smaller) if a specified resolution was not found", + }, + &cli.StringFlag{ + Name: "fps", + Aliases: []string{"f"}, + Value: "60", + Usage: "Preferred framerate, only works if streaming source supports it, otherwise it will always be 30 FPS", + }, + }, + Name: "chaturbate-dvr", + Usage: "watching a specified chaturbate channel and auto saves the stream as local file", + Action: endpoint, + } + err := app.Run(os.Args) + if err != nil { + log.Fatal(err) + } +} diff --git a/adapter/manager/config.go b/adapter/manager/config.go new file mode 100644 index 0000000..228ee21 --- /dev/null +++ b/adapter/manager/config.go @@ -0,0 +1,21 @@ +package manager + +import ( + streamDomain "github.com/teacat/chaturbate-dvr/domain/stream" +) + +// Config is a configuration for the manager, loads from config.json. +type Config struct { + Streams []*ConfigStream `json:"streams"` +} + +// ConfigStream is a configuration for a stream. +type ConfigStream struct { + Username string `json:"username"` + Resolution int `json:"resolution"` + ResolutionFallback streamDomain.ResolutionFallback `json:"resolution_fallback"` + Framerate int `json:"framerate"` + SplitByDuration int `json:"split_by_duration"` + SplitByFilesize int `json:"split_by_filesize"` + IsPaused bool `json:"is_paused"` +} diff --git a/adapter/manager/manager.go b/adapter/manager/manager.go new file mode 100644 index 0000000..e469da2 --- /dev/null +++ b/adapter/manager/manager.go @@ -0,0 +1,109 @@ +package manager + +import ( + "encoding/json" + "errors" + "fmt" + "os" + + streamDomain "github.com/teacat/chaturbate-dvr/domain/stream" +) + +var ( + ErrStreamExists = errors.New("stream exists") + ErrStreamNotExists = errors.New("stream not exists") +) + +// Manager manages the streams. +type Manager struct { + config *Config + streams map[string]*stream +} + +// New creates a new Manager. +func New() (*Manager, error) { + if err := os.MkdirAll("./videos", 0777); err != nil { + return nil, fmt.Errorf("create videos directory: %w", err) + } + config := &Config{} + + b, err := os.ReadFile("./config.json") + if os.IsNotExist(err) { + b, err := json.Marshal(config) + if err != nil { + return nil, fmt.Errorf("marshal config: %w", err) + } + if err := os.WriteFile("./config.json", b, 0777); err != nil { + return nil, fmt.Errorf("write config: %w", err) + } + } else if err != nil { + return nil, fmt.Errorf("read config: %w", err) + } else { + if err := json.Unmarshal(b, config); err != nil { + return nil, fmt.Errorf("unmarshal config: %w", err) + } + } + manager := &Manager{ + config: config, + streams: make(map[string]*stream), + } + manager.Ready() + return manager, nil +} + +// Ready prepares the manager. +func (m *Manager) Ready() error { + for _, s := range m.config.Streams { + if err := m.AddStream(s.Username, s.ResolutionFallback, s.Resolution, s.Framerate, s.SplitByFilesize, s.SplitByDuration, s.IsPaused); err != nil { + return fmt.Errorf("add stream: %w", err) + } + } + return nil +} + +func (m *Manager) ListStreams() ([]*streamDomain.StreamDTO, error) { + return nil, nil +} + +// PauseStream pauses the stream. +func (m *Manager) PauseStream(username string) error { + if _, ok := m.streams[username]; !ok { + return ErrStreamNotExists + } + m.streams[username].pause() + return nil +} + +// AddStream adds a stream to watching list and starts watching. +func (m *Manager) AddStream(username string, resFallback streamDomain.ResolutionFallback, resolution, framerate, splitByFilesize, splitByDuration int, isPaused bool) error { + if _, ok := m.streams[username]; ok { + return ErrStreamExists + } + // TODO: Sanitize, Trim username. + s, _, _ := newStream(username) + if !isPaused { + s.start() + } + m.streams[username] = s + return nil +} + +func (m *Manager) StopStream(username string) error { + if _, ok := m.streams[username]; !ok { + return ErrStreamNotExists + } + m.streams[username].stop() + return nil +} + +func (m *Manager) ResumeStream(username string) error { + if _, ok := m.streams[username]; !ok { + return ErrStreamNotExists + } + m.streams[username].resume() + return nil +} + +func (m *Manager) SubscribeStreams(chUpd chan<- *streamDomain.StreamUpdateDTO, chOut chan<- *streamDomain.StreamOutputDTO) error { + return nil +} diff --git a/adapter/manager/stream.go b/adapter/manager/stream.go new file mode 100644 index 0000000..d25b93f --- /dev/null +++ b/adapter/manager/stream.go @@ -0,0 +1,147 @@ +package manager + +import ( + "crypto/tls" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + streamDomain "github.com/teacat/chaturbate-dvr/domain/stream" +) + +type stream struct { + username string + channelURL string + isPaused bool + isOnline bool + resolution int + resolutionFallback streamDomain.ResolutionFallback + framerate int + splitByDuration int + splitByFilesize int + session *streamSession + chUpdate chan<- *streamDomain.StreamUpdateDTO + chOutput chan<- *streamDomain.StreamOutputDTO +} + +type streamSession struct { + buffer map[int][]byte + bufferIndex int + file *os.File + retries int + resolution int + framerate int + durationTotal int + durationQuota int + filesizeTotal int + filesizeQuota int +} + +func newStream(username string) (*stream, chan<- *streamDomain.StreamUpdateDTO, chan<- *streamDomain.StreamOutputDTO) { + chUpd := make(chan *streamDomain.StreamUpdateDTO) + chOut := make(chan *streamDomain.StreamOutputDTO) + + return &stream{ + username: username, + channelURL: "https://chaturbate.com/" + username, + // TODO: resolution, framerate split, duration split, filesize split + isPaused: true, + chUpdate: chUpd, + chOutput: chOut, + }, chUpd, chOut +} + +func (s *stream) start() { + for { + body, err := s.retrieveChannel() + if err != nil { + s.log("Error occurred while retrieving channel webpage: %s", err) + } + if s.isOnline { + s.log("%s is now online.", s.username) + if err := s.startRecording(body); err != nil { // blocking + s.log("Error occurred while start recording: %s", err) + } + continue + } + s.log("%s went offline.", s.username) + <-time.After(time.Minute * time.Duration(1)) // 1 minute cooldown + } +} + +func (s *stream) startRecording(body string) error { + folder := fmt.Sprintf("./videos/%s", s.username) + if err := os.MkdirAll(folder, 0777); err != nil { + return fmt.Errorf("create folder: %w", err) + } + + basename := fmt.Sprintf("./videos/%s/%s_%s", s.username, time.Now().Format("2006-01-02_15-04-05")) + file, err := os.OpenFile(basename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0777) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + + s.log("The video will be saved as %s.ts", basename) + s.session = &streamSession{ + buffer: make(map[int][]byte), + bufferIndex: 0, + retries: 0, + file: file, + } + + // + streamURI, err := parseHLS(s.resolution, s.framerate, s.resolutionFallback, body) + if err != nil { + return fmt.Errorf("parse hls: %w", err) + } + + s.concatStreams() + + s.retrieveStream(streamURI) +} + +func (s *stream) retrieveStream(uri string) { + +} + +func (s *stream) pause() { +} + +func (s *stream) stop() { +} + +func (s *stream) resume() { +} + +func (s *stream) retrieveChannel() (string, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + resp, err := client.Get(s.channelURL) + if err != nil { + return "", fmt.Errorf("client get: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("read body: %w", err) + } + + s.isOnline = strings.Contains(string(body), "playlist.m3u8") + + return string(body), nil +} + +func (s *stream) log(message string, v ...interface{}) { + s.chOutput <- &streamDomain.StreamOutputDTO{ + Username: s.username, + Output: "[" + time.Now().Format("2006-01-02 15:04:05") + "] " + fmt.Sprintf(message, v...), + } + +} diff --git a/adapter/manager/util.go b/adapter/manager/util.go new file mode 100644 index 0000000..a5946d0 --- /dev/null +++ b/adapter/manager/util.go @@ -0,0 +1,116 @@ +package manager + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "regexp" + "strconv" + "strings" + + "github.com/grafov/m3u8" + "github.com/samber/lo" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +var ( + regexpRoomDossier = regexp.MustCompile(`window\.initialRoomDossier = "(.*?)"`) +) + +type roomDossier struct { + HLSSource string `json:"hls_source"` +} + +type source struct { + framerate map[int]string // key: framerate, value: url + size int +} + +func parseHLS(resolution, framerate int, resFallback stream.ResolutionFallback, body string) (string, error) { + // Find the room dossier. + matches := regexpRoomDossier.FindAllStringSubmatch(body, -1) + + // Get the HLS source from the room dossier. + var roomData roomDossier + data, err := strconv.Unquote(strings.Replace(strconv.Quote(string(matches[0][1])), `\\u`, `\u`, -1)) + if err != nil { + return "", fmt.Errorf("unquote unicode: %w", err) + } + if err := json.Unmarshal([]byte(data), &roomData); err != nil { + return "", fmt.Errorf("unmarshal json: %w", err) + } + + // Get the HLS source. + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{Transport: tr} + + resp, err := client.Get(roomData.HLSSource) + if err != nil { + return "", fmt.Errorf("client get: %w", err) + } + defer resp.Body.Close() + + m3u8Body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("read body: %w", err) + } + if resp.StatusCode == http.StatusForbidden { + return "", fmt.Errorf("received status code %d", resp.StatusCode) + } + + // Decode the m3u8 file. + p, _, err := m3u8.DecodeFrom(bytes.NewReader(m3u8Body), true) + if err != nil { + return "", fmt.Errorf("decode m3u8: %w", err) + } + playlist, ok := p.(*m3u8.MasterPlaylist) + if !ok { + return "", fmt.Errorf("cast to master playlist") + } + + var sources []*source + + // + for _, v := range playlist.Variants { + } + + variant, ok := lo.Find(sources, func(v *source) bool { + return v.size == resolution + }) + // If the variant is not found, we fallback to the nearest resolution. + if !ok { + switch resFallback { + case stream.ResolutionFallbackDownscale: + variant = lo.MinBy(sources, func(v, min *source) bool { + // return v.size < resolution && v.size < min.size + return math.Abs(float64(v.size-resolution)) < math.Abs(float64(min.size-resolution)) + }) + case stream.ResolutionFallbackUpscale: + variant = lo.MaxBy(sources, func(v, max *source) bool { + return math.Abs(float64(v.size-resolution)) > math.Abs(float64(max.size-resolution)) + }) + } + } + if variant == nil { + return "", fmt.Errorf("variant not found") + } + + uri, ok := variant.framerate[framerate] + // If the framerate is not found, we fallback to the nearest framerate. + if !ok { + for _, v := range variant.framerate { + uri = v + // TODO: log fps + break + } + } + + baseURL := strings.TrimSuffix(roomData.HLSSource, "playlist.m3u8") + return baseURL + uri, nil +} diff --git a/domain/stream/repository.go b/domain/stream/repository.go new file mode 100644 index 0000000..26b9a1a --- /dev/null +++ b/domain/stream/repository.go @@ -0,0 +1,10 @@ +package stream + +type Manager interface { + ListStreams() ([]*StreamDTO, error) + AddStream(username string, resFallback ResolutionFallback, resolution, framerate, splitByFilesize, splitByDuration int, isPaused bool) error + PauseStream(username string) error + StopStream(username string) error + ResumeStream(username string) error + SubscribeStreams(chUpd chan<- *StreamUpdateDTO, chOut chan<- *StreamOutputDTO) error +} diff --git a/domain/stream/stream.go b/domain/stream/stream.go new file mode 100644 index 0000000..3efe85c --- /dev/null +++ b/domain/stream/stream.go @@ -0,0 +1,162 @@ +package stream + +import "fmt" + +//======================================================= +// Enum +//======================================================= + +type ResolutionFallback string + +const ( + ResolutionFallbackUnknown ResolutionFallback = "" + ResolutionFallbackUpscale ResolutionFallback = "upscale" + ResolutionFallbackDownscale ResolutionFallback = "downscale" +) + +//======================================================= +// Entity +//======================================================= + +type Stream struct { + channelURL string + channelUsername string + splitFilesize int + splitDuration int + resolution int + resolutionFallback ResolutionFallback + framerate int +} + +type StreamDTO struct { + Username string + LastStreamedAt int64 + SegmentDuration int + SegmentDurationSplit int + SegmentFilesize int + SegmentFilesizeSplit int + IsOnline bool + IsPaused bool +} + +type StreamUpdateDTO struct { + Username string + IsOnline bool + IsPaused bool + LastStreamedAt string + SegmentDuration string + SegmentFilesize string +} + +type StreamOutputDTO struct { + Username string + Output string +} + +//======================================================= +// Domain +//======================================================= + +// Start starts the stream and recording. +func (s *Stream) Start() error { + return nil +} + +// Pause pauses the stream and keep the stream in the list. +func (s *Stream) Pause() error { + return nil +} + +// Stop stops the stream and removes the stream from the list. +func (s *Stream) Stop() error { + return nil +} + +// Resume resumes the paused stream. +func (s *Stream) Resume() error { + return nil +} + +//======================================================= +// Factory +//======================================================= + +type StreamFactory struct { + sanitizer *streamSanitizer +} + +func NewStreamFactory() *StreamFactory { + return &StreamFactory{ + sanitizer: newStreamSanitizer(), + } +} + +func (f *StreamFactory) New(username string, resFallback ResolutionFallback, resolution, framerate, splitFilesize, splitDuration int) (*Stream, error) { + username, err := f.sanitizer.sanitizeUsername(username) + if err != nil { + return nil, fmt.Errorf("sanitize username: %w", err) + } + resolution, err = f.sanitizer.sanitizeResolution(resolution) + if err != nil { + return nil, fmt.Errorf("sanitize resolution: %w", err) + } + resFallback, err = f.sanitizer.sanitizeResolutionFallback(resFallback) + if err != nil { + return nil, fmt.Errorf("sanitize resolution fallback: %w", err) + } + framerate, err = f.sanitizer.sanitizeFramerate(framerate) + if err != nil { + return nil, fmt.Errorf("sanitize framerate: %w", err) + } + splitFilesize, err = f.sanitizer.sanitizeSplitByFilesize(splitFilesize) + if err != nil { + return nil, fmt.Errorf("sanitize split by filesize: %w", err) + } + splitDuration, err = f.sanitizer.sanitizeSplitByDuration(splitDuration) + if err != nil { + return nil, fmt.Errorf("sanitize split by duration: %w", err) + } + return &Stream{ + channelUsername: username, + resolution: resolution, + resolutionFallback: resFallback, + framerate: framerate, + splitFilesize: splitFilesize, + splitDuration: splitDuration, + }, nil +} + +//======================================================= +// Sanitizer +//======================================================= + +type streamSanitizer struct { +} + +func newStreamSanitizer() *streamSanitizer { + return &streamSanitizer{} +} + +func (s *streamSanitizer) sanitizeUsername(v string) (string, error) { + return v, nil +} + +func (s *streamSanitizer) sanitizeResolution(v int) (int, error) { + return v, nil +} + +func (s *streamSanitizer) sanitizeResolutionFallback(v ResolutionFallback) (ResolutionFallback, error) { + return v, nil +} + +func (s *streamSanitizer) sanitizeFramerate(v int) (int, error) { + return v, nil +} + +func (s *streamSanitizer) sanitizeSplitByFilesize(v int) (int, error) { + return v, nil +} + +func (s *streamSanitizer) sanitizeSplitByDuration(v int) (int, error) { + return v, nil +} diff --git a/gateway/gateway.go b/gateway/gateway.go new file mode 100644 index 0000000..48a42ff --- /dev/null +++ b/gateway/gateway.go @@ -0,0 +1,29 @@ +package gateway + +import ( + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" + "github.com/teacat/chaturbate-dvr/gateway/handler" +) + +type Gateway struct { + routes map[string]gin.HandlerFunc +} + +func New(manager stream.Manager) *Gateway { + g := &Gateway{ + routes: make(map[string]gin.HandlerFunc), + } + g.routes = map[string]gin.HandlerFunc{ + "/api/list_streams": handle(handler.NewListStreamsHandler(manager)), + "/api/start_stream": handle(handler.NewStartStreamHandler(manager)), + "/api/stop_stream": handle(handler.NewStopStreamHandler(manager)), + "/api/pause_stream": handle(handler.NewPauseStreamHandler(manager)), + "/api/resume_stream": handle(handler.NewResumeStreamHandler(manager)), + } + return g +} + +func (g *Gateway) Routes() map[string]gin.HandlerFunc { + return g.routes +} diff --git a/gateway/handler/fetch_updates.go b/gateway/handler/fetch_updates.go new file mode 100644 index 0000000..28f4938 --- /dev/null +++ b/gateway/handler/fetch_updates.go @@ -0,0 +1,90 @@ +package handler + +import ( + "encoding/json" + "fmt" + + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +//======================================================= +// Request & Response +//======================================================= + +type FetchUpdatesRequest struct { +} + +type FetchUpdatesResponse struct { + Updates []*FetchUpdatesResponseUpdate `json:"updates"` + Outputs []*FetchUpdatesResponseOutput `json:"outputs"` +} + +type FetchUpdatesResponseUpdate struct { + Username string `json:"username"` + IsOnline bool `json:"is_online"` + IsPaused bool `json:"is_paused"` + LastStreamedAt string `json:"last_streamed_at"` + SegmentDuration string `json:"segment_duration"` + SegmentFilesize string `json:"segment_filesize"` +} + +type FetchUpdatesResponseOutput struct { + Username string `json:"username"` + Output string `json:"output"` +} + +//======================================================= +// Factory +//======================================================= + +type FetchUpdatesHandler struct { + manager stream.Manager +} + +func NewFetchUpdatesHandler() *FetchUpdatesHandler { + return &FetchUpdatesHandler{} +} + +//======================================================= +// Handle +//======================================================= + +func (h *FetchUpdatesHandler) Handle(ctx *gin.Context, req *FetchUpdatesRequest) (*FetchUpdatesResponse, error) { + chUpd := make(chan *stream.StreamUpdateDTO) + chOut := make(chan *stream.StreamOutputDTO) + + if err := h.manager.SubscribeStreams(chUpd, chOut); err != nil { + return nil, fmt.Errorf("subscribe pool: %w", err) + } + + for { + select { + case upd := <-chUpd: + b, err := json.Marshal(&FetchUpdatesResponseUpdate{ + Username: upd.Username, + IsOnline: upd.IsOnline, + IsPaused: upd.IsPaused, + LastStreamedAt: upd.LastStreamedAt, + SegmentDuration: upd.SegmentDuration, + SegmentFilesize: upd.SegmentFilesize, + }) + if err != nil { + return nil, fmt.Errorf("marshal update: %w", err) + } + ctx.SSEvent("update", b) + + case out := <-chOut: + b, err := json.Marshal(&FetchUpdatesResponseOutput{ + Username: out.Username, + Output: out.Output, + }) + if err != nil { + return nil, fmt.Errorf("marshal output: %w", err) + } + ctx.SSEvent("output", b) + } + } + + return nil, nil +} diff --git a/gateway/handler/list_streams.go b/gateway/handler/list_streams.go new file mode 100644 index 0000000..ff9ea3d --- /dev/null +++ b/gateway/handler/list_streams.go @@ -0,0 +1,76 @@ +package handler + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +//======================================================= +// Request & Response +//======================================================= + +type ListStreamsRequest struct { +} + +type ListStreamsResponse struct { + Streams []*ListStreamsResponseStream `json:"streams"` +} + +type ListStreamsResponseStream struct { + Username string `json:"username"` + ChannelURL string `json:"channel_url"` + SavedTo string `json:"saved_to"` + LastStreamedAt string `json:"last_streamed_at"` + SegmentDuration string `json:"segment_duration"` + SegmentDurationSplit string `json:"segment_duration_split"` + SegmentFilesize string `json:"segment_filesize"` + SegmentFilesizeSplit string `json:"segment_filesize_split"` + IsOnline bool `json:"is_online"` + IsPaused bool `json:"is_paused"` +} + +//======================================================= +// Factory +//======================================================= + +type ListStreamsHandler struct { + manager stream.Manager +} + +func NewListStreamsHandler(manager stream.Manager) *ListStreamsHandler { + return &ListStreamsHandler{ + manager: manager, + } +} + +//======================================================= +// Handle +//======================================================= + +func (h *ListStreamsHandler) Handle(ctx *gin.Context, req *ListStreamsRequest) (*ListStreamsResponse, error) { + streams, err := h.manager.ListStreams() + if err != nil { + return nil, fmt.Errorf("list streams: %w", err) + } + resp := &ListStreamsResponse{ + Streams: make([]*ListStreamsResponseStream, len(streams)), + } + for i, s := range streams { + resp.Streams[i] = &ListStreamsResponseStream{ + Username: s.Username, + ChannelURL: fmt.Sprintf("https://chaturbate.com/%s/", s.Username), + SavedTo: fmt.Sprintf("./videos/%s/", s.Username), + LastStreamedAt: time.Unix(s.LastStreamedAt, 0).Format("2006-01-02 15:04:05"), + SegmentDuration: formatPlaytime(s.SegmentDuration), + SegmentDurationSplit: formatPlaytime(s.SegmentDurationSplit), + SegmentFilesize: fmt.Sprintf("%d MB", s.SegmentFilesize), + SegmentFilesizeSplit: fmt.Sprintf("%d MB", s.SegmentFilesizeSplit), + IsOnline: s.IsOnline, + IsPaused: s.IsPaused, + } + } + return resp, nil +} diff --git a/gateway/handler/pause_stream.go b/gateway/handler/pause_stream.go new file mode 100644 index 0000000..3845137 --- /dev/null +++ b/gateway/handler/pause_stream.go @@ -0,0 +1,44 @@ +package handler + +import ( + "fmt" + + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +//======================================================= +// Request & Response +//======================================================= + +type PauseStreamRequest struct { + Username string `json:"username"` +} + +type PauseStreamResponse struct { +} + +//======================================================= +// Factory +//======================================================= + +type PauseStreamHandler struct { + manager stream.Manager +} + +func NewPauseStreamHandler(manager stream.Manager) *PauseStreamHandler { + return &PauseStreamHandler{ + manager: manager, + } +} + +//======================================================= +// Handle +//======================================================= + +func (h *PauseStreamHandler) Handle(ctx *gin.Context, req *PauseStreamRequest) (*PauseStreamResponse, error) { + if err := h.manager.PauseStream(req.Username); err != nil { + return nil, fmt.Errorf("pause stream: %w", err) + } + return &PauseStreamResponse{}, nil +} diff --git a/gateway/handler/resume_stream.go b/gateway/handler/resume_stream.go new file mode 100644 index 0000000..a5d212d --- /dev/null +++ b/gateway/handler/resume_stream.go @@ -0,0 +1,44 @@ +package handler + +import ( + "fmt" + + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +//======================================================= +// Request & Response +//======================================================= + +type ResumeStreamRequest struct { + Username string +} + +type ResumeStreamResponse struct { +} + +//======================================================= +// Factory +//======================================================= + +type ResumeStreamHandler struct { + manager stream.Manager +} + +func NewResumeStreamHandler(manager stream.Manager) *ResumeStreamHandler { + return &ResumeStreamHandler{ + manager: manager, + } +} + +//======================================================= +// Handle +//======================================================= + +func (h *ResumeStreamHandler) Handle(ctx *gin.Context, req *ResumeStreamRequest) (*ResumeStreamResponse, error) { + if err := h.manager.ResumeStream(req.Username); err != nil { + return nil, fmt.Errorf("resume stream: %w", err) + } + return &ResumeStreamResponse{}, nil +} diff --git a/gateway/handler/start_stream.go b/gateway/handler/start_stream.go new file mode 100644 index 0000000..08c76a7 --- /dev/null +++ b/gateway/handler/start_stream.go @@ -0,0 +1,57 @@ +package handler + +import ( + "fmt" + + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +//======================================================= +// Request & Response +//======================================================= + +type StartStreamRequest struct { + Username string + Resolution int + ResolutionFallback stream.ResolutionFallback + Framerate int + SplitByFilesize int + SplitByDuration int +} + +type StartStreamResponse struct { +} + +//======================================================= +// Factory +//======================================================= + +type StartStreamHandler struct { + manager stream.Manager +} + +func NewStartStreamHandler(manager stream.Manager) *StartStreamHandler { + return &StartStreamHandler{ + manager: manager, + } +} + +//======================================================= +// Handle +//======================================================= + +func (h *StartStreamHandler) Handle(ctx *gin.Context, req *StartStreamRequest) (*StartStreamResponse, error) { + if err := h.manager.AddStream( + req.Username, + req.ResolutionFallback, + req.Resolution, + req.Framerate, + req.SplitByFilesize, + req.SplitByDuration, + false, + ); err != nil { + return nil, fmt.Errorf("add stream: %w", err) + } + return &StartStreamResponse{}, nil +} diff --git a/gateway/handler/stop_stream.go b/gateway/handler/stop_stream.go new file mode 100644 index 0000000..be81684 --- /dev/null +++ b/gateway/handler/stop_stream.go @@ -0,0 +1,42 @@ +package handler + +import ( + "github.com/gin-gonic/gin" + "github.com/teacat/chaturbate-dvr/domain/stream" +) + +//======================================================= +// Request & Response +//======================================================= + +type StopStreamRequest struct { + Username string +} + +type StopStreamResponse struct { +} + +//======================================================= +// Factory +//======================================================= + +type StopStreamHandler struct { + manager stream.Manager +} + +func NewStopStreamHandler(manager stream.Manager) *StopStreamHandler { + return &StopStreamHandler{ + manager: manager, + } +} + +//======================================================= +// Handle +//======================================================= + +func (h *StopStreamHandler) Handle(ctx *gin.Context, req *StopStreamRequest) (*StopStreamResponse, error) { + if err := h.manager.StopStream(req.Username); err != nil { + return nil, err + } + return &StopStreamResponse{}, nil +} diff --git a/gateway/handler/template/index.html b/gateway/handler/template/index.html new file mode 100644 index 0000000..e8d02df --- /dev/null +++ b/gateway/handler/template/index.html @@ -0,0 +1,235 @@ + + +
+ + + + + + + +./videos/username/../videos/cherylloving_/
+