feat: Add initial API implementation and databases

Introduces a new Python API module with supporting scripts for data extraction, data types, unit management, and utility functions. Adds JSON databases for aircraft, helicopters, ground units, navy units, and mods, as well as configuration and VSCode launch settings. This provides the foundation for interacting with and managing units, spawning, and logging within the application.
This commit is contained in:
Pax1601 2025-08-05 17:26:24 +02:00
parent 4fd9b7e6c2
commit 4bcb5936b4
16 changed files with 63158 additions and 0 deletions

16
scripts/python/API/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "App",
"type": "debugpy",
"request": "launch",
"program": "app.py",
"console": "integratedTerminal"
}
]
}

248
scripts/python/API/app.py Normal file
View File

@ -0,0 +1,248 @@
import hashlib
import json
from math import pi
from random import random
import requests
import base64
from data_extractor import DataExtractor
from data_types import LatLng
from unit import Unit
from unit_spawn_table import UnitSpawnTable
from datetime import datetime
class API:
def __init__(self, username: str = "API", databases_location: str = "databases"):
self.base_url = None
self.config = None
self.units: dict[str, Unit] = {}
self.username = username
self.databases_location = databases_location
# Read the config file olympus.json
try:
with open("olympus.json", "r") as file:
# Load the JSON configuration
self.config = json.load(file)
except FileNotFoundError:
raise Exception("Configuration file olympus.json not found.")
self.password = self.config.get("authentication").get("gameMasterPassword")
address = self.config.get("backend").get("address")
port = self.config.get("backend").get("port", None)
if port:
self.base_url = f"http://{address}:{port}/olympus"
else:
self.base_url = f"https://{address}/olympus"
# Read the aircraft, helicopter, groundunit and navyunit databases as json files
try:
with open(f"{self.databases_location}/aircraftdatabase.json", "r", -1, 'utf-8') as file:
self.aircraft_database = json.load(file)
except FileNotFoundError:
raise Exception("Aircraft database file not found.")
try:
with open(f"{self.databases_location}/helicopterdatabase.json", "r", -1, 'utf-8') as file:
self.helicopter_database = json.load(file)
except FileNotFoundError:
raise Exception("Helicopter database file not found.")
try:
with open(f"{self.databases_location}/groundunitdatabase.json", "r", -1, 'utf-8') as file:
self.groundunit_database = json.load(file)
except FileNotFoundError:
raise Exception("Ground unit database file not found.")
try:
with open(f"{self.databases_location}/navyunitdatabase.json", "r", -1, 'utf-8') as file:
self.navyunit_database = json.load(file)
except FileNotFoundError:
raise Exception("Navy unit database file not found.")
def get(self, endpoint):
credentials = f"{self.username}:{self.password}"
base64_encoded_credentials = base64.b64encode(credentials.encode()).decode()
headers = {
"Authorization": f"Basic {base64_encoded_credentials}"
}
response = requests.get(f"{self.base_url}/{endpoint}", headers=headers)
if response.status_code == 200:
return response
else:
response.raise_for_status()
def put(self, endpoint, data):
credentials = f"{self.username}:{self.password}"
base64_encoded_credentials = base64.b64encode(credentials.encode()).decode()
headers = {
"Authorization": f"Basic {base64_encoded_credentials}",
"Content-Type": "application/json"
}
response = requests.put(f"{self.base_url}/{endpoint}", headers=headers, json=data)
if response.status_code == 200:
return response
else:
response.raise_for_status()
def get_units(self):
response = self.get("/units")
if response.status_code == 200:
try:
data_extractor = DataExtractor(response.content)
# Extract the update timestamp
update_timestamp = data_extractor.extract_uint64()
print(f"Update Timestamp: {update_timestamp}")
while data_extractor.get_seek_position() < len(response.content):
# Extract the unit ID
unit_id = data_extractor.extract_uint32()
if unit_id not in self.units:
# Create a new Unit instance if it doesn't exist
self.units[unit_id] = Unit(unit_id)
self.units[unit_id].update_from_data_extractor(data_extractor)
return self.units
except ValueError:
raise Exception("Failed to parse JSON response")
else:
raise Exception(f"Failed to fetch units: {response.status_code} - {response.text}")
def get_logs(self, time = 0):
endpoint = "/logs"
endpoint += f"?time={time}"
response = self.get(endpoint)
if response.status_code == 200:
try:
logs = json.loads(response.content.decode('utf-8'))
return logs
except ValueError:
raise Exception("Failed to parse JSON response")
else:
raise Exception(f"Failed to fetch logs: {response.status_code} - {response.text}")
def spawn_aircrafts(self, units: list[UnitSpawnTable], coalition: str, airbaseName: str, country: str, immediate: bool, spawnPoints: list[LatLng]):
command = {
"units": [unit.toJSON() for unit in units],
"coalition": coalition,
"airbaseName": airbaseName,
"country": country,
"immediate": immediate,
"spawnPoints": spawnPoints,
}
data = { "spawnAircrafts": command }
self.put("", data)
def spanw_helicopters(self, units: list[UnitSpawnTable], coalition: str, airbaseName: str, country: str, immediate: bool, spawnPoints: list[LatLng]):
command = {
"units": [unit.toJSON() for unit in units],
"coalition": coalition,
"airbaseName": airbaseName,
"country": country,
"immediate": immediate,
"spawnPoints": spawnPoints,
}
data = { "spawnHelicopters": command }
self.put("", data)
def spawn_ground_units(self, units: list[UnitSpawnTable], coalition: str, country: str, immediate: bool, spawnPoints: list[LatLng]):
command = {
"units": [unit.toJSON() for unit in units],
"coalition": coalition,
"country": country,
"immediate": immediate,
"spawnPoints": spawnPoints,
}
data = { "spawnGroundUnits": command }
self.put("", data)
def spawn_navy_units(self, units: list[UnitSpawnTable], coalition: str, country: str, immediate: bool, spawnPoints: list[LatLng]):
command = {
"units": [unit.toJSON() for unit in units],
"coalition": coalition,
"country": country,
"immediate": immediate,
"spawnPoints": spawnPoints,
}
data = { "spawnNavyUnits": command }
self.put("", data)
def delete_unit(self, ID: int, explosion: bool, explosionType: str, immediate: bool):
command = {
"ID": ID,
"explosion": explosion,
"explosionType": explosionType,
"immediate": immediate,
}
data = { "deleteUnit": command }
self.put("", data)
if __name__ == "__main__":
api = API()
try:
# Example usage
# Get the units from the API
print("Fetching units...")
units = api.get_units()
print("Units:", units)
# Example of spawning aircrafts
print("Spawning aircrafts...")
spawn_units = [
UnitSpawnTable(
unit_type="A-10C_2",
location=LatLng(lat=35.0, lng=35.0, alt=1000),
skill="High",
livery_id="Default",
altitude=1000,
loadout="Default",
heading=pi/2
)
]
api.spawn_aircrafts(spawn_units, "blue", "", "", False, 0)
# Spawn a random navy unit
print("Spawning navy units...")
random_navy_unit = list(api.navyunit_database.keys())[int(random() * len(api.navyunit_database))]
spawn_navy_units = [
UnitSpawnTable(
unit_type=random_navy_unit,
location=LatLng(lat=35.0, lng=35.0, alt=0),
skill="High",
livery_id="Default",
altitude=None,
loadout=None,
heading=0
)
]
api.spawn_navy_units(spawn_navy_units, "blue", "", False, 0)
# Example of deleting a unit
# Get all the unit of type A-10C_2
a10_units = [unit for unit in units.values() if unit.name == "A-10C_2"]
for unit in a10_units:
api.delete_unit(unit.ID, explosion=False, explosionType="", immediate=False)
# Fetch logs from the API
print("Fetching logs...")
logs = api.get_logs()["logs"]
# Pretty print the logs
print("Logs:")
# The log is a dictionary. The key is the timestamp and the value is the log message
for timestamp, log_message in logs.items():
# The timestamp is in milliseconds from unix epoch
timestamp = int(timestamp) / 1000 # Convert to seconds
iso_time = datetime.fromtimestamp(timestamp).isoformat()
print(f"{iso_time}: {log_message}")
except Exception as e:
print("An error occurred:", e)

View File

@ -0,0 +1,139 @@
import struct
from typing import List
from data_types import LatLng, TACAN, Radio, GeneralSettings, Ammo, Contact, Offset
class DataExtractor:
def __init__(self, buffer: bytes):
self._seek_position = 0
self._buffer = buffer
self._length = len(buffer)
def set_seek_position(self, seek_position: int):
self._seek_position = seek_position
def get_seek_position(self) -> int:
return self._seek_position
def extract_bool(self) -> bool:
value = struct.unpack_from('<B', self._buffer, self._seek_position)[0]
self._seek_position += 1
return value > 0
def extract_uint8(self) -> int:
value = struct.unpack_from('<B', self._buffer, self._seek_position)[0]
self._seek_position += 1
return value
def extract_uint16(self) -> int:
value = struct.unpack_from('<H', self._buffer, self._seek_position)[0]
self._seek_position += 2
return value
def extract_uint32(self) -> int:
value = struct.unpack_from('<I', self._buffer, self._seek_position)[0]
self._seek_position += 4
return value
def extract_uint64(self) -> int:
value = struct.unpack_from('<Q', self._buffer, self._seek_position)[0]
self._seek_position += 8
return value
def extract_float64(self) -> float:
value = struct.unpack_from('<d', self._buffer, self._seek_position)[0]
self._seek_position += 8
return value
def extract_lat_lng(self) -> LatLng:
lat = self.extract_float64()
lng = self.extract_float64()
alt = self.extract_float64()
return LatLng(lat, lng, alt)
def extract_from_bitmask(self, bitmask: int, position: int) -> bool:
return ((bitmask >> position) & 1) > 0
def extract_string(self, length: int = None) -> str:
if length is None:
length = self.extract_uint16()
string_buffer = self._buffer[self._seek_position:self._seek_position + length]
# Find null terminator
string_length = length
for idx, byte_val in enumerate(string_buffer):
if byte_val == 0:
string_length = idx
break
try:
value = string_buffer[:string_length].decode('utf-8').strip()
except UnicodeDecodeError:
value = string_buffer[:string_length].decode('utf-8', errors='ignore').strip()
self._seek_position += length
return value
def extract_char(self) -> str:
return self.extract_string(1)
def extract_tacan(self) -> TACAN:
return TACAN(
is_on=self.extract_bool(),
channel=self.extract_uint8(),
xy=self.extract_char(),
callsign=self.extract_string(4)
)
def extract_radio(self) -> Radio:
return Radio(
frequency=self.extract_uint32(),
callsign=self.extract_uint8(),
callsign_number=self.extract_uint8()
)
def extract_general_settings(self) -> GeneralSettings:
return GeneralSettings(
prohibit_jettison=self.extract_bool(),
prohibit_aa=self.extract_bool(),
prohibit_ag=self.extract_bool(),
prohibit_afterburner=self.extract_bool(),
prohibit_air_wpn=self.extract_bool()
)
def extract_ammo(self) -> List[Ammo]:
value = []
size = self.extract_uint16()
for _ in range(size):
value.append(Ammo(
quantity=self.extract_uint16(),
name=self.extract_string(33),
guidance=self.extract_uint8(),
category=self.extract_uint8(),
missile_category=self.extract_uint8()
))
return value
def extract_contacts(self) -> List[Contact]:
value = []
size = self.extract_uint16()
for _ in range(size):
value.append(Contact(
id=self.extract_uint32(),
detection_method=self.extract_uint8()
))
return value
def extract_active_path(self) -> List[LatLng]:
value = []
size = self.extract_uint16()
for _ in range(size):
value.append(self.extract_lat_lng())
return value
def extract_offset(self) -> Offset:
return Offset(
x=self.extract_float64(),
y=self.extract_float64(),
z=self.extract_float64()
)

View File

@ -0,0 +1,70 @@
from enum import Enum
class DataIndexes(Enum):
START_OF_DATA = 0
CATEGORY = 1
ALIVE = 2
ALARM_STATE = 3
RADAR_STATE = 4
HUMAN = 5
CONTROLLED = 6
COALITION = 7
COUNTRY = 8
NAME = 9
UNIT_NAME = 10
CALLSIGN = 11
UNIT_ID = 12
GROUP_ID = 13
GROUP_NAME = 14
STATE = 15
TASK = 16
HAS_TASK = 17
POSITION = 18
SPEED = 19
HORIZONTAL_VELOCITY = 20
VERTICAL_VELOCITY = 21
HEADING = 22
TRACK = 23
IS_ACTIVE_TANKER = 24
IS_ACTIVE_AWACS = 25
ON_OFF = 26
FOLLOW_ROADS = 27
FUEL = 28
DESIRED_SPEED = 29
DESIRED_SPEED_TYPE = 30
DESIRED_ALTITUDE = 31
DESIRED_ALTITUDE_TYPE = 32
LEADER_ID = 33
FORMATION_OFFSET = 34
TARGET_ID = 35
TARGET_POSITION = 36
ROE = 37
REACTION_TO_THREAT = 38
EMISSIONS_COUNTERMEASURES = 39
TACAN = 40
RADIO = 41
GENERAL_SETTINGS = 42
AMMO = 43
CONTACTS = 44
ACTIVE_PATH = 45
IS_LEADER = 46
OPERATE_AS = 47
SHOTS_SCATTER = 48
SHOTS_INTENSITY = 49
HEALTH = 50
RACETRACK_LENGTH = 51
RACETRACK_ANCHOR = 52
RACETRACK_BEARING = 53
TIME_TO_NEXT_TASKING = 54
BARREL_HEIGHT = 55
MUZZLE_VELOCITY = 56
AIM_TIME = 57
SHOTS_TO_FIRE = 58
SHOTS_BASE_INTERVAL = 59
SHOTS_BASE_SCATTER = 60
ENGAGEMENT_RANGE = 61
TARGETING_RANGE = 62
AIM_METHOD_RANGE = 63
ACQUISITION_RANGE = 64
AIRBORNE = 65
END_OF_DATA = 255

View File

@ -0,0 +1,56 @@
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class LatLng:
lat: float
lng: float
alt: float
def toJSON(self):
"""Convert LatLng to a JSON serializable dictionary."""
return {
"lat": self.lat,
"lng": self.lng,
"alt": self.alt
}
@dataclass
class TACAN:
is_on: bool
channel: int
xy: str
callsign: str
@dataclass
class Radio:
frequency: int
callsign: int
callsign_number: int
@dataclass
class GeneralSettings:
prohibit_jettison: bool
prohibit_aa: bool
prohibit_ag: bool
prohibit_afterburner: bool
prohibit_air_wpn: bool
@dataclass
class Ammo:
quantity: int
name: str
guidance: int
category: int
missile_category: int
@dataclass
class Contact:
id: int
detection_method: int
@dataclass
class Offset:
x: float
y: float
z: float

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,3 @@
{
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,46 @@
{
"backend": {
"address": "refugees.dcsolympus.com/direct"
},
"authentication": {
"gameMasterPassword": "a00a5973aacb17e4659125fbe10f4160d096dd84b2f586d2d75669462a30106d",
"blueCommanderPassword": "7d2e1ef898b21db7411f725a945b76ec8dcad340ed705eaf801bc82be6fe8a4a",
"redCommanderPassword": "abc5de7abdb8ed98f6d11d22c9d17593e339fde9cf4b9e170541b4f41af937e3"
},
"frontend": {
"port": 3000,
"autoconnectWhenLocal": true,
"customAuthHeaders": {
"enabled": false,
"username": "X-Authorized",
"group": "X-Group"
},
"elevationProvider": {
"provider": "https://srtm.fasma.org/{lat}{lng}.SRTMGL3S.hgt.zip",
"username": null,
"password": null
},
"mapLayers": {
"ArcGIS Satellite": {
"urlTemplate": "https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}",
"minZoom": 1,
"maxZoom": 19,
"attribution": "Tiles © Esri — Source: Esri, i-cubed, USDA, USGS, AEX, GeoEye, Mapping, Aerogrid, IGN, IGP, UPR-EGP, and the GIS User Community"
},
"OpenStreetMap Mapnik": {
"urlTemplate": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
"minZoom": 1,
"maxZoom": 20,
"attribution": "OpenStreetMap contributors"
}
},
"mapMirrors": {
"DCS Map (Official)": "https://maps.dcsolympus.com/maps",
"DCS Map (Alt.)": "https://refugees.dcsolympus.com/maps"
}
},
"audio": {
"SRSPort": 5002,
"WSPort": 4000
}
}

View File

@ -0,0 +1 @@
ROES = ["", "free", "designated", "return", "hold"]

View File

@ -0,0 +1,19 @@
states = [
"none",
"idle",
"reach-destination",
"attack",
"follow",
"land",
"refuel",
"AWACS",
"tanker",
"bomb-point",
"carpet-bomb",
"bomb-building",
"fire-at-area",
"simulate-fire-fight",
"scenic-aaa",
"miss-on-purpose",
"land-at-point"
]

214
scripts/python/API/unit.py Normal file
View File

@ -0,0 +1,214 @@
from data_extractor import DataExtractor
from data_indexes import DataIndexes
from data_types import LatLng, TACAN, Radio, GeneralSettings, Ammo, Contact, Offset
from typing import List
from roes import ROES
from states import states
from utils import enum_to_coalition
class Unit:
def __init__(self, id: int):
self.ID = id
# Data controlled directly by the backend
self.alive = False
self.alarm_state = "AUTO"
self.human = False
self.controlled = False
self.coalition = "neutral"
self.country = 0
self.name = ""
self.unit_name = ""
self.callsign = ""
self.group_id = 0
self.unit_id = 0
self.group_name = ""
self.state = ""
self.task = ""
self.has_task = False
self.position = LatLng(0, 0, 0)
self.speed = 0.0
self.horizontal_velocity = 0.0
self.vertical_velocity = 0.0
self.heading = 0.0
self.track = 0.0
self.is_active_tanker = False
self.is_active_awacs = False
self.on_off = True
self.follow_roads = False
self.fuel = 0
self.desired_speed = 0.0
self.desired_speed_type = "CAS"
self.desired_altitude = 0.0
self.desired_altitude_type = "ASL"
self.leader_id = 0
self.formation_offset = Offset(0, 0, 0)
self.target_id = 0
self.target_position = LatLng(0, 0, 0)
self.roe = ""
self.reaction_to_threat = ""
self.emissions_countermeasures = ""
self.tacan = TACAN(False, 0, "X", "TKR")
self.radio = Radio(124000000, 1, 1)
self.general_settings = GeneralSettings(False, False, False, False, False)
self.ammo: List[Ammo] = []
self.contacts: List[Contact] = []
self.active_path: List[LatLng] = []
self.is_leader = False
self.operate_as = "blue"
self.shots_scatter = 2
self.shots_intensity = 2
self.health = 100
self.racetrack_length = 0.0
self.racetrack_anchor = LatLng(0, 0, 0)
self.racetrack_bearing = 0.0
self.airborne = False
self.previous_total_ammo = 0
self.total_ammo = 0
def __repr__(self):
return f"Unit(id={self.ID}, name={self.name}, coalition={self.coalition}, position={self.position})"
def update_from_data_extractor(self, data_extractor: DataExtractor):
datum_index = 0
while datum_index != DataIndexes.END_OF_DATA.value:
datum_index = data_extractor.extract_uint8()
if datum_index == DataIndexes.CATEGORY.value:
data_extractor.extract_string()
elif datum_index == DataIndexes.ALIVE.value:
self.alive = data_extractor.extract_bool()
elif datum_index == DataIndexes.RADAR_STATE.value:
self.radar_state = data_extractor.extract_bool()
elif datum_index == DataIndexes.HUMAN.value:
self.human = data_extractor.extract_bool()
elif datum_index == DataIndexes.CONTROLLED.value:
self.controlled = data_extractor.extract_bool()
elif datum_index == DataIndexes.COALITION.value:
new_coalition = enum_to_coalition(data_extractor.extract_uint8())
self.coalition = new_coalition
elif datum_index == DataIndexes.COUNTRY.value:
self.country = data_extractor.extract_uint8()
elif datum_index == DataIndexes.NAME.value:
self.name = data_extractor.extract_string()
elif datum_index == DataIndexes.UNIT_NAME.value:
self.unit_name = data_extractor.extract_string()
elif datum_index == DataIndexes.CALLSIGN.value:
self.callsign = data_extractor.extract_string()
elif datum_index == DataIndexes.UNIT_ID.value:
self.unit_id = data_extractor.extract_uint32()
elif datum_index == DataIndexes.GROUP_ID.value:
self.group_id = data_extractor.extract_uint32()
elif datum_index == DataIndexes.GROUP_NAME.value:
self.group_name = data_extractor.extract_string()
elif datum_index == DataIndexes.STATE.value:
self.state = states[data_extractor.extract_uint8()]
elif datum_index == DataIndexes.TASK.value:
self.task = data_extractor.extract_string()
elif datum_index == DataIndexes.HAS_TASK.value:
self.has_task = data_extractor.extract_bool()
elif datum_index == DataIndexes.POSITION.value:
self.position = data_extractor.extract_lat_lng()
elif datum_index == DataIndexes.SPEED.value:
self.speed = data_extractor.extract_float64()
elif datum_index == DataIndexes.HORIZONTAL_VELOCITY.value:
self.horizontal_velocity = data_extractor.extract_float64()
elif datum_index == DataIndexes.VERTICAL_VELOCITY.value:
self.vertical_velocity = data_extractor.extract_float64()
elif datum_index == DataIndexes.HEADING.value:
self.heading = data_extractor.extract_float64()
elif datum_index == DataIndexes.TRACK.value:
self.track = data_extractor.extract_float64()
elif datum_index == DataIndexes.IS_ACTIVE_TANKER.value:
self.is_active_tanker = data_extractor.extract_bool()
elif datum_index == DataIndexes.IS_ACTIVE_AWACS.value:
self.is_active_awacs = data_extractor.extract_bool()
elif datum_index == DataIndexes.ON_OFF.value:
self.on_off = data_extractor.extract_bool()
elif datum_index == DataIndexes.FOLLOW_ROADS.value:
self.follow_roads = data_extractor.extract_bool()
elif datum_index == DataIndexes.FUEL.value:
self.fuel = data_extractor.extract_uint16()
elif datum_index == DataIndexes.DESIRED_SPEED.value:
self.desired_speed = data_extractor.extract_float64()
elif datum_index == DataIndexes.DESIRED_SPEED_TYPE.value:
self.desired_speed_type = "GS" if data_extractor.extract_bool() else "CAS"
elif datum_index == DataIndexes.DESIRED_ALTITUDE.value:
self.desired_altitude = data_extractor.extract_float64()
elif datum_index == DataIndexes.DESIRED_ALTITUDE_TYPE.value:
self.desired_altitude_type = "AGL" if data_extractor.extract_bool() else "ASL"
elif datum_index == DataIndexes.LEADER_ID.value:
self.leader_id = data_extractor.extract_uint32()
elif datum_index == DataIndexes.FORMATION_OFFSET.value:
self.formation_offset = data_extractor.extract_offset()
elif datum_index == DataIndexes.TARGET_ID.value:
self.target_id = data_extractor.extract_uint32()
elif datum_index == DataIndexes.TARGET_POSITION.value:
self.target_position = data_extractor.extract_lat_lng()
elif datum_index == DataIndexes.ROE.value:
self.roe = ROES[data_extractor.extract_uint8()]
elif datum_index == DataIndexes.ALARM_STATE.value:
self.alarm_state = self.enum_to_alarm_state(data_extractor.extract_uint8())
elif datum_index == DataIndexes.REACTION_TO_THREAT.value:
self.reaction_to_threat = self.enum_to_reaction_to_threat(data_extractor.extract_uint8())
elif datum_index == DataIndexes.EMISSIONS_COUNTERMEASURES.value:
self.emissions_countermeasures = self.enum_to_emission_countermeasure(data_extractor.extract_uint8())
elif datum_index == DataIndexes.TACAN.value:
self.tacan = data_extractor.extract_tacan()
elif datum_index == DataIndexes.RADIO.value:
self.radio = data_extractor.extract_radio()
elif datum_index == DataIndexes.GENERAL_SETTINGS.value:
self.general_settings = data_extractor.extract_general_settings()
elif datum_index == DataIndexes.AMMO.value:
self.ammo = data_extractor.extract_ammo()
self.previous_total_ammo = self.total_ammo
self.total_ammo = sum(ammo.quantity for ammo in self.ammo)
elif datum_index == DataIndexes.CONTACTS.value:
self.contacts = data_extractor.extract_contacts()
elif datum_index == DataIndexes.ACTIVE_PATH.value:
self.active_path = data_extractor.extract_active_path()
elif datum_index == DataIndexes.IS_LEADER.value:
self.is_leader = data_extractor.extract_bool()
elif datum_index == DataIndexes.OPERATE_AS.value:
self.operate_as = self.enum_to_coalition(data_extractor.extract_uint8())
elif datum_index == DataIndexes.SHOTS_SCATTER.value:
self.shots_scatter = data_extractor.extract_uint8()
elif datum_index == DataIndexes.SHOTS_INTENSITY.value:
self.shots_intensity = data_extractor.extract_uint8()
elif datum_index == DataIndexes.HEALTH.value:
self.health = data_extractor.extract_uint8()
elif datum_index == DataIndexes.RACETRACK_LENGTH.value:
self.racetrack_length = data_extractor.extract_float64()
elif datum_index == DataIndexes.RACETRACK_ANCHOR.value:
self.racetrack_anchor = data_extractor.extract_lat_lng()
elif datum_index == DataIndexes.RACETRACK_BEARING.value:
self.racetrack_bearing = data_extractor.extract_float64()
elif datum_index == DataIndexes.TIME_TO_NEXT_TASKING.value:
self.time_to_next_tasking = data_extractor.extract_float64()
elif datum_index == DataIndexes.BARREL_HEIGHT.value:
self.barrel_height = data_extractor.extract_float64()
elif datum_index == DataIndexes.MUZZLE_VELOCITY.value:
self.muzzle_velocity = data_extractor.extract_float64()
elif datum_index == DataIndexes.AIM_TIME.value:
self.aim_time = data_extractor.extract_float64()
elif datum_index == DataIndexes.SHOTS_TO_FIRE.value:
self.shots_to_fire = data_extractor.extract_uint32()
elif datum_index == DataIndexes.SHOTS_BASE_INTERVAL.value:
self.shots_base_interval = data_extractor.extract_float64()
elif datum_index == DataIndexes.SHOTS_BASE_SCATTER.value:
self.shots_base_scatter = data_extractor.extract_float64()
elif datum_index == DataIndexes.ENGAGEMENT_RANGE.value:
self.engagement_range = data_extractor.extract_float64()
elif datum_index == DataIndexes.TARGETING_RANGE.value:
self.targeting_range = data_extractor.extract_float64()
elif datum_index == DataIndexes.AIM_METHOD_RANGE.value:
self.aim_method_range = data_extractor.extract_float64()
elif datum_index == DataIndexes.ACQUISITION_RANGE.value:
self.acquisition_range = data_extractor.extract_float64()
elif datum_index == DataIndexes.AIRBORNE.value:
self.airborne = data_extractor.extract_bool()

View File

@ -0,0 +1,30 @@
from dataclasses import dataclass
from typing import Optional
from data_types import LatLng
@dataclass
class UnitSpawnTable:
"""Unit spawn table data structure for spawning units."""
unit_type: str
location: LatLng
skill: str
livery_id: str
altitude: Optional[int] = None
loadout: Optional[str] = None
heading: Optional[int] = None
def toJSON(self):
"""Convert the unit spawn table to a JSON serializable dictionary."""
return {
"unitType": self.unit_type,
"location": {
"lat": self.location.lat,
"lng": self.location.lng,
"alt": self.location.alt
},
"skill": self.skill,
"liveryID": self.livery_id,
"altitude": self.altitude,
"loadout": self.loadout,
"heading": self.heading
}

View File

@ -0,0 +1,19 @@
def enum_to_coalition(coalition_id: int) -> str:
if coalition_id == 0:
return "neutral"
elif coalition_id == 1:
return "red"
elif coalition_id == 2:
return "blue"
return ""
def coalition_to_enum(coalition: str) -> int:
if coalition == "neutral":
return 0
elif coalition == "red":
return 1
elif coalition == "blue":
return 2
return 0
1