diff --git a/scripts/python/API/.vscode/launch.json b/scripts/python/API/.vscode/launch.json index 96695e79..59e84c31 100644 --- a/scripts/python/API/.vscode/launch.json +++ b/scripts/python/API/.vscode/launch.json @@ -5,18 +5,26 @@ "version": "0.2.0", "configurations": [ { - "name": "Voice control", + "name": "Python: Main", "type": "debugpy", "request": "launch", - "program": "voice_control.py", + "program": "${workspaceFolder}/main.py", "console": "integratedTerminal", "justMyCode": false, }, { - "name": "Test bed", + "name": "Example voice control", "type": "debugpy", "request": "launch", - "program": "testbed.py", + "program": "example_voice_control.py", + "console": "integratedTerminal", + "justMyCode": false, + }, + { + "name": "Example disembarked infantry", + "type": "debugpy", + "request": "launch", + "program": "example_disembarked_infantry.py", "console": "integratedTerminal", "justMyCode": false, } diff --git a/scripts/python/API/api.py b/scripts/python/API/api.py index 545fc23e..60a97f3f 100644 --- a/scripts/python/API/api.py +++ b/scripts/python/API/api.py @@ -119,7 +119,122 @@ class API: signal.signal(signal.SIGINT, signal_handler) # Ctrl+C if hasattr(signal, 'SIGTERM'): signal.signal(signal.SIGTERM, signal_handler) # Termination signal + + async def _check_command_executed(self, command_hash: str, execution_callback, wait_for_result: bool, max_wait_time: int = 60): + """ + Check if a command has been executed by polling the API. + """ + start_time = time.time() + while True: + response = self._get(f"commands?commandHash={command_hash}") + if response.status_code == 200: + try: + data = response.json() + if data.get("commandExecuted") == True and (data.get("commandResult") is not None or (not wait_for_result)): + self.logger.info(f"Command {command_hash} executed successfully, command result: {data.get('commandResult')}") + if execution_callback: + await execution_callback(data.get("commandResult")) + break + elif data.get("status") == "failed": + self.logger.error(f"Command {command_hash} failed to execute.") + break + except ValueError: + self.logger.error("Failed to parse JSON response") + if time.time() - start_time > max_wait_time: + self.logger.warning(f"Timeout: Command {command_hash} did not complete within {max_wait_time} seconds.") + break + await asyncio.sleep(1) + async def _run_callback_async(self, callback, *args): + """ + Run a callback asynchronously, handling both sync and async callbacks. + """ + try: + if asyncio.iscoroutinefunction(callback): + await callback(*args) + else: + callback(*args) + except Exception as e: + # Log the error but don't crash the update process + self.logger.error(f"Error in callback: {e}") + + def register_on_update_callback(self, callback): + """ + Register a callback function to be called on each update. + + Args: + callback (function): The function to call on update. Can be sync or async. + The function should accept a single argument, which is the API instance. + """ + self.on_update_callback = callback + + async def _run_async(self): + """ + Async implementation of the API service loop. + """ + # Setup signal handlers for graceful shutdown + self._setup_signal_handlers() + + # Here you can add any initialization logic if needed + self.logger.info("API started") + self.logger.info("Press Ctrl+C to stop gracefully") + + self.running = True + self.should_stop = False + + # Call the startup callback if registered + if self.on_startup_callback: + try: + await self._run_callback_async(self.on_startup_callback, self) + except Exception as e: + self.logger.error(f"Error in startup callback: {e}") + + try: + while not self.should_stop: + # Update units from the last update timestamp + self.update_units(self.units_update_timestamp) + + if self.on_update_callback: + await self._run_callback_async(self.on_update_callback, self) + await asyncio.sleep(self.interval) + except KeyboardInterrupt: + self.logger.info("Keyboard interrupt received") + self.stop() + finally: + self.logger.info("API stopped") + self.running = False + + def unregister_on_update_callback(self): + """ + Unregister the callback function that is called on each update. + """ + self.on_update_callback = None + + def register_on_startup_callback(self, callback): + """ + Register a callback function to be called on startup. + Args: + callback (function): The function to call on startup. Can be sync or async. + The function should accept a single argument, which is the API instance. + """ + self.on_startup_callback = callback + + def unregister_on_startup_callback(self): + """ + Unregister the callback function that is called on startup. + """ + self.on_startup_callback = None + + def set_log_level(self, level): + """ + Set the logging level for the API. + + Args: + level: Logging level (e.g., logging.DEBUG, logging.INFO, logging.WARNING, self.logger.error) + """ + self.logger.setLevel(level) + self.logger.info(f"Log level set to {logging.getLevelName(level)}") + def get_units(self): """ Get all units from the API. Notice that if the API is not running, update_units() must be manually called first. @@ -170,8 +285,7 @@ class API: self.logger.error("Failed to parse JSON response") else: self.logger.error(f"Failed to fetch units: {response.status_code} - {response.text}") - - + def update_logs(self, time = 0): """ Fetch the logs from the API. @@ -192,7 +306,7 @@ class API: else: self.logger.error(f"Failed to fetch logs: {response.status_code} - {response.text}") - def spawn_aircrafts(self, units: list[UnitSpawnTable], coalition: str, airbaseName: str, country: str, immediate: bool, spawnPoints: int = 0): + def spawn_aircrafts(self, units: list[UnitSpawnTable], coalition: str, airbaseName: str, country: str, immediate: bool, spawnPoints: int = 0, execution_callback=None): """ Spawn aircraft units at the specified location or airbase. Args: @@ -202,6 +316,7 @@ class API: country (str): The country of the units. immediate (bool): Whether to spawn the units immediately or not, overriding the scheduler. spawnPoints (int): Amount of spawn points to use, default is 0. + execution_callback (function): An optional async callback function to execute after the command is processed. """ command = { "units": [unit.toJSON() for unit in units], @@ -214,7 +329,21 @@ class API: data = { "spawnAircrafts": command } response = self._put(data) - def spawn_helicopters(self, units: list[UnitSpawnTable], coalition: str, airbaseName: str, country: str, immediate: bool, spawnPoints: int = 0): + # Parse the response as JSON if callback is provided + if execution_callback: + try: + response_data = response.json() + command_hash = response_data.get("commandHash", None) + if command_hash: + self.logger.info(f"Aircraft spawned successfully. Command Hash: {command_hash}") + # Start a background task to check if the command was executed + asyncio.create_task(self._check_command_executed(command_hash, execution_callback, wait_for_result=True)) + else: + self.logger.error("Command hash not found in response") + except ValueError: + self.logger.error("Failed to parse JSON response") + + def spawn_helicopters(self, units: list[UnitSpawnTable], coalition: str, airbaseName: str, country: str, immediate: bool, spawnPoints: int = 0, execution_callback=None): """ Spawn helicopter units at the specified location or airbase. Args: @@ -224,6 +353,7 @@ class API: country (str): The country of the units. immediate (bool): Whether to spawn the units immediately or not, overriding the scheduler. spawnPoints (int): Amount of spawn points to use, default is 0. + execution_callback (function): An optional async callback function to execute after the command is processed. """ command = { "units": [unit.toJSON() for unit in units], @@ -236,6 +366,20 @@ class API: data = { "spawnHelicopters": command } response = self._put(data) + # Parse the response as JSON if callback is provided + if execution_callback: + try: + response_data = response.json() + command_hash = response_data.get("commandHash", None) + if command_hash: + self.logger.info(f"Helicopters spawned successfully. Command Hash: {command_hash}") + # Start a background task to check if the command was executed + asyncio.create_task(self._check_command_executed(command_hash, execution_callback, wait_for_result=True)) + else: + self.logger.error("Command hash not found in response") + except ValueError: + self.logger.error("Failed to parse JSON response") + def spawn_ground_units(self, units: list[UnitSpawnTable], coalition: str, country: str, immediate: bool, spawnPoints: int, execution_callback): """ Spawn ground units at the specified location. @@ -272,32 +416,7 @@ class API: except ValueError: self.logger.error("Failed to parse JSON response") - async def _check_command_executed(self, command_hash: str, execution_callback, wait_for_result: bool, max_wait_time: int = 60): - """ - Check if a command has been executed by polling the API. - """ - start_time = time.time() - while True: - response = self._get(f"commands?commandHash={command_hash}") - if response.status_code == 200: - try: - data = response.json() - if data.get("commandExecuted") == True and (data.get("commandResult") is not None or (not wait_for_result)): - self.logger.info(f"Command {command_hash} executed successfully, command result: {data.get('commandResult')}") - if execution_callback: - await execution_callback(data.get("commandResult")) - break - elif data.get("status") == "failed": - self.logger.error(f"Command {command_hash} failed to execute.") - break - except ValueError: - self.logger.error("Failed to parse JSON response") - if time.time() - start_time > max_wait_time: - self.logger.warning(f"Timeout: Command {command_hash} did not complete within {max_wait_time} seconds.") - break - await asyncio.sleep(1) - - def spawn_navy_units(self, units: list[UnitSpawnTable], coalition: str, country: str, immediate: bool, spawnPoints: int = 0): + def spawn_navy_units(self, units: list[UnitSpawnTable], coalition: str, country: str, immediate: bool, spawnPoints: int = 0, execution_callback=None): """ Spawn navy units at the specified location. Args: @@ -306,6 +425,7 @@ class API: country (str): The country of the units. immediate (bool): Whether to spawn the units immediately or not, overriding the scheduler. spawnPoints (int): Amount of spawn points to use, default is 0. + execution_callback (function): An optional async callback function to execute after the command is processed. """ command = { "units": [unit.toJSON() for unit in units], @@ -316,6 +436,20 @@ class API: } data = { "spawnNavyUnits": command } response = self._put(data) + + # Parse the response as JSON if callback is provided + if execution_callback: + try: + response_data = response.json() + command_hash = response_data.get("commandHash", None) + if command_hash: + self.logger.info(f"Navy units spawned successfully. Command Hash: {command_hash}") + # Start a background task to check if the command was executed + asyncio.create_task(self._check_command_executed(command_hash, execution_callback, wait_for_result=True)) + else: + self.logger.error("Command hash not found in response") + except ValueError: + self.logger.error("Failed to parse JSON response") def create_radio_listener(self): """ @@ -327,55 +461,6 @@ class API: from radio.radio_listener import RadioListener return RadioListener(self, "localhost", self.config.get("audio").get("WSPort")) - def register_on_update_callback(self, callback): - """ - Register a callback function to be called on each update. - - Args: - callback (function): The function to call on update. Can be sync or async. - The function should accept a single argument, which is the API instance. - """ - self.on_update_callback = callback - - def register_on_startup_callback(self, callback): - """ - Register a callback function to be called on startup. - Args: - callback (function): The function to call on startup. Can be sync or async. - The function should accept a single argument, which is the API instance. - """ - self.on_startup_callback = callback - - def set_log_level(self, level): - """ - Set the logging level for the API. - - Args: - level: Logging level (e.g., logging.DEBUG, logging.INFO, logging.WARNING, self.logger.error) - """ - self.logger.setLevel(level) - self.logger.info(f"Log level set to {logging.getLevelName(level)}") - - def stop(self): - """ - Stop the API service gracefully. - """ - self.logger.info("Stopping API service...") - self.should_stop = True - - async def _run_callback_async(self, callback, *args): - """ - Run a callback asynchronously, handling both sync and async callbacks. - """ - try: - if asyncio.iscoroutinefunction(callback): - await callback(*args) - else: - callback(*args) - except Exception as e: - # Log the error but don't crash the update process - self.logger.error(f"Error in callback: {e}") - def generate_audio_message(text: str, gender: str = "male", code: str = "en-US") -> str: """ Generate a WAV file from text using Google Text-to-Speech API. @@ -412,28 +497,6 @@ class API: return file_name - def send_command(self, command: str): - """ - Send a command to the API. - - Args: - command (str): The command to send. - """ - response = self._put(command) - if response.status_code == 200: - self.logger.info(f"Command sent successfully: {command}") - else: - self.logger.error(f"Failed to send command: {response.status_code} - {response.text}") - - def run(self): - """ - Start the API service. - - This method initializes the API and starts the necessary components. - Sets up signal handlers for graceful shutdown. - """ - asyncio.run(self._run_async()) - def get_closest_units(self, coalitions: list[str], categories: list[str], position: LatLng, operate_as: str | None = None, max_number: int = 1, max_distance: float = 10000) -> list[Unit]: """ Get the closest units of a specific coalition and category to a given position. @@ -453,7 +516,7 @@ class API: # Iterate through all units and find the closest ones that match the criteria for unit in self.units.values(): - if unit.alive and unit.coalition in coalitions and unit.category.lower() in categories and (operate_as is None or unit.operate_as == operate_as or unit.coalition is not "neutral"): + if unit.alive and unit.coalition in coalitions and unit.category.lower() in categories and (operate_as is None or unit.operate_as == operate_as or unit.coalition != "neutral"): distance = position.distance_to(unit.position) if distance < closest_distance: closest_distance = distance @@ -468,39 +531,33 @@ class API: closest_units = closest_units[:max_number] return closest_units - - async def _run_async(self): - """ - Async implementation of the API service loop. - """ - # Setup signal handlers for graceful shutdown - self._setup_signal_handlers() - - # Here you can add any initialization logic if needed - self.logger.info("API started") - self.logger.info("Press Ctrl+C to stop gracefully") - - self.running = True - self.should_stop = False - - # Call the startup callback if registered - if self.on_startup_callback: - try: - await self._run_callback_async(self.on_startup_callback, self) - except Exception as e: - self.logger.error(f"Error in startup callback: {e}") - try: - while not self.should_stop: - # Update units from the last update timestamp - self.update_units(self.units_update_timestamp) - - if self.on_update_callback: - await self._run_callback_async(self.on_update_callback, self) - await asyncio.sleep(self.interval) - except KeyboardInterrupt: - self.logger.info("Keyboard interrupt received") - self.stop() - finally: - self.logger.info("API stopped") - self.running = False + def send_command(self, command: str): + """ + Send a command to the API. + + Args: + command (str): The command to send. + """ + response = self._put(command) + if response.status_code == 200: + self.logger.info(f"Command sent successfully: {command}") + else: + self.logger.error(f"Failed to send command: {response.status_code} - {response.text}") + + def stop(self): + """ + Stop the API service gracefully. + """ + self.logger.info("Stopping API service...") + self.should_stop = True + + def run(self): + """ + Start the API service. + + This method initializes the API and starts the necessary components. + Sets up signal handlers for graceful shutdown. + """ + asyncio.run(self._run_async()) + diff --git a/scripts/python/API/example_disembarked_infantry.py b/scripts/python/API/example_disembarked_infantry.py index 6ff1fd90..02c5bada 100644 --- a/scripts/python/API/example_disembarked_infantry.py +++ b/scripts/python/API/example_disembarked_infantry.py @@ -5,7 +5,7 @@ from math import pi # Setup a logger for the module import logging -logger = logging.getLogger("TestBed") +logger = logging.getLogger("example_disembarked_infantry") logger.setLevel(logging.INFO) handler = logging.StreamHandler() formatter = logging.Formatter('[%(asctime)s] %(name)s - %(levelname)s - %(message)s') diff --git a/scripts/python/API/voice_control.py b/scripts/python/API/example_voice_control.py similarity index 98% rename from scripts/python/API/voice_control.py rename to scripts/python/API/example_voice_control.py index bc1b1a43..ecb5a80c 100644 --- a/scripts/python/API/voice_control.py +++ b/scripts/python/API/example_voice_control.py @@ -6,7 +6,7 @@ from radio.radio_listener import RadioListener # Setup a logger for the module import logging -logger = logging.getLogger("OlympusVoiceControl") +logger = logging.getLogger("example_voice_control") logger.setLevel(logging.INFO) handler = logging.StreamHandler() formatter = logging.Formatter('[%(asctime)s] %(name)s - %(levelname)s - %(message)s') diff --git a/scripts/python/API/kronos/kronos.py b/scripts/python/API/kronos/kronos.py new file mode 100644 index 00000000..b1c2193d --- /dev/null +++ b/scripts/python/API/kronos/kronos.py @@ -0,0 +1,16 @@ +# Setup a logger for the module +import logging +logger = logging.getLogger("Kronos") +logger.setLevel(logging.INFO) +handler = logging.StreamHandler() +formatter = logging.Formatter('[%(asctime)s] %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + +class Kronos(): + def __init__(self, api): + self.api = api + + def on_startup(self): + logger.info("Kronos API started") + \ No newline at end of file diff --git a/scripts/python/API/main.py b/scripts/python/API/main.py new file mode 100644 index 00000000..5804c9f6 --- /dev/null +++ b/scripts/python/API/main.py @@ -0,0 +1,20 @@ +from api import API +from kronos.kronos import Kronos + +############################################################################################## +# Main entry point for the script. It registers the callbacks and starts the API. +############################################################################################## +if __name__ == "__main__": + # Initialize the API + api = API() + + # Initialize Kronos with the API + kronos = Kronos(api) + + # Register the callbacks + api.register_on_startup_callback(kronos.on_startup) + + # Start the API, this will run forever until stopped + api.run() + + \ No newline at end of file