Vulkan/Handlers/JumpMusicHandler.py

79 lines
3.1 KiB
Python

from typing import Union
from Config.Exceptions import BadCommandUsage, InvalidInput, NumberRequired, UnknownError, VulkanError
from Handlers.AbstractHandler import AbstractHandler
from discord.ext.commands import Context
from discord import Interaction
from Handlers.HandlerResponse import HandlerResponse
from Music.Playlist import Playlist
from Music.VulkanBot import VulkanBot
from Parallelism.AbstractProcessManager import AbstractPlayersManager
from Parallelism.Commands import VCommands, VCommandsType
class JumpMusicHandler(AbstractHandler):
"""Move a music from a specific position and play it directly"""
def __init__(self, ctx: Union[Context, Interaction], bot: VulkanBot) -> None:
super().__init__(ctx, bot)
async def run(self, musicPos: str) -> HandlerResponse:
playersManager: AbstractPlayersManager = self.config.getPlayersManager()
if not playersManager.verifyIfPlayerExists(self.guild):
embed = self.embeds.NOT_PLAYING()
error = BadCommandUsage()
return HandlerResponse(self.ctx, embed, error)
playerLock = playersManager.getPlayerLock(self.guild)
acquired = playerLock.acquire(timeout=self.config.ACQUIRE_LOCK_TIMEOUT)
if acquired:
# Try to convert input to int
error = self.__validateInput(musicPos)
if error:
embed = self.embeds.ERROR_EMBED(error.message)
playerLock.release()
return HandlerResponse(self.ctx, embed, error)
# Sanitize the input
playlist = playersManager.getPlayerPlaylist(self.guild)
musicPos = self.__sanitizeInput(playlist, musicPos)
# Validate the position
if not playlist.validate_position(musicPos):
error = InvalidInput()
embed = self.embeds.PLAYLIST_RANGE_ERROR()
playerLock.release()
return HandlerResponse(self.ctx, embed, error)
try:
# Move the selected song
playlist.move_songs(musicPos, 1)
# Send a command to the player to skip the music
command = VCommands(VCommandsType.SKIP, None)
await playersManager.sendCommandToPlayer(command, self.guild, self.ctx)
return HandlerResponse(self.ctx)
except:
embed = self.embeds.ERROR_MOVING()
error = UnknownError()
return HandlerResponse(self.ctx, embed, error)
finally:
playerLock.release()
else:
playersManager.resetPlayer(self.guild, self.ctx)
embed = self.embeds.PLAYER_RESTARTED()
return HandlerResponse(self.ctx, embed)
def __validateInput(self, position: str) -> Union[VulkanError, None]:
try:
position = int(position)
except:
return NumberRequired(self.messages.ERROR_NUMBER)
def __sanitizeInput(self, playlist: Playlist, position: int) -> int:
position = int(position)
if position == -1:
position = len(playlist.getSongs())
return position