mirror of
https://github.com/RafaelSolVargas/Vulkan.git
synced 2025-10-29 16:57:23 +00:00
Continuing the refactoring
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from threading import Lock
|
||||
from typing import Union
|
||||
from discord.ext.commands import Context
|
||||
from discord import Guild, Interaction
|
||||
from Music.Playlist import Playlist
|
||||
from Music.Song import Song
|
||||
from Parallelism.ProcessInfo import PlayerInfo
|
||||
from Parallelism.Commands import VCommands
|
||||
|
||||
|
||||
class AbstractPlayersManager(ABC):
|
||||
@@ -11,19 +13,33 @@ class AbstractPlayersManager(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def setPlayerInfo(self, guild: Guild, info: PlayerInfo):
|
||||
def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None):
|
||||
"""If the forceCreation boolean is True, then the context must be provided for the Player to be created"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
|
||||
def getPlayerPlaylist(self, guild: Guild) -> Playlist:
|
||||
"""If there is a player process for the guild, then return the playlist of the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def resetProcess(self, guild: Guild, context: Context) -> None:
|
||||
def getPlayerLock(self, guild: Guild) -> Lock:
|
||||
"""If there is a player process for the guild, then return the lock of the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo:
|
||||
def verifyIfPlayerExists(self, guild: Guild) -> bool:
|
||||
"""Returns if a player for the guild exists"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None:
|
||||
"""With the context information of a guild create a internal player for the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def resetPlayer(self, guild: Guild, context: Context) -> None:
|
||||
"""Tries to reset the player of the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -32,7 +32,7 @@ class TimeoutClock:
|
||||
class PlayerProcess(Process):
|
||||
"""Process that will play songs, receive commands from the main process by a Queue"""
|
||||
|
||||
def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None:
|
||||
def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, voiceID: int) -> None:
|
||||
"""
|
||||
Start a new process that will have his own bot instance
|
||||
Due to pickle serialization, no objects are stored, the values initialization are being made in the run method
|
||||
@@ -46,10 +46,8 @@ class PlayerProcess(Process):
|
||||
self.__semStopPlaying: Semaphore = None
|
||||
self.__loop: AbstractEventLoop = None
|
||||
# Discord context ID
|
||||
self.__textChannelID = textID
|
||||
self.__guildID = guildID
|
||||
self.__voiceChannelID = voiceID
|
||||
self.__authorID = authorID
|
||||
# All information of discord context will be retrieved directly with discord API
|
||||
self.__guild: Guild = None
|
||||
self.__bot: VulkanBot = None
|
||||
@@ -81,9 +79,6 @@ class PlayerProcess(Process):
|
||||
self.__bot = await self.__createBotInstance()
|
||||
self.__guild = self.__bot.get_guild(self.__guildID)
|
||||
self.__voiceChannel = self.__bot.get_channel(self.__voiceChannelID)
|
||||
self.__textChannel = self.__bot.get_channel(self.__textChannelID)
|
||||
self.__author = self.__bot.get_channel(self.__authorID)
|
||||
self.__botMember = self.__getBotMember()
|
||||
# Connect to voice Channel
|
||||
await self.__connectToVoiceChannel()
|
||||
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
import asyncio
|
||||
from time import sleep, time
|
||||
from time import time
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from Music.VulkanInitializer import VulkanInitializer
|
||||
from discord import Member, VoiceClient
|
||||
from asyncio import AbstractEventLoop, Semaphore
|
||||
from discord import VoiceClient
|
||||
from asyncio import AbstractEventLoop
|
||||
from threading import RLock, Thread
|
||||
from multiprocessing import Lock
|
||||
from typing import Callable, List
|
||||
from typing import Callable
|
||||
from discord import Guild, FFmpegPCMAudio, VoiceChannel
|
||||
from Music.Playlist import Playlist
|
||||
from Music.Song import Song
|
||||
@@ -32,7 +31,7 @@ class TimeoutClock:
|
||||
class PlayerThread(Thread):
|
||||
"""Player Thread to control the song playback in the same Process of the Main Process"""
|
||||
|
||||
def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, textID: int, voiceID: int, authorID: int) -> None:
|
||||
def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int) -> None:
|
||||
Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={})
|
||||
# Synchronization objects
|
||||
self.__playlist: Playlist = playlist
|
||||
|
||||
@@ -3,8 +3,8 @@ from discord import Button, TextChannel
|
||||
from discord.ui import View
|
||||
from Config.Emojis import VEmojis
|
||||
from Messages.MessagesCategory import MessagesCategory
|
||||
from Music.Playlist import Playlist
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.ProcessInfo import PlayerInfo
|
||||
from Config.Messages import Messages
|
||||
from Music.Song import Song
|
||||
from Config.Embeds import VEmbeds
|
||||
@@ -29,9 +29,9 @@ class ProcessCommandsExecutor:
|
||||
self.__embeds = VEmbeds()
|
||||
self.__emojis = VEmojis()
|
||||
|
||||
async def sendNowPlaying(self, processInfo: PlayerInfo, song: Song) -> None:
|
||||
async def sendNowPlaying(self, playlist: Playlist, channel: TextChannel, song: Song) -> None:
|
||||
print('B')
|
||||
# Get the lock of the playlist
|
||||
playlist = processInfo.getPlaylist()
|
||||
if playlist.isLoopingOne():
|
||||
title = self.__messages.ONE_SONG_LOOPING
|
||||
else:
|
||||
@@ -39,7 +39,6 @@ class ProcessCommandsExecutor:
|
||||
|
||||
# Create View and Embed
|
||||
embed = self.__embeds.SONG_INFO(song.info, title)
|
||||
channel = processInfo.getTextChannel()
|
||||
view = self.__getPlayerView(channel)
|
||||
# Send Message and add to the MessagesManager
|
||||
message = await channel.send(embed=embed, view=view)
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
from enum import Enum
|
||||
from multiprocessing import Process, Queue, Lock
|
||||
from discord import TextChannel
|
||||
from Music.Playlist import Playlist
|
||||
|
||||
|
||||
class ProcessStatus(Enum):
|
||||
RUNNING = 'Running'
|
||||
SLEEPING = 'Sleeping'
|
||||
|
||||
|
||||
class PlayerInfo:
|
||||
"""
|
||||
Class to store the reference to all structures to maintain a song player
|
||||
"""
|
||||
|
||||
def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None:
|
||||
self.__process = process
|
||||
self.__queueToPlayer = queueToPlayer
|
||||
self.__queueToMain = queueToMain
|
||||
self.__playlist = playlist
|
||||
self.__lock = lock
|
||||
self.__textChannel = textChannel
|
||||
self.__status = ProcessStatus.RUNNING
|
||||
|
||||
def setProcess(self, newProcess: Process) -> None:
|
||||
self.__process = newProcess
|
||||
|
||||
def getStatus(self) -> ProcessStatus:
|
||||
return self.__status
|
||||
|
||||
def setStatus(self, status: ProcessStatus) -> None:
|
||||
self.__status = status
|
||||
|
||||
def getProcess(self) -> Process:
|
||||
return self.__process
|
||||
|
||||
def getQueueToPlayer(self) -> Queue:
|
||||
return self.__queueToPlayer
|
||||
|
||||
def getQueueToMain(self) -> Queue:
|
||||
return self.__queueToMain
|
||||
|
||||
def getPlaylist(self) -> Playlist:
|
||||
return self.__playlist
|
||||
|
||||
def getLock(self) -> Lock:
|
||||
return self.__lock
|
||||
|
||||
def getTextChannel(self) -> TextChannel:
|
||||
return self.__textChannel
|
||||
@@ -1,23 +1,70 @@
|
||||
import asyncio
|
||||
from multiprocessing import Lock, Queue
|
||||
from enum import Enum
|
||||
from multiprocessing import Lock, Process, Queue
|
||||
from multiprocessing.managers import BaseManager, NamespaceProxy
|
||||
from queue import Empty
|
||||
from threading import Thread
|
||||
from typing import Dict, Tuple, Union
|
||||
from Config.Singleton import Singleton
|
||||
from discord import Guild, Interaction
|
||||
from discord import Guild, Interaction, TextChannel
|
||||
from discord.ext.commands import Context
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.ProcessExecutor import ProcessCommandsExecutor
|
||||
from Music.Song import Song
|
||||
from Parallelism.PlayerProcess import PlayerProcess
|
||||
from Music.Playlist import Playlist
|
||||
from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
|
||||
|
||||
class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
class ProcessStatus(Enum):
|
||||
RUNNING = 'Running'
|
||||
SLEEPING = 'Sleeping'
|
||||
|
||||
|
||||
class PlayerProcessInfo:
|
||||
"""
|
||||
Class to store the reference to all structures to maintain a process player
|
||||
"""
|
||||
|
||||
def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None:
|
||||
self.__process = process
|
||||
self.__queueToPlayer = queueToPlayer
|
||||
self.__queueToMain = queueToMain
|
||||
self.__playlist = playlist
|
||||
self.__lock = lock
|
||||
self.__textChannel = textChannel
|
||||
self.__status = ProcessStatus.RUNNING
|
||||
|
||||
def setProcess(self, newProcess: Process) -> None:
|
||||
self.__process = newProcess
|
||||
|
||||
def getStatus(self) -> ProcessStatus:
|
||||
return self.__status
|
||||
|
||||
def setStatus(self, status: ProcessStatus) -> None:
|
||||
self.__status = status
|
||||
|
||||
def getProcess(self) -> Process:
|
||||
return self.__process
|
||||
|
||||
def getQueueToPlayer(self) -> Queue:
|
||||
return self.__queueToPlayer
|
||||
|
||||
def getQueueToMain(self) -> Queue:
|
||||
return self.__queueToMain
|
||||
|
||||
def getPlaylist(self) -> Playlist:
|
||||
return self.__playlist
|
||||
|
||||
def getLock(self) -> Lock:
|
||||
return self.__lock
|
||||
|
||||
def getTextChannel(self) -> TextChannel:
|
||||
return self.__textChannel
|
||||
|
||||
|
||||
class ProcessPlayerManager(Singleton, AbstractPlayersManager):
|
||||
"""
|
||||
Manage all running player process, creating and storing them for future calls
|
||||
Deals with the creation of shared memory
|
||||
@@ -29,28 +76,51 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
VManager.register('Playlist', Playlist)
|
||||
self.__manager = VManager()
|
||||
self.__manager.start()
|
||||
self.__playersProcess: Dict[int, PlayerInfo] = {}
|
||||
self.__playersProcess: Dict[int, PlayerProcessInfo] = {}
|
||||
self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {}
|
||||
self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {}
|
||||
|
||||
def setPlayerInfo(self, guild: Guild, info: PlayerInfo):
|
||||
self.__playersProcess[guild.id] = info
|
||||
def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None):
|
||||
if forceCreation:
|
||||
processInfo = self.createPlayerForGuild(guild, context)
|
||||
else:
|
||||
processInfo = self.__getRunningPlayerInfo(guild)
|
||||
|
||||
def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
|
||||
"""Return the process info for the guild, the user in context must be connected to a voice_channel"""
|
||||
if processInfo == None:
|
||||
return
|
||||
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.__putCommandInQueue(queue, command)
|
||||
|
||||
def getPlayerPlaylist(self, guild: Guild) -> Playlist:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getPlaylist()
|
||||
|
||||
def getPlayerLock(self, guild: Guild) -> Lock:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getLock()
|
||||
|
||||
def verifyIfPlayerExists(self, guild: Guild) -> bool:
|
||||
return guild.id in self.__playersProcess.keys()
|
||||
|
||||
def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None:
|
||||
try:
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context)
|
||||
self.__playersProcess[guild.id] = self.__createProcessPlayerInfo(guild, context)
|
||||
else:
|
||||
# If the process has ended create a new one
|
||||
if not self.__playersProcess[guild.id].getProcess().is_alive():
|
||||
self.__playersProcess[guild.id] = self.__recreateProcess(guild, context)
|
||||
|
||||
# Start the process
|
||||
self.__playersProcess[guild.id].getProcess().start()
|
||||
return self.__playersProcess[guild.id]
|
||||
except Exception as e:
|
||||
print(f'[Error In GetPlayerContext] -> {e}')
|
||||
|
||||
def resetProcess(self, guild: Guild, context: Context) -> None:
|
||||
def resetPlayer(self, guild: Guild, context: Context) -> None:
|
||||
"""Restart a running process, already start it to return to play"""
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
return None
|
||||
@@ -60,10 +130,10 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
newProcessInfo.getProcess().start() # Start the process
|
||||
# Send a command to start the play again
|
||||
playCommand = VCommands(VCommandsType.PLAY)
|
||||
newProcessInfo.getQueueToPlayer().put(playCommand)
|
||||
self.__putCommandInQueue(newProcessInfo.getQueueToPlayer(), playCommand)
|
||||
self.__playersProcess[guild.id] = newProcessInfo
|
||||
|
||||
def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo:
|
||||
def __getRunningPlayerInfo(self, guild: Guild) -> PlayerProcessInfo:
|
||||
"""Return the process info for the guild, if not, return None"""
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
print('Process Info not found')
|
||||
@@ -71,7 +141,7 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
|
||||
return self.__playersProcess[guild.id]
|
||||
|
||||
def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo:
|
||||
def __createProcessPlayerInfo(self, guild: Guild, context: Context) -> PlayerProcessInfo:
|
||||
guildID: int = context.guild.id
|
||||
textID: int = context.channel.id
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
@@ -82,9 +152,9 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
queueToListen = Queue()
|
||||
queueToSend = Queue()
|
||||
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, textID, voiceID, authorID)
|
||||
processInfo = PlayerInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
queueToListen, guildID, voiceID)
|
||||
processInfo = PlayerProcessInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
|
||||
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
|
||||
thread = Thread(target=self.__listenToCommands,
|
||||
@@ -111,7 +181,7 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
except Exception as e:
|
||||
print(f'[ERROR STOPPING PROCESS] -> {e}')
|
||||
|
||||
def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
|
||||
def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerProcessInfo:
|
||||
"""Create a new process info using previous playlist"""
|
||||
self.__stopPossiblyRunningProcess(guild)
|
||||
|
||||
@@ -130,8 +200,8 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
queueToSend = Queue()
|
||||
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, textID, voiceID, authorID)
|
||||
processInfo = PlayerInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
processInfo = PlayerProcessInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
|
||||
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
|
||||
thread = Thread(target=self.__listenToCommands,
|
||||
@@ -191,10 +261,18 @@ class ProcessManager(Singleton, AbstractPlayersManager):
|
||||
# Set the status of this process as sleeping, only the playlist object remains
|
||||
self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING)
|
||||
|
||||
def __putCommandInQueue(self, queue: Queue, command: VCommands) -> None:
|
||||
try:
|
||||
queue.put(command)
|
||||
except Exception as e:
|
||||
print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}')
|
||||
|
||||
async def showNowPlaying(self, guildID: int, song: Song) -> None:
|
||||
commandExecutor = self.__playersCommandsExecutor[guildID]
|
||||
processInfo = self.__playersProcess[guildID]
|
||||
print('A')
|
||||
await commandExecutor.sendNowPlaying(processInfo, song)
|
||||
print('C')
|
||||
|
||||
|
||||
class VManager(BaseManager):
|
||||
|
||||
@@ -1,199 +0,0 @@
|
||||
import asyncio
|
||||
from multiprocessing import Lock, Queue
|
||||
from multiprocessing.managers import BaseManager, NamespaceProxy
|
||||
from queue import Empty
|
||||
from threading import Thread
|
||||
from typing import Dict, Tuple, Union
|
||||
from Config.Singleton import Singleton
|
||||
from discord import Guild, Interaction
|
||||
from discord.ext.commands import Context
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.ProcessExecutor import ProcessCommandsExecutor
|
||||
from Music.Song import Song
|
||||
from Parallelism.PlayerProcess import PlayerProcess
|
||||
from Music.Playlist import Playlist
|
||||
from Parallelism.ProcessInfo import PlayerInfo, ProcessStatus
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
|
||||
|
||||
class ThreadManager(Singleton, AbstractPlayersManager):
|
||||
"""
|
||||
Manage all running player threads, creating and storing them for future calls
|
||||
"""
|
||||
|
||||
def __init__(self, bot: VulkanBot = None) -> None:
|
||||
if not super().created:
|
||||
self.__bot = bot
|
||||
self.__playersProcess: Dict[int, Thread] = {}
|
||||
|
||||
def setPlayerInfo(self, guild: Guild, info: PlayerInfo):
|
||||
self.__playersProcess[guild.id] = info
|
||||
|
||||
def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
|
||||
"""Return the process info for the guild, the user in context must be connected to a voice_channel"""
|
||||
try:
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context)
|
||||
else:
|
||||
# If the process has ended create a new one
|
||||
if not self.__playersProcess[guild.id].getProcess().is_alive():
|
||||
self.__playersProcess[guild.id] = self.__recreateProcess(guild, context)
|
||||
|
||||
return self.__playersProcess[guild.id]
|
||||
except Exception as e:
|
||||
print(f'[Error In GetPlayerContext] -> {e}')
|
||||
|
||||
def resetProcess(self, guild: Guild, context: Context) -> None:
|
||||
"""Restart a running process, already start it to return to play"""
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
return None
|
||||
|
||||
# Recreate the process keeping the playlist
|
||||
newProcessInfo = self.__recreateProcess(guild, context)
|
||||
newProcessInfo.getProcess().start() # Start the process
|
||||
# Send a command to start the play again
|
||||
playCommand = VCommands(VCommandsType.PLAY)
|
||||
newProcessInfo.getQueueToPlayer().put(playCommand)
|
||||
self.__playersProcess[guild.id] = newProcessInfo
|
||||
|
||||
def getRunningPlayerInfo(self, guild: Guild) -> PlayerInfo:
|
||||
"""Return the process info for the guild, if not, return None"""
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
print('Process Info not found')
|
||||
return None
|
||||
|
||||
return self.__playersProcess[guild.id]
|
||||
|
||||
def __createProcessInfo(self, guild: Guild, context: Context) -> PlayerInfo:
|
||||
guildID: int = context.guild.id
|
||||
textID: int = context.channel.id
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
authorID: int = context.author.id
|
||||
|
||||
playlist: Playlist = self.__manager.Playlist()
|
||||
lock = Lock()
|
||||
queueToListen = Queue()
|
||||
queueToSend = Queue()
|
||||
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, textID, voiceID, authorID)
|
||||
processInfo = PlayerInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
|
||||
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
|
||||
thread = Thread(target=self.__listenToCommands,
|
||||
args=(queueToListen, guild), daemon=True)
|
||||
self.__playersListeners[guildID] = (thread, False)
|
||||
thread.start()
|
||||
|
||||
# Create a Message Controller for this player
|
||||
self.__playersCommandsExecutor[guildID] = ProcessCommandsExecutor(self.__bot, guildID)
|
||||
|
||||
return processInfo
|
||||
|
||||
def __stopPossiblyRunningProcess(self, guild: Guild):
|
||||
try:
|
||||
if guild.id in self.__playersProcess.keys():
|
||||
playerProcess = self.__playersProcess[guild.id]
|
||||
process = playerProcess.getProcess()
|
||||
process.close()
|
||||
process.kill()
|
||||
playerProcess.getQueueToMain().close()
|
||||
playerProcess.getQueueToMain().join_thread()
|
||||
playerProcess.getQueueToPlayer().close()
|
||||
playerProcess.getQueueToPlayer().join_thread()
|
||||
except Exception as e:
|
||||
print(f'[ERROR STOPPING PROCESS] -> {e}')
|
||||
|
||||
def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerInfo:
|
||||
"""Create a new process info using previous playlist"""
|
||||
self.__stopPossiblyRunningProcess(guild)
|
||||
|
||||
guildID: int = context.guild.id
|
||||
textID: int = context.channel.id
|
||||
if isinstance(context, Interaction):
|
||||
authorID: int = context.user.id
|
||||
voiceID: int = context.user.voice.channel.id
|
||||
else:
|
||||
authorID: int = context.author.id
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
|
||||
playlist: Playlist = self.__playersProcess[guildID].getPlaylist()
|
||||
lock = Lock()
|
||||
queueToListen = Queue()
|
||||
queueToSend = Queue()
|
||||
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, textID, voiceID, authorID)
|
||||
processInfo = PlayerInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
|
||||
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
|
||||
thread = Thread(target=self.__listenToCommands,
|
||||
args=(queueToListen, guild), daemon=True)
|
||||
self.__playersListeners[guildID] = (thread, False)
|
||||
thread.start()
|
||||
|
||||
return processInfo
|
||||
|
||||
def __listenToCommands(self, queue: Queue, guild: Guild) -> None:
|
||||
guildID = guild.id
|
||||
while True:
|
||||
shouldEnd = self.__playersListeners[guildID][1]
|
||||
if shouldEnd:
|
||||
break
|
||||
|
||||
try:
|
||||
command: VCommands = queue.get(timeout=5)
|
||||
commandType = command.getType()
|
||||
args = command.getArgs()
|
||||
|
||||
print(f'Process {guild.name} sended command {commandType}')
|
||||
if commandType == VCommandsType.NOW_PLAYING:
|
||||
asyncio.run_coroutine_threadsafe(self.showNowPlaying(
|
||||
guild.id, args), self.__bot.loop)
|
||||
elif commandType == VCommandsType.TERMINATE:
|
||||
# Delete the process elements and return, to finish task
|
||||
self.__terminateProcess(guildID)
|
||||
return
|
||||
elif commandType == VCommandsType.SLEEPING:
|
||||
# The process might be used again
|
||||
self.__sleepingProcess(guildID)
|
||||
return
|
||||
else:
|
||||
print(f'[ERROR] -> Unknown Command Received from Process: {commandType}')
|
||||
except Empty:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f'[ERROR IN LISTENING PROCESS] -> {guild.name} - {e}')
|
||||
|
||||
def __terminateProcess(self, guildID: int) -> None:
|
||||
# Delete all structures associated with the Player
|
||||
del self.__playersProcess[guildID]
|
||||
del self.__playersCommandsExecutor[guildID]
|
||||
threadListening = self.__playersListeners[guildID]
|
||||
threadListening._stop()
|
||||
del self.__playersListeners[guildID]
|
||||
|
||||
def __sleepingProcess(self, guildID: int) -> None:
|
||||
# Disable all process structures, except Playlist
|
||||
queue1 = self.__playersProcess[guildID].getQueueToMain()
|
||||
queue2 = self.__playersProcess[guildID].getQueueToPlayer()
|
||||
queue1.close()
|
||||
queue1.join_thread()
|
||||
queue2.close()
|
||||
queue2.join_thread()
|
||||
# Set the status of this process as sleeping, only the playlist object remains
|
||||
self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING)
|
||||
|
||||
async def showNowPlaying(self, guildID: int, song: Song) -> None:
|
||||
commandExecutor = self.__playersCommandsExecutor[guildID]
|
||||
processInfo = self.__playersProcess[guildID]
|
||||
await commandExecutor.sendNowPlaying(processInfo, song)
|
||||
|
||||
|
||||
class VManager(BaseManager):
|
||||
pass
|
||||
|
||||
|
||||
class VProxy(NamespaceProxy):
|
||||
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
|
||||
131
Parallelism/ThreadPlayerManager.py
Normal file
131
Parallelism/ThreadPlayerManager.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from multiprocessing import Lock
|
||||
from typing import Dict, Union
|
||||
from Config.Singleton import Singleton
|
||||
from discord import Guild, Interaction, TextChannel
|
||||
from discord.ext.commands import Context
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Music.Song import Song
|
||||
from Music.Playlist import Playlist
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.PlayerThread import PlayerThread
|
||||
|
||||
|
||||
class PlayerThreadInfo:
|
||||
"""
|
||||
Class to store the reference to all structures to maintain a player thread
|
||||
"""
|
||||
|
||||
def __init__(self, thread: PlayerThread, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None:
|
||||
self.__thread = thread
|
||||
self.__playlist = playlist
|
||||
self.__lock = lock
|
||||
self.__textChannel = textChannel
|
||||
|
||||
def getThread(self) -> PlayerThread:
|
||||
return self.__thread
|
||||
|
||||
def getPlaylist(self) -> Playlist:
|
||||
return self.__playlist
|
||||
|
||||
def getLock(self) -> Lock:
|
||||
return self.__lock
|
||||
|
||||
def getTextChannel(self) -> TextChannel:
|
||||
return self.__textChannel
|
||||
|
||||
|
||||
class ThreadPlayerManager(Singleton, AbstractPlayersManager):
|
||||
"""
|
||||
Manage all running player threads, creating and storing them for future calls
|
||||
"""
|
||||
|
||||
def __init__(self, bot: VulkanBot = None) -> None:
|
||||
if not super().created:
|
||||
self.__bot = bot
|
||||
self.__playersThreads: Dict[int, PlayerThreadInfo] = {}
|
||||
|
||||
def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None):
|
||||
return super().sendCommandToPlayer(command, guild, forceCreation, context)
|
||||
|
||||
def getPlayerPlaylist(self, guild: Guild) -> Playlist:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getPlaylist()
|
||||
|
||||
def getPlayerLock(self, guild: Guild) -> Lock:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getLock()
|
||||
|
||||
def verifyIfPlayerExists(self, guild: Guild) -> bool:
|
||||
return guild.id in self.__playersThreads.keys()
|
||||
|
||||
def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]):
|
||||
try:
|
||||
if guild.id not in self.__playersThreads.keys():
|
||||
self.__playersThreads[guild.id] = self.__createPlayerThreadInfo(context)
|
||||
else:
|
||||
# If the thread has ended create a new one
|
||||
if not self.__playersThreads[guild.id].getThread().is_alive():
|
||||
self.__playersThreads[guild.id] = self.__recreateThread(guild, context)
|
||||
|
||||
return self.__playersThreads[guild.id]
|
||||
except Exception as e:
|
||||
print(f'[Error In GetPlayerContext] -> {e}')
|
||||
|
||||
def resetPlayer(self, guild: Guild, context: Context) -> None:
|
||||
if guild.id not in self.__playersThreads.keys():
|
||||
return None
|
||||
|
||||
# Recreate the thread keeping the playlist
|
||||
newPlayerInfo = self.__recreateThread(guild, context)
|
||||
newPlayerInfo.getThread().start()
|
||||
# Send a command to start the play again
|
||||
playCommand = VCommands(VCommandsType.PLAY)
|
||||
newPlayerInfo.getQueueToPlayer().put(playCommand)
|
||||
self.__playersThreads[guild.id] = newPlayerInfo
|
||||
|
||||
def __getRunningPlayerInfo(self, guild: Guild) -> PlayerThreadInfo:
|
||||
if guild.id not in self.__playersThreads.keys():
|
||||
print('Process Info not found')
|
||||
return None
|
||||
|
||||
return self.__playersThreads[guild.id]
|
||||
|
||||
def __createPlayerThreadInfo(self, context: Union[Context, Interaction]) -> PlayerThreadInfo:
|
||||
guildID: int = context.guild.id
|
||||
if isinstance(context, Interaction):
|
||||
voiceID: int = context.user.voice.channel.id
|
||||
else:
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
|
||||
playlist = Playlist()
|
||||
lock = Lock()
|
||||
player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID)
|
||||
playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel)
|
||||
player.start()
|
||||
|
||||
return playerInfo
|
||||
|
||||
def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerThreadInfo:
|
||||
self.__stopPossiblyRunningProcess(guild)
|
||||
|
||||
guildID: int = context.guild.id
|
||||
if isinstance(context, Interaction):
|
||||
voiceID: int = context.user.voice.channel.id
|
||||
else:
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
|
||||
playlist = self.__playersThreads[guildID].getPlaylist()
|
||||
lock = Lock()
|
||||
player = PlayerThread(context.guild.name, playlist, lock, guildID, voiceID)
|
||||
playerInfo = PlayerThreadInfo(player, playlist, lock, context.channel)
|
||||
player.start()
|
||||
|
||||
return playerInfo
|
||||
|
||||
async def showNowPlaying(self, guildID: int, song: Song) -> None:
|
||||
commandExecutor = self.__playersCommandsExecutor[guildID]
|
||||
processInfo = self.__playersThreads[guildID]
|
||||
await commandExecutor.sendNowPlaying(processInfo, song)
|
||||
Reference in New Issue
Block a user