mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
492 lines
18 KiB
Python
492 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
from functools import cache, cached_property
|
|
from pathlib import Path
|
|
from typing import Any, ClassVar, Dict, Iterator, Optional, TYPE_CHECKING, Type
|
|
|
|
import yaml
|
|
from dcs.helicopters import helicopter_map
|
|
from dcs.planes import plane_map
|
|
from dcs.unittype import FlyingType
|
|
|
|
from game.data.units import UnitClass
|
|
from game.dcs.unitproperty import UnitProperty
|
|
from game.dcs.unittype import UnitType
|
|
from game.radio.channels import (
|
|
ApacheChannelNamer,
|
|
ChannelNamer,
|
|
CommonRadioChannelAllocator,
|
|
FarmerRadioChannelAllocator,
|
|
HueyChannelNamer,
|
|
MirageChannelNamer,
|
|
MirageF1CEChannelNamer,
|
|
NoOpChannelAllocator,
|
|
RadioChannelAllocator,
|
|
SCR522ChannelNamer,
|
|
SCR522RadioChannelAllocator,
|
|
SingleRadioChannelNamer,
|
|
TomcatChannelNamer,
|
|
ViggenChannelNamer,
|
|
ViggenRadioChannelAllocator,
|
|
ViperChannelNamer,
|
|
)
|
|
from game.utils import (
|
|
Distance,
|
|
ImperialUnits,
|
|
MetricUnits,
|
|
NauticalUnits,
|
|
SPEED_OF_SOUND_AT_SEA_LEVEL,
|
|
Speed,
|
|
UnitSystem,
|
|
feet,
|
|
knots,
|
|
kph,
|
|
nautical_miles,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from game.ato import FlightType
|
|
from game.missiongenerator.aircraft.flightdata import FlightData
|
|
from game.missiongenerator.missiondata import MissionData
|
|
from game.radio.radios import Radio, RadioFrequency, RadioRegistry
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RadioConfig:
|
|
inter_flight: Optional[Radio]
|
|
intra_flight: Optional[Radio]
|
|
channel_allocator: Optional[RadioChannelAllocator]
|
|
channel_namer: Type[ChannelNamer]
|
|
|
|
@classmethod
|
|
def from_data(cls, data: dict[str, Any]) -> RadioConfig:
|
|
return RadioConfig(
|
|
cls.make_radio(data.get("inter_flight", None)),
|
|
cls.make_radio(data.get("intra_flight", None)),
|
|
cls.make_allocator(data.get("channels", {})),
|
|
cls.make_namer(data.get("channels", {})),
|
|
)
|
|
|
|
@classmethod
|
|
def make_radio(cls, name: Optional[str]) -> Optional[Radio]:
|
|
from game.radio.radios import get_radio
|
|
|
|
if name is None:
|
|
return None
|
|
return get_radio(name)
|
|
|
|
@classmethod
|
|
def make_allocator(cls, data: dict[str, Any]) -> Optional[RadioChannelAllocator]:
|
|
try:
|
|
alloc_type = data["type"]
|
|
except KeyError:
|
|
return None
|
|
allocator_type: Type[RadioChannelAllocator] = {
|
|
"SCR-522": SCR522RadioChannelAllocator,
|
|
"common": CommonRadioChannelAllocator,
|
|
"farmer": FarmerRadioChannelAllocator,
|
|
"noop": NoOpChannelAllocator,
|
|
"viggen": ViggenRadioChannelAllocator,
|
|
}[alloc_type]
|
|
return allocator_type.from_cfg(data)
|
|
|
|
@classmethod
|
|
def make_namer(cls, config: dict[str, Any]) -> Type[ChannelNamer]:
|
|
return {
|
|
"SCR-522": SCR522ChannelNamer,
|
|
"default": ChannelNamer,
|
|
"huey": HueyChannelNamer,
|
|
"mirage": MirageChannelNamer,
|
|
"mirage-f1ce": MirageF1CEChannelNamer,
|
|
"single": SingleRadioChannelNamer,
|
|
"tomcat": TomcatChannelNamer,
|
|
"viggen": ViggenChannelNamer,
|
|
"viper": ViperChannelNamer,
|
|
"apache": ApacheChannelNamer,
|
|
}[config.get("namer", "default")]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PatrolConfig:
|
|
altitude: Optional[Distance]
|
|
speed: Optional[Speed]
|
|
|
|
@classmethod
|
|
def from_data(cls, data: dict[str, Any]) -> PatrolConfig:
|
|
altitude = data.get("altitude", None)
|
|
speed = data.get("speed", None)
|
|
return PatrolConfig(
|
|
feet(altitude) if altitude is not None else None,
|
|
knots(speed) if speed is not None else None,
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FuelConsumption:
|
|
#: The estimated taxi fuel requirement, in pounds.
|
|
taxi: int
|
|
|
|
#: The estimated fuel consumption for a takeoff climb, in pounds per nautical mile.
|
|
climb: float
|
|
|
|
#: The estimated fuel consumption for cruising, in pounds per nautical mile.
|
|
cruise: float
|
|
|
|
#: The estimated fuel consumption for combat speeds, in pounds per nautical mile.
|
|
combat: float
|
|
|
|
#: The minimum amount of fuel that the aircraft should land with, in pounds. This is
|
|
#: a reserve amount for landing delays or emergencies.
|
|
min_safe: int
|
|
|
|
@classmethod
|
|
def from_data(cls, data: dict[str, Any]) -> FuelConsumption:
|
|
return FuelConsumption(
|
|
int(data["taxi"]),
|
|
float(data["climb_ppm"]),
|
|
float(data["cruise_ppm"]),
|
|
float(data["combat_ppm"]),
|
|
int(data["min_safe"]),
|
|
)
|
|
|
|
|
|
# TODO: Split into PlaneType and HelicopterType?
|
|
@dataclass(frozen=True)
|
|
class AircraftType(UnitType[Type[FlyingType]]):
|
|
carrier_capable: bool
|
|
lha_capable: bool
|
|
always_keeps_gun: bool
|
|
|
|
# If true, the aircraft does not use the guns as the last resort weapons, but as a
|
|
# main weapon. It'll RTB when it doesn't have gun ammo left.
|
|
gunfighter: bool
|
|
|
|
# UnitSystem to use for the kneeboard, defaults to Nautical (kt/nm/ft)
|
|
kneeboard_units: UnitSystem
|
|
|
|
# If true, kneeboards will display zulu times
|
|
utc_kneeboard: bool
|
|
|
|
max_group_size: int
|
|
patrol_altitude: Optional[Distance]
|
|
patrol_speed: Optional[Speed]
|
|
|
|
#: The maximum range between the origin airfield and the target for which the auto-
|
|
#: planner will consider this aircraft usable for a mission.
|
|
max_mission_range: Distance
|
|
|
|
fuel_consumption: Optional[FuelConsumption]
|
|
|
|
default_livery: Optional[str]
|
|
|
|
intra_flight_radio: Optional[Radio]
|
|
channel_allocator: Optional[RadioChannelAllocator]
|
|
channel_namer: Type[ChannelNamer]
|
|
|
|
# Logisitcs info
|
|
# cabin_size defines how many troops can be loaded. 0 means the aircraft can not
|
|
# transport any troops. Default for helos is 10, non helos will have 0.
|
|
cabin_size: int
|
|
# If the aircraft can carry crates can_carry_crates should be set to true which
|
|
# will be set to true for helos by default
|
|
can_carry_crates: bool
|
|
|
|
# Set to True when aircraft mounts a targeting pod by default i.e. the pod does
|
|
# not take up a weapons station. If True, do not replace LGBs with dumb bombs
|
|
# when no TGP is mounted on any station.
|
|
has_built_in_target_pod: bool
|
|
|
|
task_priorities: dict[FlightType, int]
|
|
|
|
_by_name: ClassVar[dict[str, AircraftType]] = {}
|
|
_by_unit_type: ClassVar[dict[type[FlyingType], list[AircraftType]]] = defaultdict(
|
|
list
|
|
)
|
|
|
|
@classmethod
|
|
def register(cls, unit_type: AircraftType) -> None:
|
|
cls._by_name[unit_type.name] = unit_type
|
|
cls._by_unit_type[unit_type.dcs_unit_type].append(unit_type)
|
|
|
|
@property
|
|
def flyable(self) -> bool:
|
|
return self.dcs_unit_type.flyable
|
|
|
|
@property
|
|
def helicopter(self) -> bool:
|
|
return self.dcs_unit_type.helicopter
|
|
|
|
@property
|
|
def max_fuel(self) -> float:
|
|
return self.dcs_unit_type.fuel_max
|
|
|
|
@cached_property
|
|
def max_speed(self) -> Speed:
|
|
return kph(self.dcs_unit_type.max_speed)
|
|
|
|
@property
|
|
def preferred_patrol_altitude(self) -> Distance:
|
|
if self.patrol_altitude is not None:
|
|
return self.patrol_altitude
|
|
else:
|
|
# Estimate based on max speed.
|
|
# Aircaft with max speed 600 kph will prefer patrol at 10 000 ft
|
|
# Aircraft with max speed 2800 kph will prefer pratrol at 33 000 ft
|
|
altitude_for_lowest_speed = feet(10 * 1000)
|
|
altitude_for_highest_speed = feet(33 * 1000)
|
|
lowest_speed = kph(600)
|
|
highest_speed = kph(2800)
|
|
factor = (self.max_speed - lowest_speed).kph / (
|
|
highest_speed - lowest_speed
|
|
).kph
|
|
altitude = (
|
|
altitude_for_lowest_speed
|
|
+ (altitude_for_highest_speed - altitude_for_lowest_speed) * factor
|
|
)
|
|
logging.debug(
|
|
f"Preferred patrol altitude for {self.dcs_unit_type.id}: {altitude.feet}"
|
|
)
|
|
rounded_altitude = feet(round(1000 * round(altitude.feet / 1000)))
|
|
return max(
|
|
altitude_for_lowest_speed,
|
|
min(altitude_for_highest_speed, rounded_altitude),
|
|
)
|
|
|
|
def preferred_patrol_speed(self, altitude: Distance) -> Speed:
|
|
"""Preferred true airspeed when patrolling"""
|
|
if self.patrol_speed is not None:
|
|
return self.patrol_speed
|
|
else:
|
|
# Estimate based on max speed.
|
|
max_speed = self.max_speed
|
|
if max_speed > SPEED_OF_SOUND_AT_SEA_LEVEL * 1.6:
|
|
# Fast airplanes, should manage pretty high patrol speed
|
|
return (
|
|
Speed.from_mach(0.85, altitude)
|
|
if altitude.feet > 20000
|
|
else Speed.from_mach(0.7, altitude)
|
|
)
|
|
elif max_speed > SPEED_OF_SOUND_AT_SEA_LEVEL * 1.2:
|
|
# Medium-fast like F/A-18C
|
|
return (
|
|
Speed.from_mach(0.8, altitude)
|
|
if altitude.feet > 20000
|
|
else Speed.from_mach(0.65, altitude)
|
|
)
|
|
elif max_speed > SPEED_OF_SOUND_AT_SEA_LEVEL * 0.7:
|
|
# Semi-fast like airliners or similar
|
|
return (
|
|
Speed.from_mach(0.5, altitude)
|
|
if altitude.feet > 20000
|
|
else Speed.from_mach(0.4, altitude)
|
|
)
|
|
else:
|
|
# Slow like warbirds or helicopters
|
|
# Use whichever is slowest - mach 0.35 or 70% of max speed
|
|
logging.debug(f"{self.name} max_speed * 0.7 is {max_speed * 0.7}")
|
|
return min(Speed.from_mach(0.35, altitude), max_speed * 0.7)
|
|
|
|
def alloc_flight_radio(self, radio_registry: RadioRegistry) -> RadioFrequency:
|
|
from game.radio.radios import ChannelInUseError, kHz
|
|
|
|
if self.intra_flight_radio is not None:
|
|
return radio_registry.alloc_for_radio(self.intra_flight_radio)
|
|
|
|
# The default radio frequency is set in megahertz. For some aircraft, it is a
|
|
# floating point value. For all current aircraft, adjusting to kilohertz will be
|
|
# sufficient to convert to an integer.
|
|
in_khz = float(self.dcs_unit_type.radio_frequency) * 1000
|
|
if not in_khz.is_integer():
|
|
logging.warning(
|
|
f"Found unexpected sub-kHz default radio for {self}: {in_khz} kHz. "
|
|
"Truncating to integer. The truncated frequency may not be valid for "
|
|
"the aircraft."
|
|
)
|
|
|
|
freq = kHz(int(in_khz))
|
|
try:
|
|
radio_registry.reserve(freq)
|
|
except ChannelInUseError:
|
|
pass
|
|
return freq
|
|
|
|
def assign_channels_for_flight(
|
|
self, flight: FlightData, mission_data: MissionData
|
|
) -> None:
|
|
if self.channel_allocator is not None:
|
|
self.channel_allocator.assign_channels_for_flight(flight, mission_data)
|
|
|
|
def channel_name(self, radio_id: int, channel_id: int) -> str:
|
|
return self.channel_namer.channel_name(radio_id, channel_id)
|
|
|
|
def iter_props(self) -> Iterator[UnitProperty[Any]]:
|
|
return UnitProperty.for_aircraft(self.dcs_unit_type)
|
|
|
|
def capable_of(self, task: FlightType) -> bool:
|
|
return task in self.task_priorities
|
|
|
|
def task_priority(self, task: FlightType) -> int:
|
|
return self.task_priorities[task]
|
|
|
|
def __setstate__(self, state: dict[str, Any]) -> None:
|
|
# Update any existing models with new data on load.
|
|
updated = AircraftType.named(state["name"])
|
|
state.update(updated.__dict__)
|
|
self.__dict__.update(state)
|
|
|
|
@classmethod
|
|
def named(cls, name: str) -> AircraftType:
|
|
if not cls._loaded:
|
|
cls._load_all()
|
|
return cls._by_name[name]
|
|
|
|
@classmethod
|
|
def for_dcs_type(cls, dcs_unit_type: Type[FlyingType]) -> Iterator[AircraftType]:
|
|
if not cls._loaded:
|
|
cls._load_all()
|
|
yield from cls._by_unit_type[dcs_unit_type]
|
|
|
|
@classmethod
|
|
def iter_all(cls) -> Iterator[AircraftType]:
|
|
if not cls._loaded:
|
|
cls._load_all()
|
|
yield from cls._by_name.values()
|
|
|
|
@classmethod
|
|
@cache
|
|
def priority_list_for_task(cls, task: FlightType) -> list[AircraftType]:
|
|
capable = []
|
|
for aircraft in cls.iter_all():
|
|
if aircraft.capable_of(task):
|
|
capable.append(aircraft)
|
|
return list(reversed(sorted(capable, key=lambda a: a.task_priority(task))))
|
|
|
|
@staticmethod
|
|
def each_dcs_type() -> Iterator[Type[FlyingType]]:
|
|
yield from helicopter_map.values()
|
|
yield from plane_map.values()
|
|
|
|
@staticmethod
|
|
def _set_props_overrides(
|
|
config: Dict[str, Any], aircraft: Type[FlyingType], data_path: Path
|
|
) -> None:
|
|
if aircraft.property_defaults is None:
|
|
logging.warning(
|
|
f"'{data_path.name}' attempted to set default prop that does not exist."
|
|
)
|
|
else:
|
|
for k in config:
|
|
if k in aircraft.property_defaults:
|
|
aircraft.property_defaults[k] = config[k]
|
|
else:
|
|
logging.warning(
|
|
f"'{data_path.name}' attempted to set default prop '{k}' that does not exist"
|
|
)
|
|
|
|
@classmethod
|
|
def _each_variant_of(cls, aircraft: Type[FlyingType]) -> Iterator[AircraftType]:
|
|
# Replace slashes with underscores because slashes are not allowed in filenames
|
|
aircraft_id = aircraft.id.replace("/", "_")
|
|
data_path = Path("resources/units/aircraft") / f"{aircraft_id}.yaml"
|
|
if not data_path.exists():
|
|
logging.warning(f"No data for {aircraft_id}; it will not be available")
|
|
return
|
|
|
|
with data_path.open(encoding="utf-8") as data_file:
|
|
data = yaml.safe_load(data_file)
|
|
|
|
try:
|
|
price = data["price"]
|
|
except KeyError as ex:
|
|
raise KeyError(f"Missing required price field: {data_path}") from ex
|
|
|
|
radio_config = RadioConfig.from_data(data.get("radios", {}))
|
|
patrol_config = PatrolConfig.from_data(data.get("patrol", {}))
|
|
|
|
try:
|
|
mission_range = nautical_miles(int(data["max_range"]))
|
|
except (KeyError, ValueError):
|
|
mission_range = (
|
|
nautical_miles(50) if aircraft.helicopter else nautical_miles(150)
|
|
)
|
|
logging.warning(
|
|
f"{aircraft_id} does not specify a max_range. Defaulting to "
|
|
f"{mission_range.nautical_miles}NM"
|
|
)
|
|
|
|
fuel_data = data.get("fuel")
|
|
if fuel_data is not None:
|
|
fuel_consumption: Optional[FuelConsumption] = FuelConsumption.from_data(
|
|
fuel_data
|
|
)
|
|
else:
|
|
fuel_consumption = None
|
|
|
|
try:
|
|
introduction = data["introduced"]
|
|
if introduction is None:
|
|
introduction = "N/A"
|
|
except KeyError:
|
|
introduction = "No data."
|
|
|
|
units_data = data.get("kneeboard_units", "nautical").lower()
|
|
units: UnitSystem = NauticalUnits()
|
|
if units_data == "imperial":
|
|
units = ImperialUnits()
|
|
if units_data == "metric":
|
|
units = MetricUnits()
|
|
|
|
class_name = data.get("class")
|
|
unit_class = UnitClass.PLANE if class_name is None else UnitClass(class_name)
|
|
|
|
prop_overrides = data.get("default_overrides")
|
|
if prop_overrides is not None:
|
|
cls._set_props_overrides(prop_overrides, aircraft, data_path)
|
|
|
|
from game.ato.flighttype import FlightType
|
|
|
|
task_priorities: dict[FlightType, int] = {}
|
|
for task_name, priority in data.get("tasks", {}).items():
|
|
task_priorities[FlightType(task_name)] = priority
|
|
|
|
for variant in data.get("variants", [aircraft.id]):
|
|
yield AircraftType(
|
|
dcs_unit_type=aircraft,
|
|
name=variant,
|
|
description=data.get(
|
|
"description",
|
|
f"No data. <a href=\"https://google.com/search?q=DCS+{variant.replace(' ', '+')}\"><span style=\"color:#FFFFFF\">Google {variant}</span></a>",
|
|
),
|
|
year_introduced=introduction,
|
|
country_of_origin=data.get("origin", "No data."),
|
|
manufacturer=data.get("manufacturer", "No data."),
|
|
role=data.get("role", "No data."),
|
|
price=price,
|
|
carrier_capable=data.get("carrier_capable", False),
|
|
lha_capable=data.get("lha_capable", False),
|
|
always_keeps_gun=data.get("always_keeps_gun", False),
|
|
gunfighter=data.get("gunfighter", False),
|
|
max_group_size=data.get("max_group_size", aircraft.group_size_max),
|
|
patrol_altitude=patrol_config.altitude,
|
|
patrol_speed=patrol_config.speed,
|
|
max_mission_range=mission_range,
|
|
fuel_consumption=fuel_consumption,
|
|
default_livery=data.get("default_livery"),
|
|
intra_flight_radio=radio_config.intra_flight,
|
|
channel_allocator=radio_config.channel_allocator,
|
|
channel_namer=radio_config.channel_namer,
|
|
kneeboard_units=units,
|
|
utc_kneeboard=data.get("utc_kneeboard", False),
|
|
unit_class=unit_class,
|
|
cabin_size=data.get("cabin_size", 10 if aircraft.helicopter else 0),
|
|
can_carry_crates=data.get("can_carry_crates", aircraft.helicopter),
|
|
has_built_in_target_pod=data.get("has_built_in_target_pod", False),
|
|
task_priorities=task_priorities,
|
|
)
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.name)
|