dcs_liberation/game/sim/missionsimulation.py
Dan Albert 656a98675e Rework sim status update to not need a thread.
Rather than polling at 60Hz (which may be faster than the tick rate,
wasting cycles; and also makes synchronization annoying), collect events
during the tick and emit them after (rate limited, pooling events until
it is time for another event to send).

This can be improved by paying attention to the aircraft update list,
which would allow us to avoid updating aircraft that don't have a status
change. To do that we need to be able to quickly look up a FlightJs
matching a Flight though, and Flight isn't hashable.

We should also be removing dead events and de-duplicating. Currently
each flight has an update for every tick, but only the latest one
matters. Combat update events also don't matter if the same combat is
new in the update.

https://github.com/dcs-liberation/dcs_liberation/issues/1680
2021-12-23 17:46:24 -08:00

76 lines
2.4 KiB
Python

from __future__ import annotations
import json
from datetime import timedelta
from pathlib import Path
from typing import Optional, TYPE_CHECKING
from game.debriefing import Debriefing
from game.missiongenerator import MissionGenerator
from game.unitmap import UnitMap
from .aircraftsimulation import AircraftSimulation
from .missionresultsprocessor import MissionResultsProcessor
if TYPE_CHECKING:
from game import Game
from .gameupdateevents import GameUpdateEvents
# Amount of simulated time that each call to MissionSimulation.tick() adds to
# the simulation clock; also passed to the aircraft simulation as the step size.
TICK = timedelta(seconds=1)
class SimulationAlreadyCompletedError(RuntimeError):
    """Raised when a simulation that has already completed is ticked again."""

    def __init__(self) -> None:
        super().__init__("Simulation already completed")


class MissionSimulation:
    """Steps the aircraft simulation and processes the results of a mission.

    The simulation clock starts at the game's configured start time and moves
    forward by ``TICK`` on every call to :meth:`tick`. A mission file can be
    generated for the current simulation time, and a saved mission state can
    later be turned into a ``Debriefing`` and committed to the game.
    """

    def __init__(self, game: Game) -> None:
        self.game = game
        # Populated by generate_miz() and cleared by finish(). None means no
        # mission has been generated, so results cannot be processed yet.
        self.unit_map: Optional[UnitMap] = None
        self.aircraft_simulation = AircraftSimulation(self.game)
        self.completed = False
        self.time = self.game.conditions.start_time

    def begin_simulation(self) -> None:
        """Reset the clock to the start time and start the aircraft simulation."""
        # NOTE(review): self.completed is not reset here, so a finished
        # simulation cannot be ticked again after this call. Confirm intended.
        self.time = self.game.conditions.start_time
        self.aircraft_simulation.begin_simulation()

    def tick(self, events: GameUpdateEvents) -> GameUpdateEvents:
        """Advance the simulation by one ``TICK``.

        Args:
            events: Collector handed to the aircraft simulation for this tick.

        Returns:
            The same ``events`` object that was passed in.

        Raises:
            SimulationAlreadyCompletedError: If a previous tick already marked
                the simulation as complete. The clock is left untouched.
        """
        # Reject the call before mutating any state so a refused tick does not
        # move the clock forward.
        if self.completed:
            raise SimulationAlreadyCompletedError()
        self.time += TICK
        self.aircraft_simulation.on_game_tick(events, self.time, TICK)
        self.completed = events.simulation_complete
        return events

    def generate_miz(self, output: Path) -> None:
        """Run the mission generator for the current time and keep its unit map.

        Args:
            output: Path forwarded to ``MissionGenerator.generate_miz``.
        """
        self.unit_map = MissionGenerator(self.game, self.time).generate_miz(output)

    def debrief_current_state(
        self, state_path: Path, force_end: bool = False
    ) -> Debriefing:
        """Build a ``Debriefing`` from a JSON mission state file.

        Args:
            state_path: JSON file holding the mission state.
            force_end: When true, ``"mission_ended"`` is set to ``True`` in the
                loaded data before the debriefing is built.

        Returns:
            A ``Debriefing`` built from the loaded data, the game, and the unit
            map of the generated mission.

        Raises:
            RuntimeError: If no mission has been generated (no unit map).
        """
        # Validate before touching the file so the error is about the missing
        # unit map rather than about I/O.
        unit_map = self._require_unit_map()
        with state_path.open("r", encoding="utf-8") as state_file:
            data = json.load(state_file)
        if force_end:
            data["mission_ended"] = True
        return Debriefing(data, self.game, unit_map)

    def process_results(self, debriefing: Debriefing) -> None:
        """Commit the results of ``debriefing`` to the game.

        Raises:
            RuntimeError: If no mission has been generated (no unit map).
        """
        self._require_unit_map()
        MissionResultsProcessor(self.game).commit(debriefing)

    def finish(self) -> None:
        """Discard the unit map of the generated mission."""
        self.unit_map = None

    def _require_unit_map(self) -> UnitMap:
        """Return the current unit map, raising if no mission was generated."""
        if self.unit_map is None:
            raise RuntimeError(
                "Simulation has no unit map. Results processing began before a mission "
                "was generated."
            )
        return self.unit_map