dcs-retribution/qt_ui/simcontroller.py
Dan Albert 515efd0598 Draw frozen combat on the map.
Very basic display. Draws the engagement footprint for air-to-air
combat, a line from the flight to the target for IP, and lines from SAMs
to their target for air defense.

https://github.com/dcs-liberation/dcs_liberation/issues/1680
2021-12-21 14:56:25 -08:00

118 lines
4.0 KiB
Python

from __future__ import annotations
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable, Optional, TYPE_CHECKING
from PySide2.QtCore import QObject, Signal
from game.polldebriefingfilethread import PollDebriefingFileThread
from game.sim.combat import FrozenCombat
from game.sim.gameloop import GameLoop
from game.sim.gameupdatecallbacks import GameUpdateCallbacks
from game.sim.simspeedsetting import SimSpeedSetting
from qt_ui.simupdatethread import SimUpdateThread
if TYPE_CHECKING:
from game import Game
from game.debriefing import Debriefing
class SimController(QObject):
sim_update = Signal()
sim_speed_reset = Signal(SimSpeedSetting)
simulation_complete = Signal()
on_add_combat = Signal(FrozenCombat)
on_combat_changed = Signal(FrozenCombat)
def __init__(self, game: Optional[Game]) -> None:
super().__init__()
self.game_loop: Optional[GameLoop] = None
self.recreate_game_loop(game)
self.started = False
self._sim_update_thread = SimUpdateThread(self.sim_update.emit)
self._sim_update_thread.start()
@property
def completed(self) -> bool:
return self.game_loop.completed
@property
def current_time_in_sim(self) -> Optional[datetime]:
if self.game_loop is None:
return None
return self.game_loop.current_time_in_sim
@property
def elapsed_time(self) -> timedelta:
if self.game_loop is None:
return timedelta()
return self.game_loop.elapsed_time
def set_game(self, game: Optional[Game]) -> None:
self.recreate_game_loop(game)
self.sim_speed_reset.emit(SimSpeedSetting.PAUSED)
def shut_down(self) -> None:
self._sim_update_thread.stop()
def recreate_game_loop(self, game: Optional[Game]) -> None:
if self.game_loop is not None:
self._sim_update_thread.on_sim_pause()
self.game_loop.pause()
self.game_loop = None
if game is not None:
self.game_loop = GameLoop(
game,
GameUpdateCallbacks(
self.on_simulation_complete,
self.on_add_combat.emit,
self.on_combat_changed.emit,
),
)
self.started = False
def set_simulation_speed(self, simulation_speed: SimSpeedSetting) -> None:
if self.game_loop.completed and simulation_speed is not SimSpeedSetting.PAUSED:
logging.debug("Cannot unpause sim: already complete")
return
if not self.started and simulation_speed is not SimSpeedSetting.PAUSED:
self.game_loop.start()
self.started = True
self.game_loop.set_simulation_speed(simulation_speed)
if simulation_speed is SimSpeedSetting.PAUSED:
self._sim_update_thread.on_sim_pause()
else:
self._sim_update_thread.on_sim_unpause()
def run_to_first_contact(self) -> None:
self.game_loop.run_to_first_contact()
self.sim_update.emit()
def generate_miz(self, output: Path) -> None:
self._sim_update_thread.on_sim_pause()
self.game_loop.pause_and_generate_miz(output)
def wait_for_debriefing(
self, callback: Callable[[Debriefing], None]
) -> PollDebriefingFileThread:
thread = PollDebriefingFileThread(callback, self.game_loop.sim)
thread.start()
return thread
def debrief_current_state(
self, state_path: Path, force_end: bool = False
) -> Debriefing:
self._sim_update_thread.on_sim_pause()
return self.game_loop.pause_and_debrief(state_path, force_end)
def process_results(self, debriefing: Debriefing) -> None:
self._sim_update_thread.on_sim_pause()
return self.game_loop.complete_with_results(debriefing)
def on_simulation_complete(self) -> None:
logging.debug("Simulation complete")
self._sim_update_thread.on_sim_pause()
self.simulation_complete.emit()