diff --git a/config/config.py b/config/config.py index e531c56..f185aeb 100644 --- a/config/config.py +++ b/config/config.py @@ -4,10 +4,13 @@ CETUS_API = dotenv_values('.env')['CETUS_API'] BOT_TOKEN = dotenv_values('.env')['BOT_TOKEN'] SPOTIFY_ID = dotenv_values('.env')['SPOTIFY_ID'] SPOTIFY_SECRET = dotenv_values('.env')['SPOTIFY_SECRET'] +SECRET_MESSAGE = dotenv_values('.env')['SECRET_MESSAGE'] +PHRASES_API = dotenv_values('.env')['PHRASES_API'] BOT_PREFIX = '!' -INITIAL_EXTENSIONS = {'vulkan.commands.Phrases', 'vulkan.commands.Warframe', - 'vulkan.general.Filter', 'vulkan.general.Control', 'vulkan.music.Music'} +INITIAL_EXTENSIONS = {'vulkanbot.commands.Phrases', 'vulkanbot.commands.Warframe', + 'vulkanbot.general.Filter', 'vulkanbot.general.Control', 'vulkanbot.music.Music', + 'vulkanbot.commands.Random'} VC_TIMEOUT = 600 # seconds VC_TIMEOUT_DEFAULT = True @@ -22,7 +25,11 @@ CHANNEL_NOT_FOUND_MESSAGE = "Error: Could not find channel" INFO_HISTORY_TITLE = "Songs Played:" MAX_HISTORY_LENGTH = 10 +MAX_PLAYLIST_LENGTH = 50 +MAX_QUEUE_LENGTH = 10 MAX_TRACKNAME_HISTORY_LENGTH = 15 +MAX_API_PHRASES_TRIES = 10 +MAX_API_CETUS_TRIES = 10 SONGINFO_UPLOADER = "Uploader: " SONGINFO_DURATION = "Duration: " @@ -58,7 +65,7 @@ COOKIE_PATH = '/config/cookies/cookies.txt' COLOURS = { 'red': 0xDC143C, - 'green': 0x00FF7F, + 'green': 0x58D68D, 'grey': 0x708090, - 'blue': 0x0000CD + 'blue': 0x3498DB } diff --git a/main.py b/main.py index 7723ce8..608f10e 100644 --- a/main.py +++ b/main.py @@ -3,7 +3,7 @@ import discord from config import config from discord.ext import commands -from vulkan.ErrorHandler import ErrorHandler +from vulkanbot.ErrorHandler import ErrorHandler intents = discord.Intents.default() diff --git a/vulkan/commands/Phrases.py b/vulkan/commands/Phrases.py deleted file mode 100644 index b9513b9..0000000 --- a/vulkan/commands/Phrases.py +++ /dev/null @@ -1,59 +0,0 @@ -import requests -import json -import discord -from discord.ext import commands -from random import random as rand - - -class Phrases(commands.Cog): - """Deal with the generation of motivational phrases""" - - def __init__(self, bot): - self.__bot = bot - - @property - def bot(self): - return self.__bot - - @bot.setter - def bot(self, newBot): - self.__bot = newBot - - @commands.command(name='frase', help='Envia uma frase legal no seu PV') - async def send_phrase(self, ctx): - # There is a chance that the phrase will be send for the dev - sended = await self.calculate_rgn(ctx) - if sended: - return - - while True: - try: - response = requests.get( - 'http://api.forismatic.com/api/1.0/?method=getQuote&key=457653&format=json&lang=en') - data = json.loads(response.content) - - phrase = data['quoteText'] - author = data['quoteAuthor'] - - text = f'{phrase} \nBy: {author}' - await ctx.send(text) - break - except json.decoder.JSONDecodeError: - continue - except Exception as e: - print(e) - await ctx.channel.send('Houve um erro inesperado :/') - break - - async def calculate_rgn(self, ctx): - x = rand() - print(x) - if x < 0.15: - await ctx.send('Se leu seu cu é meu\nBy: Minha Pica') - return True - else: - return False - - -def setup(bot): - bot.add_cog(Phrases(bot)) diff --git a/vulkan/commands/Warframe.py b/vulkan/commands/Warframe.py deleted file mode 100644 index dca4bb7..0000000 --- a/vulkan/commands/Warframe.py +++ /dev/null @@ -1,51 +0,0 @@ -import requests -import json -import discord -from dotenv import dotenv_values -from discord.ext import commands -from config import config - - -class Warframe(commands.Cog): - """Deal with the generation of warframe data""" - - def __init__(self, bot): - self.__bot = bot - - @property - def bot(self): - return self.__bot - - @bot.setter - def bot(self, newBot): - self.__bot = newBot - - @commands.command(name='cetus', help='Informa o tempo atual de Cetus - Warframe') - async def get_cetus(self, ctx): - try: - response = requests.get(config.CETUS_API) - data = json.loads(response.content) - short = data['shortString'] - - responseText = f'{short}' - - embed = discord.Embed( - title='Warframe Cetus Timing', - description=responseText, - colour=0xFF0000 - ) - await ctx.send(embed=embed) - - except Exception as e: - print(e) - responseText = f'Houve um erro inesperado :/' - embed = discord.Embed( - title='Warframe Cetus Timing', - description=responseText, - colour=0xFF0000 - ) - await ctx.send(embed=embed) - - -def setup(bot): - bot.add_cog(Warframe(bot)) diff --git a/vulkan/general/Control.py b/vulkan/general/Control.py deleted file mode 100644 index 12849cf..0000000 --- a/vulkan/general/Control.py +++ /dev/null @@ -1,38 +0,0 @@ -import discord -from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument -from discord.ext import commands -from config import config - - -class Control(commands.Cog): - """Control the flow of the Bot""" - - def __init__(self, bot): - self.__bot = bot - - @property - def bot(self): - return self.__bot - - @bot.setter - def bot(self, newBot): - self.__bot = newBot - - @commands.Cog.listener() - async def on_ready(self): - print(config.STARTUP_MESSAGE) - await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | type {config.BOT_PREFIX}help")) - print(config.STARTUP_COMPLETE_MESSAGE) - - @commands.Cog.listener() - async def on_command_error(self, ctx, error): - if isinstance(error, MissingRequiredArgument): - await ctx.channel.send(f'Falta argumentos. Digite {self.__bot.prefix}help para ver os comandos') - elif isinstance(error, CommandNotFound): - await ctx.channel.send(f'O comando não existe') - else: - raise error - - -def setup(bot): - bot.add_cog(Control(bot)) diff --git a/vulkan/music/Music.py b/vulkan/music/Music.py deleted file mode 100644 index f80e03a..0000000 --- a/vulkan/music/Music.py +++ /dev/null @@ -1,239 +0,0 @@ -import discord -from discord import colour -from discord.embeds import Embed -from discord.ext import commands -from discord.ext.commands.core import command -from youtube_dl import YoutubeDL - -colours = { - 'red': 0xDC143C, - 'green': 0x00FF7F, - 'grey': 0x708090, - 'blue': 0x0000CD -} - - -class Music(commands.Cog): - def __init__(self, client): - self.client = client - self.is_playing = False - self.repetingOne = False - self.repetingAll = False - self.current = () - # 2d array containing [song, channel] - # self.music_queue vai conter as buscas recebidas feitas no youtube em ordem - # Caminho do executável para rodar na minha máquina - self.ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe' - # Segue o padrão de [[{'source', 'title'}, canal], [musica, canal]] - self.music_queue = [] - self.vc = "" # Objeto voice_client do discord - self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'} - self.FFMPEG_OPTIONS = {'executable': self.ffmpeg, - 'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'} - - def search_yt(self, item): - with YoutubeDL(self.YDL_OPTIONS) as ydl: - try: # Busca um video no youtube e traz o titulo e a fonte dele em formato de dict - info = ydl.extract_info("ytsearch:%s" % - item, download=False)['entries'][0] - - except Exception: - return False - # Retorna a fonte e o titulo buscado - return {'source': info['formats'][0]['url'], 'title': info['title']} - - def play_next(self): - if len(self.music_queue) > 0: - if self.repetingOne: - # Coloca a musica atual no topo da fila - self.music_queue.insert(0, self.current) - elif self.repetingAll: - # Joga a musica atual para o final da fila - self.music_queue.append(self.current) - - self.is_playing = True - source = self.music_queue[0][0]['source'] - self.current = self.music_queue[0] # Update current music - self.music_queue.pop(0) # Remove from the queue - player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS) - self.vc.play(player, after=lambda e: self.play_next()) # Play - else: - self.is_playing = False - self.repetingAll = False - self.repetingOne = False - - # infinite loop checking - async def play_music(self): - if len(self.music_queue) > 0: - self.is_playing = True - source = self.music_queue[0][0]['source'] - - # Try to connect to voice channel if you are not already connected - if self.vc == "" or not self.vc.is_connected() or self.vc == None: - # Conecta o voice_client no channel da primeira música da lista - self.vc = await self.music_queue[0][1].connect() - else: - await self.vc.move_to(self.music_queue[0][1]) - - self.current = self.music_queue[0] # Update current music - self.music_queue.pop(0) # Remove from the queue - player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS) - # Start the player - self.vc.play(player, after=lambda e: self.play_next()) - else: - self.is_playing = False - await self.vc.disconnect() - - @commands.command(name="help", alisases=['ajuda'], help="Comando de ajuda") - async def ajuda(self, ctx): - helptxt = '' - for command in self.client.commands: - helptxt += f'**{command}** - {command.help}\n' - embedhelp = discord.Embed( - colour=1646116, # grey - title=f'Comandos do {self.client.user.name}', - description=helptxt - ) - embedhelp.set_thumbnail(url=self.client.user.avatar_url) - await ctx.send(embed=embedhelp) - - @commands.command(name="play", help="Toca uma música do YouTube", aliases=['p', 'tocar']) - async def p(self, ctx, *args): - query = " ".join(args) - - try: - # Nome do canal de voz que vai entrar - voice_channel = ctx.author.voice.channel - except: - # If voice_channel is None: - await self.send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey') - return - else: - song = self.search_yt(query) - if type(song) == type(True): # Caso seja retornado um booleano da busca - await self.send_embed(ctx, description='Algo deu errado! Tente escrever o nome da música novamente!', colour_name='red') - return - else: - await self.send_embed(ctx, description=f"Você adicionou a música **{song['title']}** à fila!", colour_name='green') - self.music_queue.append([song, voice_channel]) - - if self.is_playing == False: - await self.play_music() - - @commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila']) - async def q(self, ctx): - fila = "" - for x in range(len(self.music_queue)): - fila += f"**{x+1} - ** {self.music_queue[x][0]['title']}\n" - - if self.repetingOne: # If Repeting one - await self.send_embed(ctx, title='Repeting One Music', - description=f'Música: **{self.current[0]["title"]}**', colour_name='green') - elif fila != "": - if self.repetingAll: # If repeting all - await self.send_embed(ctx, title='Repetindo todas', description=fila, colour_name='green') - else: # Repeting off - await self.send_embed(ctx, description=fila, colour_name='green') - else: # No music - await self.send_embed(ctx, description='Não existem músicas na fila.', colour_name='red') - - @commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular']) - async def skip(self, ctx): - if self.vc != "" and self.vc: - self.vc.stop() - await self.send_embed(ctx, description=f'Você pulou a música\nRepetindo Uma: {self.repetingOne} \ - \nRepetindo Todas: {self.repetingAll}', colour_name='green') - - @commands.command(name='stop', help='Para de tocar músicas') - async def stop(self, ctx): - if self.vc == '': - return - if self.vc.is_connected(): - # Remove todas as músicas da lista - self.music_queue = [] - self.current = () - self.repetingOne = False - self.repetingAll = False - self.is_playing = False - self.vc.stop() - await self.vc.disconnect() - - @commands.command(name='pause', help='Pausa a música') - async def pause(self, ctx): - if self.vc == '': - return - if self.vc.is_playing(): - self.vc.pause() - await self.send_embed(ctx, description='Música pausada', colour_name='green') - - @commands.command(name='resume', help='Despausa a música atual') - async def resume(self, ctx): - if self.vc == '': - return - if self.vc.is_paused(): - self.vc.resume() - await self.send_embed(ctx, description='Música tocando', colour_name='green') - - @commands.command(name='repeat_one', help='Repete a música atual') - async def repeat_one(self, ctx): - if not self.is_playing: # Garante que o Bot está tocando - await self.send_embed(ctx, title='Vulkan não está tocando agora', colour_name='red') - return - - if self.repetingAll: # Verifica se o repeting all não está ligado - await self.send_embed(ctx, title='Já está repetindo todas', colour_name='red') - return - else: # Liga o repeting one - self.repetingOne = True - await self.send_embed(ctx, description='Repetir uma música ligado', colour_name='green') - - @commands.command(name='repeat_all', help='Repete toda a fila') - async def repeat_all(self, ctx): - if not self.is_playing: # Garante que o Bot está tocando - await self.send_embed(ctx, title='Vulkan não está tocando agora', colour_name='red') - return - - if self.repetingOne: # Verifica se o repeting all não está ligado - await self.send_embed(ctx, title='Já está repetindo uma música', colour_name='red') - return - else: # Liga o repeting one - self.repetingAll = True - await self.send_embed(ctx, description='Repetir todas as músicas ligado', colour_name='green') - - @commands.command(name='repeat_off', help='Desativa o repetir músicas') - async def repeat_off(self, ctx): - if not self.is_playing: # Garante que o Bot está tocando - await self.send_embed(ctx, title='Vulkan não está tocando agora', colour_name='red') - return - else: - self.repetingOne = False - self.repetingAll = False - await self.send_embed(ctx, description='Repetir músicas desligado', colour_name='green') - - @skip.error # Erros para kick - async def skip_error(self, ctx, error): - if isinstance(error, commands.MissingPermissions): - embedvc = discord.Embed( - colour=12255232, - description=f"Você precisa da permissão **Gerenciar canais** para pular músicas." - ) - await ctx.send(embed=embedvc) - else: - raise error - - async def send_embed(self, ctx, title='', description='', colour_name='red'): - try: - colour = colours[colour_name] - except Exception as e: - colour = colours['red'] - - embedvc = discord.Embed( - title=title, - description=description, - colour=colour - ) - await ctx.send(embed=embedvc) - - -def setup(client): - client.add_cog(Music(client)) diff --git a/vulkan/ErrorHandler.py b/vulkanbot/ErrorHandler.py similarity index 100% rename from vulkan/ErrorHandler.py rename to vulkanbot/ErrorHandler.py diff --git a/vulkanbot/commands/Phrases.py b/vulkanbot/commands/Phrases.py new file mode 100644 index 0000000..7a0751d --- /dev/null +++ b/vulkanbot/commands/Phrases.py @@ -0,0 +1,64 @@ +import requests +import json +from config import config +from discord.ext import commands +from random import random as rand + + +class Phrases(commands.Cog): + """Deal with the generation of motivational phrases""" + + def __init__(self, bot): + self.__bot = bot + + @property + def bot(self): + return self.__bot + + @bot.setter + def bot(self, newBot): + self.__bot = newBot + + @commands.command(name='frase', help='Envia uma frase pica, talvez a braba') + async def phrase(self, ctx): + # There is a chance that the phrase will be send for the dev + secret = await self.__calculate_rgn() + if secret != None: + await ctx.send(secret) + else: + phrase = await self.__get_phrase() + await ctx.send(phrase) + + async def __calculate_rgn(self): + x = rand() + if x < 0.15: + return config.SECRET_MESSAGE + else: + return None + + async def __get_phrase(self): + tries = 0 + while True: + tries += 1 + if tries > config.MAX_API_PHRASES_TRIES: + return 'O banco de dados dos cara tá off, bando de vagabundo, tenta depois aí bicho' + + try: + response = requests.get(config.PHRASES_API) + data = json.loads(response.content) + + phrase = data['quoteText'] + author = data['quoteAuthor'] + + if phrase == '' or author == '': # Don't accept incomplete phrases + continue + + text = f'{phrase} \nBy: {author}' + + return text + except Exception as e: + continue + + +def setup(bot): + bot.add_cog(Phrases(bot)) diff --git a/vulkanbot/commands/Random.py b/vulkanbot/commands/Random.py new file mode 100644 index 0000000..e0a3005 --- /dev/null +++ b/vulkanbot/commands/Random.py @@ -0,0 +1,80 @@ +from random import randint, random +import discord +from discord.ext import commands +from config import config + + +class Random(commands.Cog): + """Deal with returning random things""" + + def __init__(self, bot): + self.__bot = bot + + @commands.command(name='random', help='Número aleatório de 1 a X') + async def random(self, ctx, arg: str): + try: + arg = int(arg) + + except Exception as e: + embed = discord.Embed( + description='Manda um número aí ow animal', + colour=config.COLOURS['red'] + ) + await ctx.send(embed=embed) + return + + if arg < 1: + a = arg + b = 1 + else: + a = 1 + b = arg + + x = randint(a, b) + embed = discord.Embed( + title=f'Número Aleatório entre {a, b}', + description=x, + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + + @commands.command(name='cara', help='coroa') + async def cara(self, ctx): + x = random() + if x < 0.5: + result = 'cara' + else: + result = 'coroa' + + embed = discord.Embed( + title='Cara Cora', + description=f'Resultado: {result}', + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + + @commands.command(name='escolha', help='Escolhe um dos itens, separador: Vírgula') + async def escolher(self, ctx, *args: str): + try: + user_input = " ".join(args) + itens = user_input.split(sep=',') + + index = randint(0, len(itens)-1) + + embed = discord.Embed( + title='Escolha de algo', + description=itens[index], + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + except Exception as e: + embed = discord.Embed( + title='Escolha de algo', + description='Erro: Envie várias coisas separadas por vírgula', + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + + +def setup(bot): + bot.add_cog(Random(bot)) diff --git a/vulkanbot/commands/Warframe.py b/vulkanbot/commands/Warframe.py new file mode 100644 index 0000000..129f0ae --- /dev/null +++ b/vulkanbot/commands/Warframe.py @@ -0,0 +1,52 @@ +import requests +import json +import discord +from discord.ext import commands +from config import config + + +class Warframe(commands.Cog): + """Deal with the generation of warframe data""" + + def __init__(self, bot): + self.__bot = bot + + @property + def bot(self): + return self.__bot + + @bot.setter + def bot(self, newBot): + self.__bot = newBot + + @commands.command(name='cetus', help='Informa o tempo atual de Cetus - Warframe') + async def cetus(self, ctx): + description = await self.__get_api() + embed = discord.Embed( + title='Warframe Cetus Timing', + description=description, + colour=config.COLOURS['blue'] + ) + await ctx.send(embed=embed) + + async def __get_api(self): + """Return the information of the Warframe API""" + tries = 0 + while True: + tries += 1 + if tries > config.MAX_API_CETUS_TRIES: + return 'Os DE baiano não tão com o banco de dados ligado' + + try: + response = requests.get(config.CETUS_API) + data = json.loads(response.content) + short = data['shortString'] + + return short + + except Exception as e: + continue + + +def setup(bot): + bot.add_cog(Warframe(bot)) diff --git a/vulkanbot/commands/__init__.py b/vulkanbot/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vulkanbot/general/Control.py b/vulkanbot/general/Control.py new file mode 100644 index 0000000..3b61c2e --- /dev/null +++ b/vulkanbot/general/Control.py @@ -0,0 +1,77 @@ +import discord +from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument +from discord.ext import commands +from config import config + + +class Control(commands.Cog): + """Control the flow of the Bot""" + + def __init__(self, bot): + self.__bot = bot + self.__comandos = { + 'MUSIC': ['this', 'resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear'], + 'RANDOM': ['escolha', 'cara', 'random'], + 'HELP': ['help'], + 'OTHERS': ['cetus', 'frase'] + } + + @property + def bot(self): + return self.__bot + + @bot.setter + def bot(self, newBot): + self.__bot = newBot + + @commands.Cog.listener() + async def on_ready(self): + print(config.STARTUP_MESSAGE) + await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | type {config.BOT_PREFIX}help")) + print(config.STARTUP_COMPLETE_MESSAGE) + + @commands.Cog.listener() + async def on_command_error(self, ctx, error): + if isinstance(error, MissingRequiredArgument): + await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver os comandos') + elif isinstance(error, CommandNotFound): + await ctx.channel.send(f'O comando não existe') + else: + raise error + + @commands.command(name="help", alisases=['ajuda'], help="Comando de ajuda") + async def help_msg(self, ctx): + helptxt = '' + help_music = '-- MUSIC\n' + help_random = '-- RANDOM\n' + help_help = '-- HELP\n' + help_others = '-- OTHERS\n' + + for command in self.__bot.commands: + if command.name in self.__comandos['MUSIC']: + help_music += f'**{command}** - {command.help}\n' + elif command.name in self.__comandos['HELP']: + help_help += f'**{command}** - {command.help}\n' + elif command.name in self.__comandos['OTHERS']: + help_others += f'**{command}** - {command.help}\n' + else: + help_random += f'**{command}** - {command.help}\n' + + helptxt = f'{help_music}\n{help_random}\n{help_others}\n{help_help}' + + embedhelp = discord.Embed( + colour=config.COLOURS['grey'], + title=f'Comandos do {self.__bot.user.name}', + description=helptxt + ) + + embedhelp.set_thumbnail(url=self.__bot.user.avatar_url) + await ctx.send(embed=embedhelp) + + @commands.Cog.listener() + async def on_error(self, error): + print('On Error') + + +def setup(bot): + bot.add_cog(Control(bot)) diff --git a/vulkan/general/Filter.py b/vulkanbot/general/Filter.py similarity index 100% rename from vulkan/general/Filter.py rename to vulkanbot/general/Filter.py diff --git a/vulkanbot/general/__init__.py b/vulkanbot/general/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vulkanbot/music/Downloader.py b/vulkanbot/music/Downloader.py new file mode 100644 index 0000000..04605da --- /dev/null +++ b/vulkanbot/music/Downloader.py @@ -0,0 +1,145 @@ +import re +from config import config +from yt_dlp import YoutubeDL +from yt_dlp.utils import ExtractorError, DownloadError + +from vulkanbot.music.Types import Provider + + +class Downloader(): + """Download musics direct URL and title or Source from Youtube using a music name or Youtube URL""" + + def __init__(self) -> None: + self.__YDL_OPTIONS = {'format': 'bestaudio/best', + 'default_search': 'auto', + 'playliststart': 0, + 'extract_flat': True, + 'playlistend': config.MAX_PLAYLIST_LENGTH, + } + + def download_urls(self, musics_input, provider: Provider) -> list: + """Download the musics direct URL from Youtube and return in a list + + Arg: List with names or youtube url or a Unique String + Return: List with the direct youtube URL of each music + """ + if type(provider) != Provider: + return None + + if type(musics_input) != list and type(musics_input) != str: + return None + + if provider == Provider.Name: # Send a list of names + musics_urls = self.__download_titles(musics_input) + return musics_urls + + elif provider == Provider.YouTube: # Send a URL or Title + url = self.__download_one(musics_input) + return url + else: + return None + + def download_source(self, url) -> dict: + """Download musics full info and source from Music URL + + Arg: URL from Youtube + Return: Dict with the full youtube information of the music, including source to play it + """ + options = self.__YDL_OPTIONS + options['extract_flat'] = False + with YoutubeDL(options) as ydl: + try: + result = ydl.extract_info(url, download=False) + + return result + except (ExtractorError, DownloadError) as e: # Any type of error in download + print(e) + return None + + def __download_one(self, music: str) -> list: + """Download one music/playlist direct link from Youtube + + Arg: Playlist URL or Music Name to download direct URL + Return: List with the Youtube URL of each music downloaded + """ + if type(music) != str: + return + + if self.__is_url(music): # If Url + info = self.__download_links(music) # List of dict + else: # If Title + info = self.__download_titles(music) # List of dict + + return info + + def __download_titles(self, musics_names: list) -> list: + """Download a music direct URL using his name. + + Arg: Music Name + Return: List with one dict, containing the music direct URL and title + """ + if type(musics_names) == str: # Turn str into list + musics_names = [musics_names] + + musics_info = [] + with YoutubeDL(self.__YDL_OPTIONS) as ydl: + try: + for name in musics_names: + search = f"ytsearch:{name}" + result = ydl.extract_info(search, download=False) + + id = result['entries'][0]['id'] + music_info = { + 'url': f"https://www.youtube.com/watch?v={id}", + 'title': result['entries'][0]['title'] + } + musics_info.append(music_info) + + return musics_info # Return a list + except Exception as e: + raise e + + def __download_links(self, url: str) -> list: + """Download musics direct links from Playlist URL or Music URL + + Arg_Url: URL from Youtube + Return: List of dicts, with the title and url of each music + """ + options = self.__YDL_OPTIONS + options['extract_flat'] = True + with YoutubeDL(options) as ydl: + try: + result = ydl.extract_info(url, download=False) + musics_info = [] + + if result.get('entries'): # If got a dict of musics + for entry in result['entries']: + music_info = { + 'title': entry['title'], + 'url': f"https://www.youtube.com/watch?v={entry['id']}" + } + + musics_info.append(music_info) + else: # Or a single music + music_info = { + 'url': result['original_url'], + 'title': result['title'] + } + musics_info.append(music_info) + + return musics_info # Return a list + except ExtractorError or DownloadError: + pass + + def __is_url(self, string) -> bool: + """Verify if a string is a url""" + regex = re.compile( + "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") + + if re.search(regex, string): + return True + else: + return False + + + diff --git a/vulkanbot/music/Music.py b/vulkanbot/music/Music.py new file mode 100644 index 0000000..49083e9 --- /dev/null +++ b/vulkanbot/music/Music.py @@ -0,0 +1,213 @@ +import discord +from discord.ext import commands +import datetime +import asyncio + +from config import config +from vulkanbot.music.Downloader import Downloader +from vulkanbot.music.Playlist import Playlist +from vulkanbot.music.Searcher import Searcher + + +class Music(commands.Cog): + def __init__(self, bot): + self.__searcher = Searcher() + self.__downloader = Downloader() + self.__playlist = Playlist() + + self.__playing = False + self.__bot = bot + self.__ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe' + self.__vc = "" # Objeto voice_bot do discord + + self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'} + self.FFMPEG_OPTIONS = {'executable': self.__ffmpeg, + 'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'} + + def __play_next(self): + while True: + if len(self.__playlist) > 0: + source = self.__playlist.next_song() + if source == None: # If there is not a source + continue + + player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS) + self.__vc.play(player, after=lambda e: self.__play_next()) + break + else: + self.__playing = False + break + + # infinite loop checking + async def __play_music(self): + while True: + if len(self.__playlist) > 0: + source = self.__playlist.next_song() + if source == None: + continue + + self.__playing = True + player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS) + self.__vc.play(player, after=lambda e: self.__play_next()) + break + else: + self.__playing = False + await self.__vc.disconnect() + break + + @commands.command(name="play", help="Toca música - YouTube/Spotify/Título", aliases=['p', 'tocar']) + async def play(self, ctx, *args): + user_input = " ".join(args) + + try: + if self.__vc == "" or not self.__vc.is_connected() or self.__vc == None: + voice_channel = ctx.author.voice.channel + self.__vc = await voice_channel.connect() + except Exception as e: + # If voice_channel is None: + print(e) + await self.__send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey') + return + else: + songs_quant = 0 + musics_names, provider = self.__searcher.search(user_input) + for music in musics_names: + music_info = self.__downloader.download_urls(music, provider) + + for music in music_info: + self.__playlist.add_song(music) + songs_quant += 1 + + if songs_quant == 1: + await self.__send_embed(ctx, description=f"Você adicionou a música **{music_info[0]['title']}** à fila!", colour_name='green') + else: + await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='green') + + if not self.__playing: + await self.__play_music() + + @commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila']) + async def queue(self, ctx): + if self.__playlist.looping_one: # If Repeting one + # Send the current song with this title + await self.this(ctx) + return + + fila = self.__playlist.queue() + total = len(fila) + text = f'Total musics: {total}\n\n' + + # Create the string to description + for pos, song in enumerate(fila): + if pos >= config.MAX_QUEUE_LENGTH: # Max songs to apper in queue list + break + + text += f"**{pos+1} - ** {song}\n" + + if text != "": + if self.__playlist.looping_all: # If repeting all + await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='green') + else: # Repeting off + await self.__send_embed(ctx, title='Queue', description=text, colour_name='green') + else: # No music + await self.__send_embed(ctx, description='There is not musics in queue.', colour_name='red') + + @commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular']) + async def skip(self, ctx): + if self.__vc != '' and self.__vc: + print('Skip') + self.__vc.stop() + + @commands.command(name='stop', help='Para de tocar músicas') + async def stop(self, ctx): + if self.__vc == '': + return + if self.__vc.is_connected(): + self.__playlist.clear() + self.__vc.stop() + await self.__vc.disconnect() + + @commands.command(name='pause', help='Pausa a música') + async def pause(self, ctx): + if self.__vc == '': + return + if self.__vc.is_playing(): + self.__vc.pause() + await self.__send_embed(ctx, description='Música pausada', colour_name='green') + + @commands.command(name='resume', help='Despausa a música atual') + async def resume(self, ctx): + if self.__vc == '': + return + if self.__vc.is_paused(): + self.__vc.resume() + await self.__send_embed(ctx, description='Música tocando', colour_name='green') + + @commands.command(name='loop', help='Controla a repetição de músicas') + async def loop(self, ctx, args: str): + args = args.lower() + if args == 'one': + description = self.__playlist.loop_one() + elif args == 'all': + description = self.__playlist.loop_all() + elif args == 'off': + description = self.__playlist.loop_off() + else: + description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop' + + await self.__send_embed(ctx, description=description, colour_name='grey') + + + async def __send_embed(self, ctx, title='', description='', colour_name='grey'): + try: + colour = config.COLOURS[colour_name] + except Exception as e: + colour = config.COLOURS['grey'] + + embedvc = discord.Embed( + title=title, + description=description, + colour=colour + ) + await ctx.send(embed=embedvc) + + @commands.command(name='clear', help='Limpa a fila de músicas a tocar') + async def clear(self, ctx): + self.__playlist.clear() + + @commands.command(name='this', help='Mostra a música que está tocando no instante') + async def this(self, ctx): + if self.__playlist.looping_one: + title = 'Music Looping Now' + else: + title = 'Music Playing Now' + + info = self.__playlist.get_current() + embedvc = discord.Embed( + title=title, + description=f"[{info['title']}]({info['url']})", + color=config.COLOURS['grey'] + ) + + embedvc.add_field(name=config.SONGINFO_UPLOADER, + value=info['uploader'], + inline=False) + + if 'thumbnail' in info.keys(): + embedvc.set_thumbnail(url=info['thumbnail']) + + if 'duration' in info.keys(): + duration = str(datetime.timedelta(seconds=info['duration'])) + embedvc.add_field(name=config.SONGINFO_DURATION, + value=f"{duration}", + inline=False) + else: + embedvc.add_field(name=config.SONGINFO_DURATION, + value=config.SONGINFO_UNKNOWN_DURATION, + inline=False) + + await ctx.send(embed=embedvc) + + +def setup(bot): + bot.add_cog(Music(bot)) diff --git a/vulkanbot/music/Playlist.py b/vulkanbot/music/Playlist.py new file mode 100644 index 0000000..b541ad4 --- /dev/null +++ b/vulkanbot/music/Playlist.py @@ -0,0 +1,179 @@ +from collections import deque +import random +from vulkanbot.music.Song import Song +from vulkanbot.music.Downloader import Downloader + + +class Playlist(): + """Class to manage and control the songs to play and played""" + + def __init__(self) -> None: + self.__down = Downloader() + self.__queue = deque() # Store the musics to play + self.__songs_history = deque() # Store the musics played + self.__name_history = deque() # Store the name of musics played + + self.__looping_one = False + self.__looping_all = False + + self.__current = None + + @property + def looping_one(self): + return self.__looping_one + + @property + def looping_all(self): + return self.__looping_all + + def __len__(self): + if self.__looping_one == True or self.__looping_all == True: + return 1 + else: + return len(self.__queue) + + def next_song(self): + """Return the source of the next song to play""" + if self.__current == None: # If not playing + if len(self.__queue) == 0: # If nothing to play + return None + else: # If there is music to play + return self.__start() + + # If playing + played_song = self.__current + + # Check if need to repeat the played song + if self.__looping_one: # Insert the current song to play again + self.__queue.appendleft(played_song) + + if self.__looping_all: # Insert the current song in the end of queue + self.__queue.append(played_song) + + while True: # Try to get the source of next song + if len(self.__queue) == 0: # If no more song to play, return None + return None + + # If there is more to play + # Finish download of the next song + source = self.__prepare_next(self.__queue[0]) + if source == None: # If there is a problem in the download + self.__queue.popleft() # Remove the music with problems + continue + + return source + + def get_current(self): + """Return current music embed""" + if self.__current: + return self.__current.embed() + else: + return 'Nenhuma música tocando' + + def __prepare_next(self, next_song: Song) -> str: + """Finish the download of the music and return the source""" + if next_song.source == None: # Check if source has already downloaded + url = next_song.url # Get the URL + info = self.__down.download_source(url) # Download the source + if info == None: # If there is a problem in the download + return None + + next_song.finish_down(info) # Updating the info of song + + # Att the Playlist info + self.__current = next_song # Att the current + self.__queue.popleft() # Remove the current from queue + self.__name_history.append(self.__current.title) # Add to name history + self.__songs_history.append(self.__current) # Add to song history + + return self.__current.source # Return the source of current + + def __start(self) -> None: + """Start the play of the first musics and return his source""" + # Finish download of the next song + url = self.__queue[0].url # Get the URL + info = self.__down.download_source(url) # Download the source + + self.__queue[0].finish_down(info) # Att the song + + # Att Playlist info + self.__current = self.__queue[0] # Att the current + self.__queue.popleft() # Remove the current from queue + self.__name_history.append(self.__current.title) # Add to name history + self.__songs_history.append(self.__current) # Add to song history + + return self.__current.source # Return the source of current + + def prev_song(self): + """Return the source of the last song played + + Return None or the source of the prev song + """ + if len(self.__songs_history) == 0: + return None + else: + return self.__songs_history[0].source + + def add_song(self, music: dict) -> None: + """Receives a music object and store to the play queue""" + if (not 'title' in music.keys()) or (not 'url' in music.keys()): + print('Music without necessary keys') + return + + song = Song(title=music['title'], url=music['url']) # Cria a musica + self.__queue.append(song) + + def shuffle(self) -> None: + """Shuffle the order of the songs to play""" + random.shuffle(self.__queue) + + def revert(self) -> None: + """Revert the order of the songs to play""" + self.__queue.reverse() + + def clear(self) -> None: + """Clear the songs to play song history""" + self.__queue.clear() + self.__songs_history.clear() + + def loop_one(self) -> str: + """Try to start the loop of the current song + + Return: Embed descrition to show to user + """ + if self.__looping_all == True: + return 'Vulkan already looping one music, disable loop first' + elif self.__looping_one == True: + return "I'm already doing this, you dumb ass" + else: + self.__looping_one = True + return 'Repeating the current song' + + def loop_all(self) -> str: + """Try to start the loop of all songs + + Return: Embed descrition to show to user + """ + if self.__looping_one == True: + return 'Vulkan already looping one music, disable loop first' + elif self.__looping_all == True: + return "I'm already doing this, you dumb ass" + else: + self.__looping_all = True + return 'Repeating all songs in queue' + + def loop_off(self) -> str: + """Disable both types of loop""" + if self.__looping_all == False and self.__looping_one == False: + return "The loop is already off, you fucking dick head" + + self.__looping_all = False + self.__looping_one = False + return 'Loop disable' + + def queue(self) -> list: + list_songs = [] + for song in self.__queue: + title = song.title + list_songs.append(title) + return list_songs diff --git a/vulkanbot/music/Searcher.py b/vulkanbot/music/Searcher.py new file mode 100644 index 0000000..93795d7 --- /dev/null +++ b/vulkanbot/music/Searcher.py @@ -0,0 +1,52 @@ +import re +from vulkanbot.music.Types import Provider +from vulkanbot.music.Spotify import SpotifySearch + + +class Searcher(): + """Turn the user input into list of musics names, support youtube and spotify""" + + def __init__(self) -> None: + self.__Spotify = SpotifySearch() + print(f'Spotify Connected: {self.__Spotify.connect()}') + + def search(self, music: str) -> list: + """Return a list with the track name of a music or playlist + + Return -> A list of musics names + """ + url_type = self.__identify_source(music) + + if url_type == Provider.YouTube: + return [music], Provider.YouTube + + elif url_type == Provider.Spotify: + musics = self.__Spotify.search(music) + return musics, Provider.Name + + elif url_type == Provider.Name: + return [music], Provider.Name + + def __identify_source(self, music) -> Provider: + """Identify the provider of a music""" + if not self.__is_url(music): + return Provider.Name + + if "https://www.youtu" in music or "https://youtu.be" in music: + return Provider.YouTube + + if "https://open.spotify.com" in music: + return Provider.Spotify + + # If no match + return Provider.Unknown + + def __is_url(self, string) -> bool: + """Verify if a string is a url""" + regex = re.compile( + "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") + + if re.search(regex, string): + return True + else: + return False diff --git a/vulkanbot/music/Song.py b/vulkanbot/music/Song.py new file mode 100644 index 0000000..355c69a --- /dev/null +++ b/vulkanbot/music/Song.py @@ -0,0 +1,64 @@ +from discord.embeds import Embed + + +class Song(): + """Store the usefull information about a Song""" + + def __init__(self, url: str, title: str) -> None: + """Create a song with only the URL to the youtube song""" + self.__url = url + self.__title = title + self.__info = {} + + def finish_down(self, info: dict) -> None: + """Get and store the full information of the song""" + self.__usefull_keys = ['url', 'duration', + 'description', 'webpage_url', + 'channel', 'id', 'uploader', + 'thumbnail'] + self.__extract_info(info) + + def __extract_info(self, info) -> None: + """Extract the usefull information returned by the Downloader""" + for key in self.__usefull_keys: + try: + self.__info[key] = info[key] + except Exception as e: + print(e) + raise e + + def embed(self) -> Embed: + """Configure and return the info to create the embed for this song""" + info = { + 'title': self.__title, + 'url': self.__url, + 'uploader': self.__info['uploader'] + } + + if 'thumbnail' in self.__info.keys(): + info['thumbnail'] = self.__info['thumbnail'] + + if 'duration' in self.__info.keys(): + info['duration'] = self.__info['duration'] + + return info + + @property + def info(self) -> dict: + """Return the compiled info of this song""" + if self.__info: + return self.__info + + @property + def title(self) -> str: + return self.__title + + @property + def source(self) -> str: + """Return the Song Source URL to play""" + if 'url' in self.__info.keys(): + return self.__info['url'] + + @property + def url(self) -> str: + return self.__url diff --git a/vulkanbot/music/Spotify.py b/vulkanbot/music/Spotify.py new file mode 100644 index 0000000..b346180 --- /dev/null +++ b/vulkanbot/music/Spotify.py @@ -0,0 +1,149 @@ +import spotipy +import re +from spotipy.oauth2 import SpotifyClientCredentials +from bs4 import BeautifulSoup +from config import config +import aiohttp + + +class Browser(): + def __init__(self) -> None: + self.__url_regex = re.compile( + "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") + self.__session = aiohttp.ClientSession( + headers={'User-Agent': 'python-requests/2.20.0'}) + + async def search(self, url) -> str: + """Convert the external_url link to the title of music using browser""" + if re.search(self.__url_regex, url): + result = self.__url_regex.search(url) + url = result.group(0) + + async with self.__session.get(url) as response: + page = await response.text() + soup = BeautifulSoup(page, 'html.parser') + + title = soup.find('title') + title = title.string + title = title.replace('- song by', '') + title = title.replace('| Spotify', '') + return title + + +class SpotifySearch(): + """Search a Spotify music or playlist and return the musics names""" + + def __init__(self) -> None: + self.__connected = False + self.__browser = Browser() + + def connect(self) -> bool: + try: + # Initialize the connection with Spotify API + self.__api = spotipy.Spotify(auth_manager=SpotifyClientCredentials( + client_id=config.SPOTIFY_ID, client_secret=config.SPOTIFY_SECRET)) + self.__connected = True + return True + except: + return False + + def search(self, music) -> list: + """Search and return the title of musics on Spotify""" + type = music.split('/')[3].split('?')[0] + code = music.split('/')[4].split('?')[0] + if type == 'album': + musics = self.__get_album(code) + elif type == 'playlist': + musics = self.__get_playlist(code) + elif type == 'track': + musics = self.__get_track(code) + else: + return None + + return musics + + def __get_album(self, code) -> list: + """Get the externals urls of a album + + ARG: Spotify Code of the Album + """ + if self.__connected == True: + try: + # Load all music objects + results = self.__api.album_tracks(code) + musics = results['items'] + + while results['next']: # Get the next pages + results = self.__api.next(results) + musics.extend(results['items']) + + musicsTitle = [] + + for music in musics: + try: + title = self.__extract_title(music) + musicsTitle.append(title) + except: + pass + return musicsTitle + except Exception as e: + raise e + + def __get_playlist(self, code) -> list: + """Get the externals urls of a playlist + + Arg: Spotify Code of the Playlist + """ + try: + results = self.__api.playlist_items(code) + itens = results['items'] + + while results['next']: # Load the next pages + results = self.__api.next(results) + itens.extend(results['items']) + + musics = [] + for item in itens: + musics.append(item['track']) + + titles = [] + for music in musics: + try: + title = self.__extract_title(music) + titles.append(title) + except Exception as e: + raise e + + return titles + + except Exception as e: + raise e + + def __get_track(self, code) -> list: + """Convert a external_url track to the title of the music + + ARG: Spotify Code of the Music + """ + results = self.__api.track(code) + name = results['name'] + artists = '' + for artist in results['artists']: + artists += f'{artist["name"]} ' + + return [f'{name} {artists}'] + + def __extract_title(self, music: dict) -> str: + """Receive a spotify music object and return his title + + ARG: music dict returned by Spotify + """ + title = f'{music["name"]} ' + for artist in music['artists']: + title += f'{artist["name"]} ' + + return title + + async def __convert_spotify(self, url) -> str: + """(Experimental) - Convert the external_url link to the title of music using browser""" + title = self.__browser(url) + return title diff --git a/vulkanbot/music/Types.py b/vulkanbot/music/Types.py new file mode 100644 index 0000000..c30a349 --- /dev/null +++ b/vulkanbot/music/Types.py @@ -0,0 +1,21 @@ +from enum import Enum + + +class Provider(Enum): + """Store Enum Types of the Providers""" + Spotify = "Spotify" + Spotify_Playlist = "Spotify Playlist" + YouTube = "YouTube" + Name = 'Track Name' + Unknown = "Unknown" + + +class Playlist_Types(Enum): + Spotify_Playlist = "Spotify Playlist" + YouTube_Playlist = "YouTube Playlist" + Unknown = "Unknown" + + +class Origins(Enum): + Default = "Default" + Playlist = "Playlist"