Merge pull request #3 from RafaelSolVargas/using-asyncio

Turning Vulkan more asynchronous
This commit is contained in:
Rafael Vargas 2021-12-30 14:53:40 -04:00 committed by GitHub
commit b5974b28e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 407 additions and 371 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ __pycache__
examples
.env
errors
.cache

View File

@ -12,52 +12,18 @@ INITIAL_EXTENSIONS = {'vulkanbot.commands.Phrases', 'vulkanbot.commands.Warframe
'vulkanbot.general.Filter', 'vulkanbot.general.Control', 'vulkanbot.music.Music',
'vulkanbot.commands.Random'}
VC_TIMEOUT = 600 # seconds
VC_TIMEOUT_DEFAULT = True
STARTUP_MESSAGE = 'Starting Vulkan...'
STARTUP_COMPLETE_MESSAGE = 'Vulkan is now operating.'
USER_NOT_IN_VC_MESSAGE = "Error: Please join the active voice channel to use this command"
NOT_CONNECTED_MESSAGE = "Error: Bot not connected to any voice channel"
ALREADY_CONNECTED_MESSAGE = "Error: Already connected to a voice channel"
CHANNEL_NOT_FOUND_MESSAGE = "Error: Could not find channel"
INFO_HISTORY_TITLE = "Songs Played:"
MAX_HISTORY_LENGTH = 10
MAX_PLAYLIST_LENGTH = 50
MAX_QUEUE_LENGTH = 10
MAX_TRACKNAME_HISTORY_LENGTH = 15
MAX_API_PHRASES_TRIES = 10
MAX_API_CETUS_TRIES = 10
MAX_PRELOAD_SONGS = 10
SONGINFO_UPLOADER = "Uploader: "
SONGINFO_DURATION = "Duration: "
SONGINFO_SECONDS = "s"
SONGINFO_LIKES = "Likes: "
SONGINFO_DISLIKES = "Dislikes: "
SONGINFO_NOW_PLAYING = "Now Playing"
SONGINFO_QUEUE_ADDED = "Added to queue"
SONGINFO_SONGINFO = "Song info"
SONGINFO_PLAYLIST_QUEUED = "Queued playlist :page_with_curl:"
SONGINFO_UNKNOWN_DURATION = "Unknown"
HELP_HISTORY_LONG = "Shows the " + \
str(MAX_TRACKNAME_HISTORY_LENGTH) + " last played songs."
HELP_PAUSE_LONG = "Pauses the AudioPlayer. Playback can be continued with the resume command."
HELP_VOL_LONG = "Changes the volume of the AudioPlayer. Argument specifies the % to which the volume should be set."
HELP_PREV_LONG = "Plays the previous song again."
HELP_RESUME_LONG = "Resumes the AudioPlayer."
HELP_SKIP_LONG = "Skips the currently playing song and goes to the next item in the queue."
HELP_SONGINFO_LONG = "Shows details about the song currently being played and posts a link to the song."
HELP_STOP_LONG = "Stops the AudioPlayer and clears the songqueue"
HELP_YT_LONG = (
"$p [link/video title/key words/playlist-link/soundcloud link/spotify link/bandcamp link/twitter link]")
HELP_CLEAR_LONG = "Clears the queue."
HELP_LOOP_LONG = "Loops the currently playing song and locks the queue. Use the command again to disable loop."
HELP_QUEUE_LONG = "Shows the number of songs in queue, up to 10."
HELP_SHUFFLE_LONG = "Randomly sort the songs in the current queue"
ABSOLUTE_PATH = ''
COOKIE_PATH = '/config/cookies/cookies.txt'

View File

@ -10,7 +10,7 @@ class Control(commands.Cog):
def __init__(self, bot):
self.__bot = bot
self.__comandos = {
'MUSIC': ['this', 'resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear'],
'MUSIC': ['this', 'resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np'],
'RANDOM': ['escolha', 'cara', 'random'],
'HELP': ['help'],
'OTHERS': ['cetus', 'frase']
@ -27,13 +27,13 @@ class Control(commands.Cog):
@commands.Cog.listener()
async def on_ready(self):
print(config.STARTUP_MESSAGE)
await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | type {config.BOT_PREFIX}help"))
await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | {config.BOT_PREFIX}help"))
print(config.STARTUP_COMPLETE_MESSAGE)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
if isinstance(error, MissingRequiredArgument):
await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver os comandos')
await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver todos os comandos\n\nOu tente {config.BOT_PREFIX}command help para mais informações')
elif isinstance(error, CommandNotFound):
await ctx.channel.send(f'O comando não existe')
else:

View File

@ -1,10 +1,12 @@
import re
import asyncio
import concurrent.futures
from config import config
from yt_dlp import YoutubeDL
from yt_dlp.utils import ExtractorError, DownloadError
from vulkanbot.music.Types import Provider
from vulkanbot.music.Song import Song
from vulkanbot.music.utils import is_url
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
@ -17,29 +19,60 @@ class Downloader():
'playlistend': config.MAX_PLAYLIST_LENGTH,
}
def download_urls(self, musics_input, provider: Provider) -> list:
"""Download the musics direct URL from Youtube and return in a list
def download_one(self, song: Song) -> Song:
"""Receives a song object, finish his download and return it"""
if song.identifier == None:
print('Invalid song identifier type')
return
Arg: List with names or youtube url or a Unique String
Return: List with the direct youtube URL of each music
"""
if type(provider) != Provider:
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Destroy the music with problems
return None
if type(musics_input) != list and type(musics_input) != str:
return None
if provider == Provider.Name: # Send a list of names
musics_urls = self.__download_titles(musics_input)
return musics_urls
elif provider == Provider.YouTube: # Send a URL or Title
url = self.__download_one(musics_input)
return url
else:
song.finish_down(song_info)
return song
def extract_youtube_link(self, playlist_url: str) -> list:
"""Extract all songs direct URL from a Youtube Link
Arg: Url String
Return: List with the direct youtube URL of each song
"""
if is_url(playlist_url): # If Url
options = self.__YDL_OPTIONS
options['extract_flat'] = True
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(playlist_url, download=False)
songs_identifiers = []
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
songs_identifiers.append(f"https://www.youtube.com/watch?v={entry['id']}")
else: # Or a single music
songs_identifiers.append(result['original_url'])
return songs_identifiers # Return a list
except (ExtractorError, DownloadError) as e:
print(e)
return None
else:
print('Invalid type of playlist URL')
return None
def download_source(self, url) -> dict:
async def preload(self, songs: list) -> None:
"""Download the full info of the song object"""
for song in songs:
asyncio.ensure_future(self.__download_songs(song))
def __download_url(self, url) -> dict:
"""Download musics full info and source from Music URL
Arg: URL from Youtube
@ -47,6 +80,7 @@ class Downloader():
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = False
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
@ -54,92 +88,52 @@ class Downloader():
return result
except (ExtractorError, DownloadError) as e: # Any type of error in download
print(e)
return None
def __download_one(self, music: str) -> list:
"""Download one music/playlist direct link from Youtube
Arg: Playlist URL or Music Name to download direct URL
Return: List with the Youtube URL of each music downloaded
"""
if type(music) != str:
async def __download_songs(self, song: Song):
if song.source != None: # If Music already preloaded
return
if self.__is_url(music): # If Url
info = self.__download_links(music) # List of dict
else: # If Title
info = self.__download_titles(music) # List of dict
def download_song(song):
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
return info
if song_info == None:
song.destroy() # Remove the song with problems from the playlist
else:
song.finish_down(song_info)
def __download_titles(self, musics_names: list) -> list:
"""Download a music direct URL using his name.
# Creating a loop task to download each song
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.MAX_PRELOAD_SONGS
)
await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)},
return_when=asyncio.ALL_COMPLETED)
def __download_title(self, title: str) -> dict:
"""Download a music full information using his name.
Arg: Music Name
Return: List with one dict, containing the music direct URL and title
Return: A dict containing the song information
"""
if type(musics_names) == str: # Turn str into list
musics_names = [musics_names]
if type(title) != str:
print('Invalid music identifier type')
return
config = self.__YDL_OPTIONS
config['extract_flat'] = False
musics_info = []
with YoutubeDL(self.__YDL_OPTIONS) as ydl:
try:
for name in musics_names:
search = f"ytsearch:{name}"
result = ydl.extract_info(search, download=False)
search = f"ytsearch:{title}"
result = ydl.extract_info(search, download=False)
id = result['entries'][0]['id']
music_info = {
'url': f"https://www.youtube.com/watch?v={id}",
'title': result['entries'][0]['title']
}
musics_info.append(music_info)
if result == None:
return
return musics_info # Return a list
# Return a dict with the full info of first music
return result['entries'][0]
except Exception as e:
raise e
def __download_links(self, url: str) -> list:
"""Download musics direct links from Playlist URL or Music URL
Arg_Url: URL from Youtube
Return: List of dicts, with the title and url of each music
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = True
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
musics_info = []
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
music_info = {
'title': entry['title'],
'url': f"https://www.youtube.com/watch?v={entry['id']}"
}
musics_info.append(music_info)
else: # Or a single music
music_info = {
'url': result['original_url'],
'title': result['title']
}
musics_info.append(music_info)
return musics_info # Return a list
except ExtractorError or DownloadError:
pass
def __is_url(self, string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False
print(e)

View File

@ -0,0 +1,91 @@
from abc import ABC, abstractproperty, abstractmethod
class IPlaylist(ABC):
"""Class to manage and control the songs to play and played"""
@abstractproperty
def looping_one(self):
pass
@abstractproperty
def looping_all(self):
pass
@abstractproperty
def songs_to_preload(self) -> list:
pass
@abstractmethod
def __len__(self):
pass
@abstractmethod
def next_song(self):
pass
@abstractmethod
def prev_song(self):
pass
@abstractmethod
def add_song(self, identifier: str) -> None:
pass
@abstractmethod
def shuffle(self) -> None:
pass
@abstractmethod
def revert(self) -> None:
pass
@abstractmethod
def clear(self) -> None:
pass
@abstractmethod
def loop_one(self) -> str:
pass
@abstractmethod
def loop_all(self) -> str:
pass
@abstractmethod
def loop_off(self) -> str:
pass
@abstractmethod
def destroy_song(self, song_destroy) -> None:
pass
class ISong(ABC):
"""Store the usefull information about a Song"""
@abstractmethod
def finish_down(self, info: dict) -> None:
pass
@abstractmethod
def source(self) -> str:
pass
@abstractmethod
def title(self) -> str:
pass
@abstractmethod
def duration(self) -> str:
pass
@abstractmethod
def identifier(self) -> str:
pass
@abstractmethod
def destroy(self) -> None:
pass

View File

@ -1,22 +1,22 @@
import discord
from discord.ext import commands
import datetime
import asyncio
from config import config
from vulkanbot.music.Downloader import Downloader
from vulkanbot.music.Playlist import Playlist
from vulkanbot.music.Searcher import Searcher
from vulkanbot.music.Types import Provider
from vulkanbot.music.utils import *
class Music(commands.Cog):
def __init__(self, bot):
self.__searcher = Searcher()
self.__downloader = Downloader()
self.__playlist = Playlist()
self.__searcher: Searcher = Searcher()
self.__downloader: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: discord.Client = bot
self.__playing = False
self.__bot = bot
self.__ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe'
self.__vc = "" # Objeto voice_bot do discord
@ -24,98 +24,109 @@ class Music(commands.Cog):
self.FFMPEG_OPTIONS = {'executable': self.__ffmpeg,
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'}
def __play_next(self):
def __play_next(self, error, ctx):
while True:
if len(self.__playlist) > 0:
source = self.__playlist.next_song()
if source == None: # If there is not a source
if source == None: # If there is not a source for the song
continue
player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS)
self.__vc.play(player, after=lambda e: self.__play_next())
coro = self.__play_music(ctx, source)
self.__bot.loop.create_task(coro)
break
else:
self.__playing = False
break
# infinite loop checking
async def __play_music(self):
while True:
if len(self.__playlist) > 0:
source = self.__playlist.next_song()
if source == None:
continue
async def __play_music(self, ctx, song):
self.__playing = True
self.__playing = True
player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS)
self.__vc.play(player, after=lambda e: self.__play_next())
break
else:
self.__playing = False
await self.__vc.disconnect()
break
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__vc.play(player, after=lambda e: self.__play_next(e, ctx))
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
@commands.command(name="play", help="Toca música - YouTube/Spotify/Título", aliases=['p', 'tocar'])
async def play(self, ctx, *args):
user_input = " ".join(args)
try:
if self.__vc == "" or not self.__vc.is_connected() or self.__vc == None:
if len(self.__bot.voice_clients) == 0:
voice_channel = ctx.author.voice.channel
self.__vc = await voice_channel.connect()
except Exception as e:
# If voice_channel is None:
print(e)
await self.__send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey')
return
else:
songs_quant = 0
musics_names, provider = self.__searcher.search(user_input)
for music in musics_names:
music_info = self.__downloader.download_urls(music, provider)
musics_identifiers, provider = self.__searcher.search(user_input)
for music in music_info:
self.__playlist.add_song(music)
songs_quant += 1
if provider == Provider.Unknown: # If type not identified
await self.__send_embed(ctx, description='Entrada inválida, tente algo melhor', colour_name='blue')
return
if songs_quant == 1:
await self.__send_embed(ctx, description=f"Você adicionou a música **{music_info[0]['title']}** à fila!", colour_name='green')
if provider == Provider.YouTube: # If youtube source
musics_identifiers = self.__downloader.extract_youtube_link(musics_identifiers[0])
for identifier in musics_identifiers: # Creating songs
last_song = self.__playlist.add_song(identifier)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
if songs_quant == 1: # If only one music downloaded
song = self.__downloader.download_one(last_song) # Download the new music
if song == None: # If song not downloaded
await self.__send_embed(ctx, description='Houve um problema no download dessa música, tente novamente', colour_name='blue')
elif not self.__playing : # If not playing
await self.__send_embed(ctx, description=f'Você adicionou a música **{song.title}** à playlist', colour_name='blue')
else: # If playing
await ctx.send(embed=song.embed(title='Song added to Queue'))
else:
await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='green')
await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='blue')
if not self.__playing:
await self.__play_music()
first = self.__playlist.songs_to_preload[0]
self.__downloader.download_one(first)
first_song = self.__playlist.next_song()
await self.__play_music(ctx, first_song)
@commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila'])
async def queue(self, ctx):
if self.__playlist.looping_one: # If Repeting one
# Send the current song with this title
await self.this(ctx)
if self.__playlist.looping_one: # If Repeating one
await self.now_playing(ctx)
return
fila = self.__playlist.queue()
total = len(fila)
text = f'Total musics: {total}\n\n'
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0) for song in songs_preload])) # Sum the duration
total_songs = len(self.__playlist)
text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n'
# Create the string to description
for pos, song in enumerate(fila):
if pos >= config.MAX_QUEUE_LENGTH: # Max songs to apper in queue list
break
for pos, song in enumerate(songs_preload, start=1):
title = song.title if song.title else 'Downloading...'
text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n"
text += f"**{pos+1} - ** {song}\n"
if text != "":
if self.__playlist.looping_all: # If repeting all
await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='green')
else: # Repeting off
await self.__send_embed(ctx, title='Queue', description=text, colour_name='green')
if len(songs_preload) > 0:
if self.__playlist.looping_all: # If repeating all
await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='blue')
else: # Repeating off
await self.__send_embed(ctx, title='Songs in Queue', description=text, colour_name='blue')
else: # No music
await self.__send_embed(ctx, description='There is not musics in queue.', colour_name='red')
@commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular'])
async def skip(self, ctx):
if self.__vc != '' and self.__vc:
print('Skip')
if len(self.__bot.voice_clients) > 0:
self.__vc.stop()
@commands.command(name='stop', help='Para de tocar músicas')
@ -135,7 +146,7 @@ class Music(commands.Cog):
self.__vc.pause()
await self.__send_embed(ctx, description='Música pausada', colour_name='green')
@commands.command(name='resume', help='Despausa a música atual')
@commands.command(name='resume', help='Solta a música atual')
async def resume(self, ctx):
if self.__vc == '':
return
@ -157,6 +168,19 @@ class Music(commands.Cog):
await self.__send_embed(ctx, description=description, colour_name='grey')
@commands.command(name='clear', help='Limpa a fila de músicas a tocar')
async def clear(self, ctx):
self.__playlist.clear()
@commands.command(name='np', help='Mostra a música que está tocando no instante')
async def now_playing(self, ctx):
if self.__playlist.looping_one:
title = 'Song Looping Now'
else:
title = 'Song Playing Now'
current_song = self.__playlist.current
await ctx.send(embed=current_song.embed(title=title))
async def __send_embed(self, ctx, title='', description='', colour_name='grey'):
try:
@ -171,43 +195,6 @@ class Music(commands.Cog):
)
await ctx.send(embed=embedvc)
@commands.command(name='clear', help='Limpa a fila de músicas a tocar')
async def clear(self, ctx):
self.__playlist.clear()
@commands.command(name='this', help='Mostra a música que está tocando no instante')
async def this(self, ctx):
if self.__playlist.looping_one:
title = 'Music Looping Now'
else:
title = 'Music Playing Now'
info = self.__playlist.get_current()
embedvc = discord.Embed(
title=title,
description=f"[{info['title']}]({info['url']})",
color=config.COLOURS['grey']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=False)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=False)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=False)
await ctx.send(embed=embedvc)
def setup(bot):
bot.add_cog(Music(bot))

View File

@ -1,14 +1,16 @@
from collections import deque
import random
from config import config
from vulkanbot.music.Interfaces import IPlaylist
from vulkanbot.music.Song import Song
from vulkanbot.music.Downloader import Downloader
class Playlist():
class Playlist(IPlaylist):
"""Class to manage and control the songs to play and played"""
def __init__(self) -> None:
self.__down = Downloader()
self.__queue = deque() # Store the musics to play
self.__songs_history = deque() # Store the musics played
self.__name_history = deque() # Store the name of musics played
@ -16,7 +18,7 @@ class Playlist():
self.__looping_one = False
self.__looping_all = False
self.__current = None
self.__current: Song = None
@property
def looping_one(self):
@ -26,19 +28,22 @@ class Playlist():
def looping_all(self):
return self.__looping_all
@property
def current(self):
return self.__current
@property
def songs_to_preload(self) -> list:
return list(self.__queue)[:config.MAX_PRELOAD_SONGS]
def __len__(self):
if self.__looping_one == True or self.__looping_all == True:
return 1
else:
return len(self.__queue)
def next_song(self):
"""Return the source of the next song to play"""
if self.__current == None: # If not playing
if len(self.__queue) == 0: # If nothing to play
return None
else: # If there is music to play
return self.__start()
def next_song(self) -> Song:
"""Return the next song to play"""
if self.__current == None and len(self.__queue) == 0:
# If not playing and nothing to play
return None
# If playing
played_song = self.__current
@ -54,55 +59,15 @@ class Playlist():
if len(self.__queue) == 0: # If no more song to play, return None
return None
# If there is more to play
# Finish download of the next song
source = self.__prepare_next(self.__queue[0])
if source == None: # If there is a problem in the download
self.__queue.popleft() # Remove the music with problems
self.__current = self.__queue[0] # Att the current with the first one
self.__queue.popleft() # Remove the current from queue
if self.__current.source == None: # Try until find one source
continue
return source
def get_current(self):
"""Return current music embed"""
if self.__current:
return self.__current.embed()
else:
return 'Nenhuma música tocando'
def __prepare_next(self, next_song: Song) -> str:
"""Finish the download of the music and return the source"""
if next_song.source == None: # Check if source has already downloaded
url = next_song.url # Get the URL
info = self.__down.download_source(url) # Download the source
if info == None: # If there is a problem in the download
return None
next_song.finish_down(info) # Updating the info of song
# Att the Playlist info
self.__current = next_song # Att the current
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(self.__current.title) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current.source # Return the source of current
def __start(self) -> None:
"""Start the play of the first musics and return his source"""
# Finish download of the next song
url = self.__queue[0].url # Get the URL
info = self.__down.download_source(url) # Download the source
self.__queue[0].finish_down(info) # Att the song
# Att Playlist info
self.__current = self.__queue[0] # Att the current
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(self.__current.title) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current.source # Return the source of current
else:
self.__name_history.append(self.__current.title) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current
def prev_song(self):
"""Return the source of the last song played
@ -114,14 +79,11 @@ class Playlist():
else:
return self.__songs_history[0].source
def add_song(self, music: dict) -> None:
"""Receives a music object and store to the play queue"""
if (not 'title' in music.keys()) or (not 'url' in music.keys()):
print('Music without necessary keys')
return
song = Song(title=music['title'], url=music['url']) # Cria a musica
def add_song(self, identifier: str) -> Song:
"""Create a song object, add to queue and return it"""
song = Song(identifier, self) # Cria a musica com o identificador
self.__queue.append(song)
return song
def shuffle(self) -> None:
"""Shuffle the order of the songs to play"""
@ -171,9 +133,9 @@ class Playlist():
self.__looping_one = False
return 'Loop disable'
def queue(self) -> list:
list_songs = []
def destroy_song(self, song_destroy: Song) -> None:
"""Destroy a song object from the queue"""
for song in self.__queue:
title = song.title
list_songs.append(title)
return list_songs
if song == song_destroy:
self.__queue.remove(song)
break

View File

@ -1,6 +1,6 @@
import re
from vulkanbot.music.Types import Provider
from vulkanbot.music.Spotify import SpotifySearch
from vulkanbot.music.utils import is_url
class Searcher():
@ -11,9 +11,10 @@ class Searcher():
print(f'Spotify Connected: {self.__Spotify.connect()}')
def search(self, music: str) -> list:
"""Return a list with the track name of a music or playlist
"""Return a list with the song names or an URL
Return -> A list of musics names
Arg -> User Input, a string with the
Return -> A list of musics names and Provider Type
"""
url_type = self.__identify_source(music)
@ -27,9 +28,12 @@ class Searcher():
elif url_type == Provider.Name:
return [music], Provider.Name
elif url_type == Provider.Unknown:
return None, Provider.Unknown
def __identify_source(self, music) -> Provider:
"""Identify the provider of a music"""
if not self.__is_url(music):
if not is_url(music):
return Provider.Name
if "https://www.youtu" in music or "https://youtu.be" in music:
@ -41,12 +45,3 @@ class Searcher():
# If no match
return Provider.Unknown
def __is_url(self, string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False

View File

@ -1,25 +1,25 @@
from discord.embeds import Embed
from discord import Embed
import datetime
from vulkanbot.music.Interfaces import ISong, IPlaylist
from config import config
class Song():
class Song(ISong):
"""Store the usefull information about a Song"""
def __init__(self, url: str, title: str) -> None:
def __init__(self, identifier: str, playlist: IPlaylist) -> None:
"""Create a song with only the URL to the youtube song"""
self.__url = url
self.__title = title
self.__identifier = identifier
self.__info = {}
self.__playlist: IPlaylist = playlist
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
self.__usefull_keys = ['url', 'duration',
'description', 'webpage_url',
'title', 'webpage_url',
'channel', 'id', 'uploader',
'thumbnail']
self.__extract_info(info)
'thumbnail', 'original_url']
def __extract_info(self, info) -> None:
"""Extract the usefull information returned by the Downloader"""
for key in self.__usefull_keys:
try:
self.__info[key] = info[key]
@ -27,38 +27,63 @@ class Song():
print(e)
raise e
def embed(self) -> Embed:
"""Configure and return the info to create the embed for this song"""
info = {
'title': self.__title,
'url': self.__url,
'uploader': self.__info['uploader']
}
if 'thumbnail' in self.__info.keys():
info['thumbnail'] = self.__info['thumbnail']
if 'duration' in self.__info.keys():
info['duration'] = self.__info['duration']
return info
@property
def info(self) -> dict:
"""Return the compiled info of this song"""
if self.__info:
return self.__info
@property
def title(self) -> str:
return self.__title
@property
def source(self) -> str:
"""Return the Song Source URL to play"""
if 'url' in self.__info.keys():
return self.__info['url']
else:
return None
@property
def url(self) -> str:
return self.__url
def title(self) -> str:
"""Return the Song Title"""
if 'title' in self.__info.keys():
return self.__info['title']
else:
return None
@property
def duration(self) -> str:
"""Return the Song Title"""
if 'duration' in self.__info.keys():
return self.__info['duration']
else:
return 0.0
@property
def identifier(self) -> str:
return self.__identifier
def destroy(self) -> None:
"""Destroy the song from the playlist due to any type of error"""
self.__playlist.destroy_song(self)
del self
def embed(self, title: str) -> Embed:
"""Configure the embed to show the song information"""
embedvc = Embed(
title=title,
description=f"[{self.__info['title']}]({self.__info['original_url']})",
color=config.COLOURS['blue']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=self.__info['uploader'],
inline=False)
if 'thumbnail' in self.__info.keys():
embedvc.set_thumbnail(url=self.__info['thumbnail'])
if 'duration' in self.__info.keys():
duration = str(datetime.timedelta(seconds=self.__info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=False)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=False)
return embedvc

View File

@ -4,18 +4,6 @@ from enum import Enum
class Provider(Enum):
"""Store Enum Types of the Providers"""
Spotify = "Spotify"
Spotify_Playlist = "Spotify Playlist"
YouTube = "YouTube"
Name = 'Track Name'
Unknown = "Unknown"
class Playlist_Types(Enum):
Spotify_Playlist = "Spotify Playlist"
YouTube_Playlist = "YouTube Playlist"
Unknown = "Unknown"
class Origins(Enum):
Default = "Default"
Playlist = "Playlist"

27
vulkanbot/music/utils.py Normal file
View File

@ -0,0 +1,27 @@
import re
def format_time(duration):
if not duration:
return "00:00"
hours = duration // 60 // 60
minutes = duration // 60 % 60
seconds = duration % 60
return "{}{}{:02d}:{:02d}".format(
hours if hours else "",
":" if hours else "",
minutes,
seconds
)
def is_url(string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False