Renaming modules, removing useless and changing logic of bot initialization

This commit is contained in:
Rafael Vargas
2022-01-06 18:25:17 -04:00
parent 9c327b8dbf
commit 74d91c15f8
22 changed files with 33 additions and 209 deletions

View File

@@ -0,0 +1,71 @@
import discord
from discord import Client
from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument
from discord.ext import commands
from config import config
class Control(commands.Cog):
"""Control the flow of the Bot"""
def __init__(self, bot: Client):
self.__bot = bot
self.__comandos = {
'MUSIC': ['resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np', 'shuffle', 'move', 'remove', 'reset'],
'WARFRAME': ['warframe'],
'RANDOM': ['escolha', 'cara', 'random'],
'HELP': ['help'],
'OTHERS': ['frase', 'drop']
}
@commands.Cog.listener()
async def on_ready(self):
print(config.STARTUP_MESSAGE)
await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | {config.BOT_PREFIX}help"))
print(config.STARTUP_COMPLETE_MESSAGE)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
if isinstance(error, MissingRequiredArgument):
await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver todos os comandos\n\nOu tente {config.BOT_PREFIX}command help para mais informações')
elif isinstance(error, CommandNotFound):
await ctx.channel.send(f'O comando não existe')
else:
await ctx.channel.send(f'Teve um erro aí bicho')
raise error
@commands.command(name="help", alisases=['ajuda'], help=config.HELP_HELP)
async def help_msg(self, ctx):
helptxt = ''
help_music = '-- MUSIC\n'
help_random = '-- RANDOM\n'
help_warframe = '-- WARFRAME\n'
help_help = '-- HELP\n'
help_others = '-- OTHERS\n'
for command in self.__bot.commands:
if command.name in self.__comandos['MUSIC']:
help_music += f'**{command}** - {command.help}\n'
elif command.name in self.__comandos['HELP']:
help_help += f'**{command}** - {command.help}\n'
elif command.name in self.__comandos['OTHERS']:
help_others += f'**{command}** - {command.help}\n'
elif command.name in self.__comandos['WARFRAME']:
help_warframe += f'**{command}** - {command.help}\n'
else:
help_random += f'**{command}** - {command.help}\n'
helptxt = f'{help_music}\n{help_warframe}\n{help_random}\n{help_others}\n{help_help}'
embedhelp = discord.Embed(
colour=config.COLOURS['grey'],
title=f'Comandos do {self.__bot.user.name}',
description=helptxt
)
embedhelp.set_thumbnail(url=self.__bot.user.avatar_url)
await ctx.send(embed=embedhelp)
def setup(bot):
bot.add_cog(Control(bot))

175
vulkan/commands/Music.py Normal file
View File

@@ -0,0 +1,175 @@
import discord
from discord.ext import commands
from config import config
from vulkan.music.Player import Player
from vulkan.music.utils import *
class Music(commands.Cog):
def __init__(self, bot):
self.__guilds = {}
self.__bot: discord.Client = bot
@commands.command(name="play", help=config.HELP_PLAY, aliases=['p', 'tocar'])
async def play(self, ctx, *args):
user_input = " ".join(args)
player = self.__get_player(ctx)
if player == None:
player = Player(self.__bot, ctx.guild)
self.__guilds[ctx.guild] = player
if is_connected(ctx) == None:
result = await player.connect(ctx)
if result['success'] == False:
await self.__send_embed(ctx, description=result['reason'], colour_name='red')
return
await player.play(ctx, user_input)
@commands.command(name="queue", help=config.HELP_QUEUE, aliases=['q', 'fila'])
async def queue(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
embed = await player.queue()
await ctx.send(embed=embed)
@commands.command(name="skip", help=config.HELP_SKIP, aliases=['pular'])
async def skip(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
await player.skip()
@commands.command(name='stop', help=config.HELP_STOP, aliases=['parar'])
async def stop(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
await player.stop()
@commands.command(name='pause', help=config.HELP_PAUSE, aliases=['pausar'])
async def pause(self, ctx):
player = self.__get_player(ctx)
if player == None:
print('No player')
return
else:
success = await player.pause()
if success:
await self.__send_embed(ctx, description='Song paused', colour_name='blue')
@commands.command(name='resume', help=config.HELP_RESUME, aliases=['soltar'])
async def resume(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
success = await player.resume()
if success:
await self.__send_embed(ctx, description='Song Playing', colour_name='blue')
@commands.command(name='loop', help=config.HELP_LOOP, aliases=['repeat'])
async def loop(self, ctx, args: str):
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.loop(args)
await self.__send_embed(ctx, description=result, colour_name='blue')
@commands.command(name='clear', help=config.HELP_CLEAR, aliases=['limpar'])
async def clear(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
await player.clear()
@commands.command(name='np', help=config.HELP_NP, aliases=['playing', 'now'])
async def now_playing(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
embed = await player.now_playing()
await self.__clean_messages(ctx)
await ctx.send(embed=embed)
@commands.command(name='shuffle', help=config.HELP_SHUFFLE, aliases=['aleatorio'])
async def shuffle(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.shuffle()
await self.__send_embed(ctx, description=result, colour_name='blue')
@commands.command(name='move', help=config.HELP_MOVE, aliases=['mover'])
async def move(self, ctx, pos1, pos2='1'):
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.move(pos1, pos2)
await self.__send_embed(ctx, description=result, colour_name='blue')
@commands.command(name='remove', help=config.HELP_REMOVE, aliases=['remover'])
async def remove(self, ctx, position):
"""Remove a song from the queue in the position"""
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.remove(position)
await self.__send_embed(ctx, description=result, colour_name='blue')
@commands.command(name='reset', help=config.HELP_RESET, aliases=['resetar'])
async def reset(self, ctx):
player = self.__get_player(ctx)
if player != None:
await player.stop()
self.__guilds[ctx.guild] = Player(self.__bot, ctx.guild)
async def __send_embed(self, ctx, title='', description='', colour_name='grey'):
try:
colour = config.COLOURS[colour_name]
except Exception as e:
colour = config.COLOURS['grey']
embedvc = discord.Embed(
title=title,
description=description,
colour=colour
)
await ctx.send(embed=embedvc)
async def __clean_messages(self, ctx):
"""Clear Bot messages if send recently"""
last_messages = await ctx.channel.history(limit=5).flatten()
for message in last_messages:
try:
if message.author == self.__bot.user:
if len(message.embeds) > 0:
embed = message.embeds[0]
if embed.title == 'Song Playing Now' or embed.title == 'Song Looping Now':
await message.delete()
except:
continue
def __get_player(self, ctx):
try:
return self.__guilds[ctx.guild]
except:
return None
def setup(bot):
bot.add_cog(Music(bot))

View File

@@ -0,0 +1,59 @@
from discord.client import Client
import requests
import json
from config import config
from discord.ext import commands
from random import random as rand
class Phrases(commands.Cog):
"""Deal with the generation of motivational phrases"""
def __init__(self, bot: Client):
self.__bot = bot
@commands.command(name='frase', help=config.HELP_FRASE)
async def phrase(self, ctx):
"""Send some phrase to the requester"""
secret = await self.__calculate_rgn()
if secret != None:
await ctx.send(secret)
else:
phrase = await self.__get_phrase()
await ctx.send(phrase)
async def __calculate_rgn(self):
"""Calculate the chance from the phrase function return a secret custom message"""
x = rand()
if x < 0.15:
return config.SECRET_MESSAGE
else:
return None
async def __get_phrase(self):
"""Get the phrase from the server"""
tries = 0
while True:
tries += 1
if tries > config.MAX_API_PHRASES_TRIES:
return 'O banco de dados dos cara tá off, bando de vagabundo, tenta depois aí bicho'
try:
response = requests.get(config.PHRASES_API)
data = json.loads(response.content)
phrase = data['quoteText']
author = data['quoteAuthor']
if phrase == '' or author == '':
continue
text = f'{phrase} \nBy: {author}'
return text
except Exception as e:
continue
def setup(bot):
bot.add_cog(Phrases(bot))

80
vulkan/commands/Random.py Normal file
View File

@@ -0,0 +1,80 @@
from random import randint, random
import discord
from discord.ext import commands
from config import config
class Random(commands.Cog):
"""Deal with returning random things"""
def __init__(self, bot):
self.__bot = bot
@commands.command(name='random', help=config.HELP_RANDOM)
async def random(self, ctx, arg: str):
try:
arg = int(arg)
except Exception as e:
embed = discord.Embed(
description='Manda um número aí ow animal',
colour=config.COLOURS['red']
)
await ctx.send(embed=embed)
return
if arg < 1:
a = arg
b = 1
else:
a = 1
b = arg
x = randint(a, b)
embed = discord.Embed(
title=f'Número Aleatório entre {a, b}',
description=x,
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
@commands.command(name='cara', help=config.HELP_CARA)
async def cara(self, ctx):
x = random()
if x < 0.5:
result = 'cara'
else:
result = 'coroa'
embed = discord.Embed(
title='Cara Cora',
description=f'Resultado: {result}',
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
@commands.command(name='escolha', help=config.HELP_ESCOLHA)
async def escolher(self, ctx, *args: str):
try:
user_input = " ".join(args)
itens = user_input.split(sep=',')
index = randint(0, len(itens)-1)
embed = discord.Embed(
title='Escolha de algo',
description=itens[index],
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
except Exception as e:
embed = discord.Embed(
title='Escolha de algo',
description='Erro: Envie várias coisas separadas por vírgula',
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
def setup(bot):
bot.add_cog(Random(bot))

118
vulkan/commands/Warframe.py Normal file
View File

@@ -0,0 +1,118 @@
import requests
import json
import discord
from discord.ext import commands
from config import config
from discord import Embed
class Warframe(commands.Cog):
"""Deal with the generation of warframe data"""
def __init__(self, bot: discord.Client):
self.__bot = bot
self.__open_functions = ['cetus', 'cambion', 'fissures']
@commands.command(name='warframe', help=config.HELP_WARFRAME)
async def warframe(self, ctx, arg) -> Embed:
if arg in self.__open_functions:
function = getattr(Warframe, f'_Warframe__{arg}')
embed = await function(self)
await ctx.send(embed=embed)
else:
info = f'Warframe commands: {self.__open_functions}'
embed = Embed(
title='Invalid Command',
description=info,
colour=config.COLOURS['blue']
)
await ctx.send(embed=embed)
async def __cetus(self) -> Embed:
description = await self.__get_cetus()
embed = discord.Embed(
title='Warframe Cetus Timing',
description=description,
colour=config.COLOURS['blue']
)
return embed
async def __get_cetus(self) -> str:
"""Return the information of the Warframe API"""
tries = 0
while True:
tries += 1
if tries > config.MAX_API_CETUS_TRIES:
return 'Os DE baiano não tão com o banco de dados ligado'
try:
response = requests.get(config.CETUS_API)
data = json.loads(response.content)
short = data['shortString']
return short
except Exception as e:
continue
async def __cambion(self) -> Embed:
description = await self.__get_cambion()
embed = discord.Embed(
title='Warframe Cambion Timing',
description=description,
colour=config.COLOURS['blue']
)
return embed
async def __get_cambion(self) -> str:
"""Return the information of the Warframe API"""
tries = 0
while True:
tries += 1
if tries > config.MAX_API_CAMBION_TRIES:
return 'Os DE baiano não tão com o banco de dados ligado'
try:
response = requests.get(config.CAMBION_API)
data = json.loads(response.content)
info = f'**Active:** {data["active"]}\n**Time Left:** {data["timeLeft"]}'
return info
except:
continue
async def __fissures(self) -> Embed:
description = await self.__get_fissures()
embed = discord.Embed(
title='Warframe Fissures Status',
description=description,
colour=config.COLOURS['blue']
)
return embed
async def __get_fissures(self) -> str:
"""Return the information of the Warframe API"""
tries = 0
while True:
tries += 1
if tries > config.MAX_API_FISSURES_TRIES:
return 'Os DE baiano não tão com o banco de dados ligado'
try:
response = requests.get(config.FISSURES_API)
data = json.loads(response.content)
info = ''
for pos, fissure in enumerate(data, start=1):
info += f'`{pos}` - **Mission:** {fissure["missionType"]} | **Type:** {fissure["tier"]} | **Timing:** {fissure["eta"]} | **Storm:** {fissure["isStorm"]}\n'
return info
except Exception as e:
continue
def setup(bot):
bot.add_cog(Warframe(bot))

139
vulkan/music/Downloader.py Normal file
View File

@@ -0,0 +1,139 @@
import asyncio
import concurrent.futures
from config import config
from yt_dlp import YoutubeDL
from yt_dlp.utils import ExtractorError, DownloadError
from vulkan.music.Song import Song
from vulkan.music.utils import is_url
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
def __init__(self) -> None:
self.__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': True,
'playlistend': config.MAX_PLAYLIST_LENGTH,
}
def download_one(self, song: Song) -> Song:
"""Receives a song object, finish his download and return it"""
if song.identifier == None:
return None
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Destroy the music with problems
return None
else:
song.finish_down(song_info)
return song
def extract_youtube_link(self, playlist_url: str) -> list:
"""Extract all songs direct URL from a Youtube Link
Arg: Url String
Return: List with the direct youtube URL of each song
"""
if is_url(playlist_url): # If Url
options = self.__YDL_OPTIONS
options['extract_flat'] = True
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(playlist_url, download=False)
songs_identifiers = []
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
songs_identifiers.append(
f"https://www.youtube.com/watch?v={entry['id']}")
else: # Or a single music
songs_identifiers.append(result['original_url'])
return songs_identifiers # Return a list
except (ExtractorError, DownloadError) as e:
return None
else:
print('Invalid type of playlist URL')
return None
async def preload(self, songs: list) -> None:
"""Download the full info of the song object"""
for song in songs:
asyncio.ensure_future(self.__download_songs(song))
def __download_url(self, url) -> dict:
"""Download musics full info and source from Music URL
Arg: URL from Youtube
Return: Dict with the full youtube information of the music, including source to play it
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = False
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
return result
except (ExtractorError, DownloadError) as e: # Any type of error in download
return None
async def __download_songs(self, song: Song) -> None:
"""Download a music object asynchronously"""
if song.source != None: # If Music already preloaded
return
def download_song(song):
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Remove the song with problems from the playlist
else:
song.finish_down(song_info)
# Creating a loop task to download each song
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.MAX_PRELOAD_SONGS
)
await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)},
return_when=asyncio.ALL_COMPLETED)
def __download_title(self, title: str) -> dict:
"""Download a music full information using his name.
Arg: Music Name
Return: A dict containing the song information
"""
if type(title) != str:
return None
config = self.__YDL_OPTIONS
config['extract_flat'] = False
with YoutubeDL(self.__YDL_OPTIONS) as ydl:
try:
search = f"ytsearch:{title}"
result = ydl.extract_info(search, download=False)
if result == None:
return None
# Return a dict with the full info of first music
return result['entries'][0]
except Exception as e:
return None

View File

@@ -0,0 +1,89 @@
from abc import ABC, abstractproperty, abstractmethod
class IPlaylist(ABC):
"""Class to manage and control the songs to play and played"""
@abstractproperty
def looping_one(self):
pass
@abstractproperty
def looping_all(self):
pass
@abstractproperty
def songs_to_preload(self) -> list:
pass
@abstractmethod
def __len__(self):
pass
@abstractmethod
def next_song(self):
pass
@abstractmethod
def prev_song(self):
pass
@abstractmethod
def add_song(self, identifier: str) -> None:
pass
@abstractmethod
def shuffle(self) -> None:
pass
@abstractmethod
def revert(self) -> None:
pass
@abstractmethod
def clear(self) -> None:
pass
@abstractmethod
def loop_one(self) -> str:
pass
@abstractmethod
def loop_all(self) -> str:
pass
@abstractmethod
def loop_off(self) -> str:
pass
@abstractmethod
def destroy_song(self, song_destroy) -> None:
pass
class ISong(ABC):
"""Store the usefull information about a Song"""
@abstractmethod
def finish_down(self, info: dict) -> None:
pass
@abstractmethod
def source(self) -> str:
pass
@abstractmethod
def title(self) -> str:
pass
@abstractmethod
def duration(self) -> str:
pass
@abstractmethod
def identifier(self) -> str:
pass
@abstractmethod
def destroy(self) -> None:
pass

294
vulkan/music/Player.py Normal file
View File

@@ -0,0 +1,294 @@
import discord
from discord.ext import commands
from config import config
import datetime
from vulkan.music.Downloader import Downloader
from vulkan.music.Playlist import Playlist
from vulkan.music.Searcher import Searcher
from vulkan.music.Types import Provider
from vulkan.music.utils import *
class Player(commands.Cog):
def __init__(self, bot, guild):
self.__searcher: Searcher = Searcher()
self.__downloader: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: discord.Client = bot
self.__guild: discord.Guild = guild
self.__playing = False
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
async def connect(self, ctx):
if not ctx.author.voice:
return {'success': False, 'reason': config.NO_CHANNEL}
if self.__guild.voice_client == None:
await ctx.author.voice.channel.connect(reconnect=True, timeout=None)
return {'success': True, 'reason': ''}
def __play_next(self, error, ctx):
song = self.__playlist.next_song()
if song != None: # If there is not a song for the song
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx, song):
self.__playing = True
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
async def play(self, ctx, *args) -> str:
user_input = " ".join(args)
try:
if self.__guild.voice_client == None:
voice_channel = ctx.author.voice.channel
await voice_channel.connect()
except:
embed = discord.Embed(
description=config.NO_CHANNEL, colour=config.COLOURS['red'])
await ctx.send(embed=embed)
else:
songs_quant = 0
try:
musics_identifiers, provider = self.__searcher.search(
user_input)
except:
return config.INVALID_INPUT
if provider == Provider.Unknown:
return config.INVALID_INPUT
if provider == Provider.YouTube:
try:
musics_identifiers = self.__downloader.extract_youtube_link(
musics_identifiers[0])
except:
await ctx.send('Problema com o download do Youtube')
for identifier in musics_identifiers: # Creating songs
last_song = self.__playlist.add_song(identifier)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
if songs_quant == 1: # If only one music downloaded
song = self.__downloader.download_one(
last_song) # Download the new music
if song == None: # If song not downloaded
embed = discord.Embed(
description=config.DOWNLOADING_ERROR, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
elif not self.__playing: # If not playing
text = f'You added the song **{song.title}** to the queue'
embed = discord.Embed(
description=text, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
else: # If playing
title = config.SONG_ADDED
embed = self.__format_embed(song.info, title=title)
await ctx.send(embed=embed)
else:
text = f'You added {songs_quant} songs to the queue'
embed = discord.Embed(
description=text, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
if not self.__playing:
try_another = True
while try_another:
first = self.__playlist.next_song()
if first == None:
embed = discord.Embed(
description=config.DOWNLOADING_ERROR, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
break
while True:
if first.source != None: # If song got downloaded
try_another = False
break
if first.problematic: # If song got any error, try another one
break
else: # The song is downloading, check again
continue
if first != None:
await self.__play_music(ctx, first)
async def queue(self) -> discord.Embed:
if self.__playlist.looping_one:
info = self.__playlist.current.info
title = 'Song Looping Now'
return self.__format_embed(info, title)
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload])) # Sum the duration
total_songs = len(self.__playlist)
text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n'
for pos, song in enumerate(songs_preload, start=1):
title = song.title if song.title else 'Downloading...'
text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n"
title = 'Songs in Queue'
if len(songs_preload) > 0:
if self.__playlist.looping_all:
title = 'Repeating All'
else:
text = 'There is no musics in queue'
embed = discord.Embed(
title=title,
description=text,
colour=config.COLOURS['blue']
)
return embed
async def skip(self) -> bool:
if self.__guild.voice_client != None:
self.__guild.voice_client.stop()
return True
else:
return False
async def stop(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
return True
async def pause(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_playing():
self.__guild.voice_client.pause()
return True
async def resume(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_paused():
self.__guild.voice_client.resume()
return True
async def loop(self, args: str):
args = args.lower()
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop'
return description
async def clear(self) -> None:
self.__playlist.clear()
async def now_playing(self) -> discord.Embed:
if self.__playlist.looping_one:
title = 'Song Looping Now'
else:
title = 'Song Playing Now'
current_song = self.__playlist.current
embed = self.__format_embed(current_song.info, title)
return embed
async def shuffle(self) -> str:
try:
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
return 'Musics shuffled successfully'
except:
return 'An error ocurred :/'
async def move(self, pos1, pos2='1') -> str:
try:
pos1 = int(pos1)
pos2 = int(pos2)
except:
return 'This command require a number'
result = self.__playlist.move_songs(pos1, pos2)
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
return result
async def remove(self, position) -> str:
"""Remove a song from the queue in the position"""
try:
position = int(position)
except:
return 'This command require a number'
result = self.__playlist.remove_song(position)
return result
def __format_embed(self, info, title='') -> discord.Embed:
"""Configure the embed to show the song information"""
embedvc = discord.Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",
color=config.COLOURS['blue']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=False)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=False)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=False)
return embedvc

179
vulkan/music/Playlist.py Normal file
View File

@@ -0,0 +1,179 @@
from collections import deque
from config import config
import random
from vulkan.music.Interfaces import IPlaylist
from vulkan.music.Song import Song
class Playlist(IPlaylist):
"""Class to manage and control the songs to play and played"""
def __init__(self) -> None:
self.__queue = deque() # Store the musics to play
self.__songs_history = deque() # Store the musics played
self.__name_history = deque() # Store the name of musics played
self.__looping_one = False
self.__looping_all = False
self.__current: Song = None
@property
def looping_one(self) -> bool:
return self.__looping_one
@property
def looping_all(self) -> bool:
return self.__looping_all
@property
def current(self) -> Song:
return self.__current
@property
def songs_to_preload(self) -> list:
return list(self.__queue)[:config.MAX_PRELOAD_SONGS]
def __len__(self) -> int:
return len(self.__queue)
def next_song(self) -> Song:
"""Return the next song to play"""
if self.__current == None and len(self.__queue) == 0:
# If not playing and nothing to play
return None
# If playing
played_song = self.__current
# Check if need to repeat the played song
if self.__looping_one: # Insert the current song to play again
self.__queue.appendleft(played_song)
if self.__looping_all: # Insert the current song in the end of queue
self.__queue.append(played_song)
while True: # Try to get the source of next song
if len(self.__queue) == 0: # If no more song to play, return None
return None
# Att the current with the first one
self.__current = self.__queue[0]
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(
self.__current.identifier) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current
def prev_song(self) -> Song:
"""Return the source of the last song played
Return None or the source of the prev song
"""
if len(self.__songs_history) == 0:
return None
else:
return self.__songs_history[0].source
def add_song(self, identifier: str) -> Song:
"""Create a song object, add to queue and return it"""
song = Song(identifier, self) # Cria a musica com o identificador
self.__queue.append(song)
return song
def shuffle(self) -> None:
"""Shuffle the order of the songs to play"""
random.shuffle(self.__queue)
def revert(self) -> None:
"""Revert the order of the songs to play"""
self.__queue.reverse()
def clear(self) -> None:
"""Clear the songs to play song history"""
self.__queue.clear()
self.__songs_history.clear()
def loop_one(self) -> str:
"""Try to start the loop of the current song
Return: Embed descrition to show to user
"""
if self.__looping_all == True:
return 'Vulkan already looping one music, disable loop first'
elif self.__looping_one == True:
return "I'm already doing this, you dumb ass"
else:
self.__looping_one = True
return 'Repeating the current song'
def loop_all(self) -> str:
"""Try to start the loop of all songs
Return: Embed descrition to show to user
"""
if self.__looping_one == True:
return 'Vulkan already looping one music, disable loop first'
elif self.__looping_all == True:
return "I'm already doing this, you dumb ass"
else:
self.__looping_all = True
return 'Repeating all songs in queue'
def loop_off(self) -> str:
"""Disable both types of loop"""
if self.__looping_all == False and self.__looping_one == False:
return "The loop is already off, you fucking dick head"
self.__looping_all = False
self.__looping_one = False
return 'Loop disable'
def destroy_song(self, song_destroy: Song) -> None:
"""Destroy a song object from the queue"""
for song in self.__queue:
if song == song_destroy:
self.__queue.remove(song)
break
def move_songs(self, pos1, pos2) -> str:
"""Receive two position and try to change the songs in those positions, -1 is the last
Positions: First music is 1
Return (Error bool, string) with the status of the function, to show to user
"""
if pos1 == -1:
pos1 = len(self.__queue)
if pos2 == -1:
pos2 = len(self.__queue)
if pos2 not in range(1, len(self.__queue) + 1) or pos1 not in range(1, len(self.__queue) + 1):
return 'Numbers must be between 1 and queue length, or -1 for the last song'
try:
song1 = self.__queue[pos1-1]
song2 = self.__queue[pos2-1]
self.__queue[pos1-1] = song2
self.__queue[pos2-1] = song1
song1_name = song1.title if song1.title else song1.identifier
song2_name = song2.title if song2.title else song2.identifier
return f'Song `{song1_name}` in position `{pos1}` moved with `{song2_name}` in position `{pos2}` successfully'
except Exception as e:
print(e)
return 'There was a problem with the moving of songs'
def remove_song(self, position) -> tuple:
if position not in range(1, len(self.__queue) + 1) and position != -1:
return 'Numbers must be between 1 and queue length, or -1 for the last song'
else:
song = self.__queue[position-1]
self.__queue.remove(song)
song_name = song.title if song.title else song.identifier
return f'Song `{song_name}` removed successfully'

48
vulkan/music/Searcher.py Normal file
View File

@@ -0,0 +1,48 @@
from vulkan.music.Types import Provider
from vulkan.music.Spotify import SpotifySearch
from vulkan.music.utils import is_url
class Searcher():
"""Turn the user input into list of musics names, support youtube and spotify"""
def __init__(self) -> None:
self.__Spotify = SpotifySearch()
def search(self, music: str) -> list:
"""Return a list with the song names or an URL
Arg -> User Input, a string with the
Return -> A list of musics names and Provider Type
"""
url_type = self.__identify_source(music)
if url_type == Provider.YouTube:
return [music], Provider.YouTube
elif url_type == Provider.Spotify:
if self.__Spotify.connected == True:
musics = self.__Spotify.search(music)
return musics, Provider.Name
else:
return [], Provider.Unknown
elif url_type == Provider.Name:
return [music], Provider.Name
elif url_type == Provider.Unknown:
return None, Provider.Unknown
def __identify_source(self, music) -> Provider:
"""Identify the provider of a music"""
if not is_url(music):
return Provider.Name
if "https://www.youtu" in music or "https://youtu.be" in music:
return Provider.YouTube
if "https://open.spotify.com" in music:
return Provider.Spotify
# If no match
return Provider.Unknown

66
vulkan/music/Song.py Normal file
View File

@@ -0,0 +1,66 @@
from vulkan.music.Interfaces import ISong, IPlaylist
class Song(ISong):
"""Store the usefull information about a Song"""
def __init__(self, identifier: str, playlist: IPlaylist) -> None:
"""Create a song with only the URL to the youtube song"""
self.__identifier = identifier
self.__info = {}
self.__problematic = False
self.__playlist: IPlaylist = playlist
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
self.__usefull_keys = ['url', 'duration',
'title', 'webpage_url',
'channel', 'id', 'uploader',
'thumbnail', 'original_url']
for key in self.__usefull_keys:
try:
self.__info[key] = info[key]
except Exception as e:
raise e
@property
def source(self) -> str:
"""Return the Song Source URL to play"""
if 'url' in self.__info.keys():
return self.__info['url']
else:
return None
@property
def title(self) -> str:
"""Return the Song Title"""
if 'title' in self.__info.keys():
return self.__info['title']
else:
return None
@property
def duration(self) -> str:
"""Return the Song Title"""
if 'duration' in self.__info.keys():
return self.__info['duration']
else:
return 0.0
@property
def identifier(self) -> str:
return self.__identifier
@property
def problematic(self) -> bool:
return self.__problematic
def destroy(self) -> None:
"""Mark this song with problems and removed from the playlist due to any type of error"""
self.__problematic = True
self.__playlist.destroy_song(self)
@property
def info(self):
return self.__info

121
vulkan/music/Spotify.py Normal file
View File

@@ -0,0 +1,121 @@
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from config import config
class SpotifySearch():
"""Search a Spotify music or playlist and return the musics names"""
def __init__(self) -> None:
self.__connected = False
self.__connect()
@property
def connected(self):
return self.__connected
def __connect(self) -> bool:
try:
# Initialize the connection with Spotify API
self.__api = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
client_id=config.SPOTIFY_ID, client_secret=config.SPOTIFY_SECRET))
self.__connected = True
return True
except:
return False
def search(self, music) -> list:
"""Search and return the title of musics on Spotify"""
type = music.split('/')[3].split('?')[0]
code = music.split('/')[4].split('?')[0]
if type == 'album':
musics = self.__get_album(code)
elif type == 'playlist':
musics = self.__get_playlist(code)
elif type == 'track':
musics = self.__get_track(code)
else:
return None
return musics
def __get_album(self, code) -> list:
"""Get the externals urls of a album
ARG: Spotify Code of the Album
"""
if self.__connected == True:
try:
# Load all music objects
results = self.__api.album_tracks(code)
musics = results['items']
while results['next']: # Get the next pages
results = self.__api.next(results)
musics.extend(results['items'])
musicsTitle = []
for music in musics:
try:
title = self.__extract_title(music)
musicsTitle.append(title)
except:
pass
return musicsTitle
except Exception as e:
raise e
def __get_playlist(self, code) -> list:
"""Get the externals urls of a playlist
Arg: Spotify Code of the Playlist
"""
try:
results = self.__api.playlist_items(code)
itens = results['items']
while results['next']: # Load the next pages
results = self.__api.next(results)
itens.extend(results['items'])
musics = []
for item in itens:
musics.append(item['track'])
titles = []
for music in musics:
try:
title = self.__extract_title(music)
titles.append(title)
except Exception as e:
raise e
return titles
except Exception as e:
raise e
def __get_track(self, code) -> list:
"""Convert a external_url track to the title of the music
ARG: Spotify Code of the Music
"""
results = self.__api.track(code)
name = results['name']
artists = ''
for artist in results['artists']:
artists += f'{artist["name"]} '
return [f'{name} {artists}']
def __extract_title(self, music: dict) -> str:
"""Receive a spotify music object and return his title
ARG: music dict returned by Spotify
"""
title = f'{music["name"]} '
for artist in music['artists']:
title += f'{artist["name"]} '
return title

9
vulkan/music/Types.py Normal file
View File

@@ -0,0 +1,9 @@
from enum import Enum
class Provider(Enum):
"""Store Enum Types of the Providers"""
Spotify = "Spotify"
YouTube = "YouTube"
Name = 'Track Name'
Unknown = "Unknown"

36
vulkan/music/utils.py Normal file
View File

@@ -0,0 +1,36 @@
import re
def is_connected(ctx):
try:
voice_channel = ctx.guild.voice_client.channel
return voice_channel
except:
return None
def format_time(duration):
if not duration:
return "00:00"
hours = duration // 60 // 60
minutes = duration // 60 % 60
seconds = duration % 60
return "{}{}{:02d}:{:02d}".format(
hours if hours else "",
":" if hours else "",
minutes,
seconds
)
def is_url(string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False