dcs-retribution/game/theater/controlpoint.py

1676 lines
55 KiB
Python

from __future__ import annotations
import heapq
import itertools
import logging
import math
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum, IntEnum, auto, unique
from functools import cached_property, total_ordering
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
Set,
TYPE_CHECKING,
Tuple,
Type,
Union,
)
from uuid import UUID
from dcs.mapping import Point
from dcs.ships import (
CVN_71,
CVN_72,
CVN_73,
CVN_75,
CV_1143_5,
Forrestal,
KUZNECOW,
LHA_Tarawa,
Stennis,
Type_071,
Hms_invincible,
)
from dcs.terrain.terrain import Airport, ParkingSlot
from dcs.unitgroup import ShipGroup, StaticGroup
from dcs.unittype import ShipType
from game.ato.closestairfields import ObjectiveDistanceCache
from game.ground_forces.combat_stance import CombatStance
from game.point_with_heading import PointWithHeading
from game.runways import RunwayAssigner, RunwayData
from game.scenery_group import SceneryGroup
from game.sidc import (
Entity,
LandInstallationEntity,
SeaSurfaceEntity,
SidcDescribable,
StandardIdentity,
Status,
SymbolSet,
)
from game.theater.presetlocation import PresetLocation
from game.utils import Distance, Heading, meters
from pydcs_extensions import L02, L52, L61
from .base import Base
from .frontline import FrontLine
from .interfaces.CTLD import CTLD
from .missiontarget import MissionTarget
from .theatergroundobject import (
GenericCarrierGroundObject,
TheaterGroundObject,
VehicleGroupGroundObject,
)
from .theatergroup import TheaterUnit
from ..ato.starttype import StartType
from ..data.units import UnitClass
from ..db import Database
from ..dcs.aircrafttype import AircraftType
from ..dcs.groundunittype import GroundUnitType
from ..radio.ICLSContainer import ICLSContainer
from ..radio.Link4Container import Link4Container
from ..radio.RadioFrequencyContainer import RadioFrequencyContainer
from ..radio.TacanContainer import TacanContainer
from ..utils import nautical_miles
from ..weather.conditions import Conditions
if TYPE_CHECKING:
from game import Game
from game.ato.flighttype import FlightType
from game.coalition import Coalition
from game.lasercodes.lasercoderegistry import LaserCodeRegistry
from game.sim import GameUpdateEvents
from game.squadrons.squadron import Squadron
from game.transfers import PendingTransfers
from .conflicttheater import ConflictTheater
FREE_FRONTLINE_UNIT_SUPPLY: int = 15
AMMO_DEPOT_FRONTLINE_UNIT_CONTRIBUTION: int = 12
TRIGGER_RADIUS_CAPTURE = 3000
class ControlPointType(Enum):
#: An airbase with slots for everything.
AIRBASE = 0
#: A group with a Stennis type carrier (F/A-18, F-14 compatible).
AIRCRAFT_CARRIER_GROUP = 1
#: A group with a Tarawa carrier (Helicopters & Harrier).
LHA_GROUP = 2
#: A FARP, with slots for helicopters & harrier
FARP = 4
#: A FOB (ground units only)
FOB = 5
OFF_MAP = 6
@dataclass
class PresetLocations:
"""Defines the preset locations loaded from the campaign mission file."""
#: Locations used by non-carrier ships that will be spawned unless the faction has
#: no navy or the player has disabled ship generation for the owning side.
ships: List[PresetLocation] = field(default_factory=list)
#: Locations used by coastal defenses that are generated if the faction is capable.
coastal_defenses: List[PresetLocation] = field(default_factory=list)
#: Locations used by ground based strike objectives.
strike_locations: List[PresetLocation] = field(default_factory=list)
#: Locations used by offshore strike objectives.
offshore_strike_locations: List[PresetLocation] = field(default_factory=list)
#: Locations used by missile sites like scuds and V-2s that are generated if the
#: faction is capable.
missile_sites: List[PresetLocation] = field(default_factory=list)
#: Locations of long range SAMs.
long_range_sams: List[PresetLocation] = field(default_factory=list)
#: Locations of medium range SAMs.
medium_range_sams: List[PresetLocation] = field(default_factory=list)
#: Locations of short range SAMs.
short_range_sams: List[PresetLocation] = field(default_factory=list)
#: Locations of AAA groups.
aaa: List[PresetLocation] = field(default_factory=list)
#: Locations of EWRs.
ewrs: List[PresetLocation] = field(default_factory=list)
#: Locations of map scenery to create zones for.
scenery: List[SceneryGroup] = field(default_factory=list)
#: Locations of factories for producing ground units.
factories: List[PresetLocation] = field(default_factory=list)
#: Locations of ammo depots for controlling number of units on the front line at a
#: control point.
ammunition_depots: List[PresetLocation] = field(default_factory=list)
#: Locations of stationary armor groups.
armor_groups: List[PresetLocation] = field(default_factory=list)
#: Locations of skynet specific groups
iads_connection_node: List[PresetLocation] = field(default_factory=list)
iads_power_source: List[PresetLocation] = field(default_factory=list)
iads_command_center: List[PresetLocation] = field(default_factory=list)
@dataclass(frozen=True)
class AircraftAllocations:
present: dict[AircraftType, int]
ordered: dict[AircraftType, int]
transferring: dict[AircraftType, int]
@property
def total_value(self) -> int:
total: int = 0
for unit_type, count in self.present.items():
total += unit_type.price * count
for unit_type, count in self.ordered.items():
total += unit_type.price * count
for unit_type, count in self.transferring.items():
total += unit_type.price * count
return total
@property
def total(self) -> int:
return self.total_present + self.total_ordered + self.total_transferring
@property
def total_present(self) -> int:
return sum(self.present.values())
@property
def total_ordered(self) -> int:
return sum(self.ordered.values())
@property
def total_transferring(self) -> int:
return sum(self.transferring.values())
@dataclass(frozen=True)
class GroundUnitAllocations:
present: dict[GroundUnitType, int]
ordered: dict[GroundUnitType, int]
transferring: dict[GroundUnitType, int]
@property
def all(self) -> dict[GroundUnitType, int]:
combined: dict[GroundUnitType, int] = defaultdict(int)
for unit_type, count in itertools.chain(
self.present.items(), self.ordered.items(), self.transferring.items()
):
combined[unit_type] += count
return dict(combined)
@property
def total_value(self) -> int:
total: int = 0
for unit_type, count in self.present.items():
total += unit_type.price * count
for unit_type, count in self.ordered.items():
total += unit_type.price * count
for unit_type, count in self.transferring.items():
total += unit_type.price * count
return total
@cached_property
def total(self) -> int:
return self.total_present + self.total_ordered + self.total_transferring
@cached_property
def total_present(self) -> int:
return sum(self.present.values())
@cached_property
def total_ordered(self) -> int:
return sum(self.ordered.values())
@cached_property
def total_transferring(self) -> int:
return sum(self.transferring.values())
@dataclass
class RunwayStatus:
damaged: bool = False
repair_turns_remaining: Optional[int] = None
def damage(self) -> None:
self.damaged = True
# If the runway is already under repair and is damaged again, progress
# is reset.
self.repair_turns_remaining = None
def repair(self) -> None:
self.repair_turns_remaining = None
self.damaged = False
def begin_repair(self) -> None:
if self.repair_turns_remaining is not None:
logging.error("Runway already under repair. Restarting.")
self.repair_turns_remaining = 4
def process_turn(self) -> None:
if self.repair_turns_remaining is not None:
if self.repair_turns_remaining == 1:
self.repair()
else:
self.repair_turns_remaining -= 1
@property
def needs_repair(self) -> bool:
return self.damaged and self.repair_turns_remaining is None
def __str__(self) -> str:
if not self.damaged:
return "Runway operational"
turns_remaining = self.repair_turns_remaining
if turns_remaining is None:
return "Runway damaged"
return f"Runway repairing, {turns_remaining} turns remaining"
@total_ordering
class GroundUnitDestination:
def __init__(self, control_point: ControlPoint) -> None:
self.control_point = control_point
@property
def total_value(self) -> float:
return self.control_point.base.total_armor_value
def __eq__(self, other: Any) -> bool:
if not isinstance(other, GroundUnitDestination):
raise TypeError
return self.total_value == other.total_value
def __lt__(self, other: Any) -> bool:
if not isinstance(other, GroundUnitDestination):
raise TypeError
return self.total_value < other.total_value
@unique
class ControlPointStatus(IntEnum):
Functional = auto()
Damaged = auto()
Destroyed = auto()
StartingPosition = Union[ShipGroup, StaticGroup, Airport, Point]
class ParkingType:
def __init__(
self,
fixed_wing: bool = False,
fixed_wing_stol: bool = False,
rotary_wing: bool = False,
) -> None:
self.include_fixed_wing = fixed_wing
self.include_fixed_wing_stol = fixed_wing_stol
self.include_rotary_wing = rotary_wing
def from_squadron(self, squadron: Squadron) -> ParkingType:
return self.from_aircraft(
squadron.aircraft, squadron.coalition.game.settings.ground_start_ai_planes
)
def from_aircraft(
self, aircraft: AircraftType, ground_start_ai_planes: bool
) -> ParkingType:
if aircraft.helicopter or aircraft.lha_capable:
self.include_rotary_wing = True
self.include_fixed_wing = True
self.include_fixed_wing_stol = True
elif aircraft.flyable or ground_start_ai_planes:
self.include_rotary_wing = False
self.include_fixed_wing = True
self.include_fixed_wing_stol = True
else:
self.include_rotary_wing = False
self.include_fixed_wing = True
self.include_fixed_wing_stol = False
return self
#: Fixed wing aircraft with no STOL or VTOL capability
include_fixed_wing: bool
#: Fixed wing aircraft with STOL capability
include_fixed_wing_stol: bool
#: Helicopters and VTOL aircraft
include_rotary_wing: bool
class ControlPoint(MissionTarget, SidcDescribable, ABC):
# Not sure what distance DCS uses, but assuming it's about 2NM since that's roughly
# the distance of the circle on the map.
CAPTURE_DISTANCE = meters(TRIGGER_RADIUS_CAPTURE)
# TODO: Only airbases have IDs.
# TODO: cptype is obsolete.
def __init__(
self,
name: str,
position: Point,
at: StartingPosition,
theater: ConflictTheater,
starts_blue: bool,
cptype: ControlPointType = ControlPointType.AIRBASE,
) -> None:
super().__init__(name, position)
self.id = uuid.uuid4()
self.full_name = name
self.at = at
self.theater = theater
self.starts_blue = starts_blue
self.connected_objectives: List[TheaterGroundObject] = []
self.preset_locations = PresetLocations()
self.helipads: List[PointWithHeading] = []
self.helipads_quad: List[PointWithHeading] = []
self.helipads_invisible: List[PointWithHeading] = []
self.ground_spawns_roadbase: List[Tuple[PointWithHeading, Point]] = []
self.ground_spawns: List[Tuple[PointWithHeading, Point]] = []
self._coalition: Optional[Coalition] = None
self.captured_invert = False
self.front_lines: dict[ControlPoint, FrontLine] = {}
# TODO: Should be Airbase specific.
self.connected_points: List[ControlPoint] = []
self.convoy_routes: Dict[ControlPoint, Tuple[Point, ...]] = {}
self.convoy_spawns: Dict[ControlPoint, Tuple[Point, ...]] = {}
self.shipping_lanes: Dict[ControlPoint, Tuple[Point, ...]] = {}
self.base: Base = Base()
self.cptype = cptype
# TODO: Should be Airbase specific.
self.stances: dict[UUID, CombatStance] = {} # Connected-CP -> CombatStance
from ..groundunitorders import GroundUnitOrders
self.ground_unit_orders = GroundUnitOrders(self)
self.target_position: Optional[Point] = None
# Initialized late because ControlPoints are constructed before the game is.
self._front_line_db: Database[FrontLine] | None = None
def __repr__(self) -> str:
return f"<{self.__class__}: {self.name}>"
@property
def dcs_airport(self) -> Airport | None:
return None
@property
def coalition(self) -> Coalition:
if self._coalition is None:
raise RuntimeError("ControlPoint not fully initialized: coalition not set")
return self._coalition
def finish_init(self, game: Game) -> None:
assert self._coalition is None
self._coalition = game.coalition_for(self.starts_blue)
assert self._front_line_db is None
self._front_line_db = game.db.front_lines
def initialize_turn_0(self, laser_code_registry: LaserCodeRegistry) -> None:
# We don't need to send events for turn 0. The UI isn't up yet, and it'll fetch
# the entire game state when it comes up.
from game.sim import GameUpdateEvents
self._create_missing_front_lines(laser_code_registry, GameUpdateEvents())
@property
def front_line_db(self) -> Database[FrontLine]:
assert self._front_line_db is not None
return self._front_line_db
def _create_missing_front_lines(
self, laser_code_registry: LaserCodeRegistry, events: GameUpdateEvents
) -> None:
for connection in self.convoy_routes.keys():
if not connection.front_line_active_with(
self
) and not connection.is_friendly_to(self):
self._create_front_line_with(laser_code_registry, connection, events)
def _create_front_line_with(
self,
laser_code_registry: LaserCodeRegistry,
connection: ControlPoint,
events: GameUpdateEvents,
) -> None:
blue, red = FrontLine.sort_control_points(self, connection)
front = FrontLine(blue, red, laser_code_registry.alloc_laser_code())
self.front_lines[connection] = front
connection.front_lines[self] = front
self.front_line_db.add(front.id, front)
events.update_front_line(front)
def _remove_front_line_with(
self,
connection: ControlPoint,
events: GameUpdateEvents,
) -> None:
front = self.front_lines[connection]
del self.front_lines[connection]
del connection.front_lines[self]
self.front_line_db.remove(front.id)
front.laser_code.release()
events.delete_front_line(front)
def _clear_front_lines(self, events: GameUpdateEvents) -> None:
for opponent in list(self.front_lines.keys()):
self._remove_front_line_with(opponent, events)
@property
def has_frontline(self) -> bool:
return bool(self.front_lines)
def front_line_active_with(self, other: ControlPoint) -> bool:
return other in self.front_lines
def front_line_with(self, other: ControlPoint) -> FrontLine:
return self.front_lines[other]
@property
def captured(self) -> bool:
return self.coalition.player
@property
def standard_identity(self) -> StandardIdentity:
return (
StandardIdentity.FRIEND if self.captured else StandardIdentity.HOSTILE_FAKER
)
@property
def sidc_status(self) -> Status:
if self.status is ControlPointStatus.Functional:
return Status.PRESENT
if self.status is ControlPointStatus.Damaged:
return Status.PRESENT_DAMAGED
if self.status is ControlPointStatus.Destroyed:
return Status.PRESENT_DESTROYED
raise ValueError(f"Unexpected ControlPointStatus: {self.status}")
@property
def ground_objects(self) -> List[TheaterGroundObject]:
return list(self.connected_objectives)
@property
def squadrons(self) -> Iterator[Squadron]:
for squadron in self.coalition.air_wing.iter_squadrons():
if squadron.location == self:
yield squadron
@property
@abstractmethod
def heading(self) -> Heading:
...
def __str__(self) -> str:
return self.name
@property
def is_isolated(self) -> bool:
return not self.connected_points
@property
def is_global(self) -> bool:
return self.is_isolated
def transitive_connected_friendly_points(
self, seen: Optional[Set[ControlPoint]] = None
) -> List[ControlPoint]:
if seen is None:
seen = {self}
connected = []
for cp in self.connected_points:
if cp.captured != self.captured:
continue
if cp in seen:
continue
seen.add(cp)
connected.append(cp)
connected.extend(cp.transitive_connected_friendly_points(seen))
return connected
def transitive_friendly_shipping_destinations(
self, seen: Optional[Set[ControlPoint]] = None
) -> List[ControlPoint]:
if seen is None:
seen = {self}
connected = []
for cp in self.shipping_lanes:
if cp.captured != self.captured:
continue
if cp in seen:
continue
seen.add(cp)
connected.append(cp)
connected.extend(cp.transitive_friendly_shipping_destinations(seen))
return connected
@property
def has_factory(self) -> bool:
for tgo in self.connected_objectives:
if tgo.is_factory and not tgo.is_dead:
return True
return False
@property
def has_helipads(self) -> bool:
"""
Returns true if cp has helipads
"""
return (
len(self.helipads) + len(self.helipads_quad) + len(self.helipads_invisible)
> 0
)
@property
def has_ground_spawns(self) -> bool:
"""
Returns true if cp can operate STOL aircraft
"""
return len(self.ground_spawns_roadbase) + len(self.ground_spawns) > 0
def can_recruit_ground_units(self, game: Game) -> bool:
"""Returns True if this control point is capable of recruiting ground units."""
if not self.can_deploy_ground_units:
return False
if game.turn == 0:
# Allow units to be recruited anywhere on turn 0 to avoid long delays to get
# everyone to the front line.
return True
return self.has_factory
def has_ground_unit_source(self, game: Game) -> bool:
"""Returns True if this control point has access to ground reinforcements."""
if not self.can_deploy_ground_units:
return False
for cp in game.theater.controlpoints:
if cp.is_friendly(self.captured) and cp.can_recruit_ground_units(game):
return True
return False
@property
def is_carrier(self) -> bool:
"""
:return: Whether this control point is an aircraft carrier
"""
return False
@property
def is_fleet(self) -> bool:
"""
:return: Whether this control point is a boat (mobile)
"""
return False
@property
def is_lha(self) -> bool:
"""
:return: Whether this control point is an LHA
"""
return False
@property
def is_fob(self) -> bool:
"""
:return: Whether this control point is a FOB
"""
return False
@property
def moveable(self) -> bool:
"""
:return: Whether this control point can be moved around
"""
return self.max_move_distance > meters(0)
@property
def max_move_distance(self) -> Distance:
return meters(0)
def destination_in_range(self, destination: Point) -> bool:
distance = meters(destination.distance_to_point(self.position))
return distance <= self.max_move_distance
@property
@abstractmethod
def can_deploy_ground_units(self) -> bool:
...
@abstractmethod
def total_aircraft_parking(self, parking_type: ParkingType) -> int:
"""
:return: The maximum number of aircraft that can be stored in this
control point
"""
...
def convoy_origin_for(self, destination: ControlPoint) -> Point:
return self.convoy_route_to(destination)[0]
def convoy_route_to(self, destination: ControlPoint) -> Sequence[Point]:
return self.convoy_routes[destination]
def create_convoy_route(
self, to: ControlPoint, waypoints: Iterable[Point], spawns: Iterable[Point]
) -> None:
self.connected_points.append(to)
self.stances[to.id] = CombatStance.DEFENSIVE
self.convoy_routes[to] = tuple(waypoints)
self.convoy_spawns[to] = tuple(spawns)
def create_shipping_lane(
self, to: ControlPoint, waypoints: Iterable[Point]
) -> None:
self.shipping_lanes[to] = tuple(waypoints)
@abstractmethod
def runway_is_operational(self) -> bool:
"""
Check whether this control point supports taking offs and landings.
:return:
"""
...
# TODO: Should be naval specific.
def get_carrier_group_name(self) -> Optional[str]:
"""
Get the carrier group name if the airbase is a carrier
:return: Carrier group name
"""
if self.cptype in [
ControlPointType.AIRCRAFT_CARRIER_GROUP,
ControlPointType.LHA_GROUP,
]:
for g in self.ground_objects:
for group in g.groups:
for u in group.units:
if u.unit_type and u.unit_type.unit_class in [
UnitClass.AIRCRAFT_CARRIER,
UnitClass.HELICOPTER_CARRIER,
]:
return group.group_name
return None
def get_carrier_group_type(
self, always_supercarrier: bool = False
) -> Optional[Type[ShipType]]:
"""
Get the carrier group type if the airbase is a carrier. Arguments:
always_supercarrier: True if should always return the supercarrier type, False if should only
return the supercarrier type when the supercarrier option is enabled in settings.
:return: Carrier group type
"""
if self.cptype in [
ControlPointType.AIRCRAFT_CARRIER_GROUP,
ControlPointType.LHA_GROUP,
]:
for g in self.ground_objects:
for group in g.groups:
u = group.units[0]
carrier_type = u.type
if (
u.unit_type
and u.unit_type.unit_class
in [
UnitClass.AIRCRAFT_CARRIER,
UnitClass.HELICOPTER_CARRIER,
]
and issubclass(carrier_type, ShipType)
):
if (
self.coalition.game.settings.supercarrier
or always_supercarrier
):
return self.upgrade_to_supercarrier(carrier_type, self.name)
return carrier_type
return None
@staticmethod
def upgrade_to_supercarrier(unit: Type[ShipType], name: str) -> Type[ShipType]:
if unit == Stennis:
if name == "CVN-71 Theodore Roosevelt":
return CVN_71
elif name == "CVN-72 Abraham Lincoln":
return CVN_72
elif name == "CVN-73 George Washington":
return CVN_73
elif name == "CVN-75 Harry S. Truman":
return CVN_75
elif name == "Carrier Strike Group 8":
return CVN_75
else:
return CVN_71
elif unit == KUZNECOW:
return CV_1143_5
else:
return unit
# TODO: Should be Airbase specific.
def is_connected(self, to: ControlPoint) -> bool:
return to in self.connected_points
def find_ground_objects_by_obj_name(
self, obj_name: str
) -> list[TheaterGroundObject]:
found = []
for g in self.ground_objects:
if g.obj_name == obj_name:
found.append(g)
return found
def is_friendly(self, to_player: bool) -> bool:
return self.captured == to_player
def is_friendly_to(self, control_point: ControlPoint) -> bool:
return control_point.is_friendly(self.captured)
def capture_equipment(self, game: Game) -> None:
total = self.base.total_armor_value
self.base.armor.clear()
game.adjust_budget(total, player=not self.captured)
game.message(
f"{self.name} is not connected to any friendly points. Ground "
f"vehicles have been captured and sold for ${total}M."
)
def retreat_ground_units(self, game: Game) -> None:
# When there are multiple valid destinations, deliver units to whichever
# base is least defended first. The closest approximation of unit
# strength we have is price
destinations = [
GroundUnitDestination(cp)
for cp in self.connected_points
if cp.captured == self.captured
]
if not destinations:
self.capture_equipment(game)
return
heapq.heapify(destinations)
destination = heapq.heappop(destinations)
while self.base.armor:
unit_type, count = self.base.armor.popitem()
for _ in range(count):
destination.control_point.base.commission_units({unit_type: 1})
destination = heapq.heappushpop(destinations, destination)
def capture_aircraft(self, game: Game, airframe: AircraftType, count: int) -> None:
value = airframe.price * count
game.adjust_budget(value, player=not self.captured)
game.message(
f"No valid retreat destination in range of {self.name} for {airframe} "
f"{count} aircraft have been captured and sold for ${value}M."
)
def aircraft_retreat_destination(
self, squadron: Squadron
) -> Optional[ControlPoint]:
closest = ObjectiveDistanceCache.get_closest_airfields(self)
# Multiply the max mission range by two when evaluating retreats,
# since you only need to fly one way in that case
max_retreat_distance = squadron.aircraft.max_mission_range * 2
# Skip the first airbase because that's the airbase we're retreating
# from.
airfields = list(closest.operational_airfields_within(max_retreat_distance))[1:]
not_preferred: Optional[ControlPoint] = None
overfull: list[ControlPoint] = []
parking_type = ParkingType().from_squadron(squadron)
for airbase in airfields:
if airbase.captured != self.captured:
continue
if airbase.unclaimed_parking(parking_type) < squadron.owned_aircraft:
if airbase.can_operate(squadron.aircraft):
overfull.append(airbase)
continue
elif isinstance(airbase, Airfield):
dcs_unit_type = squadron.aircraft.dcs_unit_type
free_slots = airbase.airport.free_parking_slots(dcs_unit_type)
if len(free_slots) < squadron.owned_aircraft or len(free_slots) == 0:
overfull.append(airbase)
continue
if squadron.operates_from(airbase):
# Has room, is a preferred base type for this squadron, and is the
# closest choice. No need to keep looking.
return airbase
if not_preferred is None and airbase.can_operate(squadron.aircraft):
# Has room and is capable of operating from this base, but it isn't
# preferred. Remember this option and use it if we can't find a
# preferred base type with room.
not_preferred = airbase
if not_preferred is not None:
# It's not our best choice but the other choices don't have room for the
# squadron and would lead to aircraft being captured.
return not_preferred
# No base was available with enough room. Find whichever base has the most room
# available so we lose as little as possible. The overfull list is already
# sorted by distance, and filtered for appropriate destinations.
base_for_fewest_losses: Optional[ControlPoint] = None
loss_count = math.inf
for airbase in overfull:
overflow = -(
airbase.unclaimed_parking(parking_type)
- squadron.owned_aircraft
- squadron.pending_deliveries
)
if overflow < loss_count:
loss_count = overflow
base_for_fewest_losses = airbase
return base_for_fewest_losses
def _retreat_squadron(self, game: Game, squadron: Squadron) -> None:
destination = self.aircraft_retreat_destination(squadron)
if destination is None:
squadron.refund_orders()
self.capture_aircraft(game, squadron.aircraft, squadron.owned_aircraft)
return
parking_type = ParkingType().from_squadron(squadron)
logging.debug(f"{squadron} retreating to {destination} from {self}")
squadron.relocate_to(destination)
squadron.cancel_overflow_orders()
overflow = -destination.unclaimed_parking(parking_type)
if overflow > 0:
logging.debug(
f"Not enough room for {squadron} at {destination}. Capturing "
f"{overflow} aircraft."
)
self.capture_aircraft(game, squadron.aircraft, overflow)
squadron.owned_aircraft -= overflow
def retreat_air_units(self, game: Game) -> None:
# TODO: Capture in order of price to retain maximum value?
for squadron in self.squadrons:
self._retreat_squadron(game, squadron)
def depopulate_uncapturable_tgos(self) -> None:
# TODO Rework this.
for tgo in self.connected_objectives:
if not tgo.capturable:
tgo.clear()
def release_parking_slots(self) -> None:
pass
# TODO: Should be Airbase specific.
def capture(self, game: Game, events: GameUpdateEvents, for_player: bool) -> None:
new_coalition = game.coalition_for(for_player)
self.ground_unit_orders.refund_all(self.coalition)
self.retreat_ground_units(game)
self.retreat_air_units(game)
self.release_parking_slots()
self.depopulate_uncapturable_tgos()
self._coalition = new_coalition
self.base.set_strength_to_minimum()
self._clear_front_lines(events)
self._create_missing_front_lines(game.laser_code_registry, events)
events.update_control_point(self)
# All the attached TGOs have either been depopulated or captured. Tell the UI to
# update their state. Also update orientation and IADS state for specific tgos
for tgo in self.connected_objectives:
is_vehicle_go = isinstance(tgo, VehicleGroupGroundObject)
if tgo.is_iads or is_vehicle_go:
if tgo.is_iads:
game.theater.iads_network.update_tgo(tgo, events)
conflict_heading = game.theater.heading_to_conflict_from(tgo.position)
tgo.rotate(conflict_heading or tgo.heading)
if not tgo.is_control_point:
events.update_tgo(tgo)
@property
def required_aircraft_start_type(self) -> Optional[StartType]:
return None
@abstractmethod
def can_operate(self, aircraft: AircraftType) -> bool:
...
def unclaimed_parking(self, parking_type: ParkingType) -> int:
return (
self.total_aircraft_parking(parking_type)
- self.allocated_aircraft(parking_type).total
)
@abstractmethod
def active_runway(
self,
theater: ConflictTheater,
conditions: Conditions,
dynamic_runways: Dict[str, RunwayData],
) -> RunwayData:
...
def stub_runway_data(self) -> RunwayData:
return RunwayData(
self.full_name, runway_heading=Heading.from_degrees(0), runway_name=""
)
@property
def airdrome_id_for_landing(self) -> Optional[int]:
return None
@property
def parking_slots(self) -> Iterator[ParkingSlot]:
yield from []
@property
@abstractmethod
def runway_is_destroyable(self) -> bool:
...
@property
@abstractmethod
def runway_status(self) -> RunwayStatus:
...
@property
def runway_can_be_repaired(self) -> bool:
return self.runway_status.needs_repair
def begin_runway_repair(self) -> None:
if not self.runway_can_be_repaired:
logging.error(f"Cannot repair runway at {self}")
return
self.runway_status.begin_repair()
def process_turn(self, game: Game) -> None:
self.ground_unit_orders.process(game)
runway_status = self.runway_status
if runway_status is not None:
runway_status.process_turn()
# Process movements for ships control points group
if self.target_position is not None:
delta = self.target_position - self.position
self.position = self.target_position
self.target_position = None
# Move the linked unit groups
for ground_object in self.ground_objects:
if isinstance(ground_object, GenericCarrierGroundObject):
ground_object.position.x = ground_object.position.x + delta.x
ground_object.position.y = ground_object.position.y + delta.y
for group in ground_object.groups:
for u in group.units:
u.position.x = u.position.x + delta.x
u.position.y = u.position.y + delta.y
def allocated_aircraft(self, parking_type: ParkingType) -> AircraftAllocations:
present: dict[AircraftType, int] = defaultdict(int)
on_order: dict[AircraftType, int] = defaultdict(int)
transferring: dict[AircraftType, int] = defaultdict(int)
for squadron in self.squadrons:
if not parking_type.include_rotary_wing and (
squadron.aircraft.helicopter or squadron.aircraft.lha_capable
):
continue
elif not parking_type.include_fixed_wing and (
not squadron.aircraft.helicopter or squadron.aircraft.lha_capable
):
continue
present[squadron.aircraft] += squadron.owned_aircraft
if squadron.destination is None:
on_order[squadron.aircraft] += squadron.pending_deliveries
else:
transferring[squadron.aircraft] -= squadron.owned_aircraft
for squadron in self.coalition.air_wing.iter_squadrons():
if not parking_type.include_rotary_wing and (
squadron.aircraft.helicopter or squadron.aircraft.lha_capable
):
continue
elif not parking_type.include_fixed_wing and (
not squadron.aircraft.helicopter or squadron.aircraft.lha_capable
):
continue
if squadron.destination == self:
on_order[squadron.aircraft] += squadron.pending_deliveries
transferring[squadron.aircraft] += squadron.owned_aircraft
return AircraftAllocations(present, on_order, transferring)
def allocated_ground_units(
self, transfers: PendingTransfers
) -> GroundUnitAllocations:
on_order = {}
for unit_bought, count in self.ground_unit_orders.units.items():
if isinstance(unit_bought, GroundUnitType):
on_order[unit_bought] = count
transferring: dict[GroundUnitType, int] = defaultdict(int)
for transfer in transfers:
if transfer.destination == self:
for unit_type, count in transfer.units.items():
transferring[unit_type] += count
return GroundUnitAllocations(
self.base.armor,
on_order,
transferring,
)
@property
def income_per_turn(self) -> int:
return 0
@property
def has_active_frontline(self) -> bool:
return any(not c.is_friendly(self.captured) for c in self.connected_points)
def front_is_active(self, other: ControlPoint) -> bool:
if other not in self.connected_points:
raise ValueError
return self.captured != other.captured
@property
def deployable_front_line_units(self) -> int:
return self.deployable_front_line_units_with(self.active_ammo_depots_count)
def deployable_front_line_units_with(self, ammo_depot_count: int) -> int:
return min(
self.front_line_capacity_with(ammo_depot_count), self.base.total_armor
)
def front_line_capacity_with(self, ammo_depot_count: int) -> int:
return min(
self.coalition.game.settings.perf_frontline_units_max_supply,
(
FREE_FRONTLINE_UNIT_SUPPLY
+ ammo_depot_count * AMMO_DEPOT_FRONTLINE_UNIT_CONTRIBUTION
),
)
@property
def frontline_unit_count_limit(self) -> int:
return self.front_line_capacity_with(self.active_ammo_depots_count)
@property
def all_ammo_depots(self) -> Iterator[TheaterGroundObject]:
for tgo in self.connected_objectives:
if tgo.is_ammo_depot:
yield tgo
def ammo_depot_count(self, alive_only: bool = False) -> int:
return sum(
ammo_depot.alive_unit_count if alive_only else ammo_depot.unit_count
for ammo_depot in self.all_ammo_depots
)
@property
def active_ammo_depots_count(self) -> int:
"""Return the number of available ammo depots"""
return self.ammo_depot_count(True)
@property
def total_ammo_depots_count(self) -> int:
"""Return the number of ammo depots, including dead ones"""
return self.ammo_depot_count()
@property
def active_fuel_depots_count(self) -> int:
"""Return the number of available fuel depots"""
return len(
[
obj
for obj in self.connected_objectives
if obj.category == "fuel" and not obj.is_dead
]
)
@property
def total_fuel_depots_count(self) -> int:
"""Return the number of fuel depots, including dead ones"""
return len([obj for obj in self.connected_objectives if obj.category == "fuel"])
@property
def strike_targets(self) -> list[TheaterUnit]:
return []
@property
@abstractmethod
def category(self) -> str:
...
@property
@abstractmethod
def status(self) -> ControlPointStatus:
...
class Airfield(ControlPoint, CTLD):
def __init__(
self,
airport: Airport,
theater: ConflictTheater,
starts_blue: bool,
ctld_zones: Optional[List[Tuple[Point, float]]] = None,
) -> None:
super().__init__(
airport.name,
airport.position,
airport,
theater,
starts_blue,
cptype=ControlPointType.AIRBASE,
)
self.airport = airport
self._runway_status = RunwayStatus()
self.ctld_zones = ctld_zones
@property
def dcs_airport(self) -> Airport:
return self.airport
@property
def symbol_set_and_entity(self) -> tuple[SymbolSet, Entity]:
return SymbolSet.LAND_INSTALLATIONS, LandInstallationEntity.AIPORT_AIR_BASE
def can_operate(self, aircraft: AircraftType) -> bool:
# TODO: Allow helicopters.
# Need to implement ground spawns so the helos don't use the runway.
# TODO: Allow harrier.
# Needs ground spawns just like helos do, but also need to be able to
# limit takeoff weight to ~20500 lbs or it won't be able to take off.
parking_type = ParkingType().from_aircraft(
aircraft, self.coalition.game.settings.ground_start_ai_planes
)
if parking_type.include_rotary_wing and self.has_helipads:
return True
if parking_type.include_fixed_wing_stol and self.has_ground_spawns:
return True
return self.runway_is_operational()
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
from game.ato import FlightType
if not self.is_friendly(for_player):
yield from [
FlightType.OCA_AIRCRAFT,
FlightType.OCA_RUNWAY,
FlightType.AIR_ASSAULT,
]
yield from super().mission_types(for_player)
if self.is_friendly(for_player):
yield from [
FlightType.AEWC,
# TODO: FlightType.INTERCEPTION
# TODO: FlightType.LOGISTICS
]
yield FlightType.REFUELING
def total_aircraft_parking(self, parking_type: ParkingType) -> int:
"""
Return total aircraft parking slots available
Note : additional helipads shouldn't contribute to this score as it could allow airfield
to buy more planes than what they are able to host
"""
parking_slots = 0
if parking_type.include_rotary_wing:
parking_slots += (
len(self.helipads)
+ 4 * len(self.helipads_quad)
+ len(self.helipads_invisible)
)
if parking_type.include_fixed_wing_stol:
parking_slots += len(self.ground_spawns)
parking_slots += len(self.ground_spawns_roadbase)
if parking_type.include_fixed_wing:
parking_slots += len(self.airport.parking_slots)
return parking_slots
def release_parking_slots(self) -> None:
for slot in self.parking_slots:
slot.unit_id = None
@property
def heading(self) -> Heading:
return Heading.from_degrees(self.airport.runways[0].heading)
@property
def runway_is_destroyable(self) -> bool:
return True
def runway_is_operational(self) -> bool:
return not self.runway_status.damaged
@property
def runway_status(self) -> RunwayStatus:
return self._runway_status
def damage_runway(self) -> None:
self.runway_status.damage()
def active_runway(
self,
theater: ConflictTheater,
conditions: Conditions,
dynamic_runways: Dict[str, RunwayData],
) -> RunwayData:
if not self.airport.runways:
# Some airfields are heliports and don't have any runways. This isn't really
# the best fix, since we should still try to generate partial data for TACAN
# beacons, but it'll do for a bug fix, and the proper fix probably involves
# making heliports their own CP type.
# https://github.com/dcs-liberation/dcs_liberation/issues/2710
return self.stub_runway_data()
assigner = RunwayAssigner(conditions)
return assigner.get_preferred_runway(theater, self.airport)
@property
def airdrome_id_for_landing(self) -> Optional[int]:
return self.airport.id
@property
def parking_slots(self) -> Iterator[ParkingSlot]:
yield from self.airport.parking_slots
@property
def can_deploy_ground_units(self) -> bool:
return True
@property
def income_per_turn(self) -> int:
return 20
@property
def category(self) -> str:
return "airfield"
@property
def status(self) -> ControlPointStatus:
runway_staus = self.runway_status
if runway_staus.needs_repair:
return ControlPointStatus.Destroyed
elif runway_staus.damaged:
return ControlPointStatus.Damaged
return ControlPointStatus.Functional
class NavalControlPoint(
ControlPoint, ABC, Link4Container, TacanContainer, ICLSContainer
):
@property
def is_fleet(self) -> bool:
return True
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
from game.ato import FlightType
if self.is_friendly(for_player):
yield from [
# TODO: FlightType.INTERCEPTION
# TODO: Buddy tanking for the A-4?
# TODO: Rescue chopper?
# TODO: Inter-ship logistics?
]
else:
yield from [
FlightType.ANTISHIP,
FlightType.SEAD_ESCORT,
]
yield from super().mission_types(for_player)
@property
def heading(self) -> Heading:
return Heading.from_degrees(0) # TODO compute heading
def find_main_tgo(self) -> GenericCarrierGroundObject:
for g in self.ground_objects:
if isinstance(g, GenericCarrierGroundObject):
return g
raise RuntimeError(f"Found no carrier/LHA group for {self.name}")
@property
def runway_is_destroyable(self) -> bool:
return False
def runway_is_operational(self) -> bool:
# Necessary because it's possible for the carrier itself to have sunk
# while its escorts are still alive.
for group in self.find_main_tgo().groups:
for u in group.units:
if u.alive and u.type in [
Forrestal,
Stennis,
LHA_Tarawa,
KUZNECOW,
Type_071,
Hms_invincible,
L02,
L52,
L61,
]:
return True
return False
def active_runway(
self,
theater: ConflictTheater,
conditions: Conditions,
dynamic_runways: Dict[str, RunwayData],
) -> RunwayData:
# TODO: Assign TACAN and ICLS earlier so we don't need this.
fallback = RunwayData(
self.full_name, runway_heading=Heading.from_degrees(0), runway_name=""
)
return dynamic_runways.get(self.full_name, fallback)
@property
def runway_status(self) -> RunwayStatus:
return RunwayStatus(damaged=not self.runway_is_operational())
@property
def runway_can_be_repaired(self) -> bool:
return False
@property
def max_move_distance(self) -> Distance:
return nautical_miles(80)
@property
def can_deploy_ground_units(self) -> bool:
return False
@property
def status(self) -> ControlPointStatus:
if not self.runway_is_operational():
return ControlPointStatus.Destroyed
if self.find_main_tgo().dead_units:
return ControlPointStatus.Damaged
return ControlPointStatus.Functional
class Carrier(NavalControlPoint):
def __init__(
self, name: str, at: Point, theater: ConflictTheater, starts_blue: bool
):
super().__init__(
name,
at,
at,
theater,
starts_blue,
cptype=ControlPointType.AIRCRAFT_CARRIER_GROUP,
)
@property
def symbol_set_and_entity(self) -> tuple[SymbolSet, Entity]:
return SymbolSet.SEA_SURFACE, SeaSurfaceEntity.CARRIER
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
from game.ato.flighttype import FlightType
yield from super().mission_types(for_player)
if self.is_friendly(for_player):
yield from [
FlightType.AEWC,
FlightType.REFUELING,
]
def capture(self, game: Game, events: GameUpdateEvents, for_player: bool) -> None:
raise RuntimeError("Carriers cannot be captured")
@property
def is_carrier(self) -> bool:
return True
def can_operate(self, aircraft: AircraftType) -> bool:
return aircraft.carrier_capable
def total_aircraft_parking(self, parking_type: ParkingType) -> int:
return 90
@property
def category(self) -> str:
return "cv"
class Lha(NavalControlPoint):
def __init__(
self, name: str, at: Point, theater: ConflictTheater, starts_blue: bool
):
super().__init__(
name, at, at, theater, starts_blue, cptype=ControlPointType.LHA_GROUP
)
@property
def symbol_set_and_entity(self) -> tuple[SymbolSet, Entity]:
return SymbolSet.SEA_SURFACE, SeaSurfaceEntity.AMPHIBIOUS_ASSAULT_SHIP_GENERAL
def capture(self, game: Game, events: GameUpdateEvents, for_player: bool) -> None:
raise RuntimeError("LHAs cannot be captured")
@property
def is_lha(self) -> bool:
return True
def can_operate(self, aircraft: AircraftType) -> bool:
return aircraft.lha_capable
def total_aircraft_parking(self, parking_type: ParkingType) -> int:
return 20
@property
def category(self) -> str:
return "lha"
class OffMapSpawn(ControlPoint):
def runway_is_operational(self) -> bool:
return True
def __init__(
self, name: str, position: Point, theater: ConflictTheater, starts_blue: bool
):
super().__init__(
name,
position,
position,
theater,
starts_blue,
cptype=ControlPointType.OFF_MAP,
)
@property
def symbol_set_and_entity(self) -> tuple[SymbolSet, Entity]:
return SymbolSet.LAND_INSTALLATIONS, LandInstallationEntity.AIPORT_AIR_BASE
def capture(self, game: Game, events: GameUpdateEvents, for_player: bool) -> None:
raise RuntimeError("Off map control points cannot be captured")
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
yield from []
def total_aircraft_parking(self, parking_type: ParkingType) -> int:
return 1000
def can_operate(self, aircraft: AircraftType) -> bool:
return True
@property
def required_aircraft_start_type(self) -> Optional[StartType]:
return StartType.IN_FLIGHT
@property
def heading(self) -> Heading:
return Heading.from_degrees(0)
def active_runway(
self,
theater: ConflictTheater,
conditions: Conditions,
dynamic_runways: Dict[str, RunwayData],
) -> RunwayData:
logging.warning("TODO: Off map spawns have no runways.")
return self.stub_runway_data()
@property
def runway_is_destroyable(self) -> bool:
return False
@property
def runway_status(self) -> RunwayStatus:
return RunwayStatus()
@property
def can_deploy_ground_units(self) -> bool:
return False
@property
def category(self) -> str:
return "offmap"
@property
def status(self) -> ControlPointStatus:
return ControlPointStatus.Functional
class Fob(ControlPoint, RadioFrequencyContainer, CTLD):
def __init__(
self,
name: str,
at: Point,
theater: ConflictTheater,
starts_blue: bool,
ctld_zones: Optional[List[Tuple[Point, float]]] = None,
) -> None:
super().__init__(
name, at, at, theater, starts_blue, cptype=ControlPointType.FOB
)
self.name = name
self.ctld_zones = ctld_zones
@property
def symbol_set_and_entity(self) -> tuple[SymbolSet, Entity]:
return SymbolSet.LAND_INSTALLATIONS, LandInstallationEntity.MILITARY_BASE
@property
def runway_is_destroyable(self) -> bool:
return False
def runway_is_operational(self) -> bool:
return self.has_helipads or self.has_ground_spawns
def active_runway(
self,
theater: ConflictTheater,
conditions: Conditions,
dynamic_runways: Dict[str, RunwayData],
) -> RunwayData:
logging.warning("TODO: FOBs have no runways.")
return self.stub_runway_data()
@property
def runway_status(self) -> RunwayStatus:
return RunwayStatus()
def mission_types(self, for_player: bool) -> Iterator[FlightType]:
from game.ato import FlightType
if not self.is_friendly(for_player):
yield FlightType.STRIKE
yield FlightType.AIR_ASSAULT
if self.total_aircraft_parking(ParkingType(True, True, True)):
yield FlightType.OCA_AIRCRAFT
else:
yield FlightType.AEWC
yield from super().mission_types(for_player)
def total_aircraft_parking(self, parking_type: ParkingType) -> int:
parking_slots = 0
if parking_type.include_rotary_wing:
parking_slots += (
len(self.helipads)
+ 4 * len(self.helipads_quad)
+ len(self.helipads_invisible)
)
try:
if parking_type.include_fixed_wing_stol:
parking_slots += len(self.ground_spawns)
parking_slots += len(self.ground_spawns_roadbase)
except AttributeError:
self.ground_spawns_roadbase = []
self.ground_spawns = []
return parking_slots
def can_operate(self, aircraft: AircraftType) -> bool:
parking_type = ParkingType().from_aircraft(
aircraft, self.coalition.game.settings.ground_start_ai_planes
)
if parking_type.include_rotary_wing and self.has_helipads:
return True
if parking_type.include_fixed_wing_stol and self.has_ground_spawns:
return True
return False
@property
def heading(self) -> Heading:
return Heading.from_degrees(0)
@property
def can_deploy_ground_units(self) -> bool:
return True
@property
def income_per_turn(self) -> int:
return 10
@property
def is_fob(self) -> bool:
"""
:return: Whether this control point is a FOB
"""
return True
@property
def category(self) -> str:
return "fob"
@property
def status(self) -> ControlPointStatus:
return ControlPointStatus.Functional