From 7a5d76ffd343e64f412e345104c406791e71be95 Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Sun, 19 Feb 2023 00:33:31 -0300 Subject: [PATCH 1/5] Refactoring some code and modifying variables and class names --- Config/Configs.py | 8 +- DiscordCogs/MusicCog.py | 2 +- Handlers/ClearHandler.py | 7 +- Handlers/HistoryHandler.py | 3 +- Handlers/JumpMusicHandler.py | 3 +- Handlers/LoopHandler.py | 4 +- Handlers/MoveHandler.py | 3 +- Handlers/NowPlayingHandler.py | 3 +- Handlers/PauseHandler.py | 7 +- Handlers/PlayHandler.py | 9 +- Handlers/PrevHandler.py | 3 +- Handlers/QueueHandler.py | 3 +- Handlers/RemoveHandler.py | 7 +- Handlers/ResetHandler.py | 7 +- Handlers/ResumeHandler.py | 7 +- Handlers/ShuffleHandler.py | 4 +- Handlers/SkipHandler.py | 7 +- Handlers/StopHandler.py | 7 +- Parallelism/AbstractProcessManager.py | 31 +++ Parallelism/PlayerProcess.py | 16 +- Parallelism/PlayerThread.py | 365 ++++++++++++++++++++++++++ Parallelism/ProcessExecutor.py | 4 +- Parallelism/ProcessInfo.py | 4 +- Parallelism/ProcessManager.py | 27 +- Parallelism/ThreadManager.py | 199 ++++++++++++++ 25 files changed, 672 insertions(+), 68 deletions(-) create mode 100644 Parallelism/AbstractProcessManager.py create mode 100644 Parallelism/PlayerThread.py create mode 100644 Parallelism/ThreadManager.py diff --git a/Config/Configs.py b/Config/Configs.py index d9da122..363e5e1 100644 --- a/Config/Configs.py +++ b/Config/Configs.py @@ -1,4 +1,3 @@ -import os from decouple import config from Config.Singleton import Singleton from Config.Folder import Folder @@ -10,6 +9,9 @@ class VConfigs(Singleton): # You can change this boolean to False if you want to prevent the Bot from auto disconnecting # Resolution for the issue: https://github.com/RafaelSolVargas/Vulkan/issues/33 self.SHOULD_AUTO_DISCONNECT_WHEN_ALONE = False + # Recommended to be True, except in cases when your Bot is present in thousands servers, in that case + # the delay to start a new Python process for the playback is too much, and to avoid that you set as False + self.SONG_PLAYBACK_IN_SEPARATE_PROCESS = False self.BOT_PREFIX = '!' try: @@ -44,8 +46,8 @@ class VConfigs(Singleton): self.MY_ERROR_BAD_COMMAND = 'This string serves to verify if some error was raised by myself on purpose' self.INVITE_URL = 'https://discordapp.com/oauth2/authorize?client_id={}&scope=bot' - def getProcessManager(self): + def getPlayersManager(self): return self.__manager - def setProcessManager(self, newManager): + def setPlayersManager(self, newManager): self.__manager = newManager diff --git a/DiscordCogs/MusicCog.py b/DiscordCogs/MusicCog.py index 153a844..c84293e 100644 --- a/DiscordCogs/MusicCog.py +++ b/DiscordCogs/MusicCog.py @@ -38,7 +38,7 @@ class MusicCog(Cog): def __init__(self, bot: VulkanBot) -> None: self.__bot: VulkanBot = bot self.__embeds = VEmbeds() - VConfigs().setProcessManager(ProcessManager(bot)) + VConfigs().setPlayersManager(ProcessManager(bot)) @command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar']) async def play(self, ctx: Context, *args) -> None: diff --git a/Handlers/ClearHandler.py b/Handlers/ClearHandler.py index 216e030..f79d4a8 100644 --- a/Handlers/ClearHandler.py +++ b/Handlers/ClearHandler.py @@ -4,7 +4,8 @@ from discord.ext.commands import Context from Music.VulkanBot import VulkanBot from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessInfo import ProcessInfo +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo class ClearHandler(AbstractHandler): @@ -13,8 +14,8 @@ class ClearHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Clear the playlist playlist = processInfo.getPlaylist() diff --git a/Handlers/HistoryHandler.py b/Handlers/HistoryHandler.py index 864d90e..0c337c3 100644 --- a/Handlers/HistoryHandler.py +++ b/Handlers/HistoryHandler.py @@ -2,6 +2,7 @@ from discord.ext.commands import Context from Music.VulkanBot import VulkanBot from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse +from Parallelism.AbstractProcessManager import AbstractPlayersManager from Utils.Utils import Utils from typing import Union from discord import Interaction @@ -13,7 +14,7 @@ class HistoryHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: processLock = processInfo.getLock() diff --git a/Handlers/JumpMusicHandler.py b/Handlers/JumpMusicHandler.py index 76e30fd..3ab6c95 100644 --- a/Handlers/JumpMusicHandler.py +++ b/Handlers/JumpMusicHandler.py @@ -6,6 +6,7 @@ from discord import Interaction from Handlers.HandlerResponse import HandlerResponse from Music.Playlist import Playlist from Music.VulkanBot import VulkanBot +from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.Commands import VCommands, VCommandsType @@ -16,7 +17,7 @@ class JumpMusicHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, musicPos: str) -> HandlerResponse: - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/LoopHandler.py b/Handlers/LoopHandler.py index 065334a..17d9ee9 100644 --- a/Handlers/LoopHandler.py +++ b/Handlers/LoopHandler.py @@ -6,6 +6,8 @@ from Config.Exceptions import BadCommandUsage from typing import Union from discord import Interaction +from Parallelism.AbstractProcessManager import AbstractPlayersManager + class LoopHandler(AbstractHandler): def __init__(self, ctx: Union[Context, Interaction], bot: VulkanBot) -> None: @@ -13,7 +15,7 @@ class LoopHandler(AbstractHandler): async def run(self, args: str) -> HandlerResponse: # Get the current process of the guild - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/MoveHandler.py b/Handlers/MoveHandler.py index 0f36260..27d004c 100644 --- a/Handlers/MoveHandler.py +++ b/Handlers/MoveHandler.py @@ -7,6 +7,7 @@ from Config.Exceptions import BadCommandUsage, VulkanError, InvalidInput, Number from Music.Playlist import Playlist from typing import Union from discord import Interaction +from Parallelism.AbstractProcessManager import AbstractPlayersManager class MoveHandler(AbstractHandler): @@ -14,7 +15,7 @@ class MoveHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, pos1: str, pos2: str) -> HandlerResponse: - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/NowPlayingHandler.py b/Handlers/NowPlayingHandler.py index 81dc414..194314c 100644 --- a/Handlers/NowPlayingHandler.py +++ b/Handlers/NowPlayingHandler.py @@ -2,6 +2,7 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot +from Parallelism.AbstractProcessManager import AbstractPlayersManager from Utils.Cleaner import Cleaner from typing import Union from discord import Interaction @@ -14,7 +15,7 @@ class NowPlayingHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/PauseHandler.py b/Handlers/PauseHandler.py index 386997c..7cbda92 100644 --- a/Handlers/PauseHandler.py +++ b/Handlers/PauseHandler.py @@ -1,8 +1,9 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse +from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.Commands import VCommands, VCommandsType -from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction @@ -13,8 +14,8 @@ class PauseHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: if processInfo.getStatus() == ProcessStatus.SLEEPING: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/PlayHandler.py b/Handlers/PlayHandler.py index 8816cf9..5dcd58e 100644 --- a/Handlers/PlayHandler.py +++ b/Handlers/PlayHandler.py @@ -9,7 +9,8 @@ from Handlers.HandlerResponse import HandlerResponse from Music.Downloader import Downloader from Music.Searcher import Searcher from Music.Song import Song -from Parallelism.ProcessInfo import ProcessInfo +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -37,7 +38,7 @@ class PlayHandler(AbstractHandler): raise InvalidInput(self.messages.INVALID_INPUT, self.messages.ERROR_TITLE) # Get the process context for the current guild - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) playlist: Playlist = processInfo.getPlaylist() process = processInfo.getProcess() @@ -109,7 +110,7 @@ class PlayHandler(AbstractHandler): return HandlerResponse(self.ctx, embed, error) - async def __downloadSongsAndStore(self, songs: List[Song], processInfo: ProcessInfo) -> None: + async def __downloadSongsAndStore(self, songs: List[Song], processInfo: PlayerInfo) -> None: playlist = processInfo.getPlaylist() queue = processInfo.getQueueToPlayer() playCommand = VCommands(VCommandsType.PLAY, None) @@ -126,7 +127,7 @@ class PlayHandler(AbstractHandler): tasks.append(task) # In the original order, await for the task and then, if successfully downloaded, add to the playlist - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() for index, task in enumerate(tasks): await task song = songs[index] diff --git a/Handlers/PrevHandler.py b/Handlers/PrevHandler.py index 72dd4fc..13e81b6 100644 --- a/Handlers/PrevHandler.py +++ b/Handlers/PrevHandler.py @@ -2,6 +2,7 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Config.Exceptions import BadCommandUsage, ImpossibleMove from Handlers.HandlerResponse import HandlerResponse +from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -18,7 +19,7 @@ class PrevHandler(AbstractHandler): embed = self.embeds.NO_CHANNEL() return HandlerResponse(self.ctx, embed, error) - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/QueueHandler.py b/Handlers/QueueHandler.py index 180402b..5ffefad 100644 --- a/Handlers/QueueHandler.py +++ b/Handlers/QueueHandler.py @@ -4,6 +4,7 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Handlers.JumpMusicHandler import JumpMusicHandler from Messages.MessagesCategory import MessagesCategory +from Parallelism.AbstractProcessManager import AbstractPlayersManager from UI.Views.BasicView import BasicView from Utils.Utils import Utils from Music.VulkanBot import VulkanBot @@ -22,7 +23,7 @@ class QueueHandler(AbstractHandler): async def run(self, pageNumber=0) -> HandlerResponse: # Retrieve the process of the guild - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: # If no process return empty list embed = self.embeds.EMPTY_QUEUE() diff --git a/Handlers/RemoveHandler.py b/Handlers/RemoveHandler.py index 0d032cd..fd17da6 100644 --- a/Handlers/RemoveHandler.py +++ b/Handlers/RemoveHandler.py @@ -4,7 +4,8 @@ from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import BadCommandUsage, VulkanError, ErrorRemoving, InvalidInput, NumberRequired from Music.Playlist import Playlist from Music.VulkanBot import VulkanBot -from Parallelism.ProcessInfo import ProcessInfo +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo from typing import Union from discord import Interaction @@ -15,8 +16,8 @@ class RemoveHandler(AbstractHandler): async def run(self, position: str) -> HandlerResponse: # Get the current process of the guild - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() diff --git a/Handlers/ResetHandler.py b/Handlers/ResetHandler.py index bb23f6b..2bc72d9 100644 --- a/Handlers/ResetHandler.py +++ b/Handlers/ResetHandler.py @@ -1,7 +1,8 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,8 +15,8 @@ class ResetHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: if processInfo.getStatus() == ProcessStatus.SLEEPING: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/ResumeHandler.py b/Handlers/ResumeHandler.py index 158a688..da7867b 100644 --- a/Handlers/ResumeHandler.py +++ b/Handlers/ResumeHandler.py @@ -1,7 +1,8 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -13,8 +14,8 @@ class ResumeHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: if processInfo.getStatus() == ProcessStatus.SLEEPING: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/ShuffleHandler.py b/Handlers/ShuffleHandler.py index 3f38b8b..7445e9d 100644 --- a/Handlers/ShuffleHandler.py +++ b/Handlers/ShuffleHandler.py @@ -6,13 +6,15 @@ from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction +from Parallelism.AbstractProcessManager import AbstractPlayersManager + class ShuffleHandler(AbstractHandler): def __init__(self, ctx: Union[Context, Interaction], bot: VulkanBot) -> None: super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = self.config.getProcessManager() + processManager: AbstractPlayersManager = self.config.getPlayersManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: try: diff --git a/Handlers/SkipHandler.py b/Handlers/SkipHandler.py index 8170b94..9de6589 100644 --- a/Handlers/SkipHandler.py +++ b/Handlers/SkipHandler.py @@ -3,7 +3,8 @@ from Handlers.AbstractHandler import AbstractHandler from Config.Exceptions import BadCommandUsage, ImpossibleMove from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot -from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -19,8 +20,8 @@ class SkipHandler(AbstractHandler): embed = self.embeds.NO_CHANNEL() return HandlerResponse(self.ctx, embed, error) - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Verify if there is a running process if processInfo.getStatus() == ProcessStatus.SLEEPING: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/StopHandler.py b/Handlers/StopHandler.py index ddc9199..dd5ea7a 100644 --- a/Handlers/StopHandler.py +++ b/Handlers/StopHandler.py @@ -2,7 +2,8 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot -from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -13,8 +14,8 @@ class StopHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = self.config.getProcessManager() - processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild) + processManager: AbstractPlayersManager = self.config.getPlayersManager() + processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: if processInfo.getStatus() == ProcessStatus.SLEEPING: embed = self.embeds.NOT_PLAYING() diff --git a/Parallelism/AbstractProcessManager.py b/Parallelism/AbstractProcessManager.py new file mode 100644 index 0000000..ecc2849 --- /dev/null +++ b/Parallelism/AbstractProcessManager.py @@ -0,0 +1,31 @@ +from abc import ABC, abstractmethod +from typing import Union +from discord.ext.commands import Context +from discord import Guild, Interaction +from Music.Song import Song +from Parallelism.ProcessInfo import PlayerInfo + + +class AbstractPlayersManager(ABC): + def __init__(self, bot) -> None: + pass + + @abstractmethod + def setPlayerInfo(self, guild: Guild, info: PlayerInfo): + pass + + @abstractmethod + def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + pass + + @abstractmethod + def resetProcess(self, guild: Guild, context: Context) -> None: + pass + + @abstractmethod + def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: + pass + + @abstractmethod + async def showNowPlaying(self, guildID: int, song: Song) -> None: + pass diff --git a/Parallelism/PlayerProcess.py b/Parallelism/PlayerProcess.py index de09f39..7e2979a 100644 --- a/Parallelism/PlayerProcess.py +++ b/Parallelism/PlayerProcess.py @@ -2,19 +2,17 @@ import asyncio from time import sleep, time from urllib.parse import parse_qs, urlparse from Music.VulkanInitializer import VulkanInitializer -from discord import User, Member, Message, VoiceClient +from discord import Member, VoiceClient from asyncio import AbstractEventLoop, Semaphore, Queue from multiprocessing import Process, RLock, Lock, Queue from threading import Thread from typing import Callable, List -from discord import Guild, FFmpegPCMAudio, VoiceChannel, TextChannel +from discord import Guild, FFmpegPCMAudio, VoiceChannel from Music.Playlist import Playlist from Music.Song import Song from Config.Configs import VConfigs -from Config.Messages import Messages from Music.VulkanBot import VulkanBot from Music.Downloader import Downloader -from Config.Embeds import VEmbeds from Parallelism.Commands import VCommands, VCommandsType @@ -57,14 +55,7 @@ class PlayerProcess(Process): self.__bot: VulkanBot = None self.__voiceChannel: VoiceChannel = None self.__voiceClient: VoiceClient = None - self.__textChannel: TextChannel = None - self.__author: User = None - self.__botMember: Member = None - self.__configs: VConfigs = None - self.__embeds: VEmbeds = None - self.__messages: Messages = None - self.__messagesToDelete: List[Message] = [] self.__playing = False self.__forceStop = False self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', @@ -78,9 +69,6 @@ class PlayerProcess(Process): self.__loop = asyncio.get_event_loop_policy().new_event_loop() asyncio.set_event_loop(self.__loop) - self.__configs = VConfigs() - self.__messages = Messages() - self.__embeds = VEmbeds() self.__downloader = Downloader() self.__semStopPlaying = Semaphore(0) diff --git a/Parallelism/PlayerThread.py b/Parallelism/PlayerThread.py new file mode 100644 index 0000000..eb26c7d --- /dev/null +++ b/Parallelism/PlayerThread.py @@ -0,0 +1,365 @@ +import asyncio +from time import sleep, time +from urllib.parse import parse_qs, urlparse +from Music.VulkanInitializer import VulkanInitializer +from discord import Member, VoiceClient +from asyncio import AbstractEventLoop, Semaphore +from threading import RLock, Thread +from multiprocessing import Lock +from typing import Callable, List +from discord import Guild, FFmpegPCMAudio, VoiceChannel +from Music.Playlist import Playlist +from Music.Song import Song +from Config.Configs import VConfigs +from Music.VulkanBot import VulkanBot +from Music.Downloader import Downloader +from Parallelism.Commands import VCommands, VCommandsType + + +class TimeoutClock: + def __init__(self, callback: Callable, loop: asyncio.AbstractEventLoop): + self.__callback = callback + self.__task = loop.create_task(self.__executor()) + + async def __executor(self): + await asyncio.sleep(VConfigs().VC_TIMEOUT) + await self.__callback() + + def cancel(self): + self.__task.cancel() + + +class PlayerThread(Thread): + """Player Thread to control the song playback in the same Process of the Main Process""" + + def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={}) + # Synchronization objects + self.__playlist: Playlist = playlist + self.__playlistLock: Lock = lock + self.__loop: AbstractEventLoop = None + self.__playerLock: RLock = RLock() + # Discord context ID + self.__guildID = guildID + self.__voiceChannelID = voiceID + self.__guild: Guild = guild + self.__bot: VulkanBot = bot + self.__voiceChannel: VoiceChannel = voiceChannel + self.__voiceClient: VoiceClient = None + + self.__downloader = Downloader() + + self.__playing = False + self.__forceStop = False + self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', + 'options': '-vn'} + + def run(self) -> None: + """This method is called automatically when the Thread starts""" + try: + print(f'Starting Player Thread for Guild {self.name}') + self.__loop = asyncio.get_event_loop_policy().new_event_loop() + asyncio.set_event_loop(self.__loop) + self.__loop.run_until_complete(self._run()) + + except Exception as e: + print(f'[Error in Process {self.name}] -> {e}') + + async def _run(self) -> None: + # Connect to voice Channel + await self.__connectToVoiceChannel() + # Start the timeout function + self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) + # Start a Task to play songs + self.__loop.create_task(self.__playPlaylistSongs()) + + def __verifyIfIsPlaying(self) -> bool: + if self.__voiceClient is None: + return False + if not self.__voiceClient.is_connected(): + return False + return self.__voiceClient.is_playing() or self.__voiceClient.is_paused() + + async def __playPlaylistSongs(self) -> None: + """If the player is not running trigger to play a new song""" + self.__playing = self.__verifyIfIsPlaying() + if not self.__playing: + song = None + with self.__playlistLock: + with self.__playerLock: + song = self.__playlist.next_song() + + if song is not None: + self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') + self.__playing = True + + async def __playSong(self, song: Song) -> None: + """Function that will trigger the player to play the song""" + try: + self.__playerLock.acquire() + if song is None: + return + + if song.source is None: + return self.__playNext(None) + + # If not connected, connect to bind channel + if self.__voiceClient is None: + await self.__connectToVoiceChannel() + + # If the voice channel disconnect for some reason + if not self.__voiceClient.is_connected(): + print('[VOICE CHANNEL NOT NULL BUT DISCONNECTED, CONNECTING AGAIN]') + await self.__connectToVoiceChannel() + # If the player is connected and playing return the song to the playlist + elif self.__voiceClient.is_playing(): + print('[SONG ALREADY PLAYING, RETURNING]') + self.__playlist.add_song_start(song) + return + + songStillAvailable = self.__verifyIfSongAvailable(song) + if not songStillAvailable: + print('[SONG NOT AVAILABLE ANYMORE, DOWNLOADING AGAIN]') + song = self.__downloadSongAgain(song) + + self.__playing = True + self.__songPlaying = song + + player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS) + self.__voiceClient.play(player, after=lambda e: self.__playNext(e)) + + self.__timer.cancel() + self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) + + nowPlayingCommand = VCommands(VCommandsType.NOW_PLAYING, song) + self.__queueSend.put(nowPlayingCommand) + except Exception as e: + print(f'[ERROR IN PLAY SONG FUNCTION] -> {e}, {type(e)}') + self.__playNext(None) + finally: + self.__playerLock.release() + + def __playNext(self, error) -> None: + if error is not None: + print(f'[ERROR PLAYING SONG] -> {error}') + with self.__playlistLock: + with self.__playerLock: + if self.__forceStop: # If it's forced to stop player + self.__forceStop = False + return None + + song = self.__playlist.next_song() + + if song is not None: + self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') + else: + self.__playlist.loop_off() + self.__songPlaying = None + self.__playing = False + # Send a command to the main process put this one to sleep + sleepCommand = VCommands(VCommandsType.SLEEPING) + self.__queueSend.put(sleepCommand) + # Release the semaphore to finish the process + self.__semStopPlaying.release() + + def __verifyIfSongAvailable(self, song: Song) -> bool: + """Verify the song source to see if it's already expired""" + try: + parsedUrl = urlparse(song.source) + + if 'expire' not in parsedUrl.query: + # If already passed 5 hours since the download + if song.downloadTime + 18000 < int(time()): + return False + return True + + # If the current time plus the song duration plus 10min exceeds the expirationValue + expireValue = parse_qs(parsedUrl.query)['expire'][0] + if int(time()) + song.duration + 600 > int(str(expireValue)): + return False + return True + except Exception as e: + print(f'[ERROR VERIFYING SONG AVAILABILITY] -> {e}') + return False + + def __downloadSongAgain(self, song: Song) -> Song: + """Force a download to be executed again, one use case is when the song.source expired and needs to refresh""" + return self.__downloader.finish_one_song(song) + + async def __playPrev(self, voiceChannelID: int) -> None: + with self.__playlistLock: + song = self.__playlist.prev_song() + + with self.__playerLock: + if song is not None: + # If not connect, connect to the user voice channel, may change the channel + if self.__voiceClient is None or not self.__voiceClient.is_connected(): + self.__voiceChannelID = voiceChannelID + self.__voiceChannel = self.__guild.get_channel(self.__voiceChannelID) + await self.__connectToVoiceChannel() + + # If already playing, stop the current play + if self.__verifyIfIsPlaying(): + # Will forbidden next_song to execute after stopping current player + self.__forceStop = True + self.__voiceClient.stop() + self.__playing = False + + self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') + + async def __restartCurrentSong(self) -> None: + song = self.__playlist.getCurrentSong() + if song is None: + song = self.__playlist.next_song() + if song is None: + return + + self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') + + async def receiveCommand(self, command: VCommands) -> None: + type = command.getType() + args = command.getArgs() + print(f'Player Thread {self.__guild.name} received command {type}') + + try: + self.__playerLock.acquire() + if type == VCommandsType.PAUSE: + self.__pause() + elif type == VCommandsType.RESUME: + await self.__resume() + elif type == VCommandsType.SKIP: + await self.__skip() + elif type == VCommandsType.PLAY: + await self.__playPlaylistSongs() + elif type == VCommandsType.PREV: + await self.__playPrev(args) + elif type == VCommandsType.RESET: + await self.__reset() + elif type == VCommandsType.STOP: + await self.__stop() + else: + print(f'[ERROR] -> Unknown Command Received: {command}') + except Exception as e: + print(f'[ERROR IN COMMAND RECEIVER] -> {type} - {e}') + finally: + self.__playerLock.release() + + def __pause(self) -> None: + if self.__voiceClient is not None: + if self.__voiceClient.is_connected(): + if self.__voiceClient.is_playing(): + self.__voiceClient.pause() + + async def __reset(self) -> None: + if self.__voiceClient is None: + return + + if not self.__voiceClient.is_connected(): + await self.__connectToVoiceChannel() + if self.__songPlaying is not None: + await self.__restartCurrentSong() + + async def __stop(self) -> None: + if self.__voiceClient is not None: + if self.__voiceClient.is_connected(): + with self.__playlistLock: + self.__playlist.loop_off() + self.__playlist.clear() + + self.__voiceClient.stop() + await self.__voiceClient.disconnect() + + self.__songPlaying = None + self.__playing = False + self.__voiceClient = None + # If the voiceClient is not None we finish things + else: + await self.__forceBotDisconnectAndStop() + + async def __resume(self) -> None: + # Lock to work with Player + with self.__playerLock: + if self.__voiceClient is not None: + # If the player is paused then return to play + if self.__voiceClient.is_paused(): + return self.__voiceClient.resume() + # If there is a current song but the voice client is not playing + elif self.__songPlaying is not None and not self.__voiceClient.is_playing(): + await self.__playSong(self.__songPlaying) + + async def __skip(self) -> None: + self.__playing = self.__verifyIfIsPlaying() + # Lock to work with Player + with self.__playerLock: + if self.__playing: + self.__playing = False + self.__voiceClient.stop() + # If for some reason the Bot has disconnect but there is still songs to play + elif len(self.__playlist.getSongs()) > 0: + print('[RESTARTING CURRENT SONG]') + await self.__restartCurrentSong() + + async def __forceBotDisconnectAndStop(self) -> None: + # Lock to work with Player + with self.__playerLock: + if self.__voiceClient is None: + return + self.__playing = False + self.__songPlaying = None + try: + self.__voiceClient.stop() + await self.__voiceClient.disconnect(force=True) + except Exception as e: + print(f'[ERROR FORCING BOT TO STOP] -> {e}') + finally: + self.__voiceClient = None + with self.__playlistLock: + self.__playlist.clear() + self.__playlist.loop_off() + + async def __timeoutHandler(self) -> None: + try: + if self.__voiceClient is None: + return + + # If the bot should not disconnect when alone + if not VConfigs().SHOULD_AUTO_DISCONNECT_WHEN_ALONE: + return + + if self.__voiceClient.is_connected(): + if self.__voiceClient.is_playing() or self.__voiceClient.is_paused(): + if not self.__isBotAloneInChannel(): # If bot is not alone continue to play + self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) + return + + # Finish the process + with self.__playerLock: + with self.__playlistLock: + self.__playlist.loop_off() + await self.__forceBotDisconnectAndStop() + except Exception as e: + print(f'[ERROR IN TIMEOUT] -> {e}') + + def __isBotAloneInChannel(self) -> bool: + try: + if len(self.__voiceClient.channel.members) <= 1: + return True + else: + return False + except Exception as e: + print(f'[ERROR IN CHECK BOT ALONE] -> {e}') + return False + + async def __connectToVoiceChannel(self) -> bool: + try: + print('[CONNECTING TO VOICE CHANNEL]') + if self.__voiceClient is not None: + try: + await self.__voiceClient.disconnect(force=True) + except Exception as e: + print(f'[ERROR FORCING DISCONNECT] -> {e}') + self.__voiceClient = await self.__voiceChannel.connect(reconnect=True, timeout=None) + return True + except Exception as e: + print(f'[ERROR CONNECTING TO VC] -> {e}') + return False diff --git a/Parallelism/ProcessExecutor.py b/Parallelism/ProcessExecutor.py index ba6ab1e..a5f7843 100644 --- a/Parallelism/ProcessExecutor.py +++ b/Parallelism/ProcessExecutor.py @@ -4,7 +4,7 @@ from discord.ui import View from Config.Emojis import VEmojis from Messages.MessagesCategory import MessagesCategory from Music.VulkanBot import VulkanBot -from Parallelism.ProcessInfo import ProcessInfo +from Parallelism.ProcessInfo import PlayerInfo from Config.Messages import Messages from Music.Song import Song from Config.Embeds import VEmbeds @@ -29,7 +29,7 @@ class ProcessCommandsExecutor: self.__embeds = VEmbeds() self.__emojis = VEmojis() - async def sendNowPlaying(self, processInfo: ProcessInfo, song: Song) -> None: + async def sendNowPlaying(self, processInfo: PlayerInfo, song: Song) -> None: # Get the lock of the playlist playlist = processInfo.getPlaylist() if playlist.isLoopingOne(): diff --git a/Parallelism/ProcessInfo.py b/Parallelism/ProcessInfo.py index 50ed1b0..d1a0d89 100644 --- a/Parallelism/ProcessInfo.py +++ b/Parallelism/ProcessInfo.py @@ -9,9 +9,9 @@ class ProcessStatus(Enum): SLEEPING = 'Sleeping' -class ProcessInfo: +class PlayerInfo: """ - Class to store the reference to all structures to maintain a player process + Class to store the reference to all structures to maintain a song player """ def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessManager.py index dad23de..0664935 100644 --- a/Parallelism/ProcessManager.py +++ b/Parallelism/ProcessManager.py @@ -7,19 +7,20 @@ from typing import Dict, Tuple, Union from Config.Singleton import Singleton from discord import Guild, Interaction from discord.ext.commands import Context +from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.ProcessExecutor import ProcessCommandsExecutor from Music.Song import Song from Parallelism.PlayerProcess import PlayerProcess from Music.Playlist import Playlist -from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot -class ProcessManager(Singleton): +class ProcessManager(Singleton, AbstractPlayersManager): """ Manage all running player process, creating and storing them for future calls - Deal with the creation of shared memory + Deals with the creation of shared memory """ def __init__(self, bot: VulkanBot = None) -> None: @@ -28,14 +29,14 @@ class ProcessManager(Singleton): VManager.register('Playlist', Playlist) self.__manager = VManager() self.__manager.start() - self.__playersProcess: Dict[int, ProcessInfo] = {} + self.__playersProcess: Dict[int, PlayerInfo] = {} self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {} self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {} - def setPlayerInfo(self, guild: Guild, info: ProcessInfo): + def setPlayerInfo(self, guild: Guild, info: PlayerInfo): self.__playersProcess[guild.id] = info - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> ProcessInfo: + def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: """Return the process info for the guild, the user in context must be connected to a voice_channel""" try: if guild.id not in self.__playersProcess.keys(): @@ -62,7 +63,7 @@ class ProcessManager(Singleton): newProcessInfo.getQueueToPlayer().put(playCommand) self.__playersProcess[guild.id] = newProcessInfo - def getRunningPlayerInfo(self, guild: Guild) -> ProcessInfo: + def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: """Return the process info for the guild, if not, return None""" if guild.id not in self.__playersProcess.keys(): print('Process Info not found') @@ -70,7 +71,7 @@ class ProcessManager(Singleton): return self.__playersProcess[guild.id] - def __createProcessInfo(self, guild: Guild, context: Context) -> ProcessInfo: + def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo: guildID: int = context.guild.id textID: int = context.channel.id voiceID: int = context.author.voice.channel.id @@ -82,8 +83,8 @@ class ProcessManager(Singleton): queueToSend = Queue() process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, queueToListen, guildID, textID, voiceID, authorID) - processInfo = ProcessInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) + processInfo = PlayerInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async thread = Thread(target=self.__listenToCommands, @@ -110,7 +111,7 @@ class ProcessManager(Singleton): except Exception as e: print(f'[ERROR STOPPING PROCESS] -> {e}') - def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> ProcessInfo: + def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: """Create a new process info using previous playlist""" self.__stopPossiblyRunningProcess(guild) @@ -129,8 +130,8 @@ class ProcessManager(Singleton): queueToSend = Queue() process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, queueToListen, guildID, textID, voiceID, authorID) - processInfo = ProcessInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) + processInfo = PlayerInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async thread = Thread(target=self.__listenToCommands, diff --git a/Parallelism/ThreadManager.py b/Parallelism/ThreadManager.py new file mode 100644 index 0000000..2df8aff --- /dev/null +++ b/Parallelism/ThreadManager.py @@ -0,0 +1,199 @@ +import asyncio +from multiprocessing import Lock, Queue +from multiprocessing.managers import BaseManager, NamespaceProxy +from queue import Empty +from threading import Thread +from typing import Dict, Tuple, Union +from Config.Singleton import Singleton +from discord import Guild, Interaction +from discord.ext.commands import Context +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Parallelism.ProcessExecutor import ProcessCommandsExecutor +from Music.Song import Song +from Parallelism.PlayerProcess import PlayerProcess +from Music.Playlist import Playlist +from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus +from Parallelism.Commands import VCommands, VCommandsType +from Music.VulkanBot import VulkanBot + + +class ThreadManager(Singleton, AbstractPlayersManager): + """ + Manage all running player threads, creating and storing them for future calls + """ + + def __init__(self, bot: VulkanBot = None) -> None: + if not super().created: + self.__bot = bot + self.__playersProcess: Dict[int, Thread] = {} + + def setPlayerInfo(self, guild: Guild, info: PlayerInfo): + self.__playersProcess[guild.id] = info + + def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + """Return the process info for the guild, the user in context must be connected to a voice_channel""" + try: + if guild.id not in self.__playersProcess.keys(): + self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context) + else: + # If the process has ended create a new one + if not self.__playersProcess[guild.id].getProcess().is_alive(): + self.__playersProcess[guild.id] = self.__recreateProcess(guild, context) + + return self.__playersProcess[guild.id] + except Exception as e: + print(f'[Error In GetPlayerContext] -> {e}') + + def resetProcess(self, guild: Guild, context: Context) -> None: + """Restart a running process, already start it to return to play""" + if guild.id not in self.__playersProcess.keys(): + return None + + # Recreate the process keeping the playlist + newProcessInfo = self.__recreateProcess(guild, context) + newProcessInfo.getProcess().start() # Start the process + # Send a command to start the play again + playCommand = VCommands(VCommandsType.PLAY) + newProcessInfo.getQueueToPlayer().put(playCommand) + self.__playersProcess[guild.id] = newProcessInfo + + def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: + """Return the process info for the guild, if not, return None""" + if guild.id not in self.__playersProcess.keys(): + print('Process Info not found') + return None + + return self.__playersProcess[guild.id] + + def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo: + guildID: int = context.guild.id + textID: int = context.channel.id + voiceID: int = context.author.voice.channel.id + authorID: int = context.author.id + + playlist: Playlist = self.__manager.Playlist() + lock = Lock() + queueToListen = Queue() + queueToSend = Queue() + process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, + queueToListen, guildID, textID, voiceID, authorID) + processInfo = PlayerInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) + + # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async + thread = Thread(target=self.__listenToCommands, + args=(queueToListen, guild), daemon=True) + self.__playersListeners[guildID] = (thread, False) + thread.start() + + # Create a Message Controller for this player + self.__playersCommandsExecutor[guildID] = ProcessCommandsExecutor(self.__bot, guildID) + + return processInfo + + def __stopPossiblyRunningProcess(self, guild: Guild): + try: + if guild.id in self.__playersProcess.keys(): + playerProcess = self.__playersProcess[guild.id] + process = playerProcess.getProcess() + process.close() + process.kill() + playerProcess.getQueueToMain().close() + playerProcess.getQueueToMain().join_thread() + playerProcess.getQueueToPlayer().close() + playerProcess.getQueueToPlayer().join_thread() + except Exception as e: + print(f'[ERROR STOPPING PROCESS] -> {e}') + + def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + """Create a new process info using previous playlist""" + self.__stopPossiblyRunningProcess(guild) + + guildID: int = context.guild.id + textID: int = context.channel.id + if isinstance(context, Interaction): + authorID: int = context.user.id + voiceID: int = context.user.voice.channel.id + else: + authorID: int = context.author.id + voiceID: int = context.author.voice.channel.id + + playlist: Playlist = self.__playersProcess[guildID].getPlaylist() + lock = Lock() + queueToListen = Queue() + queueToSend = Queue() + process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, + queueToListen, guildID, textID, voiceID, authorID) + processInfo = PlayerInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) + + # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async + thread = Thread(target=self.__listenToCommands, + args=(queueToListen, guild), daemon=True) + self.__playersListeners[guildID] = (thread, False) + thread.start() + + return processInfo + + def __listenToCommands(self, queue: Queue, guild: Guild) -> None: + guildID = guild.id + while True: + shouldEnd = self.__playersListeners[guildID][1] + if shouldEnd: + break + + try: + command: VCommands = queue.get(timeout=5) + commandType = command.getType() + args = command.getArgs() + + print(f'Process {guild.name} sended command {commandType}') + if commandType == VCommandsType.NOW_PLAYING: + asyncio.run_coroutine_threadsafe(self.showNowPlaying( + guild.id, args), self.__bot.loop) + elif commandType == VCommandsType.TERMINATE: + # Delete the process elements and return, to finish task + self.__terminateProcess(guildID) + return + elif commandType == VCommandsType.SLEEPING: + # The process might be used again + self.__sleepingProcess(guildID) + return + else: + print(f'[ERROR] -> Unknown Command Received from Process: {commandType}') + except Empty: + continue + except Exception as e: + print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}') + + def __terminateProcess(self, guildID: int) -> None: + # Delete all structures associated with the Player + del self.__playersProcess[guildID] + del self.__playersCommandsExecutor[guildID] + threadListening = self.__playersListeners[guildID] + threadListening._stop() + del self.__playersListeners[guildID] + + def __sleepingProcess(self, guildID: int) -> None: + # Disable all process structures, except Playlist + queue1 = self.__playersProcess[guildID].getQueueToMain() + queue2 = self.__playersProcess[guildID].getQueueToPlayer() + queue1.close() + queue1.join_thread() + queue2.close() + queue2.join_thread() + # Set the status of this process as sleeping, only the playlist object remains + self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING) + + async def showNowPlaying(self, guildID: int, song: Song) -> None: + commandExecutor = self.__playersCommandsExecutor[guildID] + processInfo = self.__playersProcess[guildID] + await commandExecutor.sendNowPlaying(processInfo, song) + + +class VManager(BaseManager): + pass + + +class VProxy(NamespaceProxy): + _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') From 72043c4475f251b99ce6f3ab1534f7dc34e4d666 Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Sun, 19 Feb 2023 11:28:33 -0300 Subject: [PATCH 2/5] Continuing the refactoring --- DiscordCogs/MusicCog.py | 4 +- Handlers/AbstractHandler.py | 8 -- Handlers/ClearHandler.py | 16 +-- Handlers/HistoryHandler.py | 16 +-- Handlers/JumpMusicHandler.py | 25 ++-- Handlers/LoopHandler.py | 20 ++- Handlers/MoveHandler.py | 21 ++- Handlers/NowPlayingHandler.py | 8 +- Handlers/PauseHandler.py | 14 +- Handlers/PlayHandler.py | 58 ++++---- Handlers/PrevHandler.py | 15 +- Handlers/QueueHandler.py | 20 ++- Handlers/RemoveHandler.py | 9 +- Handlers/ResetHandler.py | 15 +- Handlers/ResumeHandler.py | 15 +- Handlers/ShuffleHandler.py | 15 +- Handlers/SkipHandler.py | 27 +--- Handlers/StopHandler.py | 15 +- Parallelism/AbstractProcessManager.py | 26 +++- Parallelism/PlayerProcess.py | 7 +- Parallelism/PlayerThread.py | 11 +- Parallelism/ProcessExecutor.py | 7 +- Parallelism/ProcessInfo.py | 51 ------- Parallelism/ProcessManager.py | 118 ++++++++++++--- Parallelism/ThreadManager.py | 199 -------------------------- Parallelism/ThreadPlayerManager.py | 131 +++++++++++++++++ 26 files changed, 369 insertions(+), 502 deletions(-) delete mode 100644 Parallelism/ProcessInfo.py delete mode 100644 Parallelism/ThreadManager.py create mode 100644 Parallelism/ThreadPlayerManager.py diff --git a/DiscordCogs/MusicCog.py b/DiscordCogs/MusicCog.py index c84293e..74a27c5 100644 --- a/DiscordCogs/MusicCog.py +++ b/DiscordCogs/MusicCog.py @@ -23,7 +23,7 @@ from Messages.Responses.EmbedCogResponse import EmbedCommandResponse from Music.VulkanBot import VulkanBot from Config.Configs import VConfigs from Config.Embeds import VEmbeds -from Parallelism.ProcessManager import ProcessManager +from Parallelism.ProcessManager import ProcessPlayerManager helper = Helper() @@ -38,7 +38,7 @@ class MusicCog(Cog): def __init__(self, bot: VulkanBot) -> None: self.__bot: VulkanBot = bot self.__embeds = VEmbeds() - VConfigs().setPlayersManager(ProcessManager(bot)) + VConfigs().setPlayersManager(ProcessPlayerManager(bot)) @command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar']) async def play(self, ctx: Context, *args) -> None: diff --git a/Handlers/AbstractHandler.py b/Handlers/AbstractHandler.py index 54b987d..db6729e 100644 --- a/Handlers/AbstractHandler.py +++ b/Handlers/AbstractHandler.py @@ -1,6 +1,4 @@ from abc import ABC, abstractmethod -from Parallelism.Commands import VCommands -from multiprocessing import Queue from typing import List, Union from discord.ext.commands import Context from discord import Client, Guild, ClientUser, Interaction, Member, User @@ -29,12 +27,6 @@ class AbstractHandler(ABC): else: self.__author = ctx.user - def putCommandInQueue(self, queue: Queue, command: VCommands) -> None: - try: - queue.put(command) - except Exception as e: - print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}') - @abstractmethod async def run(self) -> HandlerResponse: pass diff --git a/Handlers/ClearHandler.py b/Handlers/ClearHandler.py index f79d4a8..04a2025 100644 --- a/Handlers/ClearHandler.py +++ b/Handlers/ClearHandler.py @@ -5,7 +5,6 @@ from Music.VulkanBot import VulkanBot from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo class ClearHandler(AbstractHandler): @@ -14,19 +13,18 @@ class ClearHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): # Clear the playlist - playlist = processInfo.getPlaylist() - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playlist = playersManager.getPlayerPlaylist(self.guild) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.clear() - processLock.release() + playerLock.release() embed = self.embeds.PLAYLIST_CLEAR() return HandlerResponse(self.ctx, embed) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/HistoryHandler.py b/Handlers/HistoryHandler.py index 0c337c3..b6889e1 100644 --- a/Handlers/HistoryHandler.py +++ b/Handlers/HistoryHandler.py @@ -14,18 +14,16 @@ class HistoryHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: - playlist = processInfo.getPlaylist() - history = playlist.getSongsHistory() - processLock.release() + history = playersManager.getPlayerPlaylist(self.guild).getSongsHistory() + playerLock.release() else: # If the player doesn't respond in time we restart it - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) else: diff --git a/Handlers/JumpMusicHandler.py b/Handlers/JumpMusicHandler.py index 3ab6c95..4ad5fe6 100644 --- a/Handlers/JumpMusicHandler.py +++ b/Handlers/JumpMusicHandler.py @@ -17,32 +17,31 @@ class JumpMusicHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, musicPos: str) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: # Try to convert input to int error = self.__validateInput(musicPos) if error: embed = self.embeds.ERROR_EMBED(error.message) - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) # Sanitize the input - playlist: Playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) musicPos = self.__sanitizeInput(playlist, musicPos) # Validate the position if not playlist.validate_position(musicPos): error = InvalidInput() embed = self.embeds.PLAYLIST_RANGE_ERROR() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) try: # Move the selected song @@ -50,19 +49,17 @@ class JumpMusicHandler(AbstractHandler): # Send a command to the player to skip the music command = VCommands(VCommandsType.SKIP, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) + playersManager.sendCommandToPlayer(command, self.guild) - processLock.release() return HandlerResponse(self.ctx) except: - # Release the acquired Lock - processLock.release() embed = self.embeds.ERROR_MOVING() error = UnknownError() return HandlerResponse(self.ctx, embed, error) + finally: + playerLock.release() else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/LoopHandler.py b/Handlers/LoopHandler.py index 17d9ee9..06f5917 100644 --- a/Handlers/LoopHandler.py +++ b/Handlers/LoopHandler.py @@ -5,7 +5,6 @@ from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import BadCommandUsage from typing import Union from discord import Interaction - from Parallelism.AbstractProcessManager import AbstractPlayersManager @@ -14,23 +13,20 @@ class LoopHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, args: str) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() - - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playlist = playersManager.getPlayerPlaylist(self.guild) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: if args == '' or args is None: playlist.loop_all() embed = self.embeds.LOOP_ALL_ACTIVATED() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed) args = args.lower() @@ -53,9 +49,9 @@ class LoopHandler(AbstractHandler): error = BadCommandUsage() embed = self.embeds.BAD_LOOP_USE() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/MoveHandler.py b/Handlers/MoveHandler.py index 27d004c..c25e94f 100644 --- a/Handlers/MoveHandler.py +++ b/Handlers/MoveHandler.py @@ -15,45 +15,44 @@ class MoveHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, pos1: str, pos2: str) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: error = self.__validateInput(pos1, pos2) if error: embed = self.embeds.ERROR_EMBED(error.message) - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) pos1, pos2 = self.__sanitizeInput(playlist, pos1, pos2) if not playlist.validate_position(pos1) or not playlist.validate_position(pos2): error = InvalidInput() embed = self.embeds.PLAYLIST_RANGE_ERROR() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) try: song = playlist.move_songs(pos1, pos2) song_name = song.title if song.title else song.identifier embed = self.embeds.SONG_MOVED(song_name, pos1, pos2) - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed) except: # Release the acquired Lock - processLock.release() + playerLock.release() embed = self.embeds.ERROR_MOVING() error = UnknownError() return HandlerResponse(self.ctx, embed, error) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/NowPlayingHandler.py b/Handlers/NowPlayingHandler.py index 194314c..a8efef4 100644 --- a/Handlers/NowPlayingHandler.py +++ b/Handlers/NowPlayingHandler.py @@ -14,14 +14,12 @@ class NowPlayingHandler(AbstractHandler): self.__cleaner = Cleaner() async def run(self) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() return HandlerResponse(self.ctx, embed) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) if playlist.getCurrentSong() is None: embed = self.embeds.NOT_PLAYING() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/PauseHandler.py b/Handlers/PauseHandler.py index 7cbda92..3447ccc 100644 --- a/Handlers/PauseHandler.py +++ b/Handlers/PauseHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.Commands import VCommands, VCommandsType -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction @@ -14,17 +13,10 @@ class PauseHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send Pause command to be execute by player process + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.PAUSE, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.PLAYER_PAUSED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/PlayHandler.py b/Handlers/PlayHandler.py index 5dcd58e..a8e3979 100644 --- a/Handlers/PlayHandler.py +++ b/Handlers/PlayHandler.py @@ -1,6 +1,6 @@ import asyncio import traceback -from typing import List +from typing import List, Union from Config.Exceptions import DownloadingError, InvalidInput, VulkanError from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler @@ -10,12 +10,9 @@ from Music.Downloader import Downloader from Music.Searcher import Searcher from Music.Song import Song from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot -from typing import Union from discord import Interaction -from Music.Playlist import Playlist class PlayHandler(AbstractHandler): @@ -37,13 +34,12 @@ class PlayHandler(AbstractHandler): if musicsInfo is None or len(musicsInfo) == 0: raise InvalidInput(self.messages.INVALID_INPUT, self.messages.ERROR_TITLE) - # Get the process context for the current guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) - playlist: Playlist = processInfo.getPlaylist() - process = processInfo.getProcess() - if not process.is_alive(): # If process has not yet started, start - process.start() + # If there is no executing player for the guild then we create the player + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): + playersManager.createPlayerForGuild(self.guild, self.ctx) + + playlist = playersManager.getPlayerPlaylist(self.guild) # Create the Songs objects songs: List[Song] = [] @@ -67,19 +63,17 @@ class PlayHandler(AbstractHandler): embed = self.embeds.SONG_ADDED_TWO(song.info, pos) response = HandlerResponse(self.ctx, embed) - # Add the unique song to the playlist and send a command to player process - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + # Add the unique song to the playlist and send a command to player + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.add_song(song) # Release the acquired Lock - processLock.release() - queue = processInfo.getQueueToPlayer() + playerLock.release() playCommand = VCommands(VCommandsType.PLAY, None) - self.putCommandInQueue(queue, playCommand) - + playersManager.sendCommandToPlayer(playCommand, self.guild) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) @@ -89,10 +83,10 @@ class PlayHandler(AbstractHandler): if len(songs) > 10: fiveFirstSongs = songs[0:5] songs = songs[5:] - await self.__downloadSongsAndStore(fiveFirstSongs, processInfo) + await self.__downloadSongsAndStore(fiveFirstSongs, playersManager) - # Trigger a task to download all songs and then store them in the process playlist - asyncio.create_task(self.__downloadSongsAndStore(songs, processInfo)) + # Trigger a task to download all songs and then store them in the playlist + asyncio.create_task(self.__downloadSongsAndStore(songs, playersManager)) embed = self.embeds.SONGS_ADDED(len(songs)) return HandlerResponse(self.ctx, embed) @@ -102,7 +96,7 @@ class PlayHandler(AbstractHandler): return HandlerResponse(self.ctx, embed, error) except Exception as error: print(f'ERROR IN PLAYHANDLER -> {traceback.format_exc()}', {type(error)}) - if isinstance(error, VulkanError): # If error was already processed + if isinstance(error, VulkanError): embed = self.embeds.CUSTOM_ERROR(error) else: error = UnknownError() @@ -110,9 +104,8 @@ class PlayHandler(AbstractHandler): return HandlerResponse(self.ctx, embed, error) - async def __downloadSongsAndStore(self, songs: List[Song], processInfo: PlayerInfo) -> None: - playlist = processInfo.getPlaylist() - queue = processInfo.getQueueToPlayer() + async def __downloadSongsAndStore(self, songs: List[Song], playersManager: AbstractPlayersManager) -> None: + playlist = playersManager.getPlayerPlaylist(self.guild) playCommand = VCommands(VCommandsType.PLAY, None) tooManySongs = len(songs) > 100 @@ -126,21 +119,18 @@ class PlayHandler(AbstractHandler): task = asyncio.create_task(self.__down.download_song(song)) tasks.append(task) - # In the original order, await for the task and then, if successfully downloaded, add to the playlist - processManager: AbstractPlayersManager = self.config.getPlayersManager() for index, task in enumerate(tasks): await task song = songs[index] if not song.problematic: # If downloaded add to the playlist and send play command - processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.add_song(song) - self.putCommandInQueue(queue, playCommand) - processLock.release() + playersManager.sendCommandToPlayer(playCommand, self.guild) + playerLock.release() else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) def __isUserConnected(self) -> bool: if self.ctx.author.voice: diff --git a/Handlers/PrevHandler.py b/Handlers/PrevHandler.py index 13e81b6..22a0577 100644 --- a/Handlers/PrevHandler.py +++ b/Handlers/PrevHandler.py @@ -19,14 +19,13 @@ class PrevHandler(AbstractHandler): embed = self.embeds.NO_CHANNEL() return HandlerResponse(self.ctx, embed, error) - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) if len(playlist.getHistory()) == 0: error = ImpossibleMove() embed = self.embeds.NOT_PREVIOUS_SONG() @@ -37,15 +36,9 @@ class PrevHandler(AbstractHandler): embed = self.embeds.FAIL_DUE_TO_LOOP_ON() return HandlerResponse(self.ctx, embed, error) - # If not started, start the player process - process = processInfo.getProcess() - if not process.is_alive(): - process.start() - # Send a prev command, together with the user voice channel prevCommand = VCommands(VCommandsType.PREV, self.author.voice.channel.id) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, prevCommand) + playersManager.sendCommandToPlayer(prevCommand, self.guild) embed = self.embeds.RETURNING_SONG() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/QueueHandler.py b/Handlers/QueueHandler.py index 5ffefad..1da5bf5 100644 --- a/Handlers/QueueHandler.py +++ b/Handlers/QueueHandler.py @@ -22,29 +22,27 @@ class QueueHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, pageNumber=0) -> HandlerResponse: - # Retrieve the process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: # If no process return empty list + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.EMPTY_QUEUE() return HandlerResponse(self.ctx, embed) # Acquire the Lock to manipulate the playlist - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: - playlist: Playlist = processInfo.getPlaylist() + playlist: Playlist = playersManager.getPlayerPlaylist(self.guild) if playlist.isLoopingOne(): song = playlist.getCurrentSong() embed = self.embeds.ONE_SONG_LOOPING(song.info) - processLock.release() # Release the Lock + playerLock.release() # Release the Lock return HandlerResponse(self.ctx, embed) allSongs = playlist.getSongs() if len(allSongs) == 0: embed = self.embeds.EMPTY_QUEUE() - processLock.release() # Release the Lock + playerLock.release() # Release the Lock return HandlerResponse(self.ctx, embed) songsPages = playlist.getSongsPages() @@ -93,10 +91,10 @@ class QueueHandler(AbstractHandler): embed = self.embeds.QUEUE(title, text) # Release the acquired Lock - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, view=queueView) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/RemoveHandler.py b/Handlers/RemoveHandler.py index fd17da6..b769b6b 100644 --- a/Handlers/RemoveHandler.py +++ b/Handlers/RemoveHandler.py @@ -5,7 +5,6 @@ from Config.Exceptions import BadCommandUsage, VulkanError, ErrorRemoving, Inval from Music.Playlist import Playlist from Music.VulkanBot import VulkanBot from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo from typing import Union from discord import Interaction @@ -15,15 +14,13 @@ class RemoveHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, position: str) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) if playlist is None: embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() diff --git a/Handlers/ResetHandler.py b/Handlers/ResetHandler.py index 2bc72d9..bdc3451 100644 --- a/Handlers/ResetHandler.py +++ b/Handlers/ResetHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,18 +13,10 @@ class ResetHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.RESET, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) return HandlerResponse(self.ctx) else: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/ResumeHandler.py b/Handlers/ResumeHandler.py index da7867b..260f448 100644 --- a/Handlers/ResumeHandler.py +++ b/Handlers/ResumeHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,18 +13,10 @@ class ResumeHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send Resume command to be execute by player process + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.RESUME, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.PLAYER_RESUMED() return HandlerResponse(self.ctx, embed) else: diff --git a/Handlers/ShuffleHandler.py b/Handlers/ShuffleHandler.py index 7445e9d..ae6dfc4 100644 --- a/Handlers/ShuffleHandler.py +++ b/Handlers/ShuffleHandler.py @@ -14,19 +14,18 @@ class ShuffleHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): try: - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) playlist.shuffle() # Release the acquired Lock - processLock.release() + playerLock.release() else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/SkipHandler.py b/Handlers/SkipHandler.py index 9de6589..a740443 100644 --- a/Handlers/SkipHandler.py +++ b/Handlers/SkipHandler.py @@ -1,10 +1,8 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler -from Config.Exceptions import BadCommandUsage, ImpossibleMove from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -15,31 +13,12 @@ class SkipHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - if not self.__user_connected(): - error = ImpossibleMove() - embed = self.embeds.NO_CHANNEL() - return HandlerResponse(self.ctx, embed, error) - - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: # Verify if there is a running process - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send a command to the player process to skip the music + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.SKIP, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.SKIPPING_SONG() return HandlerResponse(self.ctx, embed) else: embed = self.embeds.NOT_PLAYING() return HandlerResponse(self.ctx, embed) - - def __user_connected(self) -> bool: - if self.author.voice: - return True - else: - return False diff --git a/Handlers/StopHandler.py b/Handlers/StopHandler.py index dd5ea7a..a3b2d32 100644 --- a/Handlers/StopHandler.py +++ b/Handlers/StopHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -14,18 +13,10 @@ class StopHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send command to player process stop + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.STOP, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.STOPPING_PLAYER() return HandlerResponse(self.ctx, embed) else: diff --git a/Parallelism/AbstractProcessManager.py b/Parallelism/AbstractProcessManager.py index ecc2849..693f803 100644 --- a/Parallelism/AbstractProcessManager.py +++ b/Parallelism/AbstractProcessManager.py @@ -1,9 +1,11 @@ from abc import ABC, abstractmethod +from threading import Lock from typing import Union from discord.ext.commands import Context from discord import Guild, Interaction +from Music.Playlist import Playlist from Music.Song import Song -from Parallelism.ProcessInfo import PlayerInfo +from Parallelism.Commands import VCommands class AbstractPlayersManager(ABC): @@ -11,19 +13,33 @@ class AbstractPlayersManager(ABC): pass @abstractmethod - def setPlayerInfo(self, guild: Guild, info: PlayerInfo): + def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + """If the forceCreation boolean is True, then the context must be provided for the Player to be created""" pass @abstractmethod - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + def getPlayerPlaylist(self, guild: Guild) -> Playlist: + """If there is a player process for the guild, then return the playlist of the guild""" pass @abstractmethod - def resetProcess(self, guild: Guild, context: Context) -> None: + def getPlayerLock(self, guild: Guild) -> Lock: + """If there is a player process for the guild, then return the lock of the guild""" pass @abstractmethod - def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: + def verifyIfPlayerExists(self, guild: Guild) -> bool: + """Returns if a player for the guild exists""" + pass + + @abstractmethod + def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None: + """With the context information of a guild create a internal player for the guild""" + pass + + @abstractmethod + def resetPlayer(self, guild: Guild, context: Context) -> None: + """Tries to reset the player of the guild""" pass @abstractmethod diff --git a/Parallelism/PlayerProcess.py b/Parallelism/PlayerProcess.py index 7e2979a..c953a19 100644 --- a/Parallelism/PlayerProcess.py +++ b/Parallelism/PlayerProcess.py @@ -32,7 +32,7 @@ class TimeoutClock: class PlayerProcess(Process): """Process that will play songs, receive commands from the main process by a Queue""" - def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, voiceID: int) -> None: """ Start a new process that will have his own bot instance Due to pickle serialization, no objects are stored, the values initialization are being made in the run method @@ -46,10 +46,8 @@ class PlayerProcess(Process): self.__semStopPlaying: Semaphore = None self.__loop: AbstractEventLoop = None # Discord context ID - self.__textChannelID = textID self.__guildID = guildID self.__voiceChannelID = voiceID - self.__authorID = authorID # All information of discord context will be retrieved directly with discord API self.__guild: Guild = None self.__bot: VulkanBot = None @@ -81,9 +79,6 @@ class PlayerProcess(Process): self.__bot = await self.__createBotInstance() self.__guild = self.__bot.get_guild(self.__guildID) self.__voiceChannel = self.__bot.get_channel(self.__voiceChannelID) - self.__textChannel = self.__bot.get_channel(self.__textChannelID) - self.__author = self.__bot.get_channel(self.__authorID) - self.__botMember = self.__getBotMember() # Connect to voice Channel await self.__connectToVoiceChannel() diff --git a/Parallelism/PlayerThread.py b/Parallelism/PlayerThread.py index eb26c7d..e15b48a 100644 --- a/Parallelism/PlayerThread.py +++ b/Parallelism/PlayerThread.py @@ -1,12 +1,11 @@ import asyncio -from time import sleep, time +from time import time from urllib.parse import parse_qs, urlparse -from Music.VulkanInitializer import VulkanInitializer -from discord import Member, VoiceClient -from asyncio import AbstractEventLoop, Semaphore +from discord import VoiceClient +from asyncio import AbstractEventLoop from threading import RLock, Thread from multiprocessing import Lock -from typing import Callable, List +from typing import Callable from discord import Guild, FFmpegPCMAudio, VoiceChannel from Music.Playlist import Playlist from Music.Song import Song @@ -32,7 +31,7 @@ class TimeoutClock: class PlayerThread(Thread): """Player Thread to control the song playback in the same Process of the Main Process""" - def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int) -> None: Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={}) # Synchronization objects self.__playlist: Playlist = playlist diff --git a/Parallelism/ProcessExecutor.py b/Parallelism/ProcessExecutor.py index a5f7843..1b2c80b 100644 --- a/Parallelism/ProcessExecutor.py +++ b/Parallelism/ProcessExecutor.py @@ -3,8 +3,8 @@ from discord import Button, TextChannel from discord.ui import View from Config.Emojis import VEmojis from Messages.MessagesCategory import MessagesCategory +from Music.Playlist import Playlist from Music.VulkanBot import VulkanBot -from Parallelism.ProcessInfo import PlayerInfo from Config.Messages import Messages from Music.Song import Song from Config.Embeds import VEmbeds @@ -29,9 +29,9 @@ class ProcessCommandsExecutor: self.__embeds = VEmbeds() self.__emojis = VEmojis() - async def sendNowPlaying(self, processInfo: PlayerInfo, song: Song) -> None: + async def sendNowPlaying(self, playlist: Playlist, channel: TextChannel, song: Song) -> None: + print('B') # Get the lock of the playlist - playlist = processInfo.getPlaylist() if playlist.isLoopingOne(): title = self.__messages.ONE_SONG_LOOPING else: @@ -39,7 +39,6 @@ class ProcessCommandsExecutor: # Create View and Embed embed = self.__embeds.SONG_INFO(song.info, title) - channel = processInfo.getTextChannel() view = self.__getPlayerView(channel) # Send Message and add to the MessagesManager message = await channel.send(embed=embed, view=view) diff --git a/Parallelism/ProcessInfo.py b/Parallelism/ProcessInfo.py deleted file mode 100644 index d1a0d89..0000000 --- a/Parallelism/ProcessInfo.py +++ /dev/null @@ -1,51 +0,0 @@ -from enum import Enum -from multiprocessing import Process, Queue, Lock -from discord import TextChannel -from Music.Playlist import Playlist - - -class ProcessStatus(Enum): - RUNNING = 'Running' - SLEEPING = 'Sleeping' - - -class PlayerInfo: - """ - Class to store the reference to all structures to maintain a song player - """ - - def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: - self.__process = process - self.__queueToPlayer = queueToPlayer - self.__queueToMain = queueToMain - self.__playlist = playlist - self.__lock = lock - self.__textChannel = textChannel - self.__status = ProcessStatus.RUNNING - - def setProcess(self, newProcess: Process) -> None: - self.__process = newProcess - - def getStatus(self) -> ProcessStatus: - return self.__status - - def setStatus(self, status: ProcessStatus) -> None: - self.__status = status - - def getProcess(self) -> Process: - return self.__process - - def getQueueToPlayer(self) -> Queue: - return self.__queueToPlayer - - def getQueueToMain(self) -> Queue: - return self.__queueToMain - - def getPlaylist(self) -> Playlist: - return self.__playlist - - def getLock(self) -> Lock: - return self.__lock - - def getTextChannel(self) -> TextChannel: - return self.__textChannel diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessManager.py index 0664935..fd88f5b 100644 --- a/Parallelism/ProcessManager.py +++ b/Parallelism/ProcessManager.py @@ -1,23 +1,70 @@ import asyncio -from multiprocessing import Lock, Queue +from enum import Enum +from multiprocessing import Lock, Process, Queue from multiprocessing.managers import BaseManager, NamespaceProxy from queue import Empty from threading import Thread from typing import Dict, Tuple, Union from Config.Singleton import Singleton -from discord import Guild, Interaction +from discord import Guild, Interaction, TextChannel from discord.ext.commands import Context from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.ProcessExecutor import ProcessCommandsExecutor from Music.Song import Song from Parallelism.PlayerProcess import PlayerProcess from Music.Playlist import Playlist -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot -class ProcessManager(Singleton, AbstractPlayersManager): +class ProcessStatus(Enum): + RUNNING = 'Running' + SLEEPING = 'Sleeping' + + +class PlayerProcessInfo: + """ + Class to store the reference to all structures to maintain a process player + """ + + def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: + self.__process = process + self.__queueToPlayer = queueToPlayer + self.__queueToMain = queueToMain + self.__playlist = playlist + self.__lock = lock + self.__textChannel = textChannel + self.__status = ProcessStatus.RUNNING + + def setProcess(self, newProcess: Process) -> None: + self.__process = newProcess + + def getStatus(self) -> ProcessStatus: + return self.__status + + def setStatus(self, status: ProcessStatus) -> None: + self.__status = status + + def getProcess(self) -> Process: + return self.__process + + def getQueueToPlayer(self) -> Queue: + return self.__queueToPlayer + + def getQueueToMain(self) -> Queue: + return self.__queueToMain + + def getPlaylist(self) -> Playlist: + return self.__playlist + + def getLock(self) -> Lock: + return self.__lock + + def getTextChannel(self) -> TextChannel: + return self.__textChannel + + +class ProcessPlayerManager(Singleton, AbstractPlayersManager): """ Manage all running player process, creating and storing them for future calls Deals with the creation of shared memory @@ -29,28 +76,51 @@ class ProcessManager(Singleton, AbstractPlayersManager): VManager.register('Playlist', Playlist) self.__manager = VManager() self.__manager.start() - self.__playersProcess: Dict[int, PlayerInfo] = {} + self.__playersProcess: Dict[int, PlayerProcessInfo] = {} self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {} self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {} - def setPlayerInfo(self, guild: Guild, info: PlayerInfo): - self.__playersProcess[guild.id] = info + def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + if forceCreation: + processInfo = self.createPlayerForGuild(guild, context) + else: + processInfo = self.__getRunningPlayerInfo(guild) - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: - """Return the process info for the guild, the user in context must be connected to a voice_channel""" + if processInfo == None: + return + + queue = processInfo.getQueueToPlayer() + self.__putCommandInQueue(queue, command) + + def getPlayerPlaylist(self, guild: Guild) -> Playlist: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getPlaylist() + + def getPlayerLock(self, guild: Guild) -> Lock: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getLock() + + def verifyIfPlayerExists(self, guild: Guild) -> bool: + return guild.id in self.__playersProcess.keys() + + def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None: try: if guild.id not in self.__playersProcess.keys(): - self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context) + self.__playersProcess[guild.id] = self.__createProcessPlayerInfo(guild, context) else: # If the process has ended create a new one if not self.__playersProcess[guild.id].getProcess().is_alive(): self.__playersProcess[guild.id] = self.__recreateProcess(guild, context) + # Start the process + self.__playersProcess[guild.id].getProcess().start() return self.__playersProcess[guild.id] except Exception as e: print(f'[Error In GetPlayerContext] -> {e}') - def resetProcess(self, guild: Guild, context: Context) -> None: + def resetPlayer(self, guild: Guild, context: Context) -> None: """Restart a running process, already start it to return to play""" if guild.id not in self.__playersProcess.keys(): return None @@ -60,10 +130,10 @@ class ProcessManager(Singleton, AbstractPlayersManager): newProcessInfo.getProcess().start() # Start the process # Send a command to start the play again playCommand = VCommands(VCommandsType.PLAY) - newProcessInfo.getQueueToPlayer().put(playCommand) + self.__putCommandInQueue(newProcessInfo.getQueueToPlayer(), playCommand) self.__playersProcess[guild.id] = newProcessInfo - def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: + def __getRunningPlayerInfo(self, guild: Guild) -> PlayerProcessInfo: """Return the process info for the guild, if not, return None""" if guild.id not in self.__playersProcess.keys(): print('Process Info not found') @@ -71,7 +141,7 @@ class ProcessManager(Singleton, AbstractPlayersManager): return self.__playersProcess[guild.id] - def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo: + def __createProcessPlayerInfo(self, guild: Guild, context: Context) -> PlayerProcessInfo: guildID: int = context.guild.id textID: int = context.channel.id voiceID: int = context.author.voice.channel.id @@ -82,9 +152,9 @@ class ProcessManager(Singleton, AbstractPlayersManager): queueToListen = Queue() queueToSend = Queue() process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, - queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) + queueToListen, guildID, voiceID) + processInfo = PlayerProcessInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async thread = Thread(target=self.__listenToCommands, @@ -111,7 +181,7 @@ class ProcessManager(Singleton, AbstractPlayersManager): except Exception as e: print(f'[ERROR STOPPING PROCESS] -> {e}') - def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerProcessInfo: """Create a new process info using previous playlist""" self.__stopPossiblyRunningProcess(guild) @@ -130,8 +200,8 @@ class ProcessManager(Singleton, AbstractPlayersManager): queueToSend = Queue() process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) + processInfo = PlayerProcessInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async thread = Thread(target=self.__listenToCommands, @@ -191,10 +261,18 @@ class ProcessManager(Singleton, AbstractPlayersManager): # Set the status of this process as sleeping, only the playlist object remains self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING) + def __putCommandInQueue(self, queue: Queue, command: VCommands) -> None: + try: + queue.put(command) + except Exception as e: + print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}') + async def showNowPlaying(self, guildID: int, song: Song) -> None: commandExecutor = self.__playersCommandsExecutor[guildID] processInfo = self.__playersProcess[guildID] + print('A') await commandExecutor.sendNowPlaying(processInfo, song) + print('C') class VManager(BaseManager): diff --git a/Parallelism/ThreadManager.py b/Parallelism/ThreadManager.py deleted file mode 100644 index 2df8aff..0000000 --- a/Parallelism/ThreadManager.py +++ /dev/null @@ -1,199 +0,0 @@ -import asyncio -from multiprocessing import Lock, Queue -from multiprocessing.managers import BaseManager, NamespaceProxy -from queue import Empty -from threading import Thread -from typing import Dict, Tuple, Union -from Config.Singleton import Singleton -from discord import Guild, Interaction -from discord.ext.commands import Context -from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessExecutor import ProcessCommandsExecutor -from Music.Song import Song -from Parallelism.PlayerProcess import PlayerProcess -from Music.Playlist import Playlist -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus -from Parallelism.Commands import VCommands, VCommandsType -from Music.VulkanBot import VulkanBot - - -class ThreadManager(Singleton, AbstractPlayersManager): - """ - Manage all running player threads, creating and storing them for future calls - """ - - def __init__(self, bot: VulkanBot = None) -> None: - if not super().created: - self.__bot = bot - self.__playersProcess: Dict[int, Thread] = {} - - def setPlayerInfo(self, guild: Guild, info: PlayerInfo): - self.__playersProcess[guild.id] = info - - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: - """Return the process info for the guild, the user in context must be connected to a voice_channel""" - try: - if guild.id not in self.__playersProcess.keys(): - self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context) - else: - # If the process has ended create a new one - if not self.__playersProcess[guild.id].getProcess().is_alive(): - self.__playersProcess[guild.id] = self.__recreateProcess(guild, context) - - return self.__playersProcess[guild.id] - except Exception as e: - print(f'[Error In GetPlayerContext] -> {e}') - - def resetProcess(self, guild: Guild, context: Context) -> None: - """Restart a running process, already start it to return to play""" - if guild.id not in self.__playersProcess.keys(): - return None - - # Recreate the process keeping the playlist - newProcessInfo = self.__recreateProcess(guild, context) - newProcessInfo.getProcess().start() # Start the process - # Send a command to start the play again - playCommand = VCommands(VCommandsType.PLAY) - newProcessInfo.getQueueToPlayer().put(playCommand) - self.__playersProcess[guild.id] = newProcessInfo - - def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: - """Return the process info for the guild, if not, return None""" - if guild.id not in self.__playersProcess.keys(): - print('Process Info not found') - return None - - return self.__playersProcess[guild.id] - - def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo: - guildID: int = context.guild.id - textID: int = context.channel.id - voiceID: int = context.author.voice.channel.id - authorID: int = context.author.id - - playlist: Playlist = self.__manager.Playlist() - lock = Lock() - queueToListen = Queue() - queueToSend = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, - queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) - - # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async - thread = Thread(target=self.__listenToCommands, - args=(queueToListen, guild), daemon=True) - self.__playersListeners[guildID] = (thread, False) - thread.start() - - # Create a Message Controller for this player - self.__playersCommandsExecutor[guildID] = ProcessCommandsExecutor(self.__bot, guildID) - - return processInfo - - def __stopPossiblyRunningProcess(self, guild: Guild): - try: - if guild.id in self.__playersProcess.keys(): - playerProcess = self.__playersProcess[guild.id] - process = playerProcess.getProcess() - process.close() - process.kill() - playerProcess.getQueueToMain().close() - playerProcess.getQueueToMain().join_thread() - playerProcess.getQueueToPlayer().close() - playerProcess.getQueueToPlayer().join_thread() - except Exception as e: - print(f'[ERROR STOPPING PROCESS] -> {e}') - - def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: - """Create a new process info using previous playlist""" - self.__stopPossiblyRunningProcess(guild) - - guildID: int = context.guild.id - textID: int = context.channel.id - if isinstance(context, Interaction): - authorID: int = context.user.id - voiceID: int = context.user.voice.channel.id - else: - authorID: int = context.author.id - voiceID: int = context.author.voice.channel.id - - playlist: Playlist = self.__playersProcess[guildID].getPlaylist() - lock = Lock() - queueToListen = Queue() - queueToSend = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, - queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) - - # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async - thread = Thread(target=self.__listenToCommands, - args=(queueToListen, guild), daemon=True) - self.__playersListeners[guildID] = (thread, False) - thread.start() - - return processInfo - - def __listenToCommands(self, queue: Queue, guild: Guild) -> None: - guildID = guild.id - while True: - shouldEnd = self.__playersListeners[guildID][1] - if shouldEnd: - break - - try: - command: VCommands = queue.get(timeout=5) - commandType = command.getType() - args = command.getArgs() - - print(f'Process {guild.name} sended command {commandType}') - if commandType == VCommandsType.NOW_PLAYING: - asyncio.run_coroutine_threadsafe(self.showNowPlaying( - guild.id, args), self.__bot.loop) - elif commandType == VCommandsType.TERMINATE: - # Delete the process elements and return, to finish task - self.__terminateProcess(guildID) - return - elif commandType == VCommandsType.SLEEPING: - # The process might be used again - self.__sleepingProcess(guildID) - return - else: - print(f'[ERROR] -> Unknown Command Received from Process: {commandType}') - except Empty: - continue - except Exception as e: - print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}') - - def __terminateProcess(self, guildID: int) -> None: - # Delete all structures associated with the Player - del self.__playersProcess[guildID] - del self.__playersCommandsExecutor[guildID] - threadListening = self.__playersListeners[guildID] - threadListening._stop() - del self.__playersListeners[guildID] - - def __sleepingProcess(self, guildID: int) -> None: - # Disable all process structures, except Playlist - queue1 = self.__playersProcess[guildID].getQueueToMain() - queue2 = self.__playersProcess[guildID].getQueueToPlayer() - queue1.close() - queue1.join_thread() - queue2.close() - queue2.join_thread() - # Set the status of this process as sleeping, only the playlist object remains - self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING) - - async def showNowPlaying(self, guildID: int, song: Song) -> None: - commandExecutor = self.__playersCommandsExecutor[guildID] - processInfo = self.__playersProcess[guildID] - await commandExecutor.sendNowPlaying(processInfo, song) - - -class VManager(BaseManager): - pass - - -class VProxy(NamespaceProxy): - _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') diff --git a/Parallelism/ThreadPlayerManager.py b/Parallelism/ThreadPlayerManager.py new file mode 100644 index 0000000..b23a6a2 --- /dev/null +++ b/Parallelism/ThreadPlayerManager.py @@ -0,0 +1,131 @@ +from multiprocessing import Lock +from typing import Dict, Union +from Config.Singleton import Singleton +from discord import Guild, Interaction, TextChannel +from discord.ext.commands import Context +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Music.Song import Song +from Music.Playlist import Playlist +from Parallelism.Commands import VCommands, VCommandsType +from Music.VulkanBot import VulkanBot +from Parallelism.PlayerThread import PlayerThread + + +class PlayerThreadInfo: + """ + Class to store the reference to all structures to maintain a player thread + """ + + def __init__(self, thread: PlayerThread, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: + self.__thread = thread + self.__playlist = playlist + self.__lock = lock + self.__textChannel = textChannel + + def getThread(self) -> PlayerThread: + return self.__thread + + def getPlaylist(self) -> Playlist: + return self.__playlist + + def getLock(self) -> Lock: + return self.__lock + + def getTextChannel(self) -> TextChannel: + return self.__textChannel + + +class ThreadPlayerManager(Singleton, AbstractPlayersManager): + """ + Manage all running player threads, creating and storing them for future calls + """ + + def __init__(self, bot: VulkanBot = None) -> None: + if not super().created: + self.__bot = bot + self.__playersThreads: Dict[int, PlayerThreadInfo] = {} + + def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + return super().sendCommandToPlayer(command, guild, forceCreation, context) + + def getPlayerPlaylist(self, guild: Guild) -> Playlist: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getPlaylist() + + def getPlayerLock(self, guild: Guild) -> Lock: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getLock() + + def verifyIfPlayerExists(self, guild: Guild) -> bool: + return guild.id in self.__playersThreads.keys() + + def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]): + try: + if guild.id not in self.__playersThreads.keys(): + self.__playersThreads[guild.id] = self.__createPlayerThreadInfo(context) + else: + # If the thread has ended create a new one + if not self.__playersThreads[guild.id].getThread().is_alive(): + self.__playersThreads[guild.id] = self.__recreateThread(guild, context) + + return self.__playersThreads[guild.id] + except Exception as e: + print(f'[Error In GetPlayerContext] -> {e}') + + def resetPlayer(self, guild: Guild, context: Context) -> None: + if guild.id not in self.__playersThreads.keys(): + return None + + # Recreate the thread keeping the playlist + newPlayerInfo = self.__recreateThread(guild, context) + newPlayerInfo.getThread().start() + # Send a command to start the play again + playCommand = VCommands(VCommandsType.PLAY) + newPlayerInfo.getQueueToPlayer().put(playCommand) + self.__playersThreads[guild.id] = newPlayerInfo + + def __getRunningPlayerInfo(self, guild: Guild) -> PlayerThreadInfo: + if guild.id not in self.__playersThreads.keys(): + print('Process Info not found') + return None + + return self.__playersThreads[guild.id] + + def __createPlayerThreadInfo(self, context: Union[Context, Interaction]) -> PlayerThreadInfo: + guildID: int = context.guild.id + if isinstance(context, Interaction): + voiceID: int = context.user.voice.channel.id + else: + voiceID: int = context.author.voice.channel.id + + playlist = Playlist() + lock = Lock() + player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) + playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel) + player.start() + + return playerInfo + + def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerThreadInfo: + self.__stopPossiblyRunningProcess(guild) + + guildID: int = context.guild.id + if isinstance(context, Interaction): + voiceID: int = context.user.voice.channel.id + else: + voiceID: int = context.author.voice.channel.id + + playlist = self.__playersThreads[guildID].getPlaylist() + lock = Lock() + player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) + playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel) + player.start() + + return playerInfo + + async def showNowPlaying(self, guildID: int, song: Song) -> None: + commandExecutor = self.__playersCommandsExecutor[guildID] + processInfo = self.__playersThreads[guildID] + await commandExecutor.sendNowPlaying(processInfo, song) From 7d53840f877b78302d2e42a142b21b162820d308 Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Sun, 19 Feb 2023 11:53:58 -0300 Subject: [PATCH 3/5] Fixing error --- DiscordCogs/MusicCog.py | 9 +++++++-- Parallelism/ProcessExecutor.py | 1 - .../{ProcessManager.py => ProcessPlayerManager.py} | 8 +++----- Parallelism/ThreadPlayerManager.py | 14 +++++++------- 4 files changed, 17 insertions(+), 15 deletions(-) rename Parallelism/{ProcessManager.py => ProcessPlayerManager.py} (98%) diff --git a/DiscordCogs/MusicCog.py b/DiscordCogs/MusicCog.py index 74a27c5..05d5d10 100644 --- a/DiscordCogs/MusicCog.py +++ b/DiscordCogs/MusicCog.py @@ -23,7 +23,8 @@ from Messages.Responses.EmbedCogResponse import EmbedCommandResponse from Music.VulkanBot import VulkanBot from Config.Configs import VConfigs from Config.Embeds import VEmbeds -from Parallelism.ProcessManager import ProcessPlayerManager +from Parallelism.ProcessPlayerManager import ProcessPlayerManager +from Parallelism.ThreadPlayerManager import ThreadPlayerManager helper = Helper() @@ -38,7 +39,11 @@ class MusicCog(Cog): def __init__(self, bot: VulkanBot) -> None: self.__bot: VulkanBot = bot self.__embeds = VEmbeds() - VConfigs().setPlayersManager(ProcessPlayerManager(bot)) + configs = VConfigs() + if configs.SHOULD_AUTO_DISCONNECT_WHEN_ALONE: + configs.setPlayersManager(ProcessPlayerManager(bot)) + else: + configs.setPlayersManager(ThreadPlayerManager(bot)) @command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar']) async def play(self, ctx: Context, *args) -> None: diff --git a/Parallelism/ProcessExecutor.py b/Parallelism/ProcessExecutor.py index 1b2c80b..8d8f007 100644 --- a/Parallelism/ProcessExecutor.py +++ b/Parallelism/ProcessExecutor.py @@ -30,7 +30,6 @@ class ProcessCommandsExecutor: self.__emojis = VEmojis() async def sendNowPlaying(self, playlist: Playlist, channel: TextChannel, song: Song) -> None: - print('B') # Get the lock of the playlist if playlist.isLoopingOne(): title = self.__messages.ONE_SONG_LOOPING diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessPlayerManager.py similarity index 98% rename from Parallelism/ProcessManager.py rename to Parallelism/ProcessPlayerManager.py index fd88f5b..6f302cc 100644 --- a/Parallelism/ProcessManager.py +++ b/Parallelism/ProcessPlayerManager.py @@ -143,9 +143,7 @@ class ProcessPlayerManager(Singleton, AbstractPlayersManager): def __createProcessPlayerInfo(self, guild: Guild, context: Context) -> PlayerProcessInfo: guildID: int = context.guild.id - textID: int = context.channel.id voiceID: int = context.author.voice.channel.id - authorID: int = context.author.id playlist: Playlist = self.__manager.Playlist() lock = Lock() @@ -270,9 +268,9 @@ class ProcessPlayerManager(Singleton, AbstractPlayersManager): async def showNowPlaying(self, guildID: int, song: Song) -> None: commandExecutor = self.__playersCommandsExecutor[guildID] processInfo = self.__playersProcess[guildID] - print('A') - await commandExecutor.sendNowPlaying(processInfo, song) - print('C') + playlist = processInfo.getPlaylist() + channel = processInfo.getTextChannel() + await commandExecutor.sendNowPlaying(playlist, channel, song) class VManager(BaseManager): diff --git a/Parallelism/ThreadPlayerManager.py b/Parallelism/ThreadPlayerManager.py index b23a6a2..aaabec3 100644 --- a/Parallelism/ThreadPlayerManager.py +++ b/Parallelism/ThreadPlayerManager.py @@ -11,7 +11,7 @@ from Music.VulkanBot import VulkanBot from Parallelism.PlayerThread import PlayerThread -class PlayerThreadInfo: +class ThreadPlayerInfo: """ Class to store the reference to all structures to maintain a player thread """ @@ -43,7 +43,7 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): def __init__(self, bot: VulkanBot = None) -> None: if not super().created: self.__bot = bot - self.__playersThreads: Dict[int, PlayerThreadInfo] = {} + self.__playersThreads: Dict[int, ThreadPlayerInfo] = {} def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): return super().sendCommandToPlayer(command, guild, forceCreation, context) @@ -86,14 +86,14 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): newPlayerInfo.getQueueToPlayer().put(playCommand) self.__playersThreads[guild.id] = newPlayerInfo - def __getRunningPlayerInfo(self, guild: Guild) -> PlayerThreadInfo: + def __getRunningPlayerInfo(self, guild: Guild) -> ThreadPlayerInfo: if guild.id not in self.__playersThreads.keys(): print('Process Info not found') return None return self.__playersThreads[guild.id] - def __createPlayerThreadInfo(self, context: Union[Context, Interaction]) -> PlayerThreadInfo: + def __createPlayerThreadInfo(self, context: Union[Context, Interaction]) -> ThreadPlayerInfo: guildID: int = context.guild.id if isinstance(context, Interaction): voiceID: int = context.user.voice.channel.id @@ -103,12 +103,12 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): playlist = Playlist() lock = Lock() player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) - playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel) + playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel) player.start() return playerInfo - def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerThreadInfo: + def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> ThreadPlayerInfo: self.__stopPossiblyRunningProcess(guild) guildID: int = context.guild.id @@ -120,7 +120,7 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): playlist = self.__playersThreads[guildID].getPlaylist() lock = Lock() player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) - playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel) + playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel) player.start() return playerInfo From 1f45b64a620df6486608dd57a5c4e2c26dba1a29 Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Sun, 19 Feb 2023 13:40:37 -0300 Subject: [PATCH 4/5] Sending more code --- DiscordCogs/MusicCog.py | 2 +- Handlers/JumpMusicHandler.py | 2 +- Handlers/PauseHandler.py | 2 +- Handlers/PlayHandler.py | 4 +- Handlers/PrevHandler.py | 2 +- Handlers/ResetHandler.py | 2 +- Handlers/ResumeHandler.py | 2 +- Handlers/SkipHandler.py | 2 +- Handlers/StopHandler.py | 2 +- Parallelism/AbstractProcessManager.py | 2 +- .../{PlayerProcess.py => ProcessPlayer.py} | 12 ++--- Parallelism/ProcessPlayerManager.py | 12 ++--- .../{PlayerThread.py => ThreadPlayer.py} | 24 +++++----- Parallelism/ThreadPlayerManager.py | 48 +++++++++++++++---- 14 files changed, 69 insertions(+), 49 deletions(-) rename Parallelism/{PlayerProcess.py => ProcessPlayer.py} (98%) rename Parallelism/{PlayerThread.py => ThreadPlayer.py} (95%) diff --git a/DiscordCogs/MusicCog.py b/DiscordCogs/MusicCog.py index 05d5d10..5d6709b 100644 --- a/DiscordCogs/MusicCog.py +++ b/DiscordCogs/MusicCog.py @@ -40,7 +40,7 @@ class MusicCog(Cog): self.__bot: VulkanBot = bot self.__embeds = VEmbeds() configs = VConfigs() - if configs.SHOULD_AUTO_DISCONNECT_WHEN_ALONE: + if configs.SONG_PLAYBACK_IN_SEPARATE_PROCESS: configs.setPlayersManager(ProcessPlayerManager(bot)) else: configs.setPlayersManager(ThreadPlayerManager(bot)) diff --git a/Handlers/JumpMusicHandler.py b/Handlers/JumpMusicHandler.py index 4ad5fe6..2210eee 100644 --- a/Handlers/JumpMusicHandler.py +++ b/Handlers/JumpMusicHandler.py @@ -49,7 +49,7 @@ class JumpMusicHandler(AbstractHandler): # Send a command to the player to skip the music command = VCommands(VCommandsType.SKIP, None) - playersManager.sendCommandToPlayer(command, self.guild) + await playersManager.sendCommandToPlayer(command, self.guild) return HandlerResponse(self.ctx) except: diff --git a/Handlers/PauseHandler.py b/Handlers/PauseHandler.py index 3447ccc..681a48e 100644 --- a/Handlers/PauseHandler.py +++ b/Handlers/PauseHandler.py @@ -16,7 +16,7 @@ class PauseHandler(AbstractHandler): playersManager: AbstractPlayersManager = self.config.getPlayersManager() if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.PAUSE, None) - playersManager.sendCommandToPlayer(command, self.guild) + await playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.PLAYER_PAUSED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/PlayHandler.py b/Handlers/PlayHandler.py index a8e3979..2aae0d3 100644 --- a/Handlers/PlayHandler.py +++ b/Handlers/PlayHandler.py @@ -71,7 +71,7 @@ class PlayHandler(AbstractHandler): # Release the acquired Lock playerLock.release() playCommand = VCommands(VCommandsType.PLAY, None) - playersManager.sendCommandToPlayer(playCommand, self.guild) + await playersManager.sendCommandToPlayer(playCommand, self.guild) else: playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() @@ -127,7 +127,7 @@ class PlayHandler(AbstractHandler): acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.add_song(song) - playersManager.sendCommandToPlayer(playCommand, self.guild) + await playersManager.sendCommandToPlayer(playCommand, self.guild) playerLock.release() else: playersManager.resetPlayer(self.guild, self.ctx) diff --git a/Handlers/PrevHandler.py b/Handlers/PrevHandler.py index 22a0577..09c5b14 100644 --- a/Handlers/PrevHandler.py +++ b/Handlers/PrevHandler.py @@ -38,7 +38,7 @@ class PrevHandler(AbstractHandler): # Send a prev command, together with the user voice channel prevCommand = VCommands(VCommandsType.PREV, self.author.voice.channel.id) - playersManager.sendCommandToPlayer(prevCommand, self.guild) + await playersManager.sendCommandToPlayer(prevCommand, self.guild) embed = self.embeds.RETURNING_SONG() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/ResetHandler.py b/Handlers/ResetHandler.py index bdc3451..4cf8eb7 100644 --- a/Handlers/ResetHandler.py +++ b/Handlers/ResetHandler.py @@ -16,7 +16,7 @@ class ResetHandler(AbstractHandler): playersManager: AbstractPlayersManager = self.config.getPlayersManager() if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.RESET, None) - playersManager.sendCommandToPlayer(command, self.guild) + await playersManager.sendCommandToPlayer(command, self.guild) return HandlerResponse(self.ctx) else: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/ResumeHandler.py b/Handlers/ResumeHandler.py index 260f448..5a187e9 100644 --- a/Handlers/ResumeHandler.py +++ b/Handlers/ResumeHandler.py @@ -16,7 +16,7 @@ class ResumeHandler(AbstractHandler): playersManager: AbstractPlayersManager = self.config.getPlayersManager() if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.RESUME, None) - playersManager.sendCommandToPlayer(command, self.guild) + await playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.PLAYER_RESUMED() return HandlerResponse(self.ctx, embed) else: diff --git a/Handlers/SkipHandler.py b/Handlers/SkipHandler.py index a740443..6400fc3 100644 --- a/Handlers/SkipHandler.py +++ b/Handlers/SkipHandler.py @@ -16,7 +16,7 @@ class SkipHandler(AbstractHandler): playersManager: AbstractPlayersManager = self.config.getPlayersManager() if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.SKIP, None) - playersManager.sendCommandToPlayer(command, self.guild) + await playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.SKIPPING_SONG() return HandlerResponse(self.ctx, embed) else: diff --git a/Handlers/StopHandler.py b/Handlers/StopHandler.py index a3b2d32..07972dc 100644 --- a/Handlers/StopHandler.py +++ b/Handlers/StopHandler.py @@ -16,7 +16,7 @@ class StopHandler(AbstractHandler): playersManager: AbstractPlayersManager = self.config.getPlayersManager() if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.STOP, None) - playersManager.sendCommandToPlayer(command, self.guild) + await playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.STOPPING_PLAYER() return HandlerResponse(self.ctx, embed) else: diff --git a/Parallelism/AbstractProcessManager.py b/Parallelism/AbstractProcessManager.py index 693f803..edcd432 100644 --- a/Parallelism/AbstractProcessManager.py +++ b/Parallelism/AbstractProcessManager.py @@ -13,7 +13,7 @@ class AbstractPlayersManager(ABC): pass @abstractmethod - def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + async def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): """If the forceCreation boolean is True, then the context must be provided for the Player to be created""" pass diff --git a/Parallelism/PlayerProcess.py b/Parallelism/ProcessPlayer.py similarity index 98% rename from Parallelism/PlayerProcess.py rename to Parallelism/ProcessPlayer.py index c953a19..a1ac5f2 100644 --- a/Parallelism/PlayerProcess.py +++ b/Parallelism/ProcessPlayer.py @@ -2,11 +2,11 @@ import asyncio from time import sleep, time from urllib.parse import parse_qs, urlparse from Music.VulkanInitializer import VulkanInitializer -from discord import Member, VoiceClient +from discord import VoiceClient from asyncio import AbstractEventLoop, Semaphore, Queue from multiprocessing import Process, RLock, Lock, Queue from threading import Thread -from typing import Callable, List +from typing import Callable from discord import Guild, FFmpegPCMAudio, VoiceChannel from Music.Playlist import Playlist from Music.Song import Song @@ -29,7 +29,7 @@ class TimeoutClock: self.__task.cancel() -class PlayerProcess(Process): +class ProcessPlayer(Process): """Process that will play songs, receive commands from the main process by a Queue""" def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, voiceID: int) -> None: @@ -421,9 +421,3 @@ class PlayerProcess(Process): except Exception as e: print(f'[ERROR CONNECTING TO VC] -> {e}') return False - - def __getBotMember(self) -> Member: - guild_members: List[Member] = self.__guild.members - for member in guild_members: - if member.id == self.__bot.user.id: - return member diff --git a/Parallelism/ProcessPlayerManager.py b/Parallelism/ProcessPlayerManager.py index 6f302cc..50bcaa0 100644 --- a/Parallelism/ProcessPlayerManager.py +++ b/Parallelism/ProcessPlayerManager.py @@ -6,12 +6,12 @@ from queue import Empty from threading import Thread from typing import Dict, Tuple, Union from Config.Singleton import Singleton -from discord import Guild, Interaction, TextChannel +from discord import Guild, Interaction, TextChannel, VoiceChannel from discord.ext.commands import Context from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.ProcessExecutor import ProcessCommandsExecutor from Music.Song import Song -from Parallelism.PlayerProcess import PlayerProcess +from Parallelism.ProcessPlayer import ProcessPlayer from Music.Playlist import Playlist from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot @@ -74,18 +74,18 @@ class ProcessPlayerManager(Singleton, AbstractPlayersManager): if not super().created: self.__bot = bot VManager.register('Playlist', Playlist) + VManager.register('VoiceChannel', VoiceChannel) self.__manager = VManager() self.__manager.start() self.__playersProcess: Dict[int, PlayerProcessInfo] = {} self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {} self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {} - def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + async def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): if forceCreation: processInfo = self.createPlayerForGuild(guild, context) else: processInfo = self.__getRunningPlayerInfo(guild) - if processInfo == None: return @@ -149,7 +149,7 @@ class ProcessPlayerManager(Singleton, AbstractPlayersManager): lock = Lock() queueToListen = Queue() queueToSend = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, + process = ProcessPlayer(context.guild.name, playlist, lock, queueToSend, queueToListen, guildID, voiceID) processInfo = PlayerProcessInfo(process, queueToSend, queueToListen, playlist, lock, context.channel) @@ -196,7 +196,7 @@ class ProcessPlayerManager(Singleton, AbstractPlayersManager): lock = Lock() queueToListen = Queue() queueToSend = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, + process = ProcessPlayer(context.guild.name, playlist, lock, queueToSend, queueToListen, guildID, textID, voiceID, authorID) processInfo = PlayerProcessInfo(process, queueToSend, queueToListen, playlist, lock, context.channel) diff --git a/Parallelism/PlayerThread.py b/Parallelism/ThreadPlayer.py similarity index 95% rename from Parallelism/PlayerThread.py rename to Parallelism/ThreadPlayer.py index e15b48a..3111def 100644 --- a/Parallelism/PlayerThread.py +++ b/Parallelism/ThreadPlayer.py @@ -5,7 +5,7 @@ from discord import VoiceClient from asyncio import AbstractEventLoop from threading import RLock, Thread from multiprocessing import Lock -from typing import Callable +from typing import Callable, Coroutine from discord import Guild, FFmpegPCMAudio, VoiceChannel from Music.Playlist import Playlist from Music.Song import Song @@ -28,10 +28,10 @@ class TimeoutClock: self.__task.cancel() -class PlayerThread(Thread): +class ThreadPlayer(Thread): """Player Thread to control the song playback in the same Process of the Main Process""" - def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int) -> None: + def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int, callbackToSendCommand: Callable, exitCB: Callable) -> None: Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={}) # Synchronization objects self.__playlist: Playlist = playlist @@ -39,14 +39,15 @@ class PlayerThread(Thread): self.__loop: AbstractEventLoop = None self.__playerLock: RLock = RLock() # Discord context ID - self.__guildID = guildID self.__voiceChannelID = voiceID self.__guild: Guild = guild - self.__bot: VulkanBot = bot self.__voiceChannel: VoiceChannel = voiceChannel self.__voiceClient: VoiceClient = None self.__downloader = Downloader() + self.__callback = callbackToSendCommand + self.__exitCB = exitCB + self.__bot = bot self.__playing = False self.__forceStop = False @@ -57,8 +58,7 @@ class PlayerThread(Thread): """This method is called automatically when the Thread starts""" try: print(f'Starting Player Thread for Guild {self.name}') - self.__loop = asyncio.get_event_loop_policy().new_event_loop() - asyncio.set_event_loop(self.__loop) + self.__loop = self.__bot.loop self.__loop.run_until_complete(self._run()) except Exception as e: @@ -89,6 +89,7 @@ class PlayerThread(Thread): song = self.__playlist.next_song() if song is not None: + print('Criando song') self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') self.__playing = True @@ -131,7 +132,7 @@ class PlayerThread(Thread): self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) nowPlayingCommand = VCommands(VCommandsType.NOW_PLAYING, song) - self.__queueSend.put(nowPlayingCommand) + await self.__callback(nowPlayingCommand, self.__guild.id, song) except Exception as e: print(f'[ERROR IN PLAY SONG FUNCTION] -> {e}, {type(e)}') self.__playNext(None) @@ -155,11 +156,8 @@ class PlayerThread(Thread): self.__playlist.loop_off() self.__songPlaying = None self.__playing = False - # Send a command to the main process put this one to sleep - sleepCommand = VCommands(VCommandsType.SLEEPING) - self.__queueSend.put(sleepCommand) - # Release the semaphore to finish the process - self.__semStopPlaying.release() + # Send a command to the main process to kill this thread + self.__exitCB(self.__guild.id) def __verifyIfSongAvailable(self, song: Song) -> bool: """Verify the song source to see if it's already expired""" diff --git a/Parallelism/ThreadPlayerManager.py b/Parallelism/ThreadPlayerManager.py index aaabec3..2e8a13c 100644 --- a/Parallelism/ThreadPlayerManager.py +++ b/Parallelism/ThreadPlayerManager.py @@ -1,5 +1,5 @@ from multiprocessing import Lock -from typing import Dict, Union +from typing import Any, Dict, Union from Config.Singleton import Singleton from discord import Guild, Interaction, TextChannel from discord.ext.commands import Context @@ -8,7 +8,7 @@ from Music.Song import Song from Music.Playlist import Playlist from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot -from Parallelism.PlayerThread import PlayerThread +from Parallelism.ThreadPlayer import ThreadPlayer class ThreadPlayerInfo: @@ -16,13 +16,13 @@ class ThreadPlayerInfo: Class to store the reference to all structures to maintain a player thread """ - def __init__(self, thread: PlayerThread, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: + def __init__(self, thread: ThreadPlayer, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: self.__thread = thread self.__playlist = playlist self.__lock = lock self.__textChannel = textChannel - def getThread(self) -> PlayerThread: + def getPlayer(self) -> ThreadPlayer: return self.__thread def getPlaylist(self) -> Playlist: @@ -45,8 +45,23 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): self.__bot = bot self.__playersThreads: Dict[int, ThreadPlayerInfo] = {} - def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): - return super().sendCommandToPlayer(command, guild, forceCreation, context) + async def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + playerInfo = self.__playersThreads[guild.id] + player = playerInfo.getPlayer() + if player is None and forceCreation: + self.__createPlayerThreadInfo(context) + if player is None: + return + + await player.receiveCommand(command) + + async def __receiveCommand(self, command: VCommands, guildID: int, args: Any) -> None: + commandType = command.getType() + if commandType == VCommandsType.NOW_PLAYING: + await self.showNowPlaying(guildID, args) + else: + print( + f'[ERROR] -> Command not processable received from Thread {guildID}: {commandType}') def getPlayerPlaylist(self, guild: Guild) -> Playlist: playerInfo = self.__getRunningPlayerInfo(guild) @@ -67,7 +82,7 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): self.__playersThreads[guild.id] = self.__createPlayerThreadInfo(context) else: # If the thread has ended create a new one - if not self.__playersThreads[guild.id].getThread().is_alive(): + if not self.__playersThreads[guild.id].getPlayer().is_alive(): self.__playersThreads[guild.id] = self.__recreateThread(guild, context) return self.__playersThreads[guild.id] @@ -80,7 +95,7 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): # Recreate the thread keeping the playlist newPlayerInfo = self.__recreateThread(guild, context) - newPlayerInfo.getThread().start() + newPlayerInfo.getPlayer().start() # Send a command to start the play again playCommand = VCommands(VCommandsType.PLAY) newPlayerInfo.getQueueToPlayer().put(playCommand) @@ -100,14 +115,25 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): else: voiceID: int = context.author.voice.channel.id + voiceChannel = self.__bot.get_channel(voiceID) + playlist = Playlist() lock = Lock() - player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) + player = ThreadPlayer(self.__bot, context.guild, context.guild.name, + voiceChannel, playlist, lock, guildID, voiceID, self.__receiveCommand, self.__deleteThread) playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel) player.start() return playerInfo + def __deleteThread(self, guildID: int) -> None: + """Tries to delete the thread and removes all the references to it""" + playerInfo = self.__playersThreads[guildID] + if playerInfo: + thread = playerInfo.getPlayer() + del thread + self.__playersThreads.popitem(thread) + def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> ThreadPlayerInfo: self.__stopPossiblyRunningProcess(guild) @@ -116,10 +142,12 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): voiceID: int = context.user.voice.channel.id else: voiceID: int = context.author.voice.channel.id + voiceChannel = self.__bot.get_channel(voiceID) playlist = self.__playersThreads[guildID].getPlaylist() lock = Lock() - player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) + player = ThreadPlayer(self.__bot, context.guild, context.guild.name, + voiceChannel, playlist, lock, guildID, voiceID, self.__receiveCommand, self.__deleteThread) playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel) player.start() From a34a6a78d7adc65bb3e7b5ed35321f231b20930d Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Mon, 20 Feb 2023 01:52:59 -0300 Subject: [PATCH 5/5] Finishing to add the ThreadPlayer to the Bot, allowing the user to switch between the versions --- Config/Configs.py | 6 ++- Handlers/PlayHandler.py | 63 +++++++++++++++--------------- Parallelism/ProcessExecutor.py | 57 ++++++++++++++++++++++++++- Parallelism/ThreadPlayer.py | 40 ++++++------------- Parallelism/ThreadPlayerManager.py | 36 +++++++++-------- 5 files changed, 125 insertions(+), 77 deletions(-) diff --git a/Config/Configs.py b/Config/Configs.py index 363e5e1..682d251 100644 --- a/Config/Configs.py +++ b/Config/Configs.py @@ -11,7 +11,11 @@ class VConfigs(Singleton): self.SHOULD_AUTO_DISCONNECT_WHEN_ALONE = False # Recommended to be True, except in cases when your Bot is present in thousands servers, in that case # the delay to start a new Python process for the playback is too much, and to avoid that you set as False - self.SONG_PLAYBACK_IN_SEPARATE_PROCESS = False + # This feature is for now in testing period, for a more stable version, keep this boolean = True + self.SONG_PLAYBACK_IN_SEPARATE_PROCESS = True + # Maximum of songs that will be downloaded at once, the higher this number is, the faster the songs will be all available + # but the slower will be the others commands of the Bot during the downloading time, for example, the playback quality + self.MAX_DOWNLOAD_SONGS_AT_A_TIME = 5 self.BOT_PREFIX = '!' try: diff --git a/Handlers/PlayHandler.py b/Handlers/PlayHandler.py index 2aae0d3..1ee3d77 100644 --- a/Handlers/PlayHandler.py +++ b/Handlers/PlayHandler.py @@ -79,14 +79,8 @@ class PlayHandler(AbstractHandler): return response else: # If multiple songs added - # If more than 10 songs, download and load the first 5 to start the play right away - if len(songs) > 10: - fiveFirstSongs = songs[0:5] - songs = songs[5:] - await self.__downloadSongsAndStore(fiveFirstSongs, playersManager) - # Trigger a task to download all songs and then store them in the playlist - asyncio.create_task(self.__downloadSongsAndStore(songs, playersManager)) + asyncio.create_task(self.__downloadSongsInLots(songs, playersManager)) embed = self.embeds.SONGS_ADDED(len(songs)) return HandlerResponse(self.ctx, embed) @@ -95,7 +89,7 @@ class PlayHandler(AbstractHandler): embed = self.embeds.DOWNLOADING_ERROR() return HandlerResponse(self.ctx, embed, error) except Exception as error: - print(f'ERROR IN PLAYHANDLER -> {traceback.format_exc()}', {type(error)}) + print(f'[ERROR IN PLAYHANDLER] -> {traceback.format_exc()}', {type(error)}) if isinstance(error, VulkanError): embed = self.embeds.CUSTOM_ERROR(error) else: @@ -104,33 +98,40 @@ class PlayHandler(AbstractHandler): return HandlerResponse(self.ctx, embed, error) - async def __downloadSongsAndStore(self, songs: List[Song], playersManager: AbstractPlayersManager) -> None: + async def __downloadSongsInLots(self, songs: List[Song], playersManager: AbstractPlayersManager) -> None: + """ + To avoid having a lot of tasks delaying the song playback we will lock the maximum songs downloading at a time + """ playlist = playersManager.getPlayerPlaylist(self.guild) playCommand = VCommands(VCommandsType.PLAY, None) - tooManySongs = len(songs) > 100 + maxDownloads = self.config.MAX_DOWNLOAD_SONGS_AT_A_TIME - # Trigger a task for each song to be downloaded - tasks: List[asyncio.Task] = [] - for index, song in enumerate(songs): - # If there is a lot of songs being downloaded, force a sleep to try resolve the Http Error 429 "To Many Requests" - # Trying to fix the issue https://github.com/RafaelSolVargas/Vulkan/issues/32 - if tooManySongs and index % 3 == 0: - await asyncio.sleep(0.5) - task = asyncio.create_task(self.__down.download_song(song)) - tasks.append(task) + while len(songs) > 0: + # Verify how many songs will be downloaded in this lot and extract from the songs list + songsQuant = min(maxDownloads, len(songs)) + # Get the first quantInLot songs + songsInLot = songs[:songsQuant] + # Remove the first quantInLot songs from the songs + songs = songs[songsQuant:] - for index, task in enumerate(tasks): - await task - song = songs[index] - if not song.problematic: # If downloaded add to the playlist and send play command - playerLock = playersManager.getPlayerLock(self.guild) - acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) - if acquired: - playlist.add_song(song) - await playersManager.sendCommandToPlayer(playCommand, self.guild) - playerLock.release() - else: - playersManager.resetPlayer(self.guild, self.ctx) + # Create task to download the songs in the lot + tasks: List[asyncio.Task] = [] + for index, song in enumerate(songsInLot): + task = asyncio.create_task(self.__down.download_song(song)) + tasks.append(task) + + for index, task, in enumerate(tasks): + await task + song = songsInLot[index] + if not song.problematic: # If downloaded add to the playlist and send play command + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + if acquired: + playlist.add_song(song) + await playersManager.sendCommandToPlayer(playCommand, self.guild) + playerLock.release() + else: + playersManager.resetPlayer(self.guild, self.ctx) def __isUserConnected(self) -> bool: if self.ctx.author.voice: diff --git a/Parallelism/ProcessExecutor.py b/Parallelism/ProcessExecutor.py index 8d8f007..4979bdd 100644 --- a/Parallelism/ProcessExecutor.py +++ b/Parallelism/ProcessExecutor.py @@ -1,5 +1,5 @@ from typing import List -from discord import Button, TextChannel +from discord import Button, Guild, TextChannel from discord.ui import View from Config.Emojis import VEmojis from Messages.MessagesCategory import MessagesCategory @@ -21,6 +21,11 @@ from Handlers.QueueHandler import QueueHandler class ProcessCommandsExecutor: + MESSAGES = Messages() + EMBEDS = VEmbeds() + EMOJIS = VEmojis() + MSG_MANAGER = MessagesManager() + def __init__(self, bot: VulkanBot, guildID: int) -> None: self.__bot = bot self.__guildID = guildID @@ -29,6 +34,56 @@ class ProcessCommandsExecutor: self.__embeds = VEmbeds() self.__emojis = VEmojis() + @classmethod + async def sendNowPlayingToGuild(cls, bot: VulkanBot, playlist: Playlist, channel: TextChannel, song: Song, guild: Guild) -> None: + # Get the lock of the playlist + if playlist.isLoopingOne(): + title = cls.MESSAGES.ONE_SONG_LOOPING + else: + title = cls.MESSAGES.SONG_PLAYING + + # Create View and Embed + embed = cls.EMBEDS.SONG_INFO(song.info, title) + view = cls.__getPlayerViewForGuild(channel, guild.id, bot) + # Send Message and add to the MessagesManager + message = await channel.send(embed=embed, view=view) + await cls.MSG_MANAGER.addMessageAndClearPrevious(guild.id, MessagesCategory.NOW_PLAYING, message, view) + + # Set in the view the message witch contains the view + view.set_message(message=message) + + @classmethod + def __getPlayerViewForGuild(cls, channel: TextChannel, guildID: int, bot: VulkanBot) -> View: + buttons = cls.__getPlayerButtonsForGuild(channel, guildID, bot) + view = BasicView(bot, buttons) + return view + + @classmethod + def __getPlayerButtonsForGuild(cls, textChannel: TextChannel, guildID: int, bot: VulkanBot) -> List[Button]: + """Create the Buttons to be inserted in the Player View""" + buttons: List[Button] = [] + + buttons.append(HandlerButton(bot, PrevHandler, cls.EMOJIS.BACK, + textChannel, guildID, MessagesCategory.PLAYER, "Back")) + buttons.append(HandlerButton(bot, PauseHandler, cls.EMOJIS.PAUSE, + textChannel, guildID, MessagesCategory.PLAYER, "Pause")) + buttons.append(HandlerButton(bot, ResumeHandler, cls.EMOJIS.PLAY, + textChannel, guildID, MessagesCategory.PLAYER, "Play")) + buttons.append(HandlerButton(bot, StopHandler, cls.EMOJIS.STOP, + textChannel, guildID, MessagesCategory.PLAYER, "Stop")) + buttons.append(HandlerButton(bot, SkipHandler, cls.EMOJIS.SKIP, + textChannel, guildID, MessagesCategory.PLAYER, "Skip")) + buttons.append(HandlerButton(bot, QueueHandler, cls.EMOJIS.QUEUE, + textChannel, guildID, MessagesCategory.QUEUE, "Songs")) + buttons.append(HandlerButton(bot, LoopHandler, cls.EMOJIS.LOOP_ONE, + textChannel, guildID, MessagesCategory.LOOP, "Loop One", 'One')) + buttons.append(HandlerButton(bot, LoopHandler, cls.EMOJIS.LOOP_OFF, + textChannel, guildID, MessagesCategory.LOOP, "Loop Off", 'Off')) + buttons.append(HandlerButton(bot, LoopHandler, cls.EMOJIS.LOOP_ALL, + textChannel, guildID, MessagesCategory.LOOP, "Loop All", 'All')) + + return buttons + async def sendNowPlaying(self, playlist: Playlist, channel: TextChannel, song: Song) -> None: # Get the lock of the playlist if playlist.isLoopingOne(): diff --git a/Parallelism/ThreadPlayer.py b/Parallelism/ThreadPlayer.py index 3111def..3358f9b 100644 --- a/Parallelism/ThreadPlayer.py +++ b/Parallelism/ThreadPlayer.py @@ -5,7 +5,7 @@ from discord import VoiceClient from asyncio import AbstractEventLoop from threading import RLock, Thread from multiprocessing import Lock -from typing import Callable, Coroutine +from typing import Callable from discord import Guild, FFmpegPCMAudio, VoiceChannel from Music.Playlist import Playlist from Music.Song import Song @@ -33,10 +33,11 @@ class ThreadPlayer(Thread): def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int, callbackToSendCommand: Callable, exitCB: Callable) -> None: Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={}) + print(f'Starting Player Thread for Guild {self.name}') # Synchronization objects self.__playlist: Playlist = playlist self.__playlistLock: Lock = lock - self.__loop: AbstractEventLoop = None + self.__loop: AbstractEventLoop = bot.loop self.__playerLock: RLock = RLock() # Discord context ID self.__voiceChannelID = voiceID @@ -48,30 +49,13 @@ class ThreadPlayer(Thread): self.__callback = callbackToSendCommand self.__exitCB = exitCB self.__bot = bot + self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) self.__playing = False self.__forceStop = False self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'} - def run(self) -> None: - """This method is called automatically when the Thread starts""" - try: - print(f'Starting Player Thread for Guild {self.name}') - self.__loop = self.__bot.loop - self.__loop.run_until_complete(self._run()) - - except Exception as e: - print(f'[Error in Process {self.name}] -> {e}') - - async def _run(self) -> None: - # Connect to voice Channel - await self.__connectToVoiceChannel() - # Start the timeout function - self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) - # Start a Task to play songs - self.__loop.create_task(self.__playPlaylistSongs()) - def __verifyIfIsPlaying(self) -> bool: if self.__voiceClient is None: return False @@ -89,8 +73,7 @@ class ThreadPlayer(Thread): song = self.__playlist.next_song() if song is not None: - print('Criando song') - self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') + await self.__playSong(song) self.__playing = True async def __playSong(self, song: Song) -> None: @@ -132,7 +115,7 @@ class ThreadPlayer(Thread): self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) nowPlayingCommand = VCommands(VCommandsType.NOW_PLAYING, song) - await self.__callback(nowPlayingCommand, self.__guild.id, song) + await self.__callback(nowPlayingCommand, self.__guild, song) except Exception as e: print(f'[ERROR IN PLAY SONG FUNCTION] -> {e}, {type(e)}') self.__playNext(None) @@ -157,7 +140,8 @@ class ThreadPlayer(Thread): self.__songPlaying = None self.__playing = False # Send a command to the main process to kill this thread - self.__exitCB(self.__guild.id) + + self.__exitCB(self.__guild) def __verifyIfSongAvailable(self, song: Song) -> bool: """Verify the song source to see if it's already expired""" @@ -214,12 +198,12 @@ class ThreadPlayer(Thread): self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') async def receiveCommand(self, command: VCommands) -> None: - type = command.getType() - args = command.getArgs() - print(f'Player Thread {self.__guild.name} received command {type}') - try: self.__playerLock.acquire() + type = command.getType() + args = command.getArgs() + # print(f'Player Thread {self.__guild.name} received command {type}') + if type == VCommandsType.PAUSE: self.__pause() elif type == VCommandsType.RESUME: diff --git a/Parallelism/ThreadPlayerManager.py b/Parallelism/ThreadPlayerManager.py index 2e8a13c..f90e145 100644 --- a/Parallelism/ThreadPlayerManager.py +++ b/Parallelism/ThreadPlayerManager.py @@ -1,4 +1,4 @@ -from multiprocessing import Lock +from threading import RLock from typing import Any, Dict, Union from Config.Singleton import Singleton from discord import Guild, Interaction, TextChannel @@ -8,6 +8,7 @@ from Music.Song import Song from Music.Playlist import Playlist from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot +from Parallelism.ProcessExecutor import ProcessCommandsExecutor from Parallelism.ThreadPlayer import ThreadPlayer @@ -16,7 +17,7 @@ class ThreadPlayerInfo: Class to store the reference to all structures to maintain a player thread """ - def __init__(self, thread: ThreadPlayer, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: + def __init__(self, thread: ThreadPlayer, playlist: Playlist, lock: RLock, textChannel: TextChannel) -> None: self.__thread = thread self.__playlist = playlist self.__lock = lock @@ -28,7 +29,7 @@ class ThreadPlayerInfo: def getPlaylist(self) -> Playlist: return self.__playlist - def getLock(self) -> Lock: + def getLock(self) -> RLock: return self.__lock def getTextChannel(self) -> TextChannel: @@ -55,20 +56,20 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): await player.receiveCommand(command) - async def __receiveCommand(self, command: VCommands, guildID: int, args: Any) -> None: + async def __receiveCommand(self, command: VCommands, guild: Guild, args: Any) -> None: commandType = command.getType() if commandType == VCommandsType.NOW_PLAYING: - await self.showNowPlaying(guildID, args) + await self.showNowPlaying(guild, args) else: print( - f'[ERROR] -> Command not processable received from Thread {guildID}: {commandType}') + f'[ERROR] -> Command not processable received from Thread {guild.name}: {commandType}') def getPlayerPlaylist(self, guild: Guild) -> Playlist: playerInfo = self.__getRunningPlayerInfo(guild) if playerInfo: return playerInfo.getPlaylist() - def getPlayerLock(self, guild: Guild) -> Lock: + def getPlayerLock(self, guild: Guild) -> RLock: playerInfo = self.__getRunningPlayerInfo(guild) if playerInfo: return playerInfo.getLock() @@ -118,7 +119,7 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): voiceChannel = self.__bot.get_channel(voiceID) playlist = Playlist() - lock = Lock() + lock = RLock() player = ThreadPlayer(self.__bot, context.guild, context.guild.name, voiceChannel, playlist, lock, guildID, voiceID, self.__receiveCommand, self.__deleteThread) playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel) @@ -126,13 +127,14 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): return playerInfo - def __deleteThread(self, guildID: int) -> None: + def __deleteThread(self, guild: Guild) -> None: """Tries to delete the thread and removes all the references to it""" - playerInfo = self.__playersThreads[guildID] + print(f'[THREAD MANAGER] -> Deleting Thread for guild {guild.name}') + playerInfo = self.__playersThreads[guild.id] if playerInfo: thread = playerInfo.getPlayer() + self.__playersThreads.pop(guild.id) del thread - self.__playersThreads.popitem(thread) def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> ThreadPlayerInfo: self.__stopPossiblyRunningProcess(guild) @@ -145,7 +147,7 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): voiceChannel = self.__bot.get_channel(voiceID) playlist = self.__playersThreads[guildID].getPlaylist() - lock = Lock() + lock = RLock() player = ThreadPlayer(self.__bot, context.guild, context.guild.name, voiceChannel, playlist, lock, guildID, voiceID, self.__receiveCommand, self.__deleteThread) playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel) @@ -153,7 +155,9 @@ class ThreadPlayerManager(Singleton, AbstractPlayersManager): return playerInfo - async def showNowPlaying(self, guildID: int, song: Song) -> None: - commandExecutor = self.__playersCommandsExecutor[guildID] - processInfo = self.__playersThreads[guildID] - await commandExecutor.sendNowPlaying(processInfo, song) + async def showNowPlaying(self, guild: Guild, song: Song) -> None: + processInfo = self.__playersThreads[guild.id] + playlist = processInfo.getPlaylist() + txtChannel = processInfo.getTextChannel() + + await ProcessCommandsExecutor.sendNowPlayingToGuild(self.__bot, playlist, txtChannel, song, guild)