Vulkan/vulkan/music/Player.py
2022-01-08 17:07:02 -04:00

328 lines
11 KiB
Python

import discord
from discord import colour
from discord.ext import commands
from config import config
import datetime
from vulkan.music.Downloader import Downloader
from vulkan.music.Playlist import Playlist
from vulkan.music.Searcher import Searcher
from vulkan.music.Types import Provider
from vulkan.music.utils import *
class Player(commands.Cog):
def __init__(self, bot, guild):
self.__searcher: Searcher = Searcher()
self.__down: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: discord.Client = bot
self.__guild: discord.Guild = guild
self.__timer = Timer(self.__timeout_handler)
self.__playing = False
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
async def connect(self, ctx) -> None:
if not ctx.author.voice:
return False
if self.__guild.voice_client == None:
await ctx.author.voice.channel.connect(reconnect=True, timeout=None)
return True
def __play_next(self, error, ctx) -> None:
if error != None:
return
song = self.__playlist.next_song()
if song != None:
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx, song) -> None:
try:
self.__playing = True
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
self.__timer.cancel()
self.__timer = Timer(self.__timeout_handler)
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
except:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.ERROR_PLAYING,
colour=config.COLOURS['red']
)
await ctx.send(embed=embed)
return
async def play(self, ctx, track=str, requester=str) -> str:
try:
songs_names, provider = self.__searcher.search(track)
if provider == Provider.Unknown or songs_names == None:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.INVALID_INPUT,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
elif provider == Provider.YouTube:
songs_names = self.__down.extract_youtube_link(songs_names[0])
songs_quant = 0
for name in songs_names:
song = self.__playlist.add_song(name, requester)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__down.preload(songs_preload)
except Exception as e:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
if songs_quant == 1:
song = self.__down.download_one(song)
if song == None:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
elif not self.__playing:
embed = discord.Embed(
title=config.SONG_PLAYER,
description=config.SONG_ADDED.format(song.title),
colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
else:
embed = self.__format_embed(song.info, config.SONG_ADDED_TWO)
await ctx.send(embed=embed)
else:
embed = discord.Embed(
title=config.SONG_PLAYER,
description=config.SONGS_ADDED.format(songs_quant),
colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
if not self.__playing:
try_another = True
while try_another: # will ensure the first song source to be ready
first_song = self.__playlist.next_song()
if first_song == None:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
break
while True:
if first_song.source != None: # If song got downloaded
try_another = False
break
if first_song.problematic: # If song got any error, try another one
break
if first_song != None:
await self.__play_music(ctx, first_song)
async def queue(self) -> discord.Embed:
if self.__playlist.looping_one:
info = self.__playlist.current.info
title = config.ONE_SONG_LOOPING
return self.__format_embed(info, title)
songs_preload = self.__playlist.songs_to_preload
if len(songs_preload) == 0:
title = config.SONG_PLAYER
text = config.EMPTY_QUEUE
else:
if self.__playlist.looping_all:
title = config.ALL_SONGS_LOOPING
else:
title = config.QUEUE_TITLE
await self.__down.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload]))
total_songs = len(self.__playlist)
text = f'📜 Queue length: {total_songs} | ⌛ Duration: `{total_time}` downloaded \n\n'
for pos, song in enumerate(songs_preload, start=1):
song_name = song.title if song.title else config.SONG_DOWNLOADING
text += f"**`{pos}` - ** {song_name} - `{format_time(song.duration)}`\n"
embed = discord.Embed(
title=title,
description=text,
colour=config.COLOURS['blue']
)
return embed
async def skip(self) -> bool:
if self.__guild.voice_client != None:
self.__guild.voice_client.stop()
return True
else:
return False
async def stop(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
return True
async def pause(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_playing():
self.__guild.voice_client.pause()
return True
async def resume(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_paused():
self.__guild.voice_client.resume()
return True
async def loop(self, args: str) -> str:
args = args.lower()
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
description = config.HELP_LONG_LOOP
return description
async def clear(self) -> None:
self.__playlist.clear()
async def now_playing(self) -> discord.Embed:
if self.__playlist.looping_one:
title = config.ONE_SONG_LOOPING
else:
title = config.SONG_PLAYING
current_song = self.__playlist.current
embed = self.__format_embed(current_song.info, title)
return embed
async def shuffle(self) -> str:
try:
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
return config.SONGS_SHUFFLED
except:
return config.ERROR_SHUFFLING
async def move(self, pos1, pos2='1') -> str:
try:
pos1 = int(pos1)
pos2 = int(pos2)
except:
return config.ERROR_NUMBER
result = self.__playlist.move_songs(pos1, pos2)
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
return result
async def remove(self, position) -> str:
"""Remove a song from the queue in the position"""
try:
position = int(position)
except:
return config.ERROR_NUMBER
result = self.__playlist.remove_song(position)
return result
def __format_embed(self, info=dict, title='') -> discord.Embed:
"""Configure the embed to show the song information"""
embedvc = discord.Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",
color=config.COLOURS['blue']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=True)
embedvc.add_field(name=config.SONGINFO_REQUESTER,
value=info['requester'],
inline=True)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=True)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=True)
return embedvc
async def __timeout_handler(self) -> None:
if self.__guild.voice_client == None:
return
if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused():
self.__timer = Timer(self.__timeout_handler)
elif self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
await self.__guild.voice_client.disconnect()