Upgrading PlayController and Spotify Connection

This commit is contained in:
Rafael Vargas
2022-03-26 17:42:49 -04:00
parent f30513f710
commit 4c66c64041
15 changed files with 213 additions and 163 deletions

View File

@@ -1,5 +1,7 @@
import asyncio
from typing import List
from numpy import extract
from Config.Config import Configs
from yt_dlp import YoutubeDL
from concurrent.futures import ThreadPoolExecutor
@@ -25,7 +27,7 @@ class Downloader():
'default_search': 'auto',
'playliststart': 0,
'extract_flat': False,
'playlistend': config.MAX_PLAYLIST_LENGTH,
'playlistend': config.MAX_PLAYLIST_FORCED_LENGTH,
}
__BASE_URL = 'https://www.youtube.com/watch?v={}'
@@ -127,7 +129,10 @@ class Downloader():
extracted_info = ydl.extract_info(search, download=False)
if self.__failed_to_extract(extracted_info):
self.__get_forced_extracted_info(extracted_info)
extracted_info = self.__get_forced_extracted_info(title)
if extracted_info is None:
return {}
if self.__is_multiple_musics(extracted_info):
return extracted_info['entries'][0]

View File

@@ -36,71 +36,7 @@ class Player(commands.Cog):
def playlist(self) -> Playlist:
return self.__playlist
async def play(self, ctx: Context, track: str, requester: str) -> str:
try:
links, provider = self.__searcher.search(track)
if provider == Provider.Unknown or links == None:
embed = Embed(
title=self.__config.ERROR_TITLE,
description=self.__config.INVALID_INPUT,
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return None
if provider == Provider.YouTube:
links = await self.__down.extract_info(links[0])
if len(links) == 0:
embed = Embed(
title=self.__config.ERROR_TITLE,
description="This video is unavailable",
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return None
songs_quant = 0
for info in links:
song = self.__playlist.add_song(info, requester)
songs_quant += 1
songs_preload = self.__playlist.songs_to_preload
await self.__down.preload(songs_preload)
except Exception as e:
print(f'DEVELOPER NOTE -> Error while Downloading in Player: {e}')
embed = Embed(
title=self.__config.ERROR_TITLE,
description=self.__config.DOWNLOADING_ERROR,
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return
if songs_quant == 1:
song = self.__down.finish_one_song(song)
pos = len(self.__playlist)
if song.problematic:
embed = Embed(
title=self.__config.ERROR_TITLE,
description=self.__config.DOWNLOADING_ERROR,
colours=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
return None
elif not self.__playing:
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.SONG_ADDED.format(song.title),
colour=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
else:
embed = self.__format_embed(song.info, self.__config.SONG_ADDED_TWO, pos)
await ctx.send(embed=embed)
else:
embed = Embed(
title=self.__config.SONG_PLAYER,
description=self.__config.SONGS_ADDED.format(songs_quant),
colour=self.__config.COLOURS['blue'])
await ctx.send(embed=embed)
async def play(self, ctx: Context) -> str:
if not self.__playing:
first_song = self.__playlist.next_song()
await self.__play_music(ctx, first_song)

View File

@@ -92,8 +92,7 @@ class Playlist(IPlaylist):
self.__current = last_song
return self.__current # return the song
def add_song(self, identifier: str, requester: str) -> Song:
song = Song(identifier=identifier, playlist=self, requester=requester)
def add_song(self, song: Song) -> Song:
self.__queue.append(song)
return song

View File

@@ -1,40 +1,44 @@
from Exceptions.Exceptions import InvalidInput, SpotifyError
from Music.Downloader import Downloader
from Music.Types import Provider
from Music.Spotify import SpotifySearch
from Utils.Utils import is_url
from Utils.Utils import Utils
from Config.Messages import SearchMessages
class Searcher():
def __init__(self) -> None:
self.__Spotify = SpotifySearch()
self.__messages = SearchMessages()
self.__down = Downloader()
def search(self, music: str) -> list:
provider = self.__identify_source(music)
async def search(self, track: str) -> list:
provider = self.__identify_source(track)
if provider == Provider.Unknown:
raise InvalidInput(self.__messages.UNKNOWN_INPUT, self.__messages.UNKNOWN_INPUT_TITLE)
if provider == Provider.YouTube:
return [music], Provider.YouTube
elif provider == Provider.YouTube:
musics = await self.__down.extract_info(track)
return musics
elif provider == Provider.Spotify:
if self.__Spotify.connected == True:
musics = self.__Spotify.search(music)
return musics, Provider.Name
else:
print('DEVELOPER NOTE -> Spotify Not Connected')
return [], Provider.Unknown
try:
musics = self.__Spotify.search(track)
return musics
except:
raise SpotifyError(self.__messages.SPOTIFY_ERROR, self.__messages.GENERIC_TITLE)
elif provider == Provider.Name:
return [music], Provider.Name
return [track]
elif provider == Provider.Unknown:
return None, Provider.Unknown
def __identify_source(self, music) -> Provider:
if not is_url(music):
def __identify_source(self, track) -> Provider:
if not Utils.is_url(track):
return Provider.Name
if "https://www.youtu" in music or "https://youtu.be" in music or "https://music.youtube" in music:
if "https://www.youtu" in track or "https://youtu.be" in track or "https://music.youtube" in track:
return Provider.YouTube
if "https://open.spotify.com" in music:
if "https://open.spotify.com" in track:
return Provider.Spotify
return Provider.Unknown

View File

@@ -17,17 +17,15 @@ class Song(ISong):
self.__required_keys = ['url']
for key in self.__required_keys:
if key in info:
if key in info.keys():
self.__info[key] = info[key]
else:
print(f'DEVELOPER NOTE -> {key} not found in info of music: {self.identifier}')
self.destroy()
for key in self.__usefull_keys:
if key in info:
if key in info.keys():
self.__info[key] = info[key]
else:
print(f'DEVELOPER NOTE -> {key} not found in info of music: {self.identifier}')
@property
def source(self) -> str:

View File

@@ -1,4 +1,4 @@
import spotipy
from spotipy import Spotify
from spotipy.oauth2 import SpotifyClientCredentials
from Config.Config import Configs
@@ -9,86 +9,67 @@ class SpotifySearch():
self.__connected = False
self.__connect()
@property
def connected(self):
return self.__connected
def __connect(self) -> bool:
def __connect(self) -> None:
try:
# Initialize the connection with Spotify API
self.__api = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
client_id=self.__config.SPOTIFY_ID, client_secret=self.__config.SPOTIFY_SECRET))
auth = SpotifyClientCredentials(self.__config.SPOTIFY_ID, self.__config.SPOTIFY_SECRET)
self.__api = Spotify(auth_manager=auth)
self.__connected = True
return True
except Exception as e:
print(f'DEVELOPER NOTE -> Spotify Connection {e}')
return False
print(f'DEVELOPER NOTE -> Spotify Connection Error {e}')
def search(self, music=str) -> list:
def search(self, music: str) -> list:
type = music.split('/')[3].split('?')[0]
code = music.split('/')[4].split('?')[0]
if type == 'album':
musics = self.__get_album(code)
elif type == 'playlist':
musics = self.__get_playlist(code)
elif type == 'track':
musics = self.__get_track(code)
elif type == 'artist':
musics = self.__get_artist(code)
else:
return None
musics = []
if self.__connected:
if type == 'album':
musics = self.__get_album(code)
elif type == 'playlist':
musics = self.__get_playlist(code)
elif type == 'track':
musics = self.__get_track(code)
elif type == 'artist':
musics = self.__get_artist(code)
return musics
def __get_album(self, code=str) -> list:
if self.__connected == True:
try:
results = self.__api.album_tracks(code)
musics = results['items']
def __get_album(self, code: str) -> list:
results = self.__api.album_tracks(code)
musics = results['items']
while results['next']: # Get the next pages
results = self.__api.next(results)
musics.extend(results['items'])
while results['next']: # Get the next pages
results = self.__api.next(results)
musics.extend(results['items'])
musicsTitle = []
musicsTitle = []
for music in musics:
try:
title = self.__extract_title(music)
musicsTitle.append(title)
except:
pass
return musicsTitle
except Exception as e:
raise e
for music in musics:
title = self.__extract_title(music)
musicsTitle.append(title)
def __get_playlist(self, code=str) -> list:
try:
results = self.__api.playlist_items(code)
itens = results['items']
return musicsTitle
while results['next']: # Load the next pages
results = self.__api.next(results)
itens.extend(results['items'])
def __get_playlist(self, code: str) -> list:
results = self.__api.playlist_items(code)
itens = results['items']
musics = []
for item in itens:
musics.append(item['track'])
while results['next']: # Load the next pages
results = self.__api.next(results)
itens.extend(results['items'])
titles = []
for music in musics:
try:
title = self.__extract_title(music)
titles.append(title)
except Exception as e:
raise e
musics = []
for item in itens:
musics.append(item['track'])
return titles
titles = []
for music in musics:
title = self.__extract_title(music)
titles.append(title)
except Exception as e:
raise e
return titles
def __get_track(self, code=str) -> list:
def __get_track(self, code: str) -> list:
results = self.__api.track(code)
name = results['name']
artists = ''
@@ -97,7 +78,7 @@ class SpotifySearch():
return [f'{name} {artists}']
def __get_artist(self, code=str) -> list:
def __get_artist(self, code: str) -> list:
results = self.__api.artist_top_tracks(code, country='BR')
musics_titles = []