From 72043c4475f251b99ce6f3ab1534f7dc34e4d666 Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Sun, 19 Feb 2023 11:28:33 -0300 Subject: [PATCH] Continuing the refactoring --- DiscordCogs/MusicCog.py | 4 +- Handlers/AbstractHandler.py | 8 -- Handlers/ClearHandler.py | 16 +-- Handlers/HistoryHandler.py | 16 +-- Handlers/JumpMusicHandler.py | 25 ++-- Handlers/LoopHandler.py | 20 ++- Handlers/MoveHandler.py | 21 ++- Handlers/NowPlayingHandler.py | 8 +- Handlers/PauseHandler.py | 14 +- Handlers/PlayHandler.py | 58 ++++---- Handlers/PrevHandler.py | 15 +- Handlers/QueueHandler.py | 20 ++- Handlers/RemoveHandler.py | 9 +- Handlers/ResetHandler.py | 15 +- Handlers/ResumeHandler.py | 15 +- Handlers/ShuffleHandler.py | 15 +- Handlers/SkipHandler.py | 27 +--- Handlers/StopHandler.py | 15 +- Parallelism/AbstractProcessManager.py | 26 +++- Parallelism/PlayerProcess.py | 7 +- Parallelism/PlayerThread.py | 11 +- Parallelism/ProcessExecutor.py | 7 +- Parallelism/ProcessInfo.py | 51 ------- Parallelism/ProcessManager.py | 118 ++++++++++++--- Parallelism/ThreadManager.py | 199 -------------------------- Parallelism/ThreadPlayerManager.py | 131 +++++++++++++++++ 26 files changed, 369 insertions(+), 502 deletions(-) delete mode 100644 Parallelism/ProcessInfo.py delete mode 100644 Parallelism/ThreadManager.py create mode 100644 Parallelism/ThreadPlayerManager.py diff --git a/DiscordCogs/MusicCog.py b/DiscordCogs/MusicCog.py index c84293e..74a27c5 100644 --- a/DiscordCogs/MusicCog.py +++ b/DiscordCogs/MusicCog.py @@ -23,7 +23,7 @@ from Messages.Responses.EmbedCogResponse import EmbedCommandResponse from Music.VulkanBot import VulkanBot from Config.Configs import VConfigs from Config.Embeds import VEmbeds -from Parallelism.ProcessManager import ProcessManager +from Parallelism.ProcessManager import ProcessPlayerManager helper = Helper() @@ -38,7 +38,7 @@ class MusicCog(Cog): def __init__(self, bot: VulkanBot) -> None: self.__bot: VulkanBot = bot self.__embeds = VEmbeds() - VConfigs().setPlayersManager(ProcessManager(bot)) + VConfigs().setPlayersManager(ProcessPlayerManager(bot)) @command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar']) async def play(self, ctx: Context, *args) -> None: diff --git a/Handlers/AbstractHandler.py b/Handlers/AbstractHandler.py index 54b987d..db6729e 100644 --- a/Handlers/AbstractHandler.py +++ b/Handlers/AbstractHandler.py @@ -1,6 +1,4 @@ from abc import ABC, abstractmethod -from Parallelism.Commands import VCommands -from multiprocessing import Queue from typing import List, Union from discord.ext.commands import Context from discord import Client, Guild, ClientUser, Interaction, Member, User @@ -29,12 +27,6 @@ class AbstractHandler(ABC): else: self.__author = ctx.user - def putCommandInQueue(self, queue: Queue, command: VCommands) -> None: - try: - queue.put(command) - except Exception as e: - print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}') - @abstractmethod async def run(self) -> HandlerResponse: pass diff --git a/Handlers/ClearHandler.py b/Handlers/ClearHandler.py index f79d4a8..04a2025 100644 --- a/Handlers/ClearHandler.py +++ b/Handlers/ClearHandler.py @@ -5,7 +5,6 @@ from Music.VulkanBot import VulkanBot from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo class ClearHandler(AbstractHandler): @@ -14,19 +13,18 @@ class ClearHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): # Clear the playlist - playlist = processInfo.getPlaylist() - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playlist = playersManager.getPlayerPlaylist(self.guild) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.clear() - processLock.release() + playerLock.release() embed = self.embeds.PLAYLIST_CLEAR() return HandlerResponse(self.ctx, embed) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/HistoryHandler.py b/Handlers/HistoryHandler.py index 0c337c3..b6889e1 100644 --- a/Handlers/HistoryHandler.py +++ b/Handlers/HistoryHandler.py @@ -14,18 +14,16 @@ class HistoryHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: - playlist = processInfo.getPlaylist() - history = playlist.getSongsHistory() - processLock.release() + history = playersManager.getPlayerPlaylist(self.guild).getSongsHistory() + playerLock.release() else: # If the player doesn't respond in time we restart it - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) else: diff --git a/Handlers/JumpMusicHandler.py b/Handlers/JumpMusicHandler.py index 3ab6c95..4ad5fe6 100644 --- a/Handlers/JumpMusicHandler.py +++ b/Handlers/JumpMusicHandler.py @@ -17,32 +17,31 @@ class JumpMusicHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, musicPos: str) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: # Try to convert input to int error = self.__validateInput(musicPos) if error: embed = self.embeds.ERROR_EMBED(error.message) - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) # Sanitize the input - playlist: Playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) musicPos = self.__sanitizeInput(playlist, musicPos) # Validate the position if not playlist.validate_position(musicPos): error = InvalidInput() embed = self.embeds.PLAYLIST_RANGE_ERROR() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) try: # Move the selected song @@ -50,19 +49,17 @@ class JumpMusicHandler(AbstractHandler): # Send a command to the player to skip the music command = VCommands(VCommandsType.SKIP, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) + playersManager.sendCommandToPlayer(command, self.guild) - processLock.release() return HandlerResponse(self.ctx) except: - # Release the acquired Lock - processLock.release() embed = self.embeds.ERROR_MOVING() error = UnknownError() return HandlerResponse(self.ctx, embed, error) + finally: + playerLock.release() else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/LoopHandler.py b/Handlers/LoopHandler.py index 17d9ee9..06f5917 100644 --- a/Handlers/LoopHandler.py +++ b/Handlers/LoopHandler.py @@ -5,7 +5,6 @@ from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import BadCommandUsage from typing import Union from discord import Interaction - from Parallelism.AbstractProcessManager import AbstractPlayersManager @@ -14,23 +13,20 @@ class LoopHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, args: str) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() - - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playlist = playersManager.getPlayerPlaylist(self.guild) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: if args == '' or args is None: playlist.loop_all() embed = self.embeds.LOOP_ALL_ACTIVATED() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed) args = args.lower() @@ -53,9 +49,9 @@ class LoopHandler(AbstractHandler): error = BadCommandUsage() embed = self.embeds.BAD_LOOP_USE() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/MoveHandler.py b/Handlers/MoveHandler.py index 27d004c..c25e94f 100644 --- a/Handlers/MoveHandler.py +++ b/Handlers/MoveHandler.py @@ -15,45 +15,44 @@ class MoveHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, pos1: str, pos2: str) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: error = self.__validateInput(pos1, pos2) if error: embed = self.embeds.ERROR_EMBED(error.message) - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) pos1, pos2 = self.__sanitizeInput(playlist, pos1, pos2) if not playlist.validate_position(pos1) or not playlist.validate_position(pos2): error = InvalidInput() embed = self.embeds.PLAYLIST_RANGE_ERROR() - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, error) try: song = playlist.move_songs(pos1, pos2) song_name = song.title if song.title else song.identifier embed = self.embeds.SONG_MOVED(song_name, pos1, pos2) - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed) except: # Release the acquired Lock - processLock.release() + playerLock.release() embed = self.embeds.ERROR_MOVING() error = UnknownError() return HandlerResponse(self.ctx, embed, error) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/NowPlayingHandler.py b/Handlers/NowPlayingHandler.py index 194314c..a8efef4 100644 --- a/Handlers/NowPlayingHandler.py +++ b/Handlers/NowPlayingHandler.py @@ -14,14 +14,12 @@ class NowPlayingHandler(AbstractHandler): self.__cleaner = Cleaner() async def run(self) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() return HandlerResponse(self.ctx, embed) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) if playlist.getCurrentSong() is None: embed = self.embeds.NOT_PLAYING() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/PauseHandler.py b/Handlers/PauseHandler.py index 7cbda92..3447ccc 100644 --- a/Handlers/PauseHandler.py +++ b/Handlers/PauseHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.Commands import VCommands, VCommandsType -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction @@ -14,17 +13,10 @@ class PauseHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send Pause command to be execute by player process + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.PAUSE, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.PLAYER_PAUSED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/PlayHandler.py b/Handlers/PlayHandler.py index 5dcd58e..a8e3979 100644 --- a/Handlers/PlayHandler.py +++ b/Handlers/PlayHandler.py @@ -1,6 +1,6 @@ import asyncio import traceback -from typing import List +from typing import List, Union from Config.Exceptions import DownloadingError, InvalidInput, VulkanError from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler @@ -10,12 +10,9 @@ from Music.Downloader import Downloader from Music.Searcher import Searcher from Music.Song import Song from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot -from typing import Union from discord import Interaction -from Music.Playlist import Playlist class PlayHandler(AbstractHandler): @@ -37,13 +34,12 @@ class PlayHandler(AbstractHandler): if musicsInfo is None or len(musicsInfo) == 0: raise InvalidInput(self.messages.INVALID_INPUT, self.messages.ERROR_TITLE) - # Get the process context for the current guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) - playlist: Playlist = processInfo.getPlaylist() - process = processInfo.getProcess() - if not process.is_alive(): # If process has not yet started, start - process.start() + # If there is no executing player for the guild then we create the player + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): + playersManager.createPlayerForGuild(self.guild, self.ctx) + + playlist = playersManager.getPlayerPlaylist(self.guild) # Create the Songs objects songs: List[Song] = [] @@ -67,19 +63,17 @@ class PlayHandler(AbstractHandler): embed = self.embeds.SONG_ADDED_TWO(song.info, pos) response = HandlerResponse(self.ctx, embed) - # Add the unique song to the playlist and send a command to player process - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + # Add the unique song to the playlist and send a command to player + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.add_song(song) # Release the acquired Lock - processLock.release() - queue = processInfo.getQueueToPlayer() + playerLock.release() playCommand = VCommands(VCommandsType.PLAY, None) - self.putCommandInQueue(queue, playCommand) - + playersManager.sendCommandToPlayer(playCommand, self.guild) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) @@ -89,10 +83,10 @@ class PlayHandler(AbstractHandler): if len(songs) > 10: fiveFirstSongs = songs[0:5] songs = songs[5:] - await self.__downloadSongsAndStore(fiveFirstSongs, processInfo) + await self.__downloadSongsAndStore(fiveFirstSongs, playersManager) - # Trigger a task to download all songs and then store them in the process playlist - asyncio.create_task(self.__downloadSongsAndStore(songs, processInfo)) + # Trigger a task to download all songs and then store them in the playlist + asyncio.create_task(self.__downloadSongsAndStore(songs, playersManager)) embed = self.embeds.SONGS_ADDED(len(songs)) return HandlerResponse(self.ctx, embed) @@ -102,7 +96,7 @@ class PlayHandler(AbstractHandler): return HandlerResponse(self.ctx, embed, error) except Exception as error: print(f'ERROR IN PLAYHANDLER -> {traceback.format_exc()}', {type(error)}) - if isinstance(error, VulkanError): # If error was already processed + if isinstance(error, VulkanError): embed = self.embeds.CUSTOM_ERROR(error) else: error = UnknownError() @@ -110,9 +104,8 @@ class PlayHandler(AbstractHandler): return HandlerResponse(self.ctx, embed, error) - async def __downloadSongsAndStore(self, songs: List[Song], processInfo: PlayerInfo) -> None: - playlist = processInfo.getPlaylist() - queue = processInfo.getQueueToPlayer() + async def __downloadSongsAndStore(self, songs: List[Song], playersManager: AbstractPlayersManager) -> None: + playlist = playersManager.getPlayerPlaylist(self.guild) playCommand = VCommands(VCommandsType.PLAY, None) tooManySongs = len(songs) > 100 @@ -126,21 +119,18 @@ class PlayHandler(AbstractHandler): task = asyncio.create_task(self.__down.download_song(song)) tasks.append(task) - # In the original order, await for the task and then, if successfully downloaded, add to the playlist - processManager: AbstractPlayersManager = self.config.getPlayersManager() for index, task in enumerate(tasks): await task song = songs[index] if not song.problematic: # If downloaded add to the playlist and send play command - processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: playlist.add_song(song) - self.putCommandInQueue(queue, playCommand) - processLock.release() + playersManager.sendCommandToPlayer(playCommand, self.guild) + playerLock.release() else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) def __isUserConnected(self) -> bool: if self.ctx.author.voice: diff --git a/Handlers/PrevHandler.py b/Handlers/PrevHandler.py index 13e81b6..22a0577 100644 --- a/Handlers/PrevHandler.py +++ b/Handlers/PrevHandler.py @@ -19,14 +19,13 @@ class PrevHandler(AbstractHandler): embed = self.embeds.NO_CHANNEL() return HandlerResponse(self.ctx, embed, error) - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) if len(playlist.getHistory()) == 0: error = ImpossibleMove() embed = self.embeds.NOT_PREVIOUS_SONG() @@ -37,15 +36,9 @@ class PrevHandler(AbstractHandler): embed = self.embeds.FAIL_DUE_TO_LOOP_ON() return HandlerResponse(self.ctx, embed, error) - # If not started, start the player process - process = processInfo.getProcess() - if not process.is_alive(): - process.start() - # Send a prev command, together with the user voice channel prevCommand = VCommands(VCommandsType.PREV, self.author.voice.channel.id) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, prevCommand) + playersManager.sendCommandToPlayer(prevCommand, self.guild) embed = self.embeds.RETURNING_SONG() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/QueueHandler.py b/Handlers/QueueHandler.py index 5ffefad..1da5bf5 100644 --- a/Handlers/QueueHandler.py +++ b/Handlers/QueueHandler.py @@ -22,29 +22,27 @@ class QueueHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, pageNumber=0) -> HandlerResponse: - # Retrieve the process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: # If no process return empty list + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.EMPTY_QUEUE() return HandlerResponse(self.ctx, embed) # Acquire the Lock to manipulate the playlist - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: - playlist: Playlist = processInfo.getPlaylist() + playlist: Playlist = playersManager.getPlayerPlaylist(self.guild) if playlist.isLoopingOne(): song = playlist.getCurrentSong() embed = self.embeds.ONE_SONG_LOOPING(song.info) - processLock.release() # Release the Lock + playerLock.release() # Release the Lock return HandlerResponse(self.ctx, embed) allSongs = playlist.getSongs() if len(allSongs) == 0: embed = self.embeds.EMPTY_QUEUE() - processLock.release() # Release the Lock + playerLock.release() # Release the Lock return HandlerResponse(self.ctx, embed) songsPages = playlist.getSongsPages() @@ -93,10 +91,10 @@ class QueueHandler(AbstractHandler): embed = self.embeds.QUEUE(title, text) # Release the acquired Lock - processLock.release() + playerLock.release() return HandlerResponse(self.ctx, embed, view=queueView) else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/RemoveHandler.py b/Handlers/RemoveHandler.py index fd17da6..b769b6b 100644 --- a/Handlers/RemoveHandler.py +++ b/Handlers/RemoveHandler.py @@ -5,7 +5,6 @@ from Config.Exceptions import BadCommandUsage, VulkanError, ErrorRemoving, Inval from Music.Playlist import Playlist from Music.VulkanBot import VulkanBot from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo from typing import Union from discord import Interaction @@ -15,15 +14,13 @@ class RemoveHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, position: str) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if not processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if not playersManager.verifyIfPlayerExists(self.guild): embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return HandlerResponse(self.ctx, embed, error) - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) if playlist is None: embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() diff --git a/Handlers/ResetHandler.py b/Handlers/ResetHandler.py index 2bc72d9..bdc3451 100644 --- a/Handlers/ResetHandler.py +++ b/Handlers/ResetHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,18 +13,10 @@ class ResetHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - # Get the current process of the guild - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.RESET, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) return HandlerResponse(self.ctx) else: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/ResumeHandler.py b/Handlers/ResumeHandler.py index da7867b..260f448 100644 --- a/Handlers/ResumeHandler.py +++ b/Handlers/ResumeHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,18 +13,10 @@ class ResumeHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send Resume command to be execute by player process + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.RESUME, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.PLAYER_RESUMED() return HandlerResponse(self.ctx, embed) else: diff --git a/Handlers/ShuffleHandler.py b/Handlers/ShuffleHandler.py index 7445e9d..ae6dfc4 100644 --- a/Handlers/ShuffleHandler.py +++ b/Handlers/ShuffleHandler.py @@ -14,19 +14,18 @@ class ShuffleHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): try: - processLock = processInfo.getLock() - acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) + playerLock = playersManager.getPlayerLock(self.guild) + acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT) if acquired: - playlist = processInfo.getPlaylist() + playlist = playersManager.getPlayerPlaylist(self.guild) playlist.shuffle() # Release the acquired Lock - processLock.release() + playerLock.release() else: - processManager.resetProcess(self.guild, self.ctx) + playersManager.resetPlayer(self.guild, self.ctx) embed = self.embeds.PLAYER_RESTARTED() return HandlerResponse(self.ctx, embed) diff --git a/Handlers/SkipHandler.py b/Handlers/SkipHandler.py index 9de6589..a740443 100644 --- a/Handlers/SkipHandler.py +++ b/Handlers/SkipHandler.py @@ -1,10 +1,8 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler -from Config.Exceptions import BadCommandUsage, ImpossibleMove from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -15,31 +13,12 @@ class SkipHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - if not self.__user_connected(): - error = ImpossibleMove() - embed = self.embeds.NO_CHANNEL() - return HandlerResponse(self.ctx, embed, error) - - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: # Verify if there is a running process - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send a command to the player process to skip the music + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.SKIP, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.SKIPPING_SONG() return HandlerResponse(self.ctx, embed) else: embed = self.embeds.NOT_PLAYING() return HandlerResponse(self.ctx, embed) - - def __user_connected(self) -> bool: - if self.author.voice: - return True - else: - return False diff --git a/Handlers/StopHandler.py b/Handlers/StopHandler.py index dd5ea7a..a3b2d32 100644 --- a/Handlers/StopHandler.py +++ b/Handlers/StopHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -14,18 +13,10 @@ class StopHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager: AbstractPlayersManager = self.config.getPlayersManager() - processInfo = processManager.getRunningPlayerInfo(self.guild) - if processInfo: - if processInfo.getStatus() == ProcessStatus.SLEEPING: - embed = self.embeds.NOT_PLAYING() - return HandlerResponse(self.ctx, embed) - - # Send command to player process stop + playersManager: AbstractPlayersManager = self.config.getPlayersManager() + if playersManager.verifyIfPlayerExists(self.guild): command = VCommands(VCommandsType.STOP, None) - queue = processInfo.getQueueToPlayer() - self.putCommandInQueue(queue, command) - + playersManager.sendCommandToPlayer(command, self.guild) embed = self.embeds.STOPPING_PLAYER() return HandlerResponse(self.ctx, embed) else: diff --git a/Parallelism/AbstractProcessManager.py b/Parallelism/AbstractProcessManager.py index ecc2849..693f803 100644 --- a/Parallelism/AbstractProcessManager.py +++ b/Parallelism/AbstractProcessManager.py @@ -1,9 +1,11 @@ from abc import ABC, abstractmethod +from threading import Lock from typing import Union from discord.ext.commands import Context from discord import Guild, Interaction +from Music.Playlist import Playlist from Music.Song import Song -from Parallelism.ProcessInfo import PlayerInfo +from Parallelism.Commands import VCommands class AbstractPlayersManager(ABC): @@ -11,19 +13,33 @@ class AbstractPlayersManager(ABC): pass @abstractmethod - def setPlayerInfo(self, guild: Guild, info: PlayerInfo): + def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + """If the forceCreation boolean is True, then the context must be provided for the Player to be created""" pass @abstractmethod - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + def getPlayerPlaylist(self, guild: Guild) -> Playlist: + """If there is a player process for the guild, then return the playlist of the guild""" pass @abstractmethod - def resetProcess(self, guild: Guild, context: Context) -> None: + def getPlayerLock(self, guild: Guild) -> Lock: + """If there is a player process for the guild, then return the lock of the guild""" pass @abstractmethod - def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: + def verifyIfPlayerExists(self, guild: Guild) -> bool: + """Returns if a player for the guild exists""" + pass + + @abstractmethod + def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None: + """With the context information of a guild create a internal player for the guild""" + pass + + @abstractmethod + def resetPlayer(self, guild: Guild, context: Context) -> None: + """Tries to reset the player of the guild""" pass @abstractmethod diff --git a/Parallelism/PlayerProcess.py b/Parallelism/PlayerProcess.py index 7e2979a..c953a19 100644 --- a/Parallelism/PlayerProcess.py +++ b/Parallelism/PlayerProcess.py @@ -32,7 +32,7 @@ class TimeoutClock: class PlayerProcess(Process): """Process that will play songs, receive commands from the main process by a Queue""" - def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, voiceID: int) -> None: """ Start a new process that will have his own bot instance Due to pickle serialization, no objects are stored, the values initialization are being made in the run method @@ -46,10 +46,8 @@ class PlayerProcess(Process): self.__semStopPlaying: Semaphore = None self.__loop: AbstractEventLoop = None # Discord context ID - self.__textChannelID = textID self.__guildID = guildID self.__voiceChannelID = voiceID - self.__authorID = authorID # All information of discord context will be retrieved directly with discord API self.__guild: Guild = None self.__bot: VulkanBot = None @@ -81,9 +79,6 @@ class PlayerProcess(Process): self.__bot = await self.__createBotInstance() self.__guild = self.__bot.get_guild(self.__guildID) self.__voiceChannel = self.__bot.get_channel(self.__voiceChannelID) - self.__textChannel = self.__bot.get_channel(self.__textChannelID) - self.__author = self.__bot.get_channel(self.__authorID) - self.__botMember = self.__getBotMember() # Connect to voice Channel await self.__connectToVoiceChannel() diff --git a/Parallelism/PlayerThread.py b/Parallelism/PlayerThread.py index eb26c7d..e15b48a 100644 --- a/Parallelism/PlayerThread.py +++ b/Parallelism/PlayerThread.py @@ -1,12 +1,11 @@ import asyncio -from time import sleep, time +from time import time from urllib.parse import parse_qs, urlparse -from Music.VulkanInitializer import VulkanInitializer -from discord import Member, VoiceClient -from asyncio import AbstractEventLoop, Semaphore +from discord import VoiceClient +from asyncio import AbstractEventLoop from threading import RLock, Thread from multiprocessing import Lock -from typing import Callable, List +from typing import Callable from discord import Guild, FFmpegPCMAudio, VoiceChannel from Music.Playlist import Playlist from Music.Song import Song @@ -32,7 +31,7 @@ class TimeoutClock: class PlayerThread(Thread): """Player Thread to control the song playback in the same Process of the Main Process""" - def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int) -> None: Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={}) # Synchronization objects self.__playlist: Playlist = playlist diff --git a/Parallelism/ProcessExecutor.py b/Parallelism/ProcessExecutor.py index a5f7843..1b2c80b 100644 --- a/Parallelism/ProcessExecutor.py +++ b/Parallelism/ProcessExecutor.py @@ -3,8 +3,8 @@ from discord import Button, TextChannel from discord.ui import View from Config.Emojis import VEmojis from Messages.MessagesCategory import MessagesCategory +from Music.Playlist import Playlist from Music.VulkanBot import VulkanBot -from Parallelism.ProcessInfo import PlayerInfo from Config.Messages import Messages from Music.Song import Song from Config.Embeds import VEmbeds @@ -29,9 +29,9 @@ class ProcessCommandsExecutor: self.__embeds = VEmbeds() self.__emojis = VEmojis() - async def sendNowPlaying(self, processInfo: PlayerInfo, song: Song) -> None: + async def sendNowPlaying(self, playlist: Playlist, channel: TextChannel, song: Song) -> None: + print('B') # Get the lock of the playlist - playlist = processInfo.getPlaylist() if playlist.isLoopingOne(): title = self.__messages.ONE_SONG_LOOPING else: @@ -39,7 +39,6 @@ class ProcessCommandsExecutor: # Create View and Embed embed = self.__embeds.SONG_INFO(song.info, title) - channel = processInfo.getTextChannel() view = self.__getPlayerView(channel) # Send Message and add to the MessagesManager message = await channel.send(embed=embed, view=view) diff --git a/Parallelism/ProcessInfo.py b/Parallelism/ProcessInfo.py deleted file mode 100644 index d1a0d89..0000000 --- a/Parallelism/ProcessInfo.py +++ /dev/null @@ -1,51 +0,0 @@ -from enum import Enum -from multiprocessing import Process, Queue, Lock -from discord import TextChannel -from Music.Playlist import Playlist - - -class ProcessStatus(Enum): - RUNNING = 'Running' - SLEEPING = 'Sleeping' - - -class PlayerInfo: - """ - Class to store the reference to all structures to maintain a song player - """ - - def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: - self.__process = process - self.__queueToPlayer = queueToPlayer - self.__queueToMain = queueToMain - self.__playlist = playlist - self.__lock = lock - self.__textChannel = textChannel - self.__status = ProcessStatus.RUNNING - - def setProcess(self, newProcess: Process) -> None: - self.__process = newProcess - - def getStatus(self) -> ProcessStatus: - return self.__status - - def setStatus(self, status: ProcessStatus) -> None: - self.__status = status - - def getProcess(self) -> Process: - return self.__process - - def getQueueToPlayer(self) -> Queue: - return self.__queueToPlayer - - def getQueueToMain(self) -> Queue: - return self.__queueToMain - - def getPlaylist(self) -> Playlist: - return self.__playlist - - def getLock(self) -> Lock: - return self.__lock - - def getTextChannel(self) -> TextChannel: - return self.__textChannel diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessManager.py index 0664935..fd88f5b 100644 --- a/Parallelism/ProcessManager.py +++ b/Parallelism/ProcessManager.py @@ -1,23 +1,70 @@ import asyncio -from multiprocessing import Lock, Queue +from enum import Enum +from multiprocessing import Lock, Process, Queue from multiprocessing.managers import BaseManager, NamespaceProxy from queue import Empty from threading import Thread from typing import Dict, Tuple, Union from Config.Singleton import Singleton -from discord import Guild, Interaction +from discord import Guild, Interaction, TextChannel from discord.ext.commands import Context from Parallelism.AbstractProcessManager import AbstractPlayersManager from Parallelism.ProcessExecutor import ProcessCommandsExecutor from Music.Song import Song from Parallelism.PlayerProcess import PlayerProcess from Music.Playlist import Playlist -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot -class ProcessManager(Singleton, AbstractPlayersManager): +class ProcessStatus(Enum): + RUNNING = 'Running' + SLEEPING = 'Sleeping' + + +class PlayerProcessInfo: + """ + Class to store the reference to all structures to maintain a process player + """ + + def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: + self.__process = process + self.__queueToPlayer = queueToPlayer + self.__queueToMain = queueToMain + self.__playlist = playlist + self.__lock = lock + self.__textChannel = textChannel + self.__status = ProcessStatus.RUNNING + + def setProcess(self, newProcess: Process) -> None: + self.__process = newProcess + + def getStatus(self) -> ProcessStatus: + return self.__status + + def setStatus(self, status: ProcessStatus) -> None: + self.__status = status + + def getProcess(self) -> Process: + return self.__process + + def getQueueToPlayer(self) -> Queue: + return self.__queueToPlayer + + def getQueueToMain(self) -> Queue: + return self.__queueToMain + + def getPlaylist(self) -> Playlist: + return self.__playlist + + def getLock(self) -> Lock: + return self.__lock + + def getTextChannel(self) -> TextChannel: + return self.__textChannel + + +class ProcessPlayerManager(Singleton, AbstractPlayersManager): """ Manage all running player process, creating and storing them for future calls Deals with the creation of shared memory @@ -29,28 +76,51 @@ class ProcessManager(Singleton, AbstractPlayersManager): VManager.register('Playlist', Playlist) self.__manager = VManager() self.__manager.start() - self.__playersProcess: Dict[int, PlayerInfo] = {} + self.__playersProcess: Dict[int, PlayerProcessInfo] = {} self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {} self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {} - def setPlayerInfo(self, guild: Guild, info: PlayerInfo): - self.__playersProcess[guild.id] = info + def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + if forceCreation: + processInfo = self.createPlayerForGuild(guild, context) + else: + processInfo = self.__getRunningPlayerInfo(guild) - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: - """Return the process info for the guild, the user in context must be connected to a voice_channel""" + if processInfo == None: + return + + queue = processInfo.getQueueToPlayer() + self.__putCommandInQueue(queue, command) + + def getPlayerPlaylist(self, guild: Guild) -> Playlist: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getPlaylist() + + def getPlayerLock(self, guild: Guild) -> Lock: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getLock() + + def verifyIfPlayerExists(self, guild: Guild) -> bool: + return guild.id in self.__playersProcess.keys() + + def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None: try: if guild.id not in self.__playersProcess.keys(): - self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context) + self.__playersProcess[guild.id] = self.__createProcessPlayerInfo(guild, context) else: # If the process has ended create a new one if not self.__playersProcess[guild.id].getProcess().is_alive(): self.__playersProcess[guild.id] = self.__recreateProcess(guild, context) + # Start the process + self.__playersProcess[guild.id].getProcess().start() return self.__playersProcess[guild.id] except Exception as e: print(f'[Error In GetPlayerContext] -> {e}') - def resetProcess(self, guild: Guild, context: Context) -> None: + def resetPlayer(self, guild: Guild, context: Context) -> None: """Restart a running process, already start it to return to play""" if guild.id not in self.__playersProcess.keys(): return None @@ -60,10 +130,10 @@ class ProcessManager(Singleton, AbstractPlayersManager): newProcessInfo.getProcess().start() # Start the process # Send a command to start the play again playCommand = VCommands(VCommandsType.PLAY) - newProcessInfo.getQueueToPlayer().put(playCommand) + self.__putCommandInQueue(newProcessInfo.getQueueToPlayer(), playCommand) self.__playersProcess[guild.id] = newProcessInfo - def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: + def __getRunningPlayerInfo(self, guild: Guild) -> PlayerProcessInfo: """Return the process info for the guild, if not, return None""" if guild.id not in self.__playersProcess.keys(): print('Process Info not found') @@ -71,7 +141,7 @@ class ProcessManager(Singleton, AbstractPlayersManager): return self.__playersProcess[guild.id] - def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo: + def __createProcessPlayerInfo(self, guild: Guild, context: Context) -> PlayerProcessInfo: guildID: int = context.guild.id textID: int = context.channel.id voiceID: int = context.author.voice.channel.id @@ -82,9 +152,9 @@ class ProcessManager(Singleton, AbstractPlayersManager): queueToListen = Queue() queueToSend = Queue() process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, - queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) + queueToListen, guildID, voiceID) + processInfo = PlayerProcessInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async thread = Thread(target=self.__listenToCommands, @@ -111,7 +181,7 @@ class ProcessManager(Singleton, AbstractPlayersManager): except Exception as e: print(f'[ERROR STOPPING PROCESS] -> {e}') - def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: + def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerProcessInfo: """Create a new process info using previous playlist""" self.__stopPossiblyRunningProcess(guild) @@ -130,8 +200,8 @@ class ProcessManager(Singleton, AbstractPlayersManager): queueToSend = Queue() process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) + processInfo = PlayerProcessInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async thread = Thread(target=self.__listenToCommands, @@ -191,10 +261,18 @@ class ProcessManager(Singleton, AbstractPlayersManager): # Set the status of this process as sleeping, only the playlist object remains self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING) + def __putCommandInQueue(self, queue: Queue, command: VCommands) -> None: + try: + queue.put(command) + except Exception as e: + print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}') + async def showNowPlaying(self, guildID: int, song: Song) -> None: commandExecutor = self.__playersCommandsExecutor[guildID] processInfo = self.__playersProcess[guildID] + print('A') await commandExecutor.sendNowPlaying(processInfo, song) + print('C') class VManager(BaseManager): diff --git a/Parallelism/ThreadManager.py b/Parallelism/ThreadManager.py deleted file mode 100644 index 2df8aff..0000000 --- a/Parallelism/ThreadManager.py +++ /dev/null @@ -1,199 +0,0 @@ -import asyncio -from multiprocessing import Lock, Queue -from multiprocessing.managers import BaseManager, NamespaceProxy -from queue import Empty -from threading import Thread -from typing import Dict, Tuple, Union -from Config.Singleton import Singleton -from discord import Guild, Interaction -from discord.ext.commands import Context -from Parallelism.AbstractProcessManager import AbstractPlayersManager -from Parallelism.ProcessExecutor import ProcessCommandsExecutor -from Music.Song import Song -from Parallelism.PlayerProcess import PlayerProcess -from Music.Playlist import Playlist -from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus -from Parallelism.Commands import VCommands, VCommandsType -from Music.VulkanBot import VulkanBot - - -class ThreadManager(Singleton, AbstractPlayersManager): - """ - Manage all running player threads, creating and storing them for future calls - """ - - def __init__(self, bot: VulkanBot = None) -> None: - if not super().created: - self.__bot = bot - self.__playersProcess: Dict[int, Thread] = {} - - def setPlayerInfo(self, guild: Guild, info: PlayerInfo): - self.__playersProcess[guild.id] = info - - def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: - """Return the process info for the guild, the user in context must be connected to a voice_channel""" - try: - if guild.id not in self.__playersProcess.keys(): - self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context) - else: - # If the process has ended create a new one - if not self.__playersProcess[guild.id].getProcess().is_alive(): - self.__playersProcess[guild.id] = self.__recreateProcess(guild, context) - - return self.__playersProcess[guild.id] - except Exception as e: - print(f'[Error In GetPlayerContext] -> {e}') - - def resetProcess(self, guild: Guild, context: Context) -> None: - """Restart a running process, already start it to return to play""" - if guild.id not in self.__playersProcess.keys(): - return None - - # Recreate the process keeping the playlist - newProcessInfo = self.__recreateProcess(guild, context) - newProcessInfo.getProcess().start() # Start the process - # Send a command to start the play again - playCommand = VCommands(VCommandsType.PLAY) - newProcessInfo.getQueueToPlayer().put(playCommand) - self.__playersProcess[guild.id] = newProcessInfo - - def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo: - """Return the process info for the guild, if not, return None""" - if guild.id not in self.__playersProcess.keys(): - print('Process Info not found') - return None - - return self.__playersProcess[guild.id] - - def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo: - guildID: int = context.guild.id - textID: int = context.channel.id - voiceID: int = context.author.voice.channel.id - authorID: int = context.author.id - - playlist: Playlist = self.__manager.Playlist() - lock = Lock() - queueToListen = Queue() - queueToSend = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, - queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) - - # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async - thread = Thread(target=self.__listenToCommands, - args=(queueToListen, guild), daemon=True) - self.__playersListeners[guildID] = (thread, False) - thread.start() - - # Create a Message Controller for this player - self.__playersCommandsExecutor[guildID] = ProcessCommandsExecutor(self.__bot, guildID) - - return processInfo - - def __stopPossiblyRunningProcess(self, guild: Guild): - try: - if guild.id in self.__playersProcess.keys(): - playerProcess = self.__playersProcess[guild.id] - process = playerProcess.getProcess() - process.close() - process.kill() - playerProcess.getQueueToMain().close() - playerProcess.getQueueToMain().join_thread() - playerProcess.getQueueToPlayer().close() - playerProcess.getQueueToPlayer().join_thread() - except Exception as e: - print(f'[ERROR STOPPING PROCESS] -> {e}') - - def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo: - """Create a new process info using previous playlist""" - self.__stopPossiblyRunningProcess(guild) - - guildID: int = context.guild.id - textID: int = context.channel.id - if isinstance(context, Interaction): - authorID: int = context.user.id - voiceID: int = context.user.voice.channel.id - else: - authorID: int = context.author.id - voiceID: int = context.author.voice.channel.id - - playlist: Playlist = self.__playersProcess[guildID].getPlaylist() - lock = Lock() - queueToListen = Queue() - queueToSend = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, - queueToListen, guildID, textID, voiceID, authorID) - processInfo = PlayerInfo(process, queueToSend, queueToListen, - playlist, lock, context.channel) - - # Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async - thread = Thread(target=self.__listenToCommands, - args=(queueToListen, guild), daemon=True) - self.__playersListeners[guildID] = (thread, False) - thread.start() - - return processInfo - - def __listenToCommands(self, queue: Queue, guild: Guild) -> None: - guildID = guild.id - while True: - shouldEnd = self.__playersListeners[guildID][1] - if shouldEnd: - break - - try: - command: VCommands = queue.get(timeout=5) - commandType = command.getType() - args = command.getArgs() - - print(f'Process {guild.name} sended command {commandType}') - if commandType == VCommandsType.NOW_PLAYING: - asyncio.run_coroutine_threadsafe(self.showNowPlaying( - guild.id, args), self.__bot.loop) - elif commandType == VCommandsType.TERMINATE: - # Delete the process elements and return, to finish task - self.__terminateProcess(guildID) - return - elif commandType == VCommandsType.SLEEPING: - # The process might be used again - self.__sleepingProcess(guildID) - return - else: - print(f'[ERROR] -> Unknown Command Received from Process: {commandType}') - except Empty: - continue - except Exception as e: - print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}') - - def __terminateProcess(self, guildID: int) -> None: - # Delete all structures associated with the Player - del self.__playersProcess[guildID] - del self.__playersCommandsExecutor[guildID] - threadListening = self.__playersListeners[guildID] - threadListening._stop() - del self.__playersListeners[guildID] - - def __sleepingProcess(self, guildID: int) -> None: - # Disable all process structures, except Playlist - queue1 = self.__playersProcess[guildID].getQueueToMain() - queue2 = self.__playersProcess[guildID].getQueueToPlayer() - queue1.close() - queue1.join_thread() - queue2.close() - queue2.join_thread() - # Set the status of this process as sleeping, only the playlist object remains - self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING) - - async def showNowPlaying(self, guildID: int, song: Song) -> None: - commandExecutor = self.__playersCommandsExecutor[guildID] - processInfo = self.__playersProcess[guildID] - await commandExecutor.sendNowPlaying(processInfo, song) - - -class VManager(BaseManager): - pass - - -class VProxy(NamespaceProxy): - _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') diff --git a/Parallelism/ThreadPlayerManager.py b/Parallelism/ThreadPlayerManager.py new file mode 100644 index 0000000..b23a6a2 --- /dev/null +++ b/Parallelism/ThreadPlayerManager.py @@ -0,0 +1,131 @@ +from multiprocessing import Lock +from typing import Dict, Union +from Config.Singleton import Singleton +from discord import Guild, Interaction, TextChannel +from discord.ext.commands import Context +from Parallelism.AbstractProcessManager import AbstractPlayersManager +from Music.Song import Song +from Music.Playlist import Playlist +from Parallelism.Commands import VCommands, VCommandsType +from Music.VulkanBot import VulkanBot +from Parallelism.PlayerThread import PlayerThread + + +class PlayerThreadInfo: + """ + Class to store the reference to all structures to maintain a player thread + """ + + def __init__(self, thread: PlayerThread, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: + self.__thread = thread + self.__playlist = playlist + self.__lock = lock + self.__textChannel = textChannel + + def getThread(self) -> PlayerThread: + return self.__thread + + def getPlaylist(self) -> Playlist: + return self.__playlist + + def getLock(self) -> Lock: + return self.__lock + + def getTextChannel(self) -> TextChannel: + return self.__textChannel + + +class ThreadPlayerManager(Singleton, AbstractPlayersManager): + """ + Manage all running player threads, creating and storing them for future calls + """ + + def __init__(self, bot: VulkanBot = None) -> None: + if not super().created: + self.__bot = bot + self.__playersThreads: Dict[int, PlayerThreadInfo] = {} + + def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None): + return super().sendCommandToPlayer(command, guild, forceCreation, context) + + def getPlayerPlaylist(self, guild: Guild) -> Playlist: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getPlaylist() + + def getPlayerLock(self, guild: Guild) -> Lock: + playerInfo = self.__getRunningPlayerInfo(guild) + if playerInfo: + return playerInfo.getLock() + + def verifyIfPlayerExists(self, guild: Guild) -> bool: + return guild.id in self.__playersThreads.keys() + + def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]): + try: + if guild.id not in self.__playersThreads.keys(): + self.__playersThreads[guild.id] = self.__createPlayerThreadInfo(context) + else: + # If the thread has ended create a new one + if not self.__playersThreads[guild.id].getThread().is_alive(): + self.__playersThreads[guild.id] = self.__recreateThread(guild, context) + + return self.__playersThreads[guild.id] + except Exception as e: + print(f'[Error In GetPlayerContext] -> {e}') + + def resetPlayer(self, guild: Guild, context: Context) -> None: + if guild.id not in self.__playersThreads.keys(): + return None + + # Recreate the thread keeping the playlist + newPlayerInfo = self.__recreateThread(guild, context) + newPlayerInfo.getThread().start() + # Send a command to start the play again + playCommand = VCommands(VCommandsType.PLAY) + newPlayerInfo.getQueueToPlayer().put(playCommand) + self.__playersThreads[guild.id] = newPlayerInfo + + def __getRunningPlayerInfo(self, guild: Guild) -> PlayerThreadInfo: + if guild.id not in self.__playersThreads.keys(): + print('Process Info not found') + return None + + return self.__playersThreads[guild.id] + + def __createPlayerThreadInfo(self, context: Union[Context, Interaction]) -> PlayerThreadInfo: + guildID: int = context.guild.id + if isinstance(context, Interaction): + voiceID: int = context.user.voice.channel.id + else: + voiceID: int = context.author.voice.channel.id + + playlist = Playlist() + lock = Lock() + player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) + playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel) + player.start() + + return playerInfo + + def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerThreadInfo: + self.__stopPossiblyRunningProcess(guild) + + guildID: int = context.guild.id + if isinstance(context, Interaction): + voiceID: int = context.user.voice.channel.id + else: + voiceID: int = context.author.voice.channel.id + + playlist = self.__playersThreads[guildID].getPlaylist() + lock = Lock() + player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID) + playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel) + player.start() + + return playerInfo + + async def showNowPlaying(self, guildID: int, song: Song) -> None: + commandExecutor = self.__playersCommandsExecutor[guildID] + processInfo = self.__playersThreads[guildID] + await commandExecutor.sendNowPlaying(processInfo, song)