Dan Albert 656a98675e Rework sim status update to not need a thread.
Rather than polling at 60Hz (which may be faster than the tick rate,
wasting cycles; and also makes synchronization annoying), collect events
during the tick and emit them after (rate limited, pooling events until
it is time for another event to send).

This can be improved by paying attention to the aircraft update list,
which would allow us to avoid updating aircraft that don't have a status
change. To do that we need to be able to quickly look up a FlightJs
matching a Flight, though, and Flight isn't hashable.

We should also be removing dead events and de-duplicating. Currently
each flight has an update for every tick, but only the latest one
matters. Combat update events also don't matter if the same combat is
new in the update.

https://github.com/dcs-liberation/dcs_liberation/issues/1680
2021-12-23 17:46:24 -08:00

135 lines
5.0 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from dcs import Point
from game.ato.flightstate import Completed
from game.ato.flightstate.flightstate import FlightState
from game.ato.flightwaypoint import FlightWaypoint
from game.ato.flightwaypointtype import FlightWaypointType
from game.ato.starttype import StartType
from game.utils import Distance, LBS_TO_KG, Speed, pairwise
from gen.flights.flightplan import LoiterFlightPlan
if TYPE_CHECKING:
from game.ato.flight import Flight
from game.settings import Settings
from game.sim.gameupdateevents import GameUpdateEvents
class InFlight(FlightState, ABC):
    """Abstract flight state for a flight on the leg between two waypoints.

    The state records the index of the current waypoint in the flight plan, the
    current and next waypoint objects, the total travel time for the leg, and
    the time elapsed on it so far. Subclasses supply the position, altitude,
    and speed estimates.
    """

    def __init__(self, flight: Flight, settings: Settings, waypoint_index: int) -> None:
        """Start tracking the leg that begins at ``waypoint_index``.

        Args:
            flight: The flight this state belongs to.
            settings: Settings forwarded to the base state.
            waypoint_index: Index into the flight plan's waypoints of the
                waypoint the flight is currently travelling from.
        """
        super().__init__(flight, settings)
        plan_waypoints = self.flight.flight_plan.waypoints
        self.waypoint_index = waypoint_index
        self.current_waypoint = plan_waypoints[waypoint_index]
        # TODO: Error checking for flight plans without landing waypoints.
        # Indexing one past the current waypoint fails if it is the last one.
        self.next_waypoint = plan_waypoints[waypoint_index + 1]
        self.total_time_to_next_waypoint = self.travel_time_between_waypoints()
        self.elapsed_time = timedelta()

    def has_passed_waypoint(self, waypoint: FlightWaypoint) -> bool:
        """Return True if ``waypoint`` is at or before the current waypoint.

        The waypoint is located with ``waypoints.index``, so a waypoint that is
        not part of the flight plan is expected to raise rather than return
        False.
        """
        position = self.flight.flight_plan.waypoints.index(waypoint)
        return self.waypoint_index >= position

    def travel_time_between_waypoints(self) -> timedelta:
        """Return the travel time from the current waypoint to the next one."""
        plan = self.flight.flight_plan
        leg_time = plan.travel_time_between_waypoints(
            self.current_waypoint, self.next_waypoint
        )
        if self.current_waypoint.waypoint_type is not FlightWaypointType.LOITER:
            return leg_time

        # Loiter time is already built into travel_time_between_waypoints. If we're
        # at a loiter point but still a regular InFlight (Loiter overrides this
        # method) that means we're traveling from the loiter point but no longer
        # loitering, so the hold duration is removed again.
        assert isinstance(plan, LoiterFlightPlan)
        return leg_time - plan.hold_duration

    @abstractmethod
    def estimate_position(self) -> Point:
        """Return the estimated position of the flight."""
        ...

    @abstractmethod
    def estimate_altitude(self) -> tuple[Distance, str]:
        """Return the estimated altitude of the flight and an associated string."""
        ...

    @abstractmethod
    def estimate_speed(self) -> Speed:
        """Return the estimated speed of the flight."""
        ...

    def estimate_fuel_at_current_waypoint(self) -> float:
        """Estimate the fuel remaining based on the legs flown so far.

        Starts from the base state's fuel estimate. If the unit type has no
        fuel consumption data that estimate is returned unchanged; otherwise
        taxi fuel and the per-leg consumption are subtracted after scaling by
        ``LBS_TO_KG``.
        """
        remaining_fuel = super().estimate_fuel()
        consumption_data = self.flight.unit_type.fuel_consumption
        if consumption_data is None:
            return remaining_fuel

        remaining_fuel -= consumption_data.taxi * LBS_TO_KG

        plan = self.flight.flight_plan
        through_current = plan.waypoints[: self.waypoint_index + 1]
        # NOTE(review): the current waypoint is dropped before pairing, so
        # (assuming pairwise yields consecutive pairs) the leg arriving at the
        # current waypoint is not subtracted. Confirm that this is intentional.
        before_current = through_current[:-1]
        for leg_start, leg_end in pairwise(before_current):
            leg_consumption = plan.fuel_consumption_between_points(leg_start, leg_end)
            assert leg_consumption is not None
            remaining_fuel -= leg_consumption * LBS_TO_KG
        return remaining_fuel

    def next_waypoint_state(self) -> FlightState:
        """Build the state the flight enters on reaching the next waypoint.

        The state is chosen from the type of the next waypoint: a landing point
        completes the flight, a patrol track starts a race track, a loiter point
        starts a loiter, and anything else continues navigating.
        """
        # Imported here rather than at module level, as in the original code.
        from .loiter import Loiter
        from .racetrack import RaceTrack
        from .navigating import Navigating

        upcoming_type = self.next_waypoint.waypoint_type
        following_index = self.waypoint_index + 1
        if upcoming_type is FlightWaypointType.LANDING_POINT:
            return Completed(self.flight, self.settings)
        elif upcoming_type is FlightWaypointType.PATROL_TRACK:
            return RaceTrack(self.flight, self.settings, following_index)
        elif upcoming_type is FlightWaypointType.LOITER:
            return Loiter(self.flight, self.settings, following_index)
        else:
            return Navigating(self.flight, self.settings, following_index)

    def advance_to_next_waypoint(self) -> None:
        """Replace the flight's state with the state for the next waypoint."""
        following_state = self.next_waypoint_state()
        self.flight.set_state(following_state)

    def on_game_tick(
        self, events: GameUpdateEvents, time: datetime, duration: timedelta
    ) -> None:
        """Accumulate elapsed time and advance once the leg's time is exceeded.

        Args:
            events: Update event collection for the tick (not used here).
            time: Time of the tick (not used here).
            duration: Length of the tick, added to the elapsed time.
        """
        self.elapsed_time = self.elapsed_time + duration
        if self.elapsed_time <= self.total_time_to_next_waypoint:
            return
        self.advance_to_next_waypoint()

    @property
    def is_at_ip(self) -> bool:
        """Whether the current waypoint is one of the ingress waypoint types."""
        return self.current_waypoint.waypoint_type in {
            FlightWaypointType.INGRESS_BAI,
            FlightWaypointType.INGRESS_CAS,
            FlightWaypointType.INGRESS_DEAD,
            FlightWaypointType.INGRESS_OCA_AIRCRAFT,
            FlightWaypointType.INGRESS_OCA_RUNWAY,
            FlightWaypointType.INGRESS_SEAD,
            FlightWaypointType.INGRESS_STRIKE,
        }

    @property
    def vulnerable_to_intercept(self) -> bool:
        """Always True for this state."""
        return True

    @property
    def vulnerable_to_sam(self) -> bool:
        """Always True for this state."""
        return True

    @property
    def will_join_air_combat(self) -> bool:
        """Whether the flight's type is flagged as air-to-air."""
        return self.flight.flight_type.is_air_to_air

    @property
    def is_waiting_for_start(self) -> bool:
        """Always False for this state."""
        return False

    @property
    def spawn_type(self) -> StartType:
        """Start type for this state: always ``StartType.IN_FLIGHT``."""
        return StartType.IN_FLIGHT

    @property
    def description(self) -> str:
        """Short text naming the waypoint the flight is heading to."""
        return f"Flying to {self.next_waypoint.name}"