mirror of
https://github.com/RafaelSolVargas/Vulkan.git
synced 2025-10-29 16:57:23 +00:00
Merge pull request #18 from RafaelSolVargas/addingTests
Adding tests and upgrading Spotify stability
This commit is contained in:
commit
5099a551a4
@ -76,5 +76,13 @@ class SearchMessages(Singleton):
|
||||
self.UNKNOWN_INPUT = f'This type of input was too strange, try something else or type {config.BOT_PREFIX}help play'
|
||||
self.UNKNOWN_INPUT_TITLE = 'Nothing Found'
|
||||
self.SPOTIFY_ERROR = 'Spotify could not process any songs with this input, verify your link or try again later.'
|
||||
self.GENERIC_TITLE = 'Input could not be processed'
|
||||
self.GENERIC_TITLE = 'URL could not be processed'
|
||||
self.YOUTUBE_ERROR = 'Youtube could not process any songs with this input, verify your link or try again later.'
|
||||
self.INVALID_SPOTIFY_URL = 'Invalid Spotify URL, verify your link.'
|
||||
|
||||
|
||||
class SpotifyMessages(Singleton):
|
||||
def __init__(self) -> None:
|
||||
if not super().created:
|
||||
self.INVALID_SPOTIFY_URL = 'Invalid Spotify URL, verify your link.'
|
||||
self.GENERIC_TITLE = 'URL could not be processed'
|
||||
|
||||
@ -1,13 +1,13 @@
|
||||
from typing import Union
|
||||
from discord.ext.commands import Context
|
||||
from Exceptions.Exceptions import Error
|
||||
from Exceptions.Exceptions import VulkanError
|
||||
from discord import Embed
|
||||
|
||||
|
||||
class ControllerResponse:
|
||||
def __init__(self, ctx: Context, embed: Embed = None, error: Error = None) -> None:
|
||||
def __init__(self, ctx: Context, embed: Embed = None, error: VulkanError = None) -> None:
|
||||
self.__ctx: Context = ctx
|
||||
self.__error: Error = error
|
||||
self.__error: VulkanError = error
|
||||
self.__embed: Embed = embed
|
||||
self.__success = False if error else True
|
||||
|
||||
@ -19,7 +19,7 @@ class ControllerResponse:
|
||||
def embed(self) -> Union[Embed, None]:
|
||||
return self.__embed
|
||||
|
||||
def error(self) -> Union[Error, None]:
|
||||
def error(self) -> Union[VulkanError, None]:
|
||||
return self.__error
|
||||
|
||||
@property
|
||||
|
||||
@ -3,7 +3,7 @@ from discord.ext.commands import Context
|
||||
from discord import Client
|
||||
from Controllers.AbstractController import AbstractController
|
||||
from Controllers.ControllerResponse import ControllerResponse
|
||||
from Exceptions.Exceptions import BadCommandUsage, Error, InvalidInput, NumberRequired, UnknownError, WrongLength
|
||||
from Exceptions.Exceptions import BadCommandUsage, VulkanError, InvalidInput, NumberRequired, UnknownError
|
||||
from Music.Downloader import Downloader
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ class MoveController(AbstractController):
|
||||
error = UnknownError()
|
||||
return ControllerResponse(self.ctx, embed, error)
|
||||
|
||||
def __validate_input(self, pos1: str, pos2: str) -> Union[Error, None]:
|
||||
def __validate_input(self, pos1: str, pos2: str) -> Union[VulkanError, None]:
|
||||
try:
|
||||
pos1 = int(pos1)
|
||||
pos2 = int(pos2)
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import asyncio
|
||||
from Exceptions.Exceptions import DownloadingError, Error
|
||||
from Exceptions.Exceptions import DownloadingError, VulkanError
|
||||
from discord.ext.commands import Context
|
||||
from discord import Client
|
||||
from Controllers.AbstractController import AbstractController
|
||||
@ -64,7 +64,7 @@ class PlayController(AbstractController):
|
||||
return response
|
||||
|
||||
except Exception as err:
|
||||
if isinstance(err, Error):
|
||||
if isinstance(err, VulkanError): # If error was already processed
|
||||
print(f'DEVELOPER NOTE -> PlayController Error: {err.message}')
|
||||
error = err
|
||||
embed = self.embeds.CUSTOM_ERROR(error)
|
||||
|
||||
@ -3,7 +3,7 @@ from discord.ext.commands import Context
|
||||
from discord import Client
|
||||
from Controllers.AbstractController import AbstractController
|
||||
from Controllers.ControllerResponse import ControllerResponse
|
||||
from Exceptions.Exceptions import BadCommandUsage, Error, ErrorRemoving, InvalidInput, NumberRequired, UnknownError
|
||||
from Exceptions.Exceptions import BadCommandUsage, VulkanError, ErrorRemoving, InvalidInput, NumberRequired
|
||||
|
||||
|
||||
class RemoveController(AbstractController):
|
||||
@ -38,7 +38,7 @@ class RemoveController(AbstractController):
|
||||
embed = self.embeds.ERROR_REMOVING()
|
||||
return ControllerResponse(self.ctx, embed, error)
|
||||
|
||||
def __validate_input(self, position: str) -> Union[Error, None]:
|
||||
def __validate_input(self, position: str) -> Union[VulkanError, None]:
|
||||
try:
|
||||
position = int(position)
|
||||
except:
|
||||
|
||||
@ -1,8 +1,7 @@
|
||||
from Config.Configs import Configs
|
||||
from Config.Messages import Messages
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
class VulkanError(Exception):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
self.__message = message
|
||||
self.__title = title
|
||||
@ -17,7 +16,7 @@ class Error(Exception):
|
||||
return self.__title
|
||||
|
||||
|
||||
class ImpossibleMove(Error):
|
||||
class ImpossibleMove(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
message = Messages()
|
||||
if title == '':
|
||||
@ -25,56 +24,56 @@ class ImpossibleMove(Error):
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class MusicUnavailable(Error):
|
||||
class MusicUnavailable(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class YoutubeError(Error):
|
||||
class YoutubeError(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class BadCommandUsage(Error):
|
||||
class BadCommandUsage(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class DownloadingError(Error):
|
||||
class DownloadingError(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class SpotifyError(Error):
|
||||
class SpotifyError(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class UnknownError(Error):
|
||||
class UnknownError(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class InvalidInput(Error):
|
||||
class InvalidInput(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class WrongLength(Error):
|
||||
class WrongLength(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class ErrorMoving(Error):
|
||||
class ErrorMoving(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class ErrorRemoving(Error):
|
||||
class ErrorRemoving(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
|
||||
class NumberRequired(Error):
|
||||
class NumberRequired(VulkanError):
|
||||
def __init__(self, message='', title='', *args: object) -> None:
|
||||
super().__init__(message, title, *args)
|
||||
|
||||
@ -27,7 +27,7 @@ class Downloader():
|
||||
'default_search': 'auto',
|
||||
'playliststart': 0,
|
||||
'extract_flat': False,
|
||||
'playlistend': config.MAX_PLAYLIST_FORCED_LENGTH,
|
||||
'playlistend': config.MAX_PLAYLIST_LENGTH,
|
||||
'quiet': True
|
||||
}
|
||||
__BASE_URL = 'https://www.youtube.com/watch?v={}'
|
||||
@ -53,10 +53,13 @@ class Downloader():
|
||||
|
||||
async def preload(self, songs: List[Song]) -> None:
|
||||
for song in songs:
|
||||
asyncio.ensure_future(self.__download_song(song))
|
||||
asyncio.ensure_future(self.download_song(song))
|
||||
|
||||
@run_async
|
||||
def extract_info(self, url: str) -> List[dict]:
|
||||
if url == '':
|
||||
return []
|
||||
|
||||
if Utils.is_url(url): # If Url
|
||||
options = Downloader.__YDL_OPTIONS_EXTRACT
|
||||
with YoutubeDL(options) as ydl:
|
||||
@ -105,7 +108,7 @@ class Downloader():
|
||||
print(f'DEVELOPER NOTE -> Error Downloading URL {e}')
|
||||
return None
|
||||
|
||||
async def __download_song(self, song: Song) -> None:
|
||||
async def download_song(self, song: Song) -> None:
|
||||
if song.source is not None: # If Music already preloaded
|
||||
return None
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ from Music.Downloader import Downloader
|
||||
from Music.Types import Provider
|
||||
from Music.Spotify import SpotifySearch
|
||||
from Utils.Utils import Utils
|
||||
from Utils.UrlAnalyzer import URLAnalyzer
|
||||
from Config.Messages import SearchMessages
|
||||
|
||||
|
||||
@ -19,6 +20,7 @@ class Searcher():
|
||||
|
||||
elif provider == Provider.YouTube:
|
||||
try:
|
||||
track = self.__cleanYoutubeInput(track)
|
||||
musics = await self.__down.extract_info(track)
|
||||
return musics
|
||||
except:
|
||||
@ -27,13 +29,29 @@ class Searcher():
|
||||
elif provider == Provider.Spotify:
|
||||
try:
|
||||
musics = self.__Spotify.search(track)
|
||||
if musics == None or len(musics) == 0:
|
||||
raise SpotifyError(self.__messages.SPOTIFY_ERROR, self.__messages.GENERIC_TITLE)
|
||||
|
||||
return musics
|
||||
except:
|
||||
except SpotifyError as error:
|
||||
raise error # Redirect already processed error
|
||||
except Exception as e:
|
||||
print(f'[Spotify Error] -> {e}')
|
||||
raise SpotifyError(self.__messages.SPOTIFY_ERROR, self.__messages.GENERIC_TITLE)
|
||||
|
||||
elif provider == Provider.Name:
|
||||
return [track]
|
||||
|
||||
def __cleanYoutubeInput(self, track: str) -> str:
|
||||
trackAnalyzer = URLAnalyzer(track)
|
||||
# Just ID and List arguments probably
|
||||
if trackAnalyzer.queryParamsQuant <= 2:
|
||||
return track
|
||||
|
||||
# Arguments used in Mix Youtube Playlists
|
||||
if 'start_radio' or 'index' in trackAnalyzer.queryParams.keys():
|
||||
return trackAnalyzer.getCleanedUrl()
|
||||
|
||||
def __identify_source(self, track) -> Provider:
|
||||
if not Utils.is_url(track):
|
||||
return Provider.Name
|
||||
|
||||
@ -11,10 +11,10 @@ class Song:
|
||||
self.destroy()
|
||||
return None
|
||||
|
||||
self.__usefull_keys = ['duration',
|
||||
'title', 'webpage_url',
|
||||
'channel', 'id', 'uploader',
|
||||
'thumbnail', 'original_url']
|
||||
self.__useful_keys = ['duration',
|
||||
'title', 'webpage_url',
|
||||
'channel', 'id', 'uploader',
|
||||
'thumbnail', 'original_url']
|
||||
self.__required_keys = ['url']
|
||||
|
||||
for key in self.__required_keys:
|
||||
@ -24,7 +24,7 @@ class Song:
|
||||
print(f'DEVELOPER NOTE -> {key} not found in info of music: {self.identifier}')
|
||||
self.destroy()
|
||||
|
||||
for key in self.__usefull_keys:
|
||||
for key in self.__useful_keys:
|
||||
if key in info.keys():
|
||||
self.__info[key] = info[key]
|
||||
|
||||
|
||||
@ -1,10 +1,14 @@
|
||||
from spotipy import Spotify
|
||||
from spotipy.oauth2 import SpotifyClientCredentials
|
||||
from spotipy.exceptions import SpotifyException
|
||||
from Exceptions.Exceptions import SpotifyError
|
||||
from Config.Configs import Configs
|
||||
from Config.Messages import SpotifyMessages
|
||||
|
||||
|
||||
class SpotifySearch():
|
||||
def __init__(self) -> None:
|
||||
self.__messages = SpotifyMessages()
|
||||
self.__config = Configs()
|
||||
self.__connected = False
|
||||
self.__connect()
|
||||
@ -17,22 +21,28 @@ class SpotifySearch():
|
||||
except Exception as e:
|
||||
print(f'DEVELOPER NOTE -> Spotify Connection Error {e}')
|
||||
|
||||
def search(self, music: str) -> list:
|
||||
type = music.split('/')[3].split('?')[0]
|
||||
code = music.split('/')[4].split('?')[0]
|
||||
def search(self, url: str) -> list:
|
||||
if not self.__checkUrlValid(url):
|
||||
raise SpotifyError(self.__messages.INVALID_SPOTIFY_URL, self.__messages.GENERIC_TITLE)
|
||||
|
||||
type = url.split('/')[3].split('?')[0]
|
||||
code = url.split('/')[4].split('?')[0]
|
||||
musics = []
|
||||
|
||||
if self.__connected:
|
||||
if type == 'album':
|
||||
musics = self.__get_album(code)
|
||||
elif type == 'playlist':
|
||||
musics = self.__get_playlist(code)
|
||||
elif type == 'track':
|
||||
musics = self.__get_track(code)
|
||||
elif type == 'artist':
|
||||
musics = self.__get_artist(code)
|
||||
try:
|
||||
if self.__connected:
|
||||
if type == 'album':
|
||||
musics = self.__get_album(code)
|
||||
elif type == 'playlist':
|
||||
musics = self.__get_playlist(code)
|
||||
elif type == 'track':
|
||||
musics = self.__get_track(code)
|
||||
elif type == 'artist':
|
||||
musics = self.__get_artist(code)
|
||||
|
||||
return musics
|
||||
return musics
|
||||
except SpotifyException:
|
||||
raise SpotifyError(self.__messages.INVALID_SPOTIFY_URL, self.__messages.GENERIC_TITLE)
|
||||
|
||||
def __get_album(self, code: str) -> list:
|
||||
results = self.__api.album_tracks(code)
|
||||
@ -94,3 +104,15 @@ class SpotifySearch():
|
||||
title += f'{artist["name"]} '
|
||||
|
||||
return title
|
||||
|
||||
def __checkUrlValid(self, url: str) -> bool:
|
||||
try:
|
||||
type = url.split('/')[3].split('?')[0]
|
||||
code = url.split('/')[4].split('?')[0]
|
||||
|
||||
if type == '' or code == '':
|
||||
return False
|
||||
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
@ -92,6 +92,10 @@ To run your Bot in Heroku 24/7, you will need the Procfile located in root, then
|
||||
- https://github.com/xrisk/heroku-opus.git
|
||||
|
||||
|
||||
## Testing
|
||||
The tests were written manually with no package due to problems with async function in other packages, to execute them type in root: <br>
|
||||
`python run_tests.py`<br>
|
||||
|
||||
## License
|
||||
- This program is free software: you can redistribute it and/or modify it under the terms of the [MIT License](https://github.com/RafaelSolVargas/Vulkan/blob/master/LICENSE).
|
||||
|
||||
@ -101,4 +105,4 @@ To run your Bot in Heroku 24/7, you will need the Procfile located in root, then
|
||||
|
||||
|
||||
## Acknowledgment
|
||||
- See the DingoLingo [project](https://github.com/Raptor123471/DingoLingo) from Raptor123471, it helped me a lot to build Vulkan.
|
||||
- See the DingoLingo [project](https://github.com/Raptor123471/DingoLingo) from Raptor123471, it helped me a lot to build Vulkan.
|
||||
10
Tests/Colors.py
Normal file
10
Tests/Colors.py
Normal file
@ -0,0 +1,10 @@
|
||||
class Colors:
|
||||
HEADER = '\033[95m'
|
||||
OKBLUE = '\033[94m'
|
||||
OKCYAN = '\033[96m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
ENDC = '\033[0m'
|
||||
BOLD = '\033[1m'
|
||||
UNDERLINE = '\033[4m'
|
||||
50
Tests/LoopRunner.py
Normal file
50
Tests/LoopRunner.py
Normal file
@ -0,0 +1,50 @@
|
||||
import asyncio
|
||||
from asyncio import AbstractEventLoop
|
||||
from threading import Thread
|
||||
from typing import Any, Coroutine, List
|
||||
|
||||
|
||||
class LoopRunner(Thread):
|
||||
"""
|
||||
Class to help deal with asyncio coroutines and loops
|
||||
Copyright: https://agariinc.medium.com/advanced-strategies-for-testing-async-code-in-python-6196a032d8d7
|
||||
"""
|
||||
|
||||
def __init__(self, loop: AbstractEventLoop) -> None:
|
||||
# We ensure to always use the same loop
|
||||
self.loop = loop
|
||||
Thread.__init__(self, name='runner')
|
||||
|
||||
def run(self) -> None:
|
||||
asyncio.set_event_loop(self.loop)
|
||||
try:
|
||||
self.loop.run_forever()
|
||||
finally:
|
||||
if self.loop.is_running():
|
||||
self.loop.close()
|
||||
|
||||
def run_coroutine(self, coroutine: Coroutine) -> Any:
|
||||
"""Run a coroutine inside the loop and return the result, doesn't allow concurrency"""
|
||||
result = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
|
||||
return result.result()
|
||||
|
||||
def _stop(self):
|
||||
self.loop.stop()
|
||||
|
||||
def run_in_thread(self, callback, *args):
|
||||
return self.loop.call_soon_threadsafe(callback, *args)
|
||||
|
||||
def stop(self):
|
||||
return self.run_in_thread(self._stop)
|
||||
|
||||
def run_coroutines_list(self, coroutineList: List[Coroutine]) -> None:
|
||||
"""Create multiple tasks in the loop and wait for them, use concurrency"""
|
||||
tasks = []
|
||||
for coroutine in coroutineList:
|
||||
tasks.append(self.loop.create_task(coroutine))
|
||||
|
||||
self.run_coroutine(self.__waitForMultipleTasks(tasks))
|
||||
|
||||
async def __waitForMultipleTasks(self, coroutines: List[Coroutine]) -> None:
|
||||
"""Function to trigger the await for asyncio.wait coroutines"""
|
||||
await asyncio.wait(coroutines)
|
||||
86
Tests/TestBase.py
Normal file
86
Tests/TestBase.py
Normal file
@ -0,0 +1,86 @@
|
||||
import asyncio
|
||||
from time import time
|
||||
from typing import Callable, List, Tuple
|
||||
from Tests.Colors import Colors
|
||||
from Music.Downloader import Downloader
|
||||
from Music.Searcher import Searcher
|
||||
from Tests.TestsHelper import TestsConstants
|
||||
from Tests.LoopRunner import LoopRunner
|
||||
|
||||
|
||||
class VulkanTesterBase:
|
||||
"""My own module to execute asyncio tests"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._downloader = Downloader()
|
||||
self._searcher = Searcher()
|
||||
self._constants = TestsConstants()
|
||||
# Get the list of methods objects of this class if start with test
|
||||
self._methodsList: List[Callable] = [getattr(self, func) for func in dir(self) if callable(
|
||||
getattr(self, func)) and func.startswith("test")]
|
||||
|
||||
def run(self) -> None:
|
||||
self.__printSeparator()
|
||||
methodsSummary: List[Tuple[Callable, bool]] = []
|
||||
testsSuccessQuant = 0
|
||||
testsStartTime = time()
|
||||
|
||||
for method in self._methodsList:
|
||||
currentTestStartTime = time()
|
||||
self.__printTestStart(method)
|
||||
success = False
|
||||
try:
|
||||
self._setUp()
|
||||
success = method()
|
||||
except Exception as e:
|
||||
success = False
|
||||
print(f'ERROR -> {e}')
|
||||
finally:
|
||||
self._tearDown()
|
||||
|
||||
methodsSummary.append((method, success))
|
||||
runTime = time() - currentTestStartTime # Get the run time of the current test
|
||||
if success:
|
||||
testsSuccessQuant += 1
|
||||
self.__printTestSuccess(method, runTime)
|
||||
else:
|
||||
self.__printTestFailure(method, runTime)
|
||||
|
||||
self.__printSeparator()
|
||||
|
||||
testsRunTime = time() - testsStartTime
|
||||
self.__printTestsSummary(methodsSummary, testsSuccessQuant, testsRunTime)
|
||||
|
||||
def _setUp(self) -> None:
|
||||
self._runner = LoopRunner(asyncio.new_event_loop())
|
||||
self._runner.start()
|
||||
|
||||
def _tearDown(self) -> None:
|
||||
self._runner.stop()
|
||||
self._runner.join()
|
||||
|
||||
def __printTestsSummary(self, methods: List[Tuple[Callable, bool]], totalSuccess: int, runTime: int) -> None:
|
||||
for index, methodResult in enumerate(methods):
|
||||
method = methodResult[0]
|
||||
success = methodResult[1]
|
||||
|
||||
if success:
|
||||
print(f'{Colors.OKGREEN} {index} -> {method.__name__} = Success {Colors.ENDC}')
|
||||
else:
|
||||
print(f'{Colors.FAIL} {index} -> {method.__name__} = Failed {Colors.ENDC}')
|
||||
|
||||
print()
|
||||
print(
|
||||
f'TESTS EXECUTED: {len(methods)} | SUCCESS: {totalSuccess} | FAILED: {len(methods) - totalSuccess} | TIME: {runTime:.2f}sec')
|
||||
|
||||
def __printTestStart(self, method: Callable) -> None:
|
||||
print(f'🧪 - Starting {method.__name__}')
|
||||
|
||||
def __printTestSuccess(self, method: Callable, runTime: int) -> None:
|
||||
print(f'{method.__name__} -> {Colors.OKGREEN} Success {Colors.ENDC} | ⏰ - {runTime:.2f}sec')
|
||||
|
||||
def __printTestFailure(self, method: Callable, runTime: int) -> None:
|
||||
print(f'{method.__name__} -> {Colors.FAIL} Test Failed {Colors.ENDC} | ⏰ - {runTime:.2f}sec')
|
||||
|
||||
def __printSeparator(self) -> None:
|
||||
print('=-=' * 15)
|
||||
21
Tests/TestsHelper.py
Normal file
21
Tests/TestsHelper.py
Normal file
@ -0,0 +1,21 @@
|
||||
from Config.Configs import Singleton
|
||||
|
||||
|
||||
class TestsConstants(Singleton):
|
||||
def __init__(self) -> None:
|
||||
if not super().created:
|
||||
self.EMPTY_STRING_ERROR_MSG = 'Downloader with Empty String should be empty list.'
|
||||
self.MUSIC_TITLE_STRING = 'Experience || AMV || Anime Mix'
|
||||
|
||||
self.YT_MUSIC_URL = 'https://www.youtube.com/watch?v=MvJoiv842mk'
|
||||
self.YT_MIX_URL = 'https://www.youtube.com/watch?v=ePjtnSPFWK8&list=RDMMePjtnSPFWK8&start_radio=1'
|
||||
self.YT_PERSONAL_PLAYLIST_URL = 'https://www.youtube.com/playlist?list=PLbbKJHHZR9ShYuKAr71cLJCFbYE-83vhS'
|
||||
# Links from playlists in channels some times must be extracted with force by Downloader
|
||||
self.YT_CHANNEL_PLAYLIST_URL = 'https://www.youtube.com/watch?v=MvJoiv842mk&list=PLAI1099Tvk0zWU8X4dwc4vv4MpePQ4DLl'
|
||||
|
||||
self.SPOTIFY_TRACK_URL = 'https://open.spotify.com/track/7wpnz7hje4FbnjZuWQtJHP'
|
||||
self.SPOTIFY_PLAYLIST_URL = 'https://open.spotify.com/playlist/37i9dQZF1EIV9u4LtkBkSF'
|
||||
self.SPOTIFY_ARTIST_URL = 'https://open.spotify.com/artist/4HF14RSTZQcEafvfPCFEpI'
|
||||
self.SPOTIFY_ALBUM_URL = 'https://open.spotify.com/album/71O60S5gIJSIAhdnrDIh3N'
|
||||
self.SPOTIFY_WRONG1_URL = 'https://open.spotify.com/wrongUrl'
|
||||
self.SPOTIFY_WRONG2_URL = 'https://open.spotify.com/track/WrongID'
|
||||
91
Tests/VDownloaderTests.py
Normal file
91
Tests/VDownloaderTests.py
Normal file
@ -0,0 +1,91 @@
|
||||
from typing import List
|
||||
from Tests.TestBase import VulkanTesterBase
|
||||
from Music.Playlist import Playlist
|
||||
from Music.Song import Song
|
||||
from asyncio import Task
|
||||
|
||||
|
||||
class VulkanDownloaderTest(VulkanTesterBase):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def test_emptyString(self) -> bool:
|
||||
musicsList = self._runner.run_coroutine(self._downloader.extract_info(''))
|
||||
|
||||
if musicsList == []:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_YoutubeMusicUrl(self) -> bool:
|
||||
musicsList = self._runner.run_coroutine(self._searcher.search(self._constants.YT_MUSIC_URL))
|
||||
|
||||
if len(musicsList) > 0:
|
||||
print(musicsList[0])
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_YoutubeChannelPlaylist(self) -> None:
|
||||
# Search the link to determine names
|
||||
musicsList = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.YT_CHANNEL_PLAYLIST_URL))
|
||||
|
||||
if len(musicsList) == 0:
|
||||
return False
|
||||
|
||||
# Create and store songs in list
|
||||
playlist = Playlist()
|
||||
songsList: List[Song] = []
|
||||
for info in musicsList:
|
||||
song = Song(identifier=info, playlist=playlist, requester='')
|
||||
playlist.add_song(song)
|
||||
songsList.append(song)
|
||||
|
||||
# Create a list of coroutines without waiting for them
|
||||
tasks: List[Task] = []
|
||||
for song in songsList:
|
||||
tasks.append(self._downloader.download_song(song))
|
||||
|
||||
# Send for runner to execute them concurrently
|
||||
self._runner.run_coroutines_list(tasks)
|
||||
|
||||
for song in songsList:
|
||||
if song.problematic or song.title == None:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def test_YoutubeMixPlaylist(self) -> None:
|
||||
# Search the link to determine names
|
||||
music = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.YT_MIX_URL))
|
||||
|
||||
# Musics from Mix should download only the first music
|
||||
if len(music) == 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_musicTitle(self):
|
||||
playlist = Playlist()
|
||||
song = Song(self._constants.MUSIC_TITLE_STRING, playlist, '')
|
||||
playlist.add_song(song)
|
||||
|
||||
self._runner.run_coroutine(self._downloader.download_song(song))
|
||||
|
||||
if song.problematic:
|
||||
return False
|
||||
else:
|
||||
print(song.title)
|
||||
return True
|
||||
|
||||
def test_YoutubePersonalPlaylist(self) -> None:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.YT_PERSONAL_PLAYLIST_URL))
|
||||
|
||||
if len(musics) > 0:
|
||||
print(musics)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
66
Tests/VSpotifyTests.py
Normal file
66
Tests/VSpotifyTests.py
Normal file
@ -0,0 +1,66 @@
|
||||
from Tests.TestBase import VulkanTesterBase
|
||||
from Exceptions.Exceptions import SpotifyError
|
||||
|
||||
|
||||
class VulkanSpotifyTest(VulkanTesterBase):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def test_spotifyTrack(self) -> bool:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.SPOTIFY_TRACK_URL))
|
||||
|
||||
if len(musics) > 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_spotifyPlaylist(self) -> bool:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.SPOTIFY_PLAYLIST_URL))
|
||||
|
||||
if len(musics) > 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_spotifyArtist(self) -> bool:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.SPOTIFY_ARTIST_URL))
|
||||
|
||||
if len(musics) > 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_spotifyAlbum(self) -> bool:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.SPOTIFY_ARTIST_URL))
|
||||
|
||||
if len(musics) > 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def test_spotifyWrongUrlShouldThrowException(self) -> bool:
|
||||
try:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.SPOTIFY_WRONG1_URL))
|
||||
|
||||
except SpotifyError as e:
|
||||
print(f'Spotify Error -> {e.message}')
|
||||
return True
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def test_spotifyWrongUrlTwoShouldThrowException(self) -> bool:
|
||||
try:
|
||||
musics = self._runner.run_coroutine(
|
||||
self._searcher.search(self._constants.SPOTIFY_WRONG2_URL))
|
||||
|
||||
except SpotifyError as e:
|
||||
print(f'Spotify Error -> {e.message}')
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
34
Utils/UrlAnalyzer.py
Normal file
34
Utils/UrlAnalyzer.py
Normal file
@ -0,0 +1,34 @@
|
||||
from typing import Dict
|
||||
|
||||
|
||||
class URLAnalyzer:
|
||||
def __init__(self, url: str) -> None:
|
||||
self.__url = url
|
||||
self.__queryParamsQuant = self.__url.count('&') + self.__url.count('?')
|
||||
self.__queryParams: Dict[str, str] = self.__getAllQueryParams()
|
||||
|
||||
@property
|
||||
def queryParams(self) -> dict:
|
||||
return self.__queryParams
|
||||
|
||||
@property
|
||||
def queryParamsQuant(self) -> int:
|
||||
return self.__queryParamsQuant
|
||||
|
||||
def getCleanedUrl(self) -> str:
|
||||
firstE = self.__url.index('&')
|
||||
return self.__url[:firstE]
|
||||
|
||||
def __getAllQueryParams(self) -> dict:
|
||||
if self.__queryParamsQuant <= 1:
|
||||
return {}
|
||||
|
||||
params = {}
|
||||
arguments = self.__url.split('&')
|
||||
arguments.pop(0)
|
||||
|
||||
for queryParam in arguments:
|
||||
queryName, queryValue = queryParam.split('=')
|
||||
params[queryName] = queryValue
|
||||
|
||||
return params
|
||||
@ -1,5 +1,5 @@
|
||||
from Config.Messages import Messages
|
||||
from Exceptions.Exceptions import Error
|
||||
from Exceptions.Exceptions import VulkanError
|
||||
from discord import Embed
|
||||
from Config.Configs import Configs
|
||||
from Config.Colors import Colors
|
||||
@ -130,7 +130,7 @@ class Embeds:
|
||||
)
|
||||
return embed
|
||||
|
||||
def CUSTOM_ERROR(self, error: Error) -> Embed:
|
||||
def CUSTOM_ERROR(self, error: VulkanError) -> Embed:
|
||||
embed = Embed(
|
||||
title=error.title,
|
||||
description=error.message,
|
||||
|
||||
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
8
run_tests.py
Normal file
8
run_tests.py
Normal file
@ -0,0 +1,8 @@
|
||||
from Tests.VDownloaderTests import VulkanDownloaderTest
|
||||
from Tests.VSpotifyTests import VulkanSpotifyTest
|
||||
|
||||
|
||||
tester = VulkanDownloaderTest()
|
||||
# tester.run()
|
||||
tester = VulkanSpotifyTest()
|
||||
tester.run()
|
||||
Loading…
x
Reference in New Issue
Block a user