diff --git a/game/ato/flight.py b/game/ato/flight.py index e33305df..3f141c8f 100644 --- a/game/ato/flight.py +++ b/game/ato/flight.py @@ -40,8 +40,6 @@ class Flight: self.roster = FlightRoster(self.squadron, initial_size=count) else: self.roster = roster - self.departure = self.squadron.location - self.arrival = self.squadron.arrival self.divert = divert self.flight_type = flight_type # TODO: Replace with FlightPlan. @@ -63,6 +61,14 @@ class Flight: package=package, flight=self, custom_waypoints=[] ) + @property + def departure(self) -> ControlPoint: + return self.squadron.location + + @property + def arrival(self) -> ControlPoint: + return self.squadron.arrival + @property def count(self) -> int: return self.roster.max_size diff --git a/game/dcs/aircrafttype.py b/game/dcs/aircrafttype.py index dea78fec..33388eb0 100644 --- a/game/dcs/aircrafttype.py +++ b/game/dcs/aircrafttype.py @@ -40,7 +40,7 @@ from game.utils import ( ) if TYPE_CHECKING: - from game.missiongenerator.aircraftgenerator import FlightData + from game.missiongenerator.aircraft.flightdata import FlightData from game.missiongenerator.airsupport import AirSupport from game.radio.radios import Radio, RadioFrequency, RadioRegistry diff --git a/game/missiongenerator/aircraft/__init__.py b/game/missiongenerator/aircraft/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/game/missiongenerator/aircraft/aircraftbehavior.py b/game/missiongenerator/aircraft/aircraftbehavior.py new file mode 100644 index 00000000..969c0b65 --- /dev/null +++ b/game/missiongenerator/aircraft/aircraftbehavior.py @@ -0,0 +1,316 @@ +import logging +from typing import Any, Optional + +from dcs.task import ( + AWACS, + AWACSTaskAction, + AntishipStrike, + CAP, + CAS, + EPLRS, + FighterSweep, + GroundAttack, + Nothing, + OptROE, + OptRTBOnBingoFuel, + OptRTBOnOutOfAmmo, + OptReactOnThreat, + OptRestrictJettison, + Refueling, + RunwayAttack, + Transport, +) +from dcs.unitgroup import FlyingGroup + +from game.ato import Flight, FlightType +from gen.flights.flightplan import AwacsFlightPlan, RefuelingFlightPlan + + +class AircraftBehavior: + def __init__(self, task: FlightType) -> None: + self.task = task + + def apply_to(self, flight: Flight, group: FlyingGroup[Any]) -> None: + if self.task in [ + FlightType.BARCAP, + FlightType.TARCAP, + FlightType.INTERCEPTION, + ]: + self.configure_cap(group, flight) + elif self.task == FlightType.SWEEP: + self.configure_sweep(group, flight) + elif self.task == FlightType.AEWC: + self.configure_awacs(group, flight) + elif self.task == FlightType.REFUELING: + self.configure_refueling(group, flight) + elif self.task in [FlightType.CAS, FlightType.BAI]: + self.configure_cas(group, flight) + elif self.task == FlightType.DEAD: + self.configure_dead(group, flight) + elif self.task == FlightType.SEAD: + self.configure_sead(group, flight) + elif self.task == FlightType.SEAD_ESCORT: + self.configure_sead_escort(group, flight) + elif self.task == FlightType.STRIKE: + self.configure_strike(group, flight) + elif self.task == FlightType.ANTISHIP: + self.configure_anti_ship(group, flight) + elif self.task == FlightType.ESCORT: + self.configure_escort(group, flight) + elif self.task == FlightType.OCA_RUNWAY: + self.configure_runway_attack(group, flight) + elif self.task == FlightType.OCA_AIRCRAFT: + self.configure_oca_strike(group, flight) + elif self.task == FlightType.TRANSPORT: + self.configure_transport(group, flight) + elif self.task == FlightType.FERRY: + self.configure_ferry(group, flight) + else: + self.configure_unknown_task(group, flight) + + self.configure_eplrs(group, flight) + + def configure_behavior( + self, + flight: Flight, + group: FlyingGroup[Any], + react_on_threat: OptReactOnThreat.Values = OptReactOnThreat.Values.EvadeFire, + roe: Optional[int] = None, + rtb_winchester: Optional[OptRTBOnOutOfAmmo.Values] = None, + restrict_jettison: Optional[bool] = None, + mission_uses_gun: bool = True, + ) -> None: + group.points[0].tasks.clear() + group.points[0].tasks.append(OptReactOnThreat(react_on_threat)) + if roe is not None: + group.points[0].tasks.append(OptROE(roe)) + if restrict_jettison is not None: + group.points[0].tasks.append(OptRestrictJettison(restrict_jettison)) + if rtb_winchester is not None: + group.points[0].tasks.append(OptRTBOnOutOfAmmo(rtb_winchester)) + + # Confiscate the bullets of AI missions that do not rely on the gun. There is no + # "all but gun" RTB winchester option, so air to ground missions with mixed + # weapon types will insist on using all of their bullets after running out of + # missiles and bombs. Take away their bullets so they don't strafe a Tor. + # + # Exceptions are made for player flights and for airframes where the gun is + # essential like the A-10 or warbirds. + if not mission_uses_gun and not self.flight_always_keeps_gun(flight): + for unit in group.units: + unit.gun = 0 + + group.points[0].tasks.append(OptRTBOnBingoFuel(True)) + # Do not restrict afterburner. + # https://forums.eagle.ru/forum/english/digital-combat-simulator/dcs-world-2-5/bugs-and-problems-ai/ai-ad/7121294-ai-stuck-at-high-aoa-after-making-sharp-turn-if-afterburner-is-restricted + + @staticmethod + def configure_eplrs(group: FlyingGroup[Any], flight: Flight) -> None: + if flight.unit_type.eplrs_capable: + group.points[0].tasks.append(EPLRS(group.id)) + + def configure_cap(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = CAP.name + + if not flight.unit_type.gunfighter: + ammo_type = OptRTBOnOutOfAmmo.Values.AAM + else: + ammo_type = OptRTBOnOutOfAmmo.Values.Cannon + + self.configure_behavior(flight, group, rtb_winchester=ammo_type) + + def configure_sweep(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = FighterSweep.name + + if not flight.unit_type.gunfighter: + ammo_type = OptRTBOnOutOfAmmo.Values.AAM + else: + ammo_type = OptRTBOnOutOfAmmo.Values.Cannon + + self.configure_behavior(flight, group, rtb_winchester=ammo_type) + + def configure_cas(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = CAS.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + rtb_winchester=OptRTBOnOutOfAmmo.Values.Unguided, + restrict_jettison=True, + ) + + def configure_dead(self, group: FlyingGroup[Any], flight: Flight) -> None: + # Only CAS and SEAD are capable of the Attack Group task. SEAD is arguably more + # appropriate but it has an extremely limited list of capable aircraft, whereas + # CAS has a much wider selection of units. + # + # Note that the only effect that the DCS task type has is in determining which + # waypoint actions the group may perform. + group.task = CAS.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + rtb_winchester=OptRTBOnOutOfAmmo.Values.All, + restrict_jettison=True, + mission_uses_gun=False, + ) + + def configure_sead(self, group: FlyingGroup[Any], flight: Flight) -> None: + # CAS is able to perform all the same tasks as SEAD using a superset of the + # available aircraft, and F-14s are not able to be SEAD despite having TALDs. + # https://forums.eagle.ru/topic/272112-cannot-assign-f-14-to-sead/ + group.task = CAS.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + # ASM includes ARMs and TALDs (among other things, but those are the useful + # weapons for SEAD). + rtb_winchester=OptRTBOnOutOfAmmo.Values.ASM, + restrict_jettison=True, + mission_uses_gun=False, + ) + + def configure_strike(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = GroundAttack.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + restrict_jettison=True, + mission_uses_gun=False, + ) + + def configure_anti_ship(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = AntishipStrike.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + restrict_jettison=True, + mission_uses_gun=False, + ) + + def configure_runway_attack(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = RunwayAttack.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + restrict_jettison=True, + mission_uses_gun=False, + ) + + def configure_oca_strike(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = CAS.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.OpenFire, + restrict_jettison=True, + ) + + def configure_awacs(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = AWACS.name + + if not isinstance(flight.flight_plan, AwacsFlightPlan): + logging.error( + f"Cannot configure AEW&C tasks for {flight} because it does not have " + "an AEW&C flight plan." + ) + return + + # Awacs task action + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.WeaponHold, + restrict_jettison=True, + ) + + group.points[0].tasks.append(AWACSTaskAction()) + + def configure_refueling(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = Refueling.name + + if not isinstance(flight.flight_plan, RefuelingFlightPlan): + logging.error( + f"Cannot configure racetrack refueling tasks for {flight} because it " + "does not have an racetrack refueling flight plan." + ) + return + + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.WeaponHold, + restrict_jettison=True, + ) + + def configure_escort(self, group: FlyingGroup[Any], flight: Flight) -> None: + # Escort groups are actually given the CAP task so they can perform the + # Search Then Engage task, which we have to use instead of the Escort + # task for the reasons explained in JoinPointBuilder. + group.task = CAP.name + self.configure_behavior( + flight, group, roe=OptROE.Values.OpenFire, restrict_jettison=True + ) + + def configure_sead_escort(self, group: FlyingGroup[Any], flight: Flight) -> None: + # CAS is able to perform all the same tasks as SEAD using a superset of the + # available aircraft, and F-14s are not able to be SEAD despite having TALDs. + # https://forums.eagle.ru/topic/272112-cannot-assign-f-14-to-sead/ + group.task = CAS.name + self.configure_behavior( + flight, + group, + roe=OptROE.Values.OpenFire, + # ASM includes ARMs and TALDs (among other things, but those are the useful + # weapons for SEAD). + rtb_winchester=OptRTBOnOutOfAmmo.Values.ASM, + restrict_jettison=True, + mission_uses_gun=False, + ) + + def configure_transport(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = Transport.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.WeaponHold, + restrict_jettison=True, + ) + + def configure_ferry(self, group: FlyingGroup[Any], flight: Flight) -> None: + group.task = Nothing.name + self.configure_behavior( + flight, + group, + react_on_threat=OptReactOnThreat.Values.EvadeFire, + roe=OptROE.Values.WeaponHold, + restrict_jettison=True, + ) + + def configure_unknown_task(self, group: FlyingGroup[Any], flight: Flight) -> None: + logging.error(f"Unhandled flight type: {flight.flight_type}") + self.configure_behavior(flight, group) + + @staticmethod + def flight_always_keeps_gun(flight: Flight) -> bool: + # Never take bullets from players. They're smart enough to know when to use it + # and when to RTB. + if flight.client_count > 0: + return True + + return flight.unit_type.always_keeps_gun diff --git a/game/missiongenerator/aircraft/aircraftgenerator.py b/game/missiongenerator/aircraft/aircraftgenerator.py new file mode 100644 index 00000000..863d01e4 --- /dev/null +++ b/game/missiongenerator/aircraft/aircraftgenerator.py @@ -0,0 +1,432 @@ +from __future__ import annotations + +import logging +import random +from functools import cached_property +from typing import Any, Dict, List, TYPE_CHECKING, Type, Union + +from dcs import helicopters +from dcs.country import Country +from dcs.mapping import Point +from dcs.mission import Mission, StartType as DcsStartType +from dcs.planes import ( + Su_33, +) +from dcs.point import PointAction +from dcs.ships import KUZNECOW +from dcs.terrain.terrain import Airport, NoParkingSlotError +from dcs.unitgroup import FlyingGroup, ShipGroup, StaticGroup +from dcs.unittype import FlyingType + +from game.ato.airtaaskingorder import AirTaskingOrder +from game.ato.flight import Flight +from game.ato.flighttype import FlightType +from game.ato.package import Package +from game.ato.starttype import StartType +from game.factions.faction import Faction +from game.missiongenerator.airsupport import AirSupport +from game.missiongenerator.lasercoderegistry import LaserCodeRegistry +from game.radio.radios import RadioRegistry +from game.radio.tacan import TacanRegistry +from game.settings import Settings +from game.theater.controlpoint import ( + Airfield, + ControlPoint, + NavalControlPoint, + OffMapSpawn, +) +from game.unitmap import UnitMap +from game.utils import meters +from gen.flights.traveltime import GroundSpeed +from gen.naming import namegen +from gen.runways import RunwayData +from .aircraftpainter import AircraftPainter +from .flightdata import FlightData +from .flightgroupconfigurator import FlightGroupConfigurator + +if TYPE_CHECKING: + from game import Game + from game.squadrons import Squadron + +WARM_START_HELI_ALT = meters(500) +WARM_START_ALTITUDE = meters(3000) + +RTB_ALTITUDE = meters(800) +RTB_DISTANCE = 5000 +HELI_ALT = 500 + + +class AircraftGenerator: + def __init__( + self, + mission: Mission, + settings: Settings, + game: Game, + radio_registry: RadioRegistry, + tacan_registry: TacanRegistry, + laser_code_registry: LaserCodeRegistry, + unit_map: UnitMap, + air_support: AirSupport, + helipads: dict[ControlPoint, list[StaticGroup]], + ) -> None: + self.m = mission + self.game = game + self.settings = settings + self.radio_registry = radio_registry + self.tacan_registy = tacan_registry + self.laser_code_registry = laser_code_registry + self.unit_map = unit_map + self.flights: List[FlightData] = [] + self.air_support = air_support + self.helipads = helipads + + @cached_property + def use_client(self) -> bool: + """True if Client should be used instead of Player.""" + blue_clients = self.client_slots_in_ato(self.game.blue.ato) + red_clients = self.client_slots_in_ato(self.game.red.ato) + return blue_clients + red_clients > 1 + + @staticmethod + def client_slots_in_ato(ato: AirTaskingOrder) -> int: + total = 0 + for package in ato.packages: + for flight in package.flights: + total += flight.client_count + return total + + @staticmethod + def _start_type(start_type: str) -> DcsStartType: + if start_type == "Runway": + return DcsStartType.Runway + elif start_type == "Cold": + return DcsStartType.Cold + return DcsStartType.Warm + + @staticmethod + def _start_type_at_group( + start_type: str, + unit_type: Type[FlyingType], + at: Union[ShipGroup, StaticGroup], + ) -> DcsStartType: + group_units = at.units + # Setting Su-33s starting from the non-supercarrier Kuznetsov to take off from runway + # to work around a DCS AI issue preventing Su-33s from taking off when set to "Takeoff from ramp" (#1352) + if ( + unit_type.id == Su_33.id + and group_units[0] is not None + and group_units[0].type == KUZNECOW.id + ): + return DcsStartType.Runway + else: + return AircraftGenerator._start_type(start_type) + + def _generate_at_airport( + self, + name: str, + side: Country, + unit_type: Type[FlyingType], + count: int, + start_type: str, + airport: Airport, + ) -> FlyingGroup[Any]: + assert count > 0 + + # TODO: Delayed runway starts should be converted to air starts for multiplayer. + # Runway starts do not work with late activated aircraft in multiplayer. Instead + # of spawning on the runway the aircraft will spawn on the taxiway, potentially + # somewhere that they don't fit anyway. We should either upgrade these to air + # starts or (less likely) downgrade to warm starts to avoid the issue when the + # player is generating the mission for multiplayer (which would need a new + # option). + logging.info("airgen: {} for {} at {}".format(unit_type, side.id, airport)) + return self.m.flight_group_from_airport( + country=side, + name=name, + aircraft_type=unit_type, + airport=airport, + maintask=None, + start_type=self._start_type(start_type), + group_size=count, + parking_slots=None, + ) + + def _generate_over_departure( + self, name: str, side: Country, flight: Flight, origin: ControlPoint + ) -> FlyingGroup[Any]: + assert flight.count > 0 + at = origin.position + + alt_type = "RADIO" + if isinstance(origin, OffMapSpawn): + alt = flight.flight_plan.waypoints[0].alt + alt_type = flight.flight_plan.waypoints[0].alt_type + elif flight.unit_type in helicopters.helicopter_map.values(): + alt = WARM_START_HELI_ALT + else: + alt = WARM_START_ALTITUDE + + speed = GroundSpeed.for_flight(flight, alt) + + pos = Point(at.x + random.randint(100, 1000), at.y + random.randint(100, 1000)) + + logging.info( + "airgen: {} for {} at {} at {}".format( + flight.unit_type, side.id, alt, int(speed.kph) + ) + ) + group = self.m.flight_group( + country=side, + name=name, + aircraft_type=flight.unit_type.dcs_unit_type, + airport=None, + position=pos, + altitude=alt.meters, + speed=speed.kph, + maintask=None, + group_size=flight.count, + ) + + group.points[0].alt_type = alt_type + return group + + def _generate_at_group( + self, + name: str, + side: Country, + unit_type: Type[FlyingType], + count: int, + start_type: str, + at: Union[ShipGroup, StaticGroup], + ) -> FlyingGroup[Any]: + assert count > 0 + + logging.info("airgen: {} for {} at unit {}".format(unit_type, side.id, at)) + return self.m.flight_group_from_unit( + country=side, + name=name, + aircraft_type=unit_type, + pad_group=at, + maintask=None, + start_type=self._start_type_at_group(start_type, unit_type, at), + group_size=count, + ) + + def _generate_at_cp_helipad( + self, + name: str, + side: Country, + unit_type: Type[FlyingType], + count: int, + start_type: str, + cp: ControlPoint, + ) -> FlyingGroup[Any]: + assert count > 0 + + logging.info( + "airgen at cp's helipads : {} for {} at {}".format( + unit_type, side.id, cp.name + ) + ) + + try: + helipad = self.helipads[cp].pop() + except IndexError as ex: + raise RuntimeError(f"Not enough helipads available at {cp}") from ex + + group = self._generate_at_group( + name=name, + side=side, + unit_type=unit_type, + count=count, + start_type=start_type, + at=helipad, + ) + + # Note : A bit dirty, need better support in pydcs + group.points[0].action = PointAction.FromGroundArea + group.points[0].type = "TakeOffGround" + group.units[0].heading = helipad.units[0].heading + if start_type != "Cold": + group.points[0].action = PointAction.FromGroundAreaHot + group.points[0].type = "TakeOffGroundHot" + + for i in range(count - 1): + try: + helipad = self.helipads[cp].pop() + group.units[1 + i].position = Point(helipad.x, helipad.y) + group.units[1 + i].heading = helipad.units[0].heading + except IndexError as ex: + raise RuntimeError(f"Not enough helipads available at {cp}") from ex + return group + + def clear_parking_slots(self) -> None: + for cp in self.game.theater.controlpoints: + for parking_slot in cp.parking_slots: + parking_slot.unit_id = None + + def generate_flights( + self, + country: Country, + ato: AirTaskingOrder, + dynamic_runways: Dict[str, RunwayData], + ) -> None: + """Adds aircraft to the mission for every flight in the ATO. + + Aircraft generation is done by walking the ATO and spawning each flight in turn. + After the flight is generated the group is added to the UnitMap so aircraft + deaths can be tracked. + + Args: + country: The country from the mission to use for this ATO. + ato: The ATO to spawn aircraft for. + dynamic_runways: Runway data for carriers and FARPs. + """ + for package in ato.packages: + if not package.flights: + continue + for flight in package.flights: + logging.info(f"Generating flight: {flight.unit_type}") + group = self.create_and_configure_flight( + flight, country, dynamic_runways + ) + self.unit_map.add_aircraft(group, flight) + + def spawn_unused_aircraft( + self, player_country: Country, enemy_country: Country + ) -> None: + for control_point in self.game.theater.controlpoints: + if not isinstance(control_point, Airfield): + continue + + faction = self.game.coalition_for(control_point.captured).faction + if control_point.captured: + country = player_country + else: + country = enemy_country + + for squadron in control_point.squadrons: + try: + self._spawn_unused_for(squadron, country, faction) + except NoParkingSlotError: + # If we run out of parking, stop spawning aircraft at this base. + break + + def _spawn_unused_for( + self, squadron: Squadron, country: Country, faction: Faction + ) -> None: + assert isinstance(squadron.location, Airfield) + for _ in range(squadron.untasked_aircraft): + # Creating a flight even those this isn't a fragged mission lets us + # reuse the existing debriefing code. + # TODO: Special flight type? + flight = Flight( + Package(squadron.location), + faction.country, + squadron, + 1, + FlightType.BARCAP, + StartType.COLD, + divert=None, + ) + + group = self._generate_at_airport( + name=namegen.next_aircraft_name(country, flight.departure.id, flight), + side=country, + unit_type=squadron.aircraft.dcs_unit_type, + count=1, + start_type="Cold", + airport=squadron.location.airport, + ) + + group.uncontrolled = True + AircraftPainter(flight, group).apply_livery() + self.unit_map.add_aircraft(group, flight) + + def create_and_configure_flight( + self, flight: Flight, country: Country, dynamic_runways: Dict[str, RunwayData] + ) -> FlyingGroup[Any]: + group = self.generate_planned_flight(country, flight) + self.flights.append( + FlightGroupConfigurator( + flight, + group, + self.game, + self.m, + self.radio_registry, + self.tacan_registy, + self.laser_code_registry, + self.air_support, + dynamic_runways, + self.use_client, + ).configure() + ) + return group + + def generate_flight_at_departure( + self, country: Country, flight: Flight, start_type: StartType + ) -> FlyingGroup[Any]: + name = namegen.next_aircraft_name(country, flight.departure.id, flight) + cp = flight.departure + try: + if start_type is StartType.IN_FLIGHT: + group = self._generate_over_departure( + name=name, side=country, flight=flight, origin=cp + ) + return group + elif isinstance(cp, NavalControlPoint): + group_name = cp.get_carrier_group_name() + carrier_group = self.m.find_group(group_name) + if not isinstance(carrier_group, ShipGroup): + raise RuntimeError( + f"Carrier group {carrier_group} is a " + "{carrier_group.__class__.__name__}, expected a ShipGroup" + ) + return self._generate_at_group( + name=name, + side=country, + unit_type=flight.unit_type.dcs_unit_type, + count=flight.count, + start_type=start_type.value, + at=carrier_group, + ) + else: + # If the flight is an helicopter flight, then prioritize dedicated helipads + if flight.unit_type.helicopter: + return self._generate_at_cp_helipad( + name=name, + side=country, + unit_type=flight.unit_type.dcs_unit_type, + count=flight.count, + start_type=start_type.value, + cp=cp, + ) + + if not isinstance(cp, Airfield): + raise RuntimeError( + f"Attempted to spawn at airfield for non-airfield {cp}" + ) + return self._generate_at_airport( + name=name, + side=country, + unit_type=flight.unit_type.dcs_unit_type, + count=flight.count, + start_type=start_type.value, + airport=cp.airport, + ) + except NoParkingSlotError: + # Generated when there is no place on Runway or on Parking Slots + logging.exception( + "No room on runway or parking slots. Starting from the air." + ) + flight.start_type = StartType.IN_FLIGHT + group = self._generate_over_departure( + name=name, side=country, flight=flight, origin=cp + ) + group.points[0].alt = 1500 + return group + + def generate_planned_flight( + self, country: Country, flight: Flight + ) -> FlyingGroup[Any]: + return self.generate_flight_at_departure(country, flight, flight.start_type) diff --git a/game/missiongenerator/aircraft/aircraftpainter.py b/game/missiongenerator/aircraft/aircraftpainter.py new file mode 100644 index 00000000..476635ef --- /dev/null +++ b/game/missiongenerator/aircraft/aircraftpainter.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import random +from typing import Any, Optional + +from dcs.unitgroup import FlyingGroup + +from game import db +from game.ato import Flight + + +class AircraftPainter: + def __init__(self, flight: Flight, group: FlyingGroup[Any]) -> None: + self.flight = flight + self.group = group + + def livery_from_db(self) -> Optional[str]: + return db.PLANE_LIVERY_OVERRIDES.get(self.flight.unit_type.dcs_unit_type) + + def livery_from_faction(self) -> Optional[str]: + faction = self.flight.squadron.coalition.faction + if ( + choices := faction.liveries_overrides.get(self.flight.unit_type) + ) is not None: + return random.choice(choices) + return None + + def livery_from_squadron(self) -> Optional[str]: + return self.flight.squadron.livery + + def determine_livery(self) -> Optional[str]: + if (livery := self.livery_from_squadron()) is not None: + return livery + if (livery := self.livery_from_faction()) is not None: + return livery + if (livery := self.livery_from_db()) is not None: + return livery + return None + + def apply_livery(self) -> None: + livery = self.determine_livery() + if livery is None: + return + for unit in self.group.units: + unit.livery_id = livery diff --git a/game/missiongenerator/aircraft/flightdata.py b/game/missiongenerator/aircraft/flightdata.py new file mode 100644 index 00000000..2a295632 --- /dev/null +++ b/game/missiongenerator/aircraft/flightdata.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Optional, TYPE_CHECKING + +from dcs.flyingunit import FlyingUnit + +from gen.callsigns import create_group_callsign_from_unit + +if TYPE_CHECKING: + from game.ato import FlightType, FlightWaypoint, Package + from game.dcs.aircrafttype import AircraftType + from game.radio.radios import RadioFrequency + from gen.runways import RunwayData + + +@dataclass(frozen=True) +class ChannelAssignment: + radio_id: int + channel: int + + +@dataclass +class FlightData: + """Details of a planned flight.""" + + #: The package that the flight belongs to. + package: Package + + flight_type: FlightType + + aircraft_type: AircraftType + + #: All units in the flight. + units: list[FlyingUnit] + + #: Total number of aircraft in the flight. + size: int + + #: True if this flight belongs to the player's coalition. + friendly: bool + + #: Number of seconds after mission start the flight is set to depart. + departure_delay: timedelta + + #: Arrival airport. + arrival: RunwayData + + #: Departure airport. + departure: RunwayData + + #: Diver airport. + divert: Optional[RunwayData] + + #: Waypoints of the flight plan. + waypoints: list[FlightWaypoint] + + #: Radio frequency for intra-flight communications. + intra_flight_channel: RadioFrequency + + #: Bingo fuel value in lbs. + bingo_fuel: Optional[int] + + joker_fuel: Optional[int] + + laser_codes: list[Optional[int]] + + custom_name: Optional[str] + + callsign: str = field(init=False) + + #: Map of radio frequencies to their assigned radio and channel, if any. + frequency_to_channel_map: dict[RadioFrequency, ChannelAssignment] = field( + init=False, default_factory=dict + ) + + def __post_init__(self) -> None: + self.callsign = create_group_callsign_from_unit(self.units[0]) + + @property + def client_units(self) -> list[FlyingUnit]: + """List of playable units in the flight.""" + return [u for u in self.units if u.is_human()] + + def num_radio_channels(self, radio_id: int) -> int: + """Returns the number of preset channels for the given radio.""" + # Note: pydcs only initializes the radio presets for client slots. + return self.client_units[0].num_radio_channels(radio_id) + + def channel_for(self, frequency: RadioFrequency) -> Optional[ChannelAssignment]: + """Returns the radio and channel number for the given frequency.""" + return self.frequency_to_channel_map.get(frequency, None) + + def assign_channel( + self, radio_id: int, channel_id: int, frequency: RadioFrequency + ) -> None: + """Assigns a preset radio channel to the given frequency.""" + for unit in self.client_units: + unit.set_radio_channel_preset(radio_id, channel_id, frequency.mhz) + + # One frequency could be bound to multiple channels. Prefer the first, + # since with the current implementation it will be the lowest numbered + # channel. + if frequency not in self.frequency_to_channel_map: + self.frequency_to_channel_map[frequency] = ChannelAssignment( + radio_id, channel_id + ) diff --git a/game/missiongenerator/aircraft/flightgroupconfigurator.py b/game/missiongenerator/aircraft/flightgroupconfigurator.py new file mode 100644 index 00000000..001fd4b4 --- /dev/null +++ b/game/missiongenerator/aircraft/flightgroupconfigurator.py @@ -0,0 +1,212 @@ +from __future__ import annotations + +import logging +from typing import Any, Optional, TYPE_CHECKING + +from dcs import Mission +from dcs.flyingunit import FlyingUnit +from dcs.planes import C_101CC, C_101EB, F_14B, Su_33 +from dcs.task import CAP +from dcs.unit import Skill +from dcs.unitgroup import FlyingGroup + +from game.ato import Flight, FlightType +from game.data.weapons import Pylon, WeaponType as WeaponTypeEnum +from game.missiongenerator.airsupport import AirSupport, AwacsInfo, TankerInfo +from game.missiongenerator.lasercoderegistry import LaserCodeRegistry +from game.radio.radios import RadioFrequency, RadioRegistry +from game.radio.tacan import TacanBand, TacanRegistry, TacanUsage +from game.squadrons import Pilot +from gen.callsigns import callsign_for_support_unit +from gen.flights.flightplan import AwacsFlightPlan, RefuelingFlightPlan +from gen.runways import RunwayData +from .aircraftbehavior import AircraftBehavior +from .aircraftpainter import AircraftPainter +from .flightdata import FlightData +from .waypoints import WaypointGenerator + +if TYPE_CHECKING: + from game import Game + + +class FlightGroupConfigurator: + def __init__( + self, + flight: Flight, + group: FlyingGroup[Any], + game: Game, + mission: Mission, + radio_registry: RadioRegistry, + tacan_registry: TacanRegistry, + laser_code_registry: LaserCodeRegistry, + air_support: AirSupport, + dynamic_runways: dict[str, RunwayData], + use_client: bool, + ) -> None: + self.flight = flight + self.group = group + self.game = game + self.mission = mission + self.radio_registry = radio_registry + self.tacan_registry = tacan_registry + self.laser_code_registry = laser_code_registry + self.air_support = air_support + self.dynamic_runways = dynamic_runways + self.use_client = use_client + + def configure(self) -> FlightData: + AircraftBehavior(self.flight.flight_type).apply_to(self.flight, self.group) + AircraftPainter(self.flight, self.group).apply_livery() + self.setup_payload() + self.setup_fuel() + flight_channel = self.setup_radios() + + laser_codes: list[Optional[int]] = [] + for unit, pilot in zip(self.group.units, self.flight.roster.pilots): + self.configure_flight_member(unit, pilot, laser_codes) + + divert = None + if self.flight.divert is not None: + divert = self.flight.divert.active_runway( + self.game.conditions, self.dynamic_runways + ) + + mission_start_time, waypoints = WaypointGenerator( + self.flight, self.group, self.mission, self.game.settings, self.air_support + ).create_waypoints() + + return FlightData( + package=self.flight.package, + aircraft_type=self.flight.unit_type, + flight_type=self.flight.flight_type, + units=self.group.units, + size=len(self.group.units), + friendly=self.flight.from_cp.captured, + departure_delay=mission_start_time, + departure=self.flight.departure.active_runway( + self.game.conditions, self.dynamic_runways + ), + arrival=self.flight.arrival.active_runway( + self.game.conditions, self.dynamic_runways + ), + divert=divert, + waypoints=waypoints, + intra_flight_channel=flight_channel, + bingo_fuel=self.flight.flight_plan.bingo_fuel, + joker_fuel=self.flight.flight_plan.joker_fuel, + custom_name=self.flight.custom_name, + laser_codes=laser_codes, + ) + + def configure_flight_member( + self, unit: FlyingUnit, pilot: Optional[Pilot], laser_codes: list[Optional[int]] + ) -> None: + player = pilot is not None and pilot.player + self.set_skill(unit, pilot) + if self.flight.loadout.has_weapon_of_type(WeaponTypeEnum.TGP) and player: + laser_codes.append(self.laser_code_registry.get_next_laser_code()) + else: + laser_codes.append(None) + if unit.unit_type is F_14B: + unit.set_property(F_14B.Properties.INSAlignmentStored.id, True) + + def setup_radios(self) -> RadioFrequency: + if self.flight.flight_type in {FlightType.AEWC, FlightType.REFUELING}: + channel = self.radio_registry.alloc_uhf() + self.register_air_support(channel) + else: + channel = self.flight.unit_type.alloc_flight_radio(self.radio_registry) + + self.group.set_frequency(channel.mhz) + return channel + + def register_air_support(self, channel: RadioFrequency) -> None: + callsign = callsign_for_support_unit(self.group) + if isinstance(self.flight.flight_plan, AwacsFlightPlan): + self.air_support.awacs.append( + AwacsInfo( + group_name=str(self.group.name), + callsign=callsign, + freq=channel, + depature_location=self.flight.departure.name, + end_time=self.flight.flight_plan.mission_departure_time, + start_time=self.flight.flight_plan.mission_start_time, + blue=self.flight.departure.captured, + ) + ) + elif isinstance(self.flight.flight_plan, RefuelingFlightPlan): + tacan = self.tacan_registry.alloc_for_band(TacanBand.Y, TacanUsage.AirToAir) + self.air_support.tankers.append( + TankerInfo( + group_name=str(self.group.name), + callsign=callsign, + variant=self.flight.unit_type.name, + freq=channel, + tacan=tacan, + start_time=self.flight.flight_plan.patrol_start_time, + end_time=self.flight.flight_plan.patrol_end_time, + blue=self.flight.departure.captured, + ) + ) + + def set_skill(self, unit: FlyingUnit, pilot: Optional[Pilot]) -> None: + if pilot is None or not pilot.player: + unit.skill = self.skill_level_for(unit, pilot) + return + + if self.use_client: + unit.set_client() + else: + unit.set_player() + + def skill_level_for(self, unit: FlyingUnit, pilot: Optional[Pilot]) -> Skill: + if self.flight.squadron.player: + base_skill = Skill(self.game.settings.player_skill) + else: + base_skill = Skill(self.game.settings.enemy_skill) + + if pilot is None: + logging.error(f"Cannot determine skill level: {unit.name} has not pilot") + return base_skill + + levels = [ + Skill.Average, + Skill.Good, + Skill.High, + Skill.Excellent, + ] + current_level = levels.index(base_skill) + missions_for_skill_increase = 4 + increase = pilot.record.missions_flown // missions_for_skill_increase + capped_increase = min(current_level + increase, len(levels) - 1) + new_level = (capped_increase, current_level)[ + self.game.settings.ai_pilot_levelling + ] + return levels[new_level] + + def setup_payload(self) -> None: + for p in self.group.units: + p.pylons.clear() + + loadout = self.flight.loadout + if self.game.settings.restrict_weapons_by_date: + loadout = loadout.degrade_for_date(self.flight.unit_type, self.game.date) + + for pylon_number, weapon in loadout.pylons.items(): + if weapon is None: + continue + pylon = Pylon.for_aircraft(self.flight.unit_type, pylon_number) + pylon.equip(self.group, weapon) + + def setup_fuel(self) -> None: + # Special case so Su 33 and C101 can take off + unit_type = self.flight.unit_type.dcs_unit_type + if unit_type == Su_33: + for unit in self.group.units: + if self.group.task == CAP: + unit.fuel = Su_33.fuel_max / 2.2 + else: + unit.fuel = Su_33.fuel_max * 0.8 + elif unit_type in {C_101EB, C_101CC}: + for unit in self.group.units: + unit.fuel = unit_type.fuel_max * 0.5 diff --git a/game/missiongenerator/aircraft/waypoints/__init__.py b/game/missiongenerator/aircraft/waypoints/__init__.py new file mode 100644 index 00000000..b21f9cbf --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/__init__.py @@ -0,0 +1 @@ +from .waypointgenerator import WaypointGenerator diff --git a/game/missiongenerator/aircraft/waypoints/baiingress.py b/game/missiongenerator/aircraft/waypoints/baiingress.py new file mode 100644 index 00000000..bdaf8532 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/baiingress.py @@ -0,0 +1,42 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import AttackGroup, WeaponType + +from game.theater import TheaterGroundObject +from game.transfers import MultiGroupTransport +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class BaiIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + + # TODO: Add common "UnitGroupTarget" base type. + group_names = [] + target = self.package.target + if isinstance(target, TheaterGroundObject): + for group in target.groups: + group_names.append(group.name) + elif isinstance(target, MultiGroupTransport): + group_names.append(target.name) + else: + logging.error( + "Unexpected target type for BAI mission: %s", + target.__class__.__name__, + ) + return waypoint + + for group_name in group_names: + group = self.mission.find_group(group_name) + if group is None: + logging.error("Could not find group for BAI mission %s", group_name) + continue + + task = AttackGroup(group.id, weapon_type=WeaponType.Auto) + task.params["attackQtyLimit"] = False + task.params["directionEnabled"] = False + task.params["altitudeEnabled"] = False + task.params["groupAttack"] = True + waypoint.tasks.append(task) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/cargostop.py b/game/missiongenerator/aircraft/waypoints/cargostop.py new file mode 100644 index 00000000..f8aa06ef --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/cargostop.py @@ -0,0 +1,14 @@ +from dcs.point import MovingPoint, PointAction + +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class CargoStopBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + waypoint.type = "LandingReFuAr" + waypoint.action = PointAction.LandingReFuAr + waypoint.landing_refuel_rearm_time = 2 # Minutes. + if (control_point := self.waypoint.control_point) is not None: + waypoint.airdrome_id = control_point.airdrome_id_for_landing + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/casingress.py b/game/missiongenerator/aircraft/waypoints/casingress.py new file mode 100644 index 00000000..dd068e6e --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/casingress.py @@ -0,0 +1,38 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import EngageTargets, EngageTargetsInZone, Targets + +from game.utils import nautical_miles +from gen.flights.flightplan import CasFlightPlan +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class CasIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + if isinstance(self.flight.flight_plan, CasFlightPlan): + waypoint.add_task( + EngageTargetsInZone( + position=self.flight.flight_plan.target.position, + radius=int(self.flight.flight_plan.engagement_distance.meters), + targets=[ + Targets.All.GroundUnits.GroundVehicles, + Targets.All.GroundUnits.AirDefence.AAA, + Targets.All.GroundUnits.Infantry, + ], + ) + ) + else: + logging.error("No CAS waypoint found. Falling back to search and engage") + waypoint.add_task( + EngageTargets( + max_distance=int(nautical_miles(10).meters), + targets=[ + Targets.All.GroundUnits.GroundVehicles, + Targets.All.GroundUnits.AirDefence.AAA, + Targets.All.GroundUnits.Infantry, + ], + ) + ) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/deadingress.py b/game/missiongenerator/aircraft/waypoints/deadingress.py new file mode 100644 index 00000000..dd57e6f8 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/deadingress.py @@ -0,0 +1,36 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import AttackGroup, WeaponType + +from game.theater import TheaterGroundObject +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class DeadIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + self.register_special_waypoints(self.waypoint.targets) + + target = self.package.target + if not isinstance(target, TheaterGroundObject): + logging.error( + "Unexpected target type for DEAD mission: %s", + target.__class__.__name__, + ) + return waypoint + + for group in target.groups: + miz_group = self.mission.find_group(group.name) + if miz_group is None: + logging.error(f"Could not find group for DEAD mission {group.name}") + continue + + task = AttackGroup(miz_group.id, weapon_type=WeaponType.Auto) + task.params["expend"] = "All" + task.params["attackQtyLimit"] = False + task.params["directionEnabled"] = False + task.params["altitudeEnabled"] = False + task.params["groupAttack"] = True + waypoint.tasks.append(task) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/default.py b/game/missiongenerator/aircraft/waypoints/default.py new file mode 100644 index 00000000..c60f69fe --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/default.py @@ -0,0 +1,5 @@ +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class DefaultWaypointBuilder(PydcsWaypointBuilder): + pass diff --git a/game/missiongenerator/aircraft/waypoints/holdpoint.py b/game/missiongenerator/aircraft/waypoints/holdpoint.py new file mode 100644 index 00000000..496e7f0b --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/holdpoint.py @@ -0,0 +1,28 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import ControlledTask, OrbitAction + +from gen.flights.flightplan import LoiterFlightPlan +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class HoldPointBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + loiter = ControlledTask( + OrbitAction(altitude=waypoint.alt, pattern=OrbitAction.OrbitPattern.Circle) + ) + if not isinstance(self.flight.flight_plan, LoiterFlightPlan): + flight_plan_type = self.flight.flight_plan.__class__.__name__ + logging.error( + f"Cannot configure hold for for {self.flight} because " + f"{flight_plan_type} does not define a push time. AI will push " + "immediately and may flight unsuitable speeds." + ) + return waypoint + push_time = self.flight.flight_plan.push_time + self.waypoint.departure_time = push_time + loiter.stop_after_time(int(push_time.total_seconds())) + waypoint.add_task(loiter) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/joinpoint.py b/game/missiongenerator/aircraft/waypoints/joinpoint.py new file mode 100644 index 00000000..c61f5c2b --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/joinpoint.py @@ -0,0 +1,70 @@ +from typing import List, Type + +from dcs.point import MovingPoint +from dcs.task import ControlledTask, EngageTargets, TargetType, Targets + +from game.ato import FlightType +from game.utils import nautical_miles +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class JoinPointBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + if self.flight.flight_type == FlightType.ESCORT: + self.configure_escort_tasks( + waypoint, + [ + Targets.All.Air.Planes.Fighters, + Targets.All.Air.Planes.MultiroleFighters, + ], + ) + elif self.flight.flight_type == FlightType.SEAD_ESCORT: + self.configure_escort_tasks( + waypoint, [Targets.All.GroundUnits.AirDefence.AAA.SAMRelated] + ) + return waypoint + + @staticmethod + def configure_escort_tasks( + waypoint: MovingPoint, target_types: List[Type[TargetType]] + ) -> None: + # Ideally we would use the escort mission type and escort task to have + # the AI automatically but the AI only escorts AI flights while they are + # traveling between waypoints. When an AI flight performs an attack + # (such as attacking the mission target), AI escorts wander aimlessly + # until the escorted group resumes its flight plan. + # + # As such, we instead use the Search Then Engage task, which is an + # enroute task that causes the AI to follow their flight plan and engage + # enemies of the set type within a certain distance. The downside to + # this approach is that AI escorts are no longer related to the group + # they are escorting, aside from the fact that they fly a similar flight + # plan at the same time. With Escort, the escorts will follow the + # escorted group out of the area. The strike element may or may not fly + # directly over the target, and they may or may not require multiple + # attack runs. For the escort flight we must just assume a flight plan + # for the escort to fly. If the strike flight doesn't need to overfly + # the target, the escorts are needlessly going in harms way. If the + # strike flight needs multiple passes, the escorts may leave before the + # escorted aircraft do. + # + # Another possible option would be to use Search Then Engage for join -> + # ingress and egress -> split, but use a Search Then Engage in Zone task + # for the target area that is set to end on a flag flip that occurs when + # the strike aircraft finish their attack task. + # + # https://forums.eagle.ru/topic/251798-options-for-alternate-ai-escort-behavior + waypoint.add_task( + ControlledTask( + EngageTargets( + # TODO: From doctrine. + max_distance=int(nautical_miles(30).meters), + targets=target_types, + ) + ) + ) + + # We could set this task to end at the split point. pydcs doesn't + # currently support that task end condition though, and we don't really + # need it. diff --git a/game/missiongenerator/aircraft/waypoints/landingpoint.py b/game/missiongenerator/aircraft/waypoints/landingpoint.py new file mode 100644 index 00000000..69c1e71d --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/landingpoint.py @@ -0,0 +1,13 @@ +from dcs.point import MovingPoint, PointAction + +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class LandingPointBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + waypoint.type = "Land" + waypoint.action = PointAction.Landing + if (control_point := self.waypoint.control_point) is not None: + waypoint.airdrome_id = control_point.airdrome_id_for_landing + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/ocaaircraftingress.py b/game/missiongenerator/aircraft/waypoints/ocaaircraftingress.py new file mode 100644 index 00000000..1a683cc7 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/ocaaircraftingress.py @@ -0,0 +1,35 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import EngageTargetsInZone, Targets + +from game.theater import Airfield +from game.utils import nautical_miles +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class OcaAircraftIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + + target = self.package.target + if not isinstance(target, Airfield): + logging.error( + "Unexpected target type for OCA Strike mission: %s", + target.__class__.__name__, + ) + return waypoint + + task = EngageTargetsInZone( + position=target.position, + # Al Dhafra is 4 nm across at most. Add a little wiggle room in case + # the airport position from DCS is not centered. + radius=int(nautical_miles(3).meters), + targets=[Targets.All.Air], + ) + task.params["attackQtyLimit"] = False + task.params["directionEnabled"] = False + task.params["altitudeEnabled"] = False + task.params["groupAttack"] = True + waypoint.tasks.append(task) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/ocarunwayingress.py b/game/missiongenerator/aircraft/waypoints/ocarunwayingress.py new file mode 100644 index 00000000..4009dc8b --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/ocarunwayingress.py @@ -0,0 +1,25 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import BombingRunway + +from game.theater import Airfield +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class OcaRunwayIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + + target = self.package.target + if not isinstance(target, Airfield): + logging.error( + "Unexpected target type for runway bombing mission: %s", + target.__class__.__name__, + ) + return waypoint + + waypoint.tasks.append( + BombingRunway(airport_id=target.airport.id, group_attack=True) + ) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/pydcswaypointbuilder.py b/game/missiongenerator/aircraft/waypoints/pydcswaypointbuilder.py new file mode 100644 index 00000000..acc84dfa --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/pydcswaypointbuilder.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from datetime import timedelta +from typing import Any, Iterable, Union + +from dcs import Mission, Point +from dcs.planes import AJS37, F_14B, JF_17 +from dcs.point import MovingPoint, PointAction +from dcs.unit import Unit +from dcs.unitgroup import FlyingGroup + +from game.ato import Flight, FlightWaypoint +from game.ato.flightwaypointtype import FlightWaypointType +from game.missiongenerator.airsupport import AirSupport +from game.theater import MissionTarget + +TARGET_WAYPOINTS = ( + FlightWaypointType.TARGET_GROUP_LOC, + FlightWaypointType.TARGET_POINT, + FlightWaypointType.TARGET_SHIP, +) + + +class PydcsWaypointBuilder: + def __init__( + self, + waypoint: FlightWaypoint, + group: FlyingGroup[Any], + flight: Flight, + mission: Mission, + air_support: AirSupport, + ) -> None: + self.waypoint = waypoint + self.group = group + self.package = flight.package + self.flight = flight + self.mission = mission + self.air_support = air_support + + def build(self) -> MovingPoint: + waypoint = self.group.add_waypoint( + Point(self.waypoint.x, self.waypoint.y), + self.waypoint.alt.meters, + name=self.waypoint.name, + ) + + if self.waypoint.flyover: + waypoint.action = PointAction.FlyOverPoint + # It seems we need to leave waypoint.type exactly as it is even + # though it's set to "Turning Point". If I set this to "Fly Over + # Point" and then save the mission in the ME DCS resets it. + + waypoint.alt_type = self.waypoint.alt_type + tot = self.flight.flight_plan.tot_for_waypoint(self.waypoint) + if tot is not None: + self.set_waypoint_tot(waypoint, tot) + return waypoint + + def set_waypoint_tot(self, waypoint: MovingPoint, tot: timedelta) -> None: + self.waypoint.tot = tot + if not self._viggen_client_tot(): + waypoint.ETA = int(tot.total_seconds()) + waypoint.ETA_locked = True + waypoint.speed_locked = False + + def _viggen_client_tot(self) -> bool: + """Viggen player aircraft consider any waypoint with a TOT set to be a target ("M") waypoint. + If the flight is a player controlled Viggen flight, no TOT should be set on any waypoint except actual target waypoints. + """ + if ( + self.flight.client_count > 0 + and self.flight.unit_type.dcs_unit_type == AJS37 + ) and (self.waypoint.waypoint_type not in TARGET_WAYPOINTS): + return True + else: + return False + + def register_special_waypoints( + self, targets: Iterable[Union[MissionTarget, Unit]] + ) -> None: + """Create special target waypoints for various aircraft""" + for i, t in enumerate(targets): + if self.group.units[0].unit_type == JF_17 and i < 4: + self.group.add_nav_target_point(t.position, "PP" + str(i + 1)) + if self.group.units[0].unit_type == F_14B and i == 0: + self.group.add_nav_target_point(t.position, "ST") diff --git a/game/missiongenerator/aircraft/waypoints/racetrack.py b/game/missiongenerator/aircraft/waypoints/racetrack.py new file mode 100644 index 00000000..a29677e3 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/racetrack.py @@ -0,0 +1,87 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import ( + ActivateBeaconCommand, + ControlledTask, + EngageTargets, + OrbitAction, + Tanker, + Targets, +) + +from game.ato import FlightType +from gen.flights.flightplan import PatrollingFlightPlan +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class RaceTrackBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + + flight_plan = self.flight.flight_plan + + if not isinstance(flight_plan, PatrollingFlightPlan): + flight_plan_type = flight_plan.__class__.__name__ + logging.error( + f"Cannot create race track for {self.flight} because " + f"{flight_plan_type} does not define a patrol." + ) + return waypoint + + # NB: It's important that the engage task comes before the orbit task. + # Though they're on the same waypoint, if the orbit task comes first it + # is their first priority and they will not engage any targets because + # they're fully focused on orbiting. If the STE task is first, they will + # engage targets if available and orbit if they find nothing to shoot. + if self.flight.flight_type is FlightType.REFUELING: + self.configure_refueling_actions(waypoint) + + # TODO: Move the properties of this task into the flight plan? + # CAP is the only current user of this so it's not a big deal, but might + # be good to make this usable for things like BAI when we add that + # later. + cap_types = {FlightType.BARCAP, FlightType.TARCAP} + if self.flight.flight_type in cap_types: + engagement_distance = int(flight_plan.engagement_distance.meters) + waypoint.tasks.append( + EngageTargets( + max_distance=engagement_distance, targets=[Targets.All.Air] + ) + ) + + orbit = OrbitAction( + altitude=waypoint.alt, + pattern=OrbitAction.OrbitPattern.RaceTrack, + speed=int(flight_plan.patrol_speed.kph), + ) + + racetrack = ControlledTask(orbit) + self.set_waypoint_tot(waypoint, flight_plan.patrol_start_time) + racetrack.stop_after_time(int(flight_plan.patrol_end_time.total_seconds())) + waypoint.add_task(racetrack) + + return waypoint + + def configure_refueling_actions(self, waypoint: MovingPoint) -> None: + waypoint.add_task(Tanker()) + + if self.flight.unit_type.dcs_unit_type.tacan: + tanker_info = self.air_support.tankers[-1] + tacan = tanker_info.tacan + tacan_callsign = { + "Texaco": "TEX", + "Arco": "ARC", + "Shell": "SHL", + }.get(tanker_info.callsign) + + waypoint.add_task( + ActivateBeaconCommand( + tacan.number, + tacan.band.value, + tacan_callsign, + bearing=True, + unit_id=self.group.units[0].id, + aa=True, + ) + ) diff --git a/game/missiongenerator/aircraft/waypoints/racetrackend.py b/game/missiongenerator/aircraft/waypoints/racetrackend.py new file mode 100644 index 00000000..dfddd103 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/racetrackend.py @@ -0,0 +1,22 @@ +import logging + +from dcs.point import MovingPoint + +from gen.flights.flightplan import PatrollingFlightPlan +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class RaceTrackEndBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + + if not isinstance(self.flight.flight_plan, PatrollingFlightPlan): + flight_plan_type = self.flight.flight_plan.__class__.__name__ + logging.error( + f"Cannot create race track for {self.flight} because " + f"{flight_plan_type} does not define a patrol." + ) + return waypoint + + self.waypoint.departure_time = self.flight.flight_plan.patrol_end_time + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/seadingress.py b/game/missiongenerator/aircraft/waypoints/seadingress.py new file mode 100644 index 00000000..d7fe87d7 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/seadingress.py @@ -0,0 +1,36 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import AttackGroup, WeaponType + +from game.theater import TheaterGroundObject +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class SeadIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + self.register_special_waypoints(self.waypoint.targets) + + target = self.package.target + if not isinstance(target, TheaterGroundObject): + logging.error( + "Unexpected target type for SEAD mission: %s", + target.__class__.__name__, + ) + return waypoint + + for group in target.groups: + miz_group = self.mission.find_group(group.name) + if miz_group is None: + logging.error(f"Could not find group for SEAD mission {group.name}") + continue + + task = AttackGroup(miz_group.id, weapon_type=WeaponType.Guided) + task.params["expend"] = "All" + task.params["attackQtyLimit"] = False + task.params["directionEnabled"] = False + task.params["altitudeEnabled"] = False + task.params["groupAttack"] = True + waypoint.tasks.append(task) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/strikeingress.py b/game/missiongenerator/aircraft/waypoints/strikeingress.py new file mode 100644 index 00000000..7e084729 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/strikeingress.py @@ -0,0 +1,50 @@ +from dcs import Point +from dcs.planes import B_17G, B_52H, Tu_22M3 +from dcs.point import MovingPoint +from dcs.task import Bombing, WeaponType + +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class StrikeIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + if self.group.units[0].unit_type in [B_17G, B_52H, Tu_22M3]: + return self.build_bombing() + else: + return self.build_strike() + + def build_bombing(self) -> MovingPoint: + waypoint = super().build() + + targets = self.waypoint.targets + if not targets: + return waypoint + + center = Point(0, 0) + for target in targets: + center.x += target.position.x + center.y += target.position.y + center.x /= len(targets) + center.y /= len(targets) + bombing = Bombing(center, weapon_type=WeaponType.Bombs) + bombing.params["expend"] = "All" + bombing.params["attackQtyLimit"] = False + bombing.params["directionEnabled"] = False + bombing.params["altitudeEnabled"] = False + bombing.params["groupAttack"] = True + waypoint.tasks.append(bombing) + return waypoint + + def build_strike(self) -> MovingPoint: + waypoint = super().build() + for target in self.waypoint.targets: + bombing = Bombing(target.position, weapon_type=WeaponType.Auto) + # If there is only one target, drop all ordnance in one pass. + if len(self.waypoint.targets) == 1: + bombing.params["expend"] = "All" + bombing.params["groupAttack"] = True + waypoint.tasks.append(bombing) + + # Register special waypoints + self.register_special_waypoints(self.waypoint.targets) + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/sweepingress.py b/game/missiongenerator/aircraft/waypoints/sweepingress.py new file mode 100644 index 00000000..15ae6250 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/sweepingress.py @@ -0,0 +1,33 @@ +import logging + +from dcs.point import MovingPoint +from dcs.task import EngageTargets, Targets + +from game.utils import nautical_miles +from gen.flights.flightplan import SweepFlightPlan +from .pydcswaypointbuilder import PydcsWaypointBuilder + + +class SweepIngressBuilder(PydcsWaypointBuilder): + def build(self) -> MovingPoint: + waypoint = super().build() + + if not isinstance(self.flight.flight_plan, SweepFlightPlan): + flight_plan_type = self.flight.flight_plan.__class__.__name__ + logging.error( + f"Cannot create sweep for {self.flight} because " + f"{flight_plan_type} is not a sweep flight plan." + ) + return waypoint + + waypoint.tasks.append( + EngageTargets( + max_distance=int(nautical_miles(50).meters), + targets=[ + Targets.All.Air.Planes.Fighters, + Targets.All.Air.Planes.MultiroleFighters, + ], + ) + ) + + return waypoint diff --git a/game/missiongenerator/aircraft/waypoints/waypointgenerator.py b/game/missiongenerator/aircraft/waypoints/waypointgenerator.py new file mode 100644 index 00000000..3dab37f2 --- /dev/null +++ b/game/missiongenerator/aircraft/waypoints/waypointgenerator.py @@ -0,0 +1,263 @@ +import itertools +import random +from datetime import timedelta +from typing import Any + +from dcs import Mission +from dcs.action import AITaskPush, ActivateGroup +from dcs.condition import CoalitionHasAirdrome, TimeAfter +from dcs.planes import AJS37 +from dcs.task import StartCommand +from dcs.triggers import Event, TriggerOnce, TriggerRule +from dcs.unitgroup import FlyingGroup + +from game.ato import Flight, FlightWaypoint +from game.ato.flightwaypointtype import FlightWaypointType +from game.ato.starttype import StartType +from game.missiongenerator.airsupport import AirSupport +from game.settings import Settings +from game.theater import ControlPointType +from game.utils import meters, pairwise +from gen.flights.traveltime import TotEstimator + +from .pydcswaypointbuilder import PydcsWaypointBuilder, TARGET_WAYPOINTS +from .baiingress import BaiIngressBuilder +from .cargostop import CargoStopBuilder +from .casingress import CasIngressBuilder +from .deadingress import DeadIngressBuilder +from .default import DefaultWaypointBuilder +from .holdpoint import HoldPointBuilder +from .joinpoint import JoinPointBuilder +from .landingpoint import LandingPointBuilder +from .ocaaircraftingress import OcaAircraftIngressBuilder +from .ocarunwayingress import OcaRunwayIngressBuilder +from .racetrack import RaceTrackBuilder +from .racetrackend import RaceTrackEndBuilder +from .seadingress import SeadIngressBuilder +from .strikeingress import StrikeIngressBuilder +from .sweepingress import SweepIngressBuilder + + +class WaypointGenerator: + def __init__( + self, + flight: Flight, + group: FlyingGroup[Any], + mission: Mission, + settings: Settings, + air_support: AirSupport, + ) -> None: + self.flight = flight + self.group = group + self.mission = mission + self.settings = settings + self.air_support = air_support + + def create_waypoints(self) -> tuple[timedelta, list[FlightWaypoint]]: + for waypoint in self.flight.points: + waypoint.tot = None + + takeoff_point = FlightWaypoint.from_pydcs( + self.group.points[0], self.flight.from_cp + ) + mission_start_time = self.set_takeoff_time(takeoff_point) + + filtered_points: list[FlightWaypoint] = [] + + for point in self.flight.points: + if point.only_for_player and not self.flight.client_count: + continue + filtered_points.append(point) + # Only add 1 target waypoint for Viggens. This only affects player flights, the + # Viggen can't have more than 9 waypoints which leaves us with two target point + # under the current flight plans. + # TODO: Make this smarter. It currently targets a random unit in the group. + # This could be updated to make it pick the "best" two targets in the group. + if self.flight.unit_type.dcs_unit_type is AJS37 and self.flight.client_count: + viggen_target_points = [ + (idx, point) + for idx, point in enumerate(filtered_points) + if point.waypoint_type in TARGET_WAYPOINTS + ] + if viggen_target_points: + keep_target = viggen_target_points[ + random.randint(0, len(viggen_target_points) - 1) + ] + filtered_points = [ + point + for idx, point in enumerate(filtered_points) + if ( + point.waypoint_type not in TARGET_WAYPOINTS + or idx == keep_target[0] + ) + ] + + for idx, point in enumerate(filtered_points): + self.builder_for_waypoint(point).build() + + # Set here rather than when the FlightData is created so they waypoints + # have their TOTs and fuel minimums set. Once we're more confident in our fuel + # estimation ability the minimum fuel amounts will be calculated during flight + # plan construction, but for now it's only used by the kneeboard so is generated + # late. + waypoints = [takeoff_point] + self.flight.points + self._estimate_min_fuel_for(waypoints) + return mission_start_time, waypoints + + def builder_for_waypoint(self, waypoint: FlightWaypoint) -> PydcsWaypointBuilder: + builders = { + FlightWaypointType.DROP_OFF: CargoStopBuilder, + FlightWaypointType.INGRESS_BAI: BaiIngressBuilder, + FlightWaypointType.INGRESS_CAS: CasIngressBuilder, + FlightWaypointType.INGRESS_DEAD: DeadIngressBuilder, + FlightWaypointType.INGRESS_OCA_AIRCRAFT: OcaAircraftIngressBuilder, + FlightWaypointType.INGRESS_OCA_RUNWAY: OcaRunwayIngressBuilder, + FlightWaypointType.INGRESS_SEAD: SeadIngressBuilder, + FlightWaypointType.INGRESS_STRIKE: StrikeIngressBuilder, + FlightWaypointType.INGRESS_SWEEP: SweepIngressBuilder, + FlightWaypointType.JOIN: JoinPointBuilder, + FlightWaypointType.LANDING_POINT: LandingPointBuilder, + FlightWaypointType.LOITER: HoldPointBuilder, + FlightWaypointType.PATROL: RaceTrackEndBuilder, + FlightWaypointType.PATROL_TRACK: RaceTrackBuilder, + FlightWaypointType.PICKUP: CargoStopBuilder, + } + builder = builders.get(waypoint.waypoint_type, DefaultWaypointBuilder) + return builder( + waypoint, self.group, self.flight, self.mission, self.air_support + ) + + def _estimate_min_fuel_for(self, waypoints: list[FlightWaypoint]) -> None: + if self.flight.unit_type.fuel_consumption is None: + return + + combat_speed_types = { + FlightWaypointType.INGRESS_BAI, + FlightWaypointType.INGRESS_CAS, + FlightWaypointType.INGRESS_DEAD, + FlightWaypointType.INGRESS_ESCORT, + FlightWaypointType.INGRESS_OCA_AIRCRAFT, + FlightWaypointType.INGRESS_OCA_RUNWAY, + FlightWaypointType.INGRESS_SEAD, + FlightWaypointType.INGRESS_STRIKE, + FlightWaypointType.INGRESS_SWEEP, + FlightWaypointType.SPLIT, + } | set(TARGET_WAYPOINTS) + + consumption = self.flight.unit_type.fuel_consumption + min_fuel: float = consumption.min_safe + + # The flight plan (in reverse) up to and including the arrival point. + main_flight_plan = reversed(waypoints) + try: + while waypoint := next(main_flight_plan): + if waypoint.waypoint_type is FlightWaypointType.LANDING_POINT: + waypoint.min_fuel = min_fuel + main_flight_plan = itertools.chain([waypoint], main_flight_plan) + break + except StopIteration: + # Some custom flight plan without a landing point. Skip it. + return + + for b, a in pairwise(main_flight_plan): + distance = meters(a.position.distance_to_point(b.position)) + if a.waypoint_type is FlightWaypointType.TAKEOFF: + ppm = consumption.climb + elif b.waypoint_type in combat_speed_types: + ppm = consumption.combat + else: + ppm = consumption.cruise + min_fuel += distance.nautical_miles * ppm + a.min_fuel = min_fuel + + def set_takeoff_time(self, waypoint: FlightWaypoint) -> timedelta: + estimator = TotEstimator(self.flight.package) + start_time = estimator.mission_start_time(self.flight) + + if self.should_delay_flight(start_time): + if self.should_activate_late(): + # Late activation causes the aircraft to not be spawned + # until triggered. + self.set_activation_time(start_time) + elif self.flight.start_type is StartType.COLD: + # Setting the start time causes the AI to wait until the + # specified time to begin their startup sequence. + self.set_startup_time(start_time) + + # And setting *our* waypoint TOT causes the takeoff time to show up in + # the player's kneeboard. + waypoint.tot = self.flight.flight_plan.takeoff_time() + return start_time + + def set_activation_time(self, delay: timedelta) -> None: + # Note: Late activation causes the waypoint TOTs to look *weird* in the + # mission editor. Waypoint times will be relative to the group + # activation time rather than in absolute local time. A flight delayed + # until 09:10 when the overall mission start time is 09:00, with a join + # time of 09:30 will show the join time as 00:30, not 09:30. + self.group.late_activation = True + + activation_trigger = TriggerOnce( + Event.NoEvent, f"FlightLateActivationTrigger{self.group.id}" + ) + activation_trigger.add_condition(TimeAfter(seconds=int(delay.total_seconds()))) + + self.prevent_spawn_at_hostile_airbase(activation_trigger) + activation_trigger.add_action(ActivateGroup(self.group.id)) + self.mission.triggerrules.triggers.append(activation_trigger) + + def prevent_spawn_at_hostile_airbase(self, trigger: TriggerRule) -> None: + # Prevent delayed flights from spawning at airbases if they were + # captured before they've spawned. + if self.flight.from_cp.cptype != ControlPointType.AIRBASE: + return + + trigger.add_condition( + CoalitionHasAirdrome( + self.flight.squadron.coalition.coalition_id, self.flight.from_cp.id + ) + ) + + def set_startup_time(self, delay: timedelta) -> None: + # Uncontrolled causes the AI unit to spawn, but not begin startup. + self.group.uncontrolled = True + + activation_trigger = TriggerOnce( + Event.NoEvent, f"FlightStartTrigger{self.group.id}" + ) + activation_trigger.add_condition(TimeAfter(seconds=int(delay.total_seconds()))) + + self.prevent_spawn_at_hostile_airbase(activation_trigger) + self.group.add_trigger_action(StartCommand()) + activation_trigger.add_action(AITaskPush(self.group.id, len(self.group.tasks))) + self.mission.triggerrules.triggers.append(activation_trigger) + + def should_delay_flight(self, start_time: timedelta) -> bool: + if start_time.total_seconds() <= 0: + return False + + if not self.flight.client_count: + return True + + if start_time < timedelta(minutes=10): + # Don't bother delaying client flights with short start delays. Much + # more than ten minutes starts to eat into fuel a bit more + # (espeicially for something fuel limited like a Harrier). + return False + + return not self.settings.never_delay_player_flights + + def should_activate_late(self) -> bool: + if self.flight.start_type is StartType.COLD: + # Avoid spawning aircraft in the air or on the runway until it's + # time for their mission. Also avoid burning through gas spawning + # hot aircraft hours before their takeoff time. + return True + + if self.flight.from_cp.is_fleet: + # Carrier spawns will crowd the carrier deck, especially without + # super carrier. + # TODO: Is there enough parking on the supercarrier? + return True + + return False diff --git a/game/missiongenerator/aircraftgenerator.py b/game/missiongenerator/aircraftgenerator.py deleted file mode 100644 index 85224300..00000000 --- a/game/missiongenerator/aircraftgenerator.py +++ /dev/null @@ -1,1923 +0,0 @@ -from __future__ import annotations - -import itertools -import logging -import random -from dataclasses import dataclass, field -from datetime import timedelta -from functools import cached_property -from typing import Dict, List, Optional, TYPE_CHECKING, Type, Union, Iterable, Any - -from dcs import helicopters -from dcs.action import AITaskPush, ActivateGroup -from dcs.condition import CoalitionHasAirdrome, TimeAfter -from dcs.country import Country -from dcs.flyingunit import FlyingUnit -from dcs.mapping import Point -from dcs.mission import Mission, StartType as DcsStartType -from dcs.planes import ( - AJS37, - B_17G, - B_52H, - C_101CC, - C_101EB, - F_14B, - JF_17, - Su_33, - Tu_22M3, -) -from dcs.point import MovingPoint, PointAction -from dcs.ships import KUZNECOW -from dcs.task import ( - AWACS, - AWACSTaskAction, - ActivateBeaconCommand, - AntishipStrike, - AttackGroup, - Bombing, - BombingRunway, - CAP, - CAS, - ControlledTask, - EPLRS, - EngageTargets, - EngageTargetsInZone, - FighterSweep, - GroundAttack, - OptROE, - OptRTBOnBingoFuel, - OptRTBOnOutOfAmmo, - OptReactOnThreat, - OptRestrictJettison, - OrbitAction, - Refueling, - RunwayAttack, - StartCommand, - Tanker, - Targets, - Transport, - WeaponType, - TargetType, - Nothing, -) -from dcs.terrain.terrain import Airport, NoParkingSlotError -from dcs.triggers import Event, TriggerOnce, TriggerRule -from dcs.unit import Unit, Skill -from dcs.unitgroup import FlyingGroup, ShipGroup, StaticGroup -from dcs.unittype import FlyingType - -from game import db -from game.ato.starttype import StartType -from game.data.weapons import Pylon, WeaponType as WeaponTypeEnum -from game.dcs.aircrafttype import AircraftType -from game.factions.faction import Faction -from game.settings import Settings -from game.theater.controlpoint import ( - Airfield, - ControlPoint, - ControlPointType, - NavalControlPoint, - OffMapSpawn, -) -from game.theater.missiontarget import MissionTarget -from game.theater.theatergroundobject import TheaterGroundObject -from game.transfers import MultiGroupTransport -from game.unitmap import UnitMap -from game.utils import Distance, meters, nautical_miles, pairwise -from game.ato.airtaaskingorder import AirTaskingOrder -from game.ato.package import Package -from gen.callsigns import create_group_callsign_from_unit -from game.ato.flighttype import FlightType -from game.ato.flightwaypointtype import FlightWaypointType -from game.ato.flightwaypoint import FlightWaypoint -from game.ato.flight import Flight -from game.radio.radios import RadioFrequency, RadioRegistry -from gen.runways import RunwayData -from game.radio.tacan import TacanBand, TacanRegistry, TacanUsage -from gen.callsigns import callsign_for_support_unit -from gen.flights.flightplan import ( - AwacsFlightPlan, - CasFlightPlan, - LoiterFlightPlan, - PatrollingFlightPlan, - RefuelingFlightPlan, - SweepFlightPlan, -) -from gen.flights.traveltime import GroundSpeed, TotEstimator -from gen.naming import namegen - -from .airsupport import AirSupport, AwacsInfo, TankerInfo -from .lasercoderegistry import LaserCodeRegistry - -if TYPE_CHECKING: - from game import Game - from game.squadrons import Pilot, Squadron - -WARM_START_HELI_ALT = meters(500) -WARM_START_ALTITUDE = meters(3000) - -RTB_ALTITUDE = meters(800) -RTB_DISTANCE = 5000 -HELI_ALT = 500 - -TARGET_WAYPOINTS = ( - FlightWaypointType.TARGET_GROUP_LOC, - FlightWaypointType.TARGET_POINT, - FlightWaypointType.TARGET_SHIP, -) - - -@dataclass(frozen=True) -class ChannelAssignment: - radio_id: int - channel: int - - -@dataclass -class FlightData: - """Details of a planned flight.""" - - #: The package that the flight belongs to. - package: Package - - flight_type: FlightType - - aircraft_type: AircraftType - - #: All units in the flight. - units: List[FlyingUnit] - - #: Total number of aircraft in the flight. - size: int - - #: True if this flight belongs to the player's coalition. - friendly: bool - - #: Number of seconds after mission start the flight is set to depart. - departure_delay: timedelta - - #: Arrival airport. - arrival: RunwayData - - #: Departure airport. - departure: RunwayData - - #: Diver airport. - divert: Optional[RunwayData] - - #: Waypoints of the flight plan. - waypoints: List[FlightWaypoint] - - #: Radio frequency for intra-flight communications. - intra_flight_channel: RadioFrequency - - #: Bingo fuel value in lbs. - bingo_fuel: Optional[int] - - joker_fuel: Optional[int] - - laser_codes: list[Optional[int]] - - custom_name: Optional[str] - - callsign: str = field(init=False) - - #: Map of radio frequencies to their assigned radio and channel, if any. - frequency_to_channel_map: Dict[RadioFrequency, ChannelAssignment] = field( - init=False, default_factory=dict - ) - - def __post_init__(self) -> None: - self.callsign = create_group_callsign_from_unit(self.units[0]) - - @property - def client_units(self) -> List[FlyingUnit]: - """List of playable units in the flight.""" - return [u for u in self.units if u.is_human()] - - def num_radio_channels(self, radio_id: int) -> int: - """Returns the number of preset channels for the given radio.""" - # Note: pydcs only initializes the radio presets for client slots. - return self.client_units[0].num_radio_channels(radio_id) - - def channel_for(self, frequency: RadioFrequency) -> Optional[ChannelAssignment]: - """Returns the radio and channel number for the given frequency.""" - return self.frequency_to_channel_map.get(frequency, None) - - def assign_channel( - self, radio_id: int, channel_id: int, frequency: RadioFrequency - ) -> None: - """Assigns a preset radio channel to the given frequency.""" - for unit in self.client_units: - unit.set_radio_channel_preset(radio_id, channel_id, frequency.mhz) - - # One frequency could be bound to multiple channels. Prefer the first, - # since with the current implementation it will be the lowest numbered - # channel. - if frequency not in self.frequency_to_channel_map: - self.frequency_to_channel_map[frequency] = ChannelAssignment( - radio_id, channel_id - ) - - -class AircraftGenerator: - def __init__( - self, - mission: Mission, - settings: Settings, - game: Game, - radio_registry: RadioRegistry, - tacan_registry: TacanRegistry, - laser_code_registry: LaserCodeRegistry, - unit_map: UnitMap, - air_support: AirSupport, - helipads: dict[ControlPoint, list[StaticGroup]], - ) -> None: - self.m = mission - self.game = game - self.settings = settings - self.radio_registry = radio_registry - self.tacan_registy = tacan_registry - self.laser_code_registry = laser_code_registry - self.unit_map = unit_map - self.flights: List[FlightData] = [] - self.air_support = air_support - self.helipads = helipads - - @cached_property - def use_client(self) -> bool: - """True if Client should be used instead of Player.""" - blue_clients = self.client_slots_in_ato(self.game.blue.ato) - red_clients = self.client_slots_in_ato(self.game.red.ato) - return blue_clients + red_clients > 1 - - @staticmethod - def client_slots_in_ato(ato: AirTaskingOrder) -> int: - total = 0 - for package in ato.packages: - for flight in package.flights: - total += flight.client_count - return total - - @staticmethod - def _start_type(start_type: str) -> DcsStartType: - if start_type == "Runway": - return DcsStartType.Runway - elif start_type == "Cold": - return DcsStartType.Cold - return DcsStartType.Warm - - @staticmethod - def _start_type_at_group( - start_type: str, - unit_type: Type[FlyingType], - at: Union[ShipGroup, StaticGroup], - ) -> DcsStartType: - group_units = at.units - # Setting Su-33s starting from the non-supercarrier Kuznetsov to take off from runway - # to work around a DCS AI issue preventing Su-33s from taking off when set to "Takeoff from ramp" (#1352) - if ( - unit_type.id == Su_33.id - and group_units[0] is not None - and group_units[0].type == KUZNECOW.id - ): - return DcsStartType.Runway - else: - return AircraftGenerator._start_type(start_type) - - def skill_level_for( - self, unit: FlyingUnit, pilot: Optional[Pilot], blue: bool - ) -> Skill: - if blue: - base_skill = Skill(self.game.settings.player_skill) - else: - base_skill = Skill(self.game.settings.enemy_skill) - - if pilot is None: - logging.error(f"Cannot determine skill level: {unit.name} has not pilot") - return base_skill - - levels = [ - Skill.Average, - Skill.Good, - Skill.High, - Skill.Excellent, - ] - current_level = levels.index(base_skill) - missions_for_skill_increase = 4 - increase = pilot.record.missions_flown // missions_for_skill_increase - capped_increase = min(current_level + increase, len(levels) - 1) - new_level = (capped_increase, current_level)[ - self.game.settings.ai_pilot_levelling - ] - return levels[new_level] - - def set_skill(self, unit: FlyingUnit, pilot: Optional[Pilot], blue: bool) -> None: - if pilot is None or not pilot.player: - unit.skill = self.skill_level_for(unit, pilot, blue) - return - - if self.use_client: - unit.set_client() - else: - unit.set_player() - - @staticmethod - def livery_from_db(flight: Flight) -> Optional[str]: - return db.PLANE_LIVERY_OVERRIDES.get(flight.unit_type.dcs_unit_type) - - def livery_from_faction(self, flight: Flight) -> Optional[str]: - faction = self.game.faction_for(player=flight.departure.captured) - if (choices := faction.liveries_overrides.get(flight.unit_type)) is not None: - return random.choice(choices) - return None - - @staticmethod - def livery_from_squadron(flight: Flight) -> Optional[str]: - return flight.squadron.livery - - def livery_for(self, flight: Flight) -> Optional[str]: - if (livery := self.livery_from_squadron(flight)) is not None: - return livery - if (livery := self.livery_from_faction(flight)) is not None: - return livery - if (livery := self.livery_from_db(flight)) is not None: - return livery - return None - - def _setup_livery(self, flight: Flight, group: FlyingGroup[Any]) -> None: - livery = self.livery_for(flight) - if livery is None: - return - for unit in group.units: - unit.livery_id = livery - - def _setup_group( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - unit_type = group.units[0].unit_type - - self._setup_payload(flight, group) - self._setup_livery(flight, group) - - laser_codes = [] - for unit, pilot in zip(group.units, flight.roster.pilots): - player = pilot is not None and pilot.player - self.set_skill(unit, pilot, blue=flight.departure.captured) - - code: Optional[int] = None - if flight.loadout.has_weapon_of_type(WeaponTypeEnum.TGP) and player: - code = self.laser_code_registry.get_next_laser_code() - laser_codes.append(code) - - # Set up F-14 Client to have pre-stored alignment - if unit_type is F_14B: - unit.set_property(F_14B.Properties.INSAlignmentStored.id, True) - - group.points[0].tasks.append( - OptReactOnThreat(OptReactOnThreat.Values.EvadeFire) - ) - - if ( - flight.flight_type == FlightType.AEWC - or flight.flight_type == FlightType.REFUELING - ): - channel = self.radio_registry.alloc_uhf() - else: - channel = flight.unit_type.alloc_flight_radio(self.radio_registry) - - try: - group.set_frequency(channel.mhz) - except TypeError: - # TODO: Remote try/except when pydcs bug is fixed. - # https://github.com/pydcs/dcs/issues/175 - # pydcs now emits an error when attempting to set a preset channel for an - # aircraft that doesn't support them. We're not choosing to set a preset - # here, we're just trying to set the AI's frequency. pydcs automatically - # tries to set channel 1 when it does that and doesn't suppress this new - # error. - pass - - divert = None - if flight.divert is not None: - divert = flight.divert.active_runway(self.game.conditions, dynamic_runways) - - self.flights.append( - FlightData( - package=package, - aircraft_type=flight.unit_type, - flight_type=flight.flight_type, - units=group.units, - size=len(group.units), - friendly=flight.from_cp.captured, - # Set later. - departure_delay=timedelta(), - departure=flight.departure.active_runway( - self.game.conditions, dynamic_runways - ), - arrival=flight.arrival.active_runway( - self.game.conditions, dynamic_runways - ), - divert=divert, - # Waypoints are added later, after they've had their TOTs set. - waypoints=[], - intra_flight_channel=channel, - bingo_fuel=flight.flight_plan.bingo_fuel, - joker_fuel=flight.flight_plan.joker_fuel, - custom_name=flight.custom_name, - laser_codes=laser_codes, - ) - ) - - # Special case so Su 33 and C101 can take off - if unit_type in [Su_33, C_101EB, C_101CC]: - self.set_reduced_fuel(flight, group, unit_type) - - if isinstance(flight.flight_plan, AwacsFlightPlan): - callsign = callsign_for_support_unit(group) - - self.air_support.awacs.append( - AwacsInfo( - group_name=str(group.name), - callsign=callsign, - freq=channel, - depature_location=flight.departure.name, - end_time=flight.flight_plan.mission_departure_time, - start_time=flight.flight_plan.mission_start_time, - blue=flight.departure.captured, - ) - ) - - if isinstance(flight.flight_plan, RefuelingFlightPlan): - callsign = callsign_for_support_unit(group) - - tacan = self.tacan_registy.alloc_for_band(TacanBand.Y, TacanUsage.AirToAir) - self.air_support.tankers.append( - TankerInfo( - group_name=str(group.name), - callsign=callsign, - variant=flight.unit_type.name, - freq=channel, - tacan=tacan, - start_time=flight.flight_plan.patrol_start_time, - end_time=flight.flight_plan.patrol_end_time, - blue=flight.departure.captured, - ) - ) - - def _generate_at_airport( - self, - name: str, - side: Country, - unit_type: Type[FlyingType], - count: int, - start_type: str, - airport: Airport, - ) -> FlyingGroup[Any]: - assert count > 0 - - logging.info("airgen: {} for {} at {}".format(unit_type, side.id, airport)) - return self.m.flight_group_from_airport( - country=side, - name=name, - aircraft_type=unit_type, - airport=airport, - maintask=None, - start_type=self._start_type(start_type), - group_size=count, - parking_slots=None, - ) - - def _generate_over_departure( - self, name: str, side: Country, flight: Flight, origin: ControlPoint - ) -> FlyingGroup[Any]: - assert flight.count > 0 - at = origin.position - - alt_type = "RADIO" - if isinstance(origin, OffMapSpawn): - alt = flight.flight_plan.waypoints[0].alt - alt_type = flight.flight_plan.waypoints[0].alt_type - elif flight.unit_type in helicopters.helicopter_map.values(): - alt = WARM_START_HELI_ALT - else: - alt = WARM_START_ALTITUDE - - speed = GroundSpeed.for_flight(flight, alt) - - pos = Point(at.x + random.randint(100, 1000), at.y + random.randint(100, 1000)) - - logging.info( - "airgen: {} for {} at {} at {}".format( - flight.unit_type, side.id, alt, int(speed.kph) - ) - ) - group = self.m.flight_group( - country=side, - name=name, - aircraft_type=flight.unit_type.dcs_unit_type, - airport=None, - position=pos, - altitude=alt.meters, - speed=speed.kph, - maintask=None, - group_size=flight.count, - ) - - group.points[0].alt_type = alt_type - return group - - def _generate_at_group( - self, - name: str, - side: Country, - unit_type: Type[FlyingType], - count: int, - start_type: str, - at: Union[ShipGroup, StaticGroup], - ) -> FlyingGroup[Any]: - assert count > 0 - - logging.info("airgen: {} for {} at unit {}".format(unit_type, side.id, at)) - return self.m.flight_group_from_unit( - country=side, - name=name, - aircraft_type=unit_type, - pad_group=at, - maintask=None, - start_type=self._start_type_at_group(start_type, unit_type, at), - group_size=count, - ) - - def _generate_at_cp_helipad( - self, - name: str, - side: Country, - unit_type: Type[FlyingType], - count: int, - start_type: str, - cp: ControlPoint, - ) -> FlyingGroup[Any]: - assert count > 0 - - logging.info( - "airgen at cp's helipads : {} for {} at {}".format( - unit_type, side.id, cp.name - ) - ) - - try: - helipad = self.helipads[cp].pop() - except IndexError as ex: - raise RuntimeError(f"Not enough helipads available at {cp}") from ex - - group = self._generate_at_group( - name=name, - side=side, - unit_type=unit_type, - count=count, - start_type=start_type, - at=helipad, - ) - - # Note : A bit dirty, need better support in pydcs - group.points[0].action = PointAction.FromGroundArea - group.points[0].type = "TakeOffGround" - group.units[0].heading = helipad.units[0].heading - if start_type != "Cold": - group.points[0].action = PointAction.FromGroundAreaHot - group.points[0].type = "TakeOffGroundHot" - - for i in range(count - 1): - try: - helipad = self.helipads[cp].pop() - group.units[1 + i].position = Point(helipad.x, helipad.y) - group.units[1 + i].heading = helipad.units[0].heading - except IndexError as ex: - raise RuntimeError(f"Not enough helipads available at {cp}") from ex - return group - - def _add_radio_waypoint( - self, - group: FlyingGroup[Any], - position: Point, - altitude: Distance, - airspeed: int = 600, - ) -> MovingPoint: - point = group.add_waypoint(position, altitude.meters, airspeed) - point.alt_type = "RADIO" - return point - - @staticmethod - def _at_position(at: Union[Point, ShipGroup, Type[Airport]]) -> Point: - if isinstance(at, Point): - return at - elif isinstance(at, ShipGroup): - return at.position - elif issubclass(at, Airport): - return at.position - else: - assert False - - def _setup_payload(self, flight: Flight, group: FlyingGroup[Any]) -> None: - for p in group.units: - p.pylons.clear() - - loadout = flight.loadout - if self.game.settings.restrict_weapons_by_date: - loadout = loadout.degrade_for_date(flight.unit_type, self.game.date) - - for pylon_number, weapon in loadout.pylons.items(): - if weapon is None: - continue - pylon = Pylon.for_aircraft(flight.unit_type, pylon_number) - pylon.equip(group, weapon) - - def clear_parking_slots(self) -> None: - for cp in self.game.theater.controlpoints: - for parking_slot in cp.parking_slots: - parking_slot.unit_id = None - - def generate_flights( - self, - country: Country, - ato: AirTaskingOrder, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - - for package in ato.packages: - if not package.flights: - continue - for flight in package.flights: - logging.info(f"Generating flight: {flight.unit_type}") - group = self.generate_planned_flight(country, flight) - self.unit_map.add_aircraft(group, flight) - self.setup_flight_group(group, package, flight, dynamic_runways) - self.create_waypoints(group, package, flight) - - def spawn_unused_aircraft( - self, player_country: Country, enemy_country: Country - ) -> None: - for control_point in self.game.theater.controlpoints: - if not isinstance(control_point, Airfield): - continue - - faction = self.game.coalition_for(control_point.captured).faction - if control_point.captured: - country = player_country - else: - country = enemy_country - - for squadron in control_point.squadrons: - try: - self._spawn_unused_for(squadron, country, faction) - except NoParkingSlotError: - # If we run out of parking, stop spawning aircraft at this base. - break - - def _spawn_unused_for( - self, squadron: Squadron, country: Country, faction: Faction - ) -> None: - assert isinstance(squadron.location, Airfield) - for _ in range(squadron.untasked_aircraft): - # Creating a flight even those this isn't a fragged mission lets us - # reuse the existing debriefing code. - # TODO: Special flight type? - flight = Flight( - Package(squadron.location), - faction.country, - squadron, - 1, - FlightType.BARCAP, - StartType.COLD, - divert=None, - ) - - group = self._generate_at_airport( - name=namegen.next_aircraft_name(country, flight.departure.id, flight), - side=country, - unit_type=squadron.aircraft.dcs_unit_type, - count=1, - start_type="Cold", - airport=squadron.location.airport, - ) - - self._setup_livery(flight, group) - - group.uncontrolled = True - self.unit_map.add_aircraft(group, flight) - - def set_activation_time( - self, flight: Flight, group: FlyingGroup[Any], delay: timedelta - ) -> None: - # Note: Late activation causes the waypoint TOTs to look *weird* in the - # mission editor. Waypoint times will be relative to the group - # activation time rather than in absolute local time. A flight delayed - # until 09:10 when the overall mission start time is 09:00, with a join - # time of 09:30 will show the join time as 00:30, not 09:30. - group.late_activation = True - - activation_trigger = TriggerOnce( - Event.NoEvent, f"FlightLateActivationTrigger{group.id}" - ) - activation_trigger.add_condition(TimeAfter(seconds=int(delay.total_seconds()))) - - self.prevent_spawn_at_hostile_airbase(flight, activation_trigger) - activation_trigger.add_action(ActivateGroup(group.id)) - self.m.triggerrules.triggers.append(activation_trigger) - - def set_startup_time( - self, flight: Flight, group: FlyingGroup[Any], delay: timedelta - ) -> None: - # Uncontrolled causes the AI unit to spawn, but not begin startup. - group.uncontrolled = True - - activation_trigger = TriggerOnce(Event.NoEvent, f"FlightStartTrigger{group.id}") - activation_trigger.add_condition(TimeAfter(seconds=int(delay.total_seconds()))) - - self.prevent_spawn_at_hostile_airbase(flight, activation_trigger) - group.add_trigger_action(StartCommand()) - activation_trigger.add_action(AITaskPush(group.id, len(group.tasks))) - self.m.triggerrules.triggers.append(activation_trigger) - - def prevent_spawn_at_hostile_airbase( - self, flight: Flight, trigger: TriggerRule - ) -> None: - # Prevent delayed flights from spawning at airbases if they were - # captured before they've spawned. - if flight.from_cp.cptype != ControlPointType.AIRBASE: - return - - coalition = self.game.coalition_for(flight.departure.captured).coalition_id - trigger.add_condition(CoalitionHasAirdrome(coalition, flight.from_cp.id)) - - def generate_flight_at_departure( - self, country: Country, flight: Flight, start_type: StartType - ) -> FlyingGroup[Any]: - name = namegen.next_aircraft_name(country, flight.departure.id, flight) - cp = flight.departure - try: - if start_type is StartType.IN_FLIGHT: - group = self._generate_over_departure( - name=name, side=country, flight=flight, origin=cp - ) - return group - elif isinstance(cp, NavalControlPoint): - group_name = cp.get_carrier_group_name() - carrier_group = self.m.find_group(group_name) - if not isinstance(carrier_group, ShipGroup): - raise RuntimeError( - f"Carrier group {carrier_group} is a " - "{carrier_group.__class__.__name__}, expected a ShipGroup" - ) - return self._generate_at_group( - name=name, - side=country, - unit_type=flight.unit_type.dcs_unit_type, - count=flight.count, - start_type=start_type.value, - at=carrier_group, - ) - else: - # If the flight is an helicopter flight, then prioritize dedicated helipads - if flight.unit_type.helicopter: - return self._generate_at_cp_helipad( - name=name, - side=country, - unit_type=flight.unit_type.dcs_unit_type, - count=flight.count, - start_type=start_type.value, - cp=cp, - ) - - if not isinstance(cp, Airfield): - raise RuntimeError( - f"Attempted to spawn at airfield for non-airfield {cp}" - ) - return self._generate_at_airport( - name=name, - side=country, - unit_type=flight.unit_type.dcs_unit_type, - count=flight.count, - start_type=start_type.value, - airport=cp.airport, - ) - except NoParkingSlotError: - # Generated when there is no place on Runway or on Parking Slots - logging.exception( - "No room on runway or parking slots. Starting from the air." - ) - flight.start_type = StartType.IN_FLIGHT - group = self._generate_over_departure( - name=name, side=country, flight=flight, origin=cp - ) - group.points[0].alt = 1500 - return group - - def generate_planned_flight( - self, country: Country, flight: Flight - ) -> FlyingGroup[Any]: - return self.generate_flight_at_departure(country, flight, flight.start_type) - - @staticmethod - def set_reduced_fuel( - flight: Flight, group: FlyingGroup[Any], unit_type: Type[FlyingType] - ) -> None: - if unit_type is Su_33: - for unit in group.units: - if flight.flight_type is not CAP: - unit.fuel = Su_33.fuel_max / 2.2 - else: - unit.fuel = Su_33.fuel_max * 0.8 - elif unit_type in [C_101EB, C_101CC]: - for unit in group.units: - unit.fuel = unit_type.fuel_max * 0.5 - else: - raise RuntimeError(f"No reduced fuel case for type {unit_type}") - - @staticmethod - def flight_always_keeps_gun(flight: Flight) -> bool: - # Never take bullets from players. They're smart enough to know when to use it - # and when to RTB. - if flight.client_count > 0: - return True - - return flight.unit_type.always_keeps_gun - - def configure_behavior( - self, - flight: Flight, - group: FlyingGroup[Any], - react_on_threat: Optional[OptReactOnThreat.Values] = None, - roe: Optional[int] = None, - rtb_winchester: Optional[OptRTBOnOutOfAmmo.Values] = None, - restrict_jettison: Optional[bool] = None, - mission_uses_gun: bool = True, - ) -> None: - group.points[0].tasks.clear() - if react_on_threat is not None: - group.points[0].tasks.append(OptReactOnThreat(react_on_threat)) - if roe is not None: - group.points[0].tasks.append(OptROE(roe)) - if restrict_jettison is not None: - group.points[0].tasks.append(OptRestrictJettison(restrict_jettison)) - if rtb_winchester is not None: - group.points[0].tasks.append(OptRTBOnOutOfAmmo(rtb_winchester)) - - # Confiscate the bullets of AI missions that do not rely on the gun. There is no - # "all but gun" RTB winchester option, so air to ground missions with mixed - # weapon types will insist on using all of their bullets after running out of - # missiles and bombs. Take away their bullets so they don't strafe a Tor. - # - # Exceptions are made for player flights and for airframes where the gun is - # essential like the A-10 or warbirds. - if not mission_uses_gun and not self.flight_always_keeps_gun(flight): - for unit in group.units: - unit.gun = 0 - - group.points[0].tasks.append(OptRTBOnBingoFuel(True)) - # Do not restrict afterburner. - # https://forums.eagle.ru/forum/english/digital-combat-simulator/dcs-world-2-5/bugs-and-problems-ai/ai-ad/7121294-ai-stuck-at-high-aoa-after-making-sharp-turn-if-afterburner-is-restricted - - @staticmethod - def configure_eplrs(group: FlyingGroup[Any], flight: Flight) -> None: - if flight.unit_type.eplrs_capable: - group.points[0].tasks.append(EPLRS(group.id)) - - def configure_cap( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = CAP.name - self._setup_group(group, package, flight, dynamic_runways) - - if not flight.unit_type.gunfighter: - ammo_type = OptRTBOnOutOfAmmo.Values.AAM - else: - ammo_type = OptRTBOnOutOfAmmo.Values.Cannon - - self.configure_behavior(flight, group, rtb_winchester=ammo_type) - - def configure_sweep( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = FighterSweep.name - self._setup_group(group, package, flight, dynamic_runways) - - if not flight.unit_type.gunfighter: - ammo_type = OptRTBOnOutOfAmmo.Values.AAM - else: - ammo_type = OptRTBOnOutOfAmmo.Values.Cannon - - self.configure_behavior(flight, group, rtb_winchester=ammo_type) - - def configure_cas( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = CAS.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - rtb_winchester=OptRTBOnOutOfAmmo.Values.Unguided, - restrict_jettison=True, - ) - - def configure_dead( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - # Only CAS and SEAD are capable of the Attack Group task. SEAD is arguably more - # appropriate but it has an extremely limited list of capable aircraft, whereas - # CAS has a much wider selection of units. - # - # Note that the only effect that the DCS task type has is in determining which - # waypoint actions the group may perform. - group.task = CAS.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - rtb_winchester=OptRTBOnOutOfAmmo.Values.All, - restrict_jettison=True, - mission_uses_gun=False, - ) - - def configure_sead( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - # CAS is able to perform all the same tasks as SEAD using a superset of the - # available aircraft, and F-14s are not able to be SEAD despite having TALDs. - # https://forums.eagle.ru/topic/272112-cannot-assign-f-14-to-sead/ - group.task = CAS.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - # ASM includes ARMs and TALDs (among other things, but those are the useful - # weapons for SEAD). - rtb_winchester=OptRTBOnOutOfAmmo.Values.ASM, - restrict_jettison=True, - mission_uses_gun=False, - ) - - def configure_strike( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = GroundAttack.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - restrict_jettison=True, - mission_uses_gun=False, - ) - - def configure_anti_ship( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = AntishipStrike.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - restrict_jettison=True, - mission_uses_gun=False, - ) - - def configure_runway_attack( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = RunwayAttack.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - restrict_jettison=True, - mission_uses_gun=False, - ) - - def configure_oca_strike( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = CAS.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.OpenFire, - restrict_jettison=True, - ) - - def configure_awacs( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = AWACS.name - - if not isinstance(flight.flight_plan, AwacsFlightPlan): - logging.error( - f"Cannot configure AEW&C tasks for {flight} because it does not have an AEW&C flight plan." - ) - return - - self._setup_group(group, package, flight, dynamic_runways) - - # Awacs task action - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.WeaponHold, - restrict_jettison=True, - ) - - group.points[0].tasks.append(AWACSTaskAction()) - - def configure_refueling( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = Refueling.name - - if not isinstance(flight.flight_plan, RefuelingFlightPlan): - logging.error( - f"Cannot configure racetrack refueling tasks for {flight} because it " - "does not have an racetrack refueling flight plan." - ) - return - - self._setup_group(group, package, flight, dynamic_runways) - - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.WeaponHold, - restrict_jettison=True, - ) - - def configure_escort( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - # Escort groups are actually given the CAP task so they can perform the - # Search Then Engage task, which we have to use instead of the Escort - # task for the reasons explained in JoinPointBuilder. - group.task = CAP.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, group, roe=OptROE.Values.OpenFire, restrict_jettison=True - ) - - def configure_sead_escort( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - # CAS is able to perform all the same tasks as SEAD using a superset of the - # available aircraft, and F-14s are not able to be SEAD despite having TALDs. - # https://forums.eagle.ru/topic/272112-cannot-assign-f-14-to-sead/ - group.task = CAS.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - roe=OptROE.Values.OpenFire, - # ASM includes ARMs and TALDs (among other things, but those are the useful - # weapons for SEAD). - rtb_winchester=OptRTBOnOutOfAmmo.Values.ASM, - restrict_jettison=True, - mission_uses_gun=False, - ) - - def configure_transport( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = Transport.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.WeaponHold, - restrict_jettison=True, - ) - - def configure_ferry( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - group.task = Nothing.name - self._setup_group(group, package, flight, dynamic_runways) - self.configure_behavior( - flight, - group, - react_on_threat=OptReactOnThreat.Values.EvadeFire, - roe=OptROE.Values.WeaponHold, - restrict_jettison=True, - ) - - def configure_unknown_task(self, group: FlyingGroup[Any], flight: Flight) -> None: - logging.error(f"Unhandled flight type: {flight.flight_type}") - self.configure_behavior(flight, group) - - def setup_flight_group( - self, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - dynamic_runways: Dict[str, RunwayData], - ) -> None: - flight_type = flight.flight_type - if flight_type in [ - FlightType.BARCAP, - FlightType.TARCAP, - FlightType.INTERCEPTION, - ]: - self.configure_cap(group, package, flight, dynamic_runways) - elif flight_type == FlightType.SWEEP: - self.configure_sweep(group, package, flight, dynamic_runways) - elif flight_type == FlightType.AEWC: - self.configure_awacs(group, package, flight, dynamic_runways) - elif flight_type == FlightType.REFUELING: - self.configure_refueling(group, package, flight, dynamic_runways) - elif flight_type in [FlightType.CAS, FlightType.BAI]: - self.configure_cas(group, package, flight, dynamic_runways) - elif flight_type == FlightType.DEAD: - self.configure_dead(group, package, flight, dynamic_runways) - elif flight_type == FlightType.SEAD: - self.configure_sead(group, package, flight, dynamic_runways) - elif flight_type == FlightType.SEAD_ESCORT: - self.configure_sead_escort(group, package, flight, dynamic_runways) - elif flight_type == FlightType.STRIKE: - self.configure_strike(group, package, flight, dynamic_runways) - elif flight_type == FlightType.ANTISHIP: - self.configure_anti_ship(group, package, flight, dynamic_runways) - elif flight_type == FlightType.ESCORT: - self.configure_escort(group, package, flight, dynamic_runways) - elif flight_type == FlightType.OCA_RUNWAY: - self.configure_runway_attack(group, package, flight, dynamic_runways) - elif flight_type == FlightType.OCA_AIRCRAFT: - self.configure_oca_strike(group, package, flight, dynamic_runways) - elif flight_type == FlightType.TRANSPORT: - self.configure_transport(group, package, flight, dynamic_runways) - elif flight_type == FlightType.FERRY: - self.configure_ferry(group, package, flight, dynamic_runways) - else: - self.configure_unknown_task(group, flight) - - self.configure_eplrs(group, flight) - - def create_waypoints( - self, group: FlyingGroup[Any], package: Package, flight: Flight - ) -> None: - - for waypoint in flight.points: - waypoint.tot = None - - takeoff_point = FlightWaypoint.from_pydcs(group.points[0], flight.from_cp) - self.set_takeoff_time(takeoff_point, package, flight, group) - - filtered_points = [] # type: List[FlightWaypoint] - - for point in flight.points: - if point.only_for_player and not flight.client_count: - continue - filtered_points.append(point) - # Only add 1 target waypoint for Viggens. This only affects player flights, - # the Viggen can't have more than 9 waypoints which leaves us with two target point - # under the current flight plans. - # TODO: Make this smarter, it currently selects a random unit in the group for target, - # this could be updated to make it pick the "best" two targets in the group. - if flight.unit_type.dcs_unit_type is AJS37 and flight.client_count: - viggen_target_points = [ - (idx, point) - for idx, point in enumerate(filtered_points) - if point.waypoint_type in TARGET_WAYPOINTS - ] - if viggen_target_points: - keep_target = viggen_target_points[ - random.randint(0, len(viggen_target_points) - 1) - ] - filtered_points = [ - point - for idx, point in enumerate(filtered_points) - if ( - point.waypoint_type not in TARGET_WAYPOINTS - or idx == keep_target[0] - ) - ] - - for idx, point in enumerate(filtered_points): - PydcsWaypointBuilder.for_waypoint( - point, group, package, flight, self.m, self.air_support - ).build() - - # Set here rather than when the FlightData is created so they waypoints - # have their TOTs and fuel minimums set. Once we're more confident in our fuel - # estimation ability the minimum fuel amounts will be calculated during flight - # plan construction, but for now it's only used by the kneeboard so is generated - # late. - waypoints = [takeoff_point] + flight.points - self._estimate_min_fuel_for(flight, waypoints) - self.flights[-1].waypoints = waypoints - - @staticmethod - def _estimate_min_fuel_for(flight: Flight, waypoints: list[FlightWaypoint]) -> None: - if flight.unit_type.fuel_consumption is None: - return - - combat_speed_types = { - FlightWaypointType.INGRESS_BAI, - FlightWaypointType.INGRESS_CAS, - FlightWaypointType.INGRESS_DEAD, - FlightWaypointType.INGRESS_ESCORT, - FlightWaypointType.INGRESS_OCA_AIRCRAFT, - FlightWaypointType.INGRESS_OCA_RUNWAY, - FlightWaypointType.INGRESS_SEAD, - FlightWaypointType.INGRESS_STRIKE, - FlightWaypointType.INGRESS_SWEEP, - FlightWaypointType.SPLIT, - } | set(TARGET_WAYPOINTS) - - consumption = flight.unit_type.fuel_consumption - min_fuel: float = consumption.min_safe - - # The flight plan (in reverse) up to and including the arrival point. - main_flight_plan = reversed(waypoints) - try: - while waypoint := next(main_flight_plan): - if waypoint.waypoint_type is FlightWaypointType.LANDING_POINT: - waypoint.min_fuel = min_fuel - main_flight_plan = itertools.chain([waypoint], main_flight_plan) - break - except StopIteration: - # Some custom flight plan without a landing point. Skip it. - return - - for b, a in pairwise(main_flight_plan): - distance = meters(a.position.distance_to_point(b.position)) - if a.waypoint_type is FlightWaypointType.TAKEOFF: - ppm = consumption.climb - elif b.waypoint_type in combat_speed_types: - ppm = consumption.combat - else: - ppm = consumption.cruise - min_fuel += distance.nautical_miles * ppm - a.min_fuel = min_fuel - - def should_delay_flight(self, flight: Flight, start_time: timedelta) -> bool: - if start_time.total_seconds() <= 0: - return False - - if not flight.client_count: - return True - - if start_time < timedelta(minutes=10): - # Don't bother delaying client flights with short start delays. Much - # more than ten minutes starts to eat into fuel a bit more - # (espeicially for something fuel limited like a Harrier). - return False - - return not self.settings.never_delay_player_flights - - def set_takeoff_time( - self, - waypoint: FlightWaypoint, - package: Package, - flight: Flight, - group: FlyingGroup[Any], - ) -> None: - estimator = TotEstimator(package) - start_time = estimator.mission_start_time(flight) - - if self.should_delay_flight(flight, start_time): - if self.should_activate_late(flight): - # Late activation causes the aircraft to not be spawned - # until triggered. - self.set_activation_time(flight, group, start_time) - elif flight.start_type is StartType.COLD: - # Setting the start time causes the AI to wait until the - # specified time to begin their startup sequence. - self.set_startup_time(flight, group, start_time) - - # And setting *our* waypoint TOT causes the takeoff time to show up in - # the player's kneeboard. - waypoint.tot = flight.flight_plan.takeoff_time() - # And finally assign it to the FlightData info so it shows correctly in - # the briefing. - self.flights[-1].departure_delay = start_time - - @staticmethod - def should_activate_late(flight: Flight) -> bool: - if flight.start_type is StartType.COLD: - # Avoid spawning aircraft in the air or on the runway until it's - # time for their mission. Also avoid burning through gas spawning - # hot aircraft hours before their takeoff time. - return True - - if flight.from_cp.is_fleet: - # Carrier spawns will crowd the carrier deck, especially without - # super carrier. - # TODO: Is there enough parking on the supercarrier? - return True - - return False - - -class PydcsWaypointBuilder: - def __init__( - self, - waypoint: FlightWaypoint, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - mission: Mission, - air_support: AirSupport, - ) -> None: - self.waypoint = waypoint - self.group = group - self.package = package - self.flight = flight - self.mission = mission - self.air_support = air_support - - def build(self) -> MovingPoint: - waypoint = self.group.add_waypoint( - Point(self.waypoint.x, self.waypoint.y), - self.waypoint.alt.meters, - name=self.waypoint.name, - ) - - if self.waypoint.flyover: - waypoint.action = PointAction.FlyOverPoint - # It seems we need to leave waypoint.type exactly as it is even - # though it's set to "Turning Point". If I set this to "Fly Over - # Point" and then save the mission in the ME DCS resets it. - - waypoint.alt_type = self.waypoint.alt_type - tot = self.flight.flight_plan.tot_for_waypoint(self.waypoint) - if tot is not None: - self.set_waypoint_tot(waypoint, tot) - return waypoint - - def set_waypoint_tot(self, waypoint: MovingPoint, tot: timedelta) -> None: - self.waypoint.tot = tot - if not self._viggen_client_tot(): - waypoint.ETA = int(tot.total_seconds()) - waypoint.ETA_locked = True - waypoint.speed_locked = False - - @classmethod - def for_waypoint( - cls, - waypoint: FlightWaypoint, - group: FlyingGroup[Any], - package: Package, - flight: Flight, - mission: Mission, - air_support: AirSupport, - ) -> PydcsWaypointBuilder: - builders = { - FlightWaypointType.DROP_OFF: CargoStopBuilder, - FlightWaypointType.INGRESS_BAI: BaiIngressBuilder, - FlightWaypointType.INGRESS_CAS: CasIngressBuilder, - FlightWaypointType.INGRESS_DEAD: DeadIngressBuilder, - FlightWaypointType.INGRESS_OCA_AIRCRAFT: OcaAircraftIngressBuilder, - FlightWaypointType.INGRESS_OCA_RUNWAY: OcaRunwayIngressBuilder, - FlightWaypointType.INGRESS_SEAD: SeadIngressBuilder, - FlightWaypointType.INGRESS_STRIKE: StrikeIngressBuilder, - FlightWaypointType.INGRESS_SWEEP: SweepIngressBuilder, - FlightWaypointType.JOIN: JoinPointBuilder, - FlightWaypointType.LANDING_POINT: LandingPointBuilder, - FlightWaypointType.LOITER: HoldPointBuilder, - FlightWaypointType.PATROL: RaceTrackEndBuilder, - FlightWaypointType.PATROL_TRACK: RaceTrackBuilder, - FlightWaypointType.PICKUP: CargoStopBuilder, - } - builder = builders.get(waypoint.waypoint_type, DefaultWaypointBuilder) - return builder(waypoint, group, package, flight, mission, air_support) - - def _viggen_client_tot(self) -> bool: - """Viggen player aircraft consider any waypoint with a TOT set to be a target ("M") waypoint. - If the flight is a player controlled Viggen flight, no TOT should be set on any waypoint except actual target waypoints. - """ - if ( - self.flight.client_count > 0 - and self.flight.unit_type.dcs_unit_type == AJS37 - ) and (self.waypoint.waypoint_type not in TARGET_WAYPOINTS): - return True - else: - return False - - def register_special_waypoints( - self, targets: Iterable[Union[MissionTarget, Unit]] - ) -> None: - """Create special target waypoints for various aircraft""" - for i, t in enumerate(targets): - if self.group.units[0].unit_type == JF_17 and i < 4: - self.group.add_nav_target_point(t.position, "PP" + str(i + 1)) - if self.group.units[0].unit_type == F_14B and i == 0: - self.group.add_nav_target_point(t.position, "ST") - - -class DefaultWaypointBuilder(PydcsWaypointBuilder): - pass - - -class HoldPointBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - loiter = ControlledTask( - OrbitAction(altitude=waypoint.alt, pattern=OrbitAction.OrbitPattern.Circle) - ) - if not isinstance(self.flight.flight_plan, LoiterFlightPlan): - flight_plan_type = self.flight.flight_plan.__class__.__name__ - logging.error( - f"Cannot configure hold for for {self.flight} because " - f"{flight_plan_type} does not define a push time. AI will push " - "immediately and may flight unsuitable speeds." - ) - return waypoint - push_time = self.flight.flight_plan.push_time - self.waypoint.departure_time = push_time - loiter.stop_after_time(int(push_time.total_seconds())) - waypoint.add_task(loiter) - return waypoint - - -class BaiIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - - # TODO: Add common "UnitGroupTarget" base type. - group_names = [] - target = self.package.target - if isinstance(target, TheaterGroundObject): - for group in target.groups: - group_names.append(group.name) - elif isinstance(target, MultiGroupTransport): - group_names.append(target.name) - else: - logging.error( - "Unexpected target type for BAI mission: %s", - target.__class__.__name__, - ) - return waypoint - - for group_name in group_names: - group = self.mission.find_group(group_name) - if group is None: - logging.error("Could not find group for BAI mission %s", group_name) - continue - - task = AttackGroup(group.id, weapon_type=WeaponType.Auto) - task.params["attackQtyLimit"] = False - task.params["directionEnabled"] = False - task.params["altitudeEnabled"] = False - task.params["groupAttack"] = True - waypoint.tasks.append(task) - return waypoint - - -class CasIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - if isinstance(self.flight.flight_plan, CasFlightPlan): - waypoint.add_task( - EngageTargetsInZone( - position=self.flight.flight_plan.target.position, - radius=int(self.flight.flight_plan.engagement_distance.meters), - targets=[ - Targets.All.GroundUnits.GroundVehicles, - Targets.All.GroundUnits.AirDefence.AAA, - Targets.All.GroundUnits.Infantry, - ], - ) - ) - else: - logging.error("No CAS waypoint found. Falling back to search and engage") - waypoint.add_task( - EngageTargets( - max_distance=int(nautical_miles(10).meters), - targets=[ - Targets.All.GroundUnits.GroundVehicles, - Targets.All.GroundUnits.AirDefence.AAA, - Targets.All.GroundUnits.Infantry, - ], - ) - ) - return waypoint - - -class DeadIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - self.register_special_waypoints(self.waypoint.targets) - - target = self.package.target - if not isinstance(target, TheaterGroundObject): - logging.error( - "Unexpected target type for DEAD mission: %s", - target.__class__.__name__, - ) - return waypoint - - for group in target.groups: - miz_group = self.mission.find_group(group.name) - if miz_group is None: - logging.error(f"Could not find group for DEAD mission {group.name}") - continue - - task = AttackGroup(miz_group.id, weapon_type=WeaponType.Auto) - task.params["expend"] = "All" - task.params["attackQtyLimit"] = False - task.params["directionEnabled"] = False - task.params["altitudeEnabled"] = False - task.params["groupAttack"] = True - waypoint.tasks.append(task) - return waypoint - - -class OcaAircraftIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - - target = self.package.target - if not isinstance(target, Airfield): - logging.error( - "Unexpected target type for OCA Strike mission: %s", - target.__class__.__name__, - ) - return waypoint - - task = EngageTargetsInZone( - position=target.position, - # Al Dhafra is 4 nm across at most. Add a little wiggle room in case - # the airport position from DCS is not centered. - radius=int(nautical_miles(3).meters), - targets=[Targets.All.Air], - ) - task.params["attackQtyLimit"] = False - task.params["directionEnabled"] = False - task.params["altitudeEnabled"] = False - task.params["groupAttack"] = True - waypoint.tasks.append(task) - return waypoint - - -class OcaRunwayIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - - target = self.package.target - if not isinstance(target, Airfield): - logging.error( - "Unexpected target type for runway bombing mission: %s", - target.__class__.__name__, - ) - return waypoint - - waypoint.tasks.append( - BombingRunway(airport_id=target.airport.id, group_attack=True) - ) - return waypoint - - -class SeadIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - self.register_special_waypoints(self.waypoint.targets) - - target = self.package.target - if not isinstance(target, TheaterGroundObject): - logging.error( - "Unexpected target type for SEAD mission: %s", - target.__class__.__name__, - ) - return waypoint - - for group in target.groups: - miz_group = self.mission.find_group(group.name) - if miz_group is None: - logging.error(f"Could not find group for SEAD mission {group.name}") - continue - - task = AttackGroup(miz_group.id, weapon_type=WeaponType.Guided) - task.params["expend"] = "All" - task.params["attackQtyLimit"] = False - task.params["directionEnabled"] = False - task.params["altitudeEnabled"] = False - task.params["groupAttack"] = True - waypoint.tasks.append(task) - return waypoint - - -class StrikeIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - if self.group.units[0].unit_type in [B_17G, B_52H, Tu_22M3]: - return self.build_bombing() - else: - return self.build_strike() - - def build_bombing(self) -> MovingPoint: - waypoint = super().build() - - targets = self.waypoint.targets - if not targets: - return waypoint - - center = Point(0, 0) - for target in targets: - center.x += target.position.x - center.y += target.position.y - center.x /= len(targets) - center.y /= len(targets) - bombing = Bombing(center, weapon_type=WeaponType.Bombs) - bombing.params["expend"] = "All" - bombing.params["attackQtyLimit"] = False - bombing.params["directionEnabled"] = False - bombing.params["altitudeEnabled"] = False - bombing.params["groupAttack"] = True - waypoint.tasks.append(bombing) - return waypoint - - def build_strike(self) -> MovingPoint: - waypoint = super().build() - for target in self.waypoint.targets: - bombing = Bombing(target.position, weapon_type=WeaponType.Auto) - # If there is only one target, drop all ordnance in one pass. - if len(self.waypoint.targets) == 1: - bombing.params["expend"] = "All" - bombing.params["groupAttack"] = True - waypoint.tasks.append(bombing) - - # Register special waypoints - self.register_special_waypoints(self.waypoint.targets) - return waypoint - - -class SweepIngressBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - - if not isinstance(self.flight.flight_plan, SweepFlightPlan): - flight_plan_type = self.flight.flight_plan.__class__.__name__ - logging.error( - f"Cannot create sweep for {self.flight} because " - f"{flight_plan_type} is not a sweep flight plan." - ) - return waypoint - - waypoint.tasks.append( - EngageTargets( - max_distance=int(nautical_miles(50).meters), - targets=[ - Targets.All.Air.Planes.Fighters, - Targets.All.Air.Planes.MultiroleFighters, - ], - ) - ) - - return waypoint - - -class JoinPointBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - if self.flight.flight_type == FlightType.ESCORT: - self.configure_escort_tasks( - waypoint, - [ - Targets.All.Air.Planes.Fighters, - Targets.All.Air.Planes.MultiroleFighters, - ], - ) - elif self.flight.flight_type == FlightType.SEAD_ESCORT: - self.configure_escort_tasks( - waypoint, [Targets.All.GroundUnits.AirDefence.AAA.SAMRelated] - ) - return waypoint - - @staticmethod - def configure_escort_tasks( - waypoint: MovingPoint, target_types: List[Type[TargetType]] - ) -> None: - # Ideally we would use the escort mission type and escort task to have - # the AI automatically but the AI only escorts AI flights while they are - # traveling between waypoints. When an AI flight performs an attack - # (such as attacking the mission target), AI escorts wander aimlessly - # until the escorted group resumes its flight plan. - # - # As such, we instead use the Search Then Engage task, which is an - # enroute task that causes the AI to follow their flight plan and engage - # enemies of the set type within a certain distance. The downside to - # this approach is that AI escorts are no longer related to the group - # they are escorting, aside from the fact that they fly a similar flight - # plan at the same time. With Escort, the escorts will follow the - # escorted group out of the area. The strike element may or may not fly - # directly over the target, and they may or may not require multiple - # attack runs. For the escort flight we must just assume a flight plan - # for the escort to fly. If the strike flight doesn't need to overfly - # the target, the escorts are needlessly going in harms way. If the - # strike flight needs multiple passes, the escorts may leave before the - # escorted aircraft do. - # - # Another possible option would be to use Search Then Engage for join -> - # ingress and egress -> split, but use a Search Then Engage in Zone task - # for the target area that is set to end on a flag flip that occurs when - # the strike aircraft finish their attack task. - # - # https://forums.eagle.ru/topic/251798-options-for-alternate-ai-escort-behavior - waypoint.add_task( - ControlledTask( - EngageTargets( - # TODO: From doctrine. - max_distance=int(nautical_miles(30).meters), - targets=target_types, - ) - ) - ) - - # We could set this task to end at the split point. pydcs doesn't - # currently support that task end condition though, and we don't really - # need it. - - -class LandingPointBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - waypoint.type = "Land" - waypoint.action = PointAction.Landing - if (control_point := self.waypoint.control_point) is not None: - waypoint.airdrome_id = control_point.airdrome_id_for_landing - return waypoint - - -class CargoStopBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - waypoint.type = "LandingReFuAr" - waypoint.action = PointAction.LandingReFuAr - waypoint.landing_refuel_rearm_time = 2 # Minutes. - if (control_point := self.waypoint.control_point) is not None: - waypoint.airdrome_id = control_point.airdrome_id_for_landing - return waypoint - - -class RaceTrackBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - - flight_plan = self.flight.flight_plan - - if not isinstance(flight_plan, PatrollingFlightPlan): - flight_plan_type = flight_plan.__class__.__name__ - logging.error( - f"Cannot create race track for {self.flight} because " - f"{flight_plan_type} does not define a patrol." - ) - return waypoint - - # NB: It's important that the engage task comes before the orbit task. - # Though they're on the same waypoint, if the orbit task comes first it - # is their first priority and they will not engage any targets because - # they're fully focused on orbiting. If the STE task is first, they will - # engage targets if available and orbit if they find nothing to shoot. - if self.flight.flight_type is FlightType.REFUELING: - self.configure_refueling_actions(waypoint) - - # TODO: Move the properties of this task into the flight plan? - # CAP is the only current user of this so it's not a big deal, but might - # be good to make this usable for things like BAI when we add that - # later. - cap_types = {FlightType.BARCAP, FlightType.TARCAP} - if self.flight.flight_type in cap_types: - engagement_distance = int(flight_plan.engagement_distance.meters) - waypoint.tasks.append( - EngageTargets( - max_distance=engagement_distance, targets=[Targets.All.Air] - ) - ) - - orbit = OrbitAction( - altitude=waypoint.alt, - pattern=OrbitAction.OrbitPattern.RaceTrack, - speed=int(flight_plan.patrol_speed.kph), - ) - - racetrack = ControlledTask(orbit) - self.set_waypoint_tot(waypoint, flight_plan.patrol_start_time) - racetrack.stop_after_time(int(flight_plan.patrol_end_time.total_seconds())) - waypoint.add_task(racetrack) - - return waypoint - - def configure_refueling_actions(self, waypoint: MovingPoint) -> None: - waypoint.add_task(Tanker()) - - if self.flight.unit_type.dcs_unit_type.tacan: - tanker_info = self.air_support.tankers[-1] - tacan = tanker_info.tacan - tacan_callsign = { - "Texaco": "TEX", - "Arco": "ARC", - "Shell": "SHL", - }.get(tanker_info.callsign) - - waypoint.add_task( - ActivateBeaconCommand( - tacan.number, - tacan.band.value, - tacan_callsign, - bearing=True, - unit_id=self.group.units[0].id, - aa=True, - ) - ) - - -class RaceTrackEndBuilder(PydcsWaypointBuilder): - def build(self) -> MovingPoint: - waypoint = super().build() - - if not isinstance(self.flight.flight_plan, PatrollingFlightPlan): - flight_plan_type = self.flight.flight_plan.__class__.__name__ - logging.error( - f"Cannot create race track for {self.flight} because " - f"{flight_plan_type} does not define a patrol." - ) - return waypoint - - self.waypoint.departure_time = self.flight.flight_plan.patrol_end_time - return waypoint diff --git a/game/missiongenerator/briefinggenerator.py b/game/missiongenerator/briefinggenerator.py index 7f6c2962..4759b369 100644 --- a/game/missiongenerator/briefinggenerator.py +++ b/game/missiongenerator/briefinggenerator.py @@ -17,7 +17,7 @@ from gen.ground_forces.combat_stance import CombatStance from game.radio.radios import RadioFrequency from gen.runways import RunwayData -from .aircraftgenerator import FlightData +from .aircraft.flightdata import FlightData from .airsupportgenerator import AwacsInfo, TankerInfo from .flotgenerator import JtacInfo diff --git a/game/missiongenerator/kneeboard.py b/game/missiongenerator/kneeboard.py index f7a8f227..383ede61 100644 --- a/game/missiongenerator/kneeboard.py +++ b/game/missiongenerator/kneeboard.py @@ -48,7 +48,7 @@ from game.ato.flightwaypoint import FlightWaypoint from game.radio.radios import RadioFrequency from gen.runways import RunwayData -from .aircraftgenerator import FlightData +from .aircraft.flightdata import FlightData from .airsupportgenerator import AwacsInfo, TankerInfo from .briefinggenerator import CommInfo, JtacInfo, MissionInfoGenerator diff --git a/game/missiongenerator/luagenerator.py b/game/missiongenerator/luagenerator.py index af0045a4..60bc33df 100644 --- a/game/missiongenerator/luagenerator.py +++ b/game/missiongenerator/luagenerator.py @@ -14,7 +14,7 @@ from game.ato import FlightType from game.plugins import LuaPluginManager from game.theater import TheaterGroundObject -from .aircraftgenerator import FlightData +from .aircraft.flightdata import FlightData from .airsupport import AirSupport if TYPE_CHECKING: diff --git a/game/missiongenerator/missiongenerator.py b/game/missiongenerator/missiongenerator.py index 63720422..1ad58df0 100644 --- a/game/missiongenerator/missiongenerator.py +++ b/game/missiongenerator/missiongenerator.py @@ -17,7 +17,10 @@ from game.theater import Airfield, FrontLine from game.unitmap import UnitMap from gen.airfields import AIRFIELD_DATA from gen.naming import namegen -from .aircraftgenerator import AircraftGenerator, FlightData +from game.missiongenerator.aircraft.aircraftgenerator import ( + AircraftGenerator, +) +from .aircraft.flightdata import FlightData from .airsupport import AirSupport from .airsupportgenerator import AirSupportGenerator from .beacons import load_beacons_for_terrain diff --git a/game/radio/channels.py b/game/radio/channels.py index 04e2d8f5..176642c3 100644 --- a/game/radio/channels.py +++ b/game/radio/channels.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from typing import Optional, Any, TYPE_CHECKING if TYPE_CHECKING: - from game.missiongenerator.aircraftgenerator import FlightData + from game.missiongenerator.aircraft.flightdata import FlightData from game.missiongenerator.airsupport import AirSupport