mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
Very basic display. Draws the engagement footprint for air-to-air combat, a line from the flight to the target for IP, and lines from SAMs to their target for air defense. https://github.com/dcs-liberation/dcs_liberation/issues/1680
82 lines
2.7 KiB
Python
82 lines
2.7 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
from .gamelooptimer import GameLoopTimer
|
|
from .gameupdatecallbacks import GameUpdateCallbacks
|
|
from .missionsimulation import MissionSimulation, SimulationAlreadyCompletedError
|
|
from .simspeedsetting import SimSpeedSetting
|
|
|
|
if TYPE_CHECKING:
|
|
from game import Game
|
|
from game.debriefing import Debriefing
|
|
|
|
|
|
class GameLoop:
    """Couples a ``MissionSimulation`` with the ``GameLoopTimer`` that ticks it.

    The loop tracks two flags: ``started`` (set once by :meth:`start`) and
    ``completed`` (set when the simulation reports completion from a tick, or
    when results are processed via :meth:`complete_with_results`).
    """

    def __init__(self, game: Game, callbacks: GameUpdateCallbacks) -> None:
        """Build the timer and simulation for *game*.

        The loop is created in the not-started, not-completed state; the
        timer is handed :meth:`tick` as its callback.
        """
        self.game = game
        self.callbacks = callbacks
        self.timer = GameLoopTimer(self.tick)
        self.sim = MissionSimulation(self.game, self.callbacks)
        self.started = False
        self.completed = False

    @property
    def current_time_in_sim(self) -> datetime:
        """The simulation's current time (``self.sim.time``)."""
        return self.sim.time

    @property
    def elapsed_time(self) -> timedelta:
        """Simulation time elapsed since ``game.conditions.start_time``."""
        return self.sim.time - self.game.conditions.start_time

    def start(self) -> None:
        """Mark the loop as started and begin the simulation.

        Raises:
            RuntimeError: If the loop was already started.
        """
        if self.started:
            raise RuntimeError("Cannot start game loop because it has already started")
        self.started = True
        self.sim.begin_simulation()

    def pause(self) -> None:
        """Switch the simulation speed to ``SimSpeedSetting.PAUSED``.

        Because this goes through :meth:`set_simulation_speed`, pausing a loop
        that has not started yet also starts it.
        """
        self.set_simulation_speed(SimSpeedSetting.PAUSED)

    def set_simulation_speed(self, simulation_speed: SimSpeedSetting) -> None:
        """Stop the timer and re-arm it at *simulation_speed*.

        The speed change is logged only when it differs from the timer's
        current setting. The loop is started on first use.
        """
        self.timer.stop()
        if simulation_speed != self.timer.simulation_speed:
            logging.info(f"Speed changed to {simulation_speed}")
        if not self.started:
            self.start()
        self.timer.set_speed(simulation_speed)

    def run_to_first_contact(self) -> None:
        """Pause the timer, then tick synchronously until the loop completes."""
        self.pause()
        logging.info("Running sim to first contact")
        # Relies on tick() eventually setting ``completed``; tick() also marks
        # the loop completed when the sim reports it had already finished.
        while not self.completed:
            self.tick()

    def pause_and_generate_miz(self, output: Path) -> None:
        """Pause, then ask the simulation to generate a miz at *output*."""
        self.pause()
        self.sim.generate_miz(output)

    def pause_and_debrief(self, state_path: Path, force_end: bool) -> Debriefing:
        """Pause, then return the simulation's debriefing of its current state.

        *state_path* and *force_end* are forwarded unchanged to
        ``MissionSimulation.debrief_current_state``.
        """
        self.pause()
        return self.sim.debrief_current_state(state_path, force_end)

    def complete_with_results(self, debriefing: Debriefing) -> None:
        """Pause, have the simulation process *debriefing*, and mark completion."""
        self.pause()
        self.sim.process_results(debriefing)
        self.completed = True

    def tick(self) -> None:
        """Advance the simulation by one tick.

        When the simulation reports completion, the loop is paused, the
        completion time is logged, and ``callbacks.on_simulation_complete()``
        is invoked.

        Raises:
            RuntimeError: If the loop has not been started.
        """
        if not self.started:
            raise RuntimeError("Attempted to tick game loop before initialization")
        try:
            self.completed = self.sim.tick()
            if self.completed:
                self.pause()
                logging.info(f"Simulation completed at {self.sim.time}")
                self.callbacks.on_simulation_complete()
        except SimulationAlreadyCompletedError:
            logging.exception("Attempted to tick already completed sim")
            # The sim says it is finished, so bring our flag in line with it.
            # Leaving it False would make run_to_first_contact() loop forever,
            # since every further tick would only raise and be swallowed here.
            self.completed = True
|