Add cargo weight and draw argument support

Introduces cargo weight and draw argument properties to units across backend, frontend, and Python API. Adds related commands, data extraction, and registration logic, enabling setting and reading of cargo weight and custom draw arguments for units. Includes new API examples and updates to interfaces, data types, and Lua backend for full feature integration.
This commit is contained in:
Pax1601
2025-09-11 21:47:11 +02:00
parent 73a7ea74f3
commit 3eef91fb24
29 changed files with 409 additions and 64 deletions

View File

@@ -23,6 +23,7 @@ Olympus.unitIndex = 0 -- Counter used to spread the computational load of data
Olympus.unitStep = 50 -- Max number of units that get updated each cycle
Olympus.units = {} -- Table holding references to all the currently existing units
Olympus.unitsInitialLife = {} -- getLife0 returns 0 for ships, so we need to store the initial life of units
Olympus.drawArguments = {} -- Table that sets what drawArguments to read for each unit
Olympus.weaponIndex = 0 -- Counter used to spread the computational load of data retrievial from DCS
Olympus.weaponStep = 50 -- Max number of weapons that get updated each cycle
@@ -1087,10 +1088,38 @@ function Olympus.setOnOff(groupName, onOff)
end
end
-- Get the unit description
function getUnitDescription(unit)
return unit:getDescr()
end
-- Set the unit cargo weight
function Olympus.setCargoWeight(ID, weight)
Olympus.debug("Olympus.setCargoWeight " .. ID .. " " .. tostring(weight), 2)
local unit = Olympus.getUnitByID(ID)
if unit ~= nil and unit:isExist() then
trigger.action.setUnitInternalCargo(unit:getName(), weight)
end
end
-- Register a drawArgument to be read for a unit
function Olympus.registerDrawArgument(ID, argument, active)
Olympus.debug("Olympus.registerDrawArgument " .. ID .. " " .. tostring(argument) .. " " .. tostring(active), 2)
-- Create the table if it does not exist
if Olympus.drawArguments[ID] == nil then
Olympus.drawArguments[ID] = {}
end
-- Set the draw argument to true or false
if active then
Olympus.drawArguments[ID][argument] = true
else
Olympus.drawArguments[ID][argument] = false
end
end
-- This function gets the navpoints from the DCS mission
function Olympus.getNavPoints()
local function extract_tag(str)
@@ -1293,6 +1322,20 @@ function Olympus.setUnitsData(arg, time)
table["radarState"] = false
end
end ]]
-- Read the draw arguments
local drawArguments = {}
if Olympus.drawArguments[ID] ~= nil then
for argument, active in pairs(Olympus.drawArguments[ID]) do
if active then
drawArguments[#drawArguments + 1] = {
argument = argument,
value = unit:getDrawArgumentValue(argument)
}
end
end
end
table["drawArguments"] = drawArguments
local group = unit:getGroup()
if group ~= nil then

View File

@@ -27,6 +27,30 @@
"program": "example_disembarked_infantry.py",
"console": "integratedTerminal",
"justMyCode": false,
},
{
"name": "Example set cargo weight",
"type": "debugpy",
"request": "launch",
"program": "example_set_cargo_weight.py",
"console": "integratedTerminal",
"justMyCode": false,
},
{
"name": "Example draw argument",
"type": "debugpy",
"request": "launch",
"program": "example_draw_argument.py",
"console": "integratedTerminal",
"justMyCode": false,
},
{
"name": "Example precise movement",
"type": "debugpy",
"request": "launch",
"program": "example_precise_movement.py",
"console": "integratedTerminal",
"justMyCode": false,
}
]
}

View File

@@ -1,6 +1,6 @@
import struct
from typing import List
from data.data_types import LatLng, TACAN, Radio, GeneralSettings, Ammo, Contact, Offset
from data.data_types import DrawArgument, LatLng, TACAN, Radio, GeneralSettings, Ammo, Contact, Offset
class DataExtractor:
def __init__(self, buffer: bytes):
@@ -48,6 +48,7 @@ class DataExtractor:
lat = self.extract_float64()
lng = self.extract_float64()
alt = self.extract_float64()
threshold = self.extract_float64()
return LatLng(lat, lng, alt)
def extract_from_bitmask(self, bitmask: int, position: int) -> bool:
@@ -136,4 +137,14 @@ class DataExtractor:
x=self.extract_float64(),
y=self.extract_float64(),
z=self.extract_float64()
)
)
def extract_draw_arguments(self) -> List[DrawArgument]:
value = []
size = self.extract_uint16()
for _ in range(size):
value.append(DrawArgument(
argument=self.extract_uint32(),
value=self.extract_float64()
))
return value

View File

@@ -67,4 +67,6 @@ class DataIndexes(Enum):
AIM_METHOD_RANGE = 63
ACQUISITION_RANGE = 64
AIRBORNE = 65
CARGO_WEIGHT = 66
DRAW_ARGUMENTS = 67
END_OF_DATA = 255

View File

@@ -8,13 +8,15 @@ class LatLng:
lat: float
lng: float
alt: float
threshold: Optional[float] = 0 # Optional threshold for proximity checks
def toJSON(self):
"""Convert LatLng to a JSON serializable dictionary."""
return {
"lat": self.lat,
"lng": self.lng,
"alt": self.alt
"alt": self.alt,
"threshold": self.threshold
}
def project_with_bearing_and_distance(self, d, bearing):
@@ -88,4 +90,9 @@ class Contact:
class Offset:
x: float
y: float
z: float
z: float
@dataclass
class DrawArgument:
argument: int
value: float

View File

@@ -0,0 +1,31 @@
from api import API
def on_api_startup(api: API):
units = api.update_units()
for unit in units.values():
if unit.name == "UH-1H":
# Register draw argument 43 for UH-1H
unit.register_draw_argument(43)
def on_api_update(api: API):
units = api.get_units()
for unit in units.values():
if unit.name == "UH-1H":
print(f"Draw Arguments for {unit.name}:")
for draw_arg in unit.draw_arguments:
print(f" Argument: {draw_arg.argument}, Value: {draw_arg.value}")
##############################################################################################
# Main entry point for the script. It registers the callbacks and starts the API.
##############################################################################################
if __name__ == "__main__":
# Initialize the API
api = API()
# Register the callbacks
api.register_on_update_callback(on_api_update)
api.register_on_startup_callback(on_api_startup)
# Start the API, this will run forever until stopped
api.run()

View File

@@ -1,5 +1,13 @@
from api import API
from kronos.kronos import Kronos
def on_api_startup(api: API):
units = api.update_units()
for unit in units.values():
if unit.name == "Infantry AK Ins":
current_pos = unit.position
next_pos = current_pos.project_with_bearing_and_distance(20, 0) # Move 20 meters north
next_pos.threshold = 2 # Set threshold to 1 meter, very precise
unit.set_path([next_pos])
##############################################################################################
# Main entry point for the script. It registers the callbacks and starts the API.
@@ -8,13 +16,9 @@ if __name__ == "__main__":
# Initialize the API
api = API()
# Initialize Kronos with the API
kronos = Kronos(api)
# Register the callbacks
api.register_on_startup_callback(kronos.on_startup)
api.register_on_startup_callback(on_api_startup)
# Start the API, this will run forever until stopped
api.run()

View File

@@ -0,0 +1,29 @@
from api import API
def on_api_startup(api: API):
units = api.update_units()
for unit in units.values():
if unit.name == "UH-1H":
# Set cargo weight to 5000 kg
unit.set_cargo_weight(5000.0)
def on_api_update(api: API):
units = api.get_units()
for unit in units.values():
if unit.name == "UH-1H":
print(f"Cargo Weight for {unit.name}: {unit.cargo_weight} kg")
##############################################################################################
# Main entry point for the script. It registers the callbacks and starts the API.
##############################################################################################
if __name__ == "__main__":
# Initialize the API
api = API()
# Register the callbacks
api.register_on_update_callback(on_api_update)
api.register_on_startup_callback(on_api_startup)
# Start the API, this will run forever until stopped
api.run()

View File

@@ -1,16 +0,0 @@
# Setup a logger for the module
import logging
logger = logging.getLogger("Kronos")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
class Kronos():
def __init__(self, api):
self.api = api
def on_startup(self):
logger.info("Kronos API started")

View File

@@ -4,7 +4,7 @@
"port": 4512
},
"authentication": {
"gameMasterPassword": "a474219e5e9503c84d59500bb1bda3d9ade81e52d9fa1c234278770892a6dd74",
"gameMasterPassword": "a00a5973aacb17e4659125fbe10f4160d096dd84b2f586d2d75669462a30106d",
"blueCommanderPassword": "7d2e1ef898b21db7411f725a945b76ec8dcad340ed705eaf801bc82be6fe8a4a",
"redCommanderPassword": "abc5de7abdb8ed98f6d11d22c9d17593e339fde9cf4b9e170541b4f41af937e3"
},

View File

@@ -1,18 +0,0 @@
import re
# Read the file
with open('unit.py', 'r', encoding='utf-8') as f:
content = f.read()
# Pattern to match callback invocations
pattern = r'self\.on_property_change_callbacks\[\"(\w+)\"\]\(self, self\.(\w+)\)'
replacement = r'self._trigger_callback("\1", self.\2)'
# Replace all matches
new_content = re.sub(pattern, replacement, content)
# Write back to file
with open('unit.py', 'w', encoding='utf-8') as f:
f.write(new_content)
print('Updated all callback invocations')

View File

@@ -3,7 +3,7 @@ import asyncio
from data.data_extractor import DataExtractor
from data.data_indexes import DataIndexes
from data.data_types import LatLng, TACAN, Radio, GeneralSettings, Ammo, Contact, Offset
from data.data_types import DrawArgument, LatLng, TACAN, Radio, GeneralSettings, Ammo, Contact, Offset
from data.roes import ROES
from data.states import states
from utils.utils import enum_to_coalition
@@ -81,6 +81,8 @@ class Unit:
self.targeting_range = 0.0
self.aim_method_range = 0.0
self.acquisition_range = 0.0
self.cargo_weight = 0.0
self.draw_arguments: List[DrawArgument] = []
self.previous_total_ammo = 0
self.total_ammo = 0
@@ -654,6 +656,20 @@ class Unit:
# Trigger callbacks for property change
if "airborne" in self.on_property_change_callbacks:
self._trigger_callback("airborne", self.airborne)
elif datum_index == DataIndexes.CARGO_WEIGHT.value:
cargo_weight = data_extractor.extract_float64()
if cargo_weight != self.cargo_weight:
self.cargo_weight = cargo_weight
# Trigger callbacks for property change
if "cargo_weight" in self.on_property_change_callbacks:
self._trigger_callback("cargo_weight", self.cargo_weight)
elif datum_index == DataIndexes.DRAW_ARGUMENTS.value:
draw_arguments = data_extractor.extract_draw_arguments()
if draw_arguments != self.draw_arguments:
self.draw_arguments = draw_arguments
# Trigger callbacks for property change
if "draw_arguments" in self.on_property_change_callbacks:
self._trigger_callback("draw_arguments", self.draw_arguments)
# --- API functions requiring ID ---
def set_path(self, path: List[LatLng]):
@@ -758,6 +774,8 @@ class Unit:
def set_engagement_properties(self, barrel_height, muzzle_velocity, aim_time, shots_to_fire, shots_base_interval, shots_base_scatter, engagement_range, targeting_range, aim_method_range, acquisition_range):
return self.api.send_command({"setEngagementProperties": {"ID": self.ID, "barrelHeight": barrel_height, "muzzleVelocity": muzzle_velocity, "aimTime": aim_time, "shotsToFire": shots_to_fire, "shotsBaseInterval": shots_base_interval, "shotsBaseScatter": shots_base_scatter, "engagementRange": engagement_range, "targetingRange": targeting_range, "aimMethodRange": aim_method_range, "acquisitionRange": acquisition_range}})
def set_cargo_weight(self, cargo_weight: float):
return self.api.send_command({"setCargoWeight": {"ID": self.ID, "weight": cargo_weight}})
def register_draw_argument(self, argument: int, active: bool = True):
return self.api.send_command({"registerDrawArgument": {"ID": self.ID, "argument": argument, "active": active}})