diff --git a/.gitignore b/.gitignore index 3799cf3..40aa9d2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.vscode assets/ __pycache__ .env diff --git a/Commands/Control.py b/Commands/Control.py index 58d5167..31a370a 100644 --- a/Commands/Control.py +++ b/Commands/Control.py @@ -44,7 +44,7 @@ class Control(commands.Cog): await ctx.send(embed=embed) else: - print(f'DEVELOPER NOTE -> Comand Error: {error}') + print(f'DEVELOPER NOTE -> Command Error: {error}') embed = self.__embeds.UNKNOWN_ERROR() await ctx.send(embed=embed) diff --git a/Commands/Music.py b/Commands/Music.py index 759b26a..57fbdf9 100644 --- a/Commands/Music.py +++ b/Commands/Music.py @@ -6,7 +6,7 @@ from Controllers.ClearController import ClearController from Controllers.MoveController import MoveController from Controllers.NowPlayingController import NowPlayingController from Controllers.PlayController import PlayController -from Controllers.PlayerController import PlayersController +from Controllers.PlayersController import PlayersController from Controllers.PrevController import PrevController from Controllers.RemoveController import RemoveController from Controllers.ResetController import ResetController @@ -21,7 +21,7 @@ from Controllers.QueueController import QueueController from Controllers.LoopController import LoopController from Views.EmoteView import EmoteView from Views.EmbedView import EmbedView - +from Parallelism.ProcessManager import ProcessManager helper = Helper() @@ -29,6 +29,7 @@ helper = Helper() class Music(commands.Cog): def __init__(self, bot) -> None: self.__bot: Client = bot + self.__processManager = ProcessManager(bot) self.__cleaner = Cleaner(self.__bot) self.__controller = PlayersController(self.__bot) diff --git a/Controllers/AbstractController.py b/Controllers/AbstractController.py index 81c69b5..ab05b6b 100644 --- a/Controllers/AbstractController.py +++ b/Controllers/AbstractController.py @@ -3,7 +3,7 @@ from typing import List from discord.ext.commands import Context from discord import Client, Guild, ClientUser, Member from Config.Messages import Messages -from Controllers.PlayerController import PlayersController +from Controllers.PlayersController import PlayersController from Music.Player import Player from Controllers.ControllerResponse import ControllerResponse from Config.Configs import Configs diff --git a/Controllers/PlayController.py b/Controllers/PlayController.py index c0c73db..b66cc24 100644 --- a/Controllers/PlayController.py +++ b/Controllers/PlayController.py @@ -1,4 +1,3 @@ -import asyncio from Exceptions.Exceptions import DownloadingError, InvalidInput, VulkanError from discord.ext.commands import Context from discord import Client @@ -8,6 +7,8 @@ from Controllers.ControllerResponse import ControllerResponse from Music.Downloader import Downloader from Music.Searcher import Searcher from Music.Song import Song +from Parallelism.ProcessManager import ProcessManager +from Parallelism.Commands import VCommands, VCommandsType class PlayController(AbstractController): @@ -25,13 +26,6 @@ class PlayController(AbstractController): embed = self.embeds.NO_CHANNEL() return ControllerResponse(self.ctx, embed, error) - if not self.__is_connected(): - success = await self.__connect() - if not success: - error = UnknownError() - embed = self.embeds.UNKNOWN_ERROR() - return ControllerResponse(self.ctx, embed, error) - try: musics = await self.__searcher.search(track) if musics is None or len(musics) == 0: @@ -63,7 +57,26 @@ class PlayController(AbstractController): embed = self.embeds.SONGS_ADDED(quant) response = ControllerResponse(self.ctx, embed) - asyncio.create_task(self.player.play(self.ctx)) + # Get the process context for the current guild + manager = ProcessManager(self.bot) + processContext = manager.getPlayerContext(self.guild, self.ctx) + # Add the downloaded song to the process playlist + # All access to shared memory should be protect by acquire the Lock + with processContext.getLock(): + processContext.getPlaylist().add_song(song) + + # If process already started send a command to the player process by queue + process = processContext.getProcess() + queue = processContext.getQueue() + if process.is_alive(): + command = VCommands(VCommandsType.PLAY) + queue.put(command) + else: + # Start the process + command = VCommands(VCommandsType.CONTEXT, self.ctx) + queue.put(command) + process.start() + return response except Exception as err: @@ -72,6 +85,7 @@ class PlayController(AbstractController): error = err embed = self.embeds.CUSTOM_ERROR(error) else: + print(f'DEVELOPER NOTE -> PlayController Error: {err}') error = UnknownError() embed = self.embeds.UNKNOWN_ERROR() diff --git a/Controllers/PlayerController.py b/Controllers/PlayersController.py similarity index 98% rename from Controllers/PlayerController.py rename to Controllers/PlayersController.py index 28293cc..723b971 100644 --- a/Controllers/PlayerController.py +++ b/Controllers/PlayersController.py @@ -1,3 +1,4 @@ +from multiprocessing import Process from typing import Dict, List, Union from Config.Singleton import Singleton from discord import Guild, Client, VoiceClient, Member diff --git a/Controllers/ResetController.py b/Controllers/ResetController.py index 43e0aa6..c15284e 100644 --- a/Controllers/ResetController.py +++ b/Controllers/ResetController.py @@ -1,8 +1,8 @@ from discord.ext.commands import Context -from discord import Client, Member +from discord import Client from Controllers.AbstractController import AbstractController from Controllers.ControllerResponse import ControllerResponse -from Controllers.PlayerController import PlayersController +from Controllers.PlayersController import PlayersController class ResetController(AbstractController): diff --git a/Music/Playlist.py b/Music/Playlist.py index 53fa14a..29db64c 100644 --- a/Music/Playlist.py +++ b/Music/Playlist.py @@ -8,7 +8,7 @@ import random class Playlist: def __init__(self) -> None: - self.__config = Configs() + self.__configs = Configs() self.__queue = deque() # Store the musics to play self.__songs_history = deque() # Store the musics played @@ -17,6 +17,9 @@ class Playlist: self.__current: Song = None + def getSongs(self) -> deque[Song]: + return self.__queue + def validate_position(self, position: int) -> bool: if position not in range(1, len(self.__queue) + 1): return False @@ -47,7 +50,7 @@ class Playlist: @property def songs_to_preload(self) -> List[Song]: - return list(self.__queue)[:self.__config.MAX_PRELOAD_SONGS] + return list(self.__queue)[:self.__configs.MAX_PRELOAD_SONGS] def __len__(self) -> int: return len(self.__queue) @@ -64,7 +67,7 @@ class Playlist: if played_song.problematic == False: self.__songs_history.appendleft(played_song) - if len(self.__songs_history) > self.__config.MAX_SONGS_HISTORY: + if len(self.__songs_history) > self.__configs.MAX_SONGS_HISTORY: self.__songs_history.pop() # Remove the older elif self.__looping_one: # Insert the current song to play again diff --git a/Parallelism/Commands.py b/Parallelism/Commands.py new file mode 100644 index 0000000..6c133f3 --- /dev/null +++ b/Parallelism/Commands.py @@ -0,0 +1,23 @@ +from enum import Enum +from typing import Tuple + + +class VCommandsType(Enum): + PLAY_PREV = 'Play Prev' + SKIP = 'Skip' + PAUSE = 'Pause' + RESUME = 'Resume' + CONTEXT = 'Context' + PLAY = 'Play' + + +class VCommands: + def __init__(self, type: VCommandsType, args=None) -> None: + self.__type = type + self.__args = args + + def getType(self) -> VCommandsType: + return self.__type + + def getArgs(self) -> Tuple: + return self.__args diff --git a/Parallelism/PlayerProcess.py b/Parallelism/PlayerProcess.py new file mode 100644 index 0000000..9541d8b --- /dev/null +++ b/Parallelism/PlayerProcess.py @@ -0,0 +1,236 @@ +import asyncio +from os import listdir +from discord import Intents +from asyncio import AbstractEventLoop, Semaphore +from multiprocessing import Process, Queue +from threading import Lock, Thread +from typing import Callable, Text +from discord import Client, Guild, FFmpegPCMAudio, VoiceChannel, TextChannel +from discord.ext.commands import Context +from Music.Playlist import Playlist +from Music.Song import Song +from Config.Configs import Configs +from discord.ext.commands import Bot +from Parallelism.Commands import VCommands, VCommandsType + + +class TimeoutClock: + def __init__(self, callback: Callable, loop: asyncio.AbstractEventLoop): + self.__callback = callback + self.__task = loop.create_task(self.__executor()) + + async def __executor(self): + await asyncio.sleep(Configs().VC_TIMEOUT) + await self.__callback() + + def cancel(self): + self.__task.cancel() + + +class PlayerProcess(Process): + """Process that will play songs, receive commands by a received Queue""" + + def __init__(self, playlist: Playlist, lock: Lock, queue: Queue) -> None: + Process.__init__(self, group=None, target=None, args=(), kwargs={}) + self.__playlist: Playlist = playlist + self.__lock: Lock = lock + self.__queue: Queue = queue + + # All information of discord context will be retrieved directly with discord API + self.__guild: Guild = None + self.__bot: Client = None + self.__voiceChannel: VoiceChannel = None + self.__textChannel: TextChannel = None + self.__loop: AbstractEventLoop = None + self.__configs: Configs = None + + self.__playing = False + + # Flag to control if the player should stop totally the playing + self.__forceStop = False + self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5', + 'options': '-vn'} + + def run(self) -> None: + """Function called in process.start(), this will exec the actually _run method it in event loop""" + print('Run') + + self.__loop = asyncio.get_event_loop() + self.__configs = Configs() + + # self.__loop = self.__bot.loop + self.__semStopPlaying = Semaphore(0) + self.__stopped = asyncio.Event() + # task = self.__loop.create_task(self._run()) + self.__loop.run_until_complete(self._run()) + + async def _run(self) -> None: + # Recreate the bot instance in this new process + self.__bot = await self.__createBotInstance() + + # Start the timeout function + self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop) + # Thread that will receive commands to execute in this Process + self.__commandsReceiver = Thread(target=self.__commandsReceiver, daemon=True) + self.__commandsReceiver.start() + + # Start a Task to play songs + self.__loop.create_task(self.__playPlaylistSongs()) + # Try to acquire a semaphore, it'll be release when timeout function trigger, we use the Semaphore + # from the asyncio lib to not block the event loop + await self.__semStopPlaying.acquire() + + async def __playPlaylistSongs(self) -> None: + if not self.__playing: + with self.__lock: + song = self.__playlist.next_song() + + await self.__playSong(song) + + async def __playSong(self, song: Song) -> None: + try: + source = await self.__ensureSource(song) + if source is None: + self.__playNext(None, self.__context) + self.__playing = True + + player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS) + voice = self.__guild.voice_client + voice.play(player, after=lambda e: self.__playNext(e, self.__context)) + + self.__timer.cancel() + self.__timer = TimeoutClock(self.__timeout_handler) + + await self.__context.invoke(self.__bot.get_command('np')) + except: + self.__playNext(None) + + def __playNext(self, error) -> None: + if self.__forceStop: # If it's forced to stop player + self.__forceStop = False + return None + + with self.__lock: + song = self.__playlist.next_song() + + if song is not None: + coro = self.__playSong(song) + self.__bot.loop.create_task(coro) + else: + self.__playing = False + + def __commandsReceiver(self) -> None: + for x in range(2): + command: VCommands = self.__queue.get() + type = command.getType() + args = command.getArgs() + + if type == VCommandsType.PAUSE: + self.pause() + elif type == VCommandsType.PLAY: + self.__loop.create_task(self.__playPlaylistSongs()) + elif type == VCommandsType.PLAY_PREV: + self.__playPrev() + elif type == VCommandsType.RESUME: + pass + elif type == VCommandsType.SKIP: + pass + else: + print(f'[ERROR] -> Unknown Command Received: {command}') + + def pause(self): + print(id(self)) + + async def __playPrev(self, ctx: Context) -> None: + with self.__lock: + song = self.__playlist.prev_song() + if song is not None: + if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused(): + # Will forbidden next_song to execute after stopping current player + self.__forceStop = True + self.__guild.voice_client.stop() + self.__playing = False + + await self.__playSong(ctx, song) + + async def __forceStop(self) -> None: + try: + if self.__guild.voice_client is None: + return + + self.__guild.voice_client.stop() + await self.__guild.voice_client.disconnect() + with self.__lock: + self.__playlist.clear() + self.__playlist.loop_off() + except Exception as e: + print(f'DEVELOPER NOTE -> Force Stop Error: {e}') + + async def __createBotInstance(self) -> Client: + # Load a new bot instance, this bot should not receive commands directly + intents = Intents.default() + intents.members = True + bot = Bot(command_prefix='Rafael', + pm_help=True, + case_insensitive=True, + intents=intents) + bot.remove_command('help') + + # Add the Cogs for this bot too + for filename in listdir(f'./{self.__configs.COMMANDS_PATH}'): + print(filename) + if filename.endswith('.py'): + bot.load_extension(f'{self.__configs.COMMANDS_PATH}.{filename[:-3]}') + + # Login and connect the bot instance to discord API + task = self.__loop.create_task(bot.login(token=self.__configs.BOT_TOKEN, bot=True)) + await task + self.__loop.create_task(bot.connect(reconnect=True)) + # Sleep to wait connection to be established + await asyncio.sleep(1) + + self.__guild: Guild = bot.get_guild(651983781258985484) + self.__voiceChannel = self.__bot.get_channel(933437427350118450) + + return bot + + async def __timeoutHandler(self) -> None: + if self.__guild.voice_client is None: + return + + if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused(): + self.__timer = TimeoutClock(self.__timeoutHandler) + + elif self.__guild.voice_client.is_connected(): + with self.__lock: + self.__playlist.clear() + self.__playlist.loop_off() + await self.__guild.voice_client.disconnect() + # Release semaphore to finish process + self.__semStopPlaying.release() + + async def __ensureSource(self, song: Song) -> str: + while True: + if song.source is not None: # If song got downloaded + return song.source + + if song.problematic: # If song got any error + return None + + await asyncio.sleep(0.1) + + def __is_connected(self) -> bool: + try: + if not self.__voiceChannel.is_connected(): + return False + else: + return True + except: + return False + + async def __connect(self) -> bool: + try: + await self.__voiceChannel.connect(reconnect=True, timeout=None) + return True + except: + return False diff --git a/Parallelism/ProcessContext.py b/Parallelism/ProcessContext.py new file mode 100644 index 0000000..e288a97 --- /dev/null +++ b/Parallelism/ProcessContext.py @@ -0,0 +1,22 @@ +from multiprocessing import Process, Queue, Lock +from Music.Playlist import Playlist + + +class ProcessContext: + def __init__(self, process: Process, queue: Queue, playlist: Playlist, lock: Lock) -> None: + self.__process = process + self.__queue = queue + self.__playlist = playlist + self.__lock = lock + + def getProcess(self) -> Process: + return self.__process + + def getQueue(self) -> Queue: + return self.__queue + + def getPlaylist(self) -> Playlist: + return self.__playlist + + def getLock(self) -> Lock: + return self.__lock diff --git a/Parallelism/ProcessManager.py b/Parallelism/ProcessManager.py new file mode 100644 index 0000000..d1c757a --- /dev/null +++ b/Parallelism/ProcessManager.py @@ -0,0 +1,45 @@ +from multiprocessing import Queue, Lock +from multiprocessing.managers import BaseManager, NamespaceProxy +from typing import Dict +from Config.Singleton import Singleton +from discord import Guild, Client +from discord.ext.commands import Context +from Parallelism.PlayerProcess import PlayerProcess +from Music.Playlist import Playlist +from Parallelism.ProcessContext import ProcessContext + + +class ProcessManager(Singleton): + def __init__(self, bot: Client = None) -> None: + if not super().created: + Manager.register('Playlist', Playlist) + self.__manager = Manager() + self.__manager.start() + if bot is not None: + self.__bot: Client = bot + self.__playersProcess: Dict[Guild, ProcessContext] = {} + + def setPlayerContext(self, guild: Guild, context: ProcessContext): + self.__playersProcess[guild] = context + + def getPlayerContext(self, guild: Guild, context: Context) -> ProcessContext: + try: + print('Get') + if guild not in self.__playersProcess.keys(): + playlist: Playlist = self.__manager.Playlist() + lock = Lock() + queue = Queue() + process = PlayerProcess(playlist, lock, queue) + processContext = ProcessContext(process, queue, playlist, lock) + self.__playersProcess[guild] = processContext + return self.__playersProcess[guild] + except Exception as e: + print(e) + + +class Manager(BaseManager): + pass + + +class ProxyBase(NamespaceProxy): + _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') diff --git a/main.py b/main.py index edb1f39..e4b6269 100644 --- a/main.py +++ b/main.py @@ -33,5 +33,6 @@ class VulkanInitializer: self.__bot.run(self.__config.BOT_TOKEN, bot=True, reconnect=True) -vulkan = VulkanInitializer() -vulkan.run() +if __name__ == '__main__': + vulkan = VulkanInitializer() + vulkan.run()