commit 089b47fc442196b8648e37f7428a9a4f0adce367 Author: Rafael Vargas Date: Fri Jan 7 11:53:12 2022 -0400 First commit diff --git a/Procfile b/Procfile new file mode 100644 index 0000000..b96cada --- /dev/null +++ b/Procfile @@ -0,0 +1 @@ +worker: python main.py \ No newline at end of file diff --git a/config/config.py b/config/config.py new file mode 100644 index 0000000..13c3933 --- /dev/null +++ b/config/config.py @@ -0,0 +1,66 @@ +from decouple import config + +CETUS_API = config('CETUS_API') +CAMBION_API = config('CAMBION_API') +FISSURES_API = config('FISSURES_API') +BOT_TOKEN = config('BOT_TOKEN') +SPOTIFY_ID = config('SPOTIFY_ID') +SPOTIFY_SECRET = config('SPOTIFY_SECRET') +SECRET_MESSAGE = config('SECRET_MESSAGE') +PHRASES_API = config('PHRASES_API') + +BOT_PREFIX = '!' +VC_TIMEOUT = 600 + +STARTUP_MESSAGE = 'Starting Vulkan...' +STARTUP_COMPLETE_MESSAGE = 'Vulkan is now operating.' + +MAX_PLAYLIST_LENGTH = 50 +MAX_API_PHRASES_TRIES = 10 +MAX_API_CETUS_TRIES = 10 +MAX_API_CAMBION_TRIES = 10 +MAX_API_FISSURES_TRIES = 10 +MAX_PRELOAD_SONGS = 10 + +SONGINFO_UPLOADER = "Uploader: " +SONGINFO_DURATION = "Duration: " +SONGINFO_REQUESTER = 'Requester: ' + +HELP_SKIP = 'Skip the current playing song' +HELP_RESUME = 'Resumes the song player' +HELP_CLEAR = 'Clear the queue' +HELP_STOP = 'Stop the song player, removing Vulkan from voice channel' +HELP_LOOP = '(one/all/off) - Control the loop of songs' +HELP_NP = 'Show the info of the current song' +HELP_QUEUE = f'Show the first {MAX_PRELOAD_SONGS} songs in queue' +HELP_PAUSE = 'Pauses the song player' +HELP_SHUFFLE = 'Shuffle the songs playing' +HELP_PLAY = '(title/youtube/spotify) - Plays a song' +HELP_MOVE = '(x, y) - Moves a song from position x to y in queue' +HELP_REMOVE = '(x, -1) - Remove a song in the position x or -1 for the last song' +HELP_RESET = 'Reset the Player of a guild' +HELP_WARFRAME = f'({BOT_PREFIX}warframe help for more)' +HELP_RANDOM = '(x) - Return a random number between 1 and x' +HELP_ESCOLHA = '(x, y, z...) - Choose randomly one item passed' +HELP_CARA = 'Return cara or coroa' +HELP_DROP = '(user_name) - Try to remove the user from the current voice channel' +HELP_FRASE = "Send a randomly phrase, perhaps you get the braba" +HELP_HELP = 'This command :)' + +SONGS_ADDED = 'You added {} songs to the queue' +SONG_ADDED = 'You added the song {} to the queue' +SONG_QUEUE_TITLE = 'Songs Queue' + +ERROR_TITLE = 'Error :/' +NO_CHANNEL = 'To play some music, connect to any voice channel first.' +NO_GUILD = 'This guild are not connected to Vulkan' +INVALID_INPUT = 'This type of input was too strange, try something better' +DOWNLOADING_ERROR = 'An error occurred while downloading' +EXTRACTING_ERROR = 'An error ocurred while searching for the songs' + +COLOURS = { + 'red': 0xDC143C, + 'green': 0x58D68D, + 'grey': 0x708090, + 'blue': 0x3498DB +} diff --git a/main.py b/main.py new file mode 100644 index 0000000..7a3a98f --- /dev/null +++ b/main.py @@ -0,0 +1,22 @@ +import discord +import os + +from config import config +from discord.ext import commands + +intents = discord.Intents.default() +intents.members = True + +bot = commands.Bot(command_prefix=config.BOT_PREFIX, pm_help=True, + case_insensitive=True, intents=intents) +bot.remove_command('help') + +if config.BOT_TOKEN == "": + exit() + +for filename in os.listdir('./vulkan/commands'): + if filename.endswith('.py'): + bot.load_extension(f'vulkan.commands.{filename[:-3]}') + + +bot.run(config.BOT_TOKEN, bot=True, reconnect=True) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fb59f1c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +discord.py +discord.py[voice] +yt_dlp +spotipy +python-decouple +requests +json \ No newline at end of file diff --git a/vulkan/commands/Admin.py b/vulkan/commands/Admin.py new file mode 100644 index 0000000..a474292 --- /dev/null +++ b/vulkan/commands/Admin.py @@ -0,0 +1,51 @@ +from discord.ext import commands +from discord import Member, Client +from config import config + + +class Admin(commands.Cog): + """Deal with administration of users in server""" + + def __init__(self, bot: Client): + self.__bot = bot + + @commands.command(name='drop', help='Manda um membro para a Terapia') + async def drop(self, ctx, name): + user: Member = None + guild = ctx.guild + + for member in guild.members: + if member.name == name: + user = member + break + + if user == None: + await ctx.send(f'{name} não foi encontrado, utilize o nome de usuário ao invés do apelido do servidor') + return + + permanent_drops = False + maximum_drops = None + + try: + maximum_drops = config.MEMBERS_MAXIMUM_DROPS[user.name] + except KeyError: + permanent_drops = True + except Exception as e: + await ctx.send('Houve algum erro :/') + return + + if maximum_drops == 0: + await ctx.send(f'{user.name} já foi dropado várias vezes, larga o cara bicho') + return + + if user.voice == None: + await ctx.send(f'{user.name} precisa estar conectado a um canal de voz antes') + else: + await user.move_to(None) # Remove from voice + if not permanent_drops: + # Att the life of user + config.MEMBERS_MAXIMUM_DROPS[user.name] -= 1 + + +def setup(bot): + bot.add_cog(Admin(bot)) diff --git a/vulkan/commands/Control.py b/vulkan/commands/Control.py new file mode 100644 index 0000000..5b1edc4 --- /dev/null +++ b/vulkan/commands/Control.py @@ -0,0 +1,71 @@ +import discord +from discord import Client +from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument +from discord.ext import commands +from config import config + + +class Control(commands.Cog): + """Control the flow of the Bot""" + + def __init__(self, bot: Client): + self.__bot = bot + self.__comandos = { + 'MUSIC': ['resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np', 'shuffle', 'move', 'remove', 'reset'], + 'WARFRAME': ['warframe'], + 'RANDOM': ['escolha', 'cara', 'random'], + 'HELP': ['help'], + 'OTHERS': ['frase', 'drop'] + } + + @commands.Cog.listener() + async def on_ready(self): + print(config.STARTUP_MESSAGE) + await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | {config.BOT_PREFIX}help")) + print(config.STARTUP_COMPLETE_MESSAGE) + + @commands.Cog.listener() + async def on_command_error(self, ctx, error): + if isinstance(error, MissingRequiredArgument): + await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver todos os comandos\n\nOu tente {config.BOT_PREFIX}command help para mais informações') + elif isinstance(error, CommandNotFound): + await ctx.channel.send(f'O comando não existe') + else: + await ctx.channel.send(f'Teve um erro aí bicho') + raise error + + @commands.command(name="help", alisases=['ajuda'], help=config.HELP_HELP) + async def help_msg(self, ctx): + helptxt = '' + help_music = '-- MUSIC\n' + help_random = '-- RANDOM\n' + help_warframe = '-- WARFRAME\n' + help_help = '-- HELP\n' + help_others = '-- OTHERS\n' + + for command in self.__bot.commands: + if command.name in self.__comandos['MUSIC']: + help_music += f'**{command}** - {command.help}\n' + elif command.name in self.__comandos['HELP']: + help_help += f'**{command}** - {command.help}\n' + elif command.name in self.__comandos['OTHERS']: + help_others += f'**{command}** - {command.help}\n' + elif command.name in self.__comandos['WARFRAME']: + help_warframe += f'**{command}** - {command.help}\n' + else: + help_random += f'**{command}** - {command.help}\n' + + helptxt = f'{help_music}\n{help_warframe}\n{help_random}\n{help_others}\n{help_help}' + + embedhelp = discord.Embed( + colour=config.COLOURS['grey'], + title=f'Comandos do {self.__bot.user.name}', + description=helptxt + ) + + embedhelp.set_thumbnail(url=self.__bot.user.avatar_url) + await ctx.send(embed=embedhelp) + + +def setup(bot): + bot.add_cog(Control(bot)) diff --git a/vulkan/commands/Music.py b/vulkan/commands/Music.py new file mode 100644 index 0000000..79c4c7f --- /dev/null +++ b/vulkan/commands/Music.py @@ -0,0 +1,181 @@ +import discord +from discord.ext import commands + +from config import config +from vulkan.music.Player import Player +from vulkan.music.utils import * + + +class Music(commands.Cog): + def __init__(self, bot): + self.__guilds = {} + self.__bot: discord.Client = bot + + @commands.Cog.listener() + async def on_ready(self): + for guild in self.__bot.guilds: + self.__guilds[guild] = Player(self.__bot, guild) + + @commands.command(name="play", help=config.HELP_PLAY, aliases=['p', 'tocar']) + async def play(self, ctx, *args): + user_input = " ".join(args) + requester = ctx.author.name + + player = self.__get_player(ctx) + if player == None: + await self.__send_embed(ctx, description=config.NO_GUILD, colour_name='red') + return + + if is_connected(ctx) == None: + result = await player.connect(ctx) + if result['success'] == False: + await self.__send_embed(ctx, description=result['reason'], colour_name='red') + return + + await player.play(ctx, user_input, requester) + + @commands.command(name="queue", help=config.HELP_QUEUE, aliases=['q', 'fila']) + async def queue(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + + embed = await player.queue() + await ctx.send(embed=embed) + + @commands.command(name="skip", help=config.HELP_SKIP, aliases=['pular']) + async def skip(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + else: + await player.skip() + + @commands.command(name='stop', help=config.HELP_STOP, aliases=['parar']) + async def stop(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + else: + await player.stop() + + @commands.command(name='pause', help=config.HELP_PAUSE, aliases=['pausar']) + async def pause(self, ctx): + player = self.__get_player(ctx) + if player == None: + print('No player') + return + else: + success = await player.pause() + if success: + await self.__send_embed(ctx, description='Song paused', colour_name='blue') + + @commands.command(name='resume', help=config.HELP_RESUME, aliases=['soltar']) + async def resume(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + else: + success = await player.resume() + if success: + await self.__send_embed(ctx, description='Song Playing', colour_name='blue') + + @commands.command(name='loop', help=config.HELP_LOOP, aliases=['repeat']) + async def loop(self, ctx, args: str): + player = self.__get_player(ctx) + if player == None: + return + else: + result = await player.loop(args) + await self.__send_embed(ctx, description=result, colour_name='blue') + + @commands.command(name='clear', help=config.HELP_CLEAR, aliases=['limpar']) + async def clear(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + else: + await player.clear() + + @commands.command(name='np', help=config.HELP_NP, aliases=['playing', 'now']) + async def now_playing(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + else: + embed = await player.now_playing() + await self.__clean_messages(ctx) + await ctx.send(embed=embed) + + @commands.command(name='shuffle', help=config.HELP_SHUFFLE, aliases=['aleatorio']) + async def shuffle(self, ctx): + player = self.__get_player(ctx) + if player == None: + return + else: + result = await player.shuffle() + await self.__send_embed(ctx, description=result, colour_name='blue') + + @commands.command(name='move', help=config.HELP_MOVE, aliases=['mover']) + async def move(self, ctx, pos1, pos2='1'): + player = self.__get_player(ctx) + if player == None: + return + else: + result = await player.move(pos1, pos2) + await self.__send_embed(ctx, description=result, colour_name='blue') + + @commands.command(name='remove', help=config.HELP_REMOVE, aliases=['remover']) + async def remove(self, ctx, position): + """Remove a song from the queue in the position""" + player = self.__get_player(ctx) + if player == None: + return + else: + result = await player.remove(position) + await self.__send_embed(ctx, description=result, colour_name='blue') + + @commands.command(name='reset', help=config.HELP_RESET, aliases=['resetar']) + async def reset(self, ctx): + player = self.__get_player(ctx) + if player != None: + await player.stop() + + self.__guilds[ctx.guild] = Player(self.__bot, ctx.guild) + + async def __send_embed(self, ctx, title='', description='', colour_name='grey'): + try: + colour = config.COLOURS[colour_name] + except Exception as e: + colour = config.COLOURS['grey'] + + embedvc = discord.Embed( + title=title, + description=description, + colour=colour + ) + await ctx.send(embed=embedvc) + + async def __clean_messages(self, ctx): + """Clear Bot messages if send recently""" + last_messages = await ctx.channel.history(limit=5).flatten() + + for message in last_messages: + try: + if message.author == self.__bot.user: + if len(message.embeds) > 0: + embed = message.embeds[0] + if embed.title == 'Song Playing Now' or embed.title == 'Song Looping Now': + await message.delete() + except: + continue + + def __get_player(self, ctx): + try: + return self.__guilds[ctx.guild] + except: + return None + + +def setup(bot): + bot.add_cog(Music(bot)) diff --git a/vulkan/commands/Phrases.py b/vulkan/commands/Phrases.py new file mode 100644 index 0000000..6dced57 --- /dev/null +++ b/vulkan/commands/Phrases.py @@ -0,0 +1,59 @@ +from discord.client import Client +import requests +import json +from config import config +from discord.ext import commands +from random import random as rand + + +class Phrases(commands.Cog): + """Deal with the generation of motivational phrases""" + + def __init__(self, bot: Client): + self.__bot = bot + + @commands.command(name='frase', help=config.HELP_FRASE) + async def phrase(self, ctx): + """Send some phrase to the requester""" + secret = await self.__calculate_rgn() + if secret != None: + await ctx.send(secret) + else: + phrase = await self.__get_phrase() + await ctx.send(phrase) + + async def __calculate_rgn(self): + """Calculate the chance from the phrase function return a secret custom message""" + x = rand() + if x < 0.15: + return config.SECRET_MESSAGE + else: + return None + + async def __get_phrase(self): + """Get the phrase from the server""" + tries = 0 + while True: + tries += 1 + if tries > config.MAX_API_PHRASES_TRIES: + return 'O banco de dados dos cara tá off, bando de vagabundo, tenta depois aí bicho' + + try: + response = requests.get(config.PHRASES_API) + data = json.loads(response.content) + + phrase = data['quoteText'] + author = data['quoteAuthor'] + + if phrase == '' or author == '': + continue + + text = f'{phrase} \nBy: {author}' + + return text + except Exception as e: + continue + + +def setup(bot): + bot.add_cog(Phrases(bot)) diff --git a/vulkan/commands/Random.py b/vulkan/commands/Random.py new file mode 100644 index 0000000..546fefa --- /dev/null +++ b/vulkan/commands/Random.py @@ -0,0 +1,80 @@ +from random import randint, random +import discord +from discord.ext import commands +from config import config + + +class Random(commands.Cog): + """Deal with returning random things""" + + def __init__(self, bot): + self.__bot = bot + + @commands.command(name='random', help=config.HELP_RANDOM) + async def random(self, ctx, arg: str): + try: + arg = int(arg) + + except Exception as e: + embed = discord.Embed( + description='Manda um número aí ow animal', + colour=config.COLOURS['red'] + ) + await ctx.send(embed=embed) + return + + if arg < 1: + a = arg + b = 1 + else: + a = 1 + b = arg + + x = randint(a, b) + embed = discord.Embed( + title=f'Número Aleatório entre {a, b}', + description=x, + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + + @commands.command(name='cara', help=config.HELP_CARA) + async def cara(self, ctx): + x = random() + if x < 0.5: + result = 'cara' + else: + result = 'coroa' + + embed = discord.Embed( + title='Cara Cora', + description=f'Resultado: {result}', + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + + @commands.command(name='escolha', help=config.HELP_ESCOLHA) + async def escolher(self, ctx, *args: str): + try: + user_input = " ".join(args) + itens = user_input.split(sep=',') + + index = randint(0, len(itens)-1) + + embed = discord.Embed( + title='Escolha de algo', + description=itens[index], + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + except Exception as e: + embed = discord.Embed( + title='Escolha de algo', + description='Erro: Envie várias coisas separadas por vírgula', + colour=config.COLOURS['green'] + ) + await ctx.send(embed=embed) + + +def setup(bot): + bot.add_cog(Random(bot)) diff --git a/vulkan/commands/Warframe.py b/vulkan/commands/Warframe.py new file mode 100644 index 0000000..197777a --- /dev/null +++ b/vulkan/commands/Warframe.py @@ -0,0 +1,118 @@ +import requests +import json +import discord +from discord.ext import commands +from config import config +from discord import Embed + + +class Warframe(commands.Cog): + """Deal with the generation of warframe data""" + + def __init__(self, bot: discord.Client): + self.__bot = bot + self.__open_functions = ['cetus', 'cambion', 'fissures'] + + @commands.command(name='warframe', help=config.HELP_WARFRAME) + async def warframe(self, ctx, arg) -> Embed: + if arg in self.__open_functions: + function = getattr(Warframe, f'_Warframe__{arg}') + embed = await function(self) + + await ctx.send(embed=embed) + else: + info = f'Warframe commands: {self.__open_functions}' + + embed = Embed( + title='Invalid Command', + description=info, + colour=config.COLOURS['blue'] + ) + await ctx.send(embed=embed) + + async def __cetus(self) -> Embed: + description = await self.__get_cetus() + embed = discord.Embed( + title='Warframe Cetus Timing', + description=description, + colour=config.COLOURS['blue'] + ) + return embed + + async def __get_cetus(self) -> str: + """Return the information of the Warframe API""" + tries = 0 + while True: + tries += 1 + if tries > config.MAX_API_CETUS_TRIES: + return 'Os DE baiano não tão com o banco de dados ligado' + + try: + response = requests.get(config.CETUS_API) + data = json.loads(response.content) + short = data['shortString'] + + return short + + except Exception as e: + continue + + async def __cambion(self) -> Embed: + description = await self.__get_cambion() + embed = discord.Embed( + title='Warframe Cambion Timing', + description=description, + colour=config.COLOURS['blue'] + ) + return embed + + async def __get_cambion(self) -> str: + """Return the information of the Warframe API""" + tries = 0 + while True: + tries += 1 + if tries > config.MAX_API_CAMBION_TRIES: + return 'Os DE baiano não tão com o banco de dados ligado' + + try: + response = requests.get(config.CAMBION_API) + data = json.loads(response.content) + + info = f'**Active:** {data["active"]}\n**Time Left:** {data["timeLeft"]}' + + return info + except: + continue + + async def __fissures(self) -> Embed: + description = await self.__get_fissures() + embed = discord.Embed( + title='Warframe Fissures Status', + description=description, + colour=config.COLOURS['blue'] + ) + return embed + + async def __get_fissures(self) -> str: + """Return the information of the Warframe API""" + tries = 0 + while True: + tries += 1 + if tries > config.MAX_API_FISSURES_TRIES: + return 'Os DE baiano não tão com o banco de dados ligado' + + try: + response = requests.get(config.FISSURES_API) + data = json.loads(response.content) + + info = '' + for pos, fissure in enumerate(data, start=1): + info += f'`{pos}` - **Mission:** {fissure["missionType"]} | **Type:** {fissure["tier"]} | **Timing:** {fissure["eta"]} | **Storm:** {fissure["isStorm"]}\n' + + return info + except Exception as e: + continue + + +def setup(bot): + bot.add_cog(Warframe(bot)) diff --git a/vulkan/music/Downloader.py b/vulkan/music/Downloader.py new file mode 100644 index 0000000..1510e11 --- /dev/null +++ b/vulkan/music/Downloader.py @@ -0,0 +1,139 @@ +import asyncio +import concurrent.futures + +from config import config +from yt_dlp import YoutubeDL +from yt_dlp.utils import ExtractorError, DownloadError + +from vulkan.music.Song import Song +from vulkan.music.utils import is_url + + +class Downloader(): + """Download musics direct URL and title or Source from Youtube using a music name or Youtube URL""" + + def __init__(self) -> None: + self.__YDL_OPTIONS = {'format': 'bestaudio/best', + 'default_search': 'auto', + 'playliststart': 0, + 'extract_flat': True, + 'playlistend': config.MAX_PLAYLIST_LENGTH, + } + + def download_one(self, song: Song) -> Song: + """Receives a song object, finish his download and return it""" + if song.identifier == None: + return None + + if is_url(song.identifier): # Youtube URL + song_info = self.__download_url(song.identifier) + else: # Song name + song_info = self.__download_title(song.identifier) + + if song_info == None: + song.destroy() # Destroy the music with problems + return None + else: + song.finish_down(song_info) + return song + + def extract_youtube_link(self, playlist_url: str) -> list: + """Extract all songs direct URL from a Youtube Link + + Arg: Url String + Return: List with the direct youtube URL of each song + """ + if is_url(playlist_url): # If Url + options = self.__YDL_OPTIONS + options['extract_flat'] = True + + with YoutubeDL(options) as ydl: + try: + result = ydl.extract_info(playlist_url, download=False) + songs_identifiers = [] + + if result.get('entries'): # If got a dict of musics + for entry in result['entries']: + songs_identifiers.append( + f"https://www.youtube.com/watch?v={entry['id']}") + + else: # Or a single music + songs_identifiers.append(result['original_url']) + + return songs_identifiers # Return a list + except (ExtractorError, DownloadError) as e: + return None + else: + print('Invalid type of playlist URL') + return None + + async def preload(self, songs: list) -> None: + """Download the full info of the song object""" + for song in songs: + asyncio.ensure_future(self.__download_songs(song)) + + def __download_url(self, url) -> dict: + """Download musics full info and source from Music URL + + Arg: URL from Youtube + Return: Dict with the full youtube information of the music, including source to play it + """ + options = self.__YDL_OPTIONS + options['extract_flat'] = False + + with YoutubeDL(options) as ydl: + try: + result = ydl.extract_info(url, download=False) + + return result + except (ExtractorError, DownloadError) as e: # Any type of error in download + return None + + async def __download_songs(self, song: Song) -> None: + """Download a music object asynchronously""" + if song.source != None: # If Music already preloaded + return + + def download_song(song): + if is_url(song.identifier): # Youtube URL + song_info = self.__download_url(song.identifier) + else: # Song name + song_info = self.__download_title(song.identifier) + + if song_info == None: + song.destroy() # Remove the song with problems from the playlist + else: + song.finish_down(song_info) + + # Creating a loop task to download each song + loop = asyncio.get_event_loop() + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=config.MAX_PRELOAD_SONGS + ) + await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)}, + return_when=asyncio.ALL_COMPLETED) + + def __download_title(self, title: str) -> dict: + """Download a music full information using his name. + + Arg: Music Name + Return: A dict containing the song information + """ + if type(title) != str: + return None + + config = self.__YDL_OPTIONS + config['extract_flat'] = False + + with YoutubeDL(self.__YDL_OPTIONS) as ydl: + try: + search = f"ytsearch:{title}" + result = ydl.extract_info(search, download=False) + + if result == None: + return None + + # Return a dict with the full info of first music + return result['entries'][0] + except Exception as e: + return None diff --git a/vulkan/music/Interfaces.py b/vulkan/music/Interfaces.py new file mode 100644 index 0000000..fdfedd2 --- /dev/null +++ b/vulkan/music/Interfaces.py @@ -0,0 +1,89 @@ +from abc import ABC, abstractproperty, abstractmethod + + +class IPlaylist(ABC): + """Class to manage and control the songs to play and played""" + + @abstractproperty + def looping_one(self): + pass + + @abstractproperty + def looping_all(self): + pass + + @abstractproperty + def songs_to_preload(self) -> list: + pass + + @abstractmethod + def __len__(self): + pass + + @abstractmethod + def next_song(self): + pass + + @abstractmethod + def prev_song(self): + pass + + @abstractmethod + def add_song(self, identifier: str) -> None: + pass + + @abstractmethod + def shuffle(self) -> None: + pass + + @abstractmethod + def revert(self) -> None: + pass + + @abstractmethod + def clear(self) -> None: + pass + + @abstractmethod + def loop_one(self) -> str: + pass + + @abstractmethod + def loop_all(self) -> str: + pass + + @abstractmethod + def loop_off(self) -> str: + pass + + @abstractmethod + def destroy_song(self, song_destroy) -> None: + pass + + +class ISong(ABC): + """Store the usefull information about a Song""" + + @abstractmethod + def finish_down(self, info: dict) -> None: + pass + + @abstractmethod + def source(self) -> str: + pass + + @abstractmethod + def title(self) -> str: + pass + + @abstractmethod + def duration(self) -> str: + pass + + @abstractmethod + def identifier(self) -> str: + pass + + @abstractmethod + def destroy(self) -> None: + pass diff --git a/vulkan/music/Player.py b/vulkan/music/Player.py new file mode 100644 index 0000000..a6b8541 --- /dev/null +++ b/vulkan/music/Player.py @@ -0,0 +1,308 @@ +import discord +from discord.ext import commands +from config import config +import datetime + +from vulkan.music.Downloader import Downloader +from vulkan.music.Playlist import Playlist +from vulkan.music.Searcher import Searcher +from vulkan.music.Types import Provider +from vulkan.music.utils import * + + +class Player(commands.Cog): + def __init__(self, bot, guild): + self.__searcher: Searcher = Searcher() + self.__down: Downloader = Downloader() + self.__playlist: Playlist = Playlist() + self.__bot: discord.Client = bot + self.__guild: discord.Guild = guild + + self.__timer = Timer(self.__timeout_handler) + self.__playing = False + + self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'} + self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', + 'options': '-vn'} + + async def connect(self, ctx): + if not ctx.author.voice: + return {'success': False, 'reason': config.NO_CHANNEL} + + if self.__guild.voice_client == None: + await ctx.author.voice.channel.connect(reconnect=True, timeout=None) + return {'success': True, 'reason': ''} + + def __play_next(self, error, ctx): + song = self.__playlist.next_song() + if song != None: # If there is not a song for the song + coro = self.__play_music(ctx, song) + self.__bot.loop.create_task(coro) + else: + self.__playing = False + + async def __play_music(self, ctx, song): + self.__playing = True + + player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS) + self.__guild.voice_client.play( + player, after=lambda e: self.__play_next(e, ctx)) + + self.__timer.cancel() + self.__timer = Timer(self.__timeout_handler) + + await ctx.invoke(self.__bot.get_command('np')) + + songs = self.__playlist.songs_to_preload + await self.__down.preload(songs) + + async def play(self, ctx, track, requester) -> str: + try: + songs_names, provider = self.__searcher.search(track) + if provider == Provider.Unknown: + embed = discord.Embed( + title=config.ERROR_TITLE, + description=config.INVALID_INPUT, + colours=config.COLOURS['blue']) + await ctx.send(embed=embed) + return + + elif provider == Provider.YouTube: + songs_names = self.__down.extract_youtube_link(songs_names[0]) + + songs_quant = 0 + for name in songs_names: + song = self.__playlist.add_song(name, requester) + songs_quant += 1 + + songs_preload = self.__playlist.songs_to_preload + await self.__down.preload(songs_preload) + + except: + embed = discord.Embed( + title=config.ERROR_TITLE, + description=config.DOWNLOADING_ERROR, + colours=config.COLOURS['blue']) + await ctx.send(embed=embed) + return + + if songs_quant == 1: + song = self.__down.download_one(song) + + if song == None: + embed = discord.Embed( + title=config.ERROR_TITLE, + description=config.DOWNLOADING_ERROR, + colours=config.COLOURS['blue']) + await ctx.send(embed=embed) + return + elif not self.__playing: + embed = discord.Embed( + title=config.SONG_QUEUE_TITLE, + description=config.SONG_ADDED.format(song.title), + colour=config.COLOURS['blue']) + await ctx.send(embed=embed) + else: + embed = self.__format_embed(song.info, config.SONG_ADDED) + await ctx.send(embed=embed) + else: + embed = discord.Embed( + title=config.SONG_QUEUE_TITLE, + description=config.SONGS_ADDED.format(songs_quant), + colour=config.COLOURS['blue']) + await ctx.send(embed=embed) + + if not self.__playing: + try_another = True + + while try_another: # This will ensure the first song source to be ready + first_song = self.__playlist.next_song() + if first_song == None: + embed = discord.Embed( + title=config.ERROR_TITLE, + description=config.DOWNLOADING_ERROR, + colour=config.COLOURS['blue']) + await ctx.send(embed=embed) + break + + while True: + if first_song.source != None: # If song got downloaded + try_another = False + break + + if first_song.problematic: # If song got any error, try another one + break + + if first_song != None: + await self.__play_music(ctx, first_song) + + async def queue(self) -> discord.Embed: + if self.__playlist.looping_one: + info = self.__playlist.current.info + title = 'Song Looping Now' + return self.__format_embed(info, title) + + songs_preload = self.__playlist.songs_to_preload + await self.__down.preload(songs_preload) + total_time = format_time(sum([int(song.duration if song.duration else 0) + for song in songs_preload])) # Sum the duration + total_songs = len(self.__playlist) + text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n' + + for pos, song in enumerate(songs_preload, start=1): + title = song.title if song.title else 'Downloading...' + text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n" + + title = 'Songs in Queue' + if len(songs_preload) > 0: + if self.__playlist.looping_all: + title = 'Repeating All' + else: + text = 'There is no musics in queue' + + embed = discord.Embed( + title=title, + description=text, + colour=config.COLOURS['blue'] + ) + + return embed + + async def skip(self) -> bool: + if self.__guild.voice_client != None: + self.__guild.voice_client.stop() + return True + else: + return False + + async def stop(self) -> bool: + if self.__guild.voice_client == None: + return False + + if self.__guild.voice_client.is_connected(): + self.__playlist.clear() + self.__playlist.loop_off() + self.__guild.voice_client.stop() + await self.__guild.voice_client.disconnect() + return True + + async def pause(self) -> bool: + if self.__guild.voice_client == None: + return False + + if self.__guild.voice_client.is_playing(): + self.__guild.voice_client.pause() + return True + + async def resume(self) -> bool: + if self.__guild.voice_client == None: + return False + + if self.__guild.voice_client.is_paused(): + self.__guild.voice_client.resume() + return True + + async def loop(self, args: str): + args = args.lower() + if args == 'one': + description = self.__playlist.loop_one() + elif args == 'all': + description = self.__playlist.loop_all() + elif args == 'off': + description = self.__playlist.loop_off() + else: + description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop' + + return description + + async def clear(self) -> None: + self.__playlist.clear() + + async def now_playing(self) -> discord.Embed: + if self.__playlist.looping_one: + title = 'Song Looping Now' + else: + title = 'Song Playing Now' + + current_song = self.__playlist.current + embed = self.__format_embed(current_song.info, title) + + return embed + + async def shuffle(self) -> str: + try: + self.__playlist.shuffle() + songs = self.__playlist.songs_to_preload + + await self.__down.preload(songs) + return 'Musics shuffled successfully' + except: + return 'An error ocurred :/' + + async def move(self, pos1, pos2='1') -> str: + try: + pos1 = int(pos1) + pos2 = int(pos2) + + except: + return 'This command require a number' + + result = self.__playlist.move_songs(pos1, pos2) + + songs = self.__playlist.songs_to_preload + await self.__down.preload(songs) + return result + + async def remove(self, position) -> str: + """Remove a song from the queue in the position""" + try: + position = int(position) + + except: + return 'This command require a number' + + result = self.__playlist.remove_song(position) + return result + + def __format_embed(self, info, title='') -> discord.Embed: + """Configure the embed to show the song information""" + embedvc = discord.Embed( + title=title, + description=f"[{info['title']}]({info['original_url']})", + color=config.COLOURS['blue'] + ) + + embedvc.add_field(name=config.SONGINFO_UPLOADER, + value=info['uploader'], + inline=True) + + embedvc.add_field(name=config.SONGINFO_REQUESTER, + value=info['requester'], + inline=True) + + if 'thumbnail' in info.keys(): + embedvc.set_thumbnail(url=info['thumbnail']) + + if 'duration' in info.keys(): + duration = str(datetime.timedelta(seconds=info['duration'])) + embedvc.add_field(name=config.SONGINFO_DURATION, + value=f"{duration}", + inline=True) + else: + embedvc.add_field(name=config.SONGINFO_DURATION, + value=config.SONGINFO_UNKNOWN_DURATION, + inline=True) + + return embedvc + + async def __timeout_handler(self) -> None: + if self.__guild.voice_client == None: + return + + if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused(): + self.__timer = Timer(self.__timeout_handler) + + elif self.__guild.voice_client.is_connected(): + self.__playlist.clear() + self.__playlist.loop_off() + await self.__guild.voice_client.disconnect() diff --git a/vulkan/music/Playlist.py b/vulkan/music/Playlist.py new file mode 100644 index 0000000..f2c34d5 --- /dev/null +++ b/vulkan/music/Playlist.py @@ -0,0 +1,179 @@ +from collections import deque +from config import config +import random + +from vulkan.music.Interfaces import IPlaylist +from vulkan.music.Song import Song + + +class Playlist(IPlaylist): + """Class to manage and control the songs to play and played""" + + def __init__(self) -> None: + self.__queue = deque() # Store the musics to play + self.__songs_history = deque() # Store the musics played + self.__name_history = deque() # Store the name of musics played + + self.__looping_one = False + self.__looping_all = False + + self.__current: Song = None + + @property + def looping_one(self) -> bool: + return self.__looping_one + + @property + def looping_all(self) -> bool: + return self.__looping_all + + @property + def current(self) -> Song: + return self.__current + + @property + def songs_to_preload(self) -> list: + return list(self.__queue)[:config.MAX_PRELOAD_SONGS] + + def __len__(self) -> int: + return len(self.__queue) + + def next_song(self) -> Song: + """Return the next song to play""" + if self.__current == None and len(self.__queue) == 0: + # If not playing and nothing to play + return None + + # If playing + played_song = self.__current + + # Check if need to repeat the played song + if self.__looping_one: # Insert the current song to play again + self.__queue.appendleft(played_song) + + if self.__looping_all: # Insert the current song in the end of queue + self.__queue.append(played_song) + + while True: # Try to get the source of next song + if len(self.__queue) == 0: # If no more song to play, return None + return None + + # Att the current with the first one + self.__current = self.__queue[0] + self.__queue.popleft() # Remove the current from queue + self.__name_history.append( + self.__current.identifier) # Add to name history + self.__songs_history.append(self.__current) # Add to song history + + return self.__current + + def prev_song(self) -> Song: + """Return the source of the last song played + + Return None or the source of the prev song + """ + if len(self.__songs_history) == 0: + return None + else: + return self.__songs_history[0].source + + def add_song(self, identifier: str, requester: str) -> Song: + """Create a song object, add to queue and return it""" + song = Song(identifier=identifier, playlist=self, requester=requester) + self.__queue.append(song) + return song + + def shuffle(self) -> None: + """Shuffle the order of the songs to play""" + random.shuffle(self.__queue) + + def revert(self) -> None: + """Revert the order of the songs to play""" + self.__queue.reverse() + + def clear(self) -> None: + """Clear the songs to play song history""" + self.__queue.clear() + self.__songs_history.clear() + + def loop_one(self) -> str: + """Try to start the loop of the current song + + Return: Embed descrition to show to user + """ + if self.__looping_all == True: + return 'Vulkan already looping one music, disable loop first' + elif self.__looping_one == True: + return "I'm already doing this, you dumb ass" + else: + self.__looping_one = True + return 'Repeating the current song' + + def loop_all(self) -> str: + """Try to start the loop of all songs + + Return: Embed descrition to show to user + """ + if self.__looping_one == True: + return 'Vulkan already looping one music, disable loop first' + elif self.__looping_all == True: + return "I'm already doing this, you dumb ass" + else: + self.__looping_all = True + return 'Repeating all songs in queue' + + def loop_off(self) -> str: + """Disable both types of loop""" + if self.__looping_all == False and self.__looping_one == False: + return "The loop is already off, you fucking dick head" + + self.__looping_all = False + self.__looping_one = False + return 'Loop disable' + + def destroy_song(self, song_destroy: Song) -> None: + """Destroy a song object from the queue""" + for song in self.__queue: + if song == song_destroy: + self.__queue.remove(song) + break + + def move_songs(self, pos1, pos2) -> str: + """Receive two position and try to change the songs in those positions, -1 is the last + + Positions: First music is 1 + Return (Error bool, string) with the status of the function, to show to user + """ + if pos1 == -1: + pos1 = len(self.__queue) + if pos2 == -1: + pos2 = len(self.__queue) + + if pos2 not in range(1, len(self.__queue) + 1) or pos1 not in range(1, len(self.__queue) + 1): + return 'Numbers must be between 1 and queue length, or -1 for the last song' + + try: + song1 = self.__queue[pos1-1] + song2 = self.__queue[pos2-1] + + self.__queue[pos1-1] = song2 + self.__queue[pos2-1] = song1 + + song1_name = song1.title if song1.title else song1.identifier + song2_name = song2.title if song2.title else song2.identifier + + return f'Song `{song1_name}` in position `{pos1}` moved with `{song2_name}` in position `{pos2}` successfully' + except Exception as e: + print(e) + return 'There was a problem with the moving of songs' + + def remove_song(self, position) -> tuple: + if position not in range(1, len(self.__queue) + 1) and position != -1: + return 'Numbers must be between 1 and queue length, or -1 for the last song' + else: + song = self.__queue[position-1] + self.__queue.remove(song) + + song_name = song.title if song.title else song.identifier + + return f'Song `{song_name}` removed successfully' diff --git a/vulkan/music/Searcher.py b/vulkan/music/Searcher.py new file mode 100644 index 0000000..648a105 --- /dev/null +++ b/vulkan/music/Searcher.py @@ -0,0 +1,48 @@ +from vulkan.music.Types import Provider +from vulkan.music.Spotify import SpotifySearch +from vulkan.music.utils import is_url + + +class Searcher(): + """Turn the user input into list of musics names, support youtube and spotify""" + + def __init__(self) -> None: + self.__Spotify = SpotifySearch() + + def search(self, music: str) -> list: + """Return a list with the song names or an URL + + Arg -> User Input, a string with the + Return -> A list of musics names and Provider Type + """ + url_type = self.__identify_source(music) + + if url_type == Provider.YouTube: + return [music], Provider.YouTube + + elif url_type == Provider.Spotify: + if self.__Spotify.connected == True: + musics = self.__Spotify.search(music) + return musics, Provider.Name + else: + return [], Provider.Unknown + + elif url_type == Provider.Name: + return [music], Provider.Name + + elif url_type == Provider.Unknown: + return None, Provider.Unknown + + def __identify_source(self, music) -> Provider: + """Identify the provider of a music""" + if not is_url(music): + return Provider.Name + + if "https://www.youtu" in music or "https://youtu.be" in music: + return Provider.YouTube + + if "https://open.spotify.com" in music: + return Provider.Spotify + + # If no match + return Provider.Unknown diff --git a/vulkan/music/Song.py b/vulkan/music/Song.py new file mode 100644 index 0000000..9161742 --- /dev/null +++ b/vulkan/music/Song.py @@ -0,0 +1,66 @@ +from vulkan.music.Interfaces import ISong, IPlaylist + + +class Song(ISong): + """Store the usefull information about a Song""" + + def __init__(self, identifier: str, playlist: IPlaylist, requester: str) -> None: + """Create a song with only the URL to the youtube song""" + self.__identifier = identifier + self.__info = {'requester': requester} + self.__problematic = False + self.__playlist: IPlaylist = playlist + + def finish_down(self, info: dict) -> None: + """Get and store the full information of the song""" + self.__usefull_keys = ['url', 'duration', + 'title', 'webpage_url', + 'channel', 'id', 'uploader', + 'thumbnail', 'original_url'] + + for key in self.__usefull_keys: + try: + self.__info[key] = info[key] + except Exception as e: + raise e + + @property + def source(self) -> str: + """Return the Song Source URL to play""" + if 'url' in self.__info.keys(): + return self.__info['url'] + else: + return None + + @property + def title(self) -> str: + """Return the Song Title""" + if 'title' in self.__info.keys(): + return self.__info['title'] + else: + return None + + @property + def duration(self) -> str: + """Return the Song Title""" + if 'duration' in self.__info.keys(): + return self.__info['duration'] + else: + return 0.0 + + @property + def identifier(self) -> str: + return self.__identifier + + @property + def problematic(self) -> bool: + return self.__problematic + + def destroy(self) -> None: + """Mark this song with problems and removed from the playlist due to any type of error""" + self.__problematic = True + self.__playlist.destroy_song(self) + + @property + def info(self): + return self.__info diff --git a/vulkan/music/Spotify.py b/vulkan/music/Spotify.py new file mode 100644 index 0000000..0ac7a61 --- /dev/null +++ b/vulkan/music/Spotify.py @@ -0,0 +1,121 @@ +import spotipy +from spotipy.oauth2 import SpotifyClientCredentials +from config import config + + +class SpotifySearch(): + """Search a Spotify music or playlist and return the musics names""" + + def __init__(self) -> None: + self.__connected = False + self.__connect() + + @property + def connected(self): + return self.__connected + + def __connect(self) -> bool: + try: + # Initialize the connection with Spotify API + self.__api = spotipy.Spotify(auth_manager=SpotifyClientCredentials( + client_id=config.SPOTIFY_ID, client_secret=config.SPOTIFY_SECRET)) + self.__connected = True + return True + except: + return False + + def search(self, music) -> list: + """Search and return the title of musics on Spotify""" + type = music.split('/')[3].split('?')[0] + code = music.split('/')[4].split('?')[0] + if type == 'album': + musics = self.__get_album(code) + elif type == 'playlist': + musics = self.__get_playlist(code) + elif type == 'track': + musics = self.__get_track(code) + else: + return None + + return musics + + def __get_album(self, code) -> list: + """Get the externals urls of a album + + ARG: Spotify Code of the Album + """ + if self.__connected == True: + try: + # Load all music objects + results = self.__api.album_tracks(code) + musics = results['items'] + + while results['next']: # Get the next pages + results = self.__api.next(results) + musics.extend(results['items']) + + musicsTitle = [] + + for music in musics: + try: + title = self.__extract_title(music) + musicsTitle.append(title) + except: + pass + return musicsTitle + except Exception as e: + raise e + + def __get_playlist(self, code) -> list: + """Get the externals urls of a playlist + + Arg: Spotify Code of the Playlist + """ + try: + results = self.__api.playlist_items(code) + itens = results['items'] + + while results['next']: # Load the next pages + results = self.__api.next(results) + itens.extend(results['items']) + + musics = [] + for item in itens: + musics.append(item['track']) + + titles = [] + for music in musics: + try: + title = self.__extract_title(music) + titles.append(title) + except Exception as e: + raise e + + return titles + + except Exception as e: + raise e + + def __get_track(self, code) -> list: + """Convert a external_url track to the title of the music + + ARG: Spotify Code of the Music + """ + results = self.__api.track(code) + name = results['name'] + artists = '' + for artist in results['artists']: + artists += f'{artist["name"]} ' + + return [f'{name} {artists}'] + + def __extract_title(self, music: dict) -> str: + """Receive a spotify music object and return his title + + ARG: music dict returned by Spotify + """ + title = f'{music["name"]} ' + for artist in music['artists']: + title += f'{artist["name"]} ' + + return title diff --git a/vulkan/music/Types.py b/vulkan/music/Types.py new file mode 100644 index 0000000..3635c29 --- /dev/null +++ b/vulkan/music/Types.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class Provider(Enum): + """Store Enum Types of the Providers""" + Spotify = "Spotify" + YouTube = "YouTube" + Name = 'Track Name' + Unknown = "Unknown" diff --git a/vulkan/music/utils.py b/vulkan/music/utils.py new file mode 100644 index 0000000..2f1cf17 --- /dev/null +++ b/vulkan/music/utils.py @@ -0,0 +1,51 @@ +import re +import asyncio +from config import config + + +def is_connected(ctx): + try: + voice_channel = ctx.guild.voice_client.channel + return voice_channel + except: + return None + + +def format_time(duration): + if not duration: + return "00:00" + + hours = duration // 60 // 60 + minutes = duration // 60 % 60 + seconds = duration % 60 + + return "{}{}{:02d}:{:02d}".format( + hours if hours else "", + ":" if hours else "", + minutes, + seconds + ) + + +def is_url(string) -> bool: + """Verify if a string is a url""" + regex = re.compile( + "http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+") + + if re.search(regex, string): + return True + else: + return False + + +class Timer: + def __init__(self, callback): + self.__callback = callback + self.__task = asyncio.create_task(self.__executor()) + + async def __executor(self): + await asyncio.sleep(config.VC_TIMEOUT) + await self.__callback() + + def cancel(self): + self.__task.cancel()