Merge pull request #6 from RafaelSolVargas/fixing-spotify-bug

Adding new Adming Module and Stylish update
This commit is contained in:
Rafael Vargas 2022-01-01 14:32:37 -04:00 committed by GitHub
commit aa4bb00745
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 250 additions and 204 deletions

View File

@ -12,12 +12,13 @@ PHRASES_API = dotenv_values('.env')['PHRASES_API']
BOT_PREFIX = '!'
INITIAL_EXTENSIONS = {'vulkanbot.commands.Phrases', 'vulkanbot.commands.Warframe',
'vulkanbot.general.Filter', 'vulkanbot.general.Control', 'vulkanbot.music.Music',
'vulkanbot.commands.Random'}
'vulkanbot.commands.Random', 'vulkanbot.general.Admin'}
STARTUP_MESSAGE = 'Starting Vulkan...'
STARTUP_COMPLETE_MESSAGE = 'Vulkan is now operating.'
FFMPEG_PATH = 'C:/ffmpeg/bin/ffmpeg.exe'
MAX_PLAYLIST_LENGTH = 50
MAX_API_PHRASES_TRIES = 10
@ -26,9 +27,31 @@ MAX_API_CAMBION_TRIES = 10
MAX_API_FISSURES_TRIES = 10
MAX_PRELOAD_SONGS = 10
TRASH_CHANNEL_ID = 919961802140450837
SONGINFO_UPLOADER = "Uploader: "
SONGINFO_DURATION = "Duration: "
HELP_SKIP = 'Skip the current playing song'
HELP_RESUME = 'Resumes the song player'
HELP_CLEAR = 'Clear the queue'
HELP_STOP = 'Stop the song player, removing Vulkan from voice channel'
HELP_LOOP = '(one/all/off) - Control the loop of songs'
HELP_NP = 'Show the info of the current song'
HELP_QUEUE = f'Show the first {MAX_PRELOAD_SONGS} songs in queue'
HELP_PAUSE = 'Pauses the song player'
HELP_SHUFFLE = 'Shuffle the songs playing'
HELP_PLAY = '(title/youtube/spotify) - Plays a song'
HELP_MOVE = '(x, y) - Moves a song from position x to y in queue'
HELP_WARFRAME = f'({BOT_PREFIX}warframe help for more) - Return warframe information'
HELP_RANDOM = '(x) - Return a random number between 1 and x'
HELP_ESCOLHA = '(x, y, z...) - Choose randomly one item passed in the command'
HELP_CARA = 'Return cara or coroa'
HELP_DROP = '(user_name) - Try to remove the user from the current voice channel'
HELP_FRASE = "Send a randomly phrase, there's a chance of getting the braba"
HELP_HELP = 'This command :)'
ABSOLUTE_PATH = ''
COOKIE_PATH = '/config/cookies/cookies.txt'
@ -39,3 +62,14 @@ COLOURS = {
'grey': 0x708090,
'blue': 0x3498DB
}
MEMBERS_MAXIMUM_DROPS = {
'RafaelV': 1,
'Gassu': 2,
'LopesZ3R4': 4,
'BABIGIRL': 4,
'Hijãomi': 2,
'Jillian': 4,
'Maxymuns': 2
}

View File

@ -22,7 +22,7 @@ if __name__ == '__main__':
print("Error: No bot token!")
exit()
bot.log_error = ErrorHandler('errors') # Creating the error handler
bot.log_error = ErrorHandler('errors')
for extension in config.INITIAL_EXTENSIONS:
try:

5
teste.py Normal file
View File

@ -0,0 +1,5 @@
x = {
'rafael': 5
}
print(x['Tteste'])

View File

@ -8,8 +8,8 @@ MAXIMUM_TRIES = 10
class ErrorHandler():
"""Receive errors to write into a log file
Arg:
folder_path = Relative path from root to where the logs should be create
Arg:
folder_path = Relative path from root to where the logs should be create
"""
def __init__(self, folder_path='') -> None:

View File

@ -1,3 +1,4 @@
from discord.client import Client
import requests
import json
from config import config
@ -8,20 +9,12 @@ from random import random as rand
class Phrases(commands.Cog):
"""Deal with the generation of motivational phrases"""
def __init__(self, bot):
def __init__(self, bot: Client):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.command(name='frase', help='Envia uma frase pica, talvez a braba')
@commands.command(name='frase', help=config.HELP_FRASE)
async def phrase(self, ctx):
# There is a chance that the phrase will be send for the dev
"""Send some phrase to the requester"""
secret = await self.__calculate_rgn()
if secret != None:
await ctx.send(secret)
@ -30,6 +23,7 @@ class Phrases(commands.Cog):
await ctx.send(phrase)
async def __calculate_rgn(self):
"""Calculate the chance from the phrase function return a secret custom message"""
x = rand()
if x < 0.15:
return config.SECRET_MESSAGE
@ -37,6 +31,7 @@ class Phrases(commands.Cog):
return None
async def __get_phrase(self):
"""Get the phrase from the server"""
tries = 0
while True:
tries += 1
@ -50,7 +45,7 @@ class Phrases(commands.Cog):
phrase = data['quoteText']
author = data['quoteAuthor']
if phrase == '' or author == '': # Don't accept incomplete phrases
if phrase == '' or author == '':
continue
text = f'{phrase} \nBy: {author}'

View File

@ -3,14 +3,13 @@ import discord
from discord.ext import commands
from config import config
class Random(commands.Cog):
"""Deal with returning random things"""
def __init__(self, bot):
self.__bot = bot
@commands.command(name='random', help='Número aleatório de 1 a X')
@commands.command(name='random', help=config.HELP_RANDOM)
async def random(self, ctx, arg: str):
try:
arg = int(arg)
@ -38,7 +37,7 @@ class Random(commands.Cog):
)
await ctx.send(embed=embed)
@commands.command(name='cara', help='coroa')
@commands.command(name='cara', help=config.HELP_CARA)
async def cara(self, ctx):
x = random()
if x < 0.5:
@ -53,7 +52,7 @@ class Random(commands.Cog):
)
await ctx.send(embed=embed)
@commands.command(name='escolha', help='Escolhe um dos itens, separador: Vírgula')
@commands.command(name='escolha', help=config.HELP_ESCOLHA)
async def escolher(self, ctx, *args: str):
try:
user_input = " ".join(args)

View File

@ -13,13 +13,14 @@ class Warframe(commands.Cog):
self.__bot = bot
self.__open_functions = ['cetus', 'cambion', 'fissures']
@commands.command(name='warframe', help='<x> - Retorna informações de x')
async def warframe(self, ctx, arg):
@commands.command(name='warframe', help=config.HELP_WARFRAME)
async def warframe(self, ctx, arg) -> Embed:
if arg in self.__open_functions:
function = getattr(Warframe, f'_Warframe__{arg}') # Get the required function
embed = await function(self) # Execute the function passing self
# Get the required function
function = getattr(Warframe, f'_Warframe__{arg}')
embed = await function(self) # Execute the function passing self
await ctx.send(embed=embed) # Return the result
await ctx.send(embed=embed) # Return the result
else:
info = f'Warframe commands: {self.__open_functions}'
@ -29,7 +30,7 @@ class Warframe(commands.Cog):
colour=config.COLOURS['blue']
)
await ctx.send(embed=embed)
async def __cetus(self) -> Embed:
description = await self.__get_cetus()
embed = discord.Embed(
@ -39,7 +40,7 @@ class Warframe(commands.Cog):
)
return embed
async def __get_cetus(self):
async def __get_cetus(self) -> str:
"""Return the information of the Warframe API"""
tries = 0
while True:
@ -56,7 +57,7 @@ class Warframe(commands.Cog):
except Exception as e:
continue
async def __cambion(self) -> Embed:
description = await self.__get_cambion()
embed = discord.Embed(
@ -66,7 +67,7 @@ class Warframe(commands.Cog):
)
return embed
async def __get_cambion(self):
async def __get_cambion(self) -> str:
"""Return the information of the Warframe API"""
tries = 0
while True:
@ -79,7 +80,7 @@ class Warframe(commands.Cog):
data = json.loads(response.content)
info = f'**Active:** {data["active"]}\n**Time Left:** {data["timeLeft"]}'
return info
except Exception as e:
print(e)
@ -94,7 +95,7 @@ class Warframe(commands.Cog):
)
return embed
async def __get_fissures(self):
async def __get_fissures(self) -> str:
"""Return the information of the Warframe API"""
tries = 0
while True:
@ -105,11 +106,11 @@ class Warframe(commands.Cog):
try:
response = requests.get(config.FISSURES_API)
data = json.loads(response.content)
info = ''
for pos, fissure in enumerate(data, start=1):
info += f'`{pos}` - **Mission:** {fissure["missionType"]} | **Type:** {fissure["tier"]} | **Timing:** {fissure["eta"]} | **Storm:** {fissure["isStorm"]}\n'
return info
except Exception as e:
print(e)

View File

@ -0,0 +1,51 @@
from discord.ext import commands
from discord import Member, Client
from config import config
class Admin(commands.Cog):
"""Deal with administration of users in server"""
def __init__(self, bot: Client):
self.__bot = bot
@commands.command(name='drop', help='Manda um membro para a Terapia')
async def drop(self, ctx, name):
user: Member = None
guild = ctx.guild
for member in guild.members:
if member.name == name:
user = member
break
if user == None:
await ctx.send(f'{name} não foi encontrado, utilize o nome de usuário ao invés do apelido do servidor')
return
permanent_drops = False
maximum_drops = None
try:
maximum_drops = config.MEMBERS_MAXIMUM_DROPS[user.name]
except KeyError:
permanent_drops = True
except Exception as e:
await ctx.send('Houve algum erro :/')
return
if maximum_drops == 0:
await ctx.send(f'{user.name} já foi dropado várias vezes, larga o cara bicho')
return
if user.voice == None:
await ctx.send(f'{user.name} precisa estar conectado a um canal de voz antes')
else:
await user.move_to(None) # Remove from voice
if not permanent_drops:
# Att the life of user
config.MEMBERS_MAXIMUM_DROPS[user.name] -= 1
def setup(bot):
bot.add_cog(Admin(bot))

View File

@ -1,4 +1,5 @@
import discord
from discord import Client
from discord.ext.commands.errors import CommandNotFound, MissingRequiredArgument
from discord.ext import commands
from config import config
@ -7,24 +8,16 @@ from config import config
class Control(commands.Cog):
"""Control the flow of the Bot"""
def __init__(self, bot):
def __init__(self, bot: Client):
self.__bot = bot
self.__comandos = {
'MUSIC': ['resume', 'pause', 'loop', 'stop', 'skip', 'play', 'queue', 'clear', 'np', 'shuffle', 'move'],
'WARFRAME': ['warframe'],
'RANDOM': ['escolha', 'cara', 'random'],
'HELP': ['help'],
'OTHERS': ['frase']
'OTHERS': ['frase', 'drop']
}
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.Cog.listener()
async def on_ready(self):
print(config.STARTUP_MESSAGE)
@ -38,9 +31,10 @@ class Control(commands.Cog):
elif isinstance(error, CommandNotFound):
await ctx.channel.send(f'O comando não existe')
else:
await ctx.channel.send(f'Teve um erro aí bicho')
raise error
@commands.command(name="help", alisases=['ajuda'], help="Comando de ajuda")
@commands.command(name="help", alisases=['ajuda'], help=config.HELP_HELP)
async def help_msg(self, ctx):
helptxt = ''
help_music = '-- MUSIC\n'

View File

@ -1,20 +1,13 @@
from discord import Client
from discord.ext import commands
class Filter(commands.Cog):
"""Deal with filtering of discord messages"""
def __init__(self, bot):
def __init__(self, bot: Client):
self.__bot = bot
@property
def bot(self):
return self.__bot
@bot.setter
def bot(self, newBot):
self.__bot = newBot
@commands.Cog.listener()
async def on_message(self, message):
if message.author == self.__bot.user:

View File

@ -8,6 +8,7 @@ from yt_dlp.utils import ExtractorError, DownloadError
from vulkanbot.music.Song import Song
from vulkanbot.music.utils import is_url
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
@ -25,18 +26,18 @@ class Downloader():
print('Invalid song identifier type')
return
if is_url(song.identifier): # Youtube URL
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Destroy the music with problems
song.destroy() # Destroy the music with problems
return None
else:
song.finish_down(song_info)
return song
def extract_youtube_link(self, playlist_url: str) -> list:
"""Extract all songs direct URL from a Youtube Link
@ -54,7 +55,8 @@ class Downloader():
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
songs_identifiers.append(f"https://www.youtube.com/watch?v={entry['id']}")
songs_identifiers.append(
f"https://www.youtube.com/watch?v={entry['id']}")
else: # Or a single music
songs_identifiers.append(result['original_url'])
@ -71,7 +73,7 @@ class Downloader():
"""Download the full info of the song object"""
for song in songs:
asyncio.ensure_future(self.__download_songs(song))
def __download_url(self, url) -> dict:
"""Download musics full info and source from Music URL
@ -89,28 +91,29 @@ class Downloader():
except (ExtractorError, DownloadError) as e: # Any type of error in download
print(e)
async def __download_songs(self, song: Song):
if song.source != None: # If Music already preloaded
async def __download_songs(self, song: Song) -> None:
"""Download a music object asynchronously"""
if song.source != None: # If Music already preloaded
return
def download_song(song):
if is_url(song.identifier): # Youtube URL
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Remove the song with problems from the playlist
if song_info == None:
song.destroy() # Remove the song with problems from the playlist
else:
song.finish_down(song_info)
# Creating a loop task to download each song
# Creating a loop task to download each song
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.MAX_PRELOAD_SONGS
)
await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)},
return_when=asyncio.ALL_COMPLETED)
await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)},
return_when=asyncio.ALL_COMPLETED)
def __download_title(self, title: str) -> dict:
"""Download a music full information using his name.
@ -118,9 +121,9 @@ class Downloader():
Arg: Music Name
Return: A dict containing the song information
"""
if type(title) != str:
if type(title) != str:
print('Invalid music identifier type')
return
return
config = self.__YDL_OPTIONS
config['extract_flat'] = False
@ -131,9 +134,9 @@ class Downloader():
result = ydl.extract_info(search, download=False)
if result == None:
return
return
# Return a dict with the full info of first music
return result['entries'][0]
return result['entries'][0]
except Exception as e:
print(e)

View File

@ -4,7 +4,6 @@ from abc import ABC, abstractproperty, abstractmethod
class IPlaylist(ABC):
"""Class to manage and control the songs to play and played"""
@abstractproperty
def looping_one(self):
pass
@ -75,17 +74,16 @@ class ISong(ABC):
@abstractmethod
def title(self) -> str:
pass
pass
@abstractmethod
def duration(self) -> str:
pass
@abstractmethod
def identifier(self) -> str:
def identifier(self) -> str:
pass
@abstractmethod
def destroy(self) -> None:
pass

View File

@ -17,12 +17,12 @@ class Music(commands.Cog):
self.__bot: discord.Client = bot
self.__playing = False
self.__ffmpeg = 'C:/ffmpeg/bin/ffmpeg.exe'
self.__vc = "" # Objeto voice_bot do discord
self.__vc = ""
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'executable': self.__ffmpeg,
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'}
self.FFMPEG_OPTIONS = {'executable': config.FFMPEG_PATH,
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
def __play_next(self, error, ctx):
while True:
@ -30,7 +30,7 @@ class Music(commands.Cog):
source = self.__playlist.next_song()
if source == None: # If there is not a source for the song
continue
coro = self.__play_music(ctx, source)
self.__bot.loop.create_task(coro)
break
@ -49,7 +49,7 @@ class Music(commands.Cog):
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
@commands.command(name="play", help="Toca música - YouTube/Spotify/Título", aliases=['p', 'tocar'])
@commands.command(name="play", help=config.HELP_PLAY, aliases=['p', 'tocar'])
async def play(self, ctx, *args):
user_input = " ".join(args)
@ -64,68 +64,71 @@ class Music(commands.Cog):
else:
songs_quant = 0
musics_identifiers, provider = self.__searcher.search(user_input)
if provider == Provider.Unknown: # If type not identified
if provider == Provider.Unknown: # If type not identified
await self.__send_embed(ctx, description='Entrada inválida, tente algo melhor', colour_name='blue')
return
if provider == Provider.YouTube: # If youtube source
musics_identifiers = self.__downloader.extract_youtube_link(musics_identifiers[0])
if provider == Provider.YouTube: # If youtube source
musics_identifiers = self.__downloader.extract_youtube_link(
musics_identifiers[0])
for identifier in musics_identifiers: # Creating songs
for identifier in musics_identifiers: # Creating songs
last_song = self.__playlist.add_song(identifier)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
if songs_quant == 1: # If only one music downloaded
song = self.__downloader.download_one(last_song) # Download the new music
if song == None: # If song not downloaded
if songs_quant == 1: # If only one music downloaded
song = self.__downloader.download_one(
last_song) # Download the new music
if song == None: # If song not downloaded
await self.__send_embed(ctx, description='Houve um problema no download dessa música, tente novamente', colour_name='blue')
elif not self.__playing : # If not playing
elif not self.__playing: # If not playing
await self.__send_embed(ctx, description=f'Você adicionou a música **{song.title}** à playlist', colour_name='blue')
else: # If playing
else: # If playing
await ctx.send(embed=song.embed(title='Song added to Queue'))
else:
await self.__send_embed(ctx, description=f"Você adicionou {songs_quant} músicas à fila!", colour_name='blue')
if not self.__playing:
try_another = True
while try_another:
first = self.__playlist.next_song()
if first == None:
await self.__send_embed(ctx, description='Houve um problema no download dessa música, tente novamente', colour_name='blue')
break
while True:
if first.source != None: # If song got downloaded
try_another = False
break
if first.problematic: # If song got any error, try another one
while True:
if first.source != None: # If song got downloaded
try_another = False
break
else: # The song is downloading, checking another time
if first.problematic: # If song got any error, try another one
break
else: # The song is downloading, checking another time
continue
if first != None:
await self.__play_music(ctx, first)
@commands.command(name="queue", help="Mostra as atuais músicas da fila.", aliases=['q', 'fila'])
@commands.command(name="queue", help=config.HELP_QUEUE, aliases=['q', 'fila'])
async def queue(self, ctx):
if self.__playlist.looping_one: # If Repeating one
await self.now_playing(ctx)
return
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0) for song in songs_preload])) # Sum the duration
songs_preload = self.__playlist.songs_to_preload
await self.__downloader.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload])) # Sum the duration
total_songs = len(self.__playlist)
text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n'
@ -141,12 +144,12 @@ class Music(commands.Cog):
else: # No music
await self.__send_embed(ctx, description='There is not musics in queue.', colour_name='red')
@commands.command(name="skip", help="Pula a atual música que está tocando.", aliases=['pular'])
@commands.command(name="skip", help=config.HELP_SKIP, aliases=['pular'])
async def skip(self, ctx):
if len(self.__bot.voice_clients) > 0:
self.__vc.stop()
@commands.command(name='stop', help='Para de tocar músicas')
@commands.command(name='stop', help=config.HELP_STOP)
async def stop(self, ctx):
if self.__vc == '':
return
@ -155,7 +158,7 @@ class Music(commands.Cog):
self.__vc.stop()
await self.__vc.disconnect()
@commands.command(name='pause', help='Pausa a música')
@commands.command(name='pause', help=config.HELP_PAUSE)
async def pause(self, ctx):
if self.__vc == '':
return
@ -163,7 +166,7 @@ class Music(commands.Cog):
self.__vc.pause()
await self.__send_embed(ctx, description='Música pausada', colour_name='green')
@commands.command(name='resume', help='Solta a música atual')
@commands.command(name='resume', help=config.HELP_RESUME)
async def resume(self, ctx):
if self.__vc == '':
return
@ -171,7 +174,7 @@ class Music(commands.Cog):
self.__vc.resume()
await self.__send_embed(ctx, description='Música tocando', colour_name='green')
@commands.command(name='loop', help='Controla a repetição de músicas')
@commands.command(name='loop', help=config.HELP_LOOP)
async def loop(self, ctx, args: str):
args = args.lower()
if args == 'one':
@ -185,11 +188,11 @@ class Music(commands.Cog):
await self.__send_embed(ctx, description=description, colour_name='grey')
@commands.command(name='clear', help='Limpa a fila de músicas a tocar')
@commands.command(name='clear', help=config.HELP_CLEAR)
async def clear(self, ctx):
self.__playlist.clear()
@commands.command(name='np', help='Mostra a música que está tocando no instante')
@commands.command(name='np', help=config.HELP_NP)
async def now_playing(self, ctx):
if self.__playlist.looping_one:
title = 'Song Looping Now'
@ -199,6 +202,35 @@ class Music(commands.Cog):
current_song = self.__playlist.current
await ctx.send(embed=current_song.embed(title=title))
@commands.command(name='shuffle', help=config.HELP_SHUFFLE)
async def shuffle(self, ctx):
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
await self.__send_embed(ctx, description='Musicas embaralhadas', colour_name='blue')
@commands.command(name='move', help=config.HELP_MOVE)
async def move(self, ctx, pos1, pos2='1'):
try:
pos1 = int(pos1)
pos2 = int(pos2)
except Exception as e:
print(e)
await ctx.send('This command require a number')
return
success, reason = self.__playlist.move_songs(pos1, pos2)
if not success:
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
await ctx.send(reason)
else:
await ctx.send(reason)
async def __send_embed(self, ctx, title='', description='', colour_name='grey'):
try:
colour = config.COLOURS[colour_name]
@ -212,34 +244,6 @@ class Music(commands.Cog):
)
await ctx.send(embed=embedvc)
@commands.command(name='shuffle', help='Altera aleatoriamente a ordem das músicas')
async def shuffle(self, ctx):
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
await self.__send_embed(ctx, description='Musicas embaralhadas', colour_name='blue')
@commands.command(name='move', help='Manda uma música de uma posição para outra <from> <to>')
async def move(self, ctx, pos1, pos2='1'):
try:
pos1 = int(pos1)
pos2 = int(pos2)
except Exception as e:
print(e)
await ctx.send('This command require a number')
return
success, reason = self.__playlist.move_songs(pos1, pos2)
if not success:
songs = self.__playlist.songs_to_preload
await self.__downloader.preload(songs)
await ctx.send(reason)
else:
await ctx.send(reason)
def setup(bot):
bot.add_cog(Music(bot))

View File

@ -1,12 +1,11 @@
from collections import deque
import random
from config import config
import random
from vulkanbot.music.Interfaces import IPlaylist
from vulkanbot.music.Song import Song
class Playlist(IPlaylist):
"""Class to manage and control the songs to play and played"""
@ -21,27 +20,27 @@ class Playlist(IPlaylist):
self.__current: Song = None
@property
def looping_one(self):
def looping_one(self) -> bool:
return self.__looping_one
@property
def looping_all(self):
def looping_all(self) -> bool:
return self.__looping_all
@property
def current(self):
def current(self) -> Song:
return self.__current
@property
def songs_to_preload(self) -> list:
return list(self.__queue)[:config.MAX_PRELOAD_SONGS]
def __len__(self):
return len(self.__queue)
def __len__(self) -> int:
return len(self.__queue)
def next_song(self) -> Song:
"""Return the next song to play"""
if self.__current == None and len(self.__queue) == 0:
if self.__current == None and len(self.__queue) == 0:
# If not playing and nothing to play
return None
@ -59,14 +58,16 @@ class Playlist(IPlaylist):
if len(self.__queue) == 0: # If no more song to play, return None
return None
self.__current = self.__queue[0] # Att the current with the first one
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(self.__current.identifier) # Add to name history
# Att the current with the first one
self.__current = self.__queue[0]
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(
self.__current.identifier) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current
def prev_song(self):
def prev_song(self) -> Song:
"""Return the source of the last song played
Return None or the source of the prev song
@ -139,11 +140,11 @@ class Playlist(IPlaylist):
def move_songs(self, pos1, pos2) -> tuple:
"""Receive two position and try to chance the songs in those positions
Positions: First music is 1
Return (Error bool, string) with the status of the function, to show to user
"""
if pos2 not in range(1, len(self.__queue)) or pos1 not in range(1, len(self.__queue)):
if pos2 not in range(1, len(self.__queue) + 1) or pos1 not in range(1, len(self.__queue) + 1):
return (False, 'Numbers need to be more than 0 and less than the queue length')
try:
@ -155,4 +156,4 @@ class Playlist(IPlaylist):
return (True, 'Songs moved with successfully')
except Exception as e:
print(e)
return (False, 'There was a problem with the moving of songs')
return (False, 'There was a problem with the moving of songs')

View File

@ -27,7 +27,7 @@ class Searcher():
elif url_type == Provider.Name:
return [music], Provider.Name
elif url_type == Provider.Unknown:
return None, Provider.Unknown
@ -44,4 +44,3 @@ class Searcher():
# If no match
return Provider.Unknown

View File

@ -1,7 +1,8 @@
from discord import Embed
import datetime
from vulkanbot.music.Interfaces import ISong, IPlaylist
from config import config
import datetime
from vulkanbot.music.Interfaces import ISong, IPlaylist
class Song(ISong):
@ -15,18 +16,18 @@ class Song(ISong):
self.__playlist: IPlaylist = playlist
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
"""Get and store the full information of the song"""
self.__usefull_keys = ['url', 'duration',
'title', 'webpage_url',
'channel', 'id', 'uploader',
'thumbnail', 'original_url']
for key in self.__usefull_keys:
try:
self.__info[key] = info[key]
except Exception as e:
print(e)
raise e
raise e
@property
def source(self) -> str:
@ -55,19 +56,19 @@ class Song(ISong):
@property
def identifier(self) -> str:
return self.__identifier
@property
def problematic(self) -> bool:
return self.__problematic
def destroy(self) -> None:
"""Mark this song with problems and removed from the playlist due to any type of error"""
self.__problematic = True
self.__problematic = True
self.__playlist.destroy_song(self)
def embed(self, title: str) -> Embed:
"""Configure the embed to show the song information"""
embedvc = Embed(
title=title,
description=f"[{self.__info['title']}]({self.__info['original_url']})",

View File

@ -1,33 +1,6 @@
import spotipy
import re
from spotipy.oauth2 import SpotifyClientCredentials
from bs4 import BeautifulSoup
from config import config
import aiohttp
class Browser():
def __init__(self) -> None:
self.__url_regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
self.__session = aiohttp.ClientSession(
headers={'User-Agent': 'python-requests/2.20.0'})
async def search(self, url) -> str:
"""Convert the external_url link to the title of music using browser"""
if re.search(self.__url_regex, url):
result = self.__url_regex.search(url)
url = result.group(0)
async with self.__session.get(url) as response:
page = await response.text()
soup = BeautifulSoup(page, 'html.parser')
title = soup.find('title')
title = title.string
title = title.replace('- song by', '')
title = title.replace('| Spotify', '')
return title
class SpotifySearch():
@ -35,7 +8,6 @@ class SpotifySearch():
def __init__(self) -> None:
self.__connected = False
self.__browser = Browser()
def connect(self) -> bool:
try:
@ -142,8 +114,3 @@ class SpotifySearch():
title += f'{artist["name"]} '
return title
async def __convert_spotify(self, url) -> str:
"""(Experimental) - Convert the external_url link to the title of music using browser"""
title = self.__browser(url)
return title

View File

@ -1,5 +1,6 @@
import re
def format_time(duration):
if not duration:
return "00:00"