Adding a queue for player send commands to Main process

This commit is contained in:
Rafael Vargas
2022-07-28 00:38:30 -03:00
parent 5902a0dc72
commit 60a36425ee
23 changed files with 233 additions and 77 deletions

View File

@@ -11,6 +11,9 @@ class VCommandsType(Enum):
PLAY = 'Play'
STOP = 'Stop'
RESET = 'Reset'
NOW_PLAYING = 'Now Playing'
TERMINATE = 'Terminate'
SLEEPING = 'Sleeping'
class VCommands:

View File

@@ -1,9 +1,9 @@
import asyncio
from Music.VulkanInitializer import VulkanInitializer
from discord import User, Member, Message, Embed
from asyncio import AbstractEventLoop, Semaphore
from multiprocessing import Process, Queue, RLock
from threading import Lock, Thread
from asyncio import AbstractEventLoop, Semaphore, Queue
from multiprocessing import Process, RLock, Lock
from threading import Thread
from typing import Callable, List
from discord import Guild, FFmpegPCMAudio, VoiceChannel, TextChannel
from Music.Playlist import Playlist
@@ -31,7 +31,7 @@ class TimeoutClock:
class PlayerProcess(Process):
"""Process that will play songs, receive commands from the main process by a Queue"""
def __init__(self, name: str, playlist: Playlist, lock: Lock, queue: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None:
def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None:
"""
Start a new process that will have his own bot instance
Due to pickle serialization, no objects are stored, the values initialization are being made in the run method
@@ -40,7 +40,8 @@ class PlayerProcess(Process):
# Synchronization objects
self.__playlist: Playlist = playlist
self.__playlistLock: Lock = lock
self.__queue: Queue = queue
self.__queueReceive: Queue = queueToReceive
self.__queueSend: Queue = queueToSend
self.__semStopPlaying: Semaphore = None
self.__loop: AbstractEventLoop = None
# Discord context ID
@@ -96,8 +97,7 @@ class PlayerProcess(Process):
# Start the timeout function
self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop)
# Thread that will receive commands to be executed in this Process
self.__commandsReceiver = Thread(target=self.__commandsReceiver, daemon=True)
self.__commandsReceiver.start()
self.__loop.create_task(self.__commandsReceiver())
# Start a Task to play songs
self.__loop.create_task(self.__playPlaylistSongs())
@@ -146,8 +146,10 @@ class PlayerProcess(Process):
self.__timer.cancel()
self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop)
await self.__deletePrevNowPlaying()
await self.__showNowPlaying()
nowPlayingCommand = VCommands(VCommandsType.NOW_PLAYING, song)
await self.__queueSend.put(nowPlayingCommand)
# await self.__deletePrevNowPlaying()
# await self.__showNowPlaying()
except Exception as e:
print(f'[ERROR IN PLAY SONG] -> {e}, {type(e)}')
self.__playNext(None)
@@ -190,12 +192,11 @@ class PlayerProcess(Process):
self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}')
def __commandsReceiver(self) -> None:
async def __commandsReceiver(self) -> None:
while True:
command: VCommands = self.__queue.get()
command: VCommands = await self.__queueReceive.get()
type = command.getType()
args = command.getArgs()
print(f'{self.name} received command {type}')
try:
self.__playerLock.acquire()
@@ -206,13 +207,13 @@ class PlayerProcess(Process):
elif type == VCommandsType.SKIP:
self.__skip()
elif type == VCommandsType.PLAY:
asyncio.run_coroutine_threadsafe(self.__playPlaylistSongs(), self.__loop)
await self.__playPlaylistSongs()
elif type == VCommandsType.PREV:
asyncio.run_coroutine_threadsafe(self.__playPrev(args), self.__loop)
await self.__playPrev(args)
elif type == VCommandsType.RESET:
asyncio.run_coroutine_threadsafe(self.__reset(), self.__loop)
await self.__reset()
elif type == VCommandsType.STOP:
asyncio.run_coroutine_threadsafe(self.__stop(), self.__loop)
await self.__stop()
else:
print(f'[ERROR] -> Unknown Command Received: {command}')
except Exception as e:

View File

@@ -1,4 +1,5 @@
from multiprocessing import Process, Queue, Lock
from discord import TextChannel
from Music.Playlist import Playlist
@@ -7,11 +8,13 @@ class ProcessInfo:
Class to store the reference to all structures to maintain a player process
"""
def __init__(self, process: Process, queue: Queue, playlist: Playlist, lock: Lock) -> None:
def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None:
self.__process = process
self.__queue = queue
self.__queueToPlayer = queueToPlayer
self.__queueToMain = queueToMain
self.__playlist = playlist
self.__lock = lock
self.__textChannel = textChannel
def setProcess(self, newProcess: Process) -> None:
self.__process = newProcess
@@ -19,11 +22,17 @@ class ProcessInfo:
def getProcess(self) -> Process:
return self.__process
def getQueue(self) -> Queue:
return self.__queue
def getQueueToPlayer(self) -> Queue:
return self.__queueToPlayer
def getQueueToMain(self) -> Queue:
return self.__queueToMain
def getPlaylist(self) -> Playlist:
return self.__playlist
def getLock(self) -> Lock:
return self.__lock
def getTextChannel(self) -> TextChannel:
return self.__textChannel

View File

@@ -1,13 +1,21 @@
from multiprocessing import Queue, Lock
from asyncio import Queue, Task
import asyncio
from multiprocessing import Lock
from multiprocessing.managers import BaseManager, NamespaceProxy
from typing import Dict, Union
from queue import Empty
from threading import Thread
from typing import Dict, Tuple, Union
from Config.Singleton import Singleton
from discord import Guild, Interaction
from discord.ext.commands import Context
from Music.MessagesController import MessagesController
from Music.Song import Song
from Parallelism.PlayerProcess import PlayerProcess
from Music.Playlist import Playlist
from Parallelism.ProcessInfo import ProcessInfo
from Parallelism.Commands import VCommands, VCommandsType
from Music.VulkanBot import VulkanBot
from Tests.LoopRunner import LoopRunner
class ProcessManager(Singleton):
@@ -16,12 +24,16 @@ class ProcessManager(Singleton):
Deal with the creation of shared memory
"""
def __init__(self) -> None:
def __init__(self, bot: VulkanBot = None) -> None:
if not super().created:
self.__bot = bot
VManager.register('Playlist', Playlist)
self.__manager = VManager()
self.__manager.start()
self.__playersProcess: Dict[Guild, ProcessInfo] = {}
# self.__playersListeners: Dict[Guild, Tuple[Thread, bool]] = {}
self.__playersListeners: Dict[Guild, Task] = {}
self.__playersMessages: Dict[Guild, MessagesController] = {}
def setPlayerInfo(self, guild: Guild, info: ProcessInfo):
self.__playersProcess[guild.id] = info
@@ -37,11 +49,11 @@ class ProcessManager(Singleton):
return self.__playersProcess[guild.id]
if guild.id not in self.__playersProcess.keys():
self.__playersProcess[guild.id] = self.__createProcessInfo(context)
self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context)
else:
# If the process has ended create a new one
if not self.__playersProcess[guild.id].getProcess().is_alive():
self.__playersProcess[guild.id] = self.__recreateProcess(context)
self.__playersProcess[guild.id] = self.__recreateProcess(guild, context)
return self.__playersProcess[guild.id]
except Exception as e:
@@ -53,11 +65,11 @@ class ProcessManager(Singleton):
return None
# Recreate the process keeping the playlist
newProcessInfo = self.__recreateProcess(context)
newProcessInfo = self.__recreateProcess(guild, context)
newProcessInfo.getProcess().start() # Start the process
# Send a command to start the play again
playCommand = VCommands(VCommandsType.PLAY)
newProcessInfo.getQueue().put(playCommand)
newProcessInfo.getQueueToPlayer().put(playCommand)
self.__playersProcess[guild.id] = newProcessInfo
def getRunningPlayerInfo(self, guild: Guild) -> ProcessInfo:
@@ -67,7 +79,7 @@ class ProcessManager(Singleton):
return self.__playersProcess[guild.id]
def __createProcessInfo(self, context: Context) -> ProcessInfo:
def __createProcessInfo(self, guild: Guild, context: Context) -> ProcessInfo:
guildID: int = context.guild.id
textID: int = context.channel.id
voiceID: int = context.author.voice.channel.id
@@ -75,14 +87,25 @@ class ProcessManager(Singleton):
playlist: Playlist = self.__manager.Playlist()
lock = Lock()
queue = Queue()
process = PlayerProcess(context.guild.name, playlist, lock, queue,
guildID, textID, voiceID, authorID)
processInfo = ProcessInfo(process, queue, playlist, lock)
queueToListen = Queue()
queueToSend = Queue()
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
queueToListen, guildID, textID, voiceID, authorID)
processInfo = ProcessInfo(process, queueToSend, queueToListen,
playlist, lock, context.channel)
task = asyncio.create_task(self.__listenToCommands(queueToListen, guild))
# Create a Thread to listen for the queue coming from the Player Process
# thread = Thread(target=self.__listenToCommands, args=(queueToListen, guild), daemon=True)
self.__playersListeners[guildID] = task
# thread.start()
# Create a Message Controller for this player
self.__playersMessages[guildID] = MessagesController(self.__bot)
return processInfo
def __recreateProcess(self, context: Context) -> ProcessInfo:
def __recreateProcess(self, guild: Guild, context: Context) -> ProcessInfo:
"""Create a new process info using previous playlist"""
guildID: int = context.guild.id
textID: int = context.channel.id
@@ -91,14 +114,78 @@ class ProcessManager(Singleton):
playlist: Playlist = self.__playersProcess[guildID].getPlaylist()
lock = Lock()
queue = Queue()
queueToListen = Queue()
queueToSend = Queue()
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
queueToListen, guildID, textID, voiceID, authorID)
processInfo = ProcessInfo(process, queueToSend, queueToListen, playlist, lock)
process = PlayerProcess(context.guild.name, playlist, lock, queue,
guildID, textID, voiceID, authorID)
processInfo = ProcessInfo(process, queue, playlist, lock)
task = asyncio.create_task(self.__listenToCommands(queueToListen, guild))
# Create a Thread to listen for the queue coming from the Player Process
# thread = Thread(target=self.__listenToCommands, args=(queueToListen, guild), daemon=True)
self.__playersListeners[guildID] = task
# thread.start()
# Create a Message Controller for this player
self.__playersMessages[guildID] = MessagesController(self.__bot)
return processInfo
async def __listenToCommands(self, queue: Queue, guild: Guild) -> None:
shouldEnd = False
guildID = guild.id
while not shouldEnd:
shouldEnd = self.__playersListeners[guildID][1]
try:
print('Esperando')
command: VCommands = await queue.get()
commandType = command.getType()
args = command.getArgs()
print(f'Process {guild.name} sended command {commandType}')
if commandType == VCommandsType.NOW_PLAYING:
print('Aqui dentro')
await self.__showNowPlaying(args, guildID)
elif commandType == VCommandsType.TERMINATE:
# Delete the process elements and return, to finish task
self.__terminateProcess()
return
elif commandType == VCommandsType.SLEEPING:
# The process might be used again
self.__sleepingProcess()
return
else:
print(f'[ERROR] -> Unknown Command Received from Process: {commandType}')
except Empty:
continue
except Exception as e:
print(e)
print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}')
def __terminateProcess(self, guildID: int) -> None:
# Delete all structures associated with the Player
del self.__playersProcess[guildID]
del self.__playersMessages[guildID]
threadListening = self.__playersListeners[guildID]
threadListening._stop()
del self.__playersListeners[guildID]
def __sleepingProcess(self, guildID: int) -> None:
# Disable all process structures, except Playlist
queue1 = self.__playersProcess[guildID].getQueueToMain()
queue2 = self.__playersProcess[guildID].getQueueToPlayer()
queue1.close()
queue1.join_thread()
queue2.close()
queue2.join_thread()
async def __showNowPlaying(self, guildID: int, song: Song) -> None:
messagesController = self.__playersMessages[guildID]
processInfo = self.__playersProcess[guildID]
print('Aq1')
await messagesController.sendNowPlaying(processInfo, song)
print('Aq2')
class VManager(BaseManager):
pass