From 19ae59c5b8fb8b4ebafa3d40e1f7cf79a5415fbd Mon Sep 17 00:00:00 2001 From: Rafael Vargas Date: Fri, 22 Jul 2022 18:07:44 -0300 Subject: [PATCH] Changing some methods signature in Playlist to be compatible in Shared Memory, now finishing process after some time --- Controllers/HistoryController.py | 2 +- Controllers/LoopController.py | 2 +- Controllers/MoveController.py | 2 +- Controllers/NowPlayingController.py | 4 +- Controllers/PlayController.py | 27 ++--------- Controllers/PrevController.py | 2 +- Controllers/QueueController.py | 54 +++++++++++++--------- Controllers/ShuffleController.py | 2 +- Controllers/SkipController.py | 2 +- Database/Database.py | 3 -- Music/Player.py | 2 +- Music/Playlist.py | 15 +++---- Parallelism/PlayerProcess.py | 69 +++++++++++++++++------------ Parallelism/ProcessManager.py | 33 ++++++++++---- 14 files changed, 116 insertions(+), 103 deletions(-) delete mode 100644 Database/Database.py diff --git a/Controllers/HistoryController.py b/Controllers/HistoryController.py index 4a40b47..d5ed417 100644 --- a/Controllers/HistoryController.py +++ b/Controllers/HistoryController.py @@ -10,7 +10,7 @@ class HistoryController(AbstractController): super().__init__(ctx, bot) async def run(self) -> ControllerResponse: - history = self.player.playlist.songs_history + history = self.player.playlist.getSongsHistory() if len(history) == 0: text = self.messages.HISTORY_EMPTY diff --git a/Controllers/LoopController.py b/Controllers/LoopController.py index 2a2853c..7053912 100644 --- a/Controllers/LoopController.py +++ b/Controllers/LoopController.py @@ -16,7 +16,7 @@ class LoopController(AbstractController): return ControllerResponse(self.ctx, embed) args = args.lower() - if self.player.playlist.current is None: + if self.player.playlist.getCurrentSong() is None: embed = self.embeds.NOT_PLAYING() error = BadCommandUsage() return ControllerResponse(self.ctx, embed, error) diff --git a/Controllers/MoveController.py b/Controllers/MoveController.py index e5292f0..56b7f1e 100644 --- a/Controllers/MoveController.py +++ b/Controllers/MoveController.py @@ -33,7 +33,7 @@ class MoveController(AbstractController): try: song = self.player.playlist.move_songs(pos1, pos2) - songs = self.player.playlist.songs_to_preload + songs = self.player.playlist.getSongsToPreload() await self.__down.preload(songs) song_name = song.title if song.title else song.identifier diff --git a/Controllers/NowPlayingController.py b/Controllers/NowPlayingController.py index 35b684e..addaa9f 100644 --- a/Controllers/NowPlayingController.py +++ b/Controllers/NowPlayingController.py @@ -15,12 +15,12 @@ class NowPlayingController(AbstractController): embed = self.embeds.NOT_PLAYING() return ControllerResponse(self.ctx, embed) - if self.player.playlist.looping_one: + if self.player.playlist.isLoopingOne(): title = self.messages.ONE_SONG_LOOPING else: title = self.messages.SONG_PLAYING await self.__cleaner.clean_messages(self.ctx, self.config.CLEANER_MESSAGES_QUANT) - info = self.player.playlist.current.info + info = self.player.playlist.getCurrentSong().info embed = self.embeds.SONG_INFO(info, title) return ControllerResponse(self.ctx, embed) diff --git a/Controllers/PlayController.py b/Controllers/PlayController.py index b66cc24..f119d1a 100644 --- a/Controllers/PlayController.py +++ b/Controllers/PlayController.py @@ -21,7 +21,7 @@ class PlayController(AbstractController): track = " ".join(args) requester = self.ctx.author.name - if not self.__user_connected(): + if not self.__isUserConnected(): error = ImpossibleMove() embed = self.embeds.NO_CHANNEL() return ControllerResponse(self.ctx, embed, error) @@ -36,7 +36,7 @@ class PlayController(AbstractController): self.player.playlist.add_song(song) quant = len(musics) - songs_preload = self.player.playlist.songs_to_preload + songs_preload = self.player.playlist.getSongsToPreload() await self.__down.preload(songs_preload) if quant == 1: @@ -73,8 +73,6 @@ class PlayController(AbstractController): queue.put(command) else: # Start the process - command = VCommands(VCommandsType.CONTEXT, self.ctx) - queue.put(command) process.start() return response @@ -91,27 +89,8 @@ class PlayController(AbstractController): return ControllerResponse(self.ctx, embed, error) - def __user_connected(self) -> bool: + def __isUserConnected(self) -> bool: if self.ctx.author.voice: return True else: return False - - def __is_connected(self) -> bool: - try: - voice_channel = self.guild.voice_client.channel - - if not self.guild.voice_client.is_connected(): - return False - else: - return True - except: - return False - - async def __connect(self) -> bool: - # if self.guild.voice_client is None: - try: - await self.ctx.author.voice.channel.connect(reconnect=True, timeout=None) - return True - except: - return False diff --git a/Controllers/PrevController.py b/Controllers/PrevController.py index 8be250a..bbb71a6 100644 --- a/Controllers/PrevController.py +++ b/Controllers/PrevController.py @@ -27,7 +27,7 @@ class PrevController(AbstractController): embed = self.embeds.UNKNOWN_ERROR() return ControllerResponse(self.ctx, embed, error) - if self.player.playlist.looping_all or self.player.playlist.looping_one: + if self.player.playlist.isLoopingAll() or self.player.playlist.isLoopingOne(): error = BadCommandUsage() embed = self.embeds.FAIL_DUE_TO_LOOP_ON() return ControllerResponse(self.ctx, embed, error) diff --git a/Controllers/QueueController.py b/Controllers/QueueController.py index 8a5014f..0c09940 100644 --- a/Controllers/QueueController.py +++ b/Controllers/QueueController.py @@ -5,6 +5,7 @@ from Controllers.AbstractController import AbstractController from Controllers.ControllerResponse import ControllerResponse from Music.Downloader import Downloader from Utils.Utils import Utils +from Parallelism.ProcessManager import ProcessManager class QueueController(AbstractController): @@ -13,32 +14,43 @@ class QueueController(AbstractController): self.__down = Downloader() async def run(self) -> ControllerResponse: - if self.player.playlist.looping_one: - song = self.player.playlist.current - embed = self.embeds.ONE_SONG_LOOPING(song.info) - return ControllerResponse(self.ctx, embed) - - songs_preload = self.player.playlist.songs_to_preload - if len(songs_preload) == 0: + # Retrieve the process of the guild + process = ProcessManager() + processContext = process.getRunningPlayerContext(self.guild) + if not processContext: # If no process return empty list embed = self.embeds.EMPTY_QUEUE() return ControllerResponse(self.ctx, embed) - asyncio.create_task(self.__down.preload(songs_preload)) + # Acquire the Lock to manipulate the playlist + with processContext.getLock(): + playlist = processContext.getPlaylist() - if self.player.playlist.looping_all: - title = self.messages.ALL_SONGS_LOOPING - else: - title = self.messages.QUEUE_TITLE + if playlist.isLoopingOne(): + song = playlist.getCurrentSong() + embed = self.embeds.ONE_SONG_LOOPING(song.info) + return ControllerResponse(self.ctx, embed) - total_time = Utils.format_time(sum([int(song.duration if song.duration else 0) - for song in songs_preload])) - total_songs = len(self.player.playlist) + songs_preload = playlist.getSongsToPreload() + if len(songs_preload) == 0: + embed = self.embeds.EMPTY_QUEUE() + return ControllerResponse(self.ctx, embed) - text = f'📜 Queue length: {total_songs} | ⌛ Duration: `{total_time}` downloaded \n\n' + asyncio.create_task(self.__down.preload(songs_preload)) - for pos, song in enumerate(songs_preload, start=1): - song_name = song.title if song.title else self.messages.SONG_DOWNLOADING - text += f"**`{pos}` - ** {song_name} - `{Utils.format_time(song.duration)}`\n" + if playlist.isLoopingAll(): + title = self.messages.ALL_SONGS_LOOPING + else: + title = self.messages.QUEUE_TITLE - embed = self.embeds.QUEUE(title, text) - return ControllerResponse(self.ctx, embed) + total_time = Utils.format_time(sum([int(song.duration if song.duration else 0) + for song in songs_preload])) + total_songs = len(playlist.getSongs()) + + text = f'📜 Queue length: {total_songs} | ⌛ Duration: `{total_time}` downloaded \n\n' + + for pos, song in enumerate(songs_preload, start=1): + song_name = song.title if song.title else self.messages.SONG_DOWNLOADING + text += f"**`{pos}` - ** {song_name} - `{Utils.format_time(song.duration)}`\n" + + embed = self.embeds.QUEUE(title, text) + return ControllerResponse(self.ctx, embed) diff --git a/Controllers/ShuffleController.py b/Controllers/ShuffleController.py index 5ad3ac3..773aa03 100644 --- a/Controllers/ShuffleController.py +++ b/Controllers/ShuffleController.py @@ -15,7 +15,7 @@ class ShuffleController(AbstractController): async def run(self) -> ControllerResponse: try: self.player.playlist.shuffle() - songs = self.player.playlist.songs_to_preload + songs = self.player.playlist.getSongsToPreload() asyncio.create_task(self.__down.preload(songs)) embed = self.embeds.SONGS_SHUFFLED() diff --git a/Controllers/SkipController.py b/Controllers/SkipController.py index 8c6298b..43abbfd 100644 --- a/Controllers/SkipController.py +++ b/Controllers/SkipController.py @@ -10,7 +10,7 @@ class SkipController(AbstractController): super().__init__(ctx, bot) async def run(self) -> ControllerResponse: - if self.player.playlist.looping_one: + if self.player.playlist.isLoopingOne(): embed = self.embeds.ERROR_DUE_LOOP_ONE_ON() error = BadCommandUsage() return ControllerResponse(self.ctx, embed, error) diff --git a/Database/Database.py b/Database/Database.py deleted file mode 100644 index b16ecf3..0000000 --- a/Database/Database.py +++ /dev/null @@ -1,3 +0,0 @@ -class Database: - def __init__(self) -> None: - pass diff --git a/Music/Player.py b/Music/Player.py index a9c9722..036a0e2 100644 --- a/Music/Player.py +++ b/Music/Player.py @@ -89,7 +89,7 @@ class Player(commands.Cog): await ctx.invoke(self.__bot.get_command('np')) - songs = self.__playlist.songs_to_preload + songs = self.__playlist.getSongsToPreload() asyncio.create_task(self.__down.preload(songs)) except: self.__play_next(None, ctx) diff --git a/Music/Playlist.py b/Music/Playlist.py index 29db64c..32aeb3b 100644 --- a/Music/Playlist.py +++ b/Music/Playlist.py @@ -32,24 +32,19 @@ class Playlist: return False return True - @property - def songs_history(self) -> deque: + def getSongsHistory(self) -> deque: return self.__songs_history - @property - def looping_one(self) -> bool: + def isLoopingOne(self) -> bool: return self.__looping_one - @property - def looping_all(self) -> bool: + def isLoopingAll(self) -> bool: return self.__looping_all - @property - def current(self) -> Song: + def getCurrentSong(self) -> Song: return self.__current - @property - def songs_to_preload(self) -> List[Song]: + def getSongsToPreload(self) -> List[Song]: return list(self.__queue)[:self.__configs.MAX_PRELOAD_SONGS] def __len__(self) -> int: diff --git a/Parallelism/PlayerProcess.py b/Parallelism/PlayerProcess.py index 9541d8b..42210b8 100644 --- a/Parallelism/PlayerProcess.py +++ b/Parallelism/PlayerProcess.py @@ -1,10 +1,10 @@ import asyncio from os import listdir -from discord import Intents +from discord import Intents, User from asyncio import AbstractEventLoop, Semaphore from multiprocessing import Process, Queue from threading import Lock, Thread -from typing import Callable, Text +from typing import Callable from discord import Client, Guild, FFmpegPCMAudio, VoiceChannel, TextChannel from discord.ext.commands import Context from Music.Playlist import Playlist @@ -30,47 +30,58 @@ class TimeoutClock: class PlayerProcess(Process): """Process that will play songs, receive commands by a received Queue""" - def __init__(self, playlist: Playlist, lock: Lock, queue: Queue) -> None: + def __init__(self, playlist: Playlist, lock: Lock, queue: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None: + """ + Start a new process that will have his own bot instance + Due to pickle serialization, no objects are stored, the values initialization are being made in the run method + """ Process.__init__(self, group=None, target=None, args=(), kwargs={}) + # Synchronization objects self.__playlist: Playlist = playlist self.__lock: Lock = lock self.__queue: Queue = queue - + self.__semStopPlaying: Semaphore = None + self.__loop: AbstractEventLoop = None + # Discord context ID + self.__textChannelID = textID + self.__guildID = guildID + self.__voiceChannelID = voiceID + self.__authorID = authorID # All information of discord context will be retrieved directly with discord API self.__guild: Guild = None self.__bot: Client = None self.__voiceChannel: VoiceChannel = None self.__textChannel: TextChannel = None - self.__loop: AbstractEventLoop = None + self.__author: User = None + self.__configs: Configs = None - self.__playing = False - - # Flag to control if the player should stop totally the playing self.__forceStop = False self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', 'options': '-vn'} def run(self) -> None: - """Function called in process.start(), this will exec the actually _run method it in event loop""" - print('Run') - + """Method called by process.start(), this will exec the actually _run method in a event loop""" self.__loop = asyncio.get_event_loop() self.__configs = Configs() - # self.__loop = self.__bot.loop self.__semStopPlaying = Semaphore(0) self.__stopped = asyncio.Event() - # task = self.__loop.create_task(self._run()) self.__loop.run_until_complete(self._run()) async def _run(self) -> None: - # Recreate the bot instance in this new process + # Recreate the bot instance and objects using discord API self.__bot = await self.__createBotInstance() + self.__guild = self.__bot.get_guild(self.__guildID) + self.__voiceChannel = self.__bot.get_channel(self.__voiceChannelID) + self.__textChannel = self.__bot.get_channel(self.__textChannelID) + self.__author = self.__bot.get_channel(self.__authorID) + # Connect to voice Channel + await self.__connectToVoiceChannel() # Start the timeout function self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) - # Thread that will receive commands to execute in this Process + # Thread that will receive commands to be executed in this Process self.__commandsReceiver = Thread(target=self.__commandsReceiver, daemon=True) self.__commandsReceiver.start() @@ -81,8 +92,10 @@ class PlayerProcess(Process): await self.__semStopPlaying.acquire() async def __playPlaylistSongs(self) -> None: + print(f'Playing: {self.__playing}') if not self.__playing: with self.__lock: + print('Next Song Aqui') song = self.__playlist.next_song() await self.__playSong(song) @@ -91,18 +104,19 @@ class PlayerProcess(Process): try: source = await self.__ensureSource(song) if source is None: - self.__playNext(None, self.__context) + self.__playNext(None) self.__playing = True player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS) voice = self.__guild.voice_client - voice.play(player, after=lambda e: self.__playNext(e, self.__context)) + voice.play(player, after=lambda e: self.__playNext(e)) self.__timer.cancel() - self.__timer = TimeoutClock(self.__timeout_handler) + self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) - await self.__context.invoke(self.__bot.get_command('np')) - except: + # await self.__context.invoke(self.__bot.get_command('np')) + except Exception as e: + print(f'[ERROR IN PLAY SONG] -> {e}') self.__playNext(None) def __playNext(self, error) -> None: @@ -120,11 +134,12 @@ class PlayerProcess(Process): self.__playing = False def __commandsReceiver(self) -> None: - for x in range(2): + while True: command: VCommands = self.__queue.get() type = command.getType() args = command.getArgs() + print(f'Command Received: {type}') if type == VCommandsType.PAUSE: self.pause() elif type == VCommandsType.PLAY: @@ -167,7 +182,9 @@ class PlayerProcess(Process): print(f'DEVELOPER NOTE -> Force Stop Error: {e}') async def __createBotInstance(self) -> Client: - # Load a new bot instance, this bot should not receive commands directly + """Load a new bot instance that should not be directly called. + Get the guild, voice and text Channel in discord API using IDs passed in constructor + """ intents = Intents.default() intents.members = True bot = Bot(command_prefix='Rafael', @@ -178,7 +195,6 @@ class PlayerProcess(Process): # Add the Cogs for this bot too for filename in listdir(f'./{self.__configs.COMMANDS_PATH}'): - print(filename) if filename.endswith('.py'): bot.load_extension(f'{self.__configs.COMMANDS_PATH}.{filename[:-3]}') @@ -187,10 +203,7 @@ class PlayerProcess(Process): await task self.__loop.create_task(bot.connect(reconnect=True)) # Sleep to wait connection to be established - await asyncio.sleep(1) - - self.__guild: Guild = bot.get_guild(651983781258985484) - self.__voiceChannel = self.__bot.get_channel(933437427350118450) + await asyncio.sleep(2) return bot @@ -228,7 +241,7 @@ class PlayerProcess(Process): except: return False - async def __connect(self) -> bool: + async def __connectToVoiceChannel(self) -> bool: try: await self.__voiceChannel.connect(reconnect=True, timeout=None) return True diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessManager.py index d1c757a..e744de9 100644 --- a/Parallelism/ProcessManager.py +++ b/Parallelism/ProcessManager.py @@ -24,17 +24,34 @@ class ProcessManager(Singleton): def getPlayerContext(self, guild: Guild, context: Context) -> ProcessContext: try: - print('Get') if guild not in self.__playersProcess.keys(): - playlist: Playlist = self.__manager.Playlist() - lock = Lock() - queue = Queue() - process = PlayerProcess(playlist, lock, queue) - processContext = ProcessContext(process, queue, playlist, lock) - self.__playersProcess[guild] = processContext + self.__playersProcess[guild] = self.__createProcess(context) + else: + if not self.__playersProcess[guild].getProcess().is_alive(): + self.__playersProcess[guild] = self.__createProcess(context) + return self.__playersProcess[guild] except Exception as e: - print(e) + print(f'[Error In GetPlayerContext] -> {e}') + + def getRunningPlayerContext(self, guild: Guild) -> ProcessContext: + if guild not in self.__playersProcess.keys(): + return None + + return self.__playersProcess[guild] + + def __createProcess(self, context: Context): + guildID: int = context.guild.id + textID: int = context.channel.id + voiceID: int = context.author.voice.channel.id + authorID: int = context.author.id + + playlist: Playlist = self.__manager.Playlist() + lock = Lock() + queue = Queue() + process = PlayerProcess(playlist, lock, queue, guildID, textID, voiceID, authorID) + processContext = ProcessContext(process, queue, playlist, lock) + return processContext class Manager(BaseManager):