Add play/pause features to the sim.

There's no UI feedback for this yet other than the log messages.

https://github.com/dcs-liberation/dcs_liberation/issues/1704
This commit is contained in:
Dan Albert
2021-10-31 18:07:42 -07:00
parent 748d80ff3b
commit 87bf3110c8
13 changed files with 369 additions and 112 deletions

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import itertools
import logging
from collections import Iterator
from datetime import datetime, timedelta
@@ -24,30 +23,17 @@ if TYPE_CHECKING:
from game import Game
TICK = timedelta(seconds=1)
class AircraftSimulation:
def __init__(self, game: Game) -> None:
self.game = game
self.time = self.game.conditions.start_time
def run(self) -> None:
def begin_simulation(self) -> None:
self.reset()
self.set_initial_flight_states()
if self.game.settings.fast_forward_to_first_contact:
self.simulate_until_first_contact()
logging.info(f"Mission simulation completed at {self.time}")
def simulate_until_first_contact(self) -> None:
while True:
self.time += TICK
if self.tick():
return
def tick(self) -> bool:
def on_game_tick(self, time: datetime, duration: timedelta) -> bool:
for flight in self.iter_flights():
flight.on_game_tick(self.time, TICK)
flight.on_game_tick(time, duration)
# Finish updating all flights before computing engagement zones so that the new
# positions are used.
@@ -83,7 +69,6 @@ class AircraftSimulation:
raise ValueError(f"Unknown start type {flight.start_type} for {flight}")
def reset(self) -> None:
self.time = self.game.conditions.start_time
for flight in self.iter_flights():
flight.set_state(Uninitialized(flight, self.game.settings))

73
game/sim/gameloop.py Normal file
View File

@@ -0,0 +1,73 @@
from __future__ import annotations
import logging
from pathlib import Path
from typing import Callable, TYPE_CHECKING
from .gamelooptimer import GameLoopTimer
from .missionsimulation import MissionSimulation, SimulationAlreadyCompletedError
from .simspeedsetting import SimSpeedSetting
if TYPE_CHECKING:
from game import Game
from game.debriefing import Debriefing
class GameLoop:
def __init__(self, game: Game, on_complete: Callable[[], None]) -> None:
self.game = game
self.on_complete = on_complete
self.timer = GameLoopTimer(self.tick)
self.sim = MissionSimulation(self.game)
self.started = False
self.completed = False
def start(self) -> None:
if self.started:
raise RuntimeError("Cannot start game loop because it has already started")
self.started = True
self.sim.begin_simulation()
def pause(self) -> None:
self.set_simulation_speed(SimSpeedSetting.PAUSED)
def set_simulation_speed(self, simulation_speed: SimSpeedSetting) -> None:
self.timer.stop()
if simulation_speed != self.timer.simulation_speed:
logging.info(f"Speed changed to {simulation_speed}")
if not self.started:
self.start()
self.timer.set_speed(simulation_speed)
def run_to_first_contact(self) -> None:
self.pause()
logging.info("Running sim to first contact")
while not self.completed:
self.tick()
def pause_and_generate_miz(self, output: Path) -> None:
self.pause()
self.sim.generate_miz(output)
def pause_and_debrief(self, state_path: Path, force_end: bool) -> Debriefing:
self.pause()
return self.sim.debrief_current_state(state_path, force_end)
def complete_with_results(self, debriefing: Debriefing) -> None:
self.pause()
self.sim.process_results(debriefing)
self.completed = True
def tick(self) -> None:
if not self.started:
raise RuntimeError("Attempted to tick game loop before initialization")
try:
self.completed = self.sim.tick()
if self.completed:
self.pause()
logging.info(f"Simulation completed at {self.sim.time}")
self.on_complete()
else:
logging.info(f"Simulation continued at {self.sim.time}")
except SimulationAlreadyCompletedError:
logging.exception("Attempted to tick already completed sim")

40
game/sim/gamelooptimer.py Normal file
View File

@@ -0,0 +1,40 @@
from threading import Lock, Timer
from typing import Callable, Optional
from .simspeedsetting import SimSpeedSetting
class GameLoopTimer:
def __init__(self, callback: Callable[[], None]) -> None:
self.callback = callback
self.simulation_speed = SimSpeedSetting.PAUSED
self._timer: Optional[Timer] = None
self._timer_lock = Lock()
def set_speed(self, simulation_speed: SimSpeedSetting) -> None:
with self._timer_lock:
self._stop()
self.simulation_speed = simulation_speed
self._recreate_timer()
def stop(self) -> None:
with self._timer_lock:
self._stop()
def _stop(self) -> None:
if self._timer is not None:
self._timer.cancel()
def _recreate_timer(self) -> None:
self._stop()
factor = self.simulation_speed.speed_factor
if not factor:
self._timer = None
return None
self._timer = Timer(1 / factor, self._tick)
self._timer.start()
def _tick(self) -> None:
self.callback()
with self._timer_lock:
self._recreate_timer()

View File

@@ -1,29 +1,45 @@
from __future__ import annotations
import json
from datetime import timedelta
from pathlib import Path
from typing import Optional, TYPE_CHECKING
from game.debriefing import Debriefing
from game.missiongenerator import MissionGenerator
from game.sim.aircraftsimulation import AircraftSimulation
from game.sim.missionresultsprocessor import MissionResultsProcessor
from game.unitmap import UnitMap
from .aircraftsimulation import AircraftSimulation
from .missionresultsprocessor import MissionResultsProcessor
if TYPE_CHECKING:
from game import Game
TICK = timedelta(seconds=1)
class SimulationAlreadyCompletedError(RuntimeError):
def __init__(self) -> None:
super().__init__("Simulation already completed")
class MissionSimulation:
def __init__(self, game: Game) -> None:
self.game = game
self.unit_map: Optional[UnitMap] = None
self.time = game.conditions.start_time
self.aircraft_simulation = AircraftSimulation(self.game)
self.completed = False
self.time = self.game.conditions.start_time
def run(self) -> None:
sim = AircraftSimulation(self.game)
sim.run()
self.time = sim.time
def begin_simulation(self) -> None:
self.aircraft_simulation.begin_simulation()
def tick(self) -> bool:
self.time += TICK
if self.completed:
raise RuntimeError("Simulation already completed")
self.completed = self.aircraft_simulation.on_game_tick(self.time, TICK)
return self.completed
def generate_miz(self, output: Path) -> None:
self.unit_map = MissionGenerator(self.game, self.time).generate_miz(output)

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from enum import Enum, unique
@unique
class SimSpeedSetting(Enum):
PAUSED = (0, "Paused")
X1 = (1, "1x")
X2 = (2, "2x")
X5 = (5, "5x")
X10 = (10, "10x")
X100 = (100, "100x")
X1000 = (1000, "1000x")
def __init__(self, speed_factor: int, text: str) -> None:
self.speed_factor = speed_factor
self.text = text
def __str__(self) -> str:
return self.text
@classmethod
def from_factor(cls, speed_factor: int) -> SimSpeedSetting:
for setting in SimSpeedSetting:
if setting.speed_factor == speed_factor:
return setting
raise ValueError