diff --git a/.gitignore b/.gitignore index ebb8b9a..6671836 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ __pycache__ examples .env -errors \ No newline at end of file +errors +.cache \ No newline at end of file diff --git a/config/config.py b/config/config.py index f185aeb..2064ce1 100644 --- a/config/config.py +++ b/config/config.py @@ -12,52 +12,18 @@ INITIAL_EXTENSIONS = {'vulkanbot.commands.Phrases', 'vulkanbot.commands.Warframe 'vulkanbot.general.Filter', 'vulkanbot.general.Control', 'vulkanbot.music.Music', 'vulkanbot.commands.Random'} -VC_TIMEOUT = 600 # seconds -VC_TIMEOUT_DEFAULT = True STARTUP_MESSAGE = 'Starting Vulkan...' STARTUP_COMPLETE_MESSAGE = 'Vulkan is now operating.' -USER_NOT_IN_VC_MESSAGE = "Error: Please join the active voice channel to use this command" -NOT_CONNECTED_MESSAGE = "Error: Bot not connected to any voice channel" -ALREADY_CONNECTED_MESSAGE = "Error: Already connected to a voice channel" -CHANNEL_NOT_FOUND_MESSAGE = "Error: Could not find channel" -INFO_HISTORY_TITLE = "Songs Played:" -MAX_HISTORY_LENGTH = 10 MAX_PLAYLIST_LENGTH = 50 -MAX_QUEUE_LENGTH = 10 -MAX_TRACKNAME_HISTORY_LENGTH = 15 MAX_API_PHRASES_TRIES = 10 MAX_API_CETUS_TRIES = 10 +MAX_PRELOAD_SONGS = 10 SONGINFO_UPLOADER = "Uploader: " SONGINFO_DURATION = "Duration: " -SONGINFO_SECONDS = "s" -SONGINFO_LIKES = "Likes: " -SONGINFO_DISLIKES = "Dislikes: " -SONGINFO_NOW_PLAYING = "Now Playing" -SONGINFO_QUEUE_ADDED = "Added to queue" -SONGINFO_SONGINFO = "Song info" -SONGINFO_PLAYLIST_QUEUED = "Queued playlist :page_with_curl:" -SONGINFO_UNKNOWN_DURATION = "Unknown" - - -HELP_HISTORY_LONG = "Shows the " + \ - str(MAX_TRACKNAME_HISTORY_LENGTH) + " last played songs." -HELP_PAUSE_LONG = "Pauses the AudioPlayer. Playback can be continued with the resume command." -HELP_VOL_LONG = "Changes the volume of the AudioPlayer. Argument specifies the % to which the volume should be set." -HELP_PREV_LONG = "Plays the previous song again." -HELP_RESUME_LONG = "Resumes the AudioPlayer." -HELP_SKIP_LONG = "Skips the currently playing song and goes to the next item in the queue." -HELP_SONGINFO_LONG = "Shows details about the song currently being played and posts a link to the song." -HELP_STOP_LONG = "Stops the AudioPlayer and clears the songqueue" -HELP_YT_LONG = ( - "$p [link/video title/key words/playlist-link/soundcloud link/spotify link/bandcamp link/twitter link]") -HELP_CLEAR_LONG = "Clears the queue." -HELP_LOOP_LONG = "Loops the currently playing song and locks the queue. Use the command again to disable loop." -HELP_QUEUE_LONG = "Shows the number of songs in queue, up to 10." -HELP_SHUFFLE_LONG = "Randomly sort the songs in the current queue" ABSOLUTE_PATH = '' COOKIE_PATH = '/config/cookies/cookies.txt' diff --git a/vulkanbot/general/Control.py b/vulkanbot/general/Control.py index 3b61c2e..75116a9 100644 --- a/vulkanbot/general/Control.py +++ b/vulkanbot/general/Control.py @@ -10,7 +10,7 @@ class Control(commands.Cog): def __init__(self, bot): self.__bot = bot self.__comandos = { - 'MUSIC': ['this', 'resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear'], + 'MUSIC': ['this', 'resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np'], 'RANDOM': ['escolha', 'cara', 'random'], 'HELP': ['help'], 'OTHERS': ['cetus', 'frase'] @@ -27,13 +27,13 @@ class Control(commands.Cog): @commands.Cog.listener() async def on_ready(self): print(config.STARTUP_MESSAGE) - await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | type {config.BOT_PREFIX}help")) + await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | {config.BOT_PREFIX}help")) print(config.STARTUP_COMPLETE_MESSAGE) @commands.Cog.listener() async def on_command_error(self, ctx, error): if isinstance(error, MissingRequiredArgument): - await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver os comandos') + await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver todos os comandos\n\nOu tente {config.BOT_PREFIX}command help para mais informações') elif isinstance(error, CommandNotFound): await ctx.channel.send(f'O comando não existe') else: diff --git a/vulkanbot/music/Downloader.py b/vulkanbot/music/Downloader.py index 04605da..a51b43b 100644 --- a/vulkanbot/music/Downloader.py +++ b/vulkanbot/music/Downloader.py @@ -1,10 +1,12 @@ -import re +import asyncio +import concurrent.futures + from config import config from yt_dlp import YoutubeDL from yt_dlp.utils import ExtractorError, DownloadError -from vulkanbot.music.Types import Provider - +from vulkanbot.music.Song import Song +from vulkanbot.music.utils import is_url class Downloader(): """Download musics direct URL and title or Source from Youtube using a music name or Youtube URL""" @@ -17,29 +19,60 @@ class Downloader(): 'playlistend': config.MAX_PLAYLIST_LENGTH, } - def download_urls(self, musics_input, provider: Provider) -> list: - """Download the musics direct URL from Youtube and return in a list + def download_one(self, song: Song) -> Song: + """Receives a song object, finish his download and return it""" + if song.identifier == None: + print('Invalid song identifier type') + return - Arg: List with names or youtube url or a Unique String - Return: List with the direct youtube URL of each music - """ - if type(provider) != Provider: + if is_url(song.identifier): # Youtube URL + song_info = self.__download_url(song.identifier) + else: # Song name + song_info = self.__download_title(song.identifier) + + if song_info == None: + song.destroy() # Destroy the music with problems return None - - if type(musics_input) != list and type(musics_input) != str: - return None - - if provider == Provider.Name: # Send a list of names - musics_urls = self.__download_titles(musics_input) - return musics_urls - - elif provider == Provider.YouTube: # Send a URL or Title - url = self.__download_one(musics_input) - return url else: + song.finish_down(song_info) + return song + + def extract_youtube_link(self, playlist_url: str) -> list: + """Extract all songs direct URL from a Youtube Link + + Arg: Url String + Return: List with the direct youtube URL of each song + """ + if is_url(playlist_url): # If Url + options = self.__YDL_OPTIONS + options['extract_flat'] = True + + with YoutubeDL(options) as ydl: + try: + result = ydl.extract_info(playlist_url, download=False) + songs_identifiers = [] + + if result.get('entries'): # If got a dict of musics + for entry in result['entries']: + songs_identifiers.append(f"https://www.youtube.com/watch?v={entry['id']}") + + else: # Or a single music + songs_identifiers.append(result['original_url']) + + return songs_identifiers # Return a list + except (ExtractorError, DownloadError) as e: + print(e) + return None + else: + print('Invalid type of playlist URL') return None - def download_source(self, url) -> dict: + async def preload(self, songs: list) -> None: + """Download the full info of the song object""" + for song in songs: + asyncio.ensure_future(self.__download_songs(song)) + + def __download_url(self, url) -> dict: """Download musics full info and source from Music URL Arg: URL from Youtube @@ -47,6 +80,7 @@ class Downloader(): """ options = self.__YDL_OPTIONS options['extract_flat'] = False + with YoutubeDL(options) as ydl: try: result = ydl.extract_info(url, download=False) @@ -54,92 +88,52 @@ class Downloader(): return result except (ExtractorError, DownloadError) as e: # Any type of error in download print(e) - return None - def __download_one(self, music: str) -> list: - """Download one music/playlist direct link from Youtube - - Arg: Playlist URL or Music Name to download direct URL - Return: List with the Youtube URL of each music downloaded - """ - if type(music) != str: + async def __download_songs(self, song: Song): + if song.source != None: # If Music already preloaded return + + def download_song(song): + if is_url(song.identifier): # Youtube URL + song_info = self.__download_url(song.identifier) + else: # Song name + song_info = self.__download_title(song.identifier) - if self.__is_url(music): # If Url - info = self.__download_links(music) # List of dict - else: # If Title - info = self.__download_titles(music) # List of dict + if song_info == None: + song.destroy() # Remove the song with problems from the playlist + else: + song.finish_down(song_info) - return info + # Creating a loop task to download each song + loop = asyncio.get_event_loop() + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=config.MAX_PRELOAD_SONGS + ) + await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)}, + return_when=asyncio.ALL_COMPLETED) - def __download_titles(self, musics_names: list) -> list: - """Download a music direct URL using his name. + def __download_title(self, title: str) -> dict: + """Download a music full information using his name. Arg: Music Name - Return: List with one dict, containing the music direct URL and title + Return: A dict containing the song information """ - if type(musics_names) == str: # Turn str into list - musics_names = [musics_names] + if type(title) != str: + print('Invalid music identifier type') + return + + config = self.__YDL_OPTIONS + config['extract_flat'] = False - musics_info = [] with YoutubeDL(self.__YDL_OPTIONS) as ydl: try: - for name in musics_names: - search = f"ytsearch:{name}" - result = ydl.extract_info(search, download=False) + search = f"ytsearch:{title}" + result = ydl.extract_info(search, download=False) - id = result['entries'][0]['id'] - music_info = { - 'url': f"https://www.youtube.com/watch?v={id}", - 'title': result['entries'][0]['title'] - } - musics_info.append(music_info) - - return musics_info # Return a list + if result == None: + return + + # Return a dict with the full info of first music + return result['entries'][0] except Exception as e: - raise e - - def __download_links(self, url: str) -> list: - """Download musics direct links from Playlist URL or Music URL - - Arg_Url: URL from Youtube - Return: List of dicts, with the title and url of each music - """ - options = self.__YDL_OPTIONS - options['extract_flat'] = True - with YoutubeDL(options) as ydl: - try: - result = ydl.extract_info(url, download=False) - musics_info = [] - - if result.get('entries'): # If got a dict of musics - for entry in result['entries']: - music_info = { - 'title': entry['title'], - 'url': f"https://www.youtube.com/watch?v={entry['id']}" - } - - musics_info.append(music_info) - else: # Or a single music - music_info = { - 'url': result['original_url'], - 'title': result['title'] - } - musics_info.append(music_info) - - return musics_info # Return a list - except ExtractorError or DownloadError: - pass - - def __is_url(self, string) -> bool: - """Verify if a string is a url""" - regex = re.compile( - "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") - - if re.search(regex, string): - return True - else: - return False - - - + print(e) diff --git a/vulkanbot/music/Interfaces.py b/vulkanbot/music/Interfaces.py new file mode 100644 index 0000000..1254339 --- /dev/null +++ b/vulkanbot/music/Interfaces.py @@ -0,0 +1,91 @@ +from abc import ABC, abstractproperty, abstractmethod + + +class IPlaylist(ABC): + """Class to manage and control the songs to play and played""" + + + @abstractproperty + def looping_one(self): + pass + + @abstractproperty + def looping_all(self): + pass + + @abstractproperty + def songs_to_preload(self) -> list: + pass + + @abstractmethod + def __len__(self): + pass + + @abstractmethod + def next_song(self): + pass + + @abstractmethod + def prev_song(self): + pass + + @abstractmethod + def add_song(self, identifier: str) -> None: + pass + + @abstractmethod + def shuffle(self) -> None: + pass + + @abstractmethod + def revert(self) -> None: + pass + + @abstractmethod + def clear(self) -> None: + pass + + @abstractmethod + def loop_one(self) -> str: + pass + + @abstractmethod + def loop_all(self) -> str: + pass + + @abstractmethod + def loop_off(self) -> str: + pass + + @abstractmethod + def destroy_song(self, song_destroy) -> None: + pass + + +class ISong(ABC): + """Store the usefull information about a Song""" + + @abstractmethod + def finish_down(self, info: dict) -> None: + pass + + @abstractmethod + def source(self) -> str: + pass + + @abstractmethod + def title(self) -> str: + pass + + @abstractmethod + def duration(self) -> str: + pass + + @abstractmethod + def identifier(self) -> str: + pass + + @abstractmethod + def destroy(self) -> None: + pass + diff --git a/vulkanbot/music/Music.py b/vulkanbot/music/Music.py index 49083e9..12a609c 100644 --- a/vulkanbot/music/Music.py +++ b/vulkanbot/music/Music.py @@ -1,22 +1,22 @@ import discord from discord.ext import commands -import datetime -import asyncio from config import config from vulkanbot.music.Downloader import Downloader from vulkanbot.music.Playlist import Playlist from vulkanbot.music.Searcher import Searcher +from vulkanbot.music.Types import Provider +from vulkanbot.music.utils import * class Music(commands.Cog): def __init__(self, bot): - self.__searcher = Searcher() - self.__downloader = Downloader() - self.__playlist = Playlist() + self.__searcher: Searcher = Searcher() + self.__downloader: Downloader = Downloader() + self.__playlist: Playlist = Playlist() + self.__bot: discord.Client = bot self.__playing = False - self.__bot = bot self.__ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe' self.__vc = "" # Objeto voice_bot do discord @@ -24,98 +24,109 @@ class Music(commands.Cog): self.FFMPEG_OPTIONS = {'executable': self.__ffmpeg, 'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'} - def __play_next(self): + def __play_next(self, error, ctx): while True: if len(self.__playlist) > 0: source = self.__playlist.next_song() - if source == None: # If there is not a source + if source == None: # If there is not a source for the song continue - - player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS) - self.__vc.play(player, after=lambda e: self.__play_next()) + + coro = self.__play_music(ctx, source) + self.__bot.loop.create_task(coro) break else: self.__playing = False break - # infinite loop checking - async def __play_music(self): - while True: - if len(self.__playlist) > 0: - source = self.__playlist.next_song() - if source == None: - continue + async def __play_music(self, ctx, song): + self.__playing = True - self.__playing = True - player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS) - self.__vc.play(player, after=lambda e: self.__play_next()) - break - else: - self.__playing = False - await self.__vc.disconnect() - break + player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS) + self.__vc.play(player, after=lambda e: self.__play_next(e, ctx)) + + await ctx.invoke(self.__bot.get_command('np')) + + songs = self.__playlist.songs_to_preload + await self.__downloader.preload(songs) @commands.command(name="play", help="Toca música - YouTube/Spotify/Título", aliases=['p', 'tocar']) async def play(self, ctx, *args): user_input = " ".join(args) try: - if self.__vc == "" or not self.__vc.is_connected() or self.__vc == None: + if len(self.__bot.voice_clients) == 0: voice_channel = ctx.author.voice.channel self.__vc = await voice_channel.connect() except Exception as e: - # If voice_channel is None: print(e) await self.__send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey') return else: songs_quant = 0 - musics_names, provider = self.__searcher.search(user_input) - for music in musics_names: - music_info = self.__downloader.download_urls(music, provider) + musics_identifiers, provider = self.__searcher.search(user_input) + + if provider == Provider.Unknown: # If type not identified + await self.__send_embed(ctx, description='Entrada inválida, tente algo melhor', colour_name='blue') + return - for music in music_info: - self.__playlist.add_song(music) - songs_quant += 1 + if provider == Provider.YouTube: # If youtube source + musics_identifiers = self.__downloader.extract_youtube_link(musics_identifiers[0]) - if songs_quant == 1: - await self.__send_embed(ctx, description=f"Você adicionou a música **{music_info[0]['title']}** à fila!", colour_name='green') + for identifier in musics_identifiers: # Creating songs + last_song = self.__playlist.add_song(identifier) + songs_quant += 1 + + songs_preload = self.__playlist.songs_to_preload + await self.__downloader.preload(songs_preload) + + if songs_quant == 1: # If only one music downloaded + song = self.__downloader.download_one(last_song) # Download the new music + + if song == None: # If song not downloaded + await self.__send_embed(ctx, description='Houve um problema no download dessa música, tente novamente', colour_name='blue') + + elif not self.__playing : # If not playing + await self.__send_embed(ctx, description=f'Você adicionou a música **{song.title}** à playlist', colour_name='blue') + + else: # If playing + await ctx.send(embed=song.embed(title='Song added to Queue')) else: - await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='green') + await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='blue') if not self.__playing: - await self.__play_music() + first = self.__playlist.songs_to_preload[0] + self.__downloader.download_one(first) + first_song = self.__playlist.next_song() + + await self.__play_music(ctx, first_song) @commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila']) async def queue(self, ctx): - if self.__playlist.looping_one: # If Repeting one - # Send the current song with this title - await self.this(ctx) + if self.__playlist.looping_one: # If Repeating one + await self.now_playing(ctx) return - fila = self.__playlist.queue() - total = len(fila) - text = f'Total musics: {total}\n\n' + songs_preload = self.__playlist.songs_to_preload + await self.__downloader.preload(songs_preload) + total_time = format_time(sum([int(song.duration if song.duration else 0) for song in songs_preload])) # Sum the duration + total_songs = len(self.__playlist) + text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n' - # Create the string to description - for pos, song in enumerate(fila): - if pos >= config.MAX_QUEUE_LENGTH: # Max songs to apper in queue list - break + for pos, song in enumerate(songs_preload, start=1): + title = song.title if song.title else 'Downloading...' + text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n" - text += f"**{pos+1} - ** {song}\n" - - if text != "": - if self.__playlist.looping_all: # If repeting all - await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='green') - else: # Repeting off - await self.__send_embed(ctx, title='Queue', description=text, colour_name='green') + if len(songs_preload) > 0: + if self.__playlist.looping_all: # If repeating all + await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='blue') + else: # Repeating off + await self.__send_embed(ctx, title='Songs in Queue', description=text, colour_name='blue') else: # No music await self.__send_embed(ctx, description='There is not musics in queue.', colour_name='red') @commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular']) async def skip(self, ctx): - if self.__vc != '' and self.__vc: - print('Skip') + if len(self.__bot.voice_clients) > 0: self.__vc.stop() @commands.command(name='stop', help='Para de tocar músicas') @@ -135,7 +146,7 @@ class Music(commands.Cog): self.__vc.pause() await self.__send_embed(ctx, description='Música pausada', colour_name='green') - @commands.command(name='resume', help='Despausa a música atual') + @commands.command(name='resume', help='Solta a música atual') async def resume(self, ctx): if self.__vc == '': return @@ -157,6 +168,19 @@ class Music(commands.Cog): await self.__send_embed(ctx, description=description, colour_name='grey') + @commands.command(name='clear', help='Limpa a fila de músicas a tocar') + async def clear(self, ctx): + self.__playlist.clear() + + @commands.command(name='np', help='Mostra a música que está tocando no instante') + async def now_playing(self, ctx): + if self.__playlist.looping_one: + title = 'Song Looping Now' + else: + title = 'Song Playing Now' + + current_song = self.__playlist.current + await ctx.send(embed=current_song.embed(title=title)) async def __send_embed(self, ctx, title='', description='', colour_name='grey'): try: @@ -171,43 +195,6 @@ class Music(commands.Cog): ) await ctx.send(embed=embedvc) - @commands.command(name='clear', help='Limpa a fila de músicas a tocar') - async def clear(self, ctx): - self.__playlist.clear() - - @commands.command(name='this', help='Mostra a música que está tocando no instante') - async def this(self, ctx): - if self.__playlist.looping_one: - title = 'Music Looping Now' - else: - title = 'Music Playing Now' - - info = self.__playlist.get_current() - embedvc = discord.Embed( - title=title, - description=f"[{info['title']}]({info['url']})", - color=config.COLOURS['grey'] - ) - - embedvc.add_field(name=config.SONGINFO_UPLOADER, - value=info['uploader'], - inline=False) - - if 'thumbnail' in info.keys(): - embedvc.set_thumbnail(url=info['thumbnail']) - - if 'duration' in info.keys(): - duration = str(datetime.timedelta(seconds=info['duration'])) - embedvc.add_field(name=config.SONGINFO_DURATION, - value=f"{duration}", - inline=False) - else: - embedvc.add_field(name=config.SONGINFO_DURATION, - value=config.SONGINFO_UNKNOWN_DURATION, - inline=False) - - await ctx.send(embed=embedvc) - def setup(bot): bot.add_cog(Music(bot)) diff --git a/vulkanbot/music/Playlist.py b/vulkanbot/music/Playlist.py index b541ad4..1e427bc 100644 --- a/vulkanbot/music/Playlist.py +++ b/vulkanbot/music/Playlist.py @@ -1,14 +1,16 @@ from collections import deque import random +from config import config + +from vulkanbot.music.Interfaces import IPlaylist from vulkanbot.music.Song import Song -from vulkanbot.music.Downloader import Downloader -class Playlist(): + +class Playlist(IPlaylist): """Class to manage and control the songs to play and played""" def __init__(self) -> None: - self.__down = Downloader() self.__queue = deque() # Store the musics to play self.__songs_history = deque() # Store the musics played self.__name_history = deque() # Store the name of musics played @@ -16,7 +18,7 @@ class Playlist(): self.__looping_one = False self.__looping_all = False - self.__current = None + self.__current: Song = None @property def looping_one(self): @@ -26,19 +28,22 @@ class Playlist(): def looping_all(self): return self.__looping_all + @property + def current(self): + return self.__current + + @property + def songs_to_preload(self) -> list: + return list(self.__queue)[:config.MAX_PRELOAD_SONGS] + def __len__(self): - if self.__looping_one == True or self.__looping_all == True: - return 1 - else: return len(self.__queue) - def next_song(self): - """Return the source of the next song to play""" - if self.__current == None: # If not playing - if len(self.__queue) == 0: # If nothing to play - return None - else: # If there is music to play - return self.__start() + def next_song(self) -> Song: + """Return the next song to play""" + if self.__current == None and len(self.__queue) == 0: + # If not playing and nothing to play + return None # If playing played_song = self.__current @@ -54,55 +59,15 @@ class Playlist(): if len(self.__queue) == 0: # If no more song to play, return None return None - # If there is more to play - # Finish download of the next song - source = self.__prepare_next(self.__queue[0]) - if source == None: # If there is a problem in the download - self.__queue.popleft() # Remove the music with problems + self.__current = self.__queue[0] # Att the current with the first one + self.__queue.popleft() # Remove the current from queue + if self.__current.source == None: # Try until find one source continue - - return source - - def get_current(self): - """Return current music embed""" - if self.__current: - return self.__current.embed() - else: - return 'Nenhuma música tocando' - - def __prepare_next(self, next_song: Song) -> str: - """Finish the download of the music and return the source""" - if next_song.source == None: # Check if source has already downloaded - url = next_song.url # Get the URL - info = self.__down.download_source(url) # Download the source - if info == None: # If there is a problem in the download - return None - - next_song.finish_down(info) # Updating the info of song - - # Att the Playlist info - self.__current = next_song # Att the current - self.__queue.popleft() # Remove the current from queue - self.__name_history.append(self.__current.title) # Add to name history - self.__songs_history.append(self.__current) # Add to song history - - return self.__current.source # Return the source of current - - def __start(self) -> None: - """Start the play of the first musics and return his source""" - # Finish download of the next song - url = self.__queue[0].url # Get the URL - info = self.__down.download_source(url) # Download the source - - self.__queue[0].finish_down(info) # Att the song - - # Att Playlist info - self.__current = self.__queue[0] # Att the current - self.__queue.popleft() # Remove the current from queue - self.__name_history.append(self.__current.title) # Add to name history - self.__songs_history.append(self.__current) # Add to song history - - return self.__current.source # Return the source of current + + else: + self.__name_history.append(self.__current.title) # Add to name history + self.__songs_history.append(self.__current) # Add to song history + return self.__current def prev_song(self): """Return the source of the last song played @@ -114,14 +79,11 @@ class Playlist(): else: return self.__songs_history[0].source - def add_song(self, music: dict) -> None: - """Receives a music object and store to the play queue""" - if (not 'title' in music.keys()) or (not 'url' in music.keys()): - print('Music without necessary keys') - return - - song = Song(title=music['title'], url=music['url']) # Cria a musica + def add_song(self, identifier: str) -> Song: + """Create a song object, add to queue and return it""" + song = Song(identifier, self) # Cria a musica com o identificador self.__queue.append(song) + return song def shuffle(self) -> None: """Shuffle the order of the songs to play""" @@ -171,9 +133,9 @@ class Playlist(): self.__looping_one = False return 'Loop disable' - def queue(self) -> list: - list_songs = [] + def destroy_song(self, song_destroy: Song) -> None: + """Destroy a song object from the queue""" for song in self.__queue: - title = song.title - list_songs.append(title) - return list_songs + if song == song_destroy: + self.__queue.remove(song) + break diff --git a/vulkanbot/music/Searcher.py b/vulkanbot/music/Searcher.py index 93795d7..0b066f5 100644 --- a/vulkanbot/music/Searcher.py +++ b/vulkanbot/music/Searcher.py @@ -1,6 +1,6 @@ -import re from vulkanbot.music.Types import Provider from vulkanbot.music.Spotify import SpotifySearch +from vulkanbot.music.utils import is_url class Searcher(): @@ -11,9 +11,10 @@ class Searcher(): print(f'Spotify Connected: {self.__Spotify.connect()}') def search(self, music: str) -> list: - """Return a list with the track name of a music or playlist + """Return a list with the song names or an URL - Return -> A list of musics names + Arg -> User Input, a string with the + Return -> A list of musics names and Provider Type """ url_type = self.__identify_source(music) @@ -26,10 +27,13 @@ class Searcher(): elif url_type == Provider.Name: return [music], Provider.Name + + elif url_type == Provider.Unknown: + return None, Provider.Unknown def __identify_source(self, music) -> Provider: """Identify the provider of a music""" - if not self.__is_url(music): + if not is_url(music): return Provider.Name if "https://www.youtu" in music or "https://youtu.be" in music: @@ -41,12 +45,3 @@ class Searcher(): # If no match return Provider.Unknown - def __is_url(self, string) -> bool: - """Verify if a string is a url""" - regex = re.compile( - "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") - - if re.search(regex, string): - return True - else: - return False diff --git a/vulkanbot/music/Song.py b/vulkanbot/music/Song.py index 355c69a..9ddc478 100644 --- a/vulkanbot/music/Song.py +++ b/vulkanbot/music/Song.py @@ -1,64 +1,89 @@ -from discord.embeds import Embed +from discord import Embed +import datetime +from vulkanbot.music.Interfaces import ISong, IPlaylist +from config import config -class Song(): +class Song(ISong): """Store the usefull information about a Song""" - def __init__(self, url: str, title: str) -> None: + def __init__(self, identifier: str, playlist: IPlaylist) -> None: """Create a song with only the URL to the youtube song""" - self.__url = url - self.__title = title + self.__identifier = identifier self.__info = {} + self.__playlist: IPlaylist = playlist def finish_down(self, info: dict) -> None: - """Get and store the full information of the song""" + """Get and store the full information of the song""" self.__usefull_keys = ['url', 'duration', - 'description', 'webpage_url', + 'title', 'webpage_url', 'channel', 'id', 'uploader', - 'thumbnail'] - self.__extract_info(info) - - def __extract_info(self, info) -> None: - """Extract the usefull information returned by the Downloader""" + 'thumbnail', 'original_url'] + for key in self.__usefull_keys: try: self.__info[key] = info[key] except Exception as e: print(e) - raise e - - def embed(self) -> Embed: - """Configure and return the info to create the embed for this song""" - info = { - 'title': self.__title, - 'url': self.__url, - 'uploader': self.__info['uploader'] - } - - if 'thumbnail' in self.__info.keys(): - info['thumbnail'] = self.__info['thumbnail'] - - if 'duration' in self.__info.keys(): - info['duration'] = self.__info['duration'] - - return info - - @property - def info(self) -> dict: - """Return the compiled info of this song""" - if self.__info: - return self.__info - - @property - def title(self) -> str: - return self.__title + raise e @property def source(self) -> str: """Return the Song Source URL to play""" if 'url' in self.__info.keys(): return self.__info['url'] + else: + return None @property - def url(self) -> str: - return self.__url + def title(self) -> str: + """Return the Song Title""" + if 'title' in self.__info.keys(): + return self.__info['title'] + else: + return None + + @property + def duration(self) -> str: + """Return the Song Title""" + if 'duration' in self.__info.keys(): + return self.__info['duration'] + else: + return 0.0 + + @property + def identifier(self) -> str: + return self.__identifier + + def destroy(self) -> None: + """Destroy the song from the playlist due to any type of error""" + self.__playlist.destroy_song(self) + del self + + def embed(self, title: str) -> Embed: + """Configure the embed to show the song information""" + + embedvc = Embed( + title=title, + description=f"[{self.__info['title']}]({self.__info['original_url']})", + color=config.COLOURS['blue'] + ) + + embedvc.add_field(name=config.SONGINFO_UPLOADER, + value=self.__info['uploader'], + inline=False) + + if 'thumbnail' in self.__info.keys(): + embedvc.set_thumbnail(url=self.__info['thumbnail']) + + if 'duration' in self.__info.keys(): + duration = str(datetime.timedelta(seconds=self.__info['duration'])) + embedvc.add_field(name=config.SONGINFO_DURATION, + value=f"{duration}", + inline=False) + else: + embedvc.add_field(name=config.SONGINFO_DURATION, + value=config.SONGINFO_UNKNOWN_DURATION, + inline=False) + + return embedvc diff --git a/vulkanbot/music/Types.py b/vulkanbot/music/Types.py index c30a349..3635c29 100644 --- a/vulkanbot/music/Types.py +++ b/vulkanbot/music/Types.py @@ -4,18 +4,6 @@ from enum import Enum class Provider(Enum): """Store Enum Types of the Providers""" Spotify = "Spotify" - Spotify_Playlist = "Spotify Playlist" YouTube = "YouTube" Name = 'Track Name' Unknown = "Unknown" - - -class Playlist_Types(Enum): - Spotify_Playlist = "Spotify Playlist" - YouTube_Playlist = "YouTube Playlist" - Unknown = "Unknown" - - -class Origins(Enum): - Default = "Default" - Playlist = "Playlist" diff --git a/vulkanbot/music/utils.py b/vulkanbot/music/utils.py new file mode 100644 index 0000000..2dd078c --- /dev/null +++ b/vulkanbot/music/utils.py @@ -0,0 +1,27 @@ +import re + +def format_time(duration): + if not duration: + return "00:00" + + hours = duration // 60 // 60 + minutes = duration // 60 % 60 + seconds = duration % 60 + + return "{}{}{:02d}:{:02d}".format( + hours if hours else "", + ":" if hours else "", + minutes, + seconds + ) + + +def is_url(string) -> bool: + """Verify if a string is a url""" + regex = re.compile( + "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") + + if re.search(regex, string): + return True + else: + return False