From 60a36425eedc77edc4bc5e318486161062f919ba Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Thu, 28 Jul 2022 00:38:30 -0300 Subject: [PATCH] Adding a queue for player send commands to Main process --- Config/Configs.py | 6 ++ DiscordCogs/MusicCog.py | 3 + Handlers/ClearHandler.py | 3 +- Handlers/HistoryHandler.py | 3 +- Handlers/LoopHandler.py | 3 +- Handlers/MoveHandler.py | 3 +- Handlers/NowPlayingHandler.py | 3 +- Handlers/PauseHandler.py | 5 +- Handlers/PlayHandler.py | 9 ++- Handlers/PrevHandler.py | 5 +- Handlers/QueueHandler.py | 3 +- Handlers/RemoveHandler.py | 3 +- Handlers/ResetHandler.py | 5 +- Handlers/ResumeHandler.py | 5 +- Handlers/ShuffleHandler.py | 3 +- Handlers/SkipHandler.py | 5 +- Handlers/StopHandler.py | 5 +- Music/MessagesController.py | 63 ++++++++++++++++++ Parallelism/Commands.py | 3 + Parallelism/PlayerProcess.py | 33 +++++----- Parallelism/ProcessInfo.py | 17 +++-- Parallelism/ProcessManager.py | 121 +++++++++++++++++++++++++++++----- UI/Views/PlayerView.py | 1 - 23 files changed, 233 insertions(+), 77 deletions(-) create mode 100644 Music/MessagesController.py diff --git a/Config/Configs.py b/Config/Configs.py index 523f7e1..b7120e4 100644 --- a/Config/Configs.py +++ b/Config/Configs.py @@ -30,3 +30,9 @@ class VConfigs(Singleton): self.MY_ERROR_BAD_COMMAND = 'This string serves to verify if some error was raised by myself on purpose' self.INVITE_URL = 'https://discordapp.com/oauth2/authorize?client_id={}&scope=bot' + + def getProcessManager(self): + return self.__manager + + def setProcessManager(self, newManager): + self.__manager = newManager diff --git a/DiscordCogs/MusicCog.py b/DiscordCogs/MusicCog.py index 4cb361b..859e88b 100644 --- a/DiscordCogs/MusicCog.py +++ b/DiscordCogs/MusicCog.py @@ -19,6 +19,8 @@ from UI.Responses.EmoteCogResponse import EmoteCommandResponse from UI.Responses.EmbedCogResponse import EmbedCommandResponse from UI.Views.PlayerView import PlayerView from Music.VulkanBot import VulkanBot +from Config.Configs import VConfigs +from Parallelism.ProcessManager import ProcessManager helper = Helper() @@ -32,6 +34,7 @@ class MusicCog(Cog): def __init__(self, bot: VulkanBot) -> None: self.__bot: VulkanBot = bot + VConfigs().setProcessManager(ProcessManager(bot)) @command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar']) async def play(self, ctx: Context, *args) -> None: diff --git a/Handlers/ClearHandler.py b/Handlers/ClearHandler.py index a7735ef..a151d83 100644 --- a/Handlers/ClearHandler.py +++ b/Handlers/ClearHandler.py @@ -4,7 +4,6 @@ from discord.ext.commands import Context from Music.VulkanBot import VulkanBot from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessManager import ProcessManager class ClearHandler(AbstractHandler): @@ -13,7 +12,7 @@ class ClearHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Clear the playlist diff --git a/Handlers/HistoryHandler.py b/Handlers/HistoryHandler.py index c041335..864d90e 100644 --- a/Handlers/HistoryHandler.py +++ b/Handlers/HistoryHandler.py @@ -4,7 +4,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Utils.Utils import Utils from typing import Union -from Parallelism.ProcessManager import ProcessManager from discord import Interaction @@ -14,7 +13,7 @@ class HistoryHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: processLock = processInfo.getLock() diff --git a/Handlers/LoopHandler.py b/Handlers/LoopHandler.py index 929333e..065334a 100644 --- a/Handlers/LoopHandler.py +++ b/Handlers/LoopHandler.py @@ -3,7 +3,6 @@ from Music.VulkanBot import VulkanBot from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import BadCommandUsage -from Parallelism.ProcessManager import ProcessManager from typing import Union from discord import Interaction @@ -14,7 +13,7 @@ class LoopHandler(AbstractHandler): async def run(self, args: str) -> HandlerResponse: # Get the current process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/MoveHandler.py b/Handlers/MoveHandler.py index ca9e9f8..0f36260 100644 --- a/Handlers/MoveHandler.py +++ b/Handlers/MoveHandler.py @@ -5,7 +5,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import BadCommandUsage, VulkanError, InvalidInput, NumberRequired, UnknownError from Music.Playlist import Playlist -from Parallelism.ProcessManager import ProcessManager from typing import Union from discord import Interaction @@ -15,7 +14,7 @@ class MoveHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self, pos1: str, pos2: str) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/NowPlayingHandler.py b/Handlers/NowPlayingHandler.py index 727c022..81dc414 100644 --- a/Handlers/NowPlayingHandler.py +++ b/Handlers/NowPlayingHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot from Utils.Cleaner import Cleaner -from Parallelism.ProcessManager import ProcessManager from typing import Union from discord import Interaction @@ -15,7 +14,7 @@ class NowPlayingHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: embed = self.embeds.NOT_PLAYING() diff --git a/Handlers/PauseHandler.py b/Handlers/PauseHandler.py index cbe41b9..dc80bce 100644 --- a/Handlers/PauseHandler.py +++ b/Handlers/PauseHandler.py @@ -1,7 +1,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessManager import ProcessManager from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -13,12 +12,12 @@ class PauseHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Send Pause command to be execute by player process command = VCommands(VCommandsType.PAUSE, None) - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() queue.put(command) return HandlerResponse(self.ctx) diff --git a/Handlers/PlayHandler.py b/Handlers/PlayHandler.py index fbcd085..4674336 100644 --- a/Handlers/PlayHandler.py +++ b/Handlers/PlayHandler.py @@ -8,7 +8,6 @@ from Handlers.HandlerResponse import HandlerResponse from Music.Downloader import Downloader from Music.Searcher import Searcher from Music.Song import Song -from Parallelism.ProcessManager import ProcessManager from Parallelism.ProcessInfo import ProcessInfo from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot @@ -38,7 +37,7 @@ class PlayHandler(AbstractHandler): raise InvalidInput(self.messages.INVALID_INPUT, self.messages.ERROR_TITLE) # Get the process context for the current guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getPlayerInfo(self.guild, self.ctx) playlist = processInfo.getPlaylist() process = processInfo.getProcess() @@ -74,7 +73,7 @@ class PlayHandler(AbstractHandler): playlist.add_song(song) # Release the acquired Lock processLock.release() - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() playCommand = VCommands(VCommandsType.PLAY, None) queue.put(playCommand) else: @@ -106,7 +105,7 @@ class PlayHandler(AbstractHandler): async def __downloadSongsAndStore(self, songs: List[Song], processInfo: ProcessInfo) -> None: playlist = processInfo.getPlaylist() - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() playCommand = VCommands(VCommandsType.PLAY, None) # Trigger a task for each song to be downloaded tasks: List[asyncio.Task] = [] @@ -115,7 +114,7 @@ class PlayHandler(AbstractHandler): tasks.append(task) # In the original order, await for the task and then if successfully downloaded add in the playlist - processManager = ProcessManager() + processManager = self.config.getProcessManager() for index, task in enumerate(tasks): await task song = songs[index] diff --git a/Handlers/PrevHandler.py b/Handlers/PrevHandler.py index 0fb3996..1e25b4c 100644 --- a/Handlers/PrevHandler.py +++ b/Handlers/PrevHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Config.Exceptions import BadCommandUsage, ImpossibleMove from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessManager import ProcessManager from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,7 +13,7 @@ class PrevHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getPlayerInfo(self.guild, self.ctx) if not processInfo: embed = self.embeds.NOT_PLAYING() @@ -44,7 +43,7 @@ class PrevHandler(AbstractHandler): # Send a prev command, together with the user voice channel prevCommand = VCommands(VCommandsType.PREV, self.author.voice.channel.id) - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() queue.put(prevCommand) return HandlerResponse(self.ctx) diff --git a/Handlers/QueueHandler.py b/Handlers/QueueHandler.py index e1c0736..db5ebb2 100644 --- a/Handlers/QueueHandler.py +++ b/Handlers/QueueHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.Downloader import Downloader from Utils.Utils import Utils -from Parallelism.ProcessManager import ProcessManager from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction @@ -16,7 +15,7 @@ class QueueHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Retrieve the process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: # If no process return empty list embed = self.embeds.EMPTY_QUEUE() diff --git a/Handlers/RemoveHandler.py b/Handlers/RemoveHandler.py index d0a6e23..b55d61a 100644 --- a/Handlers/RemoveHandler.py +++ b/Handlers/RemoveHandler.py @@ -4,7 +4,6 @@ from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import BadCommandUsage, VulkanError, ErrorRemoving, InvalidInput, NumberRequired from Music.Playlist import Playlist -from Parallelism.ProcessManager import ProcessManager from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction @@ -16,7 +15,7 @@ class RemoveHandler(AbstractHandler): async def run(self, position: str) -> HandlerResponse: # Get the current process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if not processInfo: # Clear the playlist diff --git a/Handlers/ResetHandler.py b/Handlers/ResetHandler.py index de800f7..272f02c 100644 --- a/Handlers/ResetHandler.py +++ b/Handlers/ResetHandler.py @@ -1,7 +1,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessManager import ProcessManager from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -14,11 +13,11 @@ class ResetHandler(AbstractHandler): async def run(self) -> HandlerResponse: # Get the current process of the guild - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: command = VCommands(VCommandsType.RESET, None) - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() queue.put(command) return HandlerResponse(self.ctx) diff --git a/Handlers/ResumeHandler.py b/Handlers/ResumeHandler.py index 924a3b8..da40bc0 100644 --- a/Handlers/ResumeHandler.py +++ b/Handlers/ResumeHandler.py @@ -1,7 +1,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse -from Parallelism.ProcessManager import ProcessManager from Parallelism.Commands import VCommands, VCommandsType from Music.VulkanBot import VulkanBot from typing import Union @@ -13,12 +12,12 @@ class ResumeHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Send Resume command to be execute by player process command = VCommands(VCommandsType.RESUME, None) - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() queue.put(command) return HandlerResponse(self.ctx) diff --git a/Handlers/ShuffleHandler.py b/Handlers/ShuffleHandler.py index dda23e2..3f38b8b 100644 --- a/Handlers/ShuffleHandler.py +++ b/Handlers/ShuffleHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Config.Exceptions import UnknownError -from Parallelism.ProcessManager import ProcessManager from Music.VulkanBot import VulkanBot from typing import Union from discord import Interaction @@ -13,7 +12,7 @@ class ShuffleHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: try: diff --git a/Handlers/SkipHandler.py b/Handlers/SkipHandler.py index 42b0143..c5f1e90 100644 --- a/Handlers/SkipHandler.py +++ b/Handlers/SkipHandler.py @@ -3,7 +3,6 @@ from Handlers.AbstractHandler import AbstractHandler from Config.Exceptions import BadCommandUsage from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot -from Parallelism.ProcessManager import ProcessManager from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -14,7 +13,7 @@ class SkipHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Verify if there is a running process playlist = processInfo.getPlaylist() @@ -25,7 +24,7 @@ class SkipHandler(AbstractHandler): # Send a command to the player process to skip the music command = VCommands(VCommandsType.SKIP, None) - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() queue.put(command) return HandlerResponse(self.ctx) diff --git a/Handlers/StopHandler.py b/Handlers/StopHandler.py index 68ae620..89c9e09 100644 --- a/Handlers/StopHandler.py +++ b/Handlers/StopHandler.py @@ -2,7 +2,6 @@ from discord.ext.commands import Context from Handlers.AbstractHandler import AbstractHandler from Handlers.HandlerResponse import HandlerResponse from Music.VulkanBot import VulkanBot -from Parallelism.ProcessManager import ProcessManager from Parallelism.Commands import VCommands, VCommandsType from typing import Union from discord import Interaction @@ -13,12 +12,12 @@ class StopHandler(AbstractHandler): super().__init__(ctx, bot) async def run(self) -> HandlerResponse: - processManager = ProcessManager() + processManager = self.config.getProcessManager() processInfo = processManager.getRunningPlayerInfo(self.guild) if processInfo: # Send command to player process stop command = VCommands(VCommandsType.STOP, None) - queue = processInfo.getQueue() + queue = processInfo.getQueueToPlayer() queue.put(command) return HandlerResponse(self.ctx) diff --git a/Music/MessagesController.py b/Music/MessagesController.py new file mode 100644 index 0000000..5082721 --- /dev/null +++ b/Music/MessagesController.py @@ -0,0 +1,63 @@ +from typing import List +from discord import Embed, Message +from Music.VulkanBot import VulkanBot +from Parallelism.ProcessInfo import ProcessInfo +from Config.Configs import VConfigs +from Config.Messages import Messages +from Music.Song import Song +from Config.Embeds import VEmbeds +from UI.Views.PlayerView import PlayerView + + +class MessagesController: + def __init__(self, bot: VulkanBot) -> None: + self.__bot = bot + self.__previousMessages = [] + self.__configs = VConfigs() + self.__messages = Messages() + self.__embeds = VEmbeds() + + async def sendNowPlaying(self, processInfo: ProcessInfo, song: Song) -> None: + # Get the lock of the playlist + print('Entrei') + playlistLock = processInfo.getLock() + playlist = processInfo.getPlaylist() + with playlistLock: + print('A') + if playlist.isLoopingOne(): + title = self.__messages.ONE_SONG_LOOPING + else: + title = self.__messages.SONG_PLAYING + + embed = self.__embeds.SONG_INFO(song.info, title) + view = PlayerView(self.__bot) + channel = processInfo.getTextChannel() + + await self.__deletePreviousNPMessages() + await channel.send(embed=embed, view=view) + self.__previousMessages.append(await self.__getSendedMessage()) + + async def __deletePreviousNPMessages(self) -> None: + for message in self.__previousMessages: + try: + await message.delete() + except: + pass + self.__previousMessages.clear() + + async def __getSendedMessage(self) -> Message: + stringToIdentify = 'Uploader:' + last_messages: List[Message] = await self.__textChannel.history(limit=5).flatten() + + for message in last_messages: + try: + if message.author == self.__bot.user: + if len(message.embeds) > 0: + embed: Embed = message.embeds[0] + if len(embed.fields) > 0: + if embed.fields[0].name == stringToIdentify: + return message + + except Exception as e: + print(f'DEVELOPER NOTE -> Error cleaning messages {e}') + continue diff --git a/Parallelism/Commands.py b/Parallelism/Commands.py index d6c8baa..9a18a09 100644 --- a/Parallelism/Commands.py +++ b/Parallelism/Commands.py @@ -11,6 +11,9 @@ class VCommandsType(Enum): PLAY = 'Play' STOP = 'Stop' RESET = 'Reset' + NOW_PLAYING = 'Now Playing' + TERMINATE = 'Terminate' + SLEEPING = 'Sleeping' class VCommands: diff --git a/Parallelism/PlayerProcess.py b/Parallelism/PlayerProcess.py index 4dcbd90..d356615 100644 --- a/Parallelism/PlayerProcess.py +++ b/Parallelism/PlayerProcess.py @@ -1,9 +1,9 @@ import asyncio from Music.VulkanInitializer import VulkanInitializer from discord import User, Member, Message, Embed -from asyncio import AbstractEventLoop, Semaphore -from multiprocessing import Process, Queue, RLock -from threading import Lock, Thread +from asyncio import AbstractEventLoop, Semaphore, Queue +from multiprocessing import Process, RLock, Lock +from threading import Thread from typing import Callable, List from discord import Guild, FFmpegPCMAudio, VoiceChannel, TextChannel from Music.Playlist import Playlist @@ -31,7 +31,7 @@ class TimeoutClock: class PlayerProcess(Process): """Process that will play songs, receive commands from the main process by a Queue""" - def __init__(self, name: str, playlist: Playlist, lock: Lock, queue: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None: """ Start a new process that will have his own bot instance Due to pickle serialization, no objects are stored, the values initialization are being made in the run method @@ -40,7 +40,8 @@ class PlayerProcess(Process): # Synchronization objects self.__playlist: Playlist = playlist self.__playlistLock: Lock = lock - self.__queue: Queue = queue + self.__queueReceive: Queue = queueToReceive + self.__queueSend: Queue = queueToSend self.__semStopPlaying: Semaphore = None self.__loop: AbstractEventLoop = None # Discord context ID @@ -96,8 +97,7 @@ class PlayerProcess(Process): # Start the timeout function self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) # Thread that will receive commands to be executed in this Process - self.__commandsReceiver = Thread(target=self.__commandsReceiver, daemon=True) - self.__commandsReceiver.start() + self.__loop.create_task(self.__commandsReceiver()) # Start a Task to play songs self.__loop.create_task(self.__playPlaylistSongs()) @@ -146,8 +146,10 @@ class PlayerProcess(Process): self.__timer.cancel() self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) - await self.__deletePrevNowPlaying() - await self.__showNowPlaying() + nowPlayingCommand = VCommands(VCommandsType.NOW_PLAYING, song) + await self.__queueSend.put(nowPlayingCommand) + # await self.__deletePrevNowPlaying() + # await self.__showNowPlaying() except Exception as e: print(f'[ERROR IN PLAY SONG] -> {e}, {type(e)}') self.__playNext(None) @@ -190,12 +192,11 @@ class PlayerProcess(Process): self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}') - def __commandsReceiver(self) -> None: + async def __commandsReceiver(self) -> None: while True: - command: VCommands = self.__queue.get() + command: VCommands = await self.__queueReceive.get() type = command.getType() args = command.getArgs() - print(f'{self.name} received command {type}') try: self.__playerLock.acquire() @@ -206,13 +207,13 @@ class PlayerProcess(Process): elif type == VCommandsType.SKIP: self.__skip() elif type == VCommandsType.PLAY: - asyncio.run_coroutine_threadsafe(self.__playPlaylistSongs(), self.__loop) + await self.__playPlaylistSongs() elif type == VCommandsType.PREV: - asyncio.run_coroutine_threadsafe(self.__playPrev(args), self.__loop) + await self.__playPrev(args) elif type == VCommandsType.RESET: - asyncio.run_coroutine_threadsafe(self.__reset(), self.__loop) + await self.__reset() elif type == VCommandsType.STOP: - asyncio.run_coroutine_threadsafe(self.__stop(), self.__loop) + await self.__stop() else: print(f'[ERROR] -> Unknown Command Received: {command}') except Exception as e: diff --git a/Parallelism/ProcessInfo.py b/Parallelism/ProcessInfo.py index 0f8bf83..011f638 100644 --- a/Parallelism/ProcessInfo.py +++ b/Parallelism/ProcessInfo.py @@ -1,4 +1,5 @@ from multiprocessing import Process, Queue, Lock +from discord import TextChannel from Music.Playlist import Playlist @@ -7,11 +8,13 @@ class ProcessInfo: Class to store the reference to all structures to maintain a player process """ - def __init__(self, process: Process, queue: Queue, playlist: Playlist, lock: Lock) -> None: + def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None: self.__process = process - self.__queue = queue + self.__queueToPlayer = queueToPlayer + self.__queueToMain = queueToMain self.__playlist = playlist self.__lock = lock + self.__textChannel = textChannel def setProcess(self, newProcess: Process) -> None: self.__process = newProcess @@ -19,11 +22,17 @@ class ProcessInfo: def getProcess(self) -> Process: return self.__process - def getQueue(self) -> Queue: - return self.__queue + def getQueueToPlayer(self) -> Queue: + return self.__queueToPlayer + + def getQueueToMain(self) -> Queue: + return self.__queueToMain def getPlaylist(self) -> Playlist: return self.__playlist def getLock(self) -> Lock: return self.__lock + + def getTextChannel(self) -> TextChannel: + return self.__textChannel diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessManager.py index a0e1e4b..09cb929 100644 --- a/Parallelism/ProcessManager.py +++ b/Parallelism/ProcessManager.py @@ -1,13 +1,21 @@ -from multiprocessing import Queue, Lock +from asyncio import Queue, Task +import asyncio +from multiprocessing import Lock from multiprocessing.managers import BaseManager, NamespaceProxy -from typing import Dict, Union +from queue import Empty +from threading import Thread +from typing import Dict, Tuple, Union from Config.Singleton import Singleton from discord import Guild, Interaction from discord.ext.commands import Context +from Music.MessagesController import MessagesController +from Music.Song import Song from Parallelism.PlayerProcess import PlayerProcess from Music.Playlist import Playlist from Parallelism.ProcessInfo import ProcessInfo from Parallelism.Commands import VCommands, VCommandsType +from Music.VulkanBot import VulkanBot +from Tests.LoopRunner import LoopRunner class ProcessManager(Singleton): @@ -16,12 +24,16 @@ class ProcessManager(Singleton): Deal with the creation of shared memory """ - def __init__(self) -> None: + def __init__(self, bot: VulkanBot = None) -> None: if not super().created: + self.__bot = bot VManager.register('Playlist', Playlist) self.__manager = VManager() self.__manager.start() self.__playersProcess: Dict[Guild, ProcessInfo] = {} + # self.__playersListeners: Dict[Guild, Tuple[Thread, bool]] = {} + self.__playersListeners: Dict[Guild, Task] = {} + self.__playersMessages: Dict[Guild, MessagesController] = {} def setPlayerInfo(self, guild: Guild, info: ProcessInfo): self.__playersProcess[guild.id] = info @@ -37,11 +49,11 @@ class ProcessManager(Singleton): return self.__playersProcess[guild.id] if guild.id not in self.__playersProcess.keys(): - self.__playersProcess[guild.id] = self.__createProcessInfo(context) + self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context) else: # If the process has ended create a new one if not self.__playersProcess[guild.id].getProcess().is_alive(): - self.__playersProcess[guild.id] = self.__recreateProcess(context) + self.__playersProcess[guild.id] = self.__recreateProcess(guild, context) return self.__playersProcess[guild.id] except Exception as e: @@ -53,11 +65,11 @@ class ProcessManager(Singleton): return None # Recreate the process keeping the playlist - newProcessInfo = self.__recreateProcess(context) + newProcessInfo = self.__recreateProcess(guild, context) newProcessInfo.getProcess().start() # Start the process # Send a command to start the play again playCommand = VCommands(VCommandsType.PLAY) - newProcessInfo.getQueue().put(playCommand) + newProcessInfo.getQueueToPlayer().put(playCommand) self.__playersProcess[guild.id] = newProcessInfo def getRunningPlayerInfo(self, guild: Guild) -> ProcessInfo: @@ -67,7 +79,7 @@ class ProcessManager(Singleton): return self.__playersProcess[guild.id] - def __createProcessInfo(self, context: Context) -> ProcessInfo: + def __createProcessInfo(self, guild: Guild, context: Context) -> ProcessInfo: guildID: int = context.guild.id textID: int = context.channel.id voiceID: int = context.author.voice.channel.id @@ -75,14 +87,25 @@ class ProcessManager(Singleton): playlist: Playlist = self.__manager.Playlist() lock = Lock() - queue = Queue() - process = PlayerProcess(context.guild.name, playlist, lock, queue, - guildID, textID, voiceID, authorID) - processInfo = ProcessInfo(process, queue, playlist, lock) + queueToListen = Queue() + queueToSend = Queue() + process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, + queueToListen, guildID, textID, voiceID, authorID) + processInfo = ProcessInfo(process, queueToSend, queueToListen, + playlist, lock, context.channel) + + task = asyncio.create_task(self.__listenToCommands(queueToListen, guild)) + # Create a Thread to listen for the queue coming from the Player Process + # thread = Thread(target=self.__listenToCommands, args=(queueToListen, guild), daemon=True) + self.__playersListeners[guildID] = task + # thread.start() + + # Create a Message Controller for this player + self.__playersMessages[guildID] = MessagesController(self.__bot) return processInfo - def __recreateProcess(self, context: Context) -> ProcessInfo: + def __recreateProcess(self, guild: Guild, context: Context) -> ProcessInfo: """Create a new process info using previous playlist""" guildID: int = context.guild.id textID: int = context.channel.id @@ -91,14 +114,78 @@ class ProcessManager(Singleton): playlist: Playlist = self.__playersProcess[guildID].getPlaylist() lock = Lock() - queue = Queue() + queueToListen = Queue() + queueToSend = Queue() + process = PlayerProcess(context.guild.name, playlist, lock, queueToSend, + queueToListen, guildID, textID, voiceID, authorID) + processInfo = ProcessInfo(process, queueToSend, queueToListen, playlist, lock) - process = PlayerProcess(context.guild.name, playlist, lock, queue, - guildID, textID, voiceID, authorID) - processInfo = ProcessInfo(process, queue, playlist, lock) + task = asyncio.create_task(self.__listenToCommands(queueToListen, guild)) + # Create a Thread to listen for the queue coming from the Player Process + # thread = Thread(target=self.__listenToCommands, args=(queueToListen, guild), daemon=True) + self.__playersListeners[guildID] = task + # thread.start() + + # Create a Message Controller for this player + self.__playersMessages[guildID] = MessagesController(self.__bot) return processInfo + async def __listenToCommands(self, queue: Queue, guild: Guild) -> None: + shouldEnd = False + guildID = guild.id + while not shouldEnd: + shouldEnd = self.__playersListeners[guildID][1] + try: + print('Esperando') + command: VCommands = await queue.get() + commandType = command.getType() + args = command.getArgs() + + print(f'Process {guild.name} sended command {commandType}') + if commandType == VCommandsType.NOW_PLAYING: + print('Aqui dentro') + await self.__showNowPlaying(args, guildID) + elif commandType == VCommandsType.TERMINATE: + # Delete the process elements and return, to finish task + self.__terminateProcess() + return + elif commandType == VCommandsType.SLEEPING: + # The process might be used again + self.__sleepingProcess() + return + else: + print(f'[ERROR] -> Unknown Command Received from Process: {commandType}') + except Empty: + continue + except Exception as e: + print(e) + print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}') + + def __terminateProcess(self, guildID: int) -> None: + # Delete all structures associated with the Player + del self.__playersProcess[guildID] + del self.__playersMessages[guildID] + threadListening = self.__playersListeners[guildID] + threadListening._stop() + del self.__playersListeners[guildID] + + def __sleepingProcess(self, guildID: int) -> None: + # Disable all process structures, except Playlist + queue1 = self.__playersProcess[guildID].getQueueToMain() + queue2 = self.__playersProcess[guildID].getQueueToPlayer() + queue1.close() + queue1.join_thread() + queue2.close() + queue2.join_thread() + + async def __showNowPlaying(self, guildID: int, song: Song) -> None: + messagesController = self.__playersMessages[guildID] + processInfo = self.__playersProcess[guildID] + print('Aq1') + await messagesController.sendNowPlaying(processInfo, song) + print('Aq2') + class VManager(BaseManager): pass diff --git a/UI/Views/PlayerView.py b/UI/Views/PlayerView.py index 0892462..aca753c 100644 --- a/UI/Views/PlayerView.py +++ b/UI/Views/PlayerView.py @@ -1,4 +1,3 @@ -from typing import Optional from discord.ui import View from Config.Emojis import VEmojis from UI.Buttons.PauseButton import PauseButton