Merge pull request #2 from RafaelSolVargas/upgrading-music

Upgrading music
This commit is contained in:
Rafael Vargas 2021-12-28 22:26:16 -04:00 committed by GitHub
commit a92fe18b61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1108 additions and 392 deletions

View File

@ -4,10 +4,13 @@ CETUS_API = dotenv_values('.env')['CETUS_API']
BOT_TOKEN = dotenv_values('.env')['BOT_TOKEN'] BOT_TOKEN = dotenv_values('.env')['BOT_TOKEN']
SPOTIFY_ID = dotenv_values('.env')['SPOTIFY_ID'] SPOTIFY_ID = dotenv_values('.env')['SPOTIFY_ID']
SPOTIFY_SECRET = dotenv_values('.env')['SPOTIFY_SECRET'] SPOTIFY_SECRET = dotenv_values('.env')['SPOTIFY_SECRET']
SECRET_MESSAGE = dotenv_values('.env')['SECRET_MESSAGE']
PHRASES_API = dotenv_values('.env')['PHRASES_API']
BOT_PREFIX = '!' BOT_PREFIX = '!'
INITIAL_EXTENSIONS = {'vulkan.commands.Phrases', 'vulkan.commands.Warframe', INITIAL_EXTENSIONS = {'vulkanbot.commands.Phrases', 'vulkanbot.commands.Warframe',
'vulkan.general.Filter', 'vulkan.general.Control', 'vulkan.music.Music'} 'vulkanbot.general.Filter', 'vulkanbot.general.Control', 'vulkanbot.music.Music',
'vulkanbot.commands.Random'}
VC_TIMEOUT = 600 # seconds VC_TIMEOUT = 600 # seconds
VC_TIMEOUT_DEFAULT = True VC_TIMEOUT_DEFAULT = True
@ -22,7 +25,11 @@ CHANNEL_NOT_FOUND_MESSAGE = "Error: Could not find channel"
INFO_HISTORY_TITLE = "Songs Played:" INFO_HISTORY_TITLE = "Songs Played:"
MAX_HISTORY_LENGTH = 10 MAX_HISTORY_LENGTH = 10
MAX_PLAYLIST_LENGTH = 50
MAX_QUEUE_LENGTH = 10
MAX_TRACKNAME_HISTORY_LENGTH = 15 MAX_TRACKNAME_HISTORY_LENGTH = 15
MAX_API_PHRASES_TRIES = 10
MAX_API_CETUS_TRIES = 10
SONGINFO_UPLOADER = "Uploader: " SONGINFO_UPLOADER = "Uploader: "
SONGINFO_DURATION = "Duration: " SONGINFO_DURATION = "Duration: "
@ -58,7 +65,7 @@ COOKIE_PATH = '/config/cookies/cookies.txt'
COLOURS = { COLOURS = {
'red': 0xDC143C, 'red': 0xDC143C,
'green': 0x00FF7F, 'green': 0x58D68D,
'grey': 0x708090, 'grey': 0x708090,
'blue': 0x0000CD 'blue': 0x3498DB
} }

View File

@ -3,7 +3,7 @@ import discord
from config import config from config import config
from discord.ext import commands from discord.ext import commands
from vulkan.ErrorHandler import ErrorHandler from vulkanbot.ErrorHandler import ErrorHandler
intents = discord.Intents.default() intents = discord.Intents.default()

View File

@ -1,59 +0,0 @@
import requests
import json
import discord
from discord.ext import commands
from random import random as rand
class Phrases(commands.Cog):
"""Deal with the generation of motivational phrases"""
def __init__(self, bot):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.command(name='frase', help='Envia uma frase legal no seu PV')
async def send_phrase(self, ctx):
# There is a chance that the phrase will be send for the dev
sended = await self.calculate_rgn(ctx)
if sended:
return
while True:
try:
response = requests.get(
'http://api.forismatic.com/api/1.0/?method=getQuote&key=457653&format=json&lang=en')
data = json.loads(response.content)
phrase = data['quoteText']
author = data['quoteAuthor']
text = f'{phrase} \nBy: {author}'
await ctx.send(text)
break
except json.decoder.JSONDecodeError:
continue
except Exception as e:
print(e)
await ctx.channel.send('Houve um erro inesperado :/')
break
async def calculate_rgn(self, ctx):
x = rand()
print(x)
if x < 0.15:
await ctx.send('Se leu seu cu é meu\nBy: Minha Pica')
return True
else:
return False
def setup(bot):
bot.add_cog(Phrases(bot))

View File

@ -1,51 +0,0 @@
import requests
import json
import discord
from dotenv import dotenv_values
from discord.ext import commands
from config import config
class Warframe(commands.Cog):
"""Deal with the generation of warframe data"""
def __init__(self, bot):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.command(name='cetus', help='Informa o tempo atual de Cetus - Warframe')
async def get_cetus(self, ctx):
try:
response = requests.get(config.CETUS_API)
data = json.loads(response.content)
short = data['shortString']
responseText = f'{short}'
embed = discord.Embed(
title='Warframe Cetus Timing',
description=responseText,
colour=0xFF0000
)
await ctx.send(embed=embed)
except Exception as e:
print(e)
responseText = f'Houve um erro inesperado :/'
embed = discord.Embed(
title='Warframe Cetus Timing',
description=responseText,
colour=0xFF0000
)
await ctx.send(embed=embed)
def setup(bot):
bot.add_cog(Warframe(bot))

View File

@ -1,38 +0,0 @@
import discord
from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument
from discord.ext import commands
from config import config
class Control(commands.Cog):
"""Control the flow of the Bot"""
def __init__(self, bot):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.Cog.listener()
async def on_ready(self):
print(config.STARTUP_MESSAGE)
await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | type {config.BOT_PREFIX}help"))
print(config.STARTUP_COMPLETE_MESSAGE)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
if isinstance(error, MissingRequiredArgument):
await ctx.channel.send(f'Falta argumentos. Digite {self.__bot.prefix}help para ver os comandos')
elif isinstance(error, CommandNotFound):
await ctx.channel.send(f'O comando não existe')
else:
raise error
def setup(bot):
bot.add_cog(Control(bot))

View File

@ -1,239 +0,0 @@
import discord
from discord import colour
from discord.embeds import Embed
from discord.ext import commands
from discord.ext.commands.core import command
from youtube_dl import YoutubeDL
colours = {
'red': 0xDC143C,
'green': 0x00FF7F,
'grey': 0x708090,
'blue': 0x0000CD
}
class Music(commands.Cog):
def __init__(self, client):
self.client = client
self.is_playing = False
self.repetingOne = False
self.repetingAll = False
self.current = ()
# 2d array containing [song, channel]
# self.music_queue vai conter as buscas recebidas feitas no youtube em ordem
# Caminho do executável para rodar na minha máquina
self.ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe'
# Segue o padrão de [[{'source', 'title'}, canal], [musica, canal]]
self.music_queue = []
self.vc = "" # Objeto voice_client do discord
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'executable': self.ffmpeg,
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'}
def search_yt(self, item):
with YoutubeDL(self.YDL_OPTIONS) as ydl:
try: # Busca um video no youtube e traz o titulo e a fonte dele em formato de dict
info = ydl.extract_info("ytsearch:%s" %
item, download=False)['entries'][0]
except Exception:
return False
# Retorna a fonte e o titulo buscado
return {'source': info['formats'][0]['url'], 'title': info['title']}
def play_next(self):
if len(self.music_queue) > 0:
if self.repetingOne:
# Coloca a musica atual no topo da fila
self.music_queue.insert(0, self.current)
elif self.repetingAll:
# Joga a musica atual para o final da fila
self.music_queue.append(self.current)
self.is_playing = True
source = self.music_queue[0][0]['source']
self.current = self.music_queue[0] # Update current music
self.music_queue.pop(0) # Remove from the queue
player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS)
self.vc.play(player, after=lambda e: self.play_next()) # Play
else:
self.is_playing = False
self.repetingAll = False
self.repetingOne = False
# infinite loop checking
async def play_music(self):
if len(self.music_queue) > 0:
self.is_playing = True
source = self.music_queue[0][0]['source']
# Try to connect to voice channel if you are not already connected
if self.vc == "" or not self.vc.is_connected() or self.vc == None:
# Conecta o voice_client no channel da primeira música da lista
self.vc = await self.music_queue[0][1].connect()
else:
await self.vc.move_to(self.music_queue[0][1])
self.current = self.music_queue[0] # Update current music
self.music_queue.pop(0) # Remove from the queue
player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS)
# Start the player
self.vc.play(player, after=lambda e: self.play_next())
else:
self.is_playing = False
await self.vc.disconnect()
@commands.command(name="help", alisases=['ajuda'], help="Comando de ajuda")
async def ajuda(self, ctx):
helptxt = ''
for command in self.client.commands:
helptxt += f'**{command}** - {command.help}\n'
embedhelp = discord.Embed(
colour=1646116, # grey
title=f'Comandos do {self.client.user.name}',
description=helptxt
)
embedhelp.set_thumbnail(url=self.client.user.avatar_url)
await ctx.send(embed=embedhelp)
@commands.command(name="play", help="Toca uma música do YouTube", aliases=['p', 'tocar'])
async def p(self, ctx, *args):
query = " ".join(args)
try:
# Nome do canal de voz que vai entrar
voice_channel = ctx.author.voice.channel
except:
# If voice_channel is None:
await self.send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey')
return
else:
song = self.search_yt(query)
if type(song) == type(True): # Caso seja retornado um booleano da busca
await self.send_embed(ctx, description='Algo deu errado! Tente escrever o nome da música novamente!', colour_name='red')
return
else:
await self.send_embed(ctx, description=f"Você adicionou a música **{song['title']}** à fila!", colour_name='green')
self.music_queue.append([song, voice_channel])
if self.is_playing == False:
await self.play_music()
@commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila'])
async def q(self, ctx):
fila = ""
for x in range(len(self.music_queue)):
fila += f"**{x+1} - ** {self.music_queue[x][0]['title']}\n"
if self.repetingOne: # If Repeting one
await self.send_embed(ctx, title='Repeting One Music',
description=f'Música: **{self.current[0]["title"]}**', colour_name='green')
elif fila != "":
if self.repetingAll: # If repeting all
await self.send_embed(ctx, title='Repetindo todas', description=fila, colour_name='green')
else: # Repeting off
await self.send_embed(ctx, description=fila, colour_name='green')
else: # No music
await self.send_embed(ctx, description='Não existem músicas na fila.', colour_name='red')
@commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular'])
async def skip(self, ctx):
if self.vc != "" and self.vc:
self.vc.stop()
await self.send_embed(ctx, description=f'Você pulou a música\nRepetindo Uma: {self.repetingOne} \
\nRepetindo Todas: {self.repetingAll}', colour_name='green')
@commands.command(name='stop', help='Para de tocar músicas')
async def stop(self, ctx):
if self.vc == '':
return
if self.vc.is_connected():
# Remove todas as músicas da lista
self.music_queue = []
self.current = ()
self.repetingOne = False
self.repetingAll = False
self.is_playing = False
self.vc.stop()
await self.vc.disconnect()
@commands.command(name='pause', help='Pausa a música')
async def pause(self, ctx):
if self.vc == '':
return
if self.vc.is_playing():
self.vc.pause()
await self.send_embed(ctx, description='Música pausada', colour_name='green')
@commands.command(name='resume', help='Despausa a música atual')
async def resume(self, ctx):
if self.vc == '':
return
if self.vc.is_paused():
self.vc.resume()
await self.send_embed(ctx, description='Música tocando', colour_name='green')
@commands.command(name='repeat_one', help='Repete a música atual')
async def repeat_one(self, ctx):
if not self.is_playing: # Garante que o Bot está tocando
await self.send_embed(ctx, title='Vulkan não está tocando agora', colour_name='red')
return
if self.repetingAll: # Verifica se o repeting all não está ligado
await self.send_embed(ctx, title='Já está repetindo todas', colour_name='red')
return
else: # Liga o repeting one
self.repetingOne = True
await self.send_embed(ctx, description='Repetir uma música ligado', colour_name='green')
@commands.command(name='repeat_all', help='Repete toda a fila')
async def repeat_all(self, ctx):
if not self.is_playing: # Garante que o Bot está tocando
await self.send_embed(ctx, title='Vulkan não está tocando agora', colour_name='red')
return
if self.repetingOne: # Verifica se o repeting all não está ligado
await self.send_embed(ctx, title='Já está repetindo uma música', colour_name='red')
return
else: # Liga o repeting one
self.repetingAll = True
await self.send_embed(ctx, description='Repetir todas as músicas ligado', colour_name='green')
@commands.command(name='repeat_off', help='Desativa o repetir músicas')
async def repeat_off(self, ctx):
if not self.is_playing: # Garante que o Bot está tocando
await self.send_embed(ctx, title='Vulkan não está tocando agora', colour_name='red')
return
else:
self.repetingOne = False
self.repetingAll = False
await self.send_embed(ctx, description='Repetir músicas desligado', colour_name='green')
@skip.error # Erros para kick
async def skip_error(self, ctx, error):
if isinstance(error, commands.MissingPermissions):
embedvc = discord.Embed(
colour=12255232,
description=f"Você precisa da permissão **Gerenciar canais** para pular músicas."
)
await ctx.send(embed=embedvc)
else:
raise error
async def send_embed(self, ctx, title='', description='', colour_name='red'):
try:
colour = colours[colour_name]
except Exception as e:
colour = colours['red']
embedvc = discord.Embed(
title=title,
description=description,
colour=colour
)
await ctx.send(embed=embedvc)
def setup(client):
client.add_cog(Music(client))

View File

@ -0,0 +1,64 @@
import requests
import json
from config import config
from discord.ext import commands
from random import random as rand
class Phrases(commands.Cog):
"""Deal with the generation of motivational phrases"""
def __init__(self, bot):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.command(name='frase', help='Envia uma frase pica, talvez a braba')
async def phrase(self, ctx):
# There is a chance that the phrase will be send for the dev
secret = await self.__calculate_rgn()
if secret != None:
await ctx.send(secret)
else:
phrase = await self.__get_phrase()
await ctx.send(phrase)
async def __calculate_rgn(self):
x = rand()
if x < 0.15:
return config.SECRET_MESSAGE
else:
return None
async def __get_phrase(self):
tries = 0
while True:
tries += 1
if tries > config.MAX_API_PHRASES_TRIES:
return 'O banco de dados dos cara tá off, bando de vagabundo, tenta depois aí bicho'
try:
response = requests.get(config.PHRASES_API)
data = json.loads(response.content)
phrase = data['quoteText']
author = data['quoteAuthor']
if phrase == '' or author == '': # Don't accept incomplete phrases
continue
text = f'{phrase} \nBy: {author}'
return text
except Exception as e:
continue
def setup(bot):
bot.add_cog(Phrases(bot))

View File

@ -0,0 +1,80 @@
from random import randint, random
import discord
from discord.ext import commands
from config import config
class Random(commands.Cog):
"""Deal with returning random things"""
def __init__(self, bot):
self.__bot = bot
@commands.command(name='random', help='Número aleatório de 1 a X')
async def random(self, ctx, arg: str):
try:
arg = int(arg)
except Exception as e:
embed = discord.Embed(
description='Manda um número aí ow animal',
colour=config.COLOURS['red']
)
await ctx.send(embed=embed)
return
if arg < 1:
a = arg
b = 1
else:
a = 1
b = arg
x = randint(a, b)
embed = discord.Embed(
title=f'Número Aleatório entre {a, b}',
description=x,
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
@commands.command(name='cara', help='coroa')
async def cara(self, ctx):
x = random()
if x < 0.5:
result = 'cara'
else:
result = 'coroa'
embed = discord.Embed(
title='Cara Cora',
description=f'Resultado: {result}',
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
@commands.command(name='escolha', help='Escolhe um dos itens, separador: Vírgula')
async def escolher(self, ctx, *args: str):
try:
user_input = " ".join(args)
itens = user_input.split(sep=',')
index = randint(0, len(itens)-1)
embed = discord.Embed(
title='Escolha de algo',
description=itens[index],
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
except Exception as e:
embed = discord.Embed(
title='Escolha de algo',
description='Erro: Envie várias coisas separadas por vírgula',
colour=config.COLOURS['green']
)
await ctx.send(embed=embed)
def setup(bot):
bot.add_cog(Random(bot))

View File

@ -0,0 +1,52 @@
import requests
import json
import discord
from discord.ext import commands
from config import config
class Warframe(commands.Cog):
"""Deal with the generation of warframe data"""
def __init__(self, bot):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.command(name='cetus', help='Informa o tempo atual de Cetus - Warframe')
async def cetus(self, ctx):
description = await self.__get_api()
embed = discord.Embed(
title='Warframe Cetus Timing',
description=description,
colour=config.COLOURS['blue']
)
await ctx.send(embed=embed)
async def __get_api(self):
"""Return the information of the Warframe API"""
tries = 0
while True:
tries += 1
if tries > config.MAX_API_CETUS_TRIES:
return 'Os DE baiano não tão com o banco de dados ligado'
try:
response = requests.get(config.CETUS_API)
data = json.loads(response.content)
short = data['shortString']
return short
except Exception as e:
continue
def setup(bot):
bot.add_cog(Warframe(bot))

View File

View File

@ -0,0 +1,77 @@
import discord
from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument
from discord.ext import commands
from config import config
class Control(commands.Cog):
"""Control the flow of the Bot"""
def __init__(self, bot):
self.__bot = bot
self.__comandos = {
'MUSIC': ['this', 'resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear'],
'RANDOM': ['escolha', 'cara', 'random'],
'HELP': ['help'],
'OTHERS': ['cetus', 'frase']
}
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.Cog.listener()
async def on_ready(self):
print(config.STARTUP_MESSAGE)
await self.__bot.change_presence(status=discord.Status.online, activity=discord.Game(name=f"Vulkan | type {config.BOT_PREFIX}help"))
print(config.STARTUP_COMPLETE_MESSAGE)
@commands.Cog.listener()
async def on_command_error(self, ctx, error):
if isinstance(error, MissingRequiredArgument):
await ctx.channel.send(f'Falta argumentos. Digite {config.BOT_PREFIX}help para ver os comandos')
elif isinstance(error, CommandNotFound):
await ctx.channel.send(f'O comando não existe')
else:
raise error
@commands.command(name="help", alisases=['ajuda'], help="Comando de ajuda")
async def help_msg(self, ctx):
helptxt = ''
help_music = '-- MUSIC\n'
help_random = '-- RANDOM\n'
help_help = '-- HELP\n'
help_others = '-- OTHERS\n'
for command in self.__bot.commands:
if command.name in self.__comandos['MUSIC']:
help_music += f'**{command}** - {command.help}\n'
elif command.name in self.__comandos['HELP']:
help_help += f'**{command}** - {command.help}\n'
elif command.name in self.__comandos['OTHERS']:
help_others += f'**{command}** - {command.help}\n'
else:
help_random += f'**{command}** - {command.help}\n'
helptxt = f'{help_music}\n{help_random}\n{help_others}\n{help_help}'
embedhelp = discord.Embed(
colour=config.COLOURS['grey'],
title=f'Comandos do {self.__bot.user.name}',
description=helptxt
)
embedhelp.set_thumbnail(url=self.__bot.user.avatar_url)
await ctx.send(embed=embedhelp)
@commands.Cog.listener()
async def on_error(self, error):
print('On Error')
def setup(bot):
bot.add_cog(Control(bot))

View File

View File

@ -0,0 +1,145 @@
import re
from config import config
from yt_dlp import YoutubeDL
from yt_dlp.utils import ExtractorError, DownloadError
from vulkanbot.music.Types import Provider
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
def __init__(self) -> None:
self.__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': True,
'playlistend': config.MAX_PLAYLIST_LENGTH,
}
def download_urls(self, musics_input, provider: Provider) -> list:
"""Download the musics direct URL from Youtube and return in a list
Arg: List with names or youtube url or a Unique String
Return: List with the direct youtube URL of each music
"""
if type(provider) != Provider:
return None
if type(musics_input) != list and type(musics_input) != str:
return None
if provider == Provider.Name: # Send a list of names
musics_urls = self.__download_titles(musics_input)
return musics_urls
elif provider == Provider.YouTube: # Send a URL or Title
url = self.__download_one(musics_input)
return url
else:
return None
def download_source(self, url) -> dict:
"""Download musics full info and source from Music URL
Arg: URL from Youtube
Return: Dict with the full youtube information of the music, including source to play it
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = False
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
return result
except (ExtractorError, DownloadError) as e: # Any type of error in download
print(e)
return None
def __download_one(self, music: str) -> list:
"""Download one music/playlist direct link from Youtube
Arg: Playlist URL or Music Name to download direct URL
Return: List with the Youtube URL of each music downloaded
"""
if type(music) != str:
return
if self.__is_url(music): # If Url
info = self.__download_links(music) # List of dict
else: # If Title
info = self.__download_titles(music) # List of dict
return info
def __download_titles(self, musics_names: list) -> list:
"""Download a music direct URL using his name.
Arg: Music Name
Return: List with one dict, containing the music direct URL and title
"""
if type(musics_names) == str: # Turn str into list
musics_names = [musics_names]
musics_info = []
with YoutubeDL(self.__YDL_OPTIONS) as ydl:
try:
for name in musics_names:
search = f"ytsearch:{name}"
result = ydl.extract_info(search, download=False)
id = result['entries'][0]['id']
music_info = {
'url': f"https://www.youtube.com/watch?v={id}",
'title': result['entries'][0]['title']
}
musics_info.append(music_info)
return musics_info # Return a list
except Exception as e:
raise e
def __download_links(self, url: str) -> list:
"""Download musics direct links from Playlist URL or Music URL
Arg_Url: URL from Youtube
Return: List of dicts, with the title and url of each music
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = True
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
musics_info = []
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
music_info = {
'title': entry['title'],
'url': f"https://www.youtube.com/watch?v={entry['id']}"
}
musics_info.append(music_info)
else: # Or a single music
music_info = {
'url': result['original_url'],
'title': result['title']
}
musics_info.append(music_info)
return musics_info # Return a list
except ExtractorError or DownloadError:
pass
def __is_url(self, string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False

213
vulkanbot/music/Music.py Normal file
View File

@ -0,0 +1,213 @@
import discord
from discord.ext import commands
import datetime
import asyncio
from config import config
from vulkanbot.music.Downloader import Downloader
from vulkanbot.music.Playlist import Playlist
from vulkanbot.music.Searcher import Searcher
class Music(commands.Cog):
def __init__(self, bot):
self.__searcher = Searcher()
self.__downloader = Downloader()
self.__playlist = Playlist()
self.__playing = False
self.__bot = bot
self.__ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe'
self.__vc = "" # Objeto voice_bot do discord
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'executable': self.__ffmpeg,
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'}
def __play_next(self):
while True:
if len(self.__playlist) > 0:
source = self.__playlist.next_song()
if source == None: # If there is not a source
continue
player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS)
self.__vc.play(player, after=lambda e: self.__play_next())
break
else:
self.__playing = False
break
# infinite loop checking
async def __play_music(self):
while True:
if len(self.__playlist) > 0:
source = self.__playlist.next_song()
if source == None:
continue
self.__playing = True
player = discord.FFmpegPCMAudio(source, **self.FFMPEG_OPTIONS)
self.__vc.play(player, after=lambda e: self.__play_next())
break
else:
self.__playing = False
await self.__vc.disconnect()
break
@commands.command(name="play", help="Toca música - YouTube/Spotify/Título", aliases=['p', 'tocar'])
async def play(self, ctx, *args):
user_input = " ".join(args)
try:
if self.__vc == "" or not self.__vc.is_connected() or self.__vc == None:
voice_channel = ctx.author.voice.channel
self.__vc = await voice_channel.connect()
except Exception as e:
# If voice_channel is None:
print(e)
await self.__send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey')
return
else:
songs_quant = 0
musics_names, provider = self.__searcher.search(user_input)
for music in musics_names:
music_info = self.__downloader.download_urls(music, provider)
for music in music_info:
self.__playlist.add_song(music)
songs_quant += 1
if songs_quant == 1:
await self.__send_embed(ctx, description=f"Você adicionou a música **{music_info[0]['title']}** à fila!", colour_name='green')
else:
await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='green')
if not self.__playing:
await self.__play_music()
@commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila'])
async def queue(self, ctx):
if self.__playlist.looping_one: # If Repeting one
# Send the current song with this title
await self.this(ctx)
return
fila = self.__playlist.queue()
total = len(fila)
text = f'Total musics: {total}\n\n'
# Create the string to description
for pos, song in enumerate(fila):
if pos >= config.MAX_QUEUE_LENGTH: # Max songs to apper in queue list
break
text += f"**{pos+1} - ** {song}\n"
if text != "":
if self.__playlist.looping_all: # If repeting all
await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='green')
else: # Repeting off
await self.__send_embed(ctx, title='Queue', description=text, colour_name='green')
else: # No music
await self.__send_embed(ctx, description='There is not musics in queue.', colour_name='red')
@commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular'])
async def skip(self, ctx):
if self.__vc != '' and self.__vc:
print('Skip')
self.__vc.stop()
@commands.command(name='stop', help='Para de tocar músicas')
async def stop(self, ctx):
if self.__vc == '':
return
if self.__vc.is_connected():
self.__playlist.clear()
self.__vc.stop()
await self.__vc.disconnect()
@commands.command(name='pause', help='Pausa a música')
async def pause(self, ctx):
if self.__vc == '':
return
if self.__vc.is_playing():
self.__vc.pause()
await self.__send_embed(ctx, description='Música pausada', colour_name='green')
@commands.command(name='resume', help='Despausa a música atual')
async def resume(self, ctx):
if self.__vc == '':
return
if self.__vc.is_paused():
self.__vc.resume()
await self.__send_embed(ctx, description='Música tocando', colour_name='green')
@commands.command(name='loop', help='Controla a repetição de músicas')
async def loop(self, ctx, args: str):
args = args.lower()
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop'
await self.__send_embed(ctx, description=description, colour_name='grey')
async def __send_embed(self, ctx, title='', description='', colour_name='grey'):
try:
colour = config.COLOURS[colour_name]
except Exception as e:
colour = config.COLOURS['grey']
embedvc = discord.Embed(
title=title,
description=description,
colour=colour
)
await ctx.send(embed=embedvc)
@commands.command(name='clear', help='Limpa a fila de músicas a tocar')
async def clear(self, ctx):
self.__playlist.clear()
@commands.command(name='this', help='Mostra a música que está tocando no instante')
async def this(self, ctx):
if self.__playlist.looping_one:
title = 'Music Looping Now'
else:
title = 'Music Playing Now'
info = self.__playlist.get_current()
embedvc = discord.Embed(
title=title,
description=f"[{info['title']}]({info['url']})",
color=config.COLOURS['grey']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=False)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=False)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=False)
await ctx.send(embed=embedvc)
def setup(bot):
bot.add_cog(Music(bot))

179
vulkanbot/music/Playlist.py Normal file
View File

@ -0,0 +1,179 @@
from collections import deque
import random
from vulkanbot.music.Song import Song
from vulkanbot.music.Downloader import Downloader
class Playlist():
"""Class to manage and control the songs to play and played"""
def __init__(self) -> None:
self.__down = Downloader()
self.__queue = deque() # Store the musics to play
self.__songs_history = deque() # Store the musics played
self.__name_history = deque() # Store the name of musics played
self.__looping_one = False
self.__looping_all = False
self.__current = None
@property
def looping_one(self):
return self.__looping_one
@property
def looping_all(self):
return self.__looping_all
def __len__(self):
if self.__looping_one == True or self.__looping_all == True:
return 1
else:
return len(self.__queue)
def next_song(self):
"""Return the source of the next song to play"""
if self.__current == None: # If not playing
if len(self.__queue) == 0: # If nothing to play
return None
else: # If there is music to play
return self.__start()
# If playing
played_song = self.__current
# Check if need to repeat the played song
if self.__looping_one: # Insert the current song to play again
self.__queue.appendleft(played_song)
if self.__looping_all: # Insert the current song in the end of queue
self.__queue.append(played_song)
while True: # Try to get the source of next song
if len(self.__queue) == 0: # If no more song to play, return None
return None
# If there is more to play
# Finish download of the next song
source = self.__prepare_next(self.__queue[0])
if source == None: # If there is a problem in the download
self.__queue.popleft() # Remove the music with problems
continue
return source
def get_current(self):
"""Return current music embed"""
if self.__current:
return self.__current.embed()
else:
return 'Nenhuma música tocando'
def __prepare_next(self, next_song: Song) -> str:
"""Finish the download of the music and return the source"""
if next_song.source == None: # Check if source has already downloaded
url = next_song.url # Get the URL
info = self.__down.download_source(url) # Download the source
if info == None: # If there is a problem in the download
return None
next_song.finish_down(info) # Updating the info of song
# Att the Playlist info
self.__current = next_song # Att the current
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(self.__current.title) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current.source # Return the source of current
def __start(self) -> None:
"""Start the play of the first musics and return his source"""
# Finish download of the next song
url = self.__queue[0].url # Get the URL
info = self.__down.download_source(url) # Download the source
self.__queue[0].finish_down(info) # Att the song
# Att Playlist info
self.__current = self.__queue[0] # Att the current
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(self.__current.title) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current.source # Return the source of current
def prev_song(self):
"""Return the source of the last song played
Return None or the source of the prev song
"""
if len(self.__songs_history) == 0:
return None
else:
return self.__songs_history[0].source
def add_song(self, music: dict) -> None:
"""Receives a music object and store to the play queue"""
if (not 'title' in music.keys()) or (not 'url' in music.keys()):
print('Music without necessary keys')
return
song = Song(title=music['title'], url=music['url']) # Cria a musica
self.__queue.append(song)
def shuffle(self) -> None:
"""Shuffle the order of the songs to play"""
random.shuffle(self.__queue)
def revert(self) -> None:
"""Revert the order of the songs to play"""
self.__queue.reverse()
def clear(self) -> None:
"""Clear the songs to play song history"""
self.__queue.clear()
self.__songs_history.clear()
def loop_one(self) -> str:
"""Try to start the loop of the current song
Return: Embed descrition to show to user
"""
if self.__looping_all == True:
return 'Vulkan already looping one music, disable loop first'
elif self.__looping_one == True:
return "I'm already doing this, you dumb ass"
else:
self.__looping_one = True
return 'Repeating the current song'
def loop_all(self) -> str:
"""Try to start the loop of all songs
Return: Embed descrition to show to user
"""
if self.__looping_one == True:
return 'Vulkan already looping one music, disable loop first'
elif self.__looping_all == True:
return "I'm already doing this, you dumb ass"
else:
self.__looping_all = True
return 'Repeating all songs in queue'
def loop_off(self) -> str:
"""Disable both types of loop"""
if self.__looping_all == False and self.__looping_one == False:
return "The loop is already off, you fucking dick head"
self.__looping_all = False
self.__looping_one = False
return 'Loop disable'
def queue(self) -> list:
list_songs = []
for song in self.__queue:
title = song.title
list_songs.append(title)
return list_songs

View File

@ -0,0 +1,52 @@
import re
from vulkanbot.music.Types import Provider
from vulkanbot.music.Spotify import SpotifySearch
class Searcher():
"""Turn the user input into list of musics names, support youtube and spotify"""
def __init__(self) -> None:
self.__Spotify = SpotifySearch()
print(f'Spotify Connected: {self.__Spotify.connect()}')
def search(self, music: str) -> list:
"""Return a list with the track name of a music or playlist
Return -> A list of musics names
"""
url_type = self.__identify_source(music)
if url_type == Provider.YouTube:
return [music], Provider.YouTube
elif url_type == Provider.Spotify:
musics = self.__Spotify.search(music)
return musics, Provider.Name
elif url_type == Provider.Name:
return [music], Provider.Name
def __identify_source(self, music) -> Provider:
"""Identify the provider of a music"""
if not self.__is_url(music):
return Provider.Name
if "https://www.youtu" in music or "https://youtu.be" in music:
return Provider.YouTube
if "https://open.spotify.com" in music:
return Provider.Spotify
# If no match
return Provider.Unknown
def __is_url(self, string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False

64
vulkanbot/music/Song.py Normal file
View File

@ -0,0 +1,64 @@
from discord.embeds import Embed
class Song():
"""Store the usefull information about a Song"""
def __init__(self, url: str, title: str) -> None:
"""Create a song with only the URL to the youtube song"""
self.__url = url
self.__title = title
self.__info = {}
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
self.__usefull_keys = ['url', 'duration',
'description', 'webpage_url',
'channel', 'id', 'uploader',
'thumbnail']
self.__extract_info(info)
def __extract_info(self, info) -> None:
"""Extract the usefull information returned by the Downloader"""
for key in self.__usefull_keys:
try:
self.__info[key] = info[key]
except Exception as e:
print(e)
raise e
def embed(self) -> Embed:
"""Configure and return the info to create the embed for this song"""
info = {
'title': self.__title,
'url': self.__url,
'uploader': self.__info['uploader']
}
if 'thumbnail' in self.__info.keys():
info['thumbnail'] = self.__info['thumbnail']
if 'duration' in self.__info.keys():
info['duration'] = self.__info['duration']
return info
@property
def info(self) -> dict:
"""Return the compiled info of this song"""
if self.__info:
return self.__info
@property
def title(self) -> str:
return self.__title
@property
def source(self) -> str:
"""Return the Song Source URL to play"""
if 'url' in self.__info.keys():
return self.__info['url']
@property
def url(self) -> str:
return self.__url

149
vulkanbot/music/Spotify.py Normal file
View File

@ -0,0 +1,149 @@
import spotipy
import re
from spotipy.oauth2 import SpotifyClientCredentials
from bs4 import BeautifulSoup
from config import config
import aiohttp
class Browser():
def __init__(self) -> None:
self.__url_regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
self.__session = aiohttp.ClientSession(
headers={'User-Agent': 'python-requests/2.20.0'})
async def search(self, url) -> str:
"""Convert the external_url link to the title of music using browser"""
if re.search(self.__url_regex, url):
result = self.__url_regex.search(url)
url = result.group(0)
async with self.__session.get(url) as response:
page = await response.text()
soup = BeautifulSoup(page, 'html.parser')
title = soup.find('title')
title = title.string
title = title.replace('- song by', '')
title = title.replace('| Spotify', '')
return title
class SpotifySearch():
"""Search a Spotify music or playlist and return the musics names"""
def __init__(self) -> None:
self.__connected = False
self.__browser = Browser()
def connect(self) -> bool:
try:
# Initialize the connection with Spotify API
self.__api = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
client_id=config.SPOTIFY_ID, client_secret=config.SPOTIFY_SECRET))
self.__connected = True
return True
except:
return False
def search(self, music) -> list:
"""Search and return the title of musics on Spotify"""
type = music.split('/')[3].split('?')[0]
code = music.split('/')[4].split('?')[0]
if type == 'album':
musics = self.__get_album(code)
elif type == 'playlist':
musics = self.__get_playlist(code)
elif type == 'track':
musics = self.__get_track(code)
else:
return None
return musics
def __get_album(self, code) -> list:
"""Get the externals urls of a album
ARG: Spotify Code of the Album
"""
if self.__connected == True:
try:
# Load all music objects
results = self.__api.album_tracks(code)
musics = results['items']
while results['next']: # Get the next pages
results = self.__api.next(results)
musics.extend(results['items'])
musicsTitle = []
for music in musics:
try:
title = self.__extract_title(music)
musicsTitle.append(title)
except:
pass
return musicsTitle
except Exception as e:
raise e
def __get_playlist(self, code) -> list:
"""Get the externals urls of a playlist
Arg: Spotify Code of the Playlist
"""
try:
results = self.__api.playlist_items(code)
itens = results['items']
while results['next']: # Load the next pages
results = self.__api.next(results)
itens.extend(results['items'])
musics = []
for item in itens:
musics.append(item['track'])
titles = []
for music in musics:
try:
title = self.__extract_title(music)
titles.append(title)
except Exception as e:
raise e
return titles
except Exception as e:
raise e
def __get_track(self, code) -> list:
"""Convert a external_url track to the title of the music
ARG: Spotify Code of the Music
"""
results = self.__api.track(code)
name = results['name']
artists = ''
for artist in results['artists']:
artists += f'{artist["name"]} '
return [f'{name} {artists}']
def __extract_title(self, music: dict) -> str:
"""Receive a spotify music object and return his title
ARG: music dict returned by Spotify
"""
title = f'{music["name"]} '
for artist in music['artists']:
title += f'{artist["name"]} '
return title
async def __convert_spotify(self, url) -> str:
"""(Experimental) - Convert the external_url link to the title of music using browser"""
title = self.__browser(url)
return title

21
vulkanbot/music/Types.py Normal file
View File

@ -0,0 +1,21 @@
from enum import Enum
class Provider(Enum):
"""Store Enum Types of the Providers"""
Spotify = "Spotify"
Spotify_Playlist = "Spotify Playlist"
YouTube = "YouTube"
Name = 'Track Name'
Unknown = "Unknown"
class Playlist_Types(Enum):
Spotify_Playlist = "Spotify Playlist"
YouTube_Playlist = "YouTube Playlist"
Unknown = "Unknown"
class Origins(Enum):
Default = "Default"
Playlist = "Playlist"