Adding Cleaner and fixing bugs

This commit is contained in:
Rafael Vargas 2022-03-26 14:45:27 -04:00
parent 362ec02fe4
commit f30513f710
15 changed files with 115 additions and 184 deletions

View File

@ -9,7 +9,6 @@ helper = Helper()
class Control(commands.Cog):
"""Control the flow of the Bot"""
def __init__(self, bot: Client):
self.__bot = bot

View File

@ -1,8 +1,6 @@
from typing import Dict
from discord import Guild, Client, Embed
from discord import Guild, Client
from discord.ext import commands
from discord.ext.commands import Context
from Config.Config import Configs
from Config.Helper import Helper
from Controllers.ClearController import ClearController
from Controllers.MoveController import MoveController
@ -13,8 +11,7 @@ from Controllers.PrevController import PrevController
from Controllers.RemoveController import RemoveController
from Controllers.ResetController import ResetController
from Controllers.ShuffleController import ShuffleController
from Music.Player import Player
from Utils.Utils import is_connected
from Utils.Cleaner import Cleaner
from Controllers.SkipController import SkipController
from Controllers.PauseController import PauseController
from Controllers.StopController import StopController
@ -31,9 +28,8 @@ helper = Helper()
class Music(commands.Cog):
def __init__(self, bot) -> None:
self.__guilds: Dict[Guild, Player] = {}
self.__bot: Client = bot
self.__config = Configs()
self.__cleaner = Cleaner(self.__bot)
self.__controller = PlayersController(self.__bot)
@commands.Cog.listener()
@ -42,16 +38,7 @@ class Music(commands.Cog):
@commands.Cog.listener()
async def on_guild_join(self, guild: Guild) -> None:
"""Load a player when joining a guild"""
self.__guilds[guild] = Player(self.__bot, guild)
print(f'Player for guild {guild.name} created')
@commands.Cog.listener()
async def on_guild_remove(self, guild: Guild) -> None:
"""Removes the player of the guild if banned"""
if guild in self.__guilds.keys():
self.__guilds.pop(guild, None)
print(f'Player for guild {guild.name} destroyed')
self.__controller.create_player(guild)
@commands.command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar'])
async def play(self, ctx: Context, *args) -> None:
@ -205,37 +192,6 @@ class Music(commands.Cog):
await view1.run()
await view2.run()
async def __send_embed(self, ctx: Context, title='', description='', colour='grey') -> None:
try:
colour = self.__config.COLOURS[colour]
except:
colour = self.__config.COLOURS['grey']
embedvc = Embed(
title=title,
description=description,
colour=colour
)
await ctx.send(embed=embedvc)
async def __clean_messages(self, ctx: Context) -> None:
last_messages = await ctx.channel.history(limit=5).flatten()
for message in last_messages:
try:
if message.author == self.__bot.user:
if len(message.embeds) > 0:
embed = message.embeds[0]
if len(embed.fields) > 0:
if embed.fields[0].name == 'Uploader:':
await message.delete()
except:
continue
def __get_player(self, ctx: Context) -> Player:
return self.__controller.get_player(ctx.guild)
def setup(bot):
bot.add_cog(Music(bot))

View File

@ -8,7 +8,6 @@ helper = Helper()
class Random(commands.Cog):
"""Deal with returning random things"""
def __init__(self, bot):
self.__bot = bot

View File

@ -33,6 +33,11 @@ class PlayersController(Singleton):
else:
return guild.voice_client
def create_player(self, guild: Guild) -> None:
player = Player(self.__bot, guild)
self.__players[guild] = player
print(f'Player for guild {guild.name} created')
def __create_players(self) -> Dict[Guild, Player]:
list_guilds: List[Guild] = self.__bot.guilds
players: Dict[Guild, Player] = {}

View File

@ -8,7 +8,6 @@ from Utils.Utils import is_url, run_async
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
config = Configs()
__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
@ -38,7 +37,6 @@ class Downloader():
self.__playlist_keys = ['entries']
def finish_one_song(self, song: Song) -> Song:
"""Receives a song object, finish his download and return it"""
if song.identifier == None:
return None
@ -51,17 +49,11 @@ class Downloader():
return song
async def preload(self, songs: List[Song]) -> None:
"""Download the full info of the songs objects"""
for song in songs:
asyncio.ensure_future(self.__download_song(song))
@run_async
def extract_info(self, url: str) -> List[dict]:
"""Extract all songs direct URL from a Youtube Link
Arg: Url String
Return: List with the direct youtube URL of each song
"""
if is_url(url): # If Url
options = Downloader.__YDL_OPTIONS_EXTRACT
with YoutubeDL(options) as ydl:
@ -100,11 +92,6 @@ class Downloader():
return []
def __download_url(self, url) -> dict:
"""Download musics full info and source from Music URL
Arg: URL from Youtube
Return: Dict with the full youtube information of the music, including source to play it
"""
options = Downloader.__YDL_OPTIONS
with YoutubeDL(options) as ydl:
try:
@ -116,7 +103,6 @@ class Downloader():
return None
async def __download_song(self, song: Song) -> None:
"""Download a music object asynchronously"""
if song.source is not None: # If Music already preloaded
return None
@ -134,11 +120,6 @@ class Downloader():
await asyncio.wait(fs=fs, return_when=asyncio.ALL_COMPLETED)
def __download_title(self, title: str) -> dict:
"""Download a music full information using his name.
Arg: Music Name
Return: A dict containing the song information
"""
options = Downloader.__YDL_OPTIONS
with YoutubeDL(options) as ydl:
try:

View File

@ -2,8 +2,6 @@ from abc import ABC, abstractproperty, abstractmethod
class IPlaylist(ABC):
"""Class to manage and control the songs to play and played"""
@abstractproperty
def looping_one(self):
pass
@ -58,8 +56,6 @@ class IPlaylist(ABC):
class ISong(ABC):
"""Store the usefull information about a Song"""
@abstractmethod
def finish_down(self, info: dict) -> None:
pass

View File

@ -36,41 +36,6 @@ class Player(commands.Cog):
def playlist(self) -> Playlist:
return self.__playlist
def __play_next(self, error, ctx: Context) -> None:
if self.__force_stop: # If it's forced to stop player
self.__force_stop = False
return None
song = self.__playlist.next_song()
if song != None:
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx: Context, song: Song) -> None:
try:
source = self.__ensure_source(song)
if source == None:
self.__play_next(None, ctx)
self.__playing = True
player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
self.__timer.cancel()
self.__timer = Timer(self.__timeout_handler)
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
except:
self.__play_next(None, ctx)
async def play(self, ctx: Context, track: str, requester: str) -> str:
try:
links, provider = self.__searcher.search(track)
@ -141,7 +106,6 @@ class Player(commands.Cog):
await self.__play_music(ctx, first_song)
async def play_prev(self, ctx: Context) -> None:
"""Stop the currently playing cycle, load the previous song and play"""
if self.__playlist.looping_one or self.__playlist.looping_all: # Do not allow play if loop
embed = Embed(
title=self.__config.SONG_PLAYER,
@ -168,17 +132,6 @@ class Player(commands.Cog):
await self.__play_music(ctx, song)
async def stop(self) -> bool:
if self.__guild.voice_client is None:
return False
if self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
return True
async def force_stop(self) -> None:
try:
if self.__guild.voice_client is None:
@ -191,8 +144,42 @@ class Player(commands.Cog):
except Exception as e:
print(f'DEVELOPER NOTE -> Force Stop Error: {e}')
def __play_next(self, error, ctx: Context) -> None:
if self.__force_stop: # If it's forced to stop player
self.__force_stop = False
return None
song = self.__playlist.next_song()
if song != None:
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx: Context, song: Song) -> None:
try:
source = self.__ensure_source(song)
if source == None:
self.__play_next(None, ctx)
self.__playing = True
player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
self.__timer.cancel()
self.__timer = Timer(self.__timeout_handler)
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
except:
self.__play_next(None, ctx)
def __format_embed(self, info: dict, title='', position='Playing Now') -> Embed:
"""Configure the embed to show the song information"""
embedvc = Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",

View File

@ -4,17 +4,10 @@ from Utils.Utils import is_url
class Searcher():
"""Turn the user input into list of musics names, support youtube and spotify"""
def __init__(self) -> None:
self.__Spotify = SpotifySearch()
def search(self, music: str) -> list:
"""Return a list with the song names or an URL
Arg -> User Input, a string with the
Return -> A list of musics names and Provider Type
"""
provider = self.__identify_source(music)
if provider == Provider.YouTube:
@ -35,7 +28,6 @@ class Searcher():
return None, Provider.Unknown
def __identify_source(self, music) -> Provider:
"""Identify the provider of a music"""
if not is_url(music):
return Provider.Name

View File

@ -2,17 +2,14 @@ from Music.Interfaces import ISong, IPlaylist
class Song(ISong):
"""Store the usefull information about a Song"""
def __init__(self, identifier: str, playlist: IPlaylist, requester: str) -> None:
"""Create a song with only the URL to the youtube song"""
self.__identifier = identifier
self.__info = {'requester': requester}
self.__problematic = False
self.__playlist: IPlaylist = playlist
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
self.__usefull_keys = ['duration',
'title', 'webpage_url',
'channel', 'id', 'uploader',
@ -34,7 +31,6 @@ class Song(ISong):
@property
def source(self) -> str:
"""Return the Song Source URL to play"""
if 'url' in self.__info.keys():
return self.__info['url']
else:
@ -42,7 +38,6 @@ class Song(ISong):
@property
def title(self) -> str:
"""Return the Song Title"""
if 'title' in self.__info.keys():
return self.__info['title']
else:
@ -50,7 +45,6 @@ class Song(ISong):
@property
def duration(self) -> str:
"""Return the Song Title"""
if 'duration' in self.__info.keys():
return self.__info['duration']
else:
@ -65,7 +59,6 @@ class Song(ISong):
return self.__problematic
def destroy(self) -> None:
"""Mark this song with problems and removed from the playlist due to any type of error"""
print(f'DEVELOPER NOTE -> Music self destroying {self.__identifier}')
self.__problematic = True
self.__playlist.destroy_song(self)

View File

@ -4,8 +4,6 @@ from Config.Config import Configs
class SpotifySearch():
"""Search a Spotify music or playlist and return the musics names"""
def __init__(self) -> None:
self.__config = Configs()
self.__connected = False
@ -27,7 +25,6 @@ class SpotifySearch():
return False
def search(self, music=str) -> list:
"""Search and return the title of musics on Spotify"""
type = music.split('/')[3].split('?')[0]
code = music.split('/')[4].split('?')[0]
if type == 'album':
@ -44,10 +41,6 @@ class SpotifySearch():
return musics
def __get_album(self, code=str) -> list:
"""Convert a album ID to list of songs names
ARG: Spotify Code of the Album
"""
if self.__connected == True:
try:
results = self.__api.album_tracks(code)
@ -70,10 +63,6 @@ class SpotifySearch():
raise e
def __get_playlist(self, code=str) -> list:
"""Convert a playlist ID to list of songs names
Arg: Spotify Code of the Playlist
"""
try:
results = self.__api.playlist_items(code)
itens = results['items']
@ -100,10 +89,6 @@ class SpotifySearch():
raise e
def __get_track(self, code=str) -> list:
"""Convert a track ID to the title of the music
ARG: Spotify Code of the Track
"""
results = self.__api.track(code)
name = results['name']
artists = ''
@ -113,10 +98,6 @@ class SpotifySearch():
return [f'{name} {artists}']
def __get_artist(self, code=str) -> list:
"""Convert a external_url track to the title of the music
ARG: Spotify Code of the Music
"""
results = self.__api.artist_top_tracks(code, country='BR')
musics_titles = []
@ -127,10 +108,6 @@ class SpotifySearch():
return musics_titles
def __extract_title(self, music: dict) -> str:
"""Receive a spotify music object and return his title
ARG: music dict returned by Spotify
"""
title = f'{music["name"]} '
for artist in music['artists']:
title += f'{artist["name"]} '

View File

@ -2,8 +2,7 @@ from enum import Enum
class Provider(Enum):
"""Store Enum Types of the Providers"""
Spotify = "Spotify"
YouTube = "YouTube"
Spotify = 'Spotify'
YouTube = 'YouTube'
Name = 'Track Name'
Unknown = "Unknown"
Unknown = 'Unknown'

32
Utils/Cleaner.py Normal file
View File

@ -0,0 +1,32 @@
from typing import List
from discord.ext.commands import Context
from discord import Client, Message, Embed
from Config.Singleton import Singleton
class Cleaner(Singleton):
def __init__(self, bot: Client = None) -> None:
if not super().created:
self.__bot = bot
self.__clean_str = 'Uploader:'
def set_bot(self, bot: Client) -> None:
self.__bot = bot
async def clean_messages(self, ctx: Context, quant: int) -> None:
if self.__bot is None:
return
last_messages: List[Message] = await ctx.channel.history(limit=quant).flatten()
for message in last_messages:
try:
if message.author == self.__bot.user:
if len(message.embeds) > 0:
embed: Embed = message.embeds[0]
if len(embed.fields) > 0:
if embed.fields[0].name == self.__clean_str:
await message.delete()
except Exception as e:
print(f'DEVELOPER NOTE -> Error cleaning messages {e}')
continue

View File

@ -34,7 +34,6 @@ def format_time(duration) -> str:
def is_url(string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")

View File

@ -5,6 +5,7 @@ from Config.Singleton import Singleton
class Configs(Singleton):
def __init__(self) -> None:
if not super().created:
self.COMMANDS_PATH = 'Commands'
self.BOT_TOKEN = config('BOT_TOKEN')
self.SPOTIFY_ID = config('SPOTIFY_ID')
self.SPOTIFY_SECRET = config('SPOTIFY_SECRET')

45
main.py
View File

@ -1,23 +1,38 @@
import discord
import os
from distutils.command.config import config
from discord import Intents, Client
from os import listdir
from Config.Config import Configs
from discord.ext import commands
from discord.ext.commands import Bot
intents = discord.Intents.default()
intents.members = True
config = Configs()
bot = commands.Bot(command_prefix=config.BOT_PREFIX, pm_help=True,
case_insensitive=True, intents=intents)
bot.remove_command('help')
class VulkanInitializer:
def __init__(self) -> None:
self.__config = Configs()
self.__intents = Intents.default()
self.__intents.members = True
self.__bot = self.__create_bot()
self.__add_cogs(self.__bot)
if config.BOT_TOKEN == "":
def __create_bot(self) -> Client:
bot = Bot(command_prefix=self.__config.BOT_PREFIX,
pm_help=True,
case_insensitive=True,
intents=self.__intents)
bot.remove_command('help')
return bot
def __add_cogs(self, bot: Client) -> None:
for filename in listdir(f'./{self.__config.COMMANDS_PATH}'):
if filename.endswith('.py'):
bot.load_extension(f'{self.__config.COMMANDS_PATH}.{filename[:-3]}')
def run(self) -> None:
if self.__config.BOT_TOKEN == '':
print('DEVELOPER NOTE -> Token not found')
exit()
for filename in os.listdir('./Commands'):
if filename.endswith('.py'):
bot.load_extension(f'Commands.{filename[:-3]}')
self.__bot.run(self.__config.BOT_TOKEN, bot=True, reconnect=True)
bot.run(config.BOT_TOKEN, bot=True, reconnect=True)
vulkan = VulkanInitializer()
vulkan.run()