mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
Copied three files as templates/inheritance for generating Pretense campaigns from Retribution campaigns:
- missiongenerator.py - aircraftgenerator.py - triggergenerator.py
This commit is contained in:
parent
65b2db0ece
commit
975471e942
296
game/pretense/pretenseaircraftgenerator.py
Normal file
296
game/pretense/pretenseaircraftgenerator.py
Normal file
@ -0,0 +1,296 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from functools import cached_property
|
||||
from typing import Any, Dict, List, TYPE_CHECKING, Tuple
|
||||
|
||||
from dcs import Point
|
||||
from dcs.action import AITaskPush
|
||||
from dcs.condition import FlagIsTrue, GroupDead, Or, FlagIsFalse
|
||||
from dcs.country import Country
|
||||
from dcs.mission import Mission
|
||||
from dcs.terrain.terrain import NoParkingSlotError
|
||||
from dcs.triggers import TriggerOnce, Event
|
||||
from dcs.unit import Skill
|
||||
from dcs.unitgroup import FlyingGroup, StaticGroup
|
||||
|
||||
from game.ato.airtaaskingorder import AirTaskingOrder
|
||||
from game.ato.flight import Flight
|
||||
from game.ato.flightstate import Completed, WaitingForStart
|
||||
from game.ato.flighttype import FlightType
|
||||
from game.ato.package import Package
|
||||
from game.ato.starttype import StartType
|
||||
from game.missiongenerator.lasercoderegistry import LaserCodeRegistry
|
||||
from game.missiongenerator.missiondata import MissionData
|
||||
from game.radio.radios import RadioRegistry
|
||||
from game.radio.tacan import TacanRegistry
|
||||
from game.runways import RunwayData
|
||||
from game.settings import Settings
|
||||
from game.theater.controlpoint import (
|
||||
Airfield,
|
||||
ControlPoint,
|
||||
Fob,
|
||||
)
|
||||
from game.unitmap import UnitMap
|
||||
from .aircraftpainter import AircraftPainter
|
||||
from .flightdata import FlightData
|
||||
from .flightgroupconfigurator import FlightGroupConfigurator
|
||||
from .flightgroupspawner import FlightGroupSpawner
|
||||
from ...data.weapons import WeaponType
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game import Game
|
||||
from game.squadrons import Squadron
|
||||
|
||||
|
||||
class AircraftGenerator:
|
||||
def __init__(
|
||||
self,
|
||||
mission: Mission,
|
||||
settings: Settings,
|
||||
game: Game,
|
||||
time: datetime,
|
||||
radio_registry: RadioRegistry,
|
||||
tacan_registry: TacanRegistry,
|
||||
laser_code_registry: LaserCodeRegistry,
|
||||
unit_map: UnitMap,
|
||||
mission_data: MissionData,
|
||||
helipads: dict[ControlPoint, list[StaticGroup]],
|
||||
ground_spawns_roadbase: dict[ControlPoint, list[Tuple[StaticGroup, Point]]],
|
||||
ground_spawns: dict[ControlPoint, list[Tuple[StaticGroup, Point]]],
|
||||
) -> None:
|
||||
self.mission = mission
|
||||
self.settings = settings
|
||||
self.game = game
|
||||
self.time = time
|
||||
self.radio_registry = radio_registry
|
||||
self.tacan_registy = tacan_registry
|
||||
self.laser_code_registry = laser_code_registry
|
||||
self.unit_map = unit_map
|
||||
self.flights: List[FlightData] = []
|
||||
self.mission_data = mission_data
|
||||
self.helipads = helipads
|
||||
self.ground_spawns_roadbase = ground_spawns_roadbase
|
||||
self.ground_spawns = ground_spawns
|
||||
|
||||
self.ewrj_package_dict: Dict[int, List[FlyingGroup[Any]]] = {}
|
||||
self.ewrj = settings.plugins.get("ewrj")
|
||||
self.need_ecm = settings.plugin_option("ewrj.ecm_required")
|
||||
|
||||
@cached_property
|
||||
def use_client(self) -> bool:
|
||||
"""True if Client should be used instead of Player."""
|
||||
blue_clients = self.client_slots_in_ato(self.game.blue.ato)
|
||||
red_clients = self.client_slots_in_ato(self.game.red.ato)
|
||||
return blue_clients + red_clients > 1
|
||||
|
||||
@staticmethod
|
||||
def client_slots_in_ato(ato: AirTaskingOrder) -> int:
|
||||
total = 0
|
||||
for package in ato.packages:
|
||||
for flight in package.flights:
|
||||
total += flight.client_count
|
||||
return total
|
||||
|
||||
def clear_parking_slots(self) -> None:
|
||||
for cp in self.game.theater.controlpoints:
|
||||
for parking_slot in cp.parking_slots:
|
||||
parking_slot.unit_id = None
|
||||
|
||||
def generate_flights(
|
||||
self,
|
||||
country: Country,
|
||||
ato: AirTaskingOrder,
|
||||
dynamic_runways: Dict[str, RunwayData],
|
||||
) -> None:
|
||||
"""Adds aircraft to the mission for every flight in the ATO.
|
||||
|
||||
Aircraft generation is done by walking the ATO and spawning each flight in turn.
|
||||
After the flight is generated the group is added to the UnitMap so aircraft
|
||||
deaths can be tracked.
|
||||
|
||||
Args:
|
||||
country: The country from the mission to use for this ATO.
|
||||
ato: The ATO to spawn aircraft for.
|
||||
dynamic_runways: Runway data for carriers and FARPs.
|
||||
"""
|
||||
self._reserve_frequencies_and_tacan(ato)
|
||||
|
||||
for package in reversed(sorted(ato.packages, key=lambda x: x.time_over_target)):
|
||||
logging.info(f"Generating package for target: {package.target.name}")
|
||||
if not package.flights:
|
||||
continue
|
||||
for flight in package.flights:
|
||||
if flight.alive:
|
||||
if not flight.squadron.location.runway_is_operational():
|
||||
logging.warning(
|
||||
f"Runway not operational, skipping flight: {flight.flight_type}"
|
||||
)
|
||||
flight.return_pilots_and_aircraft()
|
||||
continue
|
||||
logging.info(f"Generating flight: {flight.unit_type}")
|
||||
group = self.create_and_configure_flight(
|
||||
flight, country, dynamic_runways
|
||||
)
|
||||
self.unit_map.add_aircraft(group, flight)
|
||||
if (
|
||||
package.primary_flight is not None
|
||||
and package.primary_flight.flight_plan.is_formation(
|
||||
package.primary_flight.flight_plan
|
||||
)
|
||||
):
|
||||
splittrigger = TriggerOnce(Event.NoEvent, f"Split-{id(package)}")
|
||||
splittrigger.add_condition(FlagIsTrue(flag=f"split-{id(package)}"))
|
||||
splittrigger.add_condition(Or())
|
||||
splittrigger.add_condition(FlagIsFalse(flag=f"split-{id(package)}"))
|
||||
splittrigger.add_condition(GroupDead(package.primary_flight.group_id))
|
||||
for flight in package.flights:
|
||||
if flight.flight_type in [
|
||||
FlightType.ESCORT,
|
||||
FlightType.SEAD_ESCORT,
|
||||
]:
|
||||
splittrigger.add_action(AITaskPush(flight.group_id, 1))
|
||||
if len(splittrigger.actions) > 0:
|
||||
self.mission.triggerrules.triggers.append(splittrigger)
|
||||
|
||||
def spawn_unused_aircraft(
|
||||
self, player_country: Country, enemy_country: Country
|
||||
) -> None:
|
||||
for control_point in self.game.theater.controlpoints:
|
||||
if not (
|
||||
isinstance(control_point, Airfield) or isinstance(control_point, Fob)
|
||||
):
|
||||
continue
|
||||
|
||||
if control_point.captured:
|
||||
country = player_country
|
||||
else:
|
||||
country = enemy_country
|
||||
|
||||
for squadron in control_point.squadrons:
|
||||
try:
|
||||
self._spawn_unused_for(squadron, country)
|
||||
except NoParkingSlotError:
|
||||
# If we run out of parking, stop spawning aircraft at this base.
|
||||
break
|
||||
|
||||
def _spawn_unused_for(self, squadron: Squadron, country: Country) -> None:
|
||||
assert isinstance(squadron.location, Airfield) or isinstance(
|
||||
squadron.location, Fob
|
||||
)
|
||||
if (
|
||||
squadron.coalition.player
|
||||
and self.game.settings.perf_disable_untasked_blufor_aircraft
|
||||
):
|
||||
return
|
||||
elif (
|
||||
not squadron.coalition.player
|
||||
and self.game.settings.perf_disable_untasked_opfor_aircraft
|
||||
):
|
||||
return
|
||||
|
||||
for _ in range(squadron.untasked_aircraft):
|
||||
# Creating a flight even those this isn't a fragged mission lets us
|
||||
# reuse the existing debriefing code.
|
||||
# TODO: Special flight type?
|
||||
flight = Flight(
|
||||
Package(squadron.location, self.game.db.flights),
|
||||
squadron,
|
||||
1,
|
||||
FlightType.BARCAP,
|
||||
StartType.COLD,
|
||||
divert=None,
|
||||
claim_inv=False,
|
||||
)
|
||||
flight.state = Completed(flight, self.game.settings)
|
||||
|
||||
group = FlightGroupSpawner(
|
||||
flight,
|
||||
country,
|
||||
self.mission,
|
||||
self.helipads,
|
||||
self.ground_spawns_roadbase,
|
||||
self.ground_spawns,
|
||||
self.mission_data,
|
||||
).create_idle_aircraft()
|
||||
if group:
|
||||
if (
|
||||
not squadron.coalition.player
|
||||
and squadron.aircraft.flyable
|
||||
and (
|
||||
self.game.settings.enable_squadron_pilot_limits
|
||||
or squadron.number_of_available_pilots > 0
|
||||
)
|
||||
and self.game.settings.untasked_opfor_client_slots
|
||||
):
|
||||
flight.state = WaitingForStart(
|
||||
flight, self.game.settings, self.game.conditions.start_time
|
||||
)
|
||||
group.uncontrolled = False
|
||||
group.units[0].skill = Skill.Client
|
||||
AircraftPainter(flight, group).apply_livery()
|
||||
self.unit_map.add_aircraft(group, flight)
|
||||
|
||||
def create_and_configure_flight(
|
||||
self, flight: Flight, country: Country, dynamic_runways: Dict[str, RunwayData]
|
||||
) -> FlyingGroup[Any]:
|
||||
"""Creates and configures the flight group in the mission."""
|
||||
group = FlightGroupSpawner(
|
||||
flight,
|
||||
country,
|
||||
self.mission,
|
||||
self.helipads,
|
||||
self.ground_spawns_roadbase,
|
||||
self.ground_spawns,
|
||||
self.mission_data,
|
||||
).create_flight_group()
|
||||
self.flights.append(
|
||||
FlightGroupConfigurator(
|
||||
flight,
|
||||
group,
|
||||
self.game,
|
||||
self.mission,
|
||||
self.time,
|
||||
self.radio_registry,
|
||||
self.tacan_registy,
|
||||
self.laser_code_registry,
|
||||
self.mission_data,
|
||||
dynamic_runways,
|
||||
self.use_client,
|
||||
).configure()
|
||||
)
|
||||
|
||||
if self.ewrj:
|
||||
self._track_ewrj_flight(flight, group)
|
||||
|
||||
return group
|
||||
|
||||
def _track_ewrj_flight(self, flight: Flight, group: FlyingGroup[Any]) -> None:
|
||||
if not self.ewrj_package_dict.get(id(flight.package)):
|
||||
self.ewrj_package_dict[id(flight.package)] = []
|
||||
if (
|
||||
flight.package.primary_flight
|
||||
and flight is flight.package.primary_flight
|
||||
or flight.client_count
|
||||
and (
|
||||
not self.need_ecm
|
||||
or flight.loadout.has_weapon_of_type(WeaponType.JAMMER)
|
||||
)
|
||||
):
|
||||
self.ewrj_package_dict[id(flight.package)].append(group)
|
||||
|
||||
def _reserve_frequencies_and_tacan(self, ato: AirTaskingOrder) -> None:
|
||||
for package in ato.packages:
|
||||
if package.frequency is None:
|
||||
continue
|
||||
if package.frequency not in self.radio_registry.allocated_channels:
|
||||
self.radio_registry.reserve(package.frequency)
|
||||
for f in package.flights:
|
||||
if (
|
||||
f.frequency
|
||||
and f.frequency not in self.radio_registry.allocated_channels
|
||||
):
|
||||
self.radio_registry.reserve(f.frequency)
|
||||
if f.tacan and f.tacan not in self.tacan_registy.allocated_channels:
|
||||
self.tacan_registy.mark_unavailable(f.tacan)
|
||||
343
game/pretense/pretensemissiongenerator.py
Normal file
343
game/pretense/pretensemissiongenerator.py
Normal file
@ -0,0 +1,343 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import dcs.lua
|
||||
from dcs import Mission, Point
|
||||
from dcs.coalition import Coalition
|
||||
from dcs.countries import country_dict
|
||||
from dcs.task import OptReactOnThreat
|
||||
|
||||
from game.atcdata import AtcData
|
||||
from game.dcs.beacons import Beacons
|
||||
from game.dcs.helpers import unit_type_from_name
|
||||
from game.missiongenerator.aircraft.aircraftgenerator import (
|
||||
AircraftGenerator,
|
||||
)
|
||||
from game.naming import namegen
|
||||
from game.radio.radios import RadioFrequency, RadioRegistry, MHz
|
||||
from game.radio.tacan import TacanRegistry
|
||||
from game.theater import Airfield
|
||||
from game.theater.bullseye import Bullseye
|
||||
from game.unitmap import UnitMap
|
||||
from .briefinggenerator import BriefingGenerator, MissionInfoGenerator
|
||||
from .cargoshipgenerator import CargoShipGenerator
|
||||
from .convoygenerator import ConvoyGenerator
|
||||
from .drawingsgenerator import DrawingsGenerator
|
||||
from .environmentgenerator import EnvironmentGenerator
|
||||
from .flotgenerator import FlotGenerator
|
||||
from .forcedoptionsgenerator import ForcedOptionsGenerator
|
||||
from .frontlineconflictdescription import FrontLineConflictDescription
|
||||
from .kneeboard import KneeboardGenerator
|
||||
from .lasercoderegistry import LaserCodeRegistry
|
||||
from .luagenerator import LuaGenerator
|
||||
from .missiondata import MissionData
|
||||
from .tgogenerator import TgoGenerator
|
||||
from .triggergenerator import TriggerGenerator
|
||||
from .visualsgenerator import VisualsGenerator
|
||||
from ..radio.TacanContainer import TacanContainer
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game import Game
|
||||
|
||||
|
||||
class MissionGenerator:
|
||||
def __init__(self, game: Game, time: datetime) -> None:
|
||||
self.game = game
|
||||
self.time = time
|
||||
self.mission = Mission(game.theater.terrain)
|
||||
self.unit_map = UnitMap()
|
||||
|
||||
self.mission_data = MissionData()
|
||||
|
||||
self.laser_code_registry = LaserCodeRegistry()
|
||||
self.radio_registry = RadioRegistry()
|
||||
self.tacan_registry = TacanRegistry()
|
||||
|
||||
self.generation_started = False
|
||||
|
||||
self.p_country = country_dict[self.game.blue.faction.country.id]()
|
||||
self.e_country = country_dict[self.game.red.faction.country.id]()
|
||||
|
||||
with open("resources/default_options.lua", "r", encoding="utf-8") as f:
|
||||
options = dcs.lua.loads(f.read())["options"]
|
||||
ext_view = game.settings.external_views_allowed
|
||||
options["miscellaneous"]["f11_free_camera"] = ext_view
|
||||
options["difficulty"]["spectatorExternalViews"] = ext_view
|
||||
self.mission.options.load_from_dict(options)
|
||||
|
||||
def generate_miz(self, output: Path) -> UnitMap:
|
||||
if self.generation_started:
|
||||
raise RuntimeError(
|
||||
"Mission has already begun generating. To reset, create a new "
|
||||
"MissionSimulation."
|
||||
)
|
||||
self.generation_started = True
|
||||
|
||||
self.setup_mission_coalitions()
|
||||
self.add_airfields_to_unit_map()
|
||||
self.initialize_registries()
|
||||
|
||||
EnvironmentGenerator(self.mission, self.game.conditions, self.time).generate()
|
||||
|
||||
tgo_generator = TgoGenerator(
|
||||
self.mission,
|
||||
self.game,
|
||||
self.radio_registry,
|
||||
self.tacan_registry,
|
||||
self.unit_map,
|
||||
self.mission_data,
|
||||
)
|
||||
tgo_generator.generate()
|
||||
|
||||
ConvoyGenerator(self.mission, self.game, self.unit_map).generate()
|
||||
CargoShipGenerator(self.mission, self.game, self.unit_map).generate()
|
||||
|
||||
self.generate_destroyed_units()
|
||||
|
||||
# Generate ground conflicts first so the JTACs get the first laser code (1688)
|
||||
# rather than the first player flight with a TGP.
|
||||
self.generate_ground_conflicts()
|
||||
self.generate_air_units(tgo_generator)
|
||||
|
||||
TriggerGenerator(self.mission, self.game).generate()
|
||||
ForcedOptionsGenerator(self.mission, self.game).generate()
|
||||
VisualsGenerator(self.mission, self.game).generate()
|
||||
LuaGenerator(self.game, self.mission, self.mission_data).generate()
|
||||
DrawingsGenerator(self.mission, self.game).generate()
|
||||
|
||||
self.setup_combined_arms()
|
||||
|
||||
self.notify_info_generators()
|
||||
|
||||
# TODO: Shouldn't this be first?
|
||||
namegen.reset_numbers()
|
||||
self.mission.save(output)
|
||||
|
||||
return self.unit_map
|
||||
|
||||
@staticmethod
|
||||
def _configure_ewrj(gen: AircraftGenerator) -> None:
|
||||
for groups in gen.ewrj_package_dict.values():
|
||||
optrot = groups[0].points[0].tasks[0]
|
||||
assert isinstance(optrot, OptReactOnThreat)
|
||||
if (
|
||||
len(groups) == 1
|
||||
and optrot.value != OptReactOnThreat.Values.PassiveDefense
|
||||
):
|
||||
# primary flight with no EWR-Jamming capability
|
||||
continue
|
||||
for group in groups:
|
||||
group.points[0].tasks[0] = OptReactOnThreat(
|
||||
OptReactOnThreat.Values.PassiveDefense
|
||||
)
|
||||
|
||||
def setup_mission_coalitions(self) -> None:
|
||||
self.mission.coalition["blue"] = Coalition(
|
||||
"blue", bullseye=self.game.blue.bullseye.to_pydcs()
|
||||
)
|
||||
self.mission.coalition["red"] = Coalition(
|
||||
"red", bullseye=self.game.red.bullseye.to_pydcs()
|
||||
)
|
||||
self.mission.coalition["neutrals"] = Coalition(
|
||||
"neutrals", bullseye=Bullseye(Point(0, 0, self.mission.terrain)).to_pydcs()
|
||||
)
|
||||
|
||||
self.mission.coalition["blue"].add_country(self.p_country)
|
||||
self.mission.coalition["red"].add_country(self.e_country)
|
||||
|
||||
belligerents = {self.p_country.id, self.e_country.id}
|
||||
for country_id in country_dict.keys():
|
||||
if country_id not in belligerents:
|
||||
c = country_dict[country_id]()
|
||||
self.mission.coalition["neutrals"].add_country(c)
|
||||
|
||||
def add_airfields_to_unit_map(self) -> None:
|
||||
for control_point in self.game.theater.controlpoints:
|
||||
if isinstance(control_point, Airfield):
|
||||
self.unit_map.add_airfield(control_point)
|
||||
|
||||
def initialize_registries(self) -> None:
|
||||
unique_map_frequencies: set[RadioFrequency] = set()
|
||||
self.initialize_tacan_registry(unique_map_frequencies)
|
||||
self.initialize_radio_registry(unique_map_frequencies)
|
||||
# Allocate UHF/VHF Guard Freq first!
|
||||
unique_map_frequencies.add(MHz(243))
|
||||
unique_map_frequencies.add(MHz(121, 500))
|
||||
for frequency in unique_map_frequencies:
|
||||
self.radio_registry.reserve(frequency)
|
||||
|
||||
def initialize_tacan_registry(
|
||||
self, unique_map_frequencies: set[RadioFrequency]
|
||||
) -> None:
|
||||
"""
|
||||
Dedup beacon/radio frequencies, since some maps have some frequencies
|
||||
used multiple times.
|
||||
"""
|
||||
for beacon in Beacons.iter_theater(self.game.theater):
|
||||
unique_map_frequencies.add(beacon.frequency)
|
||||
if beacon.is_tacan:
|
||||
if beacon.channel is None:
|
||||
logging.warning(f"TACAN beacon has no channel: {beacon.callsign}")
|
||||
else:
|
||||
self.tacan_registry.mark_unavailable(beacon.tacan_channel)
|
||||
for cp in self.game.theater.controlpoints:
|
||||
if isinstance(cp, TacanContainer) and cp.tacan is not None:
|
||||
self.tacan_registry.mark_unavailable(cp.tacan)
|
||||
|
||||
def initialize_radio_registry(
|
||||
self, unique_map_frequencies: set[RadioFrequency]
|
||||
) -> None:
|
||||
for airport in self.game.theater.terrain.airport_list():
|
||||
if (atc := AtcData.from_pydcs(airport)) is not None:
|
||||
unique_map_frequencies.add(atc.hf)
|
||||
unique_map_frequencies.add(atc.vhf_fm)
|
||||
unique_map_frequencies.add(atc.vhf_am)
|
||||
unique_map_frequencies.add(atc.uhf)
|
||||
# No need to reserve ILS or TACAN because those are in the
|
||||
# beacon list.
|
||||
|
||||
def generate_ground_conflicts(self) -> None:
|
||||
"""Generate FLOTs and JTACs for each active front line."""
|
||||
for front_line in self.game.theater.conflicts():
|
||||
player_cp = front_line.blue_cp
|
||||
enemy_cp = front_line.red_cp
|
||||
conflict = FrontLineConflictDescription.frontline_cas_conflict(
|
||||
front_line, self.game.theater, self.game.settings
|
||||
)
|
||||
# Generate frontline ops
|
||||
player_gp = self.game.ground_planners[player_cp.id].units_per_cp[
|
||||
enemy_cp.id
|
||||
]
|
||||
enemy_gp = self.game.ground_planners[enemy_cp.id].units_per_cp[player_cp.id]
|
||||
ground_conflict_gen = FlotGenerator(
|
||||
self.mission,
|
||||
conflict,
|
||||
self.game,
|
||||
player_gp,
|
||||
enemy_gp,
|
||||
player_cp.stances[enemy_cp.id],
|
||||
enemy_cp.stances[player_cp.id],
|
||||
self.unit_map,
|
||||
self.radio_registry,
|
||||
self.mission_data,
|
||||
self.laser_code_registry,
|
||||
)
|
||||
ground_conflict_gen.generate()
|
||||
|
||||
def generate_air_units(self, tgo_generator: TgoGenerator) -> None:
|
||||
"""Generate the air units for the Operation"""
|
||||
|
||||
# Generate Aircraft Activity on the map
|
||||
aircraft_generator = AircraftGenerator(
|
||||
self.mission,
|
||||
self.game.settings,
|
||||
self.game,
|
||||
self.time,
|
||||
self.radio_registry,
|
||||
self.tacan_registry,
|
||||
self.laser_code_registry,
|
||||
self.unit_map,
|
||||
mission_data=self.mission_data,
|
||||
helipads=tgo_generator.helipads,
|
||||
ground_spawns_roadbase=tgo_generator.ground_spawns_roadbase,
|
||||
ground_spawns=tgo_generator.ground_spawns,
|
||||
)
|
||||
|
||||
aircraft_generator.clear_parking_slots()
|
||||
|
||||
aircraft_generator.generate_flights(
|
||||
self.p_country,
|
||||
self.game.blue.ato,
|
||||
tgo_generator.runways,
|
||||
)
|
||||
aircraft_generator.generate_flights(
|
||||
self.e_country,
|
||||
self.game.red.ato,
|
||||
tgo_generator.runways,
|
||||
)
|
||||
aircraft_generator.spawn_unused_aircraft(
|
||||
self.p_country,
|
||||
self.e_country,
|
||||
)
|
||||
|
||||
self.mission_data.flights = aircraft_generator.flights
|
||||
|
||||
for flight in aircraft_generator.flights:
|
||||
if not flight.client_units:
|
||||
continue
|
||||
flight.aircraft_type.assign_channels_for_flight(flight, self.mission_data)
|
||||
|
||||
if self.game.settings.plugins.get("ewrj"):
|
||||
self._configure_ewrj(aircraft_generator)
|
||||
|
||||
def generate_destroyed_units(self) -> None:
|
||||
"""Add destroyed units to the Mission"""
|
||||
if not self.game.settings.perf_destroyed_units:
|
||||
return
|
||||
|
||||
for d in self.game.get_destroyed_units():
|
||||
try:
|
||||
type_name = d["type"]
|
||||
if not isinstance(type_name, str):
|
||||
raise TypeError(
|
||||
"Expected the type of the destroyed static to be a string"
|
||||
)
|
||||
utype = unit_type_from_name(type_name)
|
||||
except KeyError:
|
||||
logging.warning(f"Destroyed unit has no type: {d}")
|
||||
continue
|
||||
|
||||
pos = Point(cast(float, d["x"]), cast(float, d["z"]), self.mission.terrain)
|
||||
if utype is not None and not self.game.position_culled(pos):
|
||||
self.mission.static_group(
|
||||
country=self.p_country,
|
||||
name="",
|
||||
_type=utype,
|
||||
hidden=True,
|
||||
position=pos,
|
||||
heading=d["orientation"],
|
||||
dead=True,
|
||||
)
|
||||
|
||||
def notify_info_generators(
|
||||
self,
|
||||
) -> None:
|
||||
"""Generates subscribed MissionInfoGenerator objects."""
|
||||
mission_data = self.mission_data
|
||||
gens: list[MissionInfoGenerator] = [
|
||||
KneeboardGenerator(self.mission, self.game),
|
||||
BriefingGenerator(self.mission, self.game),
|
||||
]
|
||||
for gen in gens:
|
||||
for dynamic_runway in mission_data.runways:
|
||||
gen.add_dynamic_runway(dynamic_runway)
|
||||
|
||||
for tanker in mission_data.tankers:
|
||||
if tanker.blue:
|
||||
gen.add_tanker(tanker)
|
||||
|
||||
for aewc in mission_data.awacs:
|
||||
if aewc.blue:
|
||||
gen.add_awacs(aewc)
|
||||
|
||||
for jtac in mission_data.jtacs:
|
||||
if jtac.blue:
|
||||
gen.add_jtac(jtac)
|
||||
|
||||
for flight in mission_data.flights:
|
||||
gen.add_flight(flight)
|
||||
gen.generate()
|
||||
|
||||
def setup_combined_arms(self) -> None:
|
||||
settings = self.game.settings
|
||||
commanders = settings.tactical_commander_count
|
||||
self.mission.groundControl.pilot_can_control_vehicles = commanders > 0
|
||||
|
||||
self.mission.groundControl.blue_game_masters = settings.game_masters_count
|
||||
self.mission.groundControl.blue_tactical_commander = commanders
|
||||
self.mission.groundControl.blue_jtac = settings.jtac_count
|
||||
self.mission.groundControl.blue_observer = settings.observer_count
|
||||
264
game/pretense/pretensetriggergenerator.py
Normal file
264
game/pretense/pretensetriggergenerator.py
Normal file
@ -0,0 +1,264 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, List
|
||||
|
||||
from dcs import Point
|
||||
from dcs.action import (
|
||||
ClearFlag,
|
||||
DoScript,
|
||||
MarkToAll,
|
||||
SetFlag,
|
||||
RemoveSceneObjects,
|
||||
RemoveSceneObjectsMask,
|
||||
SceneryDestructionZone,
|
||||
Smoke,
|
||||
)
|
||||
from dcs.condition import (
|
||||
AllOfCoalitionOutsideZone,
|
||||
FlagIsFalse,
|
||||
FlagIsTrue,
|
||||
PartOfCoalitionInZone,
|
||||
TimeAfter,
|
||||
TimeSinceFlag,
|
||||
)
|
||||
from dcs.mission import Mission
|
||||
from dcs.task import Option
|
||||
from dcs.translation import String
|
||||
from dcs.triggers import Event, TriggerCondition, TriggerOnce
|
||||
from dcs.unit import Skill
|
||||
|
||||
from game.theater import Airfield
|
||||
from game.theater.controlpoint import Fob, TRIGGER_RADIUS_CAPTURE
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from game.game import Game
|
||||
|
||||
PUSH_TRIGGER_SIZE = 3000
|
||||
PUSH_TRIGGER_ACTIVATION_AGL = 25
|
||||
|
||||
REGROUP_ZONE_DISTANCE = 12000
|
||||
REGROUP_ALT = 5000
|
||||
|
||||
TRIGGER_WAYPOINT_OFFSET = 2
|
||||
TRIGGER_MIN_DISTANCE_FROM_START = 10000
|
||||
# modified since we now have advanced SAM units
|
||||
TRIGGER_RADIUS_MINIMUM = 3000000
|
||||
|
||||
TRIGGER_RADIUS_SMALL = 50000
|
||||
TRIGGER_RADIUS_MEDIUM = 100000
|
||||
TRIGGER_RADIUS_LARGE = 150000
|
||||
TRIGGER_RADIUS_ALL_MAP = 3000000
|
||||
TRIGGER_RADIUS_CLEAR_SCENERY = 1000
|
||||
|
||||
|
||||
class Silence(Option):
|
||||
Key = 7
|
||||
|
||||
|
||||
class TriggerGenerator:
|
||||
capture_zone_types = (Fob, Airfield)
|
||||
capture_zone_flag = 600
|
||||
|
||||
def __init__(self, mission: Mission, game: Game) -> None:
|
||||
self.mission = mission
|
||||
self.game = game
|
||||
|
||||
def _set_allegiances(self, player_coalition: str, enemy_coalition: str) -> None:
|
||||
"""
|
||||
Set airbase initial coalition
|
||||
"""
|
||||
|
||||
# Empty neutrals airports
|
||||
airfields = [
|
||||
cp for cp in self.game.theater.controlpoints if isinstance(cp, Airfield)
|
||||
]
|
||||
airport_ids = {cp.airport.id for cp in airfields}
|
||||
for airport in self.mission.terrain.airport_list():
|
||||
if airport.id not in airport_ids:
|
||||
airport.unlimited_fuel = False
|
||||
airport.unlimited_munitions = False
|
||||
airport.unlimited_aircrafts = False
|
||||
airport.gasoline_init = 0
|
||||
airport.methanol_mixture_init = 0
|
||||
airport.diesel_init = 0
|
||||
airport.jet_init = 0
|
||||
airport.operating_level_air = 0
|
||||
airport.operating_level_equipment = 0
|
||||
airport.operating_level_fuel = 0
|
||||
|
||||
for airport in self.mission.terrain.airport_list():
|
||||
if airport.id not in airport_ids:
|
||||
airport.unlimited_fuel = True
|
||||
airport.unlimited_munitions = True
|
||||
airport.unlimited_aircrafts = True
|
||||
|
||||
for airfield in airfields:
|
||||
cp_airport = self.mission.terrain.airport_by_id(airfield.airport.id)
|
||||
if cp_airport is None:
|
||||
raise RuntimeError(
|
||||
f"Could not find {airfield.airport.name} in the mission"
|
||||
)
|
||||
cp_airport.set_coalition(
|
||||
airfield.captured and player_coalition or enemy_coalition
|
||||
)
|
||||
|
||||
def _set_skill(self, player_coalition: str, enemy_coalition: str) -> None:
|
||||
"""
|
||||
Set skill level for all aircraft in the mission
|
||||
"""
|
||||
for coalition_name, coalition in self.mission.coalition.items():
|
||||
if coalition_name == player_coalition:
|
||||
skill_level = Skill(self.game.settings.player_skill)
|
||||
elif coalition_name == enemy_coalition:
|
||||
skill_level = Skill(self.game.settings.enemy_vehicle_skill)
|
||||
else:
|
||||
continue
|
||||
|
||||
for country in coalition.countries.values():
|
||||
for vehicle_group in country.vehicle_group:
|
||||
vehicle_group.set_skill(skill_level)
|
||||
|
||||
def _gen_markers(self) -> None:
|
||||
"""
|
||||
Generate markers on F10 map for each existing objective
|
||||
"""
|
||||
if self.game.settings.generate_marks:
|
||||
mark_trigger = TriggerOnce(Event.NoEvent, "Marks generator")
|
||||
mark_trigger.add_condition(TimeAfter(1))
|
||||
v = 10
|
||||
for cp in self.game.theater.controlpoints:
|
||||
seen = set()
|
||||
for ground_object in cp.ground_objects:
|
||||
if ground_object.obj_name in seen:
|
||||
continue
|
||||
|
||||
seen.add(ground_object.obj_name)
|
||||
for location in ground_object.mark_locations:
|
||||
zone = self.mission.triggers.add_triggerzone(
|
||||
location, radius=10, hidden=True, name="MARK"
|
||||
)
|
||||
if cp.captured:
|
||||
name = ground_object.obj_name + " [ALLY]"
|
||||
else:
|
||||
name = ground_object.obj_name + " [ENEMY]"
|
||||
mark_trigger.add_action(MarkToAll(v, zone.id, String(name)))
|
||||
v += 1
|
||||
self.mission.triggerrules.triggers.append(mark_trigger)
|
||||
|
||||
def _generate_clear_statics_trigger(self, scenery_clear_zones: List[Point]) -> None:
|
||||
for zone_center in scenery_clear_zones:
|
||||
trigger_zone = self.mission.triggers.add_triggerzone(
|
||||
zone_center,
|
||||
radius=TRIGGER_RADIUS_CLEAR_SCENERY,
|
||||
hidden=False,
|
||||
name="CLEAR",
|
||||
)
|
||||
clear_trigger = TriggerCondition(Event.NoEvent, "Clear Trigger")
|
||||
clear_flag = self.get_capture_zone_flag()
|
||||
clear_trigger.add_condition(TimeSinceFlag(clear_flag, 30))
|
||||
clear_trigger.add_action(ClearFlag(clear_flag))
|
||||
clear_trigger.add_action(SetFlag(clear_flag))
|
||||
clear_trigger.add_action(
|
||||
RemoveSceneObjects(
|
||||
objects_mask=RemoveSceneObjectsMask.OBJECTS_ONLY,
|
||||
zone=trigger_zone.id,
|
||||
)
|
||||
)
|
||||
clear_trigger.add_action(
|
||||
SceneryDestructionZone(destruction_level=100, zone=trigger_zone.id)
|
||||
)
|
||||
self.mission.triggerrules.triggers.append(clear_trigger)
|
||||
|
||||
enable_clear_trigger = TriggerOnce(Event.NoEvent, "Enable Clear Trigger")
|
||||
enable_clear_trigger.add_condition(TimeAfter(30))
|
||||
enable_clear_trigger.add_action(ClearFlag(clear_flag))
|
||||
enable_clear_trigger.add_action(SetFlag(clear_flag))
|
||||
# clear_trigger.add_action(MessageToAll(text=String("Enable clear trigger"),))
|
||||
self.mission.triggerrules.triggers.append(enable_clear_trigger)
|
||||
|
||||
def _generate_capture_triggers(
|
||||
self, player_coalition: str, enemy_coalition: str
|
||||
) -> None:
|
||||
"""Creates a pair of triggers for each control point of `cls.capture_zone_types`.
|
||||
One for the initial capture of a control point, and one if it is recaptured.
|
||||
Directly appends to the global `base_capture_events` var declared by `dcs_libaration.lua`
|
||||
"""
|
||||
for cp in self.game.theater.controlpoints:
|
||||
if isinstance(cp, self.capture_zone_types) and not cp.is_carrier:
|
||||
if cp.captured:
|
||||
attacking_coalition = enemy_coalition
|
||||
attack_coalition_int = 1 # 1 is the Event int for Red
|
||||
defending_coalition = player_coalition
|
||||
defend_coalition_int = 2 # 2 is the Event int for Blue
|
||||
else:
|
||||
attacking_coalition = player_coalition
|
||||
attack_coalition_int = 2
|
||||
defending_coalition = enemy_coalition
|
||||
defend_coalition_int = 1
|
||||
|
||||
trigger_zone = self.mission.triggers.add_triggerzone(
|
||||
cp.position,
|
||||
radius=TRIGGER_RADIUS_CAPTURE,
|
||||
hidden=False,
|
||||
name="CAPTURE",
|
||||
)
|
||||
flag = self.get_capture_zone_flag()
|
||||
capture_trigger = TriggerCondition(Event.NoEvent, "Capture Trigger")
|
||||
capture_trigger.add_condition(
|
||||
AllOfCoalitionOutsideZone(
|
||||
defending_coalition, trigger_zone.id, unit_type="GROUND"
|
||||
)
|
||||
)
|
||||
capture_trigger.add_condition(
|
||||
PartOfCoalitionInZone(
|
||||
attacking_coalition, trigger_zone.id, unit_type="GROUND"
|
||||
)
|
||||
)
|
||||
capture_trigger.add_condition(FlagIsFalse(flag=flag))
|
||||
script_string = String(
|
||||
f'base_capture_events[#base_capture_events + 1] = "{cp.id}||{attack_coalition_int}||{cp.full_name}"'
|
||||
)
|
||||
capture_trigger.add_action(DoScript(script_string))
|
||||
capture_trigger.add_action(SetFlag(flag=flag))
|
||||
self.mission.triggerrules.triggers.append(capture_trigger)
|
||||
|
||||
recapture_trigger = TriggerCondition(Event.NoEvent, "Capture Trigger")
|
||||
recapture_trigger.add_condition(
|
||||
AllOfCoalitionOutsideZone(
|
||||
attacking_coalition, trigger_zone.id, unit_type="GROUND"
|
||||
)
|
||||
)
|
||||
recapture_trigger.add_condition(
|
||||
PartOfCoalitionInZone(
|
||||
defending_coalition, trigger_zone.id, unit_type="GROUND"
|
||||
)
|
||||
)
|
||||
recapture_trigger.add_condition(FlagIsTrue(flag=flag))
|
||||
script_string = String(
|
||||
f'base_capture_events[#base_capture_events + 1] = "{cp.id}||{defend_coalition_int}||{cp.full_name}"'
|
||||
)
|
||||
recapture_trigger.add_action(DoScript(script_string))
|
||||
recapture_trigger.add_action(ClearFlag(flag=flag))
|
||||
self.mission.triggerrules.triggers.append(recapture_trigger)
|
||||
|
||||
def generate(self) -> None:
|
||||
player_coalition = "blue"
|
||||
enemy_coalition = "red"
|
||||
|
||||
self._set_skill(player_coalition, enemy_coalition)
|
||||
self._set_allegiances(player_coalition, enemy_coalition)
|
||||
self._gen_markers()
|
||||
self._generate_capture_triggers(player_coalition, enemy_coalition)
|
||||
if self.game.settings.ground_start_scenery_remove_triggers:
|
||||
try:
|
||||
self._generate_clear_statics_trigger(self.game.scenery_clear_zones)
|
||||
self.game.scenery_clear_zones.clear()
|
||||
except AttributeError:
|
||||
logging.info(f"Unable to create Clear Statics triggers")
|
||||
|
||||
@classmethod
|
||||
def get_capture_zone_flag(cls) -> int:
|
||||
flag = cls.capture_zone_flag
|
||||
cls.capture_zone_flag += 1
|
||||
return flag
|
||||
Loading…
x
Reference in New Issue
Block a user