Dan Albert 87441b8939 Formalize waypoint actions.
Create a WaypointAction class that defines the actions taken at a
waypoint. These will often map one-to-one with DCS waypoint actions but
can also be higher level and generate multiple actions. Once everything
has migrated all waypoint-type-specific behaviors of
PydcsWaypointBuilder will be gone, and it'll be easier to keep the sim
behaviors in sync with the mission generator behaviors.

For now only hold has been migrated. This is actually probably the most
complicated action we have (starting with this may have been a mistake,
but it did find all the rough edges quickly) since it affects waypoint
timings and flight position during simulation. That part isn't handled
as neatly as I'd like because the FlightState still has to special case
LOITER points to avoid simulating the wrong waypoint position. At some
point we should probably start tracking real positions in FlightState,
and when we do that will be solved.
2023-08-13 12:43:59 -07:00

170 lines
6.1 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from dcs import Point
from dcs.task import Task
from game.ato.flightstate import Completed
from game.ato.flightstate.actionstate import ActionState
from game.ato.flightstate.flightstate import FlightState
from game.ato.flightwaypoint import FlightWaypoint
from game.ato.flightwaypointtype import FlightWaypointType
from game.ato.starttype import StartType
from game.utils import Distance, LBS_TO_KG, Speed, pairwise
if TYPE_CHECKING:
from game.ato.flight import Flight
from game.settings import Settings
from game.sim.gameupdateevents import GameUpdateEvents
class InFlight(FlightState, ABC):
def __init__(
self,
flight: Flight,
settings: Settings,
waypoint_index: int,
has_aborted: bool = False,
) -> None:
super().__init__(flight, settings)
waypoints = self.flight.flight_plan.waypoints
self.waypoint_index = waypoint_index
self.has_aborted = has_aborted
self.current_waypoint = waypoints[self.waypoint_index]
# TODO: Error checking for flight plans without landing waypoints.
self.next_waypoint = waypoints[self.waypoint_index + 1]
self.total_time_to_next_waypoint = self.travel_time_between_waypoints()
self.elapsed_time = timedelta()
self.current_waypoint_elapsed = False
self.pending_actions: deque[ActionState] = deque(
ActionState(a) for a in self.current_waypoint.actions
)
@property
def current_action(self) -> ActionState | None:
if self.pending_actions:
return self.pending_actions[0]
return None
@property
def cancelable(self) -> bool:
return False
@property
def in_flight(self) -> bool:
return True
def has_passed_waypoint(self, waypoint: FlightWaypoint) -> bool:
index = self.flight.flight_plan.waypoints.index(waypoint)
return index <= self.waypoint_index
def travel_time_between_waypoints(self) -> timedelta:
return self.flight.flight_plan.travel_time_between_waypoints(
self.current_waypoint, self.next_waypoint
)
@abstractmethod
def estimate_position(self) -> Point:
...
@abstractmethod
def estimate_altitude(self) -> tuple[Distance, str]:
...
@abstractmethod
def estimate_speed(self) -> Speed:
...
def estimate_fuel_at_current_waypoint(self) -> float:
initial_fuel = super().estimate_fuel()
if self.flight.unit_type.fuel_consumption is None:
return initial_fuel
initial_fuel -= self.flight.unit_type.fuel_consumption.taxi * LBS_TO_KG
waypoints = self.flight.flight_plan.waypoints[: self.waypoint_index + 1]
for a, b in pairwise(waypoints[:-1]):
consumption = self.flight.flight_plan.fuel_consumption_between_points(a, b)
assert consumption is not None
initial_fuel -= consumption * LBS_TO_KG
return initial_fuel
def next_waypoint_state(self) -> FlightState:
from .racetrack import RaceTrack
from .navigating import Navigating
new_index = self.waypoint_index + 1
if self.next_waypoint.waypoint_type is FlightWaypointType.LANDING_POINT:
return Completed(self.flight, self.settings)
if self.next_waypoint.waypoint_type is FlightWaypointType.PATROL_TRACK:
return RaceTrack(self.flight, self.settings, new_index)
return Navigating(self.flight, self.settings, new_index)
def advance_to_next_waypoint(self) -> FlightState:
new_state = self.next_waypoint_state()
self.flight.set_state(new_state)
self.current_waypoint_elapsed = True
return new_state
def on_game_tick(
self, events: GameUpdateEvents, time: datetime, duration: timedelta
) -> None:
if (action := self.current_action) is not None:
action.on_game_tick(time, duration)
if action.is_finished():
self.pending_actions.popleft()
return
self.elapsed_time += duration
if self.elapsed_time > self.total_time_to_next_waypoint:
new_state = self.advance_to_next_waypoint()
# Roll over any extra time to the next state. We don't need to loop here
# even if we've passed more than one waypoint because the new state will do
# the same. There is a small gap here where we only do that for other *in
# flight* states. We don't need to tick combat states (combat is ticked
# separately) or completed states at all, so the only states that might be
# under-ticked are the pre-takeoff states, where it's not really that
# critical if we under-simulate them by the tick period or less. The tick
# period at time of writing is one second. Not enough to throw off ground
# ops, but at 600 knots we'd be getting the position wrong by up to 1000
# feet.
rollover = self.elapsed_time - self.total_time_to_next_waypoint
new_state.on_game_tick(events, time, rollover)
@property
def is_at_ip(self) -> bool:
contact_types = {
FlightWaypointType.INGRESS_BAI,
FlightWaypointType.INGRESS_CAS,
FlightWaypointType.INGRESS_DEAD,
FlightWaypointType.INGRESS_OCA_AIRCRAFT,
FlightWaypointType.INGRESS_OCA_RUNWAY,
FlightWaypointType.INGRESS_SEAD,
FlightWaypointType.INGRESS_STRIKE,
FlightWaypointType.INGRESS_AIR_ASSAULT,
}
return self.current_waypoint.waypoint_type in contact_types
@property
def vulnerable_to_intercept(self) -> bool:
return True
@property
def vulnerable_to_sam(self) -> bool:
return True
@property
def will_join_air_combat(self) -> bool:
return self.flight.flight_type.is_air_to_air
@property
def is_waiting_for_start(self) -> bool:
return False
@property
def spawn_type(self) -> StartType:
return StartType.IN_FLIGHT