Merge pull request #8 from RafaelSolVargas/creating_player

Spliting Music and Player classes
This commit is contained in:
Rafael Vargas 2022-01-06 16:58:43 -04:00 committed by GitHub
commit 31074fee77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 406 additions and 225 deletions

View File

@ -45,6 +45,7 @@ HELP_SHUFFLE = 'Shuffle the songs playing'
HELP_PLAY = '(title/youtube/spotify) - Plays a song'
HELP_MOVE = '(x, y) - Moves a song from position x to y in queue'
HELP_REMOVE = '(x, -1) - Remove a song in the position x or -1 for the last song'
HELP_RESET = 'Reset the Player of a guild'
HELP_WARFRAME = f'({BOT_PREFIX}warframe help for more)'
HELP_RANDOM = '(x) - Return a random number between 1 and x'
HELP_ESCOLHA = '(x, y, z...) - Choose randomly one item passed'
@ -53,6 +54,11 @@ HELP_DROP = '(user_name) - Try to remove the user from the current voice channel
HELP_FRASE = "Send a randomly phrase, perhaps you get the braba"
HELP_HELP = 'This command :)'
NO_CHANNEL = 'To play some music, connect to any voice channel first.'
INVALID_INPUT = 'This type of input was too strange, try something better'
DOWNLOADING_ERROR = 'An error occurred while downloading'
SONG_ADDED = 'Song added to the Queue'
ABSOLUTE_PATH = ''
COOKIE_PATH = '/config/cookies/cookies.txt'

View File

@ -11,7 +11,7 @@ class Control(commands.Cog):
def __init__(self, bot: Client):
self.__bot = bot
self.__comandos = {
'MUSIC': ['resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np', 'shuffle', 'move', 'remove'],
'MUSIC': ['resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np', 'shuffle', 'move', 'remove', 'reset'],
'WARFRAME': ['warframe'],
'RANDOM': ['escolha', 'cara', 'random'],
'HELP': ['help'],

View File

@ -2,239 +2,140 @@ import discord
from discord.ext import commands
from config import config
from vulkanbot.music.Downloader import Downloader
from vulkanbot.music.Playlist import Playlist
from vulkanbot.music.Searcher import Searcher
from vulkanbot.music.Types import Provider
from vulkanbot.music.Player import Player
from vulkanbot.music.utils import *
class Music(commands.Cog):
def __init__(self, bot):
self.__searcher: Searcher = Searcher()
self.__downloader: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__guilds = {}
self.__bot: discord.Client = bot
self.__playing = False
self.__vc = ""
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
def __play_next(self, error, ctx):
source = self.__playlist.next_song()
if source != None: # If there is not a source for the song
coro = self.__play_music(ctx, source)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx, song):
self.__playing = True
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__vc.play(player, after=lambda e: self.__play_next(e, ctx))
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
@commands.command(name="play", help=config.HELP_PLAY, aliases=['p', 'tocar'])
async def play(self, ctx, *args):
user_input = " ".join(args)
try:
if len(self.__bot.voice_clients) == 0:
voice_channel = ctx.author.voice.channel
self.__vc = await voice_channel.connect()
except Exception as e:
print(e)
await self.__send_embed(ctx, title='Para tocar música, primeiro se conecte a um canal de voz.', colour_name='grey')
return
else:
songs_quant = 0
musics_identifiers, provider = self.__searcher.search(user_input)
player = self.__get_player(ctx)
if player == None:
player = Player(self.__bot, ctx.guild)
self.__guilds[ctx.guild] = player
if provider == Provider.Unknown: # If type not identified
await self.__send_embed(ctx, description='Entrada inválida, tente algo melhor', colour_name='blue')
if is_connected(ctx) == None:
result = await player.connect(ctx)
if result['success'] == False:
await self.__send_embed(ctx, description=result['reason'], colour_name='red')
return
if provider == Provider.YouTube: # If youtube source
musics_identifiers = self.__downloader.extract_youtube_link(
musics_identifiers[0])
for identifier in musics_identifiers: # Creating songs
last_song = self.__playlist.add_song(identifier)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
if songs_quant == 1: # If only one music downloaded
song = self.__downloader.download_one(
last_song) # Download the new music
if song == None: # If song not downloaded
await self.__send_embed(ctx, description='Houve um problema no download dessa música, tente novamente', colour_name='blue')
elif not self.__playing: # If not playing
await self.__send_embed(ctx, description=f'Você adicionou a música **{song.title}** à playlist', colour_name='blue')
else: # If playing
await ctx.send(embed=song.embed(title='Song added to Queue'))
else:
await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='blue')
if not self.__playing:
try_another = True
while try_another:
first = self.__playlist.next_song()
if first == None:
await self.__send_embed(ctx, description='Houve um problema no download dessa música, tente novamente', colour_name='blue')
break
while True:
if first.source != None: # If song got downloaded
try_another = False
break
if first.problematic: # If song got any error, try another one
break
else: # The song is downloading, checking another time
continue
if first != None:
await self.__play_music(ctx, first)
await player.play(ctx, user_input)
@commands.command(name="queue", help=config.HELP_QUEUE, aliases=['q', 'fila'])
async def queue(self, ctx):
if self.__playlist.looping_one: # If Repeating one
await self.now_playing(ctx)
player = self.__get_player(ctx)
if player == None:
return
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload])) # Sum the duration
total_songs = len(self.__playlist)
text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n'
for pos, song in enumerate(songs_preload, start=1):
title = song.title if song.title else 'Downloading...'
text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n"
if len(songs_preload) > 0:
if self.__playlist.looping_all: # If repeating all
await self.__send_embed(ctx, title='Repeating all', description=text, colour_name='blue')
else: # Repeating off
await self.__send_embed(ctx, title='Songs in Queue', description=text, colour_name='blue')
else: # No music
await self.__send_embed(ctx, description='There is not musics in queue.', colour_name='red')
embed = await player.queue()
await ctx.send(embed=embed)
@commands.command(name="skip", help=config.HELP_SKIP, aliases=['pular'])
async def skip(self, ctx):
if len(self.__bot.voice_clients) > 0:
self.__vc.stop()
player = self.__get_player(ctx)
if player == None:
return
else:
await player.skip()
@commands.command(name='stop', help=config.HELP_STOP)
@commands.command(name='stop', help=config.HELP_STOP, aliases=['parar'])
async def stop(self, ctx):
if self.__vc == '':
player = self.__get_player(ctx)
if player == None:
return
if self.__vc.is_connected():
self.__playlist.clear()
self.__vc.stop()
await self.__vc.disconnect()
else:
await player.stop()
@commands.command(name='pause', help=config.HELP_PAUSE)
@commands.command(name='pause', help=config.HELP_PAUSE, aliases=['pausar'])
async def pause(self, ctx):
if self.__vc == '':
player = self.__get_player(ctx)
if player == None:
print('No player')
return
if self.__vc.is_playing():
self.__vc.pause()
await self.__send_embed(ctx, description='Música pausada', colour_name='green')
else:
success = await player.pause()
if success:
await self.__send_embed(ctx, description='Song paused', colour_name='blue')
@commands.command(name='resume', help=config.HELP_RESUME)
@commands.command(name='resume', help=config.HELP_RESUME, aliases=['soltar'])
async def resume(self, ctx):
if self.__vc == '':
player = self.__get_player(ctx)
if player == None:
return
if self.__vc.is_paused():
self.__vc.resume()
await self.__send_embed(ctx, description='Música tocando', colour_name='green')
else:
success = await player.resume()
if success:
await self.__send_embed(ctx, description='Song Playing', colour_name='blue')
@commands.command(name='loop', help=config.HELP_LOOP)
@commands.command(name='loop', help=config.HELP_LOOP, aliases=['repeat'])
async def loop(self, ctx, args: str):
args = args.lower()
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop'
await self.__send_embed(ctx, description=description, colour_name='grey')
@commands.command(name='clear', help=config.HELP_CLEAR)
async def clear(self, ctx):
self.__playlist.clear()
@commands.command(name='np', help=config.HELP_NP)
async def now_playing(self, ctx):
if self.__playlist.looping_one:
title = 'Song Looping Now'
else:
title = 'Song Playing Now'
current_song = self.__playlist.current
await self.__clean_messages(ctx)
await ctx.send(embed=current_song.embed(title=title))
@commands.command(name='shuffle', help=config.HELP_SHUFFLE)
async def shuffle(self, ctx):
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
await self.__send_embed(ctx, description='Musicas embaralhadas', colour_name='blue')
@commands.command(name='move', help=config.HELP_MOVE)
async def move(self, ctx, pos1, pos2='1'):
try:
pos1 = int(pos1)
pos2 = int(pos2)
except Exception as e:
print(e)
await ctx.send('This command require a number')
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.loop(args)
await self.__send_embed(ctx, description=result, colour_name='blue')
result = self.__playlist.move_songs(pos1, pos2)
@commands.command(name='clear', help=config.HELP_CLEAR, aliases=['limpar'])
async def clear(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
await player.clear()
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
await ctx.send(result)
@commands.command(name='np', help=config.HELP_NP, aliases=['playing', 'now'])
async def now_playing(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
embed = await player.now_playing()
await self.__clean_messages(ctx)
await ctx.send(embed=embed)
@commands.command(name='remove', help=config.HELP_MOVE)
@commands.command(name='shuffle', help=config.HELP_SHUFFLE, aliases=['aleatorio'])
async def shuffle(self, ctx):
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.shuffle()
await self.__send_embed(ctx, description=result, colour_name='blue')
@commands.command(name='move', help=config.HELP_MOVE, aliases=['mover'])
async def move(self, ctx, pos1, pos2='1'):
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.move(pos1, pos2)
await self.__send_embed(ctx, description=result, colour_name='blue')
@commands.command(name='remove', help=config.HELP_REMOVE, aliases=['remover'])
async def remove(self, ctx, position):
"""Remove a song from the queue in the position"""
try:
position = int(position)
except Exception as e:
print(e)
await ctx.send('This command require a number')
player = self.__get_player(ctx)
if player == None:
return
else:
result = await player.remove(position)
await self.__send_embed(ctx, description=result, colour_name='blue')
result = self.__playlist.remove_song(position)
await ctx.send(result)
@commands.command(name='reset', help=config.HELP_RESET, aliases=['resetar'])
async def reset(self, ctx):
player = self.__get_player(ctx)
if player != None:
await player.stop()
self.__guilds[ctx.guild] = Player(self.__bot, ctx.guild)
async def __send_embed(self, ctx, title='', description='', colour_name='grey'):
try:
@ -258,12 +159,18 @@ class Music(commands.Cog):
if message.author == self.__bot.user:
if len(message.embeds) > 0:
embed = message.embeds[0]
if embed.title == 'Song Playing Now':
if embed.title == 'Song Playing Now' or embed.title == 'Song Looping Now':
await message.delete()
except Exception as e:
print(e)
continue
def __get_player(self, ctx):
try:
return self.__guilds[ctx.guild]
except:
return None
def setup(bot):
bot.add_cog(Music(bot))

288
vulkanbot/music/Player.py Normal file
View File

@ -0,0 +1,288 @@
import discord
from discord.ext import commands
from config import config
import datetime
from vulkanbot.music.Downloader import Downloader
from vulkanbot.music.Playlist import Playlist
from vulkanbot.music.Searcher import Searcher
from vulkanbot.music.Types import Provider
from vulkanbot.music.utils import *
class Player(commands.Cog):
def __init__(self, bot, guild):
self.__searcher: Searcher = Searcher()
self.__downloader: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: discord.Client = bot
self.__guild: discord.Guild = guild
self.__playing = False
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
async def connect(self, ctx):
if not ctx.author.voice:
return {'success': False, 'reason': config.NO_CHANNEL}
if self.__guild.voice_client == None:
await ctx.author.voice.channel.connect(reconnect=True, timeout=None)
return {'success': True, 'reason': ''}
def __play_next(self, error, ctx):
song = self.__playlist.next_song()
if song != None: # If there is not a song for the song
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx, song):
self.__playing = True
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
async def play(self, ctx, *args) -> str:
user_input = " ".join(args)
try:
if self.__guild.voice_client == None:
voice_channel = ctx.author.voice.channel
await voice_channel.connect()
except Exception as e:
print(e)
return config.NO_CHANNEL
else:
songs_quant = 0
musics_identifiers, provider = self.__searcher.search(user_input)
if provider == Provider.Unknown:
return config.INVALID_INPUT
if provider == Provider.YouTube:
musics_identifiers = self.__downloader.extract_youtube_link(
musics_identifiers[0])
for identifier in musics_identifiers: # Creating songs
print('Creating Song')
last_song = self.__playlist.add_song(identifier)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
if songs_quant == 1: # If only one music downloaded
song = self.__downloader.download_one(
last_song) # Download the new music
if song == None: # If song not downloaded
embed = discord.Embed(
description=config.DOWNLOADING_ERROR, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
elif not self.__playing: # If not playing
text = f'You added the song **{song.title}** to the queue'
embed = discord.Embed(
description=text, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
else: # If playing
title = config.SONG_ADDED
embed = self.__format_embed(song.info, title=title)
await ctx.send(embed=embed)
else:
text = f'You added {songs_quant} songs to the queue'
embed = discord.Embed(
description=text, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
if not self.__playing:
try_another = True
while try_another:
first = self.__playlist.next_song()
if first == None:
embed = discord.Embed(
description=config.DOWNLOADING_ERROR, colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
break
while True:
if first.source != None: # If song got downloaded
try_another = False
break
if first.problematic: # If song got any error, try another one
break
else: # The song is downloading, check again
continue
if first != None:
await self.__play_music(ctx, first)
async def queue(self) -> discord.Embed:
if self.__playlist.looping_one:
info = self.__playlist.current.info
title = 'Song Looping Now'
return self.__format_embed(info, title)
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload])) # Sum the duration
total_songs = len(self.__playlist)
text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n'
for pos, song in enumerate(songs_preload, start=1):
title = song.title if song.title else 'Downloading...'
text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n"
title = 'Songs in Queue'
if len(songs_preload) > 0:
if self.__playlist.looping_all:
title = 'Repeating All'
else:
text = 'There is no musics in queue'
embed = discord.Embed(
title=title,
description=text,
colour=config.COLOURS['blue']
)
return embed
async def skip(self) -> bool:
if self.__guild.voice_client != None:
self.__guild.voice_client.stop()
return True
else:
return False
async def stop(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
return True
async def pause(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_playing():
self.__guild.voice_client.pause()
return True
async def resume(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_paused():
self.__guild.voice_client.resume()
return True
async def loop(self, args: str):
args = args.lower()
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop'
return description
async def clear(self) -> None:
self.__playlist.clear()
async def now_playing(self) -> discord.Embed:
if self.__playlist.looping_one:
title = 'Song Looping Now'
else:
title = 'Song Playing Now'
current_song = self.__playlist.current
embed = self.__format_embed(current_song.info, title)
return embed
async def shuffle(self) -> str:
try:
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
return 'Musics shuffled successfully'
except:
return 'An error ocurred :/'
async def move(self, pos1, pos2='1') -> str:
try:
pos1 = int(pos1)
pos2 = int(pos2)
except:
return 'This command require a number'
result = self.__playlist.move_songs(pos1, pos2)
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
return result
async def remove(self, position) -> str:
"""Remove a song from the queue in the position"""
try:
position = int(position)
except Exception as e:
print(e)
return 'This command require a number'
result = self.__playlist.remove_song(position)
return result
def __format_embed(self, info, title='') -> discord.Embed:
"""Configure the embed to show the song information"""
embedvc = discord.Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",
color=config.COLOURS['blue']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=False)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=False)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=False)
return embedvc

View File

@ -1,7 +1,3 @@
from discord import Embed
from config import config
import datetime
from vulkanbot.music.Interfaces import ISong, IPlaylist
@ -66,30 +62,6 @@ class Song(ISong):
self.__problematic = True
self.__playlist.destroy_song(self)
def embed(self, title: str) -> Embed:
"""Configure the embed to show the song information"""
embedvc = Embed(
title=title,
description=f"[{self.__info['title']}]({self.__info['original_url']})",
color=config.COLOURS['blue']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=self.__info['uploader'],
inline=False)
if 'thumbnail' in self.__info.keys():
embedvc.set_thumbnail(url=self.__info['thumbnail'])
if 'duration' in self.__info.keys():
duration = str(datetime.timedelta(seconds=self.__info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=False)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=False)
return embedvc
@property
def info(self):
return self.__info

View File

@ -1,6 +1,14 @@
import re
def is_connected(ctx):
try:
voice_channel = ctx.guild.voice_client.channel
return voice_channel
except:
return None
def format_time(duration):
if not duration:
return "00:00"