First commit

This commit is contained in:
Rafael Vargas
2022-01-07 11:53:12 -04:00
commit 089b47fc44
19 changed files with 1666 additions and 0 deletions

139
vulkan/music/Downloader.py Normal file
View File

@@ -0,0 +1,139 @@
import asyncio
import concurrent.futures
from config import config
from yt_dlp import YoutubeDL
from yt_dlp.utils import ExtractorError, DownloadError
from vulkan.music.Song import Song
from vulkan.music.utils import is_url
class Downloader():
"""Download musics direct URL and title or Source from Youtube using a music name or Youtube URL"""
def __init__(self) -> None:
self.__YDL_OPTIONS = {'format': 'bestaudio/best',
'default_search': 'auto',
'playliststart': 0,
'extract_flat': True,
'playlistend': config.MAX_PLAYLIST_LENGTH,
}
def download_one(self, song: Song) -> Song:
"""Receives a song object, finish his download and return it"""
if song.identifier == None:
return None
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Destroy the music with problems
return None
else:
song.finish_down(song_info)
return song
def extract_youtube_link(self, playlist_url: str) -> list:
"""Extract all songs direct URL from a Youtube Link
Arg: Url String
Return: List with the direct youtube URL of each song
"""
if is_url(playlist_url): # If Url
options = self.__YDL_OPTIONS
options['extract_flat'] = True
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(playlist_url, download=False)
songs_identifiers = []
if result.get('entries'): # If got a dict of musics
for entry in result['entries']:
songs_identifiers.append(
f"https://www.youtube.com/watch?v={entry['id']}")
else: # Or a single music
songs_identifiers.append(result['original_url'])
return songs_identifiers # Return a list
except (ExtractorError, DownloadError) as e:
return None
else:
print('Invalid type of playlist URL')
return None
async def preload(self, songs: list) -> None:
"""Download the full info of the song object"""
for song in songs:
asyncio.ensure_future(self.__download_songs(song))
def __download_url(self, url) -> dict:
"""Download musics full info and source from Music URL
Arg: URL from Youtube
Return: Dict with the full youtube information of the music, including source to play it
"""
options = self.__YDL_OPTIONS
options['extract_flat'] = False
with YoutubeDL(options) as ydl:
try:
result = ydl.extract_info(url, download=False)
return result
except (ExtractorError, DownloadError) as e: # Any type of error in download
return None
async def __download_songs(self, song: Song) -> None:
"""Download a music object asynchronously"""
if song.source != None: # If Music already preloaded
return
def download_song(song):
if is_url(song.identifier): # Youtube URL
song_info = self.__download_url(song.identifier)
else: # Song name
song_info = self.__download_title(song.identifier)
if song_info == None:
song.destroy() # Remove the song with problems from the playlist
else:
song.finish_down(song_info)
# Creating a loop task to download each song
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=config.MAX_PRELOAD_SONGS
)
await asyncio.wait(fs={loop.run_in_executor(executor, download_song, song)},
return_when=asyncio.ALL_COMPLETED)
def __download_title(self, title: str) -> dict:
"""Download a music full information using his name.
Arg: Music Name
Return: A dict containing the song information
"""
if type(title) != str:
return None
config = self.__YDL_OPTIONS
config['extract_flat'] = False
with YoutubeDL(self.__YDL_OPTIONS) as ydl:
try:
search = f"ytsearch:{title}"
result = ydl.extract_info(search, download=False)
if result == None:
return None
# Return a dict with the full info of first music
return result['entries'][0]
except Exception as e:
return None

View File

@@ -0,0 +1,89 @@
from abc import ABC, abstractproperty, abstractmethod
class IPlaylist(ABC):
"""Class to manage and control the songs to play and played"""
@abstractproperty
def looping_one(self):
pass
@abstractproperty
def looping_all(self):
pass
@abstractproperty
def songs_to_preload(self) -> list:
pass
@abstractmethod
def __len__(self):
pass
@abstractmethod
def next_song(self):
pass
@abstractmethod
def prev_song(self):
pass
@abstractmethod
def add_song(self, identifier: str) -> None:
pass
@abstractmethod
def shuffle(self) -> None:
pass
@abstractmethod
def revert(self) -> None:
pass
@abstractmethod
def clear(self) -> None:
pass
@abstractmethod
def loop_one(self) -> str:
pass
@abstractmethod
def loop_all(self) -> str:
pass
@abstractmethod
def loop_off(self) -> str:
pass
@abstractmethod
def destroy_song(self, song_destroy) -> None:
pass
class ISong(ABC):
"""Store the usefull information about a Song"""
@abstractmethod
def finish_down(self, info: dict) -> None:
pass
@abstractmethod
def source(self) -> str:
pass
@abstractmethod
def title(self) -> str:
pass
@abstractmethod
def duration(self) -> str:
pass
@abstractmethod
def identifier(self) -> str:
pass
@abstractmethod
def destroy(self) -> None:
pass

308
vulkan/music/Player.py Normal file
View File

@@ -0,0 +1,308 @@
import discord
from discord.ext import commands
from config import config
import datetime
from vulkan.music.Downloader import Downloader
from vulkan.music.Playlist import Playlist
from vulkan.music.Searcher import Searcher
from vulkan.music.Types import Provider
from vulkan.music.utils import *
class Player(commands.Cog):
def __init__(self, bot, guild):
self.__searcher: Searcher = Searcher()
self.__down: Downloader = Downloader()
self.__playlist: Playlist = Playlist()
self.__bot: discord.Client = bot
self.__guild: discord.Guild = guild
self.__timer = Timer(self.__timeout_handler)
self.__playing = False
self.YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
self.FFMPEG_OPTIONS = {'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn'}
async def connect(self, ctx):
if not ctx.author.voice:
return {'success': False, 'reason': config.NO_CHANNEL}
if self.__guild.voice_client == None:
await ctx.author.voice.channel.connect(reconnect=True, timeout=None)
return {'success': True, 'reason': ''}
def __play_next(self, error, ctx):
song = self.__playlist.next_song()
if song != None: # If there is not a song for the song
coro = self.__play_music(ctx, song)
self.__bot.loop.create_task(coro)
else:
self.__playing = False
async def __play_music(self, ctx, song):
self.__playing = True
player = discord.FFmpegPCMAudio(song.source, **self.FFMPEG_OPTIONS)
self.__guild.voice_client.play(
player, after=lambda e: self.__play_next(e, ctx))
self.__timer.cancel()
self.__timer = Timer(self.__timeout_handler)
await ctx.invoke(self.__bot.get_command('np'))
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
async def play(self, ctx, track, requester) -> str:
try:
songs_names, provider = self.__searcher.search(track)
if provider == Provider.Unknown:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.INVALID_INPUT,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
elif provider == Provider.YouTube:
songs_names = self.__down.extract_youtube_link(songs_names[0])
songs_quant = 0
for name in songs_names:
song = self.__playlist.add_song(name, requester)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__down.preload(songs_preload)
except:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
if songs_quant == 1:
song = self.__down.download_one(song)
if song == None:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colours=config.COLOURS['blue'])
await ctx.send(embed=embed)
return
elif not self.__playing:
embed = discord.Embed(
title=config.SONG_QUEUE_TITLE,
description=config.SONG_ADDED.format(song.title),
colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
else:
embed = self.__format_embed(song.info, config.SONG_ADDED)
await ctx.send(embed=embed)
else:
embed = discord.Embed(
title=config.SONG_QUEUE_TITLE,
description=config.SONGS_ADDED.format(songs_quant),
colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
if not self.__playing:
try_another = True
while try_another: # This will ensure the first song source to be ready
first_song = self.__playlist.next_song()
if first_song == None:
embed = discord.Embed(
title=config.ERROR_TITLE,
description=config.DOWNLOADING_ERROR,
colour=config.COLOURS['blue'])
await ctx.send(embed=embed)
break
while True:
if first_song.source != None: # If song got downloaded
try_another = False
break
if first_song.problematic: # If song got any error, try another one
break
if first_song != None:
await self.__play_music(ctx, first_song)
async def queue(self) -> discord.Embed:
if self.__playlist.looping_one:
info = self.__playlist.current.info
title = 'Song Looping Now'
return self.__format_embed(info, title)
songs_preload = self.__playlist.songs_to_preload
await self.__down.preload(songs_preload)
total_time = format_time(sum([int(song.duration if song.duration else 0)
for song in songs_preload])) # Sum the duration
total_songs = len(self.__playlist)
text = f'Total musics: {total_songs} | Duration: `{total_time}` downloaded \n\n'
for pos, song in enumerate(songs_preload, start=1):
title = song.title if song.title else 'Downloading...'
text += f"**`{pos}` - ** {title} - `{format_time(song.duration)}`\n"
title = 'Songs in Queue'
if len(songs_preload) > 0:
if self.__playlist.looping_all:
title = 'Repeating All'
else:
text = 'There is no musics in queue'
embed = discord.Embed(
title=title,
description=text,
colour=config.COLOURS['blue']
)
return embed
async def skip(self) -> bool:
if self.__guild.voice_client != None:
self.__guild.voice_client.stop()
return True
else:
return False
async def stop(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
self.__guild.voice_client.stop()
await self.__guild.voice_client.disconnect()
return True
async def pause(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_playing():
self.__guild.voice_client.pause()
return True
async def resume(self) -> bool:
if self.__guild.voice_client == None:
return False
if self.__guild.voice_client.is_paused():
self.__guild.voice_client.resume()
return True
async def loop(self, args: str):
args = args.lower()
if args == 'one':
description = self.__playlist.loop_one()
elif args == 'all':
description = self.__playlist.loop_all()
elif args == 'off':
description = self.__playlist.loop_off()
else:
description = 'Comando Loop\nOne - Repete a música atual\nAll - Repete as músicas atuais\nOff - Desativa o loop'
return description
async def clear(self) -> None:
self.__playlist.clear()
async def now_playing(self) -> discord.Embed:
if self.__playlist.looping_one:
title = 'Song Looping Now'
else:
title = 'Song Playing Now'
current_song = self.__playlist.current
embed = self.__format_embed(current_song.info, title)
return embed
async def shuffle(self) -> str:
try:
self.__playlist.shuffle()
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
return 'Musics shuffled successfully'
except:
return 'An error ocurred :/'
async def move(self, pos1, pos2='1') -> str:
try:
pos1 = int(pos1)
pos2 = int(pos2)
except:
return 'This command require a number'
result = self.__playlist.move_songs(pos1, pos2)
songs = self.__playlist.songs_to_preload
await self.__down.preload(songs)
return result
async def remove(self, position) -> str:
"""Remove a song from the queue in the position"""
try:
position = int(position)
except:
return 'This command require a number'
result = self.__playlist.remove_song(position)
return result
def __format_embed(self, info, title='') -> discord.Embed:
"""Configure the embed to show the song information"""
embedvc = discord.Embed(
title=title,
description=f"[{info['title']}]({info['original_url']})",
color=config.COLOURS['blue']
)
embedvc.add_field(name=config.SONGINFO_UPLOADER,
value=info['uploader'],
inline=True)
embedvc.add_field(name=config.SONGINFO_REQUESTER,
value=info['requester'],
inline=True)
if 'thumbnail' in info.keys():
embedvc.set_thumbnail(url=info['thumbnail'])
if 'duration' in info.keys():
duration = str(datetime.timedelta(seconds=info['duration']))
embedvc.add_field(name=config.SONGINFO_DURATION,
value=f"{duration}",
inline=True)
else:
embedvc.add_field(name=config.SONGINFO_DURATION,
value=config.SONGINFO_UNKNOWN_DURATION,
inline=True)
return embedvc
async def __timeout_handler(self) -> None:
if self.__guild.voice_client == None:
return
if self.__guild.voice_client.is_playing() or self.__guild.voice_client.is_paused():
self.__timer = Timer(self.__timeout_handler)
elif self.__guild.voice_client.is_connected():
self.__playlist.clear()
self.__playlist.loop_off()
await self.__guild.voice_client.disconnect()

179
vulkan/music/Playlist.py Normal file
View File

@@ -0,0 +1,179 @@
from collections import deque
from config import config
import random
from vulkan.music.Interfaces import IPlaylist
from vulkan.music.Song import Song
class Playlist(IPlaylist):
"""Class to manage and control the songs to play and played"""
def __init__(self) -> None:
self.__queue = deque() # Store the musics to play
self.__songs_history = deque() # Store the musics played
self.__name_history = deque() # Store the name of musics played
self.__looping_one = False
self.__looping_all = False
self.__current: Song = None
@property
def looping_one(self) -> bool:
return self.__looping_one
@property
def looping_all(self) -> bool:
return self.__looping_all
@property
def current(self) -> Song:
return self.__current
@property
def songs_to_preload(self) -> list:
return list(self.__queue)[:config.MAX_PRELOAD_SONGS]
def __len__(self) -> int:
return len(self.__queue)
def next_song(self) -> Song:
"""Return the next song to play"""
if self.__current == None and len(self.__queue) == 0:
# If not playing and nothing to play
return None
# If playing
played_song = self.__current
# Check if need to repeat the played song
if self.__looping_one: # Insert the current song to play again
self.__queue.appendleft(played_song)
if self.__looping_all: # Insert the current song in the end of queue
self.__queue.append(played_song)
while True: # Try to get the source of next song
if len(self.__queue) == 0: # If no more song to play, return None
return None
# Att the current with the first one
self.__current = self.__queue[0]
self.__queue.popleft() # Remove the current from queue
self.__name_history.append(
self.__current.identifier) # Add to name history
self.__songs_history.append(self.__current) # Add to song history
return self.__current
def prev_song(self) -> Song:
"""Return the source of the last song played
Return None or the source of the prev song
"""
if len(self.__songs_history) == 0:
return None
else:
return self.__songs_history[0].source
def add_song(self, identifier: str, requester: str) -> Song:
"""Create a song object, add to queue and return it"""
song = Song(identifier=identifier, playlist=self, requester=requester)
self.__queue.append(song)
return song
def shuffle(self) -> None:
"""Shuffle the order of the songs to play"""
random.shuffle(self.__queue)
def revert(self) -> None:
"""Revert the order of the songs to play"""
self.__queue.reverse()
def clear(self) -> None:
"""Clear the songs to play song history"""
self.__queue.clear()
self.__songs_history.clear()
def loop_one(self) -> str:
"""Try to start the loop of the current song
Return: Embed descrition to show to user
"""
if self.__looping_all == True:
return 'Vulkan already looping one music, disable loop first'
elif self.__looping_one == True:
return "I'm already doing this, you dumb ass"
else:
self.__looping_one = True
return 'Repeating the current song'
def loop_all(self) -> str:
"""Try to start the loop of all songs
Return: Embed descrition to show to user
"""
if self.__looping_one == True:
return 'Vulkan already looping one music, disable loop first'
elif self.__looping_all == True:
return "I'm already doing this, you dumb ass"
else:
self.__looping_all = True
return 'Repeating all songs in queue'
def loop_off(self) -> str:
"""Disable both types of loop"""
if self.__looping_all == False and self.__looping_one == False:
return "The loop is already off, you fucking dick head"
self.__looping_all = False
self.__looping_one = False
return 'Loop disable'
def destroy_song(self, song_destroy: Song) -> None:
"""Destroy a song object from the queue"""
for song in self.__queue:
if song == song_destroy:
self.__queue.remove(song)
break
def move_songs(self, pos1, pos2) -> str:
"""Receive two position and try to change the songs in those positions, -1 is the last
Positions: First music is 1
Return (Error bool, string) with the status of the function, to show to user
"""
if pos1 == -1:
pos1 = len(self.__queue)
if pos2 == -1:
pos2 = len(self.__queue)
if pos2 not in range(1, len(self.__queue) + 1) or pos1 not in range(1, len(self.__queue) + 1):
return 'Numbers must be between 1 and queue length, or -1 for the last song'
try:
song1 = self.__queue[pos1-1]
song2 = self.__queue[pos2-1]
self.__queue[pos1-1] = song2
self.__queue[pos2-1] = song1
song1_name = song1.title if song1.title else song1.identifier
song2_name = song2.title if song2.title else song2.identifier
return f'Song `{song1_name}` in position `{pos1}` moved with `{song2_name}` in position `{pos2}` successfully'
except Exception as e:
print(e)
return 'There was a problem with the moving of songs'
def remove_song(self, position) -> tuple:
if position not in range(1, len(self.__queue) + 1) and position != -1:
return 'Numbers must be between 1 and queue length, or -1 for the last song'
else:
song = self.__queue[position-1]
self.__queue.remove(song)
song_name = song.title if song.title else song.identifier
return f'Song `{song_name}` removed successfully'

48
vulkan/music/Searcher.py Normal file
View File

@@ -0,0 +1,48 @@
from vulkan.music.Types import Provider
from vulkan.music.Spotify import SpotifySearch
from vulkan.music.utils import is_url
class Searcher():
"""Turn the user input into list of musics names, support youtube and spotify"""
def __init__(self) -> None:
self.__Spotify = SpotifySearch()
def search(self, music: str) -> list:
"""Return a list with the song names or an URL
Arg -> User Input, a string with the
Return -> A list of musics names and Provider Type
"""
url_type = self.__identify_source(music)
if url_type == Provider.YouTube:
return [music], Provider.YouTube
elif url_type == Provider.Spotify:
if self.__Spotify.connected == True:
musics = self.__Spotify.search(music)
return musics, Provider.Name
else:
return [], Provider.Unknown
elif url_type == Provider.Name:
return [music], Provider.Name
elif url_type == Provider.Unknown:
return None, Provider.Unknown
def __identify_source(self, music) -> Provider:
"""Identify the provider of a music"""
if not is_url(music):
return Provider.Name
if "https://www.youtu" in music or "https://youtu.be" in music:
return Provider.YouTube
if "https://open.spotify.com" in music:
return Provider.Spotify
# If no match
return Provider.Unknown

66
vulkan/music/Song.py Normal file
View File

@@ -0,0 +1,66 @@
from vulkan.music.Interfaces import ISong, IPlaylist
class Song(ISong):
"""Store the usefull information about a Song"""
def __init__(self, identifier: str, playlist: IPlaylist, requester: str) -> None:
"""Create a song with only the URL to the youtube song"""
self.__identifier = identifier
self.__info = {'requester': requester}
self.__problematic = False
self.__playlist: IPlaylist = playlist
def finish_down(self, info: dict) -> None:
"""Get and store the full information of the song"""
self.__usefull_keys = ['url', 'duration',
'title', 'webpage_url',
'channel', 'id', 'uploader',
'thumbnail', 'original_url']
for key in self.__usefull_keys:
try:
self.__info[key] = info[key]
except Exception as e:
raise e
@property
def source(self) -> str:
"""Return the Song Source URL to play"""
if 'url' in self.__info.keys():
return self.__info['url']
else:
return None
@property
def title(self) -> str:
"""Return the Song Title"""
if 'title' in self.__info.keys():
return self.__info['title']
else:
return None
@property
def duration(self) -> str:
"""Return the Song Title"""
if 'duration' in self.__info.keys():
return self.__info['duration']
else:
return 0.0
@property
def identifier(self) -> str:
return self.__identifier
@property
def problematic(self) -> bool:
return self.__problematic
def destroy(self) -> None:
"""Mark this song with problems and removed from the playlist due to any type of error"""
self.__problematic = True
self.__playlist.destroy_song(self)
@property
def info(self):
return self.__info

121
vulkan/music/Spotify.py Normal file
View File

@@ -0,0 +1,121 @@
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from config import config
class SpotifySearch():
"""Search a Spotify music or playlist and return the musics names"""
def __init__(self) -> None:
self.__connected = False
self.__connect()
@property
def connected(self):
return self.__connected
def __connect(self) -> bool:
try:
# Initialize the connection with Spotify API
self.__api = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
client_id=config.SPOTIFY_ID, client_secret=config.SPOTIFY_SECRET))
self.__connected = True
return True
except:
return False
def search(self, music) -> list:
"""Search and return the title of musics on Spotify"""
type = music.split('/')[3].split('?')[0]
code = music.split('/')[4].split('?')[0]
if type == 'album':
musics = self.__get_album(code)
elif type == 'playlist':
musics = self.__get_playlist(code)
elif type == 'track':
musics = self.__get_track(code)
else:
return None
return musics
def __get_album(self, code) -> list:
"""Get the externals urls of a album
ARG: Spotify Code of the Album
"""
if self.__connected == True:
try:
# Load all music objects
results = self.__api.album_tracks(code)
musics = results['items']
while results['next']: # Get the next pages
results = self.__api.next(results)
musics.extend(results['items'])
musicsTitle = []
for music in musics:
try:
title = self.__extract_title(music)
musicsTitle.append(title)
except:
pass
return musicsTitle
except Exception as e:
raise e
def __get_playlist(self, code) -> list:
"""Get the externals urls of a playlist
Arg: Spotify Code of the Playlist
"""
try:
results = self.__api.playlist_items(code)
itens = results['items']
while results['next']: # Load the next pages
results = self.__api.next(results)
itens.extend(results['items'])
musics = []
for item in itens:
musics.append(item['track'])
titles = []
for music in musics:
try:
title = self.__extract_title(music)
titles.append(title)
except Exception as e:
raise e
return titles
except Exception as e:
raise e
def __get_track(self, code) -> list:
"""Convert a external_url track to the title of the music
ARG: Spotify Code of the Music
"""
results = self.__api.track(code)
name = results['name']
artists = ''
for artist in results['artists']:
artists += f'{artist["name"]} '
return [f'{name} {artists}']
def __extract_title(self, music: dict) -> str:
"""Receive a spotify music object and return his title
ARG: music dict returned by Spotify
"""
title = f'{music["name"]} '
for artist in music['artists']:
title += f'{artist["name"]} '
return title

9
vulkan/music/Types.py Normal file
View File

@@ -0,0 +1,9 @@
from enum import Enum
class Provider(Enum):
"""Store Enum Types of the Providers"""
Spotify = "Spotify"
YouTube = "YouTube"
Name = 'Track Name'
Unknown = "Unknown"

51
vulkan/music/utils.py Normal file
View File

@@ -0,0 +1,51 @@
import re
import asyncio
from config import config
def is_connected(ctx):
try:
voice_channel = ctx.guild.voice_client.channel
return voice_channel
except:
return None
def format_time(duration):
if not duration:
return "00:00"
hours = duration // 60 // 60
minutes = duration // 60 % 60
seconds = duration % 60
return "{}{}{:02d}:{:02d}".format(
hours if hours else "",
":" if hours else "",
minutes,
seconds
)
def is_url(string) -> bool:
"""Verify if a string is a url"""
regex = re.compile(
"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
if re.search(regex, string):
return True
else:
return False
class Timer:
def __init__(self, callback):
self.__callback = callback
self.__task = asyncio.create_task(self.__executor())
async def __executor(self):
await asyncio.sleep(config.VC_TIMEOUT)
await self.__callback()
def cancel(self):
self.__task.cancel()