Vulkan/Music/Player.py
2022-03-23 13:48:01 -04:00

408 lines
14 KiB
Python

from discord.ext import commands
from Config.Config import Configs
from discord import Client, Guild, FFmpegPCMAudio, Embed
from discord.ext.commands import Context
from datetime import timedelta
from Music.Downloader import Downloader
from Music.Playlist import Playlist
from Music.Searcher import Searcher
from Music.Song import Song
from Music.Types import Provider
from Utils.Utils import *
class Player(commands.Cog):
def __init__(self, bot: Client, guild: Guild):
self.__searcher: Searcher = Searcher()
self.__down: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: Client = bot
self.__guild: Guild = guild
self.__timer = Timer(self.__timeout_handler)
self.__playing = False
self.__config = Configs()
# Flag to control if the player should stop totally the playing
self.__force_stop = False
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
@property
def playing(self) -> bool:
return self.__playing
@property
def playlist(self) -> Playlist:
return self.__playlist
async def connect(self, ctx: Context) -> bool:
if not ctx.author.voice:
return False
if self.__guild.voice_client == None:
await ctx.author.voice.channel.connect(reconnect=True, timeout=None)
return True
def __play_next(self, error, ctx: Context) -> None:
if self.__force_stop: # If it's forced to stop player
self.__force_stop = False
return None
song = self.__playlist.next_song()
if song != None:
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx: Context, song: Song) -> None:
try:
source = self.__ensure_source(song)
if source == None:
self.__play_next(None, ctx)
self.__playing = True
player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
self.__timer.cancel()
self.__timer = Timer(self.__timeout_handler)
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
except:
self.__play_next(None, ctx)
async def play(self, ctx: Context, track: str, requester: str) -> str:
try:
links, provider = self.__searcher.search(track)
if provider == Provider.Unknown or links == None:
embed = Embed(
title=self.__config.ERROR_TITLE,
description=self.__config.INVALID_INPUT,
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return None
if provider == Provider.YouTube:
links = await self.__down.extract_info(links[0])
if len(links) == 0:
embed = Embed(
title=self.__config.ERROR_TITLE,
description="This video is unavailable",
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return None
songs_quant = 0
for info in links:
song = self.__playlist.add_song(info, requester)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__down.preload(songs_preload)
except Exception as e:
print(f'DEVELOPER NOTE -> Error while Downloading in Player: {e}')
embed = Embed(
title=self.__config.ERROR_TITLE,
description=self.__config.DOWNLOADING_ERROR,
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return
if songs_quant == 1:
song = self.__down.finish_one_song(song)
pos = len(self.__playlist)
if song.problematic:
embed = Embed(
title=self.__config.ERROR_TITLE,
description=self.__config.DOWNLOADING_ERROR,
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return None
elif not self.__playing:
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.SONG_ADDED.format(song.title),
colour=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
else:
embed = self.__format_embed(song.info, self.__config.SONG_ADDED_TWO, pos)
await ctx.send(embed=embed)
else:
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.SONGS_ADDED.format(songs_quant),
colour=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
if not self.__playing:
first_song = self.__playlist.next_song()
await self.__play_music(ctx, first_song)
async def play_prev(self, ctx: Context) -> None:
"""Stop the currently playing cycle, load the previous song and play"""
if self.__playlist.looping_one or self.__playlist.looping_all: # Do not allow play if loop
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.LOOP_ON,
colour=self.__config.COLOURS['blue']
)
await ctx.send(embed=embed)
return None
song = self.__playlist.prev_song() # Prepare the prev song to play again
if song == None:
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.NOT_PREVIOUS,
colour=self.__config.COLOURS['blue']
)
await ctx.send(embed=embed)
else:
if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused():
# Will forbidden next_song to execute after stopping current player
self.__force_stop = True
self.__guild.voice_client.stop()
self.__playing = False
await self.__play_music(ctx, song)
async def queue(self) -> Embed:
if self.__playlist.looping_one:
info = self.__playlist.current.info
title = self.__config.ONE_SONG_LOOPING
return self.__format_embed(info, title)
songs_preload = self.__playlist.songs_to_preload
if len(songs_preload) == 0:
title = self.__config.SONG_PLAYER
text = self.__config.EMPTY_QUEUE
else:
if self.__playlist.looping_all:
title = self.__config.ALL_SONGS_LOOPING
else:
title = self.__config.QUEUE_TITLE
await self.__down.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload]))
total_songs = len(self.__playlist)
text = f'📜 Queue length: {total_songs} | ⌛ Duration: `{total_time}` downloaded \n\n'
for pos, song in enumerate(songs_preload, start=1):
song_name = song.title if song.title else self.__config.SONG_DOWNLOADING
text += f"**`{pos}` - ** {song_name} - `{format_time(song.duration)}`\n"
embed = Embed(
title=title,
description=text,
colour=self.__config.COLOURS['blue']
)
return embed
def history(self) -> Embed:
history = self.__playlist.songs_history
print(f'Player -> {history}')
if len(history) == 0:
text = self.__config.HISTORY_EMPTY
else:
text = f'\n📜 History Length: {len(history)} | Max: {self.__config.MAX_SONGS_HISTORY}\n'
for pos, song in enumerate(history, start=1):
text += f"**`{pos}` - ** {song.title} - `{format_time(song.duration)}`\n"
embed = Embed(
title=self.__config.HISTORY_TITLE,
description=text,
colour=self.__config.COLOURS['blue']
)
return embed
async def stop(self) -> bool:
if self.__guild.voice_client is None:
return False
if self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
return True
async def force_stop(self) -> None:
try:
if self.__guild.voice_client is None:
return
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
self.__playlist.clear()
self.__playlist.loop_off()
except Exception as e:
print(f'DEVELOPER NOTE -> Force Stop Error: {e}')
async def pause(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_playing():
self.__guild.voice_client.pause()
return True
async def resume(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_paused():
self.__guild.voice_client.resume()
return True
async def loop(self, args: str) -> str:
args = args.lower()
if self.__playlist.current == None:
return self.__config.PLAYER_NOT_PLAYING
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
raise commands.UserInputError(self.__config.MY_ERROR_BAD_COMMAND)
return description
async def clear(self) -> None:
self.__playlist.clear()
async def now_playing(self) -> Embed:
if not self.__playing:
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.PLAYER_NOT_PLAYING,
colour=self.__config.COLOURS['blue']
)
return embed
if self.__playlist.looping_one:
title = self.__config.ONE_SONG_LOOPING
else:
title = self.__config.SONG_PLAYING
current_song = self.__playlist.current
embed = self.__format_embed(current_song.info, title)
return embed
async def shuffle(self) -> str:
try:
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
return self.__config.SONGS_SHUFFLED
except:
return self.__config.ERROR_SHUFFLING
async def move(self, pos1, pos2='1') -> str:
if not self.__playing:
return self.__config.PLAYER_NOT_PLAYING
try:
pos1 = int(pos1)
pos2 = int(pos2)
except:
return self.__config.ERROR_NUMBER
result = self.__playlist.move_songs(pos1, pos2)
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
return result
async def remove(self, position) -> str:
"""Remove a song from the queue in the position"""
if not self.__playing:
return self.__config.PLAYER_NOT_PLAYING
try:
position = int(position)
except:
return self.__config.ERROR_NUMBER
result = self.__playlist.remove_song(position)
return result
def __format_embed(self, info: dict, title='', position='Playing Now') -> Embed:
"""Configure the embed to show the song information"""
embedvc = Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",
color=self.__config.COLOURS['blue']
)
embedvc.add_field(name=self.__config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=False)
embedvc.add_field(name=self.__config.SONGINFO_REQUESTER,
value=info['requester'],
inline=True)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(timedelta(seconds=info['duration']))
embedvc.add_field(name=self.__config.SONGINFO_DURATION,
value=f"{duration}",
inline=True)
else:
embedvc.add_field(name=self.__config.SONGINFO_DURATION,
value=self.__config.SONGINFO_UNKNOWN_DURATION,
inline=True)
embedvc.add_field(name=self.__config.SONGINFO_POSITION,
value=position,
inline=True)
return embedvc
async def __timeout_handler(self) -> None:
if self.__guild.voice_client == None:
return
if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused():
self.__timer = Timer(self.__timeout_handler)
elif self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
await self.__guild.voice_client.disconnect()
def __ensure_source(self, song: Song) -> str:
while True:
if song.source != None: # If song got downloaded
return song.source
if song.problematic: # If song got any error
return None