Add frozen combat modelling.

This doesn't do anything yet, but sets up the data model handling for
frozen combat. The next step is to show combat in the map view, since
that will be helpful when debugging the step after that one: resolving
frozen combat.

This would benefit from caching the Shapely data for SAM threat zones.
Right now it's generating them once per tick and the stuttering is
visible at max speed.

https://github.com/dcs-liberation/dcs_liberation/issues/1680
This commit is contained in:
Dan Albert
2021-11-07 13:56:10 -08:00
parent ce4628b64f
commit fb10a8d28e
22 changed files with 473 additions and 110 deletions

View File

@@ -1,37 +0,0 @@
from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from dcs import Point
from shapely.geometry import Point as ShapelyPoint
from shapely.ops import unary_union
from game.ato.flightstate import InFlight
if TYPE_CHECKING:
from game.ato import Flight
from game.ato.airtaaskingorder import AirTaskingOrder
from game.threatzones import ThreatPoly
class AircraftEngagementZones:
def __init__(self, threat_zones: ThreatPoly) -> None:
self.threat_zones = threat_zones
def covers(self, position: Point) -> bool:
return self.threat_zones.intersects(ShapelyPoint(position.x, position.y))
@classmethod
def from_ato(cls, ato: AirTaskingOrder) -> AircraftEngagementZones:
commit_regions = []
for package in ato.packages:
for flight in package.flights:
if (region := cls.commit_region(flight)) is not None:
commit_regions.append(region)
return AircraftEngagementZones(unary_union(commit_regions))
@classmethod
def commit_region(cls, flight: Flight) -> Optional[ThreatPoly]:
if isinstance(flight.state, InFlight):
return flight.state.a2a_commit_region()
return None

View File

@@ -8,7 +8,7 @@ from typing_extensions import TYPE_CHECKING
from game.ato import Flight
from game.ato.flightstate import (
InFlight,
Navigating,
StartUp,
Takeoff,
Taxi,
@@ -16,8 +16,8 @@ from game.ato.flightstate import (
WaitingForStart,
)
from game.ato.starttype import StartType
from game.sim.aircraftengagementzones import AircraftEngagementZones
from gen.flights.traveltime import TotEstimator
from .combat import CombatInitiator, FrozenCombat
if TYPE_CHECKING:
from game import Game
@@ -26,6 +26,7 @@ if TYPE_CHECKING:
class AircraftSimulation:
def __init__(self, game: Game) -> None:
self.game = game
self.combats: list[FrozenCombat] = []
def begin_simulation(self) -> None:
self.reset()
@@ -35,12 +36,9 @@ class AircraftSimulation:
for flight in self.iter_flights():
flight.on_game_tick(time, duration)
# Finish updating all flights before computing engagement zones so that the new
# Finish updating all flights before checking for combat so that the new
# positions are used.
blue_a2a = AircraftEngagementZones.from_ato(self.game.blue.ato)
red_a2a = AircraftEngagementZones.from_ato(self.game.red.ato)
for flight in self.iter_flights():
flight.check_for_combat(red_a2a if flight.squadron.player else blue_a2a)
CombatInitiator(self.game, self.combats).update_active_combats()
# After updating all combat states, check for halts.
for flight in self.iter_flights():
@@ -68,7 +66,7 @@ class AircraftSimulation:
elif flight.start_type is StartType.RUNWAY:
flight.set_state(Takeoff(flight, self.game.settings, now))
elif flight.start_type is StartType.IN_FLIGHT:
flight.set_state(InFlight(flight, self.game.settings, waypoint_index=0))
flight.set_state(Navigating(flight, self.game.settings, waypoint_index=0))
else:
raise ValueError(f"Unknown start type {flight.start_type} for {flight}")

View File

@@ -0,0 +1,2 @@
from .combatinitiator import CombatInitiator
from .frozencombat import FrozenCombat

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from shapely.ops import unary_union
from game.ato.flightstate import InFlight
from game.utils import dcs_to_shapely_point
from .joinablecombat import JoinableCombat
if TYPE_CHECKING:
from game.ato import Flight
class AirCombat(JoinableCombat):
def __init__(self, flights: list[Flight]) -> None:
super().__init__(flights)
footprints = []
for flight in self.flights:
if (region := flight.state.a2a_commit_region()) is not None:
footprints.append(region)
self.footprint = unary_union(footprints)
def joinable_by(self, flight: Flight) -> bool:
if not flight.state.will_join_air_combat:
return False
if not isinstance(flight.state, InFlight):
raise NotImplementedError(
f"Only InFlight flights are expected to join air combat. {flight} is "
"not InFlight"
)
if self.footprint.intersects(
dcs_to_shapely_point(flight.state.estimate_position())
):
return True
return False
def because(self) -> str:
blue_flights = []
red_flights = []
for flight in self.flights:
if flight.squadron.player:
blue_flights.append(str(flight))
else:
red_flights.append(str(flight))
blue = ", ".join(blue_flights)
red = ", ".join(red_flights)
return f"of air combat {blue} vs {red}"
def describe(self) -> str:
return f"in air-to-air combat"

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
from collections.abc import Iterator
from typing import Optional, TYPE_CHECKING
from dcs import Point
from shapely.ops import unary_union
from game.utils import dcs_to_shapely_point
if TYPE_CHECKING:
from game.ato import Flight
from game.ato.airtaaskingorder import AirTaskingOrder
from game.threatzones import ThreatPoly
from game.sim.combat import FrozenCombat
class AircraftEngagementZones:
def __init__(self, individual_zones: dict[Flight, ThreatPoly]) -> None:
self.individual_zones = individual_zones
self.threat_zones = self._make_combined_zone()
def update_for_combat(self, combat: FrozenCombat) -> None:
for flight in combat.iter_flights():
try:
del self.individual_zones[flight]
except KeyError:
pass
self.threat_zones = self._make_combined_zone()
def remove_flight(self, flight: Flight) -> None:
try:
del self.individual_zones[flight]
except KeyError:
pass
self.threat_zones = self._make_combined_zone()
def _make_combined_zone(self) -> ThreatPoly:
return unary_union(self.individual_zones.values())
def covers(self, position: Point) -> bool:
return self.threat_zones.intersects(dcs_to_shapely_point(position))
def iter_intercepting_flights(self, position: Point) -> Iterator[Flight]:
for flight, zone in self.individual_zones.items():
if zone.intersects(dcs_to_shapely_point(position)):
yield flight
@classmethod
def from_ato(cls, ato: AirTaskingOrder) -> AircraftEngagementZones:
zones = {}
for package in ato.packages:
for flight in package.flights:
if (region := cls.commit_region(flight)) is not None:
zones[flight] = region
return AircraftEngagementZones(zones)
@classmethod
def commit_region(cls, flight: Flight) -> Optional[ThreatPoly]:
return flight.state.a2a_commit_region()

24
game/sim/combat/atip.py Normal file
View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from collections.abc import Iterator
from typing import TYPE_CHECKING
from .frozencombat import FrozenCombat
if TYPE_CHECKING:
from game.ato import Flight
class AtIp(FrozenCombat):
def __init__(self, flight: Flight) -> None:
super().__init__()
self.flight = flight
def because(self) -> str:
return f"{self.flight} is at its IP"
def describe(self) -> str:
return f"at IP"
def iter_flights(self) -> Iterator[Flight]:
yield self.flight

View File

@@ -0,0 +1,109 @@
from __future__ import annotations
import itertools
import logging
from collections.abc import Iterator
from typing import Optional, TYPE_CHECKING
from game.ato.flightstate import InFlight
from .aircombat import AirCombat
from .aircraftengagementzones import AircraftEngagementZones
from .atip import AtIp
from .defendingsam import DefendingSam
from .joinablecombat import JoinableCombat
from .samengagementzones import SamEngagementZones
if TYPE_CHECKING:
from game import Game
from game.ato import Flight
from .frozencombat import FrozenCombat
class CombatInitiator:
def __init__(self, game: Game, combats: list[FrozenCombat]) -> None:
self.game = game
self.combats = combats
def update_active_combats(self) -> None:
blue_a2a = AircraftEngagementZones.from_ato(self.game.blue.ato)
red_a2a = AircraftEngagementZones.from_ato(self.game.red.ato)
blue_sam = SamEngagementZones.from_theater(self.game.theater, player=True)
red_sam = SamEngagementZones.from_theater(self.game.theater, player=False)
# Check each vulnerable flight to see if it has initiated combat. If any flight
# initiates combat, a single FrozenCombat will be created for all involved
# flights and the FlightState of each flight will be updated accordingly.
#
# There's some nuance to this behavior. Duplicate combats are avoided because
# InCombat flight states are not considered vulnerable. That means that once an
# aircraft has entered combat it will not be rechecked later in the loop or on
# another tick.
for flight in self.iter_flights():
if flight.squadron.player:
a2a = red_a2a
own_a2a = blue_a2a
sam = red_sam
else:
a2a = blue_a2a
own_a2a = red_a2a
sam = blue_sam
self.check_flight_for_combat(flight, a2a, own_a2a, sam)
def check_flight_for_combat(
self,
flight: Flight,
a2a: AircraftEngagementZones,
own_a2a: AircraftEngagementZones,
sam: SamEngagementZones,
) -> None:
if (joined := self.check_flight_for_joined_combat(flight)) is not None:
logging.info(f"{flight} is joining existing combat {joined}")
joined.join(flight)
own_a2a.remove_flight(flight)
elif (combat := self.check_flight_for_new_combat(flight, a2a, sam)) is not None:
logging.info(f"Interrupting simulation because {combat.because()}")
combat.update_flight_states()
# Remove any preoccupied flights from the list of potential air-to-air
# threats. This prevents BARCAPs (and other air-to-air types) from getting
# involved in multiple combats simultaneously. Additional air-to-air
# aircraft may join existing combats, but they will not create new combats.
a2a.update_for_combat(combat)
own_a2a.update_for_combat(combat)
self.combats.append(combat)
def check_flight_for_joined_combat(
self, flight: Flight
) -> Optional[JoinableCombat]:
for combat in self.combats:
if isinstance(combat, JoinableCombat) and combat.joinable_by(flight):
return combat
return None
@staticmethod
def check_flight_for_new_combat(
flight: Flight, a2a: AircraftEngagementZones, sam: SamEngagementZones
) -> Optional[FrozenCombat]:
if not isinstance(flight.state, InFlight):
return None
if flight.state.is_at_ip:
return AtIp(flight)
position = flight.state.estimate_position()
if flight.state.vulnerable_to_intercept and a2a.covers(position):
flights = [flight]
flights.extend(a2a.iter_intercepting_flights(position))
return AirCombat(flights)
if flight.state.vulnerable_to_sam and sam.covers(position):
return DefendingSam(flight, list(sam.iter_threatening_sams(position)))
return None
def iter_flights(self) -> Iterator[Flight]:
packages = itertools.chain(
self.game.blue.ato.packages, self.game.red.ato.packages
)
for package in packages:
yield from package.flights

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from collections.abc import Iterator
from typing import Any, TYPE_CHECKING
from .frozencombat import FrozenCombat
if TYPE_CHECKING:
from game.ato import Flight
from game.theater import TheaterGroundObject
class DefendingSam(FrozenCombat):
def __init__(
self, flight: Flight, air_defenses: list[TheaterGroundObject[Any]]
) -> None:
super().__init__()
self.flight = flight
self.air_defenses = air_defenses
def because(self) -> str:
sams = ", ".join(str(d) for d in self.air_defenses)
return f"{self.flight} is engaged by enemy air defenses: {sams}"
def describe(self) -> str:
return f"engaged by enemy air defenses"
def iter_flights(self) -> Iterator[Flight]:
yield self.flight

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import TYPE_CHECKING
from game.ato.flightstate import InCombat, InFlight
if TYPE_CHECKING:
from game.ato import Flight
class FrozenCombat(ABC):
@abstractmethod
def because(self) -> str:
...
@abstractmethod
def describe(self) -> str:
...
@abstractmethod
def iter_flights(self) -> Iterator[Flight]:
...
def update_flight_states(self) -> None:
for flight in self.iter_flights():
if not isinstance(flight.state, InFlight):
raise RuntimeError(
f"Found non in-flight aircraft engaged in combat: {flight}"
)
flight.set_state(InCombat(flight.state, self))

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Iterator
from typing import TYPE_CHECKING
from .frozencombat import FrozenCombat
if TYPE_CHECKING:
from game.ato import Flight
class JoinableCombat(FrozenCombat, ABC):
def __init__(self, flights: list[Flight]) -> None:
self.flights = flights
@abstractmethod
def joinable_by(self, flight: Flight) -> bool:
...
def join(self, flight: Flight) -> None:
self.flights.append(flight)
def iter_flights(self) -> Iterator[Flight]:
yield from self.flights

View File

@@ -0,0 +1,51 @@
from __future__ import annotations
from collections.abc import Iterator
from typing import Any, Optional, TYPE_CHECKING
from dcs import Point
from shapely.ops import unary_union
from game.utils import dcs_to_shapely_point, meters
if TYPE_CHECKING:
from game.theater import ConflictTheater, TheaterGroundObject
from game.threatzones import ThreatPoly
class SamEngagementZones:
def __init__(
self,
threat_zones: ThreatPoly,
individual_zones: list[tuple[TheaterGroundObject[Any], ThreatPoly]],
) -> None:
self.threat_zones = threat_zones
self.individual_zones = individual_zones
def covers(self, position: Point) -> bool:
return self.threat_zones.intersects(dcs_to_shapely_point(position))
def iter_threatening_sams(
self, position: Point
) -> Iterator[TheaterGroundObject[Any]]:
for tgo, zone in self.individual_zones:
if zone.intersects(dcs_to_shapely_point(position)):
yield tgo
@classmethod
def from_theater(cls, theater: ConflictTheater, player: bool) -> SamEngagementZones:
commit_regions = []
individual_zones = []
for cp in theater.control_points_for(player):
for tgo in cp.connected_objectives:
if (region := cls.threat_region(tgo)) is not None:
commit_regions.append(region)
individual_zones.append((tgo, region))
return SamEngagementZones(unary_union(commit_regions), individual_zones)
@classmethod
def threat_region(cls, tgo: TheaterGroundObject[Any]) -> Optional[ThreatPoly]:
threat_range = tgo.max_threat_range()
if threat_range <= meters(0):
return None
return dcs_to_shapely_point(tgo.position).buffer(threat_range.meters)