Adding some typing to the project and rebuilding the Downloader

This commit is contained in:
Rafael Vargas 2022-03-19 17:17:38 -04:00
parent 5510b5af78
commit e59efb0010
11 changed files with 300 additions and 180 deletions

View File

@ -3,3 +3,4 @@ discord.py[voice]
python-decouple==3.5
spotipy==2.19.0
yt-dlp==2021.12.1
typing_extensions==4.0.1

43
teste.py Normal file
View File

@ -0,0 +1,43 @@
from vulkan.music.Downloader import Downloader
from vulkan.music.Playlist import Playlist
from vulkan.music.Song import Song
import asyncio
from yt_dlp import YoutubeDL
from vulkan.music.Types import Provider
# Link pego de mix
link1 = 'https://youtu.be/5w61TizfZXY?list=RDMM5w61TizfZXY'
# Video especifico
link2 = 'https://www.youtube.com/watch?v=WFEtDqLLv84&ab_channel=MMAK'
# Link pego de mix
link3 = 'https://www.youtube.com/watch?v=5w61TizfZXY&list=RDMM5w61TizfZXY&ab_channel=CantusFidei'
# Playlist
link4 = 'https://www.youtube.com/playlist?list=PLbbKJHHZR9SgWK6SBOwnTaaQauvhjJaNE'
# Nome
link5 = 'Rumbling'
down = Downloader()
playlist = Playlist()
__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': True,
'playlistend': 5,
'noplaylist': True
}
async def main():
down = Downloader()
link = 'https://youtu.be/5w61TizfZXY?list=RDMM5w61TizfZXY'
infos = await down.extract_info('Rumbling')
song = playlist.add_song('Rumbling', 'Rafael')
await down.preload([song])
print(song.source)
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

View File

@ -59,7 +59,7 @@ class Control(commands.Cog):
if not my_error:
raise error
else:
print(error)
print(f'DEVELOPER NOTE -> Comand Error: {error}')
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.UNKNOWN_ERROR,

View File

@ -1,16 +1,17 @@
import discord
from typing import Dict
from discord import Guild, Client, Embed
from discord.ext import commands
from discord.ext.commands import Context
from config import config
from config import help
from vulkan.music.Player import Player
from vulkan.music.utils import *
from vulkan.music.utils import is_connected
class Music(commands.Cog):
def __init__(self, bot) -> None:
self.__guilds = {}
self.__bot: discord.Client = bot
self.__guilds: Dict[Guild, Player] = {}
self.__bot: Client = bot
@commands.Cog.listener()
async def on_ready(self) -> None:
@ -23,29 +24,29 @@ class Music(commands.Cog):
print(f'Player for guild {guild.name} created')
@commands.Cog.listener()
async def on_guild_join(self, guild) -> None:
async def on_guild_join(self, guild: Guild) -> None:
"""Load a player when joining a guild"""
self.__guilds[guild] = Player(self.__bot, guild)
print(f'Player for guild {guild.name} created')
@commands.Cog.listener()
async def on_guild_remove(self, guild) -> None:
async def on_guild_remove(self, guild: Guild) -> None:
"""Removes the player of the guild if banned"""
if guild in self.__guilds.keys():
self.__guilds.pop(guild, None)
print(f'Player for guild {guild.name} destroyed')
@commands.command(name="play", help=help.HELP_PLAY, description=help.HELP_PLAY_LONG, aliases=['p', 'tocar'])
async def play(self, ctx, *args) -> None:
async def play(self, ctx: Context, *args) -> None:
track = " ".join(args)
requester = ctx.author.name
player = self.__get_player(ctx)
if player == None:
if player is None:
await self.__send_embed(ctx, config.ERROR_TITLE, config.NO_GUILD, 'red')
return
if is_connected(ctx) == None:
if is_connected(ctx) is None:
success = await player.connect(ctx)
if success == False:
await self.__send_embed(ctx, config.IMPOSSIBLE_MOVE, config.NO_CHANNEL, 'red')
@ -54,34 +55,34 @@ class Music(commands.Cog):
await player.play(ctx, track, requester)
@commands.command(name="queue", help=help.HELP_QUEUE, description=help.HELP_QUEUE_LONG, aliases=['q', 'fila'])
async def queue(self, ctx) -> None:
async def queue(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
embed = await player.queue()
await ctx.send(embed=embed)
@commands.command(name="skip", help=help.HELP_SKIP, description=help.HELP_SKIP_LONG, aliases=['s', 'pular'])
async def skip(self, ctx) -> None:
async def skip(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
await player.skip(ctx)
@commands.command(name='stop', help=help.HELP_STOP, description=help.HELP_STOP_LONG, aliases=['parar'])
async def stop(self, ctx) -> None:
async def stop(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
await player.stop()
@commands.command(name='pause', help=help.HELP_PAUSE, description=help.HELP_PAUSE_LONG, aliases=['pausar'])
async def pause(self, ctx) -> None:
async def pause(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
success = await player.pause()
@ -89,9 +90,9 @@ class Music(commands.Cog):
await self.__send_embed(ctx, config.SONG_PLAYER, config.SONG_PAUSED, 'blue')
@commands.command(name='resume', help=help.HELP_RESUME, description=help.HELP_RESUME_LONG, aliases=['soltar'])
async def resume(self, ctx) -> None:
async def resume(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
success = await player.resume()
@ -99,12 +100,12 @@ class Music(commands.Cog):
await self.__send_embed(ctx, config.SONG_PLAYER, config.SONG_RESUMED, 'blue')
@commands.command(name='prev', help=help.HELP_PREV, description=help.HELP_PREV_LONG, aliases=['anterior'])
async def prev(self, ctx) -> None:
async def prev(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
if is_connected(ctx) == None:
if is_connected(ctx) is None:
success = await player.connect(ctx)
if success == False:
await self.__send_embed(ctx, config.IMPOSSIBLE_MOVE, config.NO_CHANNEL, 'red')
@ -113,35 +114,35 @@ class Music(commands.Cog):
await player.play_prev(ctx)
@commands.command(name='history', help=help.HELP_HISTORY, description=help.HELP_HISTORY_LONG, aliases=['historico'])
async def history(self, ctx) -> None:
async def history(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
embed = player.history()
await ctx.send(embed=embed)
@commands.command(name='loop', help=help.HELP_LOOP, description=help.HELP_LOOP_LONG, aliases=['l', 'repeat'])
async def loop(self, ctx, args: str) -> None:
async def loop(self, ctx: Context, args: str) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
description = await player.loop(args)
await self.__send_embed(ctx, config.SONG_PLAYER, description, 'blue')
@commands.command(name='clear', help=help.HELP_CLEAR, description=help.HELP_CLEAR_LONG, aliases=['c', 'limpar'])
async def clear(self, ctx) -> None:
async def clear(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
await player.clear()
@commands.command(name='np', help=help.HELP_NP, description=help.HELP_NP_LONG, aliases=['playing', 'now'])
async def now_playing(self, ctx) -> None:
async def now_playing(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
embed = await player.now_playing()
@ -149,34 +150,34 @@ class Music(commands.Cog):
await ctx.send(embed=embed)
@commands.command(name='shuffle', help=help.HELP_SHUFFLE, description=help.HELP_SHUFFLE_LONG, aliases=['aleatorio'])
async def shuffle(self, ctx) -> None:
async def shuffle(self, ctx: Context) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
description = await player.shuffle()
await self.__send_embed(ctx, config.SONG_PLAYER, description, 'blue')
@commands.command(name='move', help=help.HELP_MOVE, description=help.HELP_MOVE_LONG, aliases=['m', 'mover'])
async def move(self, ctx, pos1, pos2='1') -> None:
async def move(self, ctx: Context, pos1, pos2='1') -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
description = await player.move(pos1, pos2)
await self.__send_embed(ctx, config.SONG_PLAYER, description, 'blue')
@commands.command(name='remove', help=help.HELP_REMOVE, description=help.HELP_REMOVE_LONG, aliases=['remover'])
async def remove(self, ctx, position) -> None:
async def remove(self, ctx: Context, position) -> None:
player = self.__get_player(ctx)
if player == None:
if player is None:
return
else:
description = await player.remove(position)
await self.__send_embed(ctx, config.SONG_PLAYER, description, 'blue')
@commands.command(name='reset', help=help.HELP_RESET, description=help.HELP_RESET_LONG, aliases=['resetar'])
async def reset(self, ctx) -> None:
async def reset(self, ctx: Context) -> None:
player = self.__get_player(ctx)
try:
await player.force_stop()
@ -191,20 +192,20 @@ class Music(commands.Cog):
player = self.__get_player(ctx)
print(f'Player for guild {ctx.guild} created')
async def __send_embed(self, ctx, title='', description='', colour='grey') -> None:
async def __send_embed(self, ctx: Context, title='', description='', colour='grey') -> None:
try:
colour = config.COLOURS[colour]
except:
colour = config.COLOURS['grey']
embedvc = discord.Embed(
embedvc = Embed(
title=title,
description=description,
colour=colour
)
await ctx.send(embed=embedvc)
async def __clean_messages(self, ctx) -> None:
async def __clean_messages(self, ctx: Context) -> None:
last_messages = await ctx.channel.history(limit=5).flatten()
for message in last_messages:
@ -219,7 +220,7 @@ class Music(commands.Cog):
except:
continue
def __get_player(self, ctx) -> Player:
def __get_player(self, ctx: Context) -> Player:
try:
return self.__guilds[ctx.guild]
except:

View File

@ -0,0 +1,3 @@
class Database:
def __init__(self) -> None:
pass

View File

@ -1,75 +1,111 @@
import asyncio
import concurrent.futures
from typing import List
from config import config
from yt_dlp import YoutubeDL
from yt_dlp.utils import ExtractorError, DownloadError
from concurrent.futures import ThreadPoolExecutor
from vulkan.music.Song import Song
from vulkan.music.utils import is_url
from vulkan.music.utils import is_url, run_async
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': False,
'playlistend': config.MAX_PLAYLIST_LENGTH,
'quiet': True
}
__YDL_OPTIONS_EXTRACT = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': True,
'playlistend': config.MAX_PLAYLIST_LENGTH,
'quiet': True
}
__YDL_OPTIONS_FORCE_EXTRACT = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': False,
'playlistend': config.MAX_PLAYLIST_LENGTH,
'quiet': True
}
__BASE_URL = 'https://www.youtube.com/watch?v={}'
def __init__(self) -> None:
self.__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': True,
'playlistend': config.MAX_PLAYLIST_LENGTH,
}
self.__music_keys_only = ['resolution', 'fps', 'quality']
self.__not_extracted_keys_only = ['ie_key']
self.__not_extracted_not_keys = ['entries']
self.__playlist_keys = ['entries']
def download_one(self, song: Song) -> Song:
async def finish_one_song(self, song: Song) -> Song:
"""Receives a song object, finish his download and return it"""
if song.identifier == None:
return None
if is_url(song.identifier): # Youtube URL
if is_url(song.identifier):
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Destroy the music with problems
return None
else:
song.finish_down(song_info)
return song
song_info = await self.__download_title(song.identifier)
def extract_youtube_link(self, playlist_url: str) -> list:
song.finish_down(song_info)
return song
async def preload(self, songs: List[Song]) -> None:
"""Download the full info of the songs objects"""
for song in songs:
asyncio.ensure_future(self.__download_song(song))
@run_async
def extract_info(self, url: str) -> List[dict]:
"""Extract all songs direct URL from a Youtube Link
Arg: Url String
Return: List with the direct youtube URL of each song
"""
if is_url(playlist_url): # If Url
options = self.__YDL_OPTIONS
options['extract_flat'] = True
if is_url(url): # If Url
options = Downloader.__YDL_OPTIONS_EXTRACT
options['extract_flat'] = False
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(playlist_url, download=False)
songs_identifiers = []
print('Normal Extraction')
print('A')
extracted_info = ydl.extract_info(url, download=False)
print('B')
if self.__failed_to_extract(extracted_info):
print('Forcing Extraction')
extracted_info = self.__get_forced_extracted_info(url)
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
songs_identifiers.append(
f"https://www.youtube.com/watch?v={entry['id']}")
if self.__is_music(extracted_info):
print('Is Music')
return [extracted_info['original_url']]
else: # Or a single music
songs_identifiers.append(result['original_url'])
elif self.__is_multiple_musics(extracted_info):
print('Multiple Musics')
songs = []
for song in extracted_info['entries']:
songs.append(self.__BASE_URL.format(song['id']))
return songs
return songs_identifiers # Return a list
except (ExtractorError, DownloadError) as e:
else: # Failed to extract the songs
print(f'DEVELOPER NOTE -> Failed to Extract URL {url}')
return []
except Exception as e:
print(f'DEVELOPER NOTE -> Error Extracting Music: {e}')
return None
else:
return None
return []
async def preload(self, songs: list) -> None:
"""Download the full info of the song object"""
for song in songs:
asyncio.ensure_future(self.__download_songs(song))
def __get_forced_extracted_info(self, url: str) -> list:
options = Downloader.__YDL_OPTIONS_FORCE_EXTRACT
with YoutubeDL(options) as ydl:
try:
extracted_info = ydl.extract_info(url, download=False)
return extracted_info
except Exception as e:
print(f'DEVELOPER NOTE -> Error Forcing Extract Music: {e}')
return []
def __download_url(self, url) -> dict:
"""Download musics full info and source from Music URL
@ -77,62 +113,79 @@ class Downloader():
Arg: URL from Youtube
Return: Dict with the full youtube information of the music, including source to play it
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = False
options = Downloader.__YDL_OPTIONS
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
return result
except (ExtractorError, DownloadError) as e: # Any type of error in download
except Exception as e: # Any type of error in download
print(f'DEVELOPER NOTE -> Error Downloading URL {e}')
return None
async def __download_songs(self, song: Song) -> None:
async def __download_song(self, song: Song) -> None:
"""Download a music object asynchronously"""
if song.source != None: # If Music already preloaded
return
if song.source is not None: # If Music already preloaded
return None
def download_song(song):
if is_url(song.identifier): # Youtube URL
def __download_func(song: Song) -> None:
if is_url(song.identifier):
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Remove the song with problems from the playlist
else:
song.finish_down(song_info)
song_info = self.__download_title(song.identifier)
song.finish_down(song_info)
# Creating a loop task to download each song
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.MAX_PRELOAD_SONGS
)
await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)},
return_when=asyncio.ALL_COMPLETED)
executor = ThreadPoolExecutor(max_workers=config.MAX_PRELOAD_SONGS)
fs = {loop.run_in_executor(executor, __download_func, song)}
await asyncio.wait(fs=fs, return_when=asyncio.ALL_COMPLETED)
@run_async
def __download_title(self, title: str) -> dict:
"""Download a music full information using his name.
Arg: Music Name
Return: A dict containing the song information
"""
if type(title) != str:
return None
config = self.__YDL_OPTIONS
config['extract_flat'] = False
with YoutubeDL(self.__YDL_OPTIONS) as ydl:
options = Downloader.__YDL_OPTIONS
with YoutubeDL(options) as ydl:
try:
search = f"ytsearch:{title}"
result = ydl.extract_info(search, download=False)
search = f'ytsearch:{title}'
extracted_info = ydl.extract_info(search, download=False)
if result == None:
return None
if self.__failed_to_extract(extracted_info):
self.__get_forced_extracted_info(extracted_info)
# Return a dict with the full info of first music
return result['entries'][0]
if self.__is_multiple_musics(extracted_info):
return extracted_info['entries'][0]
else:
print(f'DEVELOPER NOTE -> Failed to extract title {title}')
return {}
except Exception as e:
return None
print(f'DEVELOPER NOTE -> Error downloading title {title}: {e}')
return {}
def __is_music(self, extracted_info: dict) -> bool:
for key in self.__music_keys_only:
if key not in extracted_info.keys():
return False
return True
def __is_multiple_musics(self, extracted_info: dict) -> bool:
for key in self.__playlist_keys:
if key not in extracted_info.keys():
return False
return True
def __failed_to_extract(self, extracted_info: dict) -> bool:
if type(extracted_info) is not dict:
return False
for key in self.__not_extracted_keys_only:
if key not in extracted_info.keys():
return False
for key in self.__not_extracted_not_keys:
if key in extracted_info.keys():
return False
return True

View File

@ -1,8 +1,8 @@
import discord
from discord.ext import commands
from config import config
import datetime
from discord import Client, Guild, FFmpegPCMAudio, Embed
from discord.ext.commands import Context
from datetime import timedelta
from vulkan.music.Downloader import Downloader
from vulkan.music.Playlist import Playlist
from vulkan.music.Searcher import Searcher
@ -12,24 +12,22 @@ from vulkan.music.utils import *
class Player(commands.Cog):
def __init__(self, bot, guild):
def __init__(self, bot: Client, guild: Guild):
self.__searcher: Searcher = Searcher()
self.__down: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: discord.Client = bot
self.__guild: discord.Guild = guild
self.__bot: Client = bot
self.__guild: Guild = guild
self.__timer = Timer(self.__timeout_handler)
self.__playing = False
# Flag to control if the player should stop totally the playing
self.__force_stop = False
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
async def connect(self, ctx) -> bool:
async def connect(self, ctx: Context) -> bool:
if not ctx.author.voice:
return False
@ -37,10 +35,10 @@ class Player(commands.Cog):
await ctx.author.voice.channel.connect(reconnect=True, timeout=None)
return True
def __play_next(self, error, ctx) -> None:
def __play_next(self, error, ctx: Context) -> None:
if self.__force_stop: # If it's forced to stop player
self.__force_stop = False
return
return None
song = self.__playlist.next_song()
@ -50,7 +48,7 @@ class Player(commands.Cog):
else:
self.__playing = False
async def __play_music(self, ctx, song: Song) -> None:
async def __play_music(self, ctx: Context, song: Song) -> None:
try:
source = self.__ensure_source(song)
if source == None:
@ -58,7 +56,7 @@ class Player(commands.Cog):
self.__playing = True
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
@ -72,30 +70,29 @@ class Player(commands.Cog):
except:
self.__play_next(None, ctx)
async def play(self, ctx, track=str, requester=str) -> str:
async def play(self, ctx: Context, track: str, requester: str) -> str:
try:
songs_names, provider = self.__searcher.search(track)
if provider == Provider.Unknown or songs_names == None:
embed = discord.Embed(
links, provider = self.__searcher.search(track)
if provider == Provider.Unknown or links == None:
embed = Embed(
title=config.ERROR_TITLE,
description=config.INVALID_INPUT,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
return None
elif provider == Provider.YouTube:
songs_names = self.__down.extract_youtube_link(songs_names[0])
if provider == Provider.YouTube:
links = await self.__down.extract_info(links[0])
songs_quant = 0
for name in songs_names:
song = self.__playlist.add_song(name, requester)
songs_quant += 1
songs_quant = len(links)
for info in links:
song = self.__playlist.add_song(info, requester)
songs_preload = self.__playlist.songs_to_preload
await self.__down.preload(songs_preload)
except:
embed = discord.Embed(
except Exception as e:
print(f'DEVELOPER NOTE -> Error while Downloading in Player: {e}')
embed = Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colours=config.COLOURS['blue'])
@ -103,18 +100,18 @@ class Player(commands.Cog):
return
if songs_quant == 1:
song = self.__down.download_one(song)
song = await self.__down.finish_one_song(song)
pos = len(self.__playlist)
if song == None:
embed = discord.Embed(
if song.problematic:
embed = Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
return None
elif not self.__playing:
embed = discord.Embed(
embed = Embed(
title=config.SONG_PLAYER,
description=config.SONG_ADDED.format(song.title),
colour=config.COLOURS['blue'])
@ -123,7 +120,7 @@ class Player(commands.Cog):
embed = self.__format_embed(song.info, config.SONG_ADDED_TWO, pos)
await ctx.send(embed=embed)
else:
embed = discord.Embed(
embed = Embed(
title=config.SONG_PLAYER,
description=config.SONGS_ADDED.format(songs_quant),
colour=config.COLOURS['blue'])
@ -133,20 +130,20 @@ class Player(commands.Cog):
first_song = self.__playlist.next_song()
await self.__play_music(ctx, first_song)
async def play_prev(self, ctx) -> None:
async def play_prev(self, ctx: Context) -> None:
"""Stop the currently playing cycle, load the previous song and play"""
if self.__playlist.looping_one or self.__playlist.looping_all: # Do not allow play if loop
embed = discord.Embed(
embed = Embed(
title=config.SONG_PLAYER,
description=config.LOOP_ON,
colour=config.COLOURS['blue']
)
await ctx.send(embed=embed)
return
return None
song = self.__playlist.prev_song() # Prepare the prev song to play again
if song == None:
embed = discord.Embed(
embed = Embed(
title=config.SONG_PLAYER,
description=config.NOT_PREVIOUS,
colour=config.COLOURS['blue']
@ -161,7 +158,7 @@ class Player(commands.Cog):
await self.__play_music(ctx, song)
async def queue(self) -> discord.Embed:
async def queue(self) -> Embed:
if self.__playlist.looping_one:
info = self.__playlist.current.info
title = config.ONE_SONG_LOOPING
@ -191,7 +188,7 @@ class Player(commands.Cog):
song_name = song.title if song.title else config.SONG_DOWNLOADING
text += f"**`{pos}` - ** {song_name} - `{format_time(song.duration)}`\n"
embed = discord.Embed(
embed = Embed(
title=title,
description=text,
colour=config.COLOURS['blue']
@ -199,9 +196,9 @@ class Player(commands.Cog):
return embed
async def skip(self, ctx) -> bool:
async def skip(self, ctx: Context) -> bool:
if self.__playlist.looping_one:
embed = discord.Embed(
embed = Embed(
title=config.SONG_PLAYER,
description=config.LOOP_ON,
colour=config.COLOURS['blue']
@ -215,7 +212,7 @@ class Player(commands.Cog):
else:
return False
def history(self) -> discord.Embed:
def history(self) -> Embed:
history = self.__playlist.songs_history
if len(history) == 0:
@ -226,7 +223,7 @@ class Player(commands.Cog):
for pos, song in enumerate(history, start=1):
text += f"**`{pos}` - ** {song.title} - `{format_time(song.duration)}`\n"
embed = discord.Embed(
embed = Embed(
title=config.HISTORY_TITLE,
description=text,
colour=config.COLOURS['blue']
@ -234,7 +231,7 @@ class Player(commands.Cog):
return embed
async def stop(self) -> bool:
if self.__guild.voice_client == None:
if self.__guild.voice_client is None:
return False
if self.__guild.voice_client.is_connected():
@ -246,6 +243,9 @@ class Player(commands.Cog):
async def force_stop(self) -> None:
try:
if self.__guild.voice_client is None:
return
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
self.__playlist.clear()
@ -288,9 +288,9 @@ class Player(commands.Cog):
async def clear(self) -> None:
self.__playlist.clear()
async def now_playing(self) -> discord.Embed:
async def now_playing(self) -> Embed:
if not self.__playing:
embed = discord.Embed(
embed = Embed(
title=config.SONG_PLAYER,
description=config.PLAYER_NOT_PLAYING,
colour=config.COLOURS['blue']
@ -348,9 +348,9 @@ class Player(commands.Cog):
result = self.__playlist.remove_song(position)
return result
def __format_embed(self, info: dict, title='', position='Playing Now') -> discord.Embed:
def __format_embed(self, info: dict, title='', position='Playing Now') -> Embed:
"""Configure the embed to show the song information"""
embedvc = discord.Embed(
embedvc = Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",
color=config.COLOURS['blue']
@ -368,7 +368,7 @@ class Player(commands.Cog):
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
duration = str(timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=True)

View File

@ -1,4 +1,5 @@
from collections import deque
from typing import List
from config import config
import random
@ -35,7 +36,7 @@ class Playlist(IPlaylist):
return self.__current
@property
def songs_to_preload(self) -> list:
def songs_to_preload(self) -> List[Song]:
return list(self.__queue)[:config.MAX_PRELOAD_SONGS]
def __len__(self) -> int:
@ -68,7 +69,6 @@ class Playlist(IPlaylist):
return None
self.__current = self.__queue.popleft()
return self.__current
def prev_song(self) -> Song:

View File

@ -15,22 +15,22 @@ class Searcher():
Arg -> User Input, a string with the
Return -> A list of musics names and Provider Type
"""
url_type = self.__identify_source(music)
provider = self.__identify_source(music)
if url_type == Provider.YouTube:
if provider == Provider.YouTube:
return [music], Provider.YouTube
elif url_type == Provider.Spotify:
elif provider == Provider.Spotify:
if self.__Spotify.connected == True:
musics = self.__Spotify.search(music)
return musics, Provider.Name
else:
return [], Provider.Unknown
elif url_type == Provider.Name:
elif provider == Provider.Name:
return [music], Provider.Name
elif url_type == Provider.Unknown:
elif provider == Provider.Unknown:
return None, Provider.Unknown
def __identify_source(self, music) -> Provider:
@ -38,11 +38,10 @@ class Searcher():
if not is_url(music):
return Provider.Name
if "https://www.youtu" in music or "https://youtu.be" in music:
if "https://www.youtu" in music or "https://youtu.be" in music or "https://music.youtube" in music:
return Provider.YouTube
if "https://open.spotify.com" in music:
return Provider.Spotify
# If no match
return Provider.Unknown

View File

@ -13,16 +13,24 @@ class Song(ISong):
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
self.__usefull_keys = ['url', 'duration',
self.__usefull_keys = ['duration',
'title', 'webpage_url',
'channel', 'id', 'uploader',
'thumbnail', 'original_url']
self.__required_keys = ['url']
for key in self.__required_keys:
if key in info:
self.__info[key] = info[key]
else:
print(f'DEVELOPER NOTE -> {key} not found in info of music: {self.identifier}')
self.destroy()
for key in self.__usefull_keys:
try:
if key in info:
self.__info[key] = info[key]
except Exception as e:
raise e
else:
print(f'DEVELOPER NOTE -> {key} not found in info of music: {self.identifier}')
@property
def source(self) -> str:
@ -58,6 +66,7 @@ class Song(ISong):
def destroy(self) -> None:
"""Mark this song with problems and removed from the playlist due to any type of error"""
print(f'DEVELOPER NOTE -> Music self destroying {self.__identifier}')
self.__problematic = True
self.__playlist.destroy_song(self)

View File

@ -1,6 +1,7 @@
import re
import asyncio
from config import config
from functools import wraps, partial
def is_connected(ctx):
@ -53,3 +54,13 @@ class Timer:
def cancel(self):
self.__task.cancel()
def run_async(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
partial_func = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, partial_func)
return run