Halt sim on A2A contact.

https://github.com/dcs-liberation/dcs_liberation/issues/1681
This commit is contained in:
Dan Albert 2021-10-27 00:45:39 -07:00
parent 9839787b6d
commit b2cbf4b6f4
13 changed files with 202 additions and 64 deletions

View File

@ -10,6 +10,7 @@ from .flightstate import Uninitialized, FlightState
if TYPE_CHECKING: if TYPE_CHECKING:
from game.dcs.aircrafttype import AircraftType from game.dcs.aircrafttype import AircraftType
from game.sim.aircraftengagementzones import AircraftEngagementZones
from game.squadrons import Squadron, Pilot from game.squadrons import Squadron, Pilot
from game.theater import ControlPoint, MissionTarget from game.theater import ControlPoint, MissionTarget
from game.transfers import TransferOrder from game.transfers import TransferOrder
@ -137,5 +138,5 @@ class Flight:
def on_game_tick(self, time: datetime, duration: timedelta) -> None: def on_game_tick(self, time: datetime, duration: timedelta) -> None:
self.state.on_game_tick(time, duration) self.state.on_game_tick(time, duration)
def should_halt_sim(self) -> bool: def should_halt_sim(self, enemy_aircraft_coverage: AircraftEngagementZones) -> bool:
return self.state.should_halt_sim() return self.state.should_halt_sim(enemy_aircraft_coverage)

View File

@ -2,13 +2,15 @@ from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import TYPE_CHECKING from typing import Optional, TYPE_CHECKING
from game.ato.starttype import StartType from game.ato.starttype import StartType
if TYPE_CHECKING: if TYPE_CHECKING:
from game.ato.flight import Flight from game.ato.flight import Flight
from game.settings import Settings from game.settings import Settings
from game.sim.aircraftengagementzones import AircraftEngagementZones
from game.threatzones import ThreatPoly
class FlightState(ABC): class FlightState(ABC):
@ -20,7 +22,7 @@ class FlightState(ABC):
def on_game_tick(self, time: datetime, duration: timedelta) -> None: def on_game_tick(self, time: datetime, duration: timedelta) -> None:
... ...
def should_halt_sim(self) -> bool: def should_halt_sim(self, enemy_aircraft_coverage: AircraftEngagementZones) -> bool:
return False return False
@property @property
@ -32,3 +34,6 @@ class FlightState(ABC):
@abstractmethod @abstractmethod
def spawn_type(self) -> StartType: def spawn_type(self) -> StartType:
... ...
def a2a_commit_region(self) -> Optional[ThreatPoly]:
return None

View File

@ -8,14 +8,16 @@ from dcs import Point
from game.ato.flightstate import Completed from game.ato.flightstate import Completed
from game.ato.flightstate.flightstate import FlightState from game.ato.flightstate.flightstate import FlightState
from game.ato.flightwaypoint import FlightWaypoint
from game.ato.flightwaypointtype import FlightWaypointType from game.ato.flightwaypointtype import FlightWaypointType
from game.ato.starttype import StartType from game.ato.starttype import StartType
from game.utils import Distance, Speed, meters from game.utils import Distance, Speed, meters
from gen.flights.flightplan import LoiterFlightPlan, PatrollingFlightPlan from gen.flights.flightplan import LoiterFlightPlan
if TYPE_CHECKING: if TYPE_CHECKING:
from game.ato.flight import Flight from game.ato.flight import Flight
from game.settings import Settings from game.settings import Settings
from game.sim.aircraftengagementzones import AircraftEngagementZones
def lerp(v0: float, v1: float, t: float) -> float: def lerp(v0: float, v1: float, t: float) -> float:
@ -23,27 +25,32 @@ def lerp(v0: float, v1: float, t: float) -> float:
class InFlight(FlightState): class InFlight(FlightState):
def __init__(self, flight: Flight, settings: Settings) -> None: def __init__(self, flight: Flight, settings: Settings, waypoint_index: int) -> None:
super().__init__(flight, settings) super().__init__(flight, settings)
self.waypoints = flight.flight_plan.iter_waypoints() waypoints = self.flight.flight_plan.waypoints
# TODO: Error checking for stupid flight plans with fewer than two waypoints. self.waypoint_index = waypoint_index
self.current_waypoint = next(self.waypoints) self.current_waypoint = waypoints[self.waypoint_index]
self.next_waypoint = next(self.waypoints) # TODO: Error checking for flight plans without landing waypoints.
self.passed_waypoints = {self.current_waypoint} self.next_waypoint = waypoints[self.waypoint_index + 1]
self.total_time_to_next_waypoint = self._total_time() self.total_time_to_next_waypoint = self.travel_time_between_waypoints()
self.elapsed_time = timedelta() self.elapsed_time = timedelta()
def _total_time(self) -> timedelta: def has_passed_waypoint(self, waypoint: FlightWaypoint) -> bool:
if isinstance(self.flight.flight_plan, PatrollingFlightPlan): index = self.flight.flight_plan.waypoints.index(waypoint)
# Racetracks should remain at the first waypoint until the patrol ends so return index <= self.waypoint_index
# that the waypoint generation doesn't need to reverse the orbit direction.
if self.current_waypoint == self.flight.flight_plan.patrol_start:
return self.flight.flight_plan.patrol_duration
# Loiter time is already built into travel_time_between_waypoints. def travel_time_between_waypoints(self) -> timedelta:
return self.flight.flight_plan.travel_time_between_waypoints( travel_time = self.flight.flight_plan.travel_time_between_waypoints(
self.current_waypoint, self.next_waypoint self.current_waypoint, self.next_waypoint
) )
if self.current_waypoint.waypoint_type is FlightWaypointType.LOITER:
# Loiter time is already built into travel_time_between_waypoints. If we're
# at a loiter point but still a regular InFlight (Loiter overrides this
# method) that means we're traveling from the loiter point but no longer
# loitering.
assert isinstance(self.flight.flight_plan, LoiterFlightPlan)
travel_time -= self.flight.flight_plan.hold_duration
return travel_time
def progress(self) -> float: def progress(self) -> float:
return ( return (
@ -52,19 +59,6 @@ class InFlight(FlightState):
) )
def estimate_position(self) -> Point: def estimate_position(self) -> Point:
# TODO: Make Loiter and RaceTrack distinct FlightStates.
if isinstance(self.flight.flight_plan, PatrollingFlightPlan):
# Prevent spawning racetracks in the middle of a leg. For simplicity we
# always start the aircraft at the beginning of the racetrack.
if self.current_waypoint == self.flight.flight_plan.patrol_start:
return self.current_waypoint.position
elif isinstance(self.flight.flight_plan, LoiterFlightPlan):
if (
self.current_waypoint == self.flight.flight_plan.hold
and self.elapsed_time < self.flight.flight_plan.hold_duration
):
return self.current_waypoint.position
x0 = self.current_waypoint.position.x x0 = self.current_waypoint.position.x
y0 = self.current_waypoint.position.y y0 = self.current_waypoint.position.y
x1 = self.next_waypoint.position.x x1 = self.next_waypoint.position.x
@ -73,7 +67,6 @@ class InFlight(FlightState):
return Point(lerp(x0, x1, progress), lerp(y0, y1, progress)) return Point(lerp(x0, x1, progress), lerp(y0, y1, progress))
def estimate_altitude(self) -> tuple[Distance, str]: def estimate_altitude(self) -> tuple[Distance, str]:
# TODO: Should count progress as 0 until departing a hold.
return ( return (
meters( meters(
lerp( lerp(
@ -86,27 +79,32 @@ class InFlight(FlightState):
) )
def estimate_speed(self) -> Speed: def estimate_speed(self) -> Speed:
# TODO: Patrol/loiter speeds may be different.
return self.flight.flight_plan.speed_between_waypoints( return self.flight.flight_plan.speed_between_waypoints(
self.current_waypoint, self.next_waypoint self.current_waypoint, self.next_waypoint
) )
def update_waypoints(self) -> None: def next_waypoint_state(self) -> FlightState:
self.current_waypoint = self.next_waypoint from game.ato.flightstate.loiter import Loiter
self.passed_waypoints.add(self.current_waypoint) from game.ato.flightstate.racetrack import RaceTrack
try:
self.next_waypoint = next(self.waypoints) new_index = self.waypoint_index + 1
except StopIteration: if self.next_waypoint.waypoint_type is FlightWaypointType.LANDING_POINT:
self.flight.set_state(Completed(self.flight, self.settings)) return Completed(self.flight, self.settings)
self.total_time_to_next_waypoint = self._total_time() if self.next_waypoint.waypoint_type is FlightWaypointType.PATROL_TRACK:
self.elapsed_time = timedelta() return RaceTrack(self.flight, self.settings, new_index)
if self.next_waypoint.waypoint_type is FlightWaypointType.LOITER:
return Loiter(self.flight, self.settings, new_index)
return InFlight(self.flight, self.settings, new_index)
def advance_to_next_waypoint(self) -> None:
self.flight.set_state(self.next_waypoint_state())
def on_game_tick(self, time: datetime, duration: timedelta) -> None: def on_game_tick(self, time: datetime, duration: timedelta) -> None:
self.elapsed_time += duration self.elapsed_time += duration
if self.elapsed_time > self.total_time_to_next_waypoint: if self.elapsed_time > self.total_time_to_next_waypoint:
self.update_waypoints() self.advance_to_next_waypoint()
def should_halt_sim(self) -> bool: def should_halt_sim(self, enemy_aircraft_coverage: AircraftEngagementZones) -> bool:
contact_types = { contact_types = {
FlightWaypointType.INGRESS_BAI, FlightWaypointType.INGRESS_BAI,
FlightWaypointType.INGRESS_CAS, FlightWaypointType.INGRESS_CAS,
@ -116,7 +114,7 @@ class InFlight(FlightState):
FlightWaypointType.INGRESS_SEAD, FlightWaypointType.INGRESS_SEAD,
FlightWaypointType.INGRESS_STRIKE, FlightWaypointType.INGRESS_STRIKE,
} }
# TODO: Check against enemy threats.
if self.current_waypoint.waypoint_type in contact_types: if self.current_waypoint.waypoint_type in contact_types:
logging.info( logging.info(
f"Interrupting simulation because {self.flight} has reached its " f"Interrupting simulation because {self.flight} has reached its "
@ -132,6 +130,13 @@ class InFlight(FlightState):
) )
return True return True
if enemy_aircraft_coverage.covers(self.estimate_position()):
logging.info(
f"Interrupting simulation because {self.flight} has encountered enemy "
"air-to-air patrol"
)
return True
return False return False
@property @property

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING
from dcs import Point
from game.ato.flightstate import FlightState, InFlight
from game.utils import Distance, Speed
from gen.flights.flightplan import LoiterFlightPlan
if TYPE_CHECKING:
from game.ato.flight import Flight
from game.settings import Settings
class Loiter(InFlight):
def __init__(self, flight: Flight, settings: Settings, waypoint_index: int) -> None:
assert isinstance(flight.flight_plan, LoiterFlightPlan)
self.hold_duration = flight.flight_plan.hold_duration
super().__init__(flight, settings, waypoint_index)
def estimate_position(self) -> Point:
return self.current_waypoint.position
def estimate_altitude(self) -> tuple[Distance, str]:
return self.current_waypoint.alt, self.current_waypoint.alt_type
def estimate_speed(self) -> Speed:
return self.flight.unit_type.preferred_patrol_speed(self.estimate_altitude()[0])
def next_waypoint_state(self) -> FlightState:
# Do not automatically advance to the next waypoint. Just proceed from the
# current one with the normal flying state.
return InFlight(self.flight, self.settings, self.waypoint_index)
def travel_time_between_waypoints(self) -> timedelta:
return self.hold_duration

View File

@ -0,0 +1,49 @@
from __future__ import annotations
from datetime import timedelta
from typing import Optional, TYPE_CHECKING
from dcs import Point
from shapely.geometry import LineString, Point as ShapelyPoint
from game.ato import FlightType
from game.ato.flightstate import InFlight
from game.threatzones import ThreatPoly
from game.utils import Distance, Speed
from gen.flights.flightplan import PatrollingFlightPlan
if TYPE_CHECKING:
from game.ato.flight import Flight
from game.settings import Settings
class RaceTrack(InFlight):
def __init__(self, flight: Flight, settings: Settings, waypoint_index: int) -> None:
assert isinstance(flight.flight_plan, PatrollingFlightPlan)
self.patrol_duration = flight.flight_plan.patrol_duration
super().__init__(flight, settings, waypoint_index)
self.commit_region = LineString(
[
ShapelyPoint(self.current_waypoint.x, self.current_waypoint.y),
ShapelyPoint(self.next_waypoint.x, self.next_waypoint.y),
]
).buffer(flight.flight_plan.engagement_distance.meters)
def estimate_position(self) -> Point:
# Prevent spawning racetracks in the middle of a leg. For simplicity we
# always start the aircraft at the beginning of the racetrack.
return self.current_waypoint.position
def estimate_altitude(self) -> tuple[Distance, str]:
return self.current_waypoint.alt, self.current_waypoint.alt_type
def estimate_speed(self) -> Speed:
return self.flight.unit_type.preferred_patrol_speed(self.estimate_altitude()[0])
def travel_time_between_waypoints(self) -> timedelta:
return self.patrol_duration
def a2a_commit_region(self) -> Optional[ThreatPoly]:
if self.flight.flight_type in {FlightType.BARCAP, FlightType.TARCAP}:
return self.commit_region
return None

View File

@ -11,6 +11,7 @@ from ..starttype import StartType
if TYPE_CHECKING: if TYPE_CHECKING:
from game.ato.flight import Flight from game.ato.flight import Flight
from game.settings import Settings from game.settings import Settings
from game.sim.aircraftengagementzones import AircraftEngagementZones
class StartUp(FlightState): class StartUp(FlightState):
@ -31,7 +32,7 @@ class StartUp(FlightState):
def spawn_type(self) -> StartType: def spawn_type(self) -> StartType:
return StartType.COLD return StartType.COLD
def should_halt_sim(self) -> bool: def should_halt_sim(self, enemy_aircraft_coverage: AircraftEngagementZones) -> bool:
if ( if (
self.flight.client_count > 0 self.flight.client_count > 0
and self.settings.player_mission_interrupts_sim_at is StartType.COLD and self.settings.player_mission_interrupts_sim_at is StartType.COLD

View File

@ -11,6 +11,7 @@ from ..starttype import StartType
if TYPE_CHECKING: if TYPE_CHECKING:
from game.ato.flight import Flight from game.ato.flight import Flight
from game.settings import Settings from game.settings import Settings
from game.sim.aircraftengagementzones import AircraftEngagementZones
class Takeoff(FlightState): class Takeoff(FlightState):
@ -22,7 +23,7 @@ class Takeoff(FlightState):
def on_game_tick(self, time: datetime, duration: timedelta) -> None: def on_game_tick(self, time: datetime, duration: timedelta) -> None:
if time < self.completion_time: if time < self.completion_time:
return return
self.flight.set_state(InFlight(self.flight, self.settings)) self.flight.set_state(InFlight(self.flight, self.settings, waypoint_index=0))
@property @property
def is_waiting_for_start(self) -> bool: def is_waiting_for_start(self) -> bool:
@ -32,7 +33,7 @@ class Takeoff(FlightState):
def spawn_type(self) -> StartType: def spawn_type(self) -> StartType:
return StartType.RUNWAY return StartType.RUNWAY
def should_halt_sim(self) -> bool: def should_halt_sim(self, enemy_aircraft_coverage: AircraftEngagementZones) -> bool:
if ( if (
self.flight.client_count > 0 self.flight.client_count > 0
and self.settings.player_mission_interrupts_sim_at is StartType.RUNWAY and self.settings.player_mission_interrupts_sim_at is StartType.RUNWAY

View File

@ -11,6 +11,7 @@ from ..starttype import StartType
if TYPE_CHECKING: if TYPE_CHECKING:
from game.ato.flight import Flight from game.ato.flight import Flight
from game.settings import Settings from game.settings import Settings
from game.sim.aircraftengagementzones import AircraftEngagementZones
class Taxi(FlightState): class Taxi(FlightState):
@ -31,7 +32,7 @@ class Taxi(FlightState):
def spawn_type(self) -> StartType: def spawn_type(self) -> StartType:
return StartType.WARM return StartType.WARM
def should_halt_sim(self) -> bool: def should_halt_sim(self, enemy_aircraft_coverage: AircraftEngagementZones) -> bool:
if ( if (
self.flight.client_count > 0 self.flight.client_count > 0
and self.settings.player_mission_interrupts_sim_at is StartType.WARM and self.settings.player_mission_interrupts_sim_at is StartType.WARM

View File

@ -41,7 +41,7 @@ class WaitingForStart(FlightState):
elif self.start_type is StartType.RUNWAY: elif self.start_type is StartType.RUNWAY:
new_state = Takeoff(self.flight, self.settings, time) new_state = Takeoff(self.flight, self.settings, time)
else: else:
new_state = InFlight(self.flight, self.settings) new_state = InFlight(self.flight, self.settings, waypoint_index=0)
self.flight.set_state(new_state) self.flight.set_state(new_state)
@property @property

View File

@ -74,7 +74,7 @@ class WaypointGenerator:
# mission aircraft starting at a waypoint with tasks behave # mission aircraft starting at a waypoint with tasks behave
# correctly. # correctly.
self.builder_for_waypoint(point).add_tasks(self.group.points[0]) self.builder_for_waypoint(point).add_tasks(self.group.points[0])
if point not in self.flight.state.passed_waypoints: if not self.flight.state.has_passed_waypoint(point):
filtered_points.append(point) filtered_points.append(point)
else: else:
filtered_points.append(point) filtered_points.append(point)

View File

@ -290,7 +290,7 @@ class Settings:
"Fast forward mission to first contact (WIP)", "Fast forward mission to first contact (WIP)",
page=MISSION_GENERATOR_PAGE, page=MISSION_GENERATOR_PAGE,
section=GAMEPLAY_SECTION, section=GAMEPLAY_SECTION,
default=False, default=True,
detail=( detail=(
"If enabled, the mission will be generated at the point of first contact." "If enabled, the mission will be generated at the point of first contact."
), ),

View File

@ -0,0 +1,37 @@
from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from dcs import Point
from shapely.geometry import Point as ShapelyPoint
from shapely.ops import unary_union
from game.ato.flightstate import InFlight
if TYPE_CHECKING:
from game.ato import Flight
from game.ato.airtaaskingorder import AirTaskingOrder
from game.threatzones import ThreatPoly
class AircraftEngagementZones:
def __init__(self, threat_zones: ThreatPoly) -> None:
self.threat_zones = threat_zones
def covers(self, position: Point) -> bool:
return self.threat_zones.intersects(ShapelyPoint(position.x, position.y))
@classmethod
def from_ato(cls, ato: AirTaskingOrder) -> AircraftEngagementZones:
commit_regions = []
for package in ato.packages:
for flight in package.flights:
if (region := cls.commit_region(flight)) is not None:
commit_regions.append(region)
return AircraftEngagementZones(unary_union(commit_regions))
@classmethod
def commit_region(cls, flight: Flight) -> Optional[ThreatPoly]:
if isinstance(flight.state, InFlight):
return flight.state.a2a_commit_region()
return None

View File

@ -17,6 +17,7 @@ from game.ato.flightstate import (
WaitingForStart, WaitingForStart,
) )
from game.ato.starttype import StartType from game.ato.starttype import StartType
from game.sim.aircraftengagementzones import AircraftEngagementZones
from gen.flights.traveltime import TotEstimator from gen.flights.traveltime import TotEstimator
if TYPE_CHECKING: if TYPE_CHECKING:
@ -45,18 +46,17 @@ class AircraftSimulation:
return return
def tick(self) -> bool: def tick(self) -> bool:
interrupt_sim = False
for flight in self.iter_flights(): for flight in self.iter_flights():
flight.on_game_tick(self.time, TICK) flight.on_game_tick(self.time, TICK)
if flight.should_halt_sim():
interrupt_sim = True
# TODO: Check for SAM or A2A contact. # Finish updating all flights before computing engagement zones so that the new
# Generate an engagement poly for all active air-to-air aircraft per-coalition # positions are used.
# and compare those against aircraft positions. If any aircraft intersects an blue_a2a = AircraftEngagementZones.from_ato(self.game.blue.ato)
# enemy air-threat region, generate the mission. Also check against enemy SAM red_a2a = AircraftEngagementZones.from_ato(self.game.red.ato)
# zones. for flight in self.iter_flights():
return interrupt_sim if flight.should_halt_sim(red_a2a if flight.squadron.player else blue_a2a):
return True
return False
def set_initial_flight_states(self) -> None: def set_initial_flight_states(self) -> None:
now = self.game.conditions.start_time now = self.game.conditions.start_time
@ -78,7 +78,7 @@ class AircraftSimulation:
elif flight.start_type is StartType.RUNWAY: elif flight.start_type is StartType.RUNWAY:
flight.set_state(Takeoff(flight, self.game.settings, now)) flight.set_state(Takeoff(flight, self.game.settings, now))
elif flight.start_type is StartType.IN_FLIGHT: elif flight.start_type is StartType.IN_FLIGHT:
flight.set_state(InFlight(flight, self.game.settings)) flight.set_state(InFlight(flight, self.game.settings, waypoint_index=0))
else: else:
raise ValueError(f"Unknown start type {flight.start_type} for {flight}") raise ValueError(f"Unknown start type {flight.start_type} for {flight}")