mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
We estimate the longest possible time from mission start to TOT for all flights in a package and use that to set the TOT (plus any delay used to stagger flights). This both cuts down on loiter time for shorter flights and ensures that long flights will make it to the target in time. This is also used to compute the start time for the AI, so the explicit delay option is no longer needed.
536 lines
20 KiB
Python
536 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import random
|
|
import operator
|
|
from dataclasses import dataclass
|
|
from typing import Iterator, List, Optional, Set, TYPE_CHECKING, Tuple, Type
|
|
|
|
from dcs.unittype import FlyingType, UnitType
|
|
|
|
from game import db
|
|
from game.data.radar_db import UNITS_WITH_RADAR
|
|
from game.infos.information import Information
|
|
from game.utils import nm_to_meter
|
|
from gen import Conflict, PackageWaypointTiming
|
|
from gen.ato import Package
|
|
from gen.flights.ai_flight_planner_db import (
|
|
CAP_CAPABLE,
|
|
CAP_PREFERRED,
|
|
CAS_CAPABLE,
|
|
CAS_PREFERRED,
|
|
SEAD_CAPABLE,
|
|
SEAD_PREFERRED,
|
|
STRIKE_CAPABLE,
|
|
STRIKE_PREFERRED,
|
|
)
|
|
from gen.flights.closestairfields import (
|
|
ClosestAirfields,
|
|
ObjectiveDistanceCache,
|
|
)
|
|
from gen.flights.flight import (
|
|
Flight,
|
|
FlightType,
|
|
)
|
|
from gen.flights.flightplan import FlightPlanBuilder
|
|
from gen.flights.traveltime import TotEstimator
|
|
from theater import (
|
|
ControlPoint,
|
|
FrontLine,
|
|
MissionTarget,
|
|
TheaterGroundObject,
|
|
)
|
|
|
|
# Avoid importing some types that cause circular imports unless type checking.
|
|
if TYPE_CHECKING:
|
|
from game import Game
|
|
from game.inventory import GlobalAircraftInventory
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ProposedFlight:
|
|
"""A flight outline proposed by the mission planner.
|
|
|
|
Proposed flights haven't been assigned specific aircraft yet. They have only
|
|
a task, a required number of aircraft, and a maximum distance allowed
|
|
between the objective and the departure airfield.
|
|
"""
|
|
|
|
#: The flight's role.
|
|
task: FlightType
|
|
|
|
#: The number of aircraft required.
|
|
num_aircraft: int
|
|
|
|
#: The maximum distance between the objective and the departure airfield.
|
|
max_distance: int
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.task.name} {self.num_aircraft} ship"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ProposedMission:
|
|
"""A mission outline proposed by the mission planner.
|
|
|
|
Proposed missions haven't been assigned aircraft yet. They have only an
|
|
objective location and a list of proposed flights that are required for the
|
|
mission.
|
|
"""
|
|
|
|
#: The mission objective.
|
|
location: MissionTarget
|
|
|
|
#: The proposed flights that are required for the mission.
|
|
flights: List[ProposedFlight]
|
|
|
|
def __str__(self) -> str:
|
|
flights = ', '.join([str(f) for f in self.flights])
|
|
return f"{self.location.name}: {flights}"
|
|
|
|
|
|
class AircraftAllocator:
|
|
"""Finds suitable aircraft for proposed missions."""
|
|
|
|
def __init__(self, closest_airfields: ClosestAirfields,
|
|
global_inventory: GlobalAircraftInventory,
|
|
is_player: bool) -> None:
|
|
self.closest_airfields = closest_airfields
|
|
self.global_inventory = global_inventory
|
|
self.is_player = is_player
|
|
|
|
def find_aircraft_for_flight(
|
|
self, flight: ProposedFlight
|
|
) -> Optional[Tuple[ControlPoint, UnitType]]:
|
|
"""Finds aircraft suitable for the given mission.
|
|
|
|
Searches for aircraft capable of performing the given mission within the
|
|
maximum allowed range. If insufficient aircraft are available for the
|
|
mission, None is returned.
|
|
|
|
Airfields are searched ordered nearest to farthest from the target and
|
|
searched twice. The first search looks for aircraft which prefer the
|
|
mission type, and the second search looks for any aircraft which are
|
|
capable of the mission type. For example, an F-14 from a nearby carrier
|
|
will be preferred for the CAP of an airfield that has only F-16s, but if
|
|
the carrier has only F/A-18s the F-16s will be used for CAP instead.
|
|
|
|
Note that aircraft *will* be removed from the global inventory on
|
|
success. This is to ensure that the same aircraft are not matched twice
|
|
on subsequent calls. If the found aircraft are not used, the caller is
|
|
responsible for returning them to the inventory.
|
|
"""
|
|
result = self.find_aircraft_of_type(
|
|
flight, self.preferred_aircraft_for_task(flight.task)
|
|
)
|
|
if result is not None:
|
|
return result
|
|
return self.find_aircraft_of_type(
|
|
flight, self.capable_aircraft_for_task(flight.task)
|
|
)
|
|
|
|
@staticmethod
|
|
def preferred_aircraft_for_task(task: FlightType) -> List[Type[FlyingType]]:
|
|
cap_missions = (FlightType.BARCAP, FlightType.CAP, FlightType.TARCAP)
|
|
if task in cap_missions:
|
|
return CAP_PREFERRED
|
|
elif task == FlightType.CAS:
|
|
return CAS_PREFERRED
|
|
elif task in (FlightType.DEAD, FlightType.SEAD):
|
|
return SEAD_PREFERRED
|
|
elif task == FlightType.STRIKE:
|
|
return STRIKE_PREFERRED
|
|
elif task == FlightType.ESCORT:
|
|
return CAP_PREFERRED
|
|
else:
|
|
return []
|
|
|
|
@staticmethod
|
|
def capable_aircraft_for_task(task: FlightType) -> List[Type[FlyingType]]:
|
|
cap_missions = (FlightType.BARCAP, FlightType.CAP, FlightType.TARCAP)
|
|
if task in cap_missions:
|
|
return CAP_CAPABLE
|
|
elif task == FlightType.CAS:
|
|
return CAS_CAPABLE
|
|
elif task in (FlightType.DEAD, FlightType.SEAD):
|
|
return SEAD_CAPABLE
|
|
elif task == FlightType.STRIKE:
|
|
return STRIKE_CAPABLE
|
|
elif task == FlightType.ESCORT:
|
|
return CAP_CAPABLE
|
|
else:
|
|
logging.error(f"Unplannable flight type: {task}")
|
|
return []
|
|
|
|
def find_aircraft_of_type(
|
|
self, flight: ProposedFlight, types: List[Type[FlyingType]],
|
|
) -> Optional[Tuple[ControlPoint, UnitType]]:
|
|
airfields_in_range = self.closest_airfields.airfields_within(
|
|
flight.max_distance
|
|
)
|
|
for airfield in airfields_in_range:
|
|
if not airfield.is_friendly(self.is_player):
|
|
continue
|
|
inventory = self.global_inventory.for_control_point(airfield)
|
|
for aircraft, available in inventory.all_aircraft:
|
|
if aircraft in types and available >= flight.num_aircraft:
|
|
inventory.remove_aircraft(aircraft, flight.num_aircraft)
|
|
return airfield, aircraft
|
|
|
|
return None
|
|
|
|
|
|
class PackageBuilder:
|
|
"""Builds a Package for the flights it receives."""
|
|
|
|
def __init__(self, location: MissionTarget,
|
|
closest_airfields: ClosestAirfields,
|
|
global_inventory: GlobalAircraftInventory,
|
|
is_player: bool,
|
|
start_type: str) -> None:
|
|
self.package = Package(location)
|
|
self.allocator = AircraftAllocator(closest_airfields, global_inventory,
|
|
is_player)
|
|
self.global_inventory = global_inventory
|
|
self.start_type = start_type
|
|
|
|
def plan_flight(self, plan: ProposedFlight) -> bool:
|
|
"""Allocates aircraft for the given flight and adds them to the package.
|
|
|
|
If no suitable aircraft are available, False is returned. If the failed
|
|
flight was critical and the rest of the mission will be scrubbed, the
|
|
caller should return any previously planned flights to the inventory
|
|
using release_planned_aircraft.
|
|
"""
|
|
assignment = self.allocator.find_aircraft_for_flight(plan)
|
|
if assignment is None:
|
|
return False
|
|
airfield, aircraft = assignment
|
|
flight = Flight(aircraft, plan.num_aircraft, airfield, plan.task,
|
|
self.start_type)
|
|
self.package.add_flight(flight)
|
|
return True
|
|
|
|
def build(self) -> Package:
|
|
"""Returns the built package."""
|
|
return self.package
|
|
|
|
def release_planned_aircraft(self) -> None:
|
|
"""Returns any planned flights to the inventory."""
|
|
flights = list(self.package.flights)
|
|
for flight in flights:
|
|
self.global_inventory.return_from_flight(flight)
|
|
self.package.remove_flight(flight)
|
|
|
|
|
|
class ObjectiveFinder:
|
|
"""Identifies potential objectives for the mission planner."""
|
|
|
|
# TODO: Merge into doctrine.
|
|
AIRFIELD_THREAT_RANGE = nm_to_meter(150)
|
|
SAM_THREAT_RANGE = nm_to_meter(100)
|
|
|
|
def __init__(self, game: Game, is_player: bool) -> None:
|
|
self.game = game
|
|
self.is_player = is_player
|
|
|
|
def enemy_sams(self) -> Iterator[TheaterGroundObject]:
|
|
"""Iterates over all enemy SAM sites."""
|
|
# Control points might have the same ground object several times, for
|
|
# some reason.
|
|
found_targets: Set[str] = set()
|
|
for cp in self.enemy_control_points():
|
|
for ground_object in cp.ground_objects:
|
|
if ground_object.name in found_targets:
|
|
continue
|
|
|
|
if ground_object.dcs_identifier != "AA":
|
|
continue
|
|
|
|
if not self.object_has_radar(ground_object):
|
|
continue
|
|
|
|
# TODO: Yield in order of most threatening.
|
|
# Need to sort in order of how close their defensive range comes
|
|
# to friendly assets. To do that we need to add effective range
|
|
# information to the database.
|
|
yield ground_object
|
|
found_targets.add(ground_object.name)
|
|
|
|
def threatening_sams(self) -> Iterator[TheaterGroundObject]:
|
|
"""Iterates over enemy SAMs in threat range of friendly control points.
|
|
|
|
SAM sites are sorted by their closest proximity to any friendly control
|
|
point (airfield or fleet).
|
|
"""
|
|
sams: List[Tuple[TheaterGroundObject, int]] = []
|
|
for sam in self.enemy_sams():
|
|
ranges: List[int] = []
|
|
for cp in self.friendly_control_points():
|
|
ranges.append(sam.distance_to(cp))
|
|
sams.append((sam, min(ranges)))
|
|
|
|
sams = sorted(sams, key=operator.itemgetter(1))
|
|
for sam, _range in sams:
|
|
yield sam
|
|
|
|
def strike_targets(self) -> Iterator[TheaterGroundObject]:
|
|
"""Iterates over enemy strike targets.
|
|
|
|
Targets are sorted by their closest proximity to any friendly control
|
|
point (airfield or fleet).
|
|
"""
|
|
targets: List[Tuple[TheaterGroundObject, int]] = []
|
|
# Control points might have the same ground object several times, for
|
|
# some reason.
|
|
found_targets: Set[str] = set()
|
|
for enemy_cp in self.enemy_control_points():
|
|
for ground_object in enemy_cp.ground_objects:
|
|
if ground_object.name in found_targets:
|
|
continue
|
|
ranges: List[int] = []
|
|
for friendly_cp in self.friendly_control_points():
|
|
ranges.append(ground_object.distance_to(friendly_cp))
|
|
targets.append((ground_object, min(ranges)))
|
|
found_targets.add(ground_object.name)
|
|
targets = sorted(targets, key=operator.itemgetter(1))
|
|
for target, _range in targets:
|
|
yield target
|
|
|
|
@staticmethod
|
|
def object_has_radar(ground_object: TheaterGroundObject) -> bool:
|
|
"""Returns True if the ground object contains a unit with radar."""
|
|
for group in ground_object.groups:
|
|
for unit in group.units:
|
|
if db.unit_type_from_name(unit.type) in UNITS_WITH_RADAR:
|
|
return True
|
|
return False
|
|
|
|
def front_lines(self) -> Iterator[FrontLine]:
|
|
"""Iterates over all active front lines in the theater."""
|
|
for cp in self.friendly_control_points():
|
|
for connected in cp.connected_points:
|
|
if connected.is_friendly(self.is_player):
|
|
continue
|
|
|
|
if Conflict.has_frontline_between(cp, connected):
|
|
yield FrontLine(cp, connected)
|
|
|
|
def vulnerable_control_points(self) -> Iterator[ControlPoint]:
|
|
"""Iterates over friendly CPs that are vulnerable to enemy CPs.
|
|
|
|
Vulnerability is defined as any enemy CP within threat range of of the
|
|
CP.
|
|
"""
|
|
for cp in self.friendly_control_points():
|
|
airfields_in_proximity = self.closest_airfields_to(cp)
|
|
airfields_in_threat_range = airfields_in_proximity.airfields_within(
|
|
self.AIRFIELD_THREAT_RANGE
|
|
)
|
|
for airfield in airfields_in_threat_range:
|
|
if not airfield.is_friendly(self.is_player):
|
|
yield cp
|
|
break
|
|
|
|
def friendly_control_points(self) -> Iterator[ControlPoint]:
|
|
"""Iterates over all friendly control points."""
|
|
return (c for c in self.game.theater.controlpoints if
|
|
c.is_friendly(self.is_player))
|
|
|
|
def enemy_control_points(self) -> Iterator[ControlPoint]:
|
|
"""Iterates over all enemy control points."""
|
|
return (c for c in self.game.theater.controlpoints if
|
|
not c.is_friendly(self.is_player))
|
|
|
|
def all_possible_targets(self) -> Iterator[MissionTarget]:
|
|
"""Iterates over all possible mission targets in the theater.
|
|
|
|
Valid mission targets are control points (airfields and carriers), front
|
|
lines, and ground objects (SAM sites, factories, resource extraction
|
|
sites, etc).
|
|
"""
|
|
for cp in self.game.theater.controlpoints:
|
|
yield cp
|
|
yield from cp.ground_objects
|
|
yield from self.front_lines()
|
|
|
|
@staticmethod
|
|
def closest_airfields_to(location: MissionTarget) -> ClosestAirfields:
|
|
"""Returns the closest airfields to the given location."""
|
|
return ObjectiveDistanceCache.get_closest_airfields(location)
|
|
|
|
|
|
class CoalitionMissionPlanner:
|
|
"""Coalition flight planning AI.
|
|
|
|
This class is responsible for automatically planning missions for the
|
|
coalition at the start of the turn.
|
|
|
|
The primary goal of the mission planner is to protect existing friendly
|
|
assets. Missions will be planned with the following priorities:
|
|
|
|
1. CAP for airfields/fleets in close proximity to the enemy to prevent heavy
|
|
losses of friendly aircraft.
|
|
2. CAP for front line areas to protect ground and CAS units.
|
|
3. DEAD to reduce necessity of SEAD for future missions.
|
|
4. CAS to protect friendly ground units.
|
|
5. Strike missions to reduce the enemy's resources.
|
|
|
|
TODO: Anti-ship and airfield strikes to reduce enemy sortie rates.
|
|
TODO: BAI to prevent enemy forces from reaching the front line.
|
|
TODO: Should fleets always have a CAP?
|
|
|
|
TODO: Stance and doctrine-specific planning behavior.
|
|
"""
|
|
|
|
# TODO: Merge into doctrine, also limit by aircraft.
|
|
MAX_CAP_RANGE = nm_to_meter(100)
|
|
MAX_CAS_RANGE = nm_to_meter(50)
|
|
MAX_SEAD_RANGE = nm_to_meter(150)
|
|
MAX_STRIKE_RANGE = nm_to_meter(150)
|
|
|
|
NON_CAP_MIN_DELAY = 1
|
|
NON_CAP_MAX_DELAY = 5
|
|
|
|
def __init__(self, game: Game, is_player: bool) -> None:
|
|
self.game = game
|
|
self.is_player = is_player
|
|
self.objective_finder = ObjectiveFinder(self.game, self.is_player)
|
|
self.ato = self.game.blue_ato if is_player else self.game.red_ato
|
|
|
|
def propose_missions(self) -> Iterator[ProposedMission]:
|
|
"""Identifies and iterates over potential mission in priority order."""
|
|
# Find friendly CPs within 100 nmi from an enemy airfield, plan CAP.
|
|
for cp in self.objective_finder.vulnerable_control_points():
|
|
yield ProposedMission(cp, [
|
|
ProposedFlight(FlightType.CAP, 2, self.MAX_CAP_RANGE),
|
|
])
|
|
|
|
# Find front lines, plan CAP.
|
|
for front_line in self.objective_finder.front_lines():
|
|
yield ProposedMission(front_line, [
|
|
ProposedFlight(FlightType.TARCAP, 2, self.MAX_CAP_RANGE),
|
|
ProposedFlight(FlightType.CAS, 2, self.MAX_CAS_RANGE),
|
|
])
|
|
|
|
# Find enemy SAM sites with ranges that cover friendly CPs, front lines,
|
|
# or objects, plan DEAD.
|
|
# Find enemy SAM sites with ranges that extend to within 50 nmi of
|
|
# friendly CPs, front, lines, or objects, plan DEAD.
|
|
for sam in self.objective_finder.threatening_sams():
|
|
yield ProposedMission(sam, [
|
|
ProposedFlight(FlightType.DEAD, 2, self.MAX_SEAD_RANGE),
|
|
# TODO: Max escort range.
|
|
ProposedFlight(FlightType.ESCORT, 2, self.MAX_SEAD_RANGE),
|
|
])
|
|
|
|
# Plan strike missions.
|
|
for target in self.objective_finder.strike_targets():
|
|
yield ProposedMission(target, [
|
|
ProposedFlight(FlightType.STRIKE, 2, self.MAX_STRIKE_RANGE),
|
|
# TODO: Max escort range.
|
|
ProposedFlight(FlightType.SEAD, 2, self.MAX_STRIKE_RANGE),
|
|
ProposedFlight(FlightType.ESCORT, 2, self.MAX_STRIKE_RANGE),
|
|
])
|
|
|
|
def plan_missions(self) -> None:
|
|
"""Identifies and plans mission for the turn."""
|
|
for proposed_mission in self.propose_missions():
|
|
self.plan_mission(proposed_mission)
|
|
|
|
self.stagger_missions()
|
|
|
|
for cp in self.objective_finder.friendly_control_points():
|
|
inventory = self.game.aircraft_inventory.for_control_point(cp)
|
|
for aircraft, available in inventory.all_aircraft:
|
|
self.message("Unused aircraft",
|
|
f"{available} {aircraft.id} from {cp}")
|
|
|
|
def plan_mission(self, mission: ProposedMission) -> None:
|
|
"""Allocates aircraft for a proposed mission and adds it to the ATO."""
|
|
|
|
if self.game.settings.perf_ai_parking_start:
|
|
start_type = "Cold"
|
|
else:
|
|
start_type = "Warm"
|
|
|
|
builder = PackageBuilder(
|
|
mission.location,
|
|
self.objective_finder.closest_airfields_to(mission.location),
|
|
self.game.aircraft_inventory,
|
|
self.is_player,
|
|
start_type
|
|
)
|
|
|
|
missing_types: Set[FlightType] = set()
|
|
for proposed_flight in mission.flights:
|
|
if not builder.plan_flight(proposed_flight):
|
|
missing_types.add(proposed_flight.task)
|
|
|
|
if missing_types:
|
|
missing_types_str = ", ".join(
|
|
sorted([t.name for t in missing_types]))
|
|
builder.release_planned_aircraft()
|
|
self.message(
|
|
"Insufficient aircraft",
|
|
f"Not enough aircraft in range for {mission.location.name} "
|
|
f"capable of: {missing_types_str}")
|
|
return
|
|
|
|
package = builder.build()
|
|
flight_plan_builder = FlightPlanBuilder(self.game, package,
|
|
self.is_player)
|
|
for flight in package.flights:
|
|
flight_plan_builder.populate_flight_plan(flight)
|
|
self.ato.add_package(package)
|
|
|
|
def stagger_missions(self) -> None:
|
|
def start_time_generator(count: int, earliest: int, latest: int,
|
|
margin: int) -> Iterator[int]:
|
|
interval = latest // count
|
|
for time in range(earliest, latest, interval):
|
|
error = random.randint(-margin, margin)
|
|
yield max(0, time + error)
|
|
|
|
dca_types = (
|
|
FlightType.BARCAP,
|
|
FlightType.CAP,
|
|
FlightType.INTERCEPTION,
|
|
)
|
|
|
|
non_dca_packages = [p for p in self.ato.packages if
|
|
p.primary_task not in dca_types]
|
|
|
|
start_time = start_time_generator(
|
|
count=len(non_dca_packages),
|
|
earliest=5,
|
|
latest=90,
|
|
margin=5
|
|
)
|
|
for package in self.ato.packages:
|
|
tot = TotEstimator(package).earliest_tot()
|
|
if package.primary_task in dca_types:
|
|
# All CAP missions should be on station ASAP.
|
|
package.time_over_target = tot
|
|
else:
|
|
# But other packages should be spread out a bit. Note that take
|
|
# times are delayed, but all aircraft will become active at
|
|
# mission start. This makes it more worthwhile to attack enemy
|
|
# airfields to hit grounded aircraft, since they're more likely
|
|
# to be present. Runway and air started aircraft will be
|
|
# delayed until their takeoff time by AirConflictGenerator.
|
|
package.time_over_target = next(start_time) * 60 + tot
|
|
|
|
def message(self, title, text) -> None:
|
|
"""Emits a planning message to the player.
|
|
|
|
If the mission planner belongs to the players coalition, this emits a
|
|
message to the info panel.
|
|
"""
|
|
if self.is_player:
|
|
self.game.informations.append(
|
|
Information(title, text, self.game.turn)
|
|
)
|
|
else:
|
|
logging.info(f"{title}: {text}")
|