mirror of
https://github.com/RafaelSolVargas/Vulkan.git
synced 2025-10-29 16:57:23 +00:00
Merge pull request #37 from RafaelSolVargas/creatingThreadsManager
Adding ThreadPlayer
This commit is contained in:
commit
cf8f13d1d9
@ -1,4 +1,3 @@
|
||||
import os
|
||||
from decouple import config
|
||||
from Config.Singleton import Singleton
|
||||
from Config.Folder import Folder
|
||||
@ -10,6 +9,13 @@ class VConfigs(Singleton):
|
||||
# You can change this boolean to False if you want to prevent the Bot from auto disconnecting
|
||||
# Resolution for the issue: https://github.com/RafaelSolVargas/Vulkan/issues/33
|
||||
self.SHOULD_AUTO_DISCONNECT_WHEN_ALONE = False
|
||||
# Recommended to be True, except in cases when your Bot is present in thousands servers, in that case
|
||||
# the delay to start a new Python process for the playback is too much, and to avoid that you set as False
|
||||
# This feature is for now in testing period, for a more stable version, keep this boolean = True
|
||||
self.SONG_PLAYBACK_IN_SEPARATE_PROCESS = True
|
||||
# Maximum of songs that will be downloaded at once, the higher this number is, the faster the songs will be all available
|
||||
# but the slower will be the others commands of the Bot during the downloading time, for example, the playback quality
|
||||
self.MAX_DOWNLOAD_SONGS_AT_A_TIME = 5
|
||||
|
||||
self.BOT_PREFIX = '!'
|
||||
try:
|
||||
@ -44,8 +50,8 @@ class VConfigs(Singleton):
|
||||
self.MY_ERROR_BAD_COMMAND = 'This string serves to verify if some error was raised by myself on purpose'
|
||||
self.INVITE_URL = 'https://discordapp.com/oauth2/authorize?client_id={}&scope=bot'
|
||||
|
||||
def getProcessManager(self):
|
||||
def getPlayersManager(self):
|
||||
return self.__manager
|
||||
|
||||
def setProcessManager(self, newManager):
|
||||
def setPlayersManager(self, newManager):
|
||||
self.__manager = newManager
|
||||
|
||||
@ -23,7 +23,8 @@ from Messages.Responses.EmbedCogResponse import EmbedCommandResponse
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Config.Configs import VConfigs
|
||||
from Config.Embeds import VEmbeds
|
||||
from Parallelism.ProcessManager import ProcessManager
|
||||
from Parallelism.ProcessPlayerManager import ProcessPlayerManager
|
||||
from Parallelism.ThreadPlayerManager import ThreadPlayerManager
|
||||
|
||||
helper = Helper()
|
||||
|
||||
@ -38,7 +39,11 @@ class MusicCog(Cog):
|
||||
def __init__(self, bot: VulkanBot) -> None:
|
||||
self.__bot: VulkanBot = bot
|
||||
self.__embeds = VEmbeds()
|
||||
VConfigs().setProcessManager(ProcessManager(bot))
|
||||
configs = VConfigs()
|
||||
if configs.SONG_PLAYBACK_IN_SEPARATE_PROCESS:
|
||||
configs.setPlayersManager(ProcessPlayerManager(bot))
|
||||
else:
|
||||
configs.setPlayersManager(ThreadPlayerManager(bot))
|
||||
|
||||
@command(name="play", help=helper.HELP_PLAY, description=helper.HELP_PLAY_LONG, aliases=['p', 'tocar'])
|
||||
async def play(self, ctx: Context, *args) -> None:
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from Parallelism.Commands import VCommands
|
||||
from multiprocessing import Queue
|
||||
from typing import List, Union
|
||||
from discord.ext.commands import Context
|
||||
from discord import Client, Guild, ClientUser, Interaction, Member, User
|
||||
@ -29,12 +27,6 @@ class AbstractHandler(ABC):
|
||||
else:
|
||||
self.__author = ctx.user
|
||||
|
||||
def putCommandInQueue(self, queue: Queue, command: VCommands) -> None:
|
||||
try:
|
||||
queue.put(command)
|
||||
except Exception as e:
|
||||
print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}')
|
||||
|
||||
@abstractmethod
|
||||
async def run(self) -> HandlerResponse:
|
||||
pass
|
||||
|
||||
@ -4,7 +4,7 @@ from discord.ext.commands import Context
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Parallelism.ProcessInfo import ProcessInfo
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
|
||||
|
||||
class ClearHandler(AbstractHandler):
|
||||
@ -13,19 +13,18 @@ class ClearHandler(AbstractHandler):
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
# Get the current process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
# Clear the playlist
|
||||
playlist = processInfo.getPlaylist()
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist.clear()
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
embed = self.embeds.PLAYLIST_CLEAR()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
@ -2,6 +2,7 @@ from discord.ext.commands import Context
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Utils.Utils import Utils
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
@ -13,18 +14,16 @@ class HistoryHandler(AbstractHandler):
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
# Get the current process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist = processInfo.getPlaylist()
|
||||
history = playlist.getSongsHistory()
|
||||
processLock.release()
|
||||
history = playersManager.getPlayerPlaylist(self.guild).getSongsHistory()
|
||||
playerLock.release()
|
||||
else:
|
||||
# If the player doesn't respond in time we restart it
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
else:
|
||||
|
||||
@ -6,6 +6,7 @@ from discord import Interaction
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Music.Playlist import Playlist
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
|
||||
|
||||
@ -16,32 +17,31 @@ class JumpMusicHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self, musicPos: str) -> HandlerResponse:
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if not processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
error = BadCommandUsage()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
# Try to convert input to int
|
||||
error = self.__validateInput(musicPos)
|
||||
if error:
|
||||
embed = self.embeds.ERROR_EMBED(error.message)
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
# Sanitize the input
|
||||
playlist: Playlist = processInfo.getPlaylist()
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
musicPos = self.__sanitizeInput(playlist, musicPos)
|
||||
|
||||
# Validate the position
|
||||
if not playlist.validate_position(musicPos):
|
||||
error = InvalidInput()
|
||||
embed = self.embeds.PLAYLIST_RANGE_ERROR()
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
try:
|
||||
# Move the selected song
|
||||
@ -49,19 +49,17 @@ class JumpMusicHandler(AbstractHandler):
|
||||
|
||||
# Send a command to the player to skip the music
|
||||
command = VCommands(VCommandsType.SKIP, None)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, command)
|
||||
await playersManager.sendCommandToPlayer(command, self.guild)
|
||||
|
||||
processLock.release()
|
||||
return HandlerResponse(self.ctx)
|
||||
except:
|
||||
# Release the acquired Lock
|
||||
processLock.release()
|
||||
embed = self.embeds.ERROR_MOVING()
|
||||
error = UnknownError()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
finally:
|
||||
playerLock.release()
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
|
||||
@ -5,6 +5,7 @@ from Handlers.HandlerResponse import HandlerResponse
|
||||
from Config.Exceptions import BadCommandUsage
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
|
||||
|
||||
class LoopHandler(AbstractHandler):
|
||||
@ -12,23 +13,20 @@ class LoopHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self, args: str) -> HandlerResponse:
|
||||
# Get the current process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if not processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
error = BadCommandUsage()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
playlist = processInfo.getPlaylist()
|
||||
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
if args == '' or args is None:
|
||||
playlist.loop_all()
|
||||
embed = self.embeds.LOOP_ALL_ACTIVATED()
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
args = args.lower()
|
||||
@ -51,9 +49,9 @@ class LoopHandler(AbstractHandler):
|
||||
error = BadCommandUsage()
|
||||
embed = self.embeds.BAD_LOOP_USE()
|
||||
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
@ -7,6 +7,7 @@ from Config.Exceptions import BadCommandUsage, VulkanError, InvalidInput, Number
|
||||
from Music.Playlist import Playlist
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
|
||||
|
||||
class MoveHandler(AbstractHandler):
|
||||
@ -14,45 +15,44 @@ class MoveHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self, pos1: str, pos2: str) -> HandlerResponse:
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if not processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
error = BadCommandUsage()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
error = self.__validateInput(pos1, pos2)
|
||||
if error:
|
||||
embed = self.embeds.ERROR_EMBED(error.message)
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
playlist = processInfo.getPlaylist()
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
pos1, pos2 = self.__sanitizeInput(playlist, pos1, pos2)
|
||||
|
||||
if not playlist.validate_position(pos1) or not playlist.validate_position(pos2):
|
||||
error = InvalidInput()
|
||||
embed = self.embeds.PLAYLIST_RANGE_ERROR()
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
try:
|
||||
song = playlist.move_songs(pos1, pos2)
|
||||
|
||||
song_name = song.title if song.title else song.identifier
|
||||
embed = self.embeds.SONG_MOVED(song_name, pos1, pos2)
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
except:
|
||||
# Release the acquired Lock
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
embed = self.embeds.ERROR_MOVING()
|
||||
error = UnknownError()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Utils.Cleaner import Cleaner
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
@ -13,14 +14,12 @@ class NowPlayingHandler(AbstractHandler):
|
||||
self.__cleaner = Cleaner()
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
# Get the current process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if not processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
playlist = processInfo.getPlaylist()
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
if playlist.getCurrentSong() is None:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
@ -13,17 +13,10 @@ class PauseHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
if processInfo.getStatus() == ProcessStatus.SLEEPING:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
# Send Pause command to be execute by player process
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
command = VCommands(VCommandsType.PAUSE, None)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, command)
|
||||
await playersManager.sendCommandToPlayer(command, self.guild)
|
||||
|
||||
embed = self.embeds.PLAYER_PAUSED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import traceback
|
||||
from typing import List
|
||||
from typing import List, Union
|
||||
from Config.Exceptions import DownloadingError, InvalidInput, VulkanError
|
||||
from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
@ -9,12 +9,10 @@ from Handlers.HandlerResponse import HandlerResponse
|
||||
from Music.Downloader import Downloader
|
||||
from Music.Searcher import Searcher
|
||||
from Music.Song import Song
|
||||
from Parallelism.ProcessInfo import ProcessInfo
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
from Music.Playlist import Playlist
|
||||
|
||||
|
||||
class PlayHandler(AbstractHandler):
|
||||
@ -36,13 +34,12 @@ class PlayHandler(AbstractHandler):
|
||||
if musicsInfo is None or len(musicsInfo) == 0:
|
||||
raise InvalidInput(self.messages.INVALID_INPUT, self.messages.ERROR_TITLE)
|
||||
|
||||
# Get the process context for the current guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx)
|
||||
playlist: Playlist = processInfo.getPlaylist()
|
||||
process = processInfo.getProcess()
|
||||
if not process.is_alive(): # If process has not yet started, start
|
||||
process.start()
|
||||
# If there is no executing player for the guild then we create the player
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
playersManager.createPlayerForGuild(self.guild, self.ctx)
|
||||
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
|
||||
# Create the Songs objects
|
||||
songs: List[Song] = []
|
||||
@ -66,32 +63,24 @@ class PlayHandler(AbstractHandler):
|
||||
embed = self.embeds.SONG_ADDED_TWO(song.info, pos)
|
||||
response = HandlerResponse(self.ctx, embed)
|
||||
|
||||
# Add the unique song to the playlist and send a command to player process
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
# Add the unique song to the playlist and send a command to player
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist.add_song(song)
|
||||
# Release the acquired Lock
|
||||
processLock.release()
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
playerLock.release()
|
||||
playCommand = VCommands(VCommandsType.PLAY, None)
|
||||
self.putCommandInQueue(queue, playCommand)
|
||||
|
||||
await playersManager.sendCommandToPlayer(playCommand, self.guild)
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
return response
|
||||
else: # If multiple songs added
|
||||
# If more than 10 songs, download and load the first 5 to start the play right away
|
||||
if len(songs) > 10:
|
||||
fiveFirstSongs = songs[0:5]
|
||||
songs = songs[5:]
|
||||
await self.__downloadSongsAndStore(fiveFirstSongs, processInfo)
|
||||
|
||||
# Trigger a task to download all songs and then store them in the process playlist
|
||||
asyncio.create_task(self.__downloadSongsAndStore(songs, processInfo))
|
||||
# Trigger a task to download all songs and then store them in the playlist
|
||||
asyncio.create_task(self.__downloadSongsInLots(songs, playersManager))
|
||||
|
||||
embed = self.embeds.SONGS_ADDED(len(songs))
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
@ -100,8 +89,8 @@ class PlayHandler(AbstractHandler):
|
||||
embed = self.embeds.DOWNLOADING_ERROR()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
except Exception as error:
|
||||
print(f'ERROR IN PLAYHANDLER -> {traceback.format_exc()}', {type(error)})
|
||||
if isinstance(error, VulkanError): # If error was already processed
|
||||
print(f'[ERROR IN PLAYHANDLER] -> {traceback.format_exc()}', {type(error)})
|
||||
if isinstance(error, VulkanError):
|
||||
embed = self.embeds.CUSTOM_ERROR(error)
|
||||
else:
|
||||
error = UnknownError()
|
||||
@ -109,37 +98,40 @@ class PlayHandler(AbstractHandler):
|
||||
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
async def __downloadSongsAndStore(self, songs: List[Song], processInfo: ProcessInfo) -> None:
|
||||
playlist = processInfo.getPlaylist()
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
async def __downloadSongsInLots(self, songs: List[Song], playersManager: AbstractPlayersManager) -> None:
|
||||
"""
|
||||
To avoid having a lot of tasks delaying the song playback we will lock the maximum songs downloading at a time
|
||||
"""
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
playCommand = VCommands(VCommandsType.PLAY, None)
|
||||
tooManySongs = len(songs) > 100
|
||||
maxDownloads = self.config.MAX_DOWNLOAD_SONGS_AT_A_TIME
|
||||
|
||||
# Trigger a task for each song to be downloaded
|
||||
tasks: List[asyncio.Task] = []
|
||||
for index, song in enumerate(songs):
|
||||
# If there is a lot of songs being downloaded, force a sleep to try resolve the Http Error 429 "To Many Requests"
|
||||
# Trying to fix the issue https://github.com/RafaelSolVargas/Vulkan/issues/32
|
||||
if tooManySongs and index % 3 == 0:
|
||||
await asyncio.sleep(0.5)
|
||||
task = asyncio.create_task(self.__down.download_song(song))
|
||||
tasks.append(task)
|
||||
while len(songs) > 0:
|
||||
# Verify how many songs will be downloaded in this lot and extract from the songs list
|
||||
songsQuant = min(maxDownloads, len(songs))
|
||||
# Get the first quantInLot songs
|
||||
songsInLot = songs[:songsQuant]
|
||||
# Remove the first quantInLot songs from the songs
|
||||
songs = songs[songsQuant:]
|
||||
|
||||
# In the original order, await for the task and then, if successfully downloaded, add to the playlist
|
||||
processManager = self.config.getProcessManager()
|
||||
for index, task in enumerate(tasks):
|
||||
await task
|
||||
song = songs[index]
|
||||
if not song.problematic: # If downloaded add to the playlist and send play command
|
||||
processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx)
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist.add_song(song)
|
||||
self.putCommandInQueue(queue, playCommand)
|
||||
processLock.release()
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
# Create task to download the songs in the lot
|
||||
tasks: List[asyncio.Task] = []
|
||||
for index, song in enumerate(songsInLot):
|
||||
task = asyncio.create_task(self.__down.download_song(song))
|
||||
tasks.append(task)
|
||||
|
||||
for index, task, in enumerate(tasks):
|
||||
await task
|
||||
song = songsInLot[index]
|
||||
if not song.problematic: # If downloaded add to the playlist and send play command
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist.add_song(song)
|
||||
await playersManager.sendCommandToPlayer(playCommand, self.guild)
|
||||
playerLock.release()
|
||||
else:
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
|
||||
def __isUserConnected(self) -> bool:
|
||||
if self.ctx.author.voice:
|
||||
|
||||
@ -2,6 +2,7 @@ from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Config.Exceptions import BadCommandUsage, ImpossibleMove
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from typing import Union
|
||||
@ -18,14 +19,13 @@ class PrevHandler(AbstractHandler):
|
||||
embed = self.embeds.NO_CHANNEL()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getOrCreatePlayerInfo(self.guild, self.ctx)
|
||||
if not processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
error = BadCommandUsage()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
playlist = processInfo.getPlaylist()
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
if len(playlist.getHistory()) == 0:
|
||||
error = ImpossibleMove()
|
||||
embed = self.embeds.NOT_PREVIOUS_SONG()
|
||||
@ -36,15 +36,9 @@ class PrevHandler(AbstractHandler):
|
||||
embed = self.embeds.FAIL_DUE_TO_LOOP_ON()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
# If not started, start the player process
|
||||
process = processInfo.getProcess()
|
||||
if not process.is_alive():
|
||||
process.start()
|
||||
|
||||
# Send a prev command, together with the user voice channel
|
||||
prevCommand = VCommands(VCommandsType.PREV, self.author.voice.channel.id)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, prevCommand)
|
||||
await playersManager.sendCommandToPlayer(prevCommand, self.guild)
|
||||
|
||||
embed = self.embeds.RETURNING_SONG()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
@ -4,6 +4,7 @@ from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Handlers.JumpMusicHandler import JumpMusicHandler
|
||||
from Messages.MessagesCategory import MessagesCategory
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from UI.Views.BasicView import BasicView
|
||||
from Utils.Utils import Utils
|
||||
from Music.VulkanBot import VulkanBot
|
||||
@ -21,29 +22,27 @@ class QueueHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self, pageNumber=0) -> HandlerResponse:
|
||||
# Retrieve the process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if not processInfo: # If no process return empty list
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.EMPTY_QUEUE()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
# Acquire the Lock to manipulate the playlist
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist: Playlist = processInfo.getPlaylist()
|
||||
playlist: Playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
|
||||
if playlist.isLoopingOne():
|
||||
song = playlist.getCurrentSong()
|
||||
embed = self.embeds.ONE_SONG_LOOPING(song.info)
|
||||
processLock.release() # Release the Lock
|
||||
playerLock.release() # Release the Lock
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
allSongs = playlist.getSongs()
|
||||
if len(allSongs) == 0:
|
||||
embed = self.embeds.EMPTY_QUEUE()
|
||||
processLock.release() # Release the Lock
|
||||
playerLock.release() # Release the Lock
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
songsPages = playlist.getSongsPages()
|
||||
@ -92,10 +91,10 @@ class QueueHandler(AbstractHandler):
|
||||
|
||||
embed = self.embeds.QUEUE(title, text)
|
||||
# Release the acquired Lock
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
return HandlerResponse(self.ctx, embed, view=queueView)
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
|
||||
@ -4,7 +4,7 @@ from Handlers.HandlerResponse import HandlerResponse
|
||||
from Config.Exceptions import BadCommandUsage, VulkanError, ErrorRemoving, InvalidInput, NumberRequired
|
||||
from Music.Playlist import Playlist
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.ProcessInfo import ProcessInfo
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
|
||||
@ -14,15 +14,13 @@ class RemoveHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self, position: str) -> HandlerResponse:
|
||||
# Get the current process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if not processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if not playersManager.verifyIfPlayerExists(self.guild):
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
error = BadCommandUsage()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
playlist = processInfo.getPlaylist()
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
if playlist is None:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
error = BadCommandUsage()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from typing import Union
|
||||
@ -13,18 +13,10 @@ class ResetHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
# Get the current process of the guild
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
if processInfo.getStatus() == ProcessStatus.SLEEPING:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
command = VCommands(VCommandsType.RESET, None)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, command)
|
||||
|
||||
await playersManager.sendCommandToPlayer(command, self.guild)
|
||||
return HandlerResponse(self.ctx)
|
||||
else:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from typing import Union
|
||||
@ -13,18 +13,10 @@ class ResumeHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
if processInfo.getStatus() == ProcessStatus.SLEEPING:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
# Send Resume command to be execute by player process
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
command = VCommands(VCommandsType.RESUME, None)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, command)
|
||||
|
||||
await playersManager.sendCommandToPlayer(command, self.guild)
|
||||
embed = self.embeds.PLAYER_RESUMED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
else:
|
||||
|
||||
@ -6,25 +6,26 @@ from Music.VulkanBot import VulkanBot
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
|
||||
|
||||
class ShuffleHandler(AbstractHandler):
|
||||
def __init__(self, ctx: Union[Context, Interaction], bot: VulkanBot) -> None:
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
try:
|
||||
processLock = processInfo.getLock()
|
||||
acquired = processLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
playerLock = playersManager.getPlayerLock(self.guild)
|
||||
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
|
||||
if acquired:
|
||||
playlist = processInfo.getPlaylist()
|
||||
playlist = playersManager.getPlayerPlaylist(self.guild)
|
||||
playlist.shuffle()
|
||||
# Release the acquired Lock
|
||||
processLock.release()
|
||||
playerLock.release()
|
||||
else:
|
||||
processManager.resetProcess(self.guild, self.ctx)
|
||||
playersManager.resetPlayer(self.guild, self.ctx)
|
||||
embed = self.embeds.PLAYER_RESTARTED()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
|
||||
@ -1,9 +1,8 @@
|
||||
from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Config.Exceptions import BadCommandUsage, ImpossibleMove
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
@ -14,31 +13,12 @@ class SkipHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
if not self.__user_connected():
|
||||
error = ImpossibleMove()
|
||||
embed = self.embeds.NO_CHANNEL()
|
||||
return HandlerResponse(self.ctx, embed, error)
|
||||
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo: # Verify if there is a running process
|
||||
if processInfo.getStatus() == ProcessStatus.SLEEPING:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
# Send a command to the player process to skip the music
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
command = VCommands(VCommandsType.SKIP, None)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, command)
|
||||
|
||||
await playersManager.sendCommandToPlayer(command, self.guild)
|
||||
embed = self.embeds.SKIPPING_SONG()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
else:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
def __user_connected(self) -> bool:
|
||||
if self.author.voice:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@ -2,7 +2,7 @@ from discord.ext.commands import Context
|
||||
from Handlers.AbstractHandler import AbstractHandler
|
||||
from Handlers.HandlerResponse import HandlerResponse
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from typing import Union
|
||||
from discord import Interaction
|
||||
@ -13,18 +13,10 @@ class StopHandler(AbstractHandler):
|
||||
super().__init__(ctx, bot)
|
||||
|
||||
async def run(self) -> HandlerResponse:
|
||||
processManager = self.config.getProcessManager()
|
||||
processInfo: ProcessInfo = processManager.getRunningPlayerInfo(self.guild)
|
||||
if processInfo:
|
||||
if processInfo.getStatus() == ProcessStatus.SLEEPING:
|
||||
embed = self.embeds.NOT_PLAYING()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
|
||||
# Send command to player process stop
|
||||
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
|
||||
if playersManager.verifyIfPlayerExists(self.guild):
|
||||
command = VCommands(VCommandsType.STOP, None)
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.putCommandInQueue(queue, command)
|
||||
|
||||
await playersManager.sendCommandToPlayer(command, self.guild)
|
||||
embed = self.embeds.STOPPING_PLAYER()
|
||||
return HandlerResponse(self.ctx, embed)
|
||||
else:
|
||||
|
||||
47
Parallelism/AbstractProcessManager.py
Normal file
47
Parallelism/AbstractProcessManager.py
Normal file
@ -0,0 +1,47 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from threading import Lock
|
||||
from typing import Union
|
||||
from discord.ext.commands import Context
|
||||
from discord import Guild, Interaction
|
||||
from Music.Playlist import Playlist
|
||||
from Music.Song import Song
|
||||
from Parallelism.Commands import VCommands
|
||||
|
||||
|
||||
class AbstractPlayersManager(ABC):
|
||||
def __init__(self, bot) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None):
|
||||
"""If the forceCreation boolean is True, then the context must be provided for the Player to be created"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def getPlayerPlaylist(self, guild: Guild) -> Playlist:
|
||||
"""If there is a player process for the guild, then return the playlist of the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def getPlayerLock(self, guild: Guild) -> Lock:
|
||||
"""If there is a player process for the guild, then return the lock of the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def verifyIfPlayerExists(self, guild: Guild) -> bool:
|
||||
"""Returns if a player for the guild exists"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None:
|
||||
"""With the context information of a guild create a internal player for the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def resetPlayer(self, guild: Guild, context: Context) -> None:
|
||||
"""Tries to reset the player of the guild"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def showNowPlaying(self, guildID: int, song: Song) -> None:
|
||||
pass
|
||||
@ -1,10 +1,10 @@
|
||||
from typing import List
|
||||
from discord import Button, TextChannel
|
||||
from discord import Button, Guild, TextChannel
|
||||
from discord.ui import View
|
||||
from Config.Emojis import VEmojis
|
||||
from Messages.MessagesCategory import MessagesCategory
|
||||
from Music.Playlist import Playlist
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.ProcessInfo import ProcessInfo
|
||||
from Config.Messages import Messages
|
||||
from Music.Song import Song
|
||||
from Config.Embeds import VEmbeds
|
||||
@ -21,6 +21,11 @@ from Handlers.QueueHandler import QueueHandler
|
||||
|
||||
|
||||
class ProcessCommandsExecutor:
|
||||
MESSAGES = Messages()
|
||||
EMBEDS = VEmbeds()
|
||||
EMOJIS = VEmojis()
|
||||
MSG_MANAGER = MessagesManager()
|
||||
|
||||
def __init__(self, bot: VulkanBot, guildID: int) -> None:
|
||||
self.__bot = bot
|
||||
self.__guildID = guildID
|
||||
@ -29,9 +34,58 @@ class ProcessCommandsExecutor:
|
||||
self.__embeds = VEmbeds()
|
||||
self.__emojis = VEmojis()
|
||||
|
||||
async def sendNowPlaying(self, processInfo: ProcessInfo, song: Song) -> None:
|
||||
@classmethod
|
||||
async def sendNowPlayingToGuild(cls, bot: VulkanBot, playlist: Playlist, channel: TextChannel, song: Song, guild: Guild) -> None:
|
||||
# Get the lock of the playlist
|
||||
if playlist.isLoopingOne():
|
||||
title = cls.MESSAGES.ONE_SONG_LOOPING
|
||||
else:
|
||||
title = cls.MESSAGES.SONG_PLAYING
|
||||
|
||||
# Create View and Embed
|
||||
embed = cls.EMBEDS.SONG_INFO(song.info, title)
|
||||
view = cls.__getPlayerViewForGuild(channel, guild.id, bot)
|
||||
# Send Message and add to the MessagesManager
|
||||
message = await channel.send(embed=embed, view=view)
|
||||
await cls.MSG_MANAGER.addMessageAndClearPrevious(guild.id, MessagesCategory.NOW_PLAYING, message, view)
|
||||
|
||||
# Set in the view the message witch contains the view
|
||||
view.set_message(message=message)
|
||||
|
||||
@classmethod
|
||||
def __getPlayerViewForGuild(cls, channel: TextChannel, guildID: int, bot: VulkanBot) -> View:
|
||||
buttons = cls.__getPlayerButtonsForGuild(channel, guildID, bot)
|
||||
view = BasicView(bot, buttons)
|
||||
return view
|
||||
|
||||
@classmethod
|
||||
def __getPlayerButtonsForGuild(cls, textChannel: TextChannel, guildID: int, bot: VulkanBot) -> List[Button]:
|
||||
"""Create the Buttons to be inserted in the Player View"""
|
||||
buttons: List[Button] = []
|
||||
|
||||
buttons.append(HandlerButton(bot, PrevHandler, cls.EMOJIS.BACK,
|
||||
textChannel, guildID, MessagesCategory.PLAYER, "Back"))
|
||||
buttons.append(HandlerButton(bot, PauseHandler, cls.EMOJIS.PAUSE,
|
||||
textChannel, guildID, MessagesCategory.PLAYER, "Pause"))
|
||||
buttons.append(HandlerButton(bot, ResumeHandler, cls.EMOJIS.PLAY,
|
||||
textChannel, guildID, MessagesCategory.PLAYER, "Play"))
|
||||
buttons.append(HandlerButton(bot, StopHandler, cls.EMOJIS.STOP,
|
||||
textChannel, guildID, MessagesCategory.PLAYER, "Stop"))
|
||||
buttons.append(HandlerButton(bot, SkipHandler, cls.EMOJIS.SKIP,
|
||||
textChannel, guildID, MessagesCategory.PLAYER, "Skip"))
|
||||
buttons.append(HandlerButton(bot, QueueHandler, cls.EMOJIS.QUEUE,
|
||||
textChannel, guildID, MessagesCategory.QUEUE, "Songs"))
|
||||
buttons.append(HandlerButton(bot, LoopHandler, cls.EMOJIS.LOOP_ONE,
|
||||
textChannel, guildID, MessagesCategory.LOOP, "Loop One", 'One'))
|
||||
buttons.append(HandlerButton(bot, LoopHandler, cls.EMOJIS.LOOP_OFF,
|
||||
textChannel, guildID, MessagesCategory.LOOP, "Loop Off", 'Off'))
|
||||
buttons.append(HandlerButton(bot, LoopHandler, cls.EMOJIS.LOOP_ALL,
|
||||
textChannel, guildID, MessagesCategory.LOOP, "Loop All", 'All'))
|
||||
|
||||
return buttons
|
||||
|
||||
async def sendNowPlaying(self, playlist: Playlist, channel: TextChannel, song: Song) -> None:
|
||||
# Get the lock of the playlist
|
||||
playlist = processInfo.getPlaylist()
|
||||
if playlist.isLoopingOne():
|
||||
title = self.__messages.ONE_SONG_LOOPING
|
||||
else:
|
||||
@ -39,7 +93,6 @@ class ProcessCommandsExecutor:
|
||||
|
||||
# Create View and Embed
|
||||
embed = self.__embeds.SONG_INFO(song.info, title)
|
||||
channel = processInfo.getTextChannel()
|
||||
view = self.__getPlayerView(channel)
|
||||
# Send Message and add to the MessagesManager
|
||||
message = await channel.send(embed=embed, view=view)
|
||||
|
||||
@ -1,51 +0,0 @@
|
||||
from enum import Enum
|
||||
from multiprocessing import Process, Queue, Lock
|
||||
from discord import TextChannel
|
||||
from Music.Playlist import Playlist
|
||||
|
||||
|
||||
class ProcessStatus(Enum):
|
||||
RUNNING = 'Running'
|
||||
SLEEPING = 'Sleeping'
|
||||
|
||||
|
||||
class ProcessInfo:
|
||||
"""
|
||||
Class to store the reference to all structures to maintain a player process
|
||||
"""
|
||||
|
||||
def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None:
|
||||
self.__process = process
|
||||
self.__queueToPlayer = queueToPlayer
|
||||
self.__queueToMain = queueToMain
|
||||
self.__playlist = playlist
|
||||
self.__lock = lock
|
||||
self.__textChannel = textChannel
|
||||
self.__status = ProcessStatus.RUNNING
|
||||
|
||||
def setProcess(self, newProcess: Process) -> None:
|
||||
self.__process = newProcess
|
||||
|
||||
def getStatus(self) -> ProcessStatus:
|
||||
return self.__status
|
||||
|
||||
def setStatus(self, status: ProcessStatus) -> None:
|
||||
self.__status = status
|
||||
|
||||
def getProcess(self) -> Process:
|
||||
return self.__process
|
||||
|
||||
def getQueueToPlayer(self) -> Queue:
|
||||
return self.__queueToPlayer
|
||||
|
||||
def getQueueToMain(self) -> Queue:
|
||||
return self.__queueToMain
|
||||
|
||||
def getPlaylist(self) -> Playlist:
|
||||
return self.__playlist
|
||||
|
||||
def getLock(self) -> Lock:
|
||||
return self.__lock
|
||||
|
||||
def getTextChannel(self) -> TextChannel:
|
||||
return self.__textChannel
|
||||
@ -2,19 +2,17 @@ import asyncio
|
||||
from time import sleep, time
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from Music.VulkanInitializer import VulkanInitializer
|
||||
from discord import User, Member, Message, VoiceClient
|
||||
from discord import VoiceClient
|
||||
from asyncio import AbstractEventLoop, Semaphore, Queue
|
||||
from multiprocessing import Process, RLock, Lock, Queue
|
||||
from threading import Thread
|
||||
from typing import Callable, List
|
||||
from discord import Guild, FFmpegPCMAudio, VoiceChannel, TextChannel
|
||||
from typing import Callable
|
||||
from discord import Guild, FFmpegPCMAudio, VoiceChannel
|
||||
from Music.Playlist import Playlist
|
||||
from Music.Song import Song
|
||||
from Config.Configs import VConfigs
|
||||
from Config.Messages import Messages
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Music.Downloader import Downloader
|
||||
from Config.Embeds import VEmbeds
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
|
||||
|
||||
@ -31,10 +29,10 @@ class TimeoutClock:
|
||||
self.__task.cancel()
|
||||
|
||||
|
||||
class PlayerProcess(Process):
|
||||
class ProcessPlayer(Process):
|
||||
"""Process that will play songs, receive commands from the main process by a Queue"""
|
||||
|
||||
def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, textID: int, voiceID: int, authorID: int) -> None:
|
||||
def __init__(self, name: str, playlist: Playlist, lock: Lock, queueToReceive: Queue, queueToSend: Queue, guildID: int, voiceID: int) -> None:
|
||||
"""
|
||||
Start a new process that will have his own bot instance
|
||||
Due to pickle serialization, no objects are stored, the values initialization are being made in the run method
|
||||
@ -48,23 +46,14 @@ class PlayerProcess(Process):
|
||||
self.__semStopPlaying: Semaphore = None
|
||||
self.__loop: AbstractEventLoop = None
|
||||
# Discord context ID
|
||||
self.__textChannelID = textID
|
||||
self.__guildID = guildID
|
||||
self.__voiceChannelID = voiceID
|
||||
self.__authorID = authorID
|
||||
# All information of discord context will be retrieved directly with discord API
|
||||
self.__guild: Guild = None
|
||||
self.__bot: VulkanBot = None
|
||||
self.__voiceChannel: VoiceChannel = None
|
||||
self.__voiceClient: VoiceClient = None
|
||||
self.__textChannel: TextChannel = None
|
||||
self.__author: User = None
|
||||
self.__botMember: Member = None
|
||||
|
||||
self.__configs: VConfigs = None
|
||||
self.__embeds: VEmbeds = None
|
||||
self.__messages: Messages = None
|
||||
self.__messagesToDelete: List[Message] = []
|
||||
self.__playing = False
|
||||
self.__forceStop = False
|
||||
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
|
||||
@ -78,9 +67,6 @@ class PlayerProcess(Process):
|
||||
self.__loop = asyncio.get_event_loop_policy().new_event_loop()
|
||||
asyncio.set_event_loop(self.__loop)
|
||||
|
||||
self.__configs = VConfigs()
|
||||
self.__messages = Messages()
|
||||
self.__embeds = VEmbeds()
|
||||
self.__downloader = Downloader()
|
||||
|
||||
self.__semStopPlaying = Semaphore(0)
|
||||
@ -93,9 +79,6 @@ class PlayerProcess(Process):
|
||||
self.__bot = await self.__createBotInstance()
|
||||
self.__guild = self.__bot.get_guild(self.__guildID)
|
||||
self.__voiceChannel = self.__bot.get_channel(self.__voiceChannelID)
|
||||
self.__textChannel = self.__bot.get_channel(self.__textChannelID)
|
||||
self.__author = self.__bot.get_channel(self.__authorID)
|
||||
self.__botMember = self.__getBotMember()
|
||||
# Connect to voice Channel
|
||||
await self.__connectToVoiceChannel()
|
||||
|
||||
@ -438,9 +421,3 @@ class PlayerProcess(Process):
|
||||
except Exception as e:
|
||||
print(f'[ERROR CONNECTING TO VC] -> {e}')
|
||||
return False
|
||||
|
||||
def __getBotMember(self) -> Member:
|
||||
guild_members: List[Member] = self.__guild.members
|
||||
for member in guild_members:
|
||||
if member.id == self.__bot.user.id:
|
||||
return member
|
||||
@ -1,55 +1,126 @@
|
||||
import asyncio
|
||||
from multiprocessing import Lock, Queue
|
||||
from enum import Enum
|
||||
from multiprocessing import Lock, Process, Queue
|
||||
from multiprocessing.managers import BaseManager, NamespaceProxy
|
||||
from queue import Empty
|
||||
from threading import Thread
|
||||
from typing import Dict, Tuple, Union
|
||||
from Config.Singleton import Singleton
|
||||
from discord import Guild, Interaction
|
||||
from discord import Guild, Interaction, TextChannel, VoiceChannel
|
||||
from discord.ext.commands import Context
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Parallelism.ProcessExecutor import ProcessCommandsExecutor
|
||||
from Music.Song import Song
|
||||
from Parallelism.PlayerProcess import PlayerProcess
|
||||
from Parallelism.ProcessPlayer import ProcessPlayer
|
||||
from Music.Playlist import Playlist
|
||||
from Parallelism.ProcessInfo import ProcessInfo, ProcessStatus
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
|
||||
|
||||
class ProcessManager(Singleton):
|
||||
class ProcessStatus(Enum):
|
||||
RUNNING = 'Running'
|
||||
SLEEPING = 'Sleeping'
|
||||
|
||||
|
||||
class PlayerProcessInfo:
|
||||
"""
|
||||
Class to store the reference to all structures to maintain a process player
|
||||
"""
|
||||
|
||||
def __init__(self, process: Process, queueToPlayer: Queue, queueToMain: Queue, playlist: Playlist, lock: Lock, textChannel: TextChannel) -> None:
|
||||
self.__process = process
|
||||
self.__queueToPlayer = queueToPlayer
|
||||
self.__queueToMain = queueToMain
|
||||
self.__playlist = playlist
|
||||
self.__lock = lock
|
||||
self.__textChannel = textChannel
|
||||
self.__status = ProcessStatus.RUNNING
|
||||
|
||||
def setProcess(self, newProcess: Process) -> None:
|
||||
self.__process = newProcess
|
||||
|
||||
def getStatus(self) -> ProcessStatus:
|
||||
return self.__status
|
||||
|
||||
def setStatus(self, status: ProcessStatus) -> None:
|
||||
self.__status = status
|
||||
|
||||
def getProcess(self) -> Process:
|
||||
return self.__process
|
||||
|
||||
def getQueueToPlayer(self) -> Queue:
|
||||
return self.__queueToPlayer
|
||||
|
||||
def getQueueToMain(self) -> Queue:
|
||||
return self.__queueToMain
|
||||
|
||||
def getPlaylist(self) -> Playlist:
|
||||
return self.__playlist
|
||||
|
||||
def getLock(self) -> Lock:
|
||||
return self.__lock
|
||||
|
||||
def getTextChannel(self) -> TextChannel:
|
||||
return self.__textChannel
|
||||
|
||||
|
||||
class ProcessPlayerManager(Singleton, AbstractPlayersManager):
|
||||
"""
|
||||
Manage all running player process, creating and storing them for future calls
|
||||
Deal with the creation of shared memory
|
||||
Deals with the creation of shared memory
|
||||
"""
|
||||
|
||||
def __init__(self, bot: VulkanBot = None) -> None:
|
||||
if not super().created:
|
||||
self.__bot = bot
|
||||
VManager.register('Playlist', Playlist)
|
||||
VManager.register('VoiceChannel', VoiceChannel)
|
||||
self.__manager = VManager()
|
||||
self.__manager.start()
|
||||
self.__playersProcess: Dict[int, ProcessInfo] = {}
|
||||
self.__playersProcess: Dict[int, PlayerProcessInfo] = {}
|
||||
self.__playersListeners: Dict[int, Tuple[Thread, bool]] = {}
|
||||
self.__playersCommandsExecutor: Dict[int, ProcessCommandsExecutor] = {}
|
||||
|
||||
def setPlayerInfo(self, guild: Guild, info: ProcessInfo):
|
||||
self.__playersProcess[guild.id] = info
|
||||
async def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None):
|
||||
if forceCreation:
|
||||
processInfo = self.createPlayerForGuild(guild, context)
|
||||
else:
|
||||
processInfo = self.__getRunningPlayerInfo(guild)
|
||||
if processInfo == None:
|
||||
return
|
||||
|
||||
def getOrCreatePlayerInfo(self, guild: Guild, context: Union[Context, Interaction]) -> ProcessInfo:
|
||||
"""Return the process info for the guild, the user in context must be connected to a voice_channel"""
|
||||
queue = processInfo.getQueueToPlayer()
|
||||
self.__putCommandInQueue(queue, command)
|
||||
|
||||
def getPlayerPlaylist(self, guild: Guild) -> Playlist:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getPlaylist()
|
||||
|
||||
def getPlayerLock(self, guild: Guild) -> Lock:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getLock()
|
||||
|
||||
def verifyIfPlayerExists(self, guild: Guild) -> bool:
|
||||
return guild.id in self.__playersProcess.keys()
|
||||
|
||||
def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]) -> None:
|
||||
try:
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
self.__playersProcess[guild.id] = self.__createProcessInfo(guild, context)
|
||||
self.__playersProcess[guild.id] = self.__createProcessPlayerInfo(guild, context)
|
||||
else:
|
||||
# If the process has ended create a new one
|
||||
if not self.__playersProcess[guild.id].getProcess().is_alive():
|
||||
self.__playersProcess[guild.id] = self.__recreateProcess(guild, context)
|
||||
|
||||
# Start the process
|
||||
self.__playersProcess[guild.id].getProcess().start()
|
||||
return self.__playersProcess[guild.id]
|
||||
except Exception as e:
|
||||
print(f'[Error In GetPlayerContext] -> {e}')
|
||||
|
||||
def resetProcess(self, guild: Guild, context: Context) -> None:
|
||||
def resetPlayer(self, guild: Guild, context: Context) -> None:
|
||||
"""Restart a running process, already start it to return to play"""
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
return None
|
||||
@ -59,10 +130,10 @@ class ProcessManager(Singleton):
|
||||
newProcessInfo.getProcess().start() # Start the process
|
||||
# Send a command to start the play again
|
||||
playCommand = VCommands(VCommandsType.PLAY)
|
||||
newProcessInfo.getQueueToPlayer().put(playCommand)
|
||||
self.__putCommandInQueue(newProcessInfo.getQueueToPlayer(), playCommand)
|
||||
self.__playersProcess[guild.id] = newProcessInfo
|
||||
|
||||
def getRunningPlayerInfo(self, guild: Guild) -> ProcessInfo:
|
||||
def __getRunningPlayerInfo(self, guild: Guild) -> PlayerProcessInfo:
|
||||
"""Return the process info for the guild, if not, return None"""
|
||||
if guild.id not in self.__playersProcess.keys():
|
||||
print('Process Info not found')
|
||||
@ -70,20 +141,18 @@ class ProcessManager(Singleton):
|
||||
|
||||
return self.__playersProcess[guild.id]
|
||||
|
||||
def __createProcessInfo(self, guild: Guild, context: Context) -> ProcessInfo:
|
||||
def __createProcessPlayerInfo(self, guild: Guild, context: Context) -> PlayerProcessInfo:
|
||||
guildID: int = context.guild.id
|
||||
textID: int = context.channel.id
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
authorID: int = context.author.id
|
||||
|
||||
playlist: Playlist = self.__manager.Playlist()
|
||||
lock = Lock()
|
||||
queueToListen = Queue()
|
||||
queueToSend = Queue()
|
||||
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, textID, voiceID, authorID)
|
||||
processInfo = ProcessInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
process = ProcessPlayer(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, voiceID)
|
||||
processInfo = PlayerProcessInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
|
||||
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
|
||||
thread = Thread(target=self.__listenToCommands,
|
||||
@ -110,7 +179,7 @@ class ProcessManager(Singleton):
|
||||
except Exception as e:
|
||||
print(f'[ERROR STOPPING PROCESS] -> {e}')
|
||||
|
||||
def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> ProcessInfo:
|
||||
def __recreateProcess(self, guild: Guild, context: Union[Context, Interaction]) -> PlayerProcessInfo:
|
||||
"""Create a new process info using previous playlist"""
|
||||
self.__stopPossiblyRunningProcess(guild)
|
||||
|
||||
@ -127,10 +196,10 @@ class ProcessManager(Singleton):
|
||||
lock = Lock()
|
||||
queueToListen = Queue()
|
||||
queueToSend = Queue()
|
||||
process = PlayerProcess(context.guild.name, playlist, lock, queueToSend,
|
||||
process = ProcessPlayer(context.guild.name, playlist, lock, queueToSend,
|
||||
queueToListen, guildID, textID, voiceID, authorID)
|
||||
processInfo = ProcessInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
processInfo = PlayerProcessInfo(process, queueToSend, queueToListen,
|
||||
playlist, lock, context.channel)
|
||||
|
||||
# Create a Thread to listen for the queue coming from the Player Process, this will redirect the Queue to a async
|
||||
thread = Thread(target=self.__listenToCommands,
|
||||
@ -190,10 +259,18 @@ class ProcessManager(Singleton):
|
||||
# Set the status of this process as sleeping, only the playlist object remains
|
||||
self.__playersProcess[guildID].setStatus(ProcessStatus.SLEEPING)
|
||||
|
||||
def __putCommandInQueue(self, queue: Queue, command: VCommands) -> None:
|
||||
try:
|
||||
queue.put(command)
|
||||
except Exception as e:
|
||||
print(f'[ERROR PUTTING COMMAND IN QUEUE] -> {e}')
|
||||
|
||||
async def showNowPlaying(self, guildID: int, song: Song) -> None:
|
||||
commandExecutor = self.__playersCommandsExecutor[guildID]
|
||||
processInfo = self.__playersProcess[guildID]
|
||||
await commandExecutor.sendNowPlaying(processInfo, song)
|
||||
playlist = processInfo.getPlaylist()
|
||||
channel = processInfo.getTextChannel()
|
||||
await commandExecutor.sendNowPlaying(playlist, channel, song)
|
||||
|
||||
|
||||
class VManager(BaseManager):
|
||||
346
Parallelism/ThreadPlayer.py
Normal file
346
Parallelism/ThreadPlayer.py
Normal file
@ -0,0 +1,346 @@
|
||||
import asyncio
|
||||
from time import time
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from discord import VoiceClient
|
||||
from asyncio import AbstractEventLoop
|
||||
from threading import RLock, Thread
|
||||
from multiprocessing import Lock
|
||||
from typing import Callable
|
||||
from discord import Guild, FFmpegPCMAudio, VoiceChannel
|
||||
from Music.Playlist import Playlist
|
||||
from Music.Song import Song
|
||||
from Config.Configs import VConfigs
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Music.Downloader import Downloader
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
|
||||
|
||||
class TimeoutClock:
|
||||
def __init__(self, callback: Callable, loop: asyncio.AbstractEventLoop):
|
||||
self.__callback = callback
|
||||
self.__task = loop.create_task(self.__executor())
|
||||
|
||||
async def __executor(self):
|
||||
await asyncio.sleep(VConfigs().VC_TIMEOUT)
|
||||
await self.__callback()
|
||||
|
||||
def cancel(self):
|
||||
self.__task.cancel()
|
||||
|
||||
|
||||
class ThreadPlayer(Thread):
|
||||
"""Player Thread to control the song playback in the same Process of the Main Process"""
|
||||
|
||||
def __init__(self, bot: VulkanBot, guild: Guild, name: str, voiceChannel: VoiceChannel, playlist: Playlist, lock: Lock, guildID: int, voiceID: int, callbackToSendCommand: Callable, exitCB: Callable) -> None:
|
||||
Thread.__init__(self, name=name, group=None, target=None, args=(), kwargs={})
|
||||
print(f'Starting Player Thread for Guild {self.name}')
|
||||
# Synchronization objects
|
||||
self.__playlist: Playlist = playlist
|
||||
self.__playlistLock: Lock = lock
|
||||
self.__loop: AbstractEventLoop = bot.loop
|
||||
self.__playerLock: RLock = RLock()
|
||||
# Discord context ID
|
||||
self.__voiceChannelID = voiceID
|
||||
self.__guild: Guild = guild
|
||||
self.__voiceChannel: VoiceChannel = voiceChannel
|
||||
self.__voiceClient: VoiceClient = None
|
||||
|
||||
self.__downloader = Downloader()
|
||||
self.__callback = callbackToSendCommand
|
||||
self.__exitCB = exitCB
|
||||
self.__bot = bot
|
||||
self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop)
|
||||
|
||||
self.__playing = False
|
||||
self.__forceStop = False
|
||||
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
|
||||
'options': '-vn'}
|
||||
|
||||
def __verifyIfIsPlaying(self) -> bool:
|
||||
if self.__voiceClient is None:
|
||||
return False
|
||||
if not self.__voiceClient.is_connected():
|
||||
return False
|
||||
return self.__voiceClient.is_playing() or self.__voiceClient.is_paused()
|
||||
|
||||
async def __playPlaylistSongs(self) -> None:
|
||||
"""If the player is not running trigger to play a new song"""
|
||||
self.__playing = self.__verifyIfIsPlaying()
|
||||
if not self.__playing:
|
||||
song = None
|
||||
with self.__playlistLock:
|
||||
with self.__playerLock:
|
||||
song = self.__playlist.next_song()
|
||||
|
||||
if song is not None:
|
||||
await self.__playSong(song)
|
||||
self.__playing = True
|
||||
|
||||
async def __playSong(self, song: Song) -> None:
|
||||
"""Function that will trigger the player to play the song"""
|
||||
try:
|
||||
self.__playerLock.acquire()
|
||||
if song is None:
|
||||
return
|
||||
|
||||
if song.source is None:
|
||||
return self.__playNext(None)
|
||||
|
||||
# If not connected, connect to bind channel
|
||||
if self.__voiceClient is None:
|
||||
await self.__connectToVoiceChannel()
|
||||
|
||||
# If the voice channel disconnect for some reason
|
||||
if not self.__voiceClient.is_connected():
|
||||
print('[VOICE CHANNEL NOT NULL BUT DISCONNECTED, CONNECTING AGAIN]')
|
||||
await self.__connectToVoiceChannel()
|
||||
# If the player is connected and playing return the song to the playlist
|
||||
elif self.__voiceClient.is_playing():
|
||||
print('[SONG ALREADY PLAYING, RETURNING]')
|
||||
self.__playlist.add_song_start(song)
|
||||
return
|
||||
|
||||
songStillAvailable = self.__verifyIfSongAvailable(song)
|
||||
if not songStillAvailable:
|
||||
print('[SONG NOT AVAILABLE ANYMORE, DOWNLOADING AGAIN]')
|
||||
song = self.__downloadSongAgain(song)
|
||||
|
||||
self.__playing = True
|
||||
self.__songPlaying = song
|
||||
|
||||
player = FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
|
||||
self.__voiceClient.play(player, after=lambda e: self.__playNext(e))
|
||||
|
||||
self.__timer.cancel()
|
||||
self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop)
|
||||
|
||||
nowPlayingCommand = VCommands(VCommandsType.NOW_PLAYING, song)
|
||||
await self.__callback(nowPlayingCommand, self.__guild, song)
|
||||
except Exception as e:
|
||||
print(f'[ERROR IN PLAY SONG FUNCTION] -> {e}, {type(e)}')
|
||||
self.__playNext(None)
|
||||
finally:
|
||||
self.__playerLock.release()
|
||||
|
||||
def __playNext(self, error) -> None:
|
||||
if error is not None:
|
||||
print(f'[ERROR PLAYING SONG] -> {error}')
|
||||
with self.__playlistLock:
|
||||
with self.__playerLock:
|
||||
if self.__forceStop: # If it's forced to stop player
|
||||
self.__forceStop = False
|
||||
return None
|
||||
|
||||
song = self.__playlist.next_song()
|
||||
|
||||
if song is not None:
|
||||
self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}')
|
||||
else:
|
||||
self.__playlist.loop_off()
|
||||
self.__songPlaying = None
|
||||
self.__playing = False
|
||||
# Send a command to the main process to kill this thread
|
||||
|
||||
self.__exitCB(self.__guild)
|
||||
|
||||
def __verifyIfSongAvailable(self, song: Song) -> bool:
|
||||
"""Verify the song source to see if it's already expired"""
|
||||
try:
|
||||
parsedUrl = urlparse(song.source)
|
||||
|
||||
if 'expire' not in parsedUrl.query:
|
||||
# If already passed 5 hours since the download
|
||||
if song.downloadTime + 18000 < int(time()):
|
||||
return False
|
||||
return True
|
||||
|
||||
# If the current time plus the song duration plus 10min exceeds the expirationValue
|
||||
expireValue = parse_qs(parsedUrl.query)['expire'][0]
|
||||
if int(time()) + song.duration + 600 > int(str(expireValue)):
|
||||
return False
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f'[ERROR VERIFYING SONG AVAILABILITY] -> {e}')
|
||||
return False
|
||||
|
||||
def __downloadSongAgain(self, song: Song) -> Song:
|
||||
"""Force a download to be executed again, one use case is when the song.source expired and needs to refresh"""
|
||||
return self.__downloader.finish_one_song(song)
|
||||
|
||||
async def __playPrev(self, voiceChannelID: int) -> None:
|
||||
with self.__playlistLock:
|
||||
song = self.__playlist.prev_song()
|
||||
|
||||
with self.__playerLock:
|
||||
if song is not None:
|
||||
# If not connect, connect to the user voice channel, may change the channel
|
||||
if self.__voiceClient is None or not self.__voiceClient.is_connected():
|
||||
self.__voiceChannelID = voiceChannelID
|
||||
self.__voiceChannel = self.__guild.get_channel(self.__voiceChannelID)
|
||||
await self.__connectToVoiceChannel()
|
||||
|
||||
# If already playing, stop the current play
|
||||
if self.__verifyIfIsPlaying():
|
||||
# Will forbidden next_song to execute after stopping current player
|
||||
self.__forceStop = True
|
||||
self.__voiceClient.stop()
|
||||
self.__playing = False
|
||||
|
||||
self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}')
|
||||
|
||||
async def __restartCurrentSong(self) -> None:
|
||||
song = self.__playlist.getCurrentSong()
|
||||
if song is None:
|
||||
song = self.__playlist.next_song()
|
||||
if song is None:
|
||||
return
|
||||
|
||||
self.__loop.create_task(self.__playSong(song), name=f'Song {song.identifier}')
|
||||
|
||||
async def receiveCommand(self, command: VCommands) -> None:
|
||||
try:
|
||||
self.__playerLock.acquire()
|
||||
type = command.getType()
|
||||
args = command.getArgs()
|
||||
# print(f'Player Thread {self.__guild.name} received command {type}')
|
||||
|
||||
if type == VCommandsType.PAUSE:
|
||||
self.__pause()
|
||||
elif type == VCommandsType.RESUME:
|
||||
await self.__resume()
|
||||
elif type == VCommandsType.SKIP:
|
||||
await self.__skip()
|
||||
elif type == VCommandsType.PLAY:
|
||||
await self.__playPlaylistSongs()
|
||||
elif type == VCommandsType.PREV:
|
||||
await self.__playPrev(args)
|
||||
elif type == VCommandsType.RESET:
|
||||
await self.__reset()
|
||||
elif type == VCommandsType.STOP:
|
||||
await self.__stop()
|
||||
else:
|
||||
print(f'[ERROR] -> Unknown Command Received: {command}')
|
||||
except Exception as e:
|
||||
print(f'[ERROR IN COMMAND RECEIVER] -> {type} - {e}')
|
||||
finally:
|
||||
self.__playerLock.release()
|
||||
|
||||
def __pause(self) -> None:
|
||||
if self.__voiceClient is not None:
|
||||
if self.__voiceClient.is_connected():
|
||||
if self.__voiceClient.is_playing():
|
||||
self.__voiceClient.pause()
|
||||
|
||||
async def __reset(self) -> None:
|
||||
if self.__voiceClient is None:
|
||||
return
|
||||
|
||||
if not self.__voiceClient.is_connected():
|
||||
await self.__connectToVoiceChannel()
|
||||
if self.__songPlaying is not None:
|
||||
await self.__restartCurrentSong()
|
||||
|
||||
async def __stop(self) -> None:
|
||||
if self.__voiceClient is not None:
|
||||
if self.__voiceClient.is_connected():
|
||||
with self.__playlistLock:
|
||||
self.__playlist.loop_off()
|
||||
self.__playlist.clear()
|
||||
|
||||
self.__voiceClient.stop()
|
||||
await self.__voiceClient.disconnect()
|
||||
|
||||
self.__songPlaying = None
|
||||
self.__playing = False
|
||||
self.__voiceClient = None
|
||||
# If the voiceClient is not None we finish things
|
||||
else:
|
||||
await self.__forceBotDisconnectAndStop()
|
||||
|
||||
async def __resume(self) -> None:
|
||||
# Lock to work with Player
|
||||
with self.__playerLock:
|
||||
if self.__voiceClient is not None:
|
||||
# If the player is paused then return to play
|
||||
if self.__voiceClient.is_paused():
|
||||
return self.__voiceClient.resume()
|
||||
# If there is a current song but the voice client is not playing
|
||||
elif self.__songPlaying is not None and not self.__voiceClient.is_playing():
|
||||
await self.__playSong(self.__songPlaying)
|
||||
|
||||
async def __skip(self) -> None:
|
||||
self.__playing = self.__verifyIfIsPlaying()
|
||||
# Lock to work with Player
|
||||
with self.__playerLock:
|
||||
if self.__playing:
|
||||
self.__playing = False
|
||||
self.__voiceClient.stop()
|
||||
# If for some reason the Bot has disconnect but there is still songs to play
|
||||
elif len(self.__playlist.getSongs()) > 0:
|
||||
print('[RESTARTING CURRENT SONG]')
|
||||
await self.__restartCurrentSong()
|
||||
|
||||
async def __forceBotDisconnectAndStop(self) -> None:
|
||||
# Lock to work with Player
|
||||
with self.__playerLock:
|
||||
if self.__voiceClient is None:
|
||||
return
|
||||
self.__playing = False
|
||||
self.__songPlaying = None
|
||||
try:
|
||||
self.__voiceClient.stop()
|
||||
await self.__voiceClient.disconnect(force=True)
|
||||
except Exception as e:
|
||||
print(f'[ERROR FORCING BOT TO STOP] -> {e}')
|
||||
finally:
|
||||
self.__voiceClient = None
|
||||
with self.__playlistLock:
|
||||
self.__playlist.clear()
|
||||
self.__playlist.loop_off()
|
||||
|
||||
async def __timeoutHandler(self) -> None:
|
||||
try:
|
||||
if self.__voiceClient is None:
|
||||
return
|
||||
|
||||
# If the bot should not disconnect when alone
|
||||
if not VConfigs().SHOULD_AUTO_DISCONNECT_WHEN_ALONE:
|
||||
return
|
||||
|
||||
if self.__voiceClient.is_connected():
|
||||
if self.__voiceClient.is_playing() or self.__voiceClient.is_paused():
|
||||
if not self.__isBotAloneInChannel(): # If bot is not alone continue to play
|
||||
self.__timer = TimeoutClock(self.__timeoutHandler, self.__loop)
|
||||
return
|
||||
|
||||
# Finish the process
|
||||
with self.__playerLock:
|
||||
with self.__playlistLock:
|
||||
self.__playlist.loop_off()
|
||||
await self.__forceBotDisconnectAndStop()
|
||||
except Exception as e:
|
||||
print(f'[ERROR IN TIMEOUT] -> {e}')
|
||||
|
||||
def __isBotAloneInChannel(self) -> bool:
|
||||
try:
|
||||
if len(self.__voiceClient.channel.members) <= 1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f'[ERROR IN CHECK BOT ALONE] -> {e}')
|
||||
return False
|
||||
|
||||
async def __connectToVoiceChannel(self) -> bool:
|
||||
try:
|
||||
print('[CONNECTING TO VOICE CHANNEL]')
|
||||
if self.__voiceClient is not None:
|
||||
try:
|
||||
await self.__voiceClient.disconnect(force=True)
|
||||
except Exception as e:
|
||||
print(f'[ERROR FORCING DISCONNECT] -> {e}')
|
||||
self.__voiceClient = await self.__voiceChannel.connect(reconnect=True, timeout=None)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f'[ERROR CONNECTING TO VC] -> {e}')
|
||||
return False
|
||||
163
Parallelism/ThreadPlayerManager.py
Normal file
163
Parallelism/ThreadPlayerManager.py
Normal file
@ -0,0 +1,163 @@
|
||||
from threading import RLock
|
||||
from typing import Any, Dict, Union
|
||||
from Config.Singleton import Singleton
|
||||
from discord import Guild, Interaction, TextChannel
|
||||
from discord.ext.commands import Context
|
||||
from Parallelism.AbstractProcessManager import AbstractPlayersManager
|
||||
from Music.Song import Song
|
||||
from Music.Playlist import Playlist
|
||||
from Parallelism.Commands import VCommands, VCommandsType
|
||||
from Music.VulkanBot import VulkanBot
|
||||
from Parallelism.ProcessExecutor import ProcessCommandsExecutor
|
||||
from Parallelism.ThreadPlayer import ThreadPlayer
|
||||
|
||||
|
||||
class ThreadPlayerInfo:
|
||||
"""
|
||||
Class to store the reference to all structures to maintain a player thread
|
||||
"""
|
||||
|
||||
def __init__(self, thread: ThreadPlayer, playlist: Playlist, lock: RLock, textChannel: TextChannel) -> None:
|
||||
self.__thread = thread
|
||||
self.__playlist = playlist
|
||||
self.__lock = lock
|
||||
self.__textChannel = textChannel
|
||||
|
||||
def getPlayer(self) -> ThreadPlayer:
|
||||
return self.__thread
|
||||
|
||||
def getPlaylist(self) -> Playlist:
|
||||
return self.__playlist
|
||||
|
||||
def getLock(self) -> RLock:
|
||||
return self.__lock
|
||||
|
||||
def getTextChannel(self) -> TextChannel:
|
||||
return self.__textChannel
|
||||
|
||||
|
||||
class ThreadPlayerManager(Singleton, AbstractPlayersManager):
|
||||
"""
|
||||
Manage all running player threads, creating and storing them for future calls
|
||||
"""
|
||||
|
||||
def __init__(self, bot: VulkanBot = None) -> None:
|
||||
if not super().created:
|
||||
self.__bot = bot
|
||||
self.__playersThreads: Dict[int, ThreadPlayerInfo] = {}
|
||||
|
||||
async def sendCommandToPlayer(self, command: VCommands, guild: Guild, forceCreation: bool = False, context: Union[Context, Interaction] = None):
|
||||
playerInfo = self.__playersThreads[guild.id]
|
||||
player = playerInfo.getPlayer()
|
||||
if player is None and forceCreation:
|
||||
self.__createPlayerThreadInfo(context)
|
||||
if player is None:
|
||||
return
|
||||
|
||||
await player.receiveCommand(command)
|
||||
|
||||
async def __receiveCommand(self, command: VCommands, guild: Guild, args: Any) -> None:
|
||||
commandType = command.getType()
|
||||
if commandType == VCommandsType.NOW_PLAYING:
|
||||
await self.showNowPlaying(guild, args)
|
||||
else:
|
||||
print(
|
||||
f'[ERROR] -> Command not processable received from Thread {guild.name}: {commandType}')
|
||||
|
||||
def getPlayerPlaylist(self, guild: Guild) -> Playlist:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getPlaylist()
|
||||
|
||||
def getPlayerLock(self, guild: Guild) -> RLock:
|
||||
playerInfo = self.__getRunningPlayerInfo(guild)
|
||||
if playerInfo:
|
||||
return playerInfo.getLock()
|
||||
|
||||
def verifyIfPlayerExists(self, guild: Guild) -> bool:
|
||||
return guild.id in self.__playersThreads.keys()
|
||||
|
||||
def createPlayerForGuild(self, guild: Guild, context: Union[Context, Interaction]):
|
||||
try:
|
||||
if guild.id not in self.__playersThreads.keys():
|
||||
self.__playersThreads[guild.id] = self.__createPlayerThreadInfo(context)
|
||||
else:
|
||||
# If the thread has ended create a new one
|
||||
if not self.__playersThreads[guild.id].getPlayer().is_alive():
|
||||
self.__playersThreads[guild.id] = self.__recreateThread(guild, context)
|
||||
|
||||
return self.__playersThreads[guild.id]
|
||||
except Exception as e:
|
||||
print(f'[Error In GetPlayerContext] -> {e}')
|
||||
|
||||
def resetPlayer(self, guild: Guild, context: Context) -> None:
|
||||
if guild.id not in self.__playersThreads.keys():
|
||||
return None
|
||||
|
||||
# Recreate the thread keeping the playlist
|
||||
newPlayerInfo = self.__recreateThread(guild, context)
|
||||
newPlayerInfo.getPlayer().start()
|
||||
# Send a command to start the play again
|
||||
playCommand = VCommands(VCommandsType.PLAY)
|
||||
newPlayerInfo.getQueueToPlayer().put(playCommand)
|
||||
self.__playersThreads[guild.id] = newPlayerInfo
|
||||
|
||||
def __getRunningPlayerInfo(self, guild: Guild) -> ThreadPlayerInfo:
|
||||
if guild.id not in self.__playersThreads.keys():
|
||||
print('Process Info not found')
|
||||
return None
|
||||
|
||||
return self.__playersThreads[guild.id]
|
||||
|
||||
def __createPlayerThreadInfo(self, context: Union[Context, Interaction]) -> ThreadPlayerInfo:
|
||||
guildID: int = context.guild.id
|
||||
if isinstance(context, Interaction):
|
||||
voiceID: int = context.user.voice.channel.id
|
||||
else:
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
|
||||
voiceChannel = self.__bot.get_channel(voiceID)
|
||||
|
||||
playlist = Playlist()
|
||||
lock = RLock()
|
||||
player = ThreadPlayer(self.__bot, context.guild, context.guild.name,
|
||||
voiceChannel, playlist, lock, guildID, voiceID, self.__receiveCommand, self.__deleteThread)
|
||||
playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel)
|
||||
player.start()
|
||||
|
||||
return playerInfo
|
||||
|
||||
def __deleteThread(self, guild: Guild) -> None:
|
||||
"""Tries to delete the thread and removes all the references to it"""
|
||||
print(f'[THREAD MANAGER] -> Deleting Thread for guild {guild.name}')
|
||||
playerInfo = self.__playersThreads[guild.id]
|
||||
if playerInfo:
|
||||
thread = playerInfo.getPlayer()
|
||||
self.__playersThreads.pop(guild.id)
|
||||
del thread
|
||||
|
||||
def __recreateThread(self, guild: Guild, context: Union[Context, Interaction]) -> ThreadPlayerInfo:
|
||||
self.__stopPossiblyRunningProcess(guild)
|
||||
|
||||
guildID: int = context.guild.id
|
||||
if isinstance(context, Interaction):
|
||||
voiceID: int = context.user.voice.channel.id
|
||||
else:
|
||||
voiceID: int = context.author.voice.channel.id
|
||||
voiceChannel = self.__bot.get_channel(voiceID)
|
||||
|
||||
playlist = self.__playersThreads[guildID].getPlaylist()
|
||||
lock = RLock()
|
||||
player = ThreadPlayer(self.__bot, context.guild, context.guild.name,
|
||||
voiceChannel, playlist, lock, guildID, voiceID, self.__receiveCommand, self.__deleteThread)
|
||||
playerInfo = ThreadPlayerInfo(player, playlist, lock, context.channel)
|
||||
player.start()
|
||||
|
||||
return playerInfo
|
||||
|
||||
async def showNowPlaying(self, guild: Guild, song: Song) -> None:
|
||||
processInfo = self.__playersThreads[guild.id]
|
||||
playlist = processInfo.getPlaylist()
|
||||
txtChannel = processInfo.getTextChannel()
|
||||
|
||||
await ProcessCommandsExecutor.sendNowPlayingToGuild(self.__bot, playlist, txtChannel, song, guild)
|
||||
Loading…
x
Reference in New Issue
Block a user