mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
Add (very!) rough simulation of frozen combat.
There are some TODOs here but th behavior is flagged off by default. The biggest TODO here is that the time spent frozen is not simulated, so flights that are engaged by SAMs will unfreeze, move slightly, then re- freeze. https://github.com/dcs-liberation/dcs_liberation/issues/1680
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from typing_extensions import TYPE_CHECKING
|
||||
|
||||
from game.ato import Flight
|
||||
from game.ato.flightstate import (
|
||||
Navigating,
|
||||
StartUp,
|
||||
@@ -18,9 +18,11 @@ from game.ato.flightstate import (
|
||||
from game.ato.starttype import StartType
|
||||
from game.ato.traveltime import TotEstimator
|
||||
from .combat import CombatInitiator, FrozenCombat
|
||||
from .simulationresults import SimulationResults
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game import Game
|
||||
from game.ato import Flight
|
||||
from .gameupdateevents import GameUpdateEvents
|
||||
|
||||
|
||||
@@ -28,6 +30,7 @@ class AircraftSimulation:
|
||||
def __init__(self, game: Game) -> None:
|
||||
self.game = game
|
||||
self.combats: list[FrozenCombat] = []
|
||||
self.results = SimulationResults()
|
||||
|
||||
def begin_simulation(self) -> None:
|
||||
self.reset()
|
||||
@@ -36,6 +39,22 @@ class AircraftSimulation:
|
||||
def on_game_tick(
|
||||
self, events: GameUpdateEvents, time: datetime, duration: timedelta
|
||||
) -> None:
|
||||
if not self.game.settings.auto_resolve_combat and self.combats:
|
||||
logging.error(
|
||||
"Cannot resume simulation because aircraft are in combat and "
|
||||
"auto-resolve is disabled"
|
||||
)
|
||||
events.complete_simulation()
|
||||
return
|
||||
|
||||
still_active = []
|
||||
for combat in self.combats:
|
||||
if combat.on_game_tick(duration, self.results):
|
||||
events.end_combat(combat)
|
||||
else:
|
||||
still_active.append(combat)
|
||||
self.combats = still_active
|
||||
|
||||
for flight in self.iter_flights():
|
||||
flight.on_game_tick(events, time, duration)
|
||||
|
||||
@@ -49,6 +68,9 @@ class AircraftSimulation:
|
||||
events.complete_simulation()
|
||||
return
|
||||
|
||||
if not self.game.settings.auto_resolve_combat and self.combats:
|
||||
events.complete_simulation()
|
||||
|
||||
def set_initial_flight_states(self) -> None:
|
||||
now = self.game.conditions.start_time
|
||||
for flight in self.iter_flights():
|
||||
|
||||
@@ -1,20 +1,24 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import random
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from shapely.ops import unary_union
|
||||
|
||||
from game.ato.flightstate import InFlight
|
||||
from game.ato.flightstate import InCombat, InFlight
|
||||
from game.utils import dcs_to_shapely_point
|
||||
from .joinablecombat import JoinableCombat
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game.ato import Flight
|
||||
from ..simulationresults import SimulationResults
|
||||
|
||||
|
||||
class AirCombat(JoinableCombat):
|
||||
def __init__(self, flights: list[Flight]) -> None:
|
||||
super().__init__(flights)
|
||||
def __init__(self, freeze_duration: timedelta, flights: list[Flight]) -> None:
|
||||
super().__init__(freeze_duration, flights)
|
||||
footprints = []
|
||||
for flight in self.flights:
|
||||
if (region := flight.state.a2a_commit_region()) is not None:
|
||||
@@ -37,7 +41,7 @@ class AirCombat(JoinableCombat):
|
||||
return True
|
||||
return False
|
||||
|
||||
def because(self) -> str:
|
||||
def __str__(self) -> str:
|
||||
blue_flights = []
|
||||
red_flights = []
|
||||
for flight in self.flights:
|
||||
@@ -48,7 +52,46 @@ class AirCombat(JoinableCombat):
|
||||
|
||||
blue = ", ".join(blue_flights)
|
||||
red = ", ".join(red_flights)
|
||||
return f"of air combat {blue} vs {red}"
|
||||
return f"air combat {blue} vs {red}"
|
||||
|
||||
def because(self) -> str:
|
||||
return f"of {self}"
|
||||
|
||||
def describe(self) -> str:
|
||||
return f"in air-to-air combat"
|
||||
|
||||
def resolve(self, results: SimulationResults) -> None:
|
||||
blue = []
|
||||
red = []
|
||||
for flight in self.flights:
|
||||
if flight.squadron.player:
|
||||
blue.append(flight)
|
||||
else:
|
||||
red.append(flight)
|
||||
if len(blue) > len(red):
|
||||
winner = blue
|
||||
loser = red
|
||||
elif len(blue) < len(red):
|
||||
winner = red
|
||||
loser = blue
|
||||
elif random.random() >= 0.5:
|
||||
winner = blue
|
||||
loser = red
|
||||
else:
|
||||
winner = red
|
||||
loser = blue
|
||||
|
||||
if winner == blue:
|
||||
logging.debug(f"{self} auto-resolved as blue victory")
|
||||
else:
|
||||
logging.debug(f"{self} auto-resolved as red victory")
|
||||
|
||||
for flight in loser:
|
||||
flight.kill(results)
|
||||
|
||||
for flight in winner:
|
||||
assert isinstance(flight.state, InCombat)
|
||||
if random.random() / flight.count >= 0.5:
|
||||
flight.kill(results)
|
||||
else:
|
||||
flight.state.exit_combat()
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .frozencombat import FrozenCombat
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game.ato import Flight
|
||||
from ..simulationresults import SimulationResults
|
||||
|
||||
|
||||
class AtIp(FrozenCombat):
|
||||
def __init__(self, flight: Flight) -> None:
|
||||
super().__init__()
|
||||
def __init__(self, freeze_duration: timedelta, flight: Flight) -> None:
|
||||
super().__init__(freeze_duration)
|
||||
self.flight = flight
|
||||
|
||||
def because(self) -> str:
|
||||
@@ -22,3 +25,9 @@ class AtIp(FrozenCombat):
|
||||
|
||||
def iter_flights(self) -> Iterator[Flight]:
|
||||
yield self.flight
|
||||
|
||||
def resolve(self, results: SimulationResults) -> None:
|
||||
logging.debug(
|
||||
f"{self.flight} attack on {self.flight.package.target} auto-resolved with "
|
||||
"mission failure but no losses"
|
||||
)
|
||||
|
||||
@@ -3,9 +3,9 @@ from __future__ import annotations
|
||||
import itertools
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from datetime import timedelta
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
from game.ato.flightstate import InFlight
|
||||
from .aircombat import AirCombat
|
||||
from .aircraftengagementzones import AircraftEngagementZones
|
||||
from .atip import AtIp
|
||||
@@ -43,6 +43,9 @@ class CombatInitiator:
|
||||
# aircraft has entered combat it will not be rechecked later in the loop or on
|
||||
# another tick.
|
||||
for flight in self.iter_flights():
|
||||
if flight.state.in_combat:
|
||||
return
|
||||
|
||||
if flight.squadron.player:
|
||||
a2a = red_a2a
|
||||
own_a2a = blue_a2a
|
||||
@@ -66,7 +69,7 @@ class CombatInitiator:
|
||||
own_a2a.remove_flight(flight)
|
||||
self.events.update_combat(joined)
|
||||
elif (combat := self.check_flight_for_new_combat(flight, a2a, sam)) is not None:
|
||||
logging.info(f"Interrupting simulation because {combat.because()}")
|
||||
logging.info(f"Creating new combat because {combat.because()}")
|
||||
combat.update_flight_states()
|
||||
# Remove any preoccupied flights from the list of potential air-to-air
|
||||
# threats. This prevents BARCAPs (and other air-to-air types) from getting
|
||||
@@ -89,21 +92,23 @@ class CombatInitiator:
|
||||
def check_flight_for_new_combat(
|
||||
flight: Flight, a2a: AircraftEngagementZones, sam: SamEngagementZones
|
||||
) -> Optional[FrozenCombat]:
|
||||
if not isinstance(flight.state, InFlight):
|
||||
if not flight.state.in_flight:
|
||||
return None
|
||||
|
||||
if flight.state.is_at_ip:
|
||||
return AtIp(flight)
|
||||
return AtIp(timedelta(minutes=1), flight)
|
||||
|
||||
position = flight.state.estimate_position()
|
||||
|
||||
if flight.state.vulnerable_to_intercept and a2a.covers(position):
|
||||
flights = [flight]
|
||||
flights.extend(a2a.iter_intercepting_flights(position))
|
||||
return AirCombat(flights)
|
||||
return AirCombat(timedelta(minutes=1), flights)
|
||||
|
||||
if flight.state.vulnerable_to_sam and sam.covers(position):
|
||||
return DefendingSam(flight, list(sam.iter_threatening_sams(position)))
|
||||
return DefendingSam(
|
||||
timedelta(minutes=1), flight, list(sam.iter_threatening_sams(position))
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import random
|
||||
from collections.abc import Iterator
|
||||
from typing import Any, TYPE_CHECKING
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from game.ato.flightstate import InCombat
|
||||
from .frozencombat import FrozenCombat
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game.ato import Flight
|
||||
from game.theater import TheaterGroundObject
|
||||
from ..simulationresults import SimulationResults
|
||||
|
||||
|
||||
class DefendingSam(FrozenCombat):
|
||||
def __init__(self, flight: Flight, air_defenses: list[TheaterGroundObject]) -> None:
|
||||
super().__init__()
|
||||
def __init__(
|
||||
self,
|
||||
freeze_duration: timedelta,
|
||||
flight: Flight,
|
||||
air_defenses: list[TheaterGroundObject],
|
||||
) -> None:
|
||||
super().__init__(freeze_duration)
|
||||
self.flight = flight
|
||||
self.air_defenses = air_defenses
|
||||
|
||||
@@ -25,3 +35,14 @@ class DefendingSam(FrozenCombat):
|
||||
|
||||
def iter_flights(self) -> Iterator[Flight]:
|
||||
yield self.flight
|
||||
|
||||
def resolve(self, results: SimulationResults) -> None:
|
||||
assert isinstance(self.flight.state, InCombat)
|
||||
if random.random() / self.flight.count >= 0.5:
|
||||
logging.debug(f"Air defense combat auto-resolved with {self.flight} lost")
|
||||
self.flight.kill(results)
|
||||
else:
|
||||
logging.debug(
|
||||
f"Air defense combat auto-resolved with {self.flight} surviving"
|
||||
)
|
||||
self.flight.state.exit_combat()
|
||||
|
||||
@@ -3,17 +3,32 @@ from __future__ import annotations
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterator
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from game.ato.flightstate import InCombat, InFlight
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game.ato import Flight
|
||||
from ..simulationresults import SimulationResults
|
||||
|
||||
|
||||
class FrozenCombat(ABC):
|
||||
def __init__(self) -> None:
|
||||
def __init__(self, freeze_duration: timedelta) -> None:
|
||||
self.id = uuid.uuid4()
|
||||
self.freeze_duration = freeze_duration
|
||||
self.elapsed_time = timedelta()
|
||||
|
||||
def on_game_tick(self, duration: timedelta, results: SimulationResults) -> bool:
|
||||
self.elapsed_time += duration
|
||||
if self.elapsed_time >= self.freeze_duration:
|
||||
self.resolve(results)
|
||||
return True
|
||||
return False
|
||||
|
||||
@abstractmethod
|
||||
def resolve(self, results: SimulationResults) -> None:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def because(self) -> str:
|
||||
|
||||
@@ -2,8 +2,10 @@ from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterator
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from game.ato.flightstate import InCombat, InFlight
|
||||
from .frozencombat import FrozenCombat
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -11,8 +13,8 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
class JoinableCombat(FrozenCombat, ABC):
|
||||
def __init__(self, flights: list[Flight]) -> None:
|
||||
super().__init__()
|
||||
def __init__(self, freeze_duration: timedelta, flights: list[Flight]) -> None:
|
||||
super().__init__(freeze_duration)
|
||||
self.flights = flights
|
||||
|
||||
@abstractmethod
|
||||
@@ -20,7 +22,10 @@ class JoinableCombat(FrozenCombat, ABC):
|
||||
...
|
||||
|
||||
def join(self, flight: Flight) -> None:
|
||||
assert isinstance(flight.state, InFlight)
|
||||
assert not isinstance(flight.state, InCombat)
|
||||
self.flights.append(flight)
|
||||
flight.set_state(InCombat(flight.state, self))
|
||||
|
||||
def iter_flights(self) -> Iterator[Flight]:
|
||||
yield from self.flights
|
||||
|
||||
@@ -16,6 +16,7 @@ class GameUpdateEvents:
|
||||
simulation_complete = False
|
||||
new_combats: list[FrozenCombat] = field(default_factory=list)
|
||||
updated_combats: list[FrozenCombat] = field(default_factory=list)
|
||||
ended_combats: list[FrozenCombat] = field(default_factory=list)
|
||||
updated_flight_positions: list[tuple[Flight, Point]] = field(default_factory=list)
|
||||
navmesh_updates: set[bool] = field(default_factory=set)
|
||||
unculled_zones_updated: bool = False
|
||||
@@ -43,6 +44,9 @@ class GameUpdateEvents:
|
||||
self.updated_combats.append(combat)
|
||||
return self
|
||||
|
||||
def end_combat(self, combat: FrozenCombat) -> None:
|
||||
self.ended_combats.append(combat)
|
||||
|
||||
def update_flight_position(
|
||||
self, flight: Flight, new_position: Point
|
||||
) -> GameUpdateEvents:
|
||||
|
||||
@@ -62,7 +62,9 @@ class MissionSimulation:
|
||||
data = json.load(state_file)
|
||||
if force_end:
|
||||
data["mission_ended"] = True
|
||||
return Debriefing(data, self.game, self.unit_map)
|
||||
debriefing = Debriefing(data, self.game, self.unit_map)
|
||||
debriefing.merge_simulation_results(self.aircraft_simulation.results)
|
||||
return debriefing
|
||||
|
||||
def process_results(self, debriefing: Debriefing) -> None:
|
||||
if self.unit_map is None:
|
||||
|
||||
23
game/sim/simulationresults.py
Normal file
23
game/sim/simulationresults.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from game.unitmap import FlyingUnit
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game.ato import Flight
|
||||
from game.squadrons import Pilot
|
||||
|
||||
|
||||
# TODO: Serialize for bug reproducibility.
|
||||
# Any bugs filed that can only be reproduced with auto-resolved combat results will not
|
||||
# be reproducible since we cannot replay the auto-resolution that the player saw. We
|
||||
# need to be able to serialize this data so bug repro can include the auto-resolved
|
||||
# results.
|
||||
@dataclass
|
||||
class SimulationResults:
|
||||
air_losses: list[FlyingUnit] = field(default_factory=list)
|
||||
|
||||
def kill_pilot(self, flight: Flight, pilot: Pilot) -> None:
|
||||
self.air_losses.append(FlyingUnit(flight, pilot))
|
||||
Reference in New Issue
Block a user