Clean up AircraftGenerator.

This class does far too many things and the file is huge. Split it up
into a few more classes.
This commit is contained in:
Dan Albert 2021-10-23 20:06:39 -07:00
parent 49033f67f3
commit 88b4039e47
32 changed files with 2014 additions and 1931 deletions

View File

@ -40,8 +40,6 @@ class Flight:
self.roster = FlightRoster(self.squadron, initial_size=count)
else:
self.roster = roster
self.departure = self.squadron.location
self.arrival = self.squadron.arrival
self.divert = divert
self.flight_type = flight_type
# TODO: Replace with FlightPlan.
@ -63,6 +61,14 @@ class Flight:
package=package, flight=self, custom_waypoints=[]
)
@property
def departure(self) -> ControlPoint:
return self.squadron.location
@property
def arrival(self) -> ControlPoint:
return self.squadron.arrival
@property
def count(self) -> int:
return self.roster.max_size

View File

@ -40,7 +40,7 @@ from game.utils import (
)
if TYPE_CHECKING:
from game.missiongenerator.aircraftgenerator import FlightData
from game.missiongenerator.aircraft.flightdata import FlightData
from game.missiongenerator.airsupport import AirSupport
from game.radio.radios import Radio, RadioFrequency, RadioRegistry

View File

@ -0,0 +1,316 @@
import logging
from typing import Any, Optional
from dcs.task import (
AWACS,
AWACSTaskAction,
AntishipStrike,
CAP,
CAS,
EPLRS,
FighterSweep,
GroundAttack,
Nothing,
OptROE,
OptRTBOnBingoFuel,
OptRTBOnOutOfAmmo,
OptReactOnThreat,
OptRestrictJettison,
Refueling,
RunwayAttack,
Transport,
)
from dcs.unitgroup import FlyingGroup
from game.ato import Flight, FlightType
from gen.flights.flightplan import AwacsFlightPlan, RefuelingFlightPlan
class AircraftBehavior:
def __init__(self, task: FlightType) -> None:
self.task = task
def apply_to(self, flight: Flight, group: FlyingGroup[Any]) -> None:
if self.task in [
FlightType.BARCAP,
FlightType.TARCAP,
FlightType.INTERCEPTION,
]:
self.configure_cap(group, flight)
elif self.task == FlightType.SWEEP:
self.configure_sweep(group, flight)
elif self.task == FlightType.AEWC:
self.configure_awacs(group, flight)
elif self.task == FlightType.REFUELING:
self.configure_refueling(group, flight)
elif self.task in [FlightType.CAS, FlightType.BAI]:
self.configure_cas(group, flight)
elif self.task == FlightType.DEAD:
self.configure_dead(group, flight)
elif self.task == FlightType.SEAD:
self.configure_sead(group, flight)
elif self.task == FlightType.SEAD_ESCORT:
self.configure_sead_escort(group, flight)
elif self.task == FlightType.STRIKE:
self.configure_strike(group, flight)
elif self.task == FlightType.ANTISHIP:
self.configure_anti_ship(group, flight)
elif self.task == FlightType.ESCORT:
self.configure_escort(group, flight)
elif self.task == FlightType.OCA_RUNWAY:
self.configure_runway_attack(group, flight)
elif self.task == FlightType.OCA_AIRCRAFT:
self.configure_oca_strike(group, flight)
elif self.task == FlightType.TRANSPORT:
self.configure_transport(group, flight)
elif self.task == FlightType.FERRY:
self.configure_ferry(group, flight)
else:
self.configure_unknown_task(group, flight)
self.configure_eplrs(group, flight)
def configure_behavior(
self,
flight: Flight,
group: FlyingGroup[Any],
react_on_threat: OptReactOnThreat.Values = OptReactOnThreat.Values.EvadeFire,
roe: Optional[int] = None,
rtb_winchester: Optional[OptRTBOnOutOfAmmo.Values] = None,
restrict_jettison: Optional[bool] = None,
mission_uses_gun: bool = True,
) -> None:
group.points[0].tasks.clear()
group.points[0].tasks.append(OptReactOnThreat(react_on_threat))
if roe is not None:
group.points[0].tasks.append(OptROE(roe))
if restrict_jettison is not None:
group.points[0].tasks.append(OptRestrictJettison(restrict_jettison))
if rtb_winchester is not None:
group.points[0].tasks.append(OptRTBOnOutOfAmmo(rtb_winchester))
# Confiscate the bullets of AI missions that do not rely on the gun. There is no
# "all but gun" RTB winchester option, so air to ground missions with mixed
# weapon types will insist on using all of their bullets after running out of
# missiles and bombs. Take away their bullets so they don't strafe a Tor.
#
# Exceptions are made for player flights and for airframes where the gun is
# essential like the A-10 or warbirds.
if not mission_uses_gun and not self.flight_always_keeps_gun(flight):
for unit in group.units:
unit.gun = 0
group.points[0].tasks.append(OptRTBOnBingoFuel(True))
# Do not restrict afterburner.
# https://forums.eagle.ru/forum/english/digital-combat-simulator/dcs-world-2-5/bugs-and-problems-ai/ai-ad/7121294-ai-stuck-at-high-aoa-after-making-sharp-turn-if-afterburner-is-restricted
@staticmethod
def configure_eplrs(group: FlyingGroup[Any], flight: Flight) -> None:
if flight.unit_type.eplrs_capable:
group.points[0].tasks.append(EPLRS(group.id))
def configure_cap(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = CAP.name
if not flight.unit_type.gunfighter:
ammo_type = OptRTBOnOutOfAmmo.Values.AAM
else:
ammo_type = OptRTBOnOutOfAmmo.Values.Cannon
self.configure_behavior(flight, group, rtb_winchester=ammo_type)
def configure_sweep(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = FighterSweep.name
if not flight.unit_type.gunfighter:
ammo_type = OptRTBOnOutOfAmmo.Values.AAM
else:
ammo_type = OptRTBOnOutOfAmmo.Values.Cannon
self.configure_behavior(flight, group, rtb_winchester=ammo_type)
def configure_cas(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = CAS.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
rtb_winchester=OptRTBOnOutOfAmmo.Values.Unguided,
restrict_jettison=True,
)
def configure_dead(self, group: FlyingGroup[Any], flight: Flight) -> None:
# Only CAS and SEAD are capable of the Attack Group task. SEAD is arguably more
# appropriate but it has an extremely limited list of capable aircraft, whereas
# CAS has a much wider selection of units.
#
# Note that the only effect that the DCS task type has is in determining which
# waypoint actions the group may perform.
group.task = CAS.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
rtb_winchester=OptRTBOnOutOfAmmo.Values.All,
restrict_jettison=True,
mission_uses_gun=False,
)
def configure_sead(self, group: FlyingGroup[Any], flight: Flight) -> None:
# CAS is able to perform all the same tasks as SEAD using a superset of the
# available aircraft, and F-14s are not able to be SEAD despite having TALDs.
# https://forums.eagle.ru/topic/272112-cannot-assign-f-14-to-sead/
group.task = CAS.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
# ASM includes ARMs and TALDs (among other things, but those are the useful
# weapons for SEAD).
rtb_winchester=OptRTBOnOutOfAmmo.Values.ASM,
restrict_jettison=True,
mission_uses_gun=False,
)
def configure_strike(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = GroundAttack.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
restrict_jettison=True,
mission_uses_gun=False,
)
def configure_anti_ship(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = AntishipStrike.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
restrict_jettison=True,
mission_uses_gun=False,
)
def configure_runway_attack(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = RunwayAttack.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
restrict_jettison=True,
mission_uses_gun=False,
)
def configure_oca_strike(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = CAS.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.OpenFire,
restrict_jettison=True,
)
def configure_awacs(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = AWACS.name
if not isinstance(flight.flight_plan, AwacsFlightPlan):
logging.error(
f"Cannot configure AEW&C tasks for {flight} because it does not have "
"an AEW&C flight plan."
)
return
# Awacs task action
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.WeaponHold,
restrict_jettison=True,
)
group.points[0].tasks.append(AWACSTaskAction())
def configure_refueling(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = Refueling.name
if not isinstance(flight.flight_plan, RefuelingFlightPlan):
logging.error(
f"Cannot configure racetrack refueling tasks for {flight} because it "
"does not have an racetrack refueling flight plan."
)
return
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.WeaponHold,
restrict_jettison=True,
)
def configure_escort(self, group: FlyingGroup[Any], flight: Flight) -> None:
# Escort groups are actually given the CAP task so they can perform the
# Search Then Engage task, which we have to use instead of the Escort
# task for the reasons explained in JoinPointBuilder.
group.task = CAP.name
self.configure_behavior(
flight, group, roe=OptROE.Values.OpenFire, restrict_jettison=True
)
def configure_sead_escort(self, group: FlyingGroup[Any], flight: Flight) -> None:
# CAS is able to perform all the same tasks as SEAD using a superset of the
# available aircraft, and F-14s are not able to be SEAD despite having TALDs.
# https://forums.eagle.ru/topic/272112-cannot-assign-f-14-to-sead/
group.task = CAS.name
self.configure_behavior(
flight,
group,
roe=OptROE.Values.OpenFire,
# ASM includes ARMs and TALDs (among other things, but those are the useful
# weapons for SEAD).
rtb_winchester=OptRTBOnOutOfAmmo.Values.ASM,
restrict_jettison=True,
mission_uses_gun=False,
)
def configure_transport(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = Transport.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.WeaponHold,
restrict_jettison=True,
)
def configure_ferry(self, group: FlyingGroup[Any], flight: Flight) -> None:
group.task = Nothing.name
self.configure_behavior(
flight,
group,
react_on_threat=OptReactOnThreat.Values.EvadeFire,
roe=OptROE.Values.WeaponHold,
restrict_jettison=True,
)
def configure_unknown_task(self, group: FlyingGroup[Any], flight: Flight) -> None:
logging.error(f"Unhandled flight type: {flight.flight_type}")
self.configure_behavior(flight, group)
@staticmethod
def flight_always_keeps_gun(flight: Flight) -> bool:
# Never take bullets from players. They're smart enough to know when to use it
# and when to RTB.
if flight.client_count > 0:
return True
return flight.unit_type.always_keeps_gun

View File

@ -0,0 +1,432 @@
from __future__ import annotations
import logging
import random
from functools import cached_property
from typing import Any, Dict, List, TYPE_CHECKING, Type, Union
from dcs import helicopters
from dcs.country import Country
from dcs.mapping import Point
from dcs.mission import Mission, StartType as DcsStartType
from dcs.planes import (
Su_33,
)
from dcs.point import PointAction
from dcs.ships import KUZNECOW
from dcs.terrain.terrain import Airport, NoParkingSlotError
from dcs.unitgroup import FlyingGroup, ShipGroup, StaticGroup
from dcs.unittype import FlyingType
from game.ato.airtaaskingorder import AirTaskingOrder
from game.ato.flight import Flight
from game.ato.flighttype import FlightType
from game.ato.package import Package
from game.ato.starttype import StartType
from game.factions.faction import Faction
from game.missiongenerator.airsupport import AirSupport
from game.missiongenerator.lasercoderegistry import LaserCodeRegistry
from game.radio.radios import RadioRegistry
from game.radio.tacan import TacanRegistry
from game.settings import Settings
from game.theater.controlpoint import (
Airfield,
ControlPoint,
NavalControlPoint,
OffMapSpawn,
)
from game.unitmap import UnitMap
from game.utils import meters
from gen.flights.traveltime import GroundSpeed
from gen.naming import namegen
from gen.runways import RunwayData
from .aircraftpainter import AircraftPainter
from .flightdata import FlightData
from .flightgroupconfigurator import FlightGroupConfigurator
if TYPE_CHECKING:
from game import Game
from game.squadrons import Squadron
WARM_START_HELI_ALT = meters(500)
WARM_START_ALTITUDE = meters(3000)
RTB_ALTITUDE = meters(800)
RTB_DISTANCE = 5000
HELI_ALT = 500
class AircraftGenerator:
def __init__(
self,
mission: Mission,
settings: Settings,
game: Game,
radio_registry: RadioRegistry,
tacan_registry: TacanRegistry,
laser_code_registry: LaserCodeRegistry,
unit_map: UnitMap,
air_support: AirSupport,
helipads: dict[ControlPoint, list[StaticGroup]],
) -> None:
self.m = mission
self.game = game
self.settings = settings
self.radio_registry = radio_registry
self.tacan_registy = tacan_registry
self.laser_code_registry = laser_code_registry
self.unit_map = unit_map
self.flights: List[FlightData] = []
self.air_support = air_support
self.helipads = helipads
@cached_property
def use_client(self) -> bool:
"""True if Client should be used instead of Player."""
blue_clients = self.client_slots_in_ato(self.game.blue.ato)
red_clients = self.client_slots_in_ato(self.game.red.ato)
return blue_clients + red_clients > 1
@staticmethod
def client_slots_in_ato(ato: AirTaskingOrder) -> int:
total = 0
for package in ato.packages:
for flight in package.flights:
total += flight.client_count
return total
@staticmethod
def _start_type(start_type: str) -> DcsStartType:
if start_type == "Runway":
return DcsStartType.Runway
elif start_type == "Cold":
return DcsStartType.Cold
return DcsStartType.Warm
@staticmethod
def _start_type_at_group(
start_type: str,
unit_type: Type[FlyingType],
at: Union[ShipGroup, StaticGroup],
) -> DcsStartType:
group_units = at.units
# Setting Su-33s starting from the non-supercarrier Kuznetsov to take off from runway
# to work around a DCS AI issue preventing Su-33s from taking off when set to "Takeoff from ramp" (#1352)
if (
unit_type.id == Su_33.id
and group_units[0] is not None
and group_units[0].type == KUZNECOW.id
):
return DcsStartType.Runway
else:
return AircraftGenerator._start_type(start_type)
def _generate_at_airport(
self,
name: str,
side: Country,
unit_type: Type[FlyingType],
count: int,
start_type: str,
airport: Airport,
) -> FlyingGroup[Any]:
assert count > 0
# TODO: Delayed runway starts should be converted to air starts for multiplayer.
# Runway starts do not work with late activated aircraft in multiplayer. Instead
# of spawning on the runway the aircraft will spawn on the taxiway, potentially
# somewhere that they don't fit anyway. We should either upgrade these to air
# starts or (less likely) downgrade to warm starts to avoid the issue when the
# player is generating the mission for multiplayer (which would need a new
# option).
logging.info("airgen: {} for {} at {}".format(unit_type, side.id, airport))
return self.m.flight_group_from_airport(
country=side,
name=name,
aircraft_type=unit_type,
airport=airport,
maintask=None,
start_type=self._start_type(start_type),
group_size=count,
parking_slots=None,
)
def _generate_over_departure(
self, name: str, side: Country, flight: Flight, origin: ControlPoint
) -> FlyingGroup[Any]:
assert flight.count > 0
at = origin.position
alt_type = "RADIO"
if isinstance(origin, OffMapSpawn):
alt = flight.flight_plan.waypoints[0].alt
alt_type = flight.flight_plan.waypoints[0].alt_type
elif flight.unit_type in helicopters.helicopter_map.values():
alt = WARM_START_HELI_ALT
else:
alt = WARM_START_ALTITUDE
speed = GroundSpeed.for_flight(flight, alt)
pos = Point(at.x + random.randint(100, 1000), at.y + random.randint(100, 1000))
logging.info(
"airgen: {} for {} at {} at {}".format(
flight.unit_type, side.id, alt, int(speed.kph)
)
)
group = self.m.flight_group(
country=side,
name=name,
aircraft_type=flight.unit_type.dcs_unit_type,
airport=None,
position=pos,
altitude=alt.meters,
speed=speed.kph,
maintask=None,
group_size=flight.count,
)
group.points[0].alt_type = alt_type
return group
def _generate_at_group(
self,
name: str,
side: Country,
unit_type: Type[FlyingType],
count: int,
start_type: str,
at: Union[ShipGroup, StaticGroup],
) -> FlyingGroup[Any]:
assert count > 0
logging.info("airgen: {} for {} at unit {}".format(unit_type, side.id, at))
return self.m.flight_group_from_unit(
country=side,
name=name,
aircraft_type=unit_type,
pad_group=at,
maintask=None,
start_type=self._start_type_at_group(start_type, unit_type, at),
group_size=count,
)
def _generate_at_cp_helipad(
self,
name: str,
side: Country,
unit_type: Type[FlyingType],
count: int,
start_type: str,
cp: ControlPoint,
) -> FlyingGroup[Any]:
assert count > 0
logging.info(
"airgen at cp's helipads : {} for {} at {}".format(
unit_type, side.id, cp.name
)
)
try:
helipad = self.helipads[cp].pop()
except IndexError as ex:
raise RuntimeError(f"Not enough helipads available at {cp}") from ex
group = self._generate_at_group(
name=name,
side=side,
unit_type=unit_type,
count=count,
start_type=start_type,
at=helipad,
)
# Note : A bit dirty, need better support in pydcs
group.points[0].action = PointAction.FromGroundArea
group.points[0].type = "TakeOffGround"
group.units[0].heading = helipad.units[0].heading
if start_type != "Cold":
group.points[0].action = PointAction.FromGroundAreaHot
group.points[0].type = "TakeOffGroundHot"
for i in range(count - 1):
try:
helipad = self.helipads[cp].pop()
group.units[1 + i].position = Point(helipad.x, helipad.y)
group.units[1 + i].heading = helipad.units[0].heading
except IndexError as ex:
raise RuntimeError(f"Not enough helipads available at {cp}") from ex
return group
def clear_parking_slots(self) -> None:
for cp in self.game.theater.controlpoints:
for parking_slot in cp.parking_slots:
parking_slot.unit_id = None
def generate_flights(
self,
country: Country,
ato: AirTaskingOrder,
dynamic_runways: Dict[str, RunwayData],
) -> None:
"""Adds aircraft to the mission for every flight in the ATO.
Aircraft generation is done by walking the ATO and spawning each flight in turn.
After the flight is generated the group is added to the UnitMap so aircraft
deaths can be tracked.
Args:
country: The country from the mission to use for this ATO.
ato: The ATO to spawn aircraft for.
dynamic_runways: Runway data for carriers and FARPs.
"""
for package in ato.packages:
if not package.flights:
continue
for flight in package.flights:
logging.info(f"Generating flight: {flight.unit_type}")
group = self.create_and_configure_flight(
flight, country, dynamic_runways
)
self.unit_map.add_aircraft(group, flight)
def spawn_unused_aircraft(
self, player_country: Country, enemy_country: Country
) -> None:
for control_point in self.game.theater.controlpoints:
if not isinstance(control_point, Airfield):
continue
faction = self.game.coalition_for(control_point.captured).faction
if control_point.captured:
country = player_country
else:
country = enemy_country
for squadron in control_point.squadrons:
try:
self._spawn_unused_for(squadron, country, faction)
except NoParkingSlotError:
# If we run out of parking, stop spawning aircraft at this base.
break
def _spawn_unused_for(
self, squadron: Squadron, country: Country, faction: Faction
) -> None:
assert isinstance(squadron.location, Airfield)
for _ in range(squadron.untasked_aircraft):
# Creating a flight even those this isn't a fragged mission lets us
# reuse the existing debriefing code.
# TODO: Special flight type?
flight = Flight(
Package(squadron.location),
faction.country,
squadron,
1,
FlightType.BARCAP,
StartType.COLD,
divert=None,
)
group = self._generate_at_airport(
name=namegen.next_aircraft_name(country, flight.departure.id, flight),
side=country,
unit_type=squadron.aircraft.dcs_unit_type,
count=1,
start_type="Cold",
airport=squadron.location.airport,
)
group.uncontrolled = True
AircraftPainter(flight, group).apply_livery()
self.unit_map.add_aircraft(group, flight)
def create_and_configure_flight(
self, flight: Flight, country: Country, dynamic_runways: Dict[str, RunwayData]
) -> FlyingGroup[Any]:
group = self.generate_planned_flight(country, flight)
self.flights.append(
FlightGroupConfigurator(
flight,
group,
self.game,
self.m,
self.radio_registry,
self.tacan_registy,
self.laser_code_registry,
self.air_support,
dynamic_runways,
self.use_client,
).configure()
)
return group
def generate_flight_at_departure(
self, country: Country, flight: Flight, start_type: StartType
) -> FlyingGroup[Any]:
name = namegen.next_aircraft_name(country, flight.departure.id, flight)
cp = flight.departure
try:
if start_type is StartType.IN_FLIGHT:
group = self._generate_over_departure(
name=name, side=country, flight=flight, origin=cp
)
return group
elif isinstance(cp, NavalControlPoint):
group_name = cp.get_carrier_group_name()
carrier_group = self.m.find_group(group_name)
if not isinstance(carrier_group, ShipGroup):
raise RuntimeError(
f"Carrier group {carrier_group} is a "
"{carrier_group.__class__.__name__}, expected a ShipGroup"
)
return self._generate_at_group(
name=name,
side=country,
unit_type=flight.unit_type.dcs_unit_type,
count=flight.count,
start_type=start_type.value,
at=carrier_group,
)
else:
# If the flight is an helicopter flight, then prioritize dedicated helipads
if flight.unit_type.helicopter:
return self._generate_at_cp_helipad(
name=name,
side=country,
unit_type=flight.unit_type.dcs_unit_type,
count=flight.count,
start_type=start_type.value,
cp=cp,
)
if not isinstance(cp, Airfield):
raise RuntimeError(
f"Attempted to spawn at airfield for non-airfield {cp}"
)
return self._generate_at_airport(
name=name,
side=country,
unit_type=flight.unit_type.dcs_unit_type,
count=flight.count,
start_type=start_type.value,
airport=cp.airport,
)
except NoParkingSlotError:
# Generated when there is no place on Runway or on Parking Slots
logging.exception(
"No room on runway or parking slots. Starting from the air."
)
flight.start_type = StartType.IN_FLIGHT
group = self._generate_over_departure(
name=name, side=country, flight=flight, origin=cp
)
group.points[0].alt = 1500
return group
def generate_planned_flight(
self, country: Country, flight: Flight
) -> FlyingGroup[Any]:
return self.generate_flight_at_departure(country, flight, flight.start_type)

View File

@ -0,0 +1,45 @@
from __future__ import annotations
import random
from typing import Any, Optional
from dcs.unitgroup import FlyingGroup
from game import db
from game.ato import Flight
class AircraftPainter:
def __init__(self, flight: Flight, group: FlyingGroup[Any]) -> None:
self.flight = flight
self.group = group
def livery_from_db(self) -> Optional[str]:
return db.PLANE_LIVERY_OVERRIDES.get(self.flight.unit_type.dcs_unit_type)
def livery_from_faction(self) -> Optional[str]:
faction = self.flight.squadron.coalition.faction
if (
choices := faction.liveries_overrides.get(self.flight.unit_type)
) is not None:
return random.choice(choices)
return None
def livery_from_squadron(self) -> Optional[str]:
return self.flight.squadron.livery
def determine_livery(self) -> Optional[str]:
if (livery := self.livery_from_squadron()) is not None:
return livery
if (livery := self.livery_from_faction()) is not None:
return livery
if (livery := self.livery_from_db()) is not None:
return livery
return None
def apply_livery(self) -> None:
livery = self.determine_livery()
if livery is None:
return
for unit in self.group.units:
unit.livery_id = livery

View File

@ -0,0 +1,108 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional, TYPE_CHECKING
from dcs.flyingunit import FlyingUnit
from gen.callsigns import create_group_callsign_from_unit
if TYPE_CHECKING:
from game.ato import FlightType, FlightWaypoint, Package
from game.dcs.aircrafttype import AircraftType
from game.radio.radios import RadioFrequency
from gen.runways import RunwayData
@dataclass(frozen=True)
class ChannelAssignment:
radio_id: int
channel: int
@dataclass
class FlightData:
"""Details of a planned flight."""
#: The package that the flight belongs to.
package: Package
flight_type: FlightType
aircraft_type: AircraftType
#: All units in the flight.
units: list[FlyingUnit]
#: Total number of aircraft in the flight.
size: int
#: True if this flight belongs to the player's coalition.
friendly: bool
#: Number of seconds after mission start the flight is set to depart.
departure_delay: timedelta
#: Arrival airport.
arrival: RunwayData
#: Departure airport.
departure: RunwayData
#: Diver airport.
divert: Optional[RunwayData]
#: Waypoints of the flight plan.
waypoints: list[FlightWaypoint]
#: Radio frequency for intra-flight communications.
intra_flight_channel: RadioFrequency
#: Bingo fuel value in lbs.
bingo_fuel: Optional[int]
joker_fuel: Optional[int]
laser_codes: list[Optional[int]]
custom_name: Optional[str]
callsign: str = field(init=False)
#: Map of radio frequencies to their assigned radio and channel, if any.
frequency_to_channel_map: dict[RadioFrequency, ChannelAssignment] = field(
init=False, default_factory=dict
)
def __post_init__(self) -> None:
self.callsign = create_group_callsign_from_unit(self.units[0])
@property
def client_units(self) -> list[FlyingUnit]:
"""List of playable units in the flight."""
return [u for u in self.units if u.is_human()]
def num_radio_channels(self, radio_id: int) -> int:
"""Returns the number of preset channels for the given radio."""
# Note: pydcs only initializes the radio presets for client slots.
return self.client_units[0].num_radio_channels(radio_id)
def channel_for(self, frequency: RadioFrequency) -> Optional[ChannelAssignment]:
"""Returns the radio and channel number for the given frequency."""
return self.frequency_to_channel_map.get(frequency, None)
def assign_channel(
self, radio_id: int, channel_id: int, frequency: RadioFrequency
) -> None:
"""Assigns a preset radio channel to the given frequency."""
for unit in self.client_units:
unit.set_radio_channel_preset(radio_id, channel_id, frequency.mhz)
# One frequency could be bound to multiple channels. Prefer the first,
# since with the current implementation it will be the lowest numbered
# channel.
if frequency not in self.frequency_to_channel_map:
self.frequency_to_channel_map[frequency] = ChannelAssignment(
radio_id, channel_id
)

View File

@ -0,0 +1,212 @@
from __future__ import annotations
import logging
from typing import Any, Optional, TYPE_CHECKING
from dcs import Mission
from dcs.flyingunit import FlyingUnit
from dcs.planes import C_101CC, C_101EB, F_14B, Su_33
from dcs.task import CAP
from dcs.unit import Skill
from dcs.unitgroup import FlyingGroup
from game.ato import Flight, FlightType
from game.data.weapons import Pylon, WeaponType as WeaponTypeEnum
from game.missiongenerator.airsupport import AirSupport, AwacsInfo, TankerInfo
from game.missiongenerator.lasercoderegistry import LaserCodeRegistry
from game.radio.radios import RadioFrequency, RadioRegistry
from game.radio.tacan import TacanBand, TacanRegistry, TacanUsage
from game.squadrons import Pilot
from gen.callsigns import callsign_for_support_unit
from gen.flights.flightplan import AwacsFlightPlan, RefuelingFlightPlan
from gen.runways import RunwayData
from .aircraftbehavior import AircraftBehavior
from .aircraftpainter import AircraftPainter
from .flightdata import FlightData
from .waypoints import WaypointGenerator
if TYPE_CHECKING:
from game import Game
class FlightGroupConfigurator:
def __init__(
self,
flight: Flight,
group: FlyingGroup[Any],
game: Game,
mission: Mission,
radio_registry: RadioRegistry,
tacan_registry: TacanRegistry,
laser_code_registry: LaserCodeRegistry,
air_support: AirSupport,
dynamic_runways: dict[str, RunwayData],
use_client: bool,
) -> None:
self.flight = flight
self.group = group
self.game = game
self.mission = mission
self.radio_registry = radio_registry
self.tacan_registry = tacan_registry
self.laser_code_registry = laser_code_registry
self.air_support = air_support
self.dynamic_runways = dynamic_runways
self.use_client = use_client
def configure(self) -> FlightData:
AircraftBehavior(self.flight.flight_type).apply_to(self.flight, self.group)
AircraftPainter(self.flight, self.group).apply_livery()
self.setup_payload()
self.setup_fuel()
flight_channel = self.setup_radios()
laser_codes: list[Optional[int]] = []
for unit, pilot in zip(self.group.units, self.flight.roster.pilots):
self.configure_flight_member(unit, pilot, laser_codes)
divert = None
if self.flight.divert is not None:
divert = self.flight.divert.active_runway(
self.game.conditions, self.dynamic_runways
)
mission_start_time, waypoints = WaypointGenerator(
self.flight, self.group, self.mission, self.game.settings, self.air_support
).create_waypoints()
return FlightData(
package=self.flight.package,
aircraft_type=self.flight.unit_type,
flight_type=self.flight.flight_type,
units=self.group.units,
size=len(self.group.units),
friendly=self.flight.from_cp.captured,
departure_delay=mission_start_time,
departure=self.flight.departure.active_runway(
self.game.conditions, self.dynamic_runways
),
arrival=self.flight.arrival.active_runway(
self.game.conditions, self.dynamic_runways
),
divert=divert,
waypoints=waypoints,
intra_flight_channel=flight_channel,
bingo_fuel=self.flight.flight_plan.bingo_fuel,
joker_fuel=self.flight.flight_plan.joker_fuel,
custom_name=self.flight.custom_name,
laser_codes=laser_codes,
)
def configure_flight_member(
self, unit: FlyingUnit, pilot: Optional[Pilot], laser_codes: list[Optional[int]]
) -> None:
player = pilot is not None and pilot.player
self.set_skill(unit, pilot)
if self.flight.loadout.has_weapon_of_type(WeaponTypeEnum.TGP) and player:
laser_codes.append(self.laser_code_registry.get_next_laser_code())
else:
laser_codes.append(None)
if unit.unit_type is F_14B:
unit.set_property(F_14B.Properties.INSAlignmentStored.id, True)
def setup_radios(self) -> RadioFrequency:
if self.flight.flight_type in {FlightType.AEWC, FlightType.REFUELING}:
channel = self.radio_registry.alloc_uhf()
self.register_air_support(channel)
else:
channel = self.flight.unit_type.alloc_flight_radio(self.radio_registry)
self.group.set_frequency(channel.mhz)
return channel
def register_air_support(self, channel: RadioFrequency) -> None:
callsign = callsign_for_support_unit(self.group)
if isinstance(self.flight.flight_plan, AwacsFlightPlan):
self.air_support.awacs.append(
AwacsInfo(
group_name=str(self.group.name),
callsign=callsign,
freq=channel,
depature_location=self.flight.departure.name,
end_time=self.flight.flight_plan.mission_departure_time,
start_time=self.flight.flight_plan.mission_start_time,
blue=self.flight.departure.captured,
)
)
elif isinstance(self.flight.flight_plan, RefuelingFlightPlan):
tacan = self.tacan_registry.alloc_for_band(TacanBand.Y, TacanUsage.AirToAir)
self.air_support.tankers.append(
TankerInfo(
group_name=str(self.group.name),
callsign=callsign,
variant=self.flight.unit_type.name,
freq=channel,
tacan=tacan,
start_time=self.flight.flight_plan.patrol_start_time,
end_time=self.flight.flight_plan.patrol_end_time,
blue=self.flight.departure.captured,
)
)
def set_skill(self, unit: FlyingUnit, pilot: Optional[Pilot]) -> None:
if pilot is None or not pilot.player:
unit.skill = self.skill_level_for(unit, pilot)
return
if self.use_client:
unit.set_client()
else:
unit.set_player()
def skill_level_for(self, unit: FlyingUnit, pilot: Optional[Pilot]) -> Skill:
if self.flight.squadron.player:
base_skill = Skill(self.game.settings.player_skill)
else:
base_skill = Skill(self.game.settings.enemy_skill)
if pilot is None:
logging.error(f"Cannot determine skill level: {unit.name} has not pilot")
return base_skill
levels = [
Skill.Average,
Skill.Good,
Skill.High,
Skill.Excellent,
]
current_level = levels.index(base_skill)
missions_for_skill_increase = 4
increase = pilot.record.missions_flown // missions_for_skill_increase
capped_increase = min(current_level + increase, len(levels) - 1)
new_level = (capped_increase, current_level)[
self.game.settings.ai_pilot_levelling
]
return levels[new_level]
def setup_payload(self) -> None:
for p in self.group.units:
p.pylons.clear()
loadout = self.flight.loadout
if self.game.settings.restrict_weapons_by_date:
loadout = loadout.degrade_for_date(self.flight.unit_type, self.game.date)
for pylon_number, weapon in loadout.pylons.items():
if weapon is None:
continue
pylon = Pylon.for_aircraft(self.flight.unit_type, pylon_number)
pylon.equip(self.group, weapon)
def setup_fuel(self) -> None:
# Special case so Su 33 and C101 can take off
unit_type = self.flight.unit_type.dcs_unit_type
if unit_type == Su_33:
for unit in self.group.units:
if self.group.task == CAP:
unit.fuel = Su_33.fuel_max / 2.2
else:
unit.fuel = Su_33.fuel_max * 0.8
elif unit_type in {C_101EB, C_101CC}:
for unit in self.group.units:
unit.fuel = unit_type.fuel_max * 0.5

View File

@ -0,0 +1 @@
from .waypointgenerator import WaypointGenerator

View File

@ -0,0 +1,42 @@
import logging
from dcs.point import MovingPoint
from dcs.task import AttackGroup, WeaponType
from game.theater import TheaterGroundObject
from game.transfers import MultiGroupTransport
from .pydcswaypointbuilder import PydcsWaypointBuilder
class BaiIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
# TODO: Add common "UnitGroupTarget" base type.
group_names = []
target = self.package.target
if isinstance(target, TheaterGroundObject):
for group in target.groups:
group_names.append(group.name)
elif isinstance(target, MultiGroupTransport):
group_names.append(target.name)
else:
logging.error(
"Unexpected target type for BAI mission: %s",
target.__class__.__name__,
)
return waypoint
for group_name in group_names:
group = self.mission.find_group(group_name)
if group is None:
logging.error("Could not find group for BAI mission %s", group_name)
continue
task = AttackGroup(group.id, weapon_type=WeaponType.Auto)
task.params["attackQtyLimit"] = False
task.params["directionEnabled"] = False
task.params["altitudeEnabled"] = False
task.params["groupAttack"] = True
waypoint.tasks.append(task)
return waypoint

View File

@ -0,0 +1,14 @@
from dcs.point import MovingPoint, PointAction
from .pydcswaypointbuilder import PydcsWaypointBuilder
class CargoStopBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
waypoint.type = "LandingReFuAr"
waypoint.action = PointAction.LandingReFuAr
waypoint.landing_refuel_rearm_time = 2 # Minutes.
if (control_point := self.waypoint.control_point) is not None:
waypoint.airdrome_id = control_point.airdrome_id_for_landing
return waypoint

View File

@ -0,0 +1,38 @@
import logging
from dcs.point import MovingPoint
from dcs.task import EngageTargets, EngageTargetsInZone, Targets
from game.utils import nautical_miles
from gen.flights.flightplan import CasFlightPlan
from .pydcswaypointbuilder import PydcsWaypointBuilder
class CasIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
if isinstance(self.flight.flight_plan, CasFlightPlan):
waypoint.add_task(
EngageTargetsInZone(
position=self.flight.flight_plan.target.position,
radius=int(self.flight.flight_plan.engagement_distance.meters),
targets=[
Targets.All.GroundUnits.GroundVehicles,
Targets.All.GroundUnits.AirDefence.AAA,
Targets.All.GroundUnits.Infantry,
],
)
)
else:
logging.error("No CAS waypoint found. Falling back to search and engage")
waypoint.add_task(
EngageTargets(
max_distance=int(nautical_miles(10).meters),
targets=[
Targets.All.GroundUnits.GroundVehicles,
Targets.All.GroundUnits.AirDefence.AAA,
Targets.All.GroundUnits.Infantry,
],
)
)
return waypoint

View File

@ -0,0 +1,36 @@
import logging
from dcs.point import MovingPoint
from dcs.task import AttackGroup, WeaponType
from game.theater import TheaterGroundObject
from .pydcswaypointbuilder import PydcsWaypointBuilder
class DeadIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
self.register_special_waypoints(self.waypoint.targets)
target = self.package.target
if not isinstance(target, TheaterGroundObject):
logging.error(
"Unexpected target type for DEAD mission: %s",
target.__class__.__name__,
)
return waypoint
for group in target.groups:
miz_group = self.mission.find_group(group.name)
if miz_group is None:
logging.error(f"Could not find group for DEAD mission {group.name}")
continue
task = AttackGroup(miz_group.id, weapon_type=WeaponType.Auto)
task.params["expend"] = "All"
task.params["attackQtyLimit"] = False
task.params["directionEnabled"] = False
task.params["altitudeEnabled"] = False
task.params["groupAttack"] = True
waypoint.tasks.append(task)
return waypoint

View File

@ -0,0 +1,5 @@
from .pydcswaypointbuilder import PydcsWaypointBuilder
class DefaultWaypointBuilder(PydcsWaypointBuilder):
pass

View File

@ -0,0 +1,28 @@
import logging
from dcs.point import MovingPoint
from dcs.task import ControlledTask, OrbitAction
from gen.flights.flightplan import LoiterFlightPlan
from .pydcswaypointbuilder import PydcsWaypointBuilder
class HoldPointBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
loiter = ControlledTask(
OrbitAction(altitude=waypoint.alt, pattern=OrbitAction.OrbitPattern.Circle)
)
if not isinstance(self.flight.flight_plan, LoiterFlightPlan):
flight_plan_type = self.flight.flight_plan.__class__.__name__
logging.error(
f"Cannot configure hold for for {self.flight} because "
f"{flight_plan_type} does not define a push time. AI will push "
"immediately and may flight unsuitable speeds."
)
return waypoint
push_time = self.flight.flight_plan.push_time
self.waypoint.departure_time = push_time
loiter.stop_after_time(int(push_time.total_seconds()))
waypoint.add_task(loiter)
return waypoint

View File

@ -0,0 +1,70 @@
from typing import List, Type
from dcs.point import MovingPoint
from dcs.task import ControlledTask, EngageTargets, TargetType, Targets
from game.ato import FlightType
from game.utils import nautical_miles
from .pydcswaypointbuilder import PydcsWaypointBuilder
class JoinPointBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
if self.flight.flight_type == FlightType.ESCORT:
self.configure_escort_tasks(
waypoint,
[
Targets.All.Air.Planes.Fighters,
Targets.All.Air.Planes.MultiroleFighters,
],
)
elif self.flight.flight_type == FlightType.SEAD_ESCORT:
self.configure_escort_tasks(
waypoint, [Targets.All.GroundUnits.AirDefence.AAA.SAMRelated]
)
return waypoint
@staticmethod
def configure_escort_tasks(
waypoint: MovingPoint, target_types: List[Type[TargetType]]
) -> None:
# Ideally we would use the escort mission type and escort task to have
# the AI automatically but the AI only escorts AI flights while they are
# traveling between waypoints. When an AI flight performs an attack
# (such as attacking the mission target), AI escorts wander aimlessly
# until the escorted group resumes its flight plan.
#
# As such, we instead use the Search Then Engage task, which is an
# enroute task that causes the AI to follow their flight plan and engage
# enemies of the set type within a certain distance. The downside to
# this approach is that AI escorts are no longer related to the group
# they are escorting, aside from the fact that they fly a similar flight
# plan at the same time. With Escort, the escorts will follow the
# escorted group out of the area. The strike element may or may not fly
# directly over the target, and they may or may not require multiple
# attack runs. For the escort flight we must just assume a flight plan
# for the escort to fly. If the strike flight doesn't need to overfly
# the target, the escorts are needlessly going in harms way. If the
# strike flight needs multiple passes, the escorts may leave before the
# escorted aircraft do.
#
# Another possible option would be to use Search Then Engage for join ->
# ingress and egress -> split, but use a Search Then Engage in Zone task
# for the target area that is set to end on a flag flip that occurs when
# the strike aircraft finish their attack task.
#
# https://forums.eagle.ru/topic/251798-options-for-alternate-ai-escort-behavior
waypoint.add_task(
ControlledTask(
EngageTargets(
# TODO: From doctrine.
max_distance=int(nautical_miles(30).meters),
targets=target_types,
)
)
)
# We could set this task to end at the split point. pydcs doesn't
# currently support that task end condition though, and we don't really
# need it.

View File

@ -0,0 +1,13 @@
from dcs.point import MovingPoint, PointAction
from .pydcswaypointbuilder import PydcsWaypointBuilder
class LandingPointBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
waypoint.type = "Land"
waypoint.action = PointAction.Landing
if (control_point := self.waypoint.control_point) is not None:
waypoint.airdrome_id = control_point.airdrome_id_for_landing
return waypoint

View File

@ -0,0 +1,35 @@
import logging
from dcs.point import MovingPoint
from dcs.task import EngageTargetsInZone, Targets
from game.theater import Airfield
from game.utils import nautical_miles
from .pydcswaypointbuilder import PydcsWaypointBuilder
class OcaAircraftIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
target = self.package.target
if not isinstance(target, Airfield):
logging.error(
"Unexpected target type for OCA Strike mission: %s",
target.__class__.__name__,
)
return waypoint
task = EngageTargetsInZone(
position=target.position,
# Al Dhafra is 4 nm across at most. Add a little wiggle room in case
# the airport position from DCS is not centered.
radius=int(nautical_miles(3).meters),
targets=[Targets.All.Air],
)
task.params["attackQtyLimit"] = False
task.params["directionEnabled"] = False
task.params["altitudeEnabled"] = False
task.params["groupAttack"] = True
waypoint.tasks.append(task)
return waypoint

View File

@ -0,0 +1,25 @@
import logging
from dcs.point import MovingPoint
from dcs.task import BombingRunway
from game.theater import Airfield
from .pydcswaypointbuilder import PydcsWaypointBuilder
class OcaRunwayIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
target = self.package.target
if not isinstance(target, Airfield):
logging.error(
"Unexpected target type for runway bombing mission: %s",
target.__class__.__name__,
)
return waypoint
waypoint.tasks.append(
BombingRunway(airport_id=target.airport.id, group_attack=True)
)
return waypoint

View File

@ -0,0 +1,86 @@
from __future__ import annotations
from datetime import timedelta
from typing import Any, Iterable, Union
from dcs import Mission, Point
from dcs.planes import AJS37, F_14B, JF_17
from dcs.point import MovingPoint, PointAction
from dcs.unit import Unit
from dcs.unitgroup import FlyingGroup
from game.ato import Flight, FlightWaypoint
from game.ato.flightwaypointtype import FlightWaypointType
from game.missiongenerator.airsupport import AirSupport
from game.theater import MissionTarget
TARGET_WAYPOINTS = (
FlightWaypointType.TARGET_GROUP_LOC,
FlightWaypointType.TARGET_POINT,
FlightWaypointType.TARGET_SHIP,
)
class PydcsWaypointBuilder:
def __init__(
self,
waypoint: FlightWaypoint,
group: FlyingGroup[Any],
flight: Flight,
mission: Mission,
air_support: AirSupport,
) -> None:
self.waypoint = waypoint
self.group = group
self.package = flight.package
self.flight = flight
self.mission = mission
self.air_support = air_support
def build(self) -> MovingPoint:
waypoint = self.group.add_waypoint(
Point(self.waypoint.x, self.waypoint.y),
self.waypoint.alt.meters,
name=self.waypoint.name,
)
if self.waypoint.flyover:
waypoint.action = PointAction.FlyOverPoint
# It seems we need to leave waypoint.type exactly as it is even
# though it's set to "Turning Point". If I set this to "Fly Over
# Point" and then save the mission in the ME DCS resets it.
waypoint.alt_type = self.waypoint.alt_type
tot = self.flight.flight_plan.tot_for_waypoint(self.waypoint)
if tot is not None:
self.set_waypoint_tot(waypoint, tot)
return waypoint
def set_waypoint_tot(self, waypoint: MovingPoint, tot: timedelta) -> None:
self.waypoint.tot = tot
if not self._viggen_client_tot():
waypoint.ETA = int(tot.total_seconds())
waypoint.ETA_locked = True
waypoint.speed_locked = False
def _viggen_client_tot(self) -> bool:
"""Viggen player aircraft consider any waypoint with a TOT set to be a target ("M") waypoint.
If the flight is a player controlled Viggen flight, no TOT should be set on any waypoint except actual target waypoints.
"""
if (
self.flight.client_count > 0
and self.flight.unit_type.dcs_unit_type == AJS37
) and (self.waypoint.waypoint_type not in TARGET_WAYPOINTS):
return True
else:
return False
def register_special_waypoints(
self, targets: Iterable[Union[MissionTarget, Unit]]
) -> None:
"""Create special target waypoints for various aircraft"""
for i, t in enumerate(targets):
if self.group.units[0].unit_type == JF_17 and i < 4:
self.group.add_nav_target_point(t.position, "PP" + str(i + 1))
if self.group.units[0].unit_type == F_14B and i == 0:
self.group.add_nav_target_point(t.position, "ST")

View File

@ -0,0 +1,87 @@
import logging
from dcs.point import MovingPoint
from dcs.task import (
ActivateBeaconCommand,
ControlledTask,
EngageTargets,
OrbitAction,
Tanker,
Targets,
)
from game.ato import FlightType
from gen.flights.flightplan import PatrollingFlightPlan
from .pydcswaypointbuilder import PydcsWaypointBuilder
class RaceTrackBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
flight_plan = self.flight.flight_plan
if not isinstance(flight_plan, PatrollingFlightPlan):
flight_plan_type = flight_plan.__class__.__name__
logging.error(
f"Cannot create race track for {self.flight} because "
f"{flight_plan_type} does not define a patrol."
)
return waypoint
# NB: It's important that the engage task comes before the orbit task.
# Though they're on the same waypoint, if the orbit task comes first it
# is their first priority and they will not engage any targets because
# they're fully focused on orbiting. If the STE task is first, they will
# engage targets if available and orbit if they find nothing to shoot.
if self.flight.flight_type is FlightType.REFUELING:
self.configure_refueling_actions(waypoint)
# TODO: Move the properties of this task into the flight plan?
# CAP is the only current user of this so it's not a big deal, but might
# be good to make this usable for things like BAI when we add that
# later.
cap_types = {FlightType.BARCAP, FlightType.TARCAP}
if self.flight.flight_type in cap_types:
engagement_distance = int(flight_plan.engagement_distance.meters)
waypoint.tasks.append(
EngageTargets(
max_distance=engagement_distance, targets=[Targets.All.Air]
)
)
orbit = OrbitAction(
altitude=waypoint.alt,
pattern=OrbitAction.OrbitPattern.RaceTrack,
speed=int(flight_plan.patrol_speed.kph),
)
racetrack = ControlledTask(orbit)
self.set_waypoint_tot(waypoint, flight_plan.patrol_start_time)
racetrack.stop_after_time(int(flight_plan.patrol_end_time.total_seconds()))
waypoint.add_task(racetrack)
return waypoint
def configure_refueling_actions(self, waypoint: MovingPoint) -> None:
waypoint.add_task(Tanker())
if self.flight.unit_type.dcs_unit_type.tacan:
tanker_info = self.air_support.tankers[-1]
tacan = tanker_info.tacan
tacan_callsign = {
"Texaco": "TEX",
"Arco": "ARC",
"Shell": "SHL",
}.get(tanker_info.callsign)
waypoint.add_task(
ActivateBeaconCommand(
tacan.number,
tacan.band.value,
tacan_callsign,
bearing=True,
unit_id=self.group.units[0].id,
aa=True,
)
)

View File

@ -0,0 +1,22 @@
import logging
from dcs.point import MovingPoint
from gen.flights.flightplan import PatrollingFlightPlan
from .pydcswaypointbuilder import PydcsWaypointBuilder
class RaceTrackEndBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
if not isinstance(self.flight.flight_plan, PatrollingFlightPlan):
flight_plan_type = self.flight.flight_plan.__class__.__name__
logging.error(
f"Cannot create race track for {self.flight} because "
f"{flight_plan_type} does not define a patrol."
)
return waypoint
self.waypoint.departure_time = self.flight.flight_plan.patrol_end_time
return waypoint

View File

@ -0,0 +1,36 @@
import logging
from dcs.point import MovingPoint
from dcs.task import AttackGroup, WeaponType
from game.theater import TheaterGroundObject
from .pydcswaypointbuilder import PydcsWaypointBuilder
class SeadIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
self.register_special_waypoints(self.waypoint.targets)
target = self.package.target
if not isinstance(target, TheaterGroundObject):
logging.error(
"Unexpected target type for SEAD mission: %s",
target.__class__.__name__,
)
return waypoint
for group in target.groups:
miz_group = self.mission.find_group(group.name)
if miz_group is None:
logging.error(f"Could not find group for SEAD mission {group.name}")
continue
task = AttackGroup(miz_group.id, weapon_type=WeaponType.Guided)
task.params["expend"] = "All"
task.params["attackQtyLimit"] = False
task.params["directionEnabled"] = False
task.params["altitudeEnabled"] = False
task.params["groupAttack"] = True
waypoint.tasks.append(task)
return waypoint

View File

@ -0,0 +1,50 @@
from dcs import Point
from dcs.planes import B_17G, B_52H, Tu_22M3
from dcs.point import MovingPoint
from dcs.task import Bombing, WeaponType
from .pydcswaypointbuilder import PydcsWaypointBuilder
class StrikeIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
if self.group.units[0].unit_type in [B_17G, B_52H, Tu_22M3]:
return self.build_bombing()
else:
return self.build_strike()
def build_bombing(self) -> MovingPoint:
waypoint = super().build()
targets = self.waypoint.targets
if not targets:
return waypoint
center = Point(0, 0)
for target in targets:
center.x += target.position.x
center.y += target.position.y
center.x /= len(targets)
center.y /= len(targets)
bombing = Bombing(center, weapon_type=WeaponType.Bombs)
bombing.params["expend"] = "All"
bombing.params["attackQtyLimit"] = False
bombing.params["directionEnabled"] = False
bombing.params["altitudeEnabled"] = False
bombing.params["groupAttack"] = True
waypoint.tasks.append(bombing)
return waypoint
def build_strike(self) -> MovingPoint:
waypoint = super().build()
for target in self.waypoint.targets:
bombing = Bombing(target.position, weapon_type=WeaponType.Auto)
# If there is only one target, drop all ordnance in one pass.
if len(self.waypoint.targets) == 1:
bombing.params["expend"] = "All"
bombing.params["groupAttack"] = True
waypoint.tasks.append(bombing)
# Register special waypoints
self.register_special_waypoints(self.waypoint.targets)
return waypoint

View File

@ -0,0 +1,33 @@
import logging
from dcs.point import MovingPoint
from dcs.task import EngageTargets, Targets
from game.utils import nautical_miles
from gen.flights.flightplan import SweepFlightPlan
from .pydcswaypointbuilder import PydcsWaypointBuilder
class SweepIngressBuilder(PydcsWaypointBuilder):
def build(self) -> MovingPoint:
waypoint = super().build()
if not isinstance(self.flight.flight_plan, SweepFlightPlan):
flight_plan_type = self.flight.flight_plan.__class__.__name__
logging.error(
f"Cannot create sweep for {self.flight} because "
f"{flight_plan_type} is not a sweep flight plan."
)
return waypoint
waypoint.tasks.append(
EngageTargets(
max_distance=int(nautical_miles(50).meters),
targets=[
Targets.All.Air.Planes.Fighters,
Targets.All.Air.Planes.MultiroleFighters,
],
)
)
return waypoint

View File

@ -0,0 +1,263 @@
import itertools
import random
from datetime import timedelta
from typing import Any
from dcs import Mission
from dcs.action import AITaskPush, ActivateGroup
from dcs.condition import CoalitionHasAirdrome, TimeAfter
from dcs.planes import AJS37
from dcs.task import StartCommand
from dcs.triggers import Event, TriggerOnce, TriggerRule
from dcs.unitgroup import FlyingGroup
from game.ato import Flight, FlightWaypoint
from game.ato.flightwaypointtype import FlightWaypointType
from game.ato.starttype import StartType
from game.missiongenerator.airsupport import AirSupport
from game.settings import Settings
from game.theater import ControlPointType
from game.utils import meters, pairwise
from gen.flights.traveltime import TotEstimator
from .pydcswaypointbuilder import PydcsWaypointBuilder, TARGET_WAYPOINTS
from .baiingress import BaiIngressBuilder
from .cargostop import CargoStopBuilder
from .casingress import CasIngressBuilder
from .deadingress import DeadIngressBuilder
from .default import DefaultWaypointBuilder
from .holdpoint import HoldPointBuilder
from .joinpoint import JoinPointBuilder
from .landingpoint import LandingPointBuilder
from .ocaaircraftingress import OcaAircraftIngressBuilder
from .ocarunwayingress import OcaRunwayIngressBuilder
from .racetrack import RaceTrackBuilder
from .racetrackend import RaceTrackEndBuilder
from .seadingress import SeadIngressBuilder
from .strikeingress import StrikeIngressBuilder
from .sweepingress import SweepIngressBuilder
class WaypointGenerator:
def __init__(
self,
flight: Flight,
group: FlyingGroup[Any],
mission: Mission,
settings: Settings,
air_support: AirSupport,
) -> None:
self.flight = flight
self.group = group
self.mission = mission
self.settings = settings
self.air_support = air_support
def create_waypoints(self) -> tuple[timedelta, list[FlightWaypoint]]:
for waypoint in self.flight.points:
waypoint.tot = None
takeoff_point = FlightWaypoint.from_pydcs(
self.group.points[0], self.flight.from_cp
)
mission_start_time = self.set_takeoff_time(takeoff_point)
filtered_points: list[FlightWaypoint] = []
for point in self.flight.points:
if point.only_for_player and not self.flight.client_count:
continue
filtered_points.append(point)
# Only add 1 target waypoint for Viggens. This only affects player flights, the
# Viggen can't have more than 9 waypoints which leaves us with two target point
# under the current flight plans.
# TODO: Make this smarter. It currently targets a random unit in the group.
# This could be updated to make it pick the "best" two targets in the group.
if self.flight.unit_type.dcs_unit_type is AJS37 and self.flight.client_count:
viggen_target_points = [
(idx, point)
for idx, point in enumerate(filtered_points)
if point.waypoint_type in TARGET_WAYPOINTS
]
if viggen_target_points:
keep_target = viggen_target_points[
random.randint(0, len(viggen_target_points) - 1)
]
filtered_points = [
point
for idx, point in enumerate(filtered_points)
if (
point.waypoint_type not in TARGET_WAYPOINTS
or idx == keep_target[0]
)
]
for idx, point in enumerate(filtered_points):
self.builder_for_waypoint(point).build()
# Set here rather than when the FlightData is created so they waypoints
# have their TOTs and fuel minimums set. Once we're more confident in our fuel
# estimation ability the minimum fuel amounts will be calculated during flight
# plan construction, but for now it's only used by the kneeboard so is generated
# late.
waypoints = [takeoff_point] + self.flight.points
self._estimate_min_fuel_for(waypoints)
return mission_start_time, waypoints
def builder_for_waypoint(self, waypoint: FlightWaypoint) -> PydcsWaypointBuilder:
builders = {
FlightWaypointType.DROP_OFF: CargoStopBuilder,
FlightWaypointType.INGRESS_BAI: BaiIngressBuilder,
FlightWaypointType.INGRESS_CAS: CasIngressBuilder,
FlightWaypointType.INGRESS_DEAD: DeadIngressBuilder,
FlightWaypointType.INGRESS_OCA_AIRCRAFT: OcaAircraftIngressBuilder,
FlightWaypointType.INGRESS_OCA_RUNWAY: OcaRunwayIngressBuilder,
FlightWaypointType.INGRESS_SEAD: SeadIngressBuilder,
FlightWaypointType.INGRESS_STRIKE: StrikeIngressBuilder,
FlightWaypointType.INGRESS_SWEEP: SweepIngressBuilder,
FlightWaypointType.JOIN: JoinPointBuilder,
FlightWaypointType.LANDING_POINT: LandingPointBuilder,
FlightWaypointType.LOITER: HoldPointBuilder,
FlightWaypointType.PATROL: RaceTrackEndBuilder,
FlightWaypointType.PATROL_TRACK: RaceTrackBuilder,
FlightWaypointType.PICKUP: CargoStopBuilder,
}
builder = builders.get(waypoint.waypoint_type, DefaultWaypointBuilder)
return builder(
waypoint, self.group, self.flight, self.mission, self.air_support
)
def _estimate_min_fuel_for(self, waypoints: list[FlightWaypoint]) -> None:
if self.flight.unit_type.fuel_consumption is None:
return
combat_speed_types = {
FlightWaypointType.INGRESS_BAI,
FlightWaypointType.INGRESS_CAS,
FlightWaypointType.INGRESS_DEAD,
FlightWaypointType.INGRESS_ESCORT,
FlightWaypointType.INGRESS_OCA_AIRCRAFT,
FlightWaypointType.INGRESS_OCA_RUNWAY,
FlightWaypointType.INGRESS_SEAD,
FlightWaypointType.INGRESS_STRIKE,
FlightWaypointType.INGRESS_SWEEP,
FlightWaypointType.SPLIT,
} | set(TARGET_WAYPOINTS)
consumption = self.flight.unit_type.fuel_consumption
min_fuel: float = consumption.min_safe
# The flight plan (in reverse) up to and including the arrival point.
main_flight_plan = reversed(waypoints)
try:
while waypoint := next(main_flight_plan):
if waypoint.waypoint_type is FlightWaypointType.LANDING_POINT:
waypoint.min_fuel = min_fuel
main_flight_plan = itertools.chain([waypoint], main_flight_plan)
break
except StopIteration:
# Some custom flight plan without a landing point. Skip it.
return
for b, a in pairwise(main_flight_plan):
distance = meters(a.position.distance_to_point(b.position))
if a.waypoint_type is FlightWaypointType.TAKEOFF:
ppm = consumption.climb
elif b.waypoint_type in combat_speed_types:
ppm = consumption.combat
else:
ppm = consumption.cruise
min_fuel += distance.nautical_miles * ppm
a.min_fuel = min_fuel
def set_takeoff_time(self, waypoint: FlightWaypoint) -> timedelta:
estimator = TotEstimator(self.flight.package)
start_time = estimator.mission_start_time(self.flight)
if self.should_delay_flight(start_time):
if self.should_activate_late():
# Late activation causes the aircraft to not be spawned
# until triggered.
self.set_activation_time(start_time)
elif self.flight.start_type is StartType.COLD:
# Setting the start time causes the AI to wait until the
# specified time to begin their startup sequence.
self.set_startup_time(start_time)
# And setting *our* waypoint TOT causes the takeoff time to show up in
# the player's kneeboard.
waypoint.tot = self.flight.flight_plan.takeoff_time()
return start_time
def set_activation_time(self, delay: timedelta) -> None:
# Note: Late activation causes the waypoint TOTs to look *weird* in the
# mission editor. Waypoint times will be relative to the group
# activation time rather than in absolute local time. A flight delayed
# until 09:10 when the overall mission start time is 09:00, with a join
# time of 09:30 will show the join time as 00:30, not 09:30.
self.group.late_activation = True
activation_trigger = TriggerOnce(
Event.NoEvent, f"FlightLateActivationTrigger{self.group.id}"
)
activation_trigger.add_condition(TimeAfter(seconds=int(delay.total_seconds())))
self.prevent_spawn_at_hostile_airbase(activation_trigger)
activation_trigger.add_action(ActivateGroup(self.group.id))
self.mission.triggerrules.triggers.append(activation_trigger)
def prevent_spawn_at_hostile_airbase(self, trigger: TriggerRule) -> None:
# Prevent delayed flights from spawning at airbases if they were
# captured before they've spawned.
if self.flight.from_cp.cptype != ControlPointType.AIRBASE:
return
trigger.add_condition(
CoalitionHasAirdrome(
self.flight.squadron.coalition.coalition_id, self.flight.from_cp.id
)
)
def set_startup_time(self, delay: timedelta) -> None:
# Uncontrolled causes the AI unit to spawn, but not begin startup.
self.group.uncontrolled = True
activation_trigger = TriggerOnce(
Event.NoEvent, f"FlightStartTrigger{self.group.id}"
)
activation_trigger.add_condition(TimeAfter(seconds=int(delay.total_seconds())))
self.prevent_spawn_at_hostile_airbase(activation_trigger)
self.group.add_trigger_action(StartCommand())
activation_trigger.add_action(AITaskPush(self.group.id, len(self.group.tasks)))
self.mission.triggerrules.triggers.append(activation_trigger)
def should_delay_flight(self, start_time: timedelta) -> bool:
if start_time.total_seconds() <= 0:
return False
if not self.flight.client_count:
return True
if start_time < timedelta(minutes=10):
# Don't bother delaying client flights with short start delays. Much
# more than ten minutes starts to eat into fuel a bit more
# (espeicially for something fuel limited like a Harrier).
return False
return not self.settings.never_delay_player_flights
def should_activate_late(self) -> bool:
if self.flight.start_type is StartType.COLD:
# Avoid spawning aircraft in the air or on the runway until it's
# time for their mission. Also avoid burning through gas spawning
# hot aircraft hours before their takeoff time.
return True
if self.flight.from_cp.is_fleet:
# Carrier spawns will crowd the carrier deck, especially without
# super carrier.
# TODO: Is there enough parking on the supercarrier?
return True
return False

File diff suppressed because it is too large Load Diff

View File

@ -17,7 +17,7 @@ from gen.ground_forces.combat_stance import CombatStance
from game.radio.radios import RadioFrequency
from gen.runways import RunwayData
from .aircraftgenerator import FlightData
from .aircraft.flightdata import FlightData
from .airsupportgenerator import AwacsInfo, TankerInfo
from .flotgenerator import JtacInfo

View File

@ -48,7 +48,7 @@ from game.ato.flightwaypoint import FlightWaypoint
from game.radio.radios import RadioFrequency
from gen.runways import RunwayData
from .aircraftgenerator import FlightData
from .aircraft.flightdata import FlightData
from .airsupportgenerator import AwacsInfo, TankerInfo
from .briefinggenerator import CommInfo, JtacInfo, MissionInfoGenerator

View File

@ -14,7 +14,7 @@ from game.ato import FlightType
from game.plugins import LuaPluginManager
from game.theater import TheaterGroundObject
from .aircraftgenerator import FlightData
from .aircraft.flightdata import FlightData
from .airsupport import AirSupport
if TYPE_CHECKING:

View File

@ -17,7 +17,10 @@ from game.theater import Airfield, FrontLine
from game.unitmap import UnitMap
from gen.airfields import AIRFIELD_DATA
from gen.naming import namegen
from .aircraftgenerator import AircraftGenerator, FlightData
from game.missiongenerator.aircraft.aircraftgenerator import (
AircraftGenerator,
)
from .aircraft.flightdata import FlightData
from .airsupport import AirSupport
from .airsupportgenerator import AirSupportGenerator
from .beacons import load_beacons_for_terrain

View File

@ -4,7 +4,7 @@ from dataclasses import dataclass
from typing import Optional, Any, TYPE_CHECKING
if TYPE_CHECKING:
from game.missiongenerator.aircraftgenerator import FlightData
from game.missiongenerator.aircraft.flightdata import FlightData
from game.missiongenerator.airsupport import AirSupport