Replace existing campaign planner with an HTN.

An HTN (https://en.wikipedia.org/wiki/Hierarchical_task_network) is
similar to a decision tree, but it is able to reset to an earlier stage
if a subtask fails and tasks are able to account for the changes in
world state caused by earlier tasks.

Currently this just uses exactly the same strategy as before so we can
prove the system, but it should make it simpler to improve on task
planning.
This commit is contained in:
Dan Albert 2021-06-21 01:25:48 -07:00
parent 81c8052449
commit 783ac18222
33 changed files with 1283 additions and 610 deletions

View File

@ -0,0 +1 @@
from .theatercommander import TheaterCommander

View File

@ -0,0 +1,62 @@
from dataclasses import field, dataclass
from enum import Enum, auto
from typing import Optional
from game.theater import MissionTarget
from game.utils import Distance
from gen.flights.flight import FlightType
class EscortType(Enum):
AirToAir = auto()
Sead = auto()
@dataclass(frozen=True)
class ProposedFlight:
"""A flight outline proposed by the mission planner.
Proposed flights haven't been assigned specific aircraft yet. They have only
a task, a required number of aircraft, and a maximum distance allowed
between the objective and the departure airfield.
"""
#: The flight's role.
task: FlightType
#: The number of aircraft required.
num_aircraft: int
#: The maximum distance between the objective and the departure airfield.
max_distance: Distance
#: The type of threat this flight defends against if it is an escort. Escort
#: flights will be pruned if the rest of the package is not threatened by
#: the threat they defend against. If this flight is not an escort, this
#: field is None.
escort_type: Optional[EscortType] = field(default=None)
def __str__(self) -> str:
return f"{self.task} {self.num_aircraft} ship"
@dataclass(frozen=True)
class ProposedMission:
"""A mission outline proposed by the mission planner.
Proposed missions haven't been assigned aircraft yet. They have only an
objective location and a list of proposed flights that are required for the
mission.
"""
#: The mission objective.
location: MissionTarget
#: The proposed flights that are required for the mission.
flights: list[ProposedFlight]
asap: bool = field(default=False)
def __str__(self) -> str:
flights = ", ".join([str(f) for f in self.flights])
return f"{self.location.name}: {flights}"

View File

@ -0,0 +1,312 @@
from __future__ import annotations
import math
import operator
from collections import Iterator, Iterable
from typing import TypeVar, TYPE_CHECKING, Any
from game.theater import (
ControlPoint,
OffMapSpawn,
TheaterGroundObject,
MissionTarget,
Fob,
FrontLine,
Airfield,
)
from game.theater.theatergroundobject import (
EwrGroundObject,
SamGroundObject,
VehicleGroupGroundObject,
NavalGroundObject,
BuildingGroundObject,
IadsGroundObject,
)
from game.transfers import CargoShip, Convoy
from game.utils import meters, nautical_miles, Distance
from gen.flights.closestairfields import ObjectiveDistanceCache, ClosestAirfields
if TYPE_CHECKING:
from game import Game
MissionTargetType = TypeVar("MissionTargetType", bound=MissionTarget)
class ObjectiveFinder:
"""Identifies potential objectives for the mission planner."""
# TODO: Merge into doctrine.
AIRFIELD_THREAT_RANGE = nautical_miles(150)
SAM_THREAT_RANGE = nautical_miles(100)
def __init__(self, game: Game, is_player: bool) -> None:
self.game = game
self.is_player = is_player
def enemy_air_defenses(self) -> Iterator[tuple[IadsGroundObject, Distance]]:
"""Iterates over all enemy SAM sites."""
doctrine = self.game.faction_for(self.is_player).doctrine
threat_zones = self.game.threat_zone_for(not self.is_player)
for cp in self.enemy_control_points():
for ground_object in cp.ground_objects:
if ground_object.is_dead:
continue
if isinstance(ground_object, EwrGroundObject):
if threat_zones.threatened_by_air_defense(ground_object):
# This is a very weak heuristic for determining whether the EWR
# is close enough to be worth targeting before a SAM that is
# covering it. Ingress distance corresponds to the beginning of
# the attack range and is sufficient for most standoff weapons,
# so treating the ingress distance as the threat distance sorts
# these EWRs such that they will be attacked before SAMs that do
# not threaten the ingress point, but after those that do.
target_range = doctrine.ingress_egress_distance
else:
# But if the EWR isn't covered then we should only be worrying
# about its detection range.
target_range = ground_object.max_detection_range()
elif isinstance(ground_object, SamGroundObject):
target_range = ground_object.max_threat_range()
else:
continue
yield ground_object, target_range
def threatening_air_defenses(self) -> Iterator[IadsGroundObject]:
"""Iterates over enemy SAMs in threat range of friendly control points.
SAM sites are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
target_ranges: list[tuple[IadsGroundObject, Distance]] = []
for target, threat_range in self.enemy_air_defenses():
ranges: list[Distance] = []
for cp in self.friendly_control_points():
ranges.append(meters(target.distance_to(cp)) - threat_range)
target_ranges.append((target, min(ranges)))
target_ranges = sorted(target_ranges, key=operator.itemgetter(1))
for target, _range in target_ranges:
yield target
def enemy_vehicle_groups(self) -> Iterator[VehicleGroupGroundObject]:
"""Iterates over all enemy vehicle groups."""
for cp in self.enemy_control_points():
for ground_object in cp.ground_objects:
if not isinstance(ground_object, VehicleGroupGroundObject):
continue
if ground_object.is_dead:
continue
yield ground_object
def threatening_vehicle_groups(self) -> Iterator[VehicleGroupGroundObject]:
"""Iterates over enemy vehicle groups near friendly control points.
Groups are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
return self._targets_by_range(self.enemy_vehicle_groups())
def enemy_ships(self) -> Iterator[NavalGroundObject]:
for cp in self.enemy_control_points():
for ground_object in cp.ground_objects:
if not isinstance(ground_object, NavalGroundObject):
continue
if ground_object.is_dead:
continue
yield ground_object
def threatening_ships(self) -> Iterator[NavalGroundObject]:
"""Iterates over enemy ships near friendly control points.
Groups are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
return self._targets_by_range(self.enemy_ships())
def _targets_by_range(
self, targets: Iterable[MissionTargetType]
) -> Iterator[MissionTargetType]:
target_ranges: list[tuple[MissionTargetType, float]] = []
for target in targets:
ranges: list[float] = []
for cp in self.friendly_control_points():
ranges.append(target.distance_to(cp))
target_ranges.append((target, min(ranges)))
target_ranges = sorted(target_ranges, key=operator.itemgetter(1))
for target, _range in target_ranges:
yield target
def strike_targets(self) -> Iterator[TheaterGroundObject[Any]]:
"""Iterates over enemy strike targets.
Targets are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
targets: list[tuple[TheaterGroundObject[Any], float]] = []
# Building objectives are made of several individual TGOs (one per
# building).
found_targets: set[str] = set()
for enemy_cp in self.enemy_control_points():
for ground_object in enemy_cp.ground_objects:
# TODO: Reuse ground_object.mission_types.
# The mission types for ground objects are currently not
# accurate because we include things like strike and BAI for all
# targets since they have different planning behavior (waypoint
# generation is better for players with strike when the targets
# are stationary, AI behavior against weaker air defenses is
# better with BAI), so that's not a useful filter. Once we have
# better control over planning profiles and target dependent
# loadouts we can clean this up.
if isinstance(ground_object, VehicleGroupGroundObject):
# BAI target, not strike target.
continue
if isinstance(ground_object, NavalGroundObject):
# Anti-ship target, not strike target.
continue
if isinstance(ground_object, SamGroundObject):
# SAMs are targeted by DEAD. No need to double plan.
continue
is_building = isinstance(ground_object, BuildingGroundObject)
is_fob = isinstance(enemy_cp, Fob)
if is_building and is_fob and ground_object.is_control_point:
# This is the FOB structure itself. Can't be repaired or
# targeted by the player, so shouldn't be targetable by the
# AI.
continue
if ground_object.is_dead:
continue
if ground_object.name in found_targets:
continue
ranges: list[float] = []
for friendly_cp in self.friendly_control_points():
ranges.append(ground_object.distance_to(friendly_cp))
targets.append((ground_object, min(ranges)))
found_targets.add(ground_object.name)
targets = sorted(targets, key=operator.itemgetter(1))
for target, _range in targets:
yield target
def front_lines(self) -> Iterator[FrontLine]:
"""Iterates over all active front lines in the theater."""
yield from self.game.theater.conflicts()
def vulnerable_control_points(self) -> Iterator[ControlPoint]:
"""Iterates over friendly CPs that are vulnerable to enemy CPs.
Vulnerability is defined as any enemy CP within threat range of of the
CP.
"""
for cp in self.friendly_control_points():
if isinstance(cp, OffMapSpawn):
# Off-map spawn locations don't need protection.
continue
airfields_in_proximity = self.closest_airfields_to(cp)
airfields_in_threat_range = (
airfields_in_proximity.operational_airfields_within(
self.AIRFIELD_THREAT_RANGE
)
)
for airfield in airfields_in_threat_range:
if not airfield.is_friendly(self.is_player):
yield cp
break
def oca_targets(self, min_aircraft: int) -> Iterator[ControlPoint]:
airfields = []
for control_point in self.enemy_control_points():
if not isinstance(control_point, Airfield):
continue
if control_point.base.total_aircraft >= min_aircraft:
airfields.append(control_point)
return self._targets_by_range(airfields)
def convoys(self) -> Iterator[Convoy]:
for front_line in self.front_lines():
yield from self.game.transfers.convoys.travelling_to(
front_line.control_point_hostile_to(self.is_player)
)
def cargo_ships(self) -> Iterator[CargoShip]:
for front_line in self.front_lines():
yield from self.game.transfers.cargo_ships.travelling_to(
front_line.control_point_hostile_to(self.is_player)
)
def friendly_control_points(self) -> Iterator[ControlPoint]:
"""Iterates over all friendly control points."""
return (
c for c in self.game.theater.controlpoints if c.is_friendly(self.is_player)
)
def farthest_friendly_control_point(self) -> ControlPoint:
"""Finds the friendly control point that is farthest from any threats."""
threat_zones = self.game.threat_zone_for(not self.is_player)
farthest = None
max_distance = meters(0)
for cp in self.friendly_control_points():
if isinstance(cp, OffMapSpawn):
continue
distance = threat_zones.distance_to_threat(cp.position)
if distance > max_distance:
farthest = cp
max_distance = distance
if farthest is None:
raise RuntimeError("Found no friendly control points. You probably lost.")
return farthest
def closest_friendly_control_point(self) -> ControlPoint:
"""Finds the friendly control point that is closest to any threats."""
threat_zones = self.game.threat_zone_for(not self.is_player)
closest = None
min_distance = meters(math.inf)
for cp in self.friendly_control_points():
if isinstance(cp, OffMapSpawn):
continue
distance = threat_zones.distance_to_threat(cp.position)
if distance < min_distance:
closest = cp
min_distance = distance
if closest is None:
raise RuntimeError("Found no friendly control points. You probably lost.")
return closest
def enemy_control_points(self) -> Iterator[ControlPoint]:
"""Iterates over all enemy control points."""
return (
c
for c in self.game.theater.controlpoints
if not c.is_friendly(self.is_player)
)
def all_possible_targets(self) -> Iterator[MissionTarget]:
"""Iterates over all possible mission targets in the theater.
Valid mission targets are control points (airfields and carriers), front
lines, and ground objects (SAM sites, factories, resource extraction
sites, etc).
"""
for cp in self.game.theater.controlpoints:
yield cp
yield from cp.ground_objects
yield from self.front_lines()
@staticmethod
def closest_airfields_to(location: MissionTarget) -> ClosestAirfields:
"""Returns the closest airfields to the given location."""
return ObjectiveDistanceCache.get_closest_airfields(location)

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.aewc import PlanAewc
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class PlanAewcSupport(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for target in state.aewc_targets:
yield [PlanAewc(target)]

View File

@ -0,0 +1,15 @@
from collections import Iterator
from dataclasses import dataclass
from game.commander.tasks.primitive.oca import PlanOcaStrike
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
@dataclass(frozen=True)
class AttackAirInfrastructure(CompoundTask[TheaterState]):
aircraft_cold_start: bool
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for garrison in state.oca_targets:
yield [PlanOcaStrike(garrison, self.aircraft_cold_start)]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.strike import PlanStrike
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class AttackBuildings(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for garrison in state.strike_targets:
yield [PlanStrike(garrison)]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.bai import PlanBai
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class AttackGarrisons(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for garrison in state.enemy_garrisons:
yield [PlanBai(garrison)]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.dead import PlanDead
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class DegradeIads(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for air_defense in state.threatening_air_defenses:
yield [PlanDead(air_defense)]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.antiship import PlanAntiShip
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class DestroyShips(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for ship in state.threatening_ships:
yield [PlanAntiShip(ship)]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.cas import PlanCas
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class FrontLineDefense(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for front_line in state.vulnerable_front_lines:
yield [PlanCas(front_line)]

View File

@ -0,0 +1,27 @@
from collections import Iterator
from game.commander.tasks.primitive.antishipping import PlanAntiShipping
from game.commander.tasks.primitive.convoyinterdiction import PlanConvoyInterdiction
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class InterdictReinforcements(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
# These will only rarely get planned. When a convoy is travelling multiple legs,
# they're targetable after the first leg. The reason for this is that
# procurement happens *after* mission planning so that the missions that could
# not be filled will guide the procurement process. Procurement is the stage
# that convoys are created (because they're created to move ground units that
# were just purchased), so we haven't created any yet. Any incomplete transfers
# from the previous turn (multi-leg journeys) will still be present though so
# they can be targeted.
#
# Even after this is fixed, the player's convoys that were created through the
# UI will never be targeted on the first turn of their journey because the AI
# stops planning after the start of the turn. We could potentially fix this by
# moving opfor mission planning until the takeoff button is pushed.
for convoy in state.enemy_convoys:
yield [PlanConvoyInterdiction(convoy)]
for ship in state.enemy_shipping:
yield [PlanAntiShipping(ship)]

View File

@ -0,0 +1,36 @@
from collections import Iterator
from dataclasses import dataclass
from game.commander.tasks.compound.aewcsupport import PlanAewcSupport
from game.commander.tasks.compound.attackairinfrastructure import (
AttackAirInfrastructure,
)
from game.commander.tasks.compound.attackbuildings import AttackBuildings
from game.commander.tasks.compound.attackgarrisons import AttackGarrisons
from game.commander.tasks.compound.degradeiads import DegradeIads
from game.commander.tasks.compound.destroyships import DestroyShips
from game.commander.tasks.compound.frontlinedefense import FrontLineDefense
from game.commander.tasks.compound.interdictreinforcements import (
InterdictReinforcements,
)
from game.commander.tasks.compound.protectairspace import ProtectAirSpace
from game.commander.tasks.compound.refuelingsupport import PlanRefuelingSupport
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
@dataclass(frozen=True)
class PlanNextAction(CompoundTask[TheaterState]):
aircraft_cold_start: bool
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
yield [PlanAewcSupport()]
yield [PlanRefuelingSupport()]
yield [ProtectAirSpace()]
yield [FrontLineDefense()]
yield [DegradeIads()]
yield [InterdictReinforcements()]
yield [DestroyShips()]
yield [AttackGarrisons()]
yield [AttackAirInfrastructure(self.aircraft_cold_start)]
yield [AttackBuildings()]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.barcap import PlanBarcap
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class ProtectAirSpace(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for cp in state.vulnerable_control_points:
yield [PlanBarcap(cp)]

View File

@ -0,0 +1,11 @@
from collections import Iterator
from game.commander.tasks.primitive.refueling import PlanRefueling
from game.commander.theaterstate import TheaterState
from game.htn import CompoundTask, Method
class PlanRefuelingSupport(CompoundTask[TheaterState]):
def each_valid_method(self, state: TheaterState) -> Iterator[Method[TheaterState]]:
for target in state.refueling_targets:
yield [PlanRefueling(target)]

View File

@ -0,0 +1,73 @@
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional, Generic, TypeVar
from game.commander.missionproposals import ProposedFlight, EscortType, ProposedMission
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.htn import PrimitiveTask
from game.profiling import MultiEventTracer
from game.theater import MissionTarget
from game.utils import Distance
from gen.flights.flight import FlightType
if TYPE_CHECKING:
from gen.flights.ai_flight_planner import CoalitionMissionPlanner
MissionTargetT = TypeVar("MissionTargetT", bound=MissionTarget)
# TODO: Refactor so that we don't need to call up to the mission planner.
# Bypass type checker due to https://github.com/python/mypy/issues/5374
@dataclass # type: ignore
class PackagePlanningTask(PrimitiveTask[TheaterState], Generic[MissionTargetT]):
target: MissionTargetT
flights: list[ProposedFlight] = field(init=False)
def __post_init__(self) -> None:
self.flights = []
def execute(
self, mission_planner: CoalitionMissionPlanner, tracer: MultiEventTracer
) -> None:
self.propose_flights(mission_planner.doctrine)
mission_planner.plan_mission(ProposedMission(self.target, self.flights), tracer)
@abstractmethod
def propose_flights(self, doctrine: Doctrine) -> None:
...
def propose_flight(
self,
task: FlightType,
num_aircraft: int,
max_distance: Optional[Distance],
escort_type: Optional[EscortType] = None,
) -> None:
if max_distance is None:
max_distance = Distance.inf()
self.flights.append(
ProposedFlight(task, num_aircraft, max_distance, escort_type)
)
@property
def asap(self) -> bool:
return False
def propose_common_escorts(self, doctrine: Doctrine) -> None:
self.propose_flight(
FlightType.SEAD_ESCORT,
2,
doctrine.mission_ranges.offensive,
EscortType.Sead,
)
self.propose_flight(
FlightType.ESCORT,
2,
doctrine.mission_ranges.offensive,
EscortType.AirToAir,
)

View File

@ -0,0 +1,26 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater import MissionTarget
from gen.flights.flight import FlightType
@dataclass
class PlanAewc(PackagePlanningTask[MissionTarget]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.aewc_targets
def apply_effects(self, state: TheaterState) -> None:
state.aewc_targets.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.AEWC, 1, doctrine.mission_ranges.aewc)
@property
def asap(self) -> bool:
# Supports all the early CAP flights, so should be in the air ASAP.
return True

View File

@ -0,0 +1,28 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.missionproposals import EscortType
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater import NavalGroundObject
from gen.flights.flight import FlightType
@dataclass
class PlanAntiShip(PackagePlanningTask[NavalGroundObject]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.threatening_ships
def apply_effects(self, state: TheaterState) -> None:
state.threatening_ships.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.ANTISHIP, 2, doctrine.mission_ranges.offensive)
self.propose_flight(
FlightType.ESCORT,
2,
doctrine.mission_ranges.offensive,
EscortType.AirToAir,
)

View File

@ -0,0 +1,22 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.transfers import CargoShip
from gen.flights.flight import FlightType
@dataclass
class PlanAntiShipping(PackagePlanningTask[CargoShip]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.enemy_shipping
def apply_effects(self, state: TheaterState) -> None:
state.enemy_shipping.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.ANTISHIP, 2, doctrine.mission_ranges.offensive)
self.propose_common_escorts(doctrine)

View File

@ -0,0 +1,22 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater.theatergroundobject import VehicleGroupGroundObject
from gen.flights.flight import FlightType
@dataclass
class PlanBai(PackagePlanningTask[VehicleGroupGroundObject]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.enemy_garrisons
def apply_effects(self, state: TheaterState) -> None:
state.enemy_garrisons.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.BAI, 2, doctrine.mission_ranges.offensive)
self.propose_common_escorts(doctrine)

View File

@ -0,0 +1,55 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from game.commander.missionproposals import ProposedMission, ProposedFlight
from game.commander.tasks.theatercommandertask import TheaterCommanderTask
from game.commander.theaterstate import TheaterState
from game.profiling import MultiEventTracer
from game.theater import ControlPoint
from gen.flights.flight import FlightType
if TYPE_CHECKING:
from gen.flights.ai_flight_planner import CoalitionMissionPlanner
@dataclass
class PlanBarcap(TheaterCommanderTask):
target: ControlPoint
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.vulnerable_control_points
def apply_effects(self, state: TheaterState) -> None:
state.vulnerable_control_points.remove(self.target)
def execute(
self, mission_planner: CoalitionMissionPlanner, tracer: MultiEventTracer
) -> None:
# Plan enough rounds of CAP that the target has coverage over the expected
# mission duration.
mission_duration = int(
mission_planner.game.settings.desired_player_mission_duration.total_seconds()
)
barcap_duration = int(
mission_planner.faction.doctrine.cap_duration.total_seconds()
)
for _ in range(
0,
mission_duration,
barcap_duration,
):
mission_planner.plan_mission(
ProposedMission(
self.target,
[
ProposedFlight(
FlightType.BARCAP,
2,
mission_planner.doctrine.mission_ranges.cap,
),
],
),
tracer,
)

View File

@ -0,0 +1,22 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater import FrontLine
from gen.flights.flight import FlightType
@dataclass
class PlanCas(PackagePlanningTask[FrontLine]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.vulnerable_front_lines
def apply_effects(self, state: TheaterState) -> None:
state.vulnerable_front_lines.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.CAS, 2, doctrine.mission_ranges.cas)
self.propose_flight(FlightType.TARCAP, 2, doctrine.mission_ranges.cap)

View File

@ -0,0 +1,22 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.transfers import Convoy
from gen.flights.flight import FlightType
@dataclass
class PlanConvoyInterdiction(PackagePlanningTask[Convoy]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.enemy_convoys
def apply_effects(self, state: TheaterState) -> None:
state.enemy_convoys.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.BAI, 2, doctrine.mission_ranges.offensive)
self.propose_common_escorts(doctrine)

View File

@ -0,0 +1,51 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.missionproposals import EscortType
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater.theatergroundobject import IadsGroundObject
from gen.flights.flight import FlightType
@dataclass
class PlanDead(PackagePlanningTask[IadsGroundObject]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.threatening_air_defenses
def apply_effects(self, state: TheaterState) -> None:
state.threatening_air_defenses.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.DEAD, 2, doctrine.mission_ranges.offensive)
# Only include SEAD against SAMs that still have emitters. No need to
# suppress an EWR, and SEAD isn't useful against a SAM that no longer has a
# working track radar.
#
# For SAMs without track radars and EWRs, we still want a SEAD escort if
# needed.
#
# Note that there is a quirk here: we should potentially be included a SEAD
# escort *and* SEAD when the target is a radar SAM but the flight path is
# also threatened by SAMs. We don't want to include a SEAD escort if the
# package is *only* threatened by the target though. Could be improved, but
# needs a decent refactor to the escort planning to do so.
if self.target.has_live_radar_sam:
self.propose_flight(FlightType.SEAD, 2, doctrine.mission_ranges.offensive)
else:
self.propose_flight(
FlightType.SEAD_ESCORT,
2,
doctrine.mission_ranges.offensive,
EscortType.Sead,
)
self.propose_flight(
FlightType.ESCORT,
2,
doctrine.mission_ranges.offensive,
EscortType.AirToAir,
)

View File

@ -0,0 +1,28 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater import ControlPoint
from gen.flights.flight import FlightType
@dataclass
class PlanOcaStrike(PackagePlanningTask[ControlPoint]):
aircraft_cold_start: bool
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.oca_targets
def apply_effects(self, state: TheaterState) -> None:
state.oca_targets.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.OCA_RUNWAY, 2, doctrine.mission_ranges.offensive)
if self.aircraft_cold_start:
self.propose_flight(
FlightType.OCA_AIRCRAFT, 2, doctrine.mission_ranges.offensive
)
self.propose_common_escorts(doctrine)

View File

@ -0,0 +1,21 @@
from __future__ import annotations
from dataclasses import dataclass
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater import MissionTarget
from gen.flights.flight import FlightType
@dataclass
class PlanRefueling(PackagePlanningTask[MissionTarget]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.refueling_targets
def apply_effects(self, state: TheaterState) -> None:
state.refueling_targets.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.REFUELING, 1, doctrine.mission_ranges.refueling)

View File

@ -0,0 +1,23 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from game.commander.tasks.packageplanningtask import PackagePlanningTask
from game.commander.theaterstate import TheaterState
from game.data.doctrine import Doctrine
from game.theater.theatergroundobject import TheaterGroundObject
from gen.flights.flight import FlightType
@dataclass
class PlanStrike(PackagePlanningTask[TheaterGroundObject[Any]]):
def preconditions_met(self, state: TheaterState) -> bool:
return self.target in state.strike_targets
def apply_effects(self, state: TheaterState) -> None:
state.strike_targets.remove(self.target)
def propose_flights(self, doctrine: Doctrine) -> None:
self.propose_flight(FlightType.STRIKE, 2, doctrine.mission_ranges.offensive)
self.propose_common_escorts(doctrine)

View File

@ -0,0 +1,20 @@
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING
from game.commander.theaterstate import TheaterState
from game.htn import PrimitiveTask
from game.profiling import MultiEventTracer
if TYPE_CHECKING:
from gen.flights.ai_flight_planner import CoalitionMissionPlanner
# TODO: Refactor so that we don't need to call up to the mission planner.
class TheaterCommanderTask(PrimitiveTask[TheaterState]):
@abstractmethod
def execute(
self, mission_planner: CoalitionMissionPlanner, tracer: MultiEventTracer
) -> None:
...

View File

@ -0,0 +1,91 @@
"""The Theater Commander is the highest level campaign AI.
Target selection is performed with a hierarchical-task-network (HTN, linked below).
These work by giving the planner an initial "task" which decomposes into other tasks
until a concrete set of actions is formed. For example, the "capture base" task may
decompose in the following manner:
* Defend
* Reinforce front line
* Set front line stance to defend
* Destroy enemy front line units
* Set front line stance to elimination
* Plan CAS at front line
* Prepare
* Destroy enemy IADS
* Plan DEAD against SAM Armadillo
* ...
* Destroy enemy front line units
* Set front line stance to elimination
* Plan CAS at front line
* Inhibit
* Destroy enemy unit production infrastructure
* Destroy factory at Palmyra
* ...
* Destroy enemy front line units
* Set front line stance to elimination
* Plan CAS at front line
* Attack
* Set front line stance to breakthrough
* Destroy enemy front line units
* Set front line stance to elimination
* Plan CAS at front line
This is not a reflection of the actual task composition but illustrates the capability
of the system. Each task has preconditions which are checked before the task is
decomposed. If preconditions are not met the task is ignored and the next is considered.
For example the task to destroy the factory at Palmyra might be excluded until the air
defenses protecting it are eliminated; or defensive air operations might be excluded if
the enemy does not have sufficient air forces, or if the protected target has sufficient
SAM coverage.
Each action updates the world state, which causes each action to account for the result
of the tasks executed before it. Above, the preconditions for attacking the factory at
Palmyra may not have been met due to the IADS coverage, leading the planning to decide
on an attack against the IADS in the area instead. When planning the next task in the
same turn, the world state will have been updated to account for the (hopefully)
destroyed SAM sites, allowing the planner to choose the mission to attack the factory.
Preconditions can be aware of previous actions as well. A precondition for "Plan CAS at
front line" can be "No CAS missions planned at front line" to avoid over-planning CAS
even though it is a primitive task used by many other tasks.
https://en.wikipedia.org/wiki/Hierarchical_task_network
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from game.commander.tasks.compound.nextaction import PlanNextAction
from game.commander.tasks.theatercommandertask import TheaterCommanderTask
from game.commander.theaterstate import TheaterState
from game.htn import Planner
from game.profiling import MultiEventTracer
if TYPE_CHECKING:
from game import Game
from gen.flights.ai_flight_planner import CoalitionMissionPlanner
class TheaterCommander(Planner[TheaterState, TheaterCommanderTask]):
def __init__(self, game: Game, player: bool) -> None:
super().__init__(
PlanNextAction(
aircraft_cold_start=game.settings.default_start_type == "Cold"
)
)
self.game = game
self.player = player
def plan_missions(
self, mission_planner: CoalitionMissionPlanner, tracer: MultiEventTracer
) -> None:
state = TheaterState.from_game(self.game, self.player)
while True:
result = self.plan(state)
if result is None:
# Planned all viable tasks this turn.
return
for task in result.tasks:
task.execute(mission_planner, tracer)
state = result.end_state

View File

@ -0,0 +1,67 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from game.commander.objectivefinder import ObjectiveFinder
from game.htn import WorldState
from game.theater import ControlPoint, FrontLine, MissionTarget
from game.theater.theatergroundobject import (
TheaterGroundObject,
VehicleGroupGroundObject,
NavalGroundObject,
IadsGroundObject,
)
from game.transfers import Convoy, CargoShip
if TYPE_CHECKING:
from game import Game
@dataclass
class TheaterState(WorldState["TheaterState"]):
vulnerable_control_points: list[ControlPoint]
vulnerable_front_lines: list[FrontLine]
aewc_targets: list[MissionTarget]
refueling_targets: list[MissionTarget]
threatening_air_defenses: list[IadsGroundObject]
enemy_convoys: list[Convoy]
enemy_shipping: list[CargoShip]
threatening_ships: list[NavalGroundObject]
enemy_garrisons: list[VehicleGroupGroundObject]
oca_targets: list[ControlPoint]
strike_targets: list[TheaterGroundObject[Any]]
def clone(self) -> TheaterState:
# Do not use copy.deepcopy. Copying every TGO, control point, etc is absurdly
# expensive.
return TheaterState(
vulnerable_control_points=list(self.vulnerable_control_points),
vulnerable_front_lines=list(self.vulnerable_front_lines),
aewc_targets=list(self.aewc_targets),
refueling_targets=list(self.refueling_targets),
threatening_air_defenses=list(self.threatening_air_defenses),
enemy_convoys=list(self.enemy_convoys),
enemy_shipping=list(self.enemy_shipping),
threatening_ships=list(self.threatening_ships),
enemy_garrisons=list(self.enemy_garrisons),
oca_targets=list(self.oca_targets),
strike_targets=list(self.strike_targets),
)
@classmethod
def from_game(cls, game: Game, player: bool) -> TheaterState:
finder = ObjectiveFinder(game, player)
return TheaterState(
vulnerable_control_points=list(finder.vulnerable_control_points()),
vulnerable_front_lines=list(finder.front_lines()),
aewc_targets=[finder.farthest_friendly_control_point()],
refueling_targets=[finder.closest_friendly_control_point()],
threatening_air_defenses=list(finder.threatening_air_defenses()),
enemy_convoys=list(finder.convoys()),
enemy_shipping=list(finder.cargo_ships()),
threatening_ships=list(finder.threatening_ships()),
enemy_garrisons=list(finder.threatening_vehicle_groups()),
oca_targets=list(finder.oca_targets(min_aircraft=20)),
strike_targets=list(finder.strike_targets()),
)

View File

@ -1,9 +1,8 @@
from dataclasses import dataclass from dataclasses import dataclass, field
from datetime import timedelta from datetime import timedelta
from dcs.task import Reconnaissance
from game.utils import Distance, feet, nautical_miles
from game.data.groundunitclass import GroundUnitClass from game.data.groundunitclass import GroundUnitClass
from game.utils import Distance, feet, nautical_miles
@dataclass @dataclass
@ -17,6 +16,15 @@ class GroundUnitProcurementRatios:
return 0.0 return 0.0
@dataclass(frozen=True)
class MissionPlannerMaxRanges:
cap: Distance = field(default=nautical_miles(100))
cas: Distance = field(default=nautical_miles(50))
offensive: Distance = field(default=nautical_miles(150))
aewc: Distance = field(default=Distance.inf())
refueling: Distance = field(default=nautical_miles(200))
@dataclass(frozen=True) @dataclass(frozen=True)
class Doctrine: class Doctrine:
cas: bool cas: bool
@ -65,6 +73,8 @@ class Doctrine:
ground_unit_procurement_ratios: GroundUnitProcurementRatios ground_unit_procurement_ratios: GroundUnitProcurementRatios
mission_ranges: MissionPlannerMaxRanges = field(default=MissionPlannerMaxRanges())
MODERN_DOCTRINE = Doctrine( MODERN_DOCTRINE = Doctrine(
cap=True, cap=True,

125
game/htn.py Normal file
View File

@ -0,0 +1,125 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections import Iterator, deque
from dataclasses import dataclass
from typing import Any, Generic, Optional, TypeVar
WorldStateT = TypeVar("WorldStateT", bound="WorldState[Any]")
class WorldState(ABC, Generic[WorldStateT]):
@abstractmethod
def clone(self) -> WorldStateT:
...
class Task(Generic[WorldStateT]):
pass
Method = list[Task[WorldStateT]]
class PrimitiveTask(Task[WorldStateT], Generic[WorldStateT], ABC):
@abstractmethod
def preconditions_met(self, state: WorldStateT) -> bool:
...
@abstractmethod
def apply_effects(self, state: WorldStateT) -> None:
...
class CompoundTask(Task[WorldStateT], Generic[WorldStateT], ABC):
@abstractmethod
def each_valid_method(self, state: WorldStateT) -> Iterator[Method[WorldStateT]]:
...
PrimitiveTaskT = TypeVar("PrimitiveTaskT", bound=PrimitiveTask[Any])
@dataclass
class PlanningState(Generic[WorldStateT, PrimitiveTaskT]):
state: WorldStateT
tasks_to_process: deque[Task[WorldStateT]]
plan: list[PrimitiveTaskT]
methods: Optional[Iterator[Method[WorldStateT]]]
@dataclass(frozen=True)
class PlanningResult(Generic[WorldStateT, PrimitiveTaskT]):
tasks: list[PrimitiveTaskT]
end_state: WorldStateT
class PlanningHistory(Generic[WorldStateT, PrimitiveTaskT]):
def __init__(self) -> None:
self.states: list[PlanningState[WorldStateT, PrimitiveTaskT]] = []
def push(self, planning_state: PlanningState[WorldStateT, PrimitiveTaskT]) -> None:
self.states.append(planning_state)
def pop(self) -> PlanningState[WorldStateT, PrimitiveTaskT]:
return self.states.pop()
class Planner(Generic[WorldStateT, PrimitiveTaskT]):
def __init__(self, main_task: Task[WorldStateT]) -> None:
self.main_task = main_task
def plan(
self, initial_state: WorldStateT
) -> Optional[PlanningResult[WorldStateT, PrimitiveTaskT]]:
planning_state: PlanningState[WorldStateT, PrimitiveTaskT] = PlanningState(
initial_state, deque([self.main_task]), [], None
)
history: PlanningHistory[WorldStateT, PrimitiveTaskT] = PlanningHistory()
while planning_state.tasks_to_process:
task = planning_state.tasks_to_process.popleft()
if isinstance(task, PrimitiveTask):
if task.preconditions_met(planning_state.state):
task.apply_effects(planning_state.state)
# Ignore type erasure. We've already verified that this is a Planner
# with a WorldStateT and a PrimitiveTaskT, so we know that the task
# list is a list of CompoundTask[WorldStateT] and PrimitiveTaskT. We
# could scatter more unions throughout to be more explicit but
# there's no way around the type erasure that mypy uses for
# isinstance.
planning_state.plan.append(task) # type: ignore
else:
planning_state = history.pop()
else:
assert isinstance(task, CompoundTask)
# If the methods field of our current state is not None that means we're
# resuming a prior attempt to execute this task after a subtask of the
# previously selected method failed.
#
# Otherwise this is the first exectution of this task so we need to
# create the generator.
if planning_state.methods is None:
methods = task.each_valid_method(planning_state.state)
else:
methods = planning_state.methods
try:
method = next(methods)
history.push(
PlanningState(
planning_state.state.clone(),
# Push the current node back onto the stack so that we
# resume handling this task when we pop back to this state.
planning_state.tasks_to_process + deque([task]),
planning_state.plan,
methods,
)
)
planning_state.methods = None
planning_state.tasks_to_process.extend(method)
except StopIteration:
try:
planning_state = history.pop()
except IndexError:
# No valid plan was found.
return None
return PlanningResult(planning_state.plan, planning_state.state)

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import itertools import itertools
import logging import logging
from abc import ABC
from collections import Sequence from collections import Sequence
from typing import Iterator, List, TYPE_CHECKING, Union, Generic, TypeVar, Any from typing import Iterator, List, TYPE_CHECKING, Union, Generic, TypeVar, Any
@ -463,10 +464,19 @@ class CoastalSiteGroundObject(TheaterGroundObject[VehicleGroup]):
return False return False
class IadsGroundObject(TheaterGroundObject[VehicleGroup], ABC):
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
from gen.flights.flight import FlightType
if not self.is_friendly(for_player):
yield FlightType.DEAD
yield from super().mission_types(for_player)
# The SamGroundObject represents all type of AA # The SamGroundObject represents all type of AA
# The TGO can have multiple types of units (AAA,SAM,Support...) # The TGO can have multiple types of units (AAA,SAM,Support...)
# Differentiation can be made during generation with the airdefensegroupgenerator # Differentiation can be made during generation with the airdefensegroupgenerator
class SamGroundObject(TheaterGroundObject[VehicleGroup]): class SamGroundObject(IadsGroundObject):
def __init__( def __init__(
self, self,
name: str, name: str,
@ -491,7 +501,11 @@ class SamGroundObject(TheaterGroundObject[VehicleGroup]):
if not self.is_friendly(for_player): if not self.is_friendly(for_player):
yield FlightType.DEAD yield FlightType.DEAD
yield FlightType.SEAD yield FlightType.SEAD
yield from super().mission_types(for_player) for mission_type in super().mission_types(for_player):
# We yielded this ourselves to move it to the top of the list. Don't yield
# it twice.
if mission_type is not FlightType.DEAD:
yield mission_type
@property @property
def might_have_aa(self) -> bool: def might_have_aa(self) -> bool:
@ -558,7 +572,7 @@ class VehicleGroupGroundObject(TheaterGroundObject[VehicleGroup]):
return True return True
class EwrGroundObject(TheaterGroundObject[VehicleGroup]): class EwrGroundObject(IadsGroundObject):
def __init__( def __init__(
self, self,
name: str, name: str,
@ -583,13 +597,6 @@ class EwrGroundObject(TheaterGroundObject[VehicleGroup]):
# Use Group Id and uppercase EWR # Use Group Id and uppercase EWR
return f"{self.faction_color}|EWR|{self.group_id}" return f"{self.faction_color}|EWR|{self.group_id}"
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
from gen.flights.flight import FlightType
if not self.is_friendly(for_player):
yield FlightType.DEAD
yield from super().mission_types(for_player)
@property @property
def might_have_aa(self) -> bool: def might_have_aa(self) -> bool:
return True return True

View File

@ -1,54 +1,39 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import math
import operator
import random import random
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass, field
from datetime import timedelta from datetime import timedelta
from enum import Enum, auto
from typing import ( from typing import (
Dict, Dict,
Iterable, Iterable,
Iterator, Iterator,
List,
Optional, Optional,
Set, Set,
TYPE_CHECKING, TYPE_CHECKING,
Tuple, Tuple,
TypeVar,
Any,
) )
from game.commander import TheaterCommander
from game.commander.missionproposals import ProposedFlight, ProposedMission, EscortType
from game.commander.objectivefinder import ObjectiveFinder
from game.data.doctrine import Doctrine
from game.dcs.aircrafttype import AircraftType from game.dcs.aircrafttype import AircraftType
from game.factions.faction import Faction
from game.infos.information import Information from game.infos.information import Information
from game.procurement import AircraftProcurementRequest from game.procurement import AircraftProcurementRequest
from game.profiling import logged_duration, MultiEventTracer from game.profiling import logged_duration, MultiEventTracer
from game.squadrons import AirWing, Squadron from game.squadrons import AirWing, Squadron
from game.theater import ( from game.theater import (
Airfield,
ControlPoint, ControlPoint,
Fob,
FrontLine,
MissionTarget, MissionTarget,
OffMapSpawn, OffMapSpawn,
SamGroundObject,
TheaterGroundObject,
) )
from game.theater.theatergroundobject import ( from game.utils import nautical_miles
BuildingGroundObject,
EwrGroundObject,
NavalGroundObject,
VehicleGroupGroundObject,
)
from game.transfers import CargoShip, Convoy
from game.utils import Distance, nautical_miles, meters
from gen.ato import Package from gen.ato import Package
from gen.flights.ai_flight_planner_db import aircraft_for_task from gen.flights.ai_flight_planner_db import aircraft_for_task
from gen.flights.closestairfields import ( from gen.flights.closestairfields import (
ClosestAirfields, ClosestAirfields,
ObjectiveDistanceCache,
) )
from gen.flights.flight import ( from gen.flights.flight import (
Flight, Flight,
@ -63,61 +48,6 @@ if TYPE_CHECKING:
from game.inventory import GlobalAircraftInventory from game.inventory import GlobalAircraftInventory
class EscortType(Enum):
AirToAir = auto()
Sead = auto()
@dataclass(frozen=True)
class ProposedFlight:
"""A flight outline proposed by the mission planner.
Proposed flights haven't been assigned specific aircraft yet. They have only
a task, a required number of aircraft, and a maximum distance allowed
between the objective and the departure airfield.
"""
#: The flight's role.
task: FlightType
#: The number of aircraft required.
num_aircraft: int
#: The maximum distance between the objective and the departure airfield.
max_distance: Distance
#: The type of threat this flight defends against if it is an escort. Escort
#: flights will be pruned if the rest of the package is not threatened by
#: the threat they defend against. If this flight is not an escort, this
#: field is None.
escort_type: Optional[EscortType] = field(default=None)
def __str__(self) -> str:
return f"{self.task} {self.num_aircraft} ship"
@dataclass(frozen=True)
class ProposedMission:
"""A mission outline proposed by the mission planner.
Proposed missions haven't been assigned aircraft yet. They have only an
objective location and a list of proposed flights that are required for the
mission.
"""
#: The mission objective.
location: MissionTarget
#: The proposed flights that are required for the mission.
flights: List[ProposedFlight]
asap: bool = field(default=False)
def __str__(self) -> str:
flights = ", ".join([str(f) for f in self.flights])
return f"{self.location.name}: {flights}"
class AircraftAllocator: class AircraftAllocator:
"""Finds suitable aircraft for proposed missions.""" """Finds suitable aircraft for proposed missions."""
@ -271,289 +201,6 @@ class PackageBuilder:
self.package.remove_flight(flight) self.package.remove_flight(flight)
MissionTargetType = TypeVar("MissionTargetType", bound=MissionTarget)
class ObjectiveFinder:
"""Identifies potential objectives for the mission planner."""
# TODO: Merge into doctrine.
AIRFIELD_THREAT_RANGE = nautical_miles(150)
SAM_THREAT_RANGE = nautical_miles(100)
def __init__(self, game: Game, is_player: bool) -> None:
self.game = game
self.is_player = is_player
def enemy_air_defenses(self) -> Iterator[tuple[TheaterGroundObject[Any], Distance]]:
"""Iterates over all enemy SAM sites."""
doctrine = self.game.faction_for(self.is_player).doctrine
threat_zones = self.game.threat_zone_for(not self.is_player)
for cp in self.enemy_control_points():
for ground_object in cp.ground_objects:
if ground_object.is_dead:
continue
if isinstance(ground_object, EwrGroundObject):
if threat_zones.threatened_by_air_defense(ground_object):
# This is a very weak heuristic for determining whether the EWR
# is close enough to be worth targeting before a SAM that is
# covering it. Ingress distance corresponds to the beginning of
# the attack range and is sufficient for most standoff weapons,
# so treating the ingress distance as the threat distance sorts
# these EWRs such that they will be attacked before SAMs that do
# not threaten the ingress point, but after those that do.
target_range = doctrine.ingress_egress_distance
else:
# But if the EWR isn't covered then we should only be worrying
# about its detection range.
target_range = ground_object.max_detection_range()
elif isinstance(ground_object, SamGroundObject):
target_range = ground_object.max_threat_range()
else:
continue
yield ground_object, target_range
def threatening_air_defenses(self) -> Iterator[TheaterGroundObject[Any]]:
"""Iterates over enemy SAMs in threat range of friendly control points.
SAM sites are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
target_ranges: list[tuple[TheaterGroundObject[Any], Distance]] = []
for target, threat_range in self.enemy_air_defenses():
ranges: list[Distance] = []
for cp in self.friendly_control_points():
ranges.append(meters(target.distance_to(cp)) - threat_range)
target_ranges.append((target, min(ranges)))
target_ranges = sorted(target_ranges, key=operator.itemgetter(1))
for target, _range in target_ranges:
yield target
def enemy_vehicle_groups(self) -> Iterator[VehicleGroupGroundObject]:
"""Iterates over all enemy vehicle groups."""
for cp in self.enemy_control_points():
for ground_object in cp.ground_objects:
if not isinstance(ground_object, VehicleGroupGroundObject):
continue
if ground_object.is_dead:
continue
yield ground_object
def threatening_vehicle_groups(self) -> Iterator[MissionTarget]:
"""Iterates over enemy vehicle groups near friendly control points.
Groups are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
return self._targets_by_range(self.enemy_vehicle_groups())
def enemy_ships(self) -> Iterator[NavalGroundObject]:
for cp in self.enemy_control_points():
for ground_object in cp.ground_objects:
if not isinstance(ground_object, NavalGroundObject):
continue
if ground_object.is_dead:
continue
yield ground_object
def threatening_ships(self) -> Iterator[MissionTarget]:
"""Iterates over enemy ships near friendly control points.
Groups are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
return self._targets_by_range(self.enemy_ships())
def _targets_by_range(
self, targets: Iterable[MissionTargetType]
) -> Iterator[MissionTargetType]:
target_ranges: list[tuple[MissionTargetType, float]] = []
for target in targets:
ranges: list[float] = []
for cp in self.friendly_control_points():
ranges.append(target.distance_to(cp))
target_ranges.append((target, min(ranges)))
target_ranges = sorted(target_ranges, key=operator.itemgetter(1))
for target, _range in target_ranges:
yield target
def strike_targets(self) -> Iterator[TheaterGroundObject[Any]]:
"""Iterates over enemy strike targets.
Targets are sorted by their closest proximity to any friendly control
point (airfield or fleet).
"""
targets: list[tuple[TheaterGroundObject[Any], float]] = []
# Building objectives are made of several individual TGOs (one per
# building).
found_targets: Set[str] = set()
for enemy_cp in self.enemy_control_points():
for ground_object in enemy_cp.ground_objects:
# TODO: Reuse ground_object.mission_types.
# The mission types for ground objects are currently not
# accurate because we include things like strike and BAI for all
# targets since they have different planning behavior (waypoint
# generation is better for players with strike when the targets
# are stationary, AI behavior against weaker air defenses is
# better with BAI), so that's not a useful filter. Once we have
# better control over planning profiles and target dependent
# loadouts we can clean this up.
if isinstance(ground_object, VehicleGroupGroundObject):
# BAI target, not strike target.
continue
if isinstance(ground_object, NavalGroundObject):
# Anti-ship target, not strike target.
continue
if isinstance(ground_object, SamGroundObject):
# SAMs are targeted by DEAD. No need to double plan.
continue
is_building = isinstance(ground_object, BuildingGroundObject)
is_fob = isinstance(enemy_cp, Fob)
if is_building and is_fob and ground_object.is_control_point:
# This is the FOB structure itself. Can't be repaired or
# targeted by the player, so shouldn't be targetable by the
# AI.
continue
if ground_object.is_dead:
continue
if ground_object.name in found_targets:
continue
ranges: list[float] = []
for friendly_cp in self.friendly_control_points():
ranges.append(ground_object.distance_to(friendly_cp))
targets.append((ground_object, min(ranges)))
found_targets.add(ground_object.name)
targets = sorted(targets, key=operator.itemgetter(1))
for target, _range in targets:
yield target
def front_lines(self) -> Iterator[FrontLine]:
"""Iterates over all active front lines in the theater."""
yield from self.game.theater.conflicts()
def vulnerable_control_points(self) -> Iterator[ControlPoint]:
"""Iterates over friendly CPs that are vulnerable to enemy CPs.
Vulnerability is defined as any enemy CP within threat range of of the
CP.
"""
for cp in self.friendly_control_points():
if isinstance(cp, OffMapSpawn):
# Off-map spawn locations don't need protection.
continue
airfields_in_proximity = self.closest_airfields_to(cp)
airfields_in_threat_range = (
airfields_in_proximity.operational_airfields_within(
self.AIRFIELD_THREAT_RANGE
)
)
for airfield in airfields_in_threat_range:
if not airfield.is_friendly(self.is_player):
yield cp
break
def oca_targets(self, min_aircraft: int) -> Iterator[MissionTarget]:
airfields = []
for control_point in self.enemy_control_points():
if not isinstance(control_point, Airfield):
continue
if control_point.base.total_aircraft >= min_aircraft:
airfields.append(control_point)
return self._targets_by_range(airfields)
def convoys(self) -> Iterator[Convoy]:
for front_line in self.front_lines():
yield from self.game.transfers.convoys.travelling_to(
front_line.control_point_hostile_to(self.is_player)
)
def cargo_ships(self) -> Iterator[CargoShip]:
for front_line in self.front_lines():
yield from self.game.transfers.cargo_ships.travelling_to(
front_line.control_point_hostile_to(self.is_player)
)
def friendly_control_points(self) -> Iterator[ControlPoint]:
"""Iterates over all friendly control points."""
return (
c for c in self.game.theater.controlpoints if c.is_friendly(self.is_player)
)
def farthest_friendly_control_point(self) -> ControlPoint:
"""Finds the friendly control point that is farthest from any threats."""
threat_zones = self.game.threat_zone_for(not self.is_player)
farthest = None
max_distance = meters(0)
for cp in self.friendly_control_points():
if isinstance(cp, OffMapSpawn):
continue
distance = threat_zones.distance_to_threat(cp.position)
if distance > max_distance:
farthest = cp
max_distance = distance
if farthest is None:
raise RuntimeError("Found no friendly control points. You probably lost.")
return farthest
def closest_friendly_control_point(self) -> ControlPoint:
"""Finds the friendly control point that is closest to any threats."""
threat_zones = self.game.threat_zone_for(not self.is_player)
closest = None
min_distance = meters(math.inf)
for cp in self.friendly_control_points():
if isinstance(cp, OffMapSpawn):
continue
distance = threat_zones.distance_to_threat(cp.position)
if distance < min_distance:
closest = cp
min_distance = distance
if closest is None:
raise RuntimeError("Found no friendly control points. You probably lost.")
return closest
def enemy_control_points(self) -> Iterator[ControlPoint]:
"""Iterates over all enemy control points."""
return (
c
for c in self.game.theater.controlpoints
if not c.is_friendly(self.is_player)
)
def all_possible_targets(self) -> Iterator[MissionTarget]:
"""Iterates over all possible mission targets in the theater.
Valid mission targets are control points (airfields and carriers), front
lines, and ground objects (SAM sites, factories, resource extraction
sites, etc).
"""
for cp in self.game.theater.controlpoints:
yield cp
yield from cp.ground_objects
yield from self.front_lines()
@staticmethod
def closest_airfields_to(location: MissionTarget) -> ClosestAirfields:
"""Returns the closest airfields to the given location."""
return ObjectiveDistanceCache.get_closest_airfields(location)
class CoalitionMissionPlanner: class CoalitionMissionPlanner:
"""Coalition flight planning AI. """Coalition flight planning AI.
@ -577,17 +224,6 @@ class CoalitionMissionPlanner:
TODO: Stance and doctrine-specific planning behavior. TODO: Stance and doctrine-specific planning behavior.
""" """
# TODO: Merge into doctrine, also limit by aircraft.
MAX_CAP_RANGE = nautical_miles(100)
MAX_CAS_RANGE = nautical_miles(50)
MAX_ANTISHIP_RANGE = nautical_miles(150)
MAX_BAI_RANGE = nautical_miles(150)
MAX_OCA_RANGE = nautical_miles(150)
MAX_SEAD_RANGE = nautical_miles(150)
MAX_STRIKE_RANGE = nautical_miles(150)
MAX_AWEC_RANGE = Distance.inf()
MAX_TANKER_RANGE = nautical_miles(200)
def __init__(self, game: Game, is_player: bool) -> None: def __init__(self, game: Game, is_player: bool) -> None:
self.game = game self.game = game
self.is_player = is_player self.is_player = is_player
@ -595,7 +231,11 @@ class CoalitionMissionPlanner:
self.ato = self.game.blue_ato if is_player else self.game.red_ato self.ato = self.game.blue_ato if is_player else self.game.red_ato
self.threat_zones = self.game.threat_zone_for(not self.is_player) self.threat_zones = self.game.threat_zone_for(not self.is_player)
self.procurement_requests = self.game.procurement_requests_for(self.is_player) self.procurement_requests = self.game.procurement_requests_for(self.is_player)
self.faction = self.game.faction_for(self.is_player) self.faction: Faction = self.game.faction_for(self.is_player)
@property
def doctrine(self) -> Doctrine:
return self.faction.doctrine
def air_wing_can_plan(self, mission_type: FlightType) -> bool: def air_wing_can_plan(self, mission_type: FlightType) -> bool:
"""Returns True if it is possible for the air wing to plan this mission type. """Returns True if it is possible for the air wing to plan this mission type.
@ -607,237 +247,13 @@ class CoalitionMissionPlanner:
""" """
return self.game.air_wing_for(self.is_player).can_auto_plan(mission_type) return self.game.air_wing_for(self.is_player).can_auto_plan(mission_type)
def critical_missions(self) -> Iterator[ProposedMission]:
"""Identifies the most important missions to plan this turn.
Non-critical missions that cannot be fulfilled will create purchase
orders for the next turn. Critical missions will create a purchase order
unless the mission can be doubly fulfilled. In other words, the AI will
attempt to have *double* the aircraft it needs for these missions to
ensure that they can be planned again next turn even if all aircraft are
eliminated this turn.
"""
# Find farthest, friendly CP for AEWC.
yield ProposedMission(
self.objective_finder.farthest_friendly_control_point(),
[ProposedFlight(FlightType.AEWC, 1, self.MAX_AWEC_RANGE)],
# Supports all the early CAP flights, so should be in the air ASAP.
asap=True,
)
yield ProposedMission(
self.objective_finder.closest_friendly_control_point(),
[ProposedFlight(FlightType.REFUELING, 1, self.MAX_TANKER_RANGE)],
)
# Find friendly CPs within 100 nmi from an enemy airfield, plan CAP.
for cp in self.objective_finder.vulnerable_control_points():
# Plan CAP in such a way, that it is established during the whole desired mission length
for _ in range(
0,
int(self.game.settings.desired_player_mission_duration.total_seconds()),
int(self.faction.doctrine.cap_duration.total_seconds()),
):
yield ProposedMission(
cp,
[
ProposedFlight(FlightType.BARCAP, 2, self.MAX_CAP_RANGE),
],
)
# Find front lines, plan CAS.
for front_line in self.objective_finder.front_lines():
yield ProposedMission(
front_line,
[
ProposedFlight(FlightType.CAS, 2, self.MAX_CAS_RANGE),
# This is *not* an escort because front lines don't create a threat
# zone. Generating threat zones from front lines causes the front
# line to push back BARCAPs as it gets closer to the base. While
# front lines do have the same problem of potentially pulling
# BARCAPs off bases to engage a front line TARCAP, that's probably
# the one time where we do want that.
#
# TODO: Use intercepts and extra TARCAPs to cover bases near fronts.
# We don't have intercept missions yet so this isn't something we
# can do today, but we should probably return to having the front
# line project a threat zone (so that strike missions will route
# around it) and instead *not plan* a BARCAP at bases near the
# front, since there isn't a place to put a barrier. Instead, the
# aircraft that would have been a BARCAP could be used as additional
# interceptors and TARCAPs which will defend the base but won't be
# trying to avoid front line contacts.
ProposedFlight(FlightType.TARCAP, 2, self.MAX_CAP_RANGE),
],
)
def propose_missions(self) -> Iterator[ProposedMission]:
"""Identifies and iterates over potential mission in priority order."""
yield from self.critical_missions()
# Find enemy SAM sites with ranges that cover friendly CPs, front lines,
# or objects, plan DEAD.
# Find enemy SAM sites with ranges that extend to within 50 nmi of
# friendly CPs, front, lines, or objects, plan DEAD.
for sam in self.objective_finder.threatening_air_defenses():
flights = [ProposedFlight(FlightType.DEAD, 2, self.MAX_SEAD_RANGE)]
# Only include SEAD against SAMs that still have emitters. No need to
# suppress an EWR, and SEAD isn't useful against a SAM that no longer has a
# working track radar.
#
# For SAMs without track radars and EWRs, we still want a SEAD escort if
# needed.
#
# Note that there is a quirk here: we should potentially be included a SEAD
# escort *and* SEAD when the target is a radar SAM but the flight path is
# also threatened by SAMs. We don't want to include a SEAD escort if the
# package is *only* threatened by the target though. Could be improved, but
# needs a decent refactor to the escort planning to do so.
if sam.has_live_radar_sam:
flights.append(ProposedFlight(FlightType.SEAD, 2, self.MAX_SEAD_RANGE))
else:
flights.append(
ProposedFlight(
FlightType.SEAD_ESCORT, 2, self.MAX_SEAD_RANGE, EscortType.Sead
)
)
# TODO: Max escort range.
flights.append(
ProposedFlight(
FlightType.ESCORT, 2, self.MAX_SEAD_RANGE, EscortType.AirToAir
)
)
yield ProposedMission(sam, flights)
# These will only rarely get planned. When a convoy is travelling multiple legs,
# they're targetable after the first leg. The reason for this is that
# procurement happens *after* mission planning so that the missions that could
# not be filled will guide the procurement process. Procurement is the stage
# that convoys are created (because they're created to move ground units that
# were just purchased), so we haven't created any yet. Any incomplete transfers
# from the previous turn (multi-leg journeys) will still be present though so
# they can be targeted.
#
# Even after this is fixed, the player's convoys that were created through the
# UI will never be targeted on the first turn of their journey because the AI
# stops planning after the start of the turn. We could potentially fix this by
# moving opfor mission planning until the takeoff button is pushed.
for convoy in self.objective_finder.convoys():
yield ProposedMission(
convoy,
[
ProposedFlight(FlightType.BAI, 2, self.MAX_BAI_RANGE),
# TODO: Max escort range.
ProposedFlight(
FlightType.ESCORT, 2, self.MAX_BAI_RANGE, EscortType.AirToAir
),
ProposedFlight(
FlightType.SEAD_ESCORT, 2, self.MAX_BAI_RANGE, EscortType.Sead
),
],
)
for ship in self.objective_finder.cargo_ships():
yield ProposedMission(
ship,
[
ProposedFlight(FlightType.ANTISHIP, 2, self.MAX_ANTISHIP_RANGE),
# TODO: Max escort range.
ProposedFlight(
FlightType.ESCORT, 2, self.MAX_BAI_RANGE, EscortType.AirToAir
),
ProposedFlight(
FlightType.SEAD_ESCORT, 2, self.MAX_BAI_RANGE, EscortType.Sead
),
],
)
for group in self.objective_finder.threatening_ships():
yield ProposedMission(
group,
[
ProposedFlight(FlightType.ANTISHIP, 2, self.MAX_ANTISHIP_RANGE),
# TODO: Max escort range.
ProposedFlight(
FlightType.ESCORT,
2,
self.MAX_ANTISHIP_RANGE,
EscortType.AirToAir,
),
],
)
for group in self.objective_finder.threatening_vehicle_groups():
yield ProposedMission(
group,
[
ProposedFlight(FlightType.BAI, 2, self.MAX_BAI_RANGE),
# TODO: Max escort range.
ProposedFlight(
FlightType.ESCORT, 2, self.MAX_BAI_RANGE, EscortType.AirToAir
),
ProposedFlight(
FlightType.SEAD_ESCORT, 2, self.MAX_OCA_RANGE, EscortType.Sead
),
],
)
for target in self.objective_finder.oca_targets(min_aircraft=20):
flights = [
ProposedFlight(FlightType.OCA_RUNWAY, 2, self.MAX_OCA_RANGE),
]
if self.game.settings.default_start_type == "Cold":
# Only schedule if the default start type is Cold. If the player
# has set anything else there are no targets to hit.
flights.append(
ProposedFlight(FlightType.OCA_AIRCRAFT, 2, self.MAX_OCA_RANGE)
)
flights.extend(
[
# TODO: Max escort range.
ProposedFlight(
FlightType.ESCORT, 2, self.MAX_OCA_RANGE, EscortType.AirToAir
),
ProposedFlight(
FlightType.SEAD_ESCORT, 2, self.MAX_OCA_RANGE, EscortType.Sead
),
]
)
yield ProposedMission(target, flights)
# Plan strike missions.
for target in self.objective_finder.strike_targets():
yield ProposedMission(
target,
[
ProposedFlight(FlightType.STRIKE, 2, self.MAX_STRIKE_RANGE),
# TODO: Max escort range.
ProposedFlight(
FlightType.ESCORT, 2, self.MAX_STRIKE_RANGE, EscortType.AirToAir
),
ProposedFlight(
FlightType.SEAD_ESCORT,
2,
self.MAX_STRIKE_RANGE,
EscortType.Sead,
),
],
)
def plan_missions(self) -> None: def plan_missions(self) -> None:
"""Identifies and plans mission for the turn.""" """Identifies and plans mission for the turn."""
player = "Blue" if self.is_player else "Red" player = "Blue" if self.is_player else "Red"
with logged_duration(f"{player} mission identification and fulfillment"): with logged_duration(f"{player} mission identification and fulfillment"):
with MultiEventTracer() as tracer: with MultiEventTracer() as tracer:
for proposed_mission in self.propose_missions(): commander = TheaterCommander(self.game, self.is_player)
self.plan_mission(proposed_mission, tracer) commander.plan_missions(self, tracer)
with logged_duration(f"{player} reserve mission planning"):
with MultiEventTracer() as tracer:
for critical_mission in self.critical_missions():
self.plan_mission(critical_mission, tracer, reserves=True)
with logged_duration(f"{player} mission scheduling"): with logged_duration(f"{player} mission scheduling"):
self.stagger_missions() self.stagger_missions()
@ -847,6 +263,9 @@ class CoalitionMissionPlanner:
for aircraft, available in inventory.all_aircraft: for aircraft, available in inventory.all_aircraft:
self.message("Unused aircraft", f"{available} {aircraft} from {cp}") self.message("Unused aircraft", f"{available} {aircraft} from {cp}")
coalition_text = "player" if self.is_player else "opfor"
logging.debug(f"Planned {len(self.ato.packages)} {coalition_text} missions")
def plan_flight( def plan_flight(
self, self,
mission: ProposedMission, mission: ProposedMission,