Vulkan/Parallelism/ProcessManager.py

206 lines
8.8 KiB
Python

import asyncio
from multiprocessing import Lock, Queue
from multiprocessing.managers import BaseManager, NamespaceProxy
from queue import Empty
from threading import Thread
from typing import Dict, Tuple, Union
from Config.Singleton import Singleton
from discord import Guild, Interaction
from discord.ext.commands import Context
from Parallelism.AbstractProcessManager import AbstractPlayersManager
from Parallelism.ProcessExecutor import ProcessCommandsExecutor
from Music.Song import Song
from Parallelism.PlayerProcess import PlayerProcess
from Music.Playlist import Playlist
from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus
from Parallelism.Commands import VCommands, VCommandsType
from Music.VulkanBot import VulkanBot
class ProcessManager(Singleton, AbstractPlayersManager):
"""
Manage all running player process, creating and storing them for future calls
Deals with the creation of shared memory
"""
def __init__(self, bot: VulkanBot = None) -> None:
if not super().created:
self.__bot = bot
VManager.register('Playlist', Playlist)
self.__manager = VManager()
self.__manager.start()
self.__playersProcess: Dict[int, PlayerInfo] = {}
self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {}
self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {}
def setPlayerInfo(self, guild: Guild, info: PlayerInfo):
self.__playersProcess[guild.id] = info
def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
"""Return the process info for the guild, the user in context must be connected to a voice_channel"""
try:
if guild.id not in self.__playersProcess.keys():
self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context)
else:
# If the process has ended create a new one
if not self.__playersProcess[guild.id].getProcess().is_alive():
self.__playersProcess[guild.id] = self.__recreateProcess(guild, context)
return self.__playersProcess[guild.id]
except Exception as e:
print(f'[Error In GetPlayerContext] -> {e}')
def resetProcess(self, guild: Guild, context: Context) -> None:
"""Restart a running process, already start it to return to play"""
if guild.id not in self.__playersProcess.keys():
return None
# Recreate the process keeping the playlist
newProcessInfo = self.__recreateProcess(guild, context)
newProcessInfo.getProcess().start() # Start the process
# Send a command to start the play again
playCommand = VCommands(VCommandsType.PLAY)
newProcessInfo.getQueueToPlayer().put(playCommand)
self.__playersProcess[guild.id] = newProcessInfo
def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo:
"""Return the process info for the guild, if not, return None"""
if guild.id not in self.__playersProcess.keys():
print('Process Info not found')
return None
return self.__playersProcess[guild.id]
def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo:
guildID: int = context.guild.id
textID: int = context.channel.id
voiceID: int = context.author.voice.channel.id
authorID: int = context.author.id
playlist: Playlist = self.__manager.Playlist()
lock = Lock()
queueToListen = Queue()
queueToSend = Queue()
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
queueToListen, guildID, textID, voiceID, authorID)
processInfo = PlayerInfo(process, queueToSend, queueToListen,
playlist, lock, context.channel)
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
thread = Thread(target=self.__listenToCommands,
args=(queueToListen, guild), daemon=True)
self.__playersListeners[guildID] = (thread, False)
thread.start()
# Create a Message Controller for this player
self.__playersCommandsExecutor[guildID] = ProcessCommandsExecutor(self.__bot, guildID)
return processInfo
def __stopPossiblyRunningProcess(self, guild: Guild):
try:
if guild.id in self.__playersProcess.keys():
playerProcess = self.__playersProcess[guild.id]
process = playerProcess.getProcess()
process.close()
process.kill()
playerProcess.getQueueToMain().close()
playerProcess.getQueueToMain().join_thread()
playerProcess.getQueueToPlayer().close()
playerProcess.getQueueToPlayer().join_thread()
except Exception as e:
print(f'[ERROR STOPPING PROCESS] -> {e}')
def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
"""Create a new process info using previous playlist"""
self.__stopPossiblyRunningProcess(guild)
guildID: int = context.guild.id
textID: int = context.channel.id
if isinstance(context, Interaction):
authorID: int = context.user.id
voiceID: int = context.user.voice.channel.id
else:
authorID: int = context.author.id
voiceID: int = context.author.voice.channel.id
playlist: Playlist = self.__playersProcess[guildID].getPlaylist()
lock = Lock()
queueToListen = Queue()
queueToSend = Queue()
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
queueToListen, guildID, textID, voiceID, authorID)
processInfo = PlayerInfo(process, queueToSend, queueToListen,
playlist, lock, context.channel)
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
thread = Thread(target=self.__listenToCommands,
args=(queueToListen, guild), daemon=True)
self.__playersListeners[guildID] = (thread, False)
thread.start()
return processInfo
def __listenToCommands(self, queue: Queue, guild: Guild) -> None:
guildID = guild.id
while True:
shouldEnd = self.__playersListeners[guildID][1]
if shouldEnd:
break
try:
command: VCommands = queue.get(timeout=5)
commandType = command.getType()
args = command.getArgs()
print(f'Process {guild.name} sended command {commandType}')
if commandType == VCommandsType.NOW_PLAYING:
asyncio.run_coroutine_threadsafe(self.showNowPlaying(
guild.id, args), self.__bot.loop)
elif commandType == VCommandsType.TERMINATE:
# Delete the process elements and return, to finish task
self.__terminateProcess(guildID)
return
elif commandType == VCommandsType.SLEEPING:
# The process might be used again
self.__sleepingProcess(guildID)
return
else:
print(f'[ERROR] -> Unknown Command Received from Process: {commandType}')
except Empty:
continue
except Exception as e:
print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}')
def __terminateProcess(self, guildID: int) -> None:
# Delete all structures associated with the Player
del self.__playersProcess[guildID]
del self.__playersCommandsExecutor[guildID]
threadListening = self.__playersListeners[guildID]
threadListening._stop()
del self.__playersListeners[guildID]
def __sleepingProcess(self, guildID: int) -> None:
# Disable all process structures, except Playlist
queue1 = self.__playersProcess[guildID].getQueueToMain()
queue2 = self.__playersProcess[guildID].getQueueToPlayer()
queue1.close()
queue1.join_thread()
queue2.close()
queue2.join_thread()
# Set the status of this process as sleeping, only the playlist object remains
self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING)
async def showNowPlaying(self, guildID: int, song: Song) -> None:
commandExecutor = self.__playersCommandsExecutor[guildID]
processInfo = self.__playersProcess[guildID]
await commandExecutor.sendNowPlaying(processInfo, song)
class VManager(BaseManager):
pass
class VProxy(NamespaceProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__')