Upgrading tests in Downloader

This commit is contained in:
Rafael Vargas 2022-07-10 01:23:59 -03:00
parent 4fb9d8d1ba
commit cd5f4567be
5 changed files with 110 additions and 16 deletions

View File

@ -27,7 +27,7 @@ class Downloader():
'default_search': 'auto',
'playliststart': 0,
'extract_flat': False,
'playlistend': config.MAX_PLAYLIST_FORCED_LENGTH,
'playlistend': config.MAX_PLAYLIST_LENGTH,
'quiet': True
}
__BASE_URL = 'https://www.youtube.com/watch?v={}'
@ -53,7 +53,7 @@ class Downloader():
async def preload(self, songs: List[Song]) -> None:
for song in songs:
asyncio.ensure_future(self.__download_song(song))
asyncio.ensure_future(self.download_song(song))
@run_async
def extract_info(self, url: str) -> List[dict]:
@ -108,7 +108,7 @@ class Downloader():
print(f'DEVELOPER NOTE -> Error Downloading URL {e}')
return None
async def __download_song(self, song: Song) -> None:
async def download_song(self, song: Song) -> None:
if song.source is not None: # If Music already preloaded
return None

View File

@ -3,6 +3,7 @@ from Music.Downloader import Downloader
from Music.Types import Provider
from Music.Spotify import SpotifySearch
from Utils.Utils import Utils
from Utils.UrlAnalyzer import URLAnalyzer
from Config.Messages import SearchMessages
@ -19,6 +20,7 @@ class Searcher():
elif provider == Provider.YouTube:
try:
track = self.__cleanYoutubeInput(track)
musics = await self.__down.extract_info(track)
return musics
except:
@ -34,6 +36,16 @@ class Searcher():
elif provider == Provider.Name:
return [track]
def __cleanYoutubeInput(self, track: str) -> str:
trackAnalyzer = URLAnalyzer(track)
# Just ID and List arguments probably
if trackAnalyzer.queryParamsQuant <= 2:
return track
# Arguments used in Mix Youtube Playlists
if 'start_radio' or 'index' in trackAnalyzer.queryParams.keys():
return trackAnalyzer.getCleanedUrl()
def __identify_source(self, track) -> Provider:
if not Utils.is_url(track):
return Provider.Name

View File

@ -2,6 +2,7 @@ import asyncio
from typing import List
import unittest
from Music.Downloader import Downloader
from Music.Searcher import Searcher
from Music.Playlist import Playlist
from Music.Song import Song
from Tests.TestsHelper import TestsConstants
@ -21,6 +22,7 @@ def myAsyncTest(coro):
class TestDownloader(unittest.IsolatedAsyncioTestCase):
def __init__(self, methodName: str = ...) -> None:
self.downloader = Downloader()
self.searcher = Searcher()
self.constants = TestsConstants()
super().__init__(methodName)
@ -32,27 +34,69 @@ class TestDownloader(unittest.IsolatedAsyncioTestCase):
@myAsyncTest
async def test_YoutubeMusicUrl(self) -> None:
musicInfo = await self.downloader.extract_info(self.constants.YOUTUBE_MUSIC_URL)
musics = await self.searcher.search(self.constants.YT_MUSIC_URL)
self.assertTrue(self.__infoExtractedSuccessfully(musicInfo))
self.assertTrue(len(musics) > 0)
def test_musicTitle(self):
@myAsyncTest
async def test_YoutubePersonalPlaylist(self) -> None:
musics = await self.searcher.search(self.constants.YT_PERSONAL_PLAYLIST_URL)
self.assertTrue(len(musics) > 0)
@myAsyncTest
async def test_YoutubeChannelPlaylist(self) -> None:
# Search the link to determine names
musicsInfo = await self.searcher.search(self.constants.YT_CHANNEL_PLAYLIST_URL)
self.assertTrue(len(musicsInfo) > 0)
# Create and store songs in list
playlist = Playlist()
songsList = []
for info in musicsInfo:
song = Song(identifier=info, playlist=playlist, requester='')
playlist.add_song(song)
songsList.append(song)
# We need to trigger and wait multiple tasks, so we create multiple tasks with asyncio
# and then we await for each one of them. We use this because download_song is a Coroutine
tasks: List[asyncio.Task] = []
for song in songsList:
task = asyncio.create_task(self.downloader.download_song(song))
tasks.append(task)
# Await for each task to finish
for task in tasks:
await task
self.assertTrue(self.__verifySuccessfullyPreload(songsList))
@myAsyncTest
async def test_YoutubeMixPlaylist(self) -> None:
music = await self.searcher.search(self.constants.YT_MIX_URL)
# Musics from Mix should download only the first music
self.assertTrue(len(music) == 1)
@myAsyncTest
async def test_musicTitle(self):
playlist = Playlist()
song = Song(self.constants.MUSIC_TITLE_STRING, playlist, '')
playlist.add_song(song)
self.downloader.finish_one_song(song)
task = asyncio.create_task(self.downloader.download_song(song))
await task
self.assertFalse(song.problematic)
def __downloadSucceeded(self, downloadReturn: List[dict]) -> bool:
# print(downloadReturn)
return True
def __verifySuccessfullyPreload(self, songs: List[Song]) -> bool:
for song in songs:
if song.title == None:
print('Song failed to download')
return False
def __infoExtractedSuccessfully(self, info: List[dict]) -> bool:
if len(info) > 0:
return True
else:
return False
return True
if __name__ == 'main':

View File

@ -7,4 +7,8 @@ class TestsConstants(Singleton):
self.EMPTY_STRING_ERROR_MSG = 'Downloader with Empty String should be empty list.'
self.SPOTIFY_TRACK_URL = 'https://open.spotify.com/track/7wpnz7hje4FbnjZuWQtJHP'
self.MUSIC_TITLE_STRING = 'Experience || AMV || Anime Mix'
self.YOUTUBE_MUSIC_URL = 'https://www.youtube.com/watch?v=MvJoiv842mk'
self.YT_MUSIC_URL = 'https://www.youtube.com/watch?v=MvJoiv842mk'
self.YT_MIX_URL = 'https://www.youtube.com/watch?v=ePjtnSPFWK8&list=RDMMePjtnSPFWK8&start_radio=1'
self.YT_PERSONAL_PLAYLIST_URL = 'https://www.youtube.com/playlist?list=PLbbKJHHZR9ShYuKAr71cLJCFbYE-83vhS'
# Links from playlists in channels some times must be extracted with force by Downloader
self.YT_CHANNEL_PLAYLIST_URL = 'https://www.youtube.com/watch?v=MvJoiv842mk&list=PLAI1099Tvk0zWU8X4dwc4vv4MpePQ4DLl'

34
Utils/UrlAnalyzer.py Normal file
View File

@ -0,0 +1,34 @@
from typing import Dict
class URLAnalyzer:
def __init__(self, url: str) -> None:
self.__url = url
self.__queryParamsQuant = self.__url.count('&') + self.__url.count('?')
self.__queryParams: Dict[str, str] = self.__getAllQueryParams()
@property
def queryParams(self) -> dict:
return self.__queryParams
@property
def queryParamsQuant(self) -> int:
return self.__queryParamsQuant
def getCleanedUrl(self) -> str:
firstE = self.__url.index('&')
return self.__url[:firstE]
def __getAllQueryParams(self) -> dict:
if self.__queryParamsQuant <= 1:
return {}
params = {}
arguments = self.__url.split('&')
arguments.pop(0)
for queryParam in arguments:
queryName, queryValue = queryParam.split('=')
params[queryName] = queryValue
return params