diff --git a/game/pretense/pretenseaircraftgenerator.py b/game/pretense/pretenseaircraftgenerator.py new file mode 100644 index 00000000..7caa91c3 --- /dev/null +++ b/game/pretense/pretenseaircraftgenerator.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import logging +from datetime import datetime +from functools import cached_property +from typing import Any, Dict, List, TYPE_CHECKING, Tuple + +from dcs import Point +from dcs.action import AITaskPush +from dcs.condition import FlagIsTrue, GroupDead, Or, FlagIsFalse +from dcs.country import Country +from dcs.mission import Mission +from dcs.terrain.terrain import NoParkingSlotError +from dcs.triggers import TriggerOnce, Event +from dcs.unit import Skill +from dcs.unitgroup import FlyingGroup, StaticGroup + +from game.ato.airtaaskingorder import AirTaskingOrder +from game.ato.flight import Flight +from game.ato.flightstate import Completed, WaitingForStart +from game.ato.flighttype import FlightType +from game.ato.package import Package +from game.ato.starttype import StartType +from game.missiongenerator.lasercoderegistry import LaserCodeRegistry +from game.missiongenerator.missiondata import MissionData +from game.radio.radios import RadioRegistry +from game.radio.tacan import TacanRegistry +from game.runways import RunwayData +from game.settings import Settings +from game.theater.controlpoint import ( + Airfield, + ControlPoint, + Fob, +) +from game.unitmap import UnitMap +from .aircraftpainter import AircraftPainter +from .flightdata import FlightData +from .flightgroupconfigurator import FlightGroupConfigurator +from .flightgroupspawner import FlightGroupSpawner +from ...data.weapons import WeaponType + +if TYPE_CHECKING: + from game import Game + from game.squadrons import Squadron + + +class AircraftGenerator: + def __init__( + self, + mission: Mission, + settings: Settings, + game: Game, + time: datetime, + radio_registry: RadioRegistry, + tacan_registry: TacanRegistry, + laser_code_registry: LaserCodeRegistry, + unit_map: UnitMap, + mission_data: MissionData, + helipads: dict[ControlPoint, list[StaticGroup]], + ground_spawns_roadbase: dict[ControlPoint, list[Tuple[StaticGroup, Point]]], + ground_spawns: dict[ControlPoint, list[Tuple[StaticGroup, Point]]], + ) -> None: + self.mission = mission + self.settings = settings + self.game = game + self.time = time + self.radio_registry = radio_registry + self.tacan_registy = tacan_registry + self.laser_code_registry = laser_code_registry + self.unit_map = unit_map + self.flights: List[FlightData] = [] + self.mission_data = mission_data + self.helipads = helipads + self.ground_spawns_roadbase = ground_spawns_roadbase + self.ground_spawns = ground_spawns + + self.ewrj_package_dict: Dict[int, List[FlyingGroup[Any]]] = {} + self.ewrj = settings.plugins.get("ewrj") + self.need_ecm = settings.plugin_option("ewrj.ecm_required") + + @cached_property + def use_client(self) -> bool: + """True if Client should be used instead of Player.""" + blue_clients = self.client_slots_in_ato(self.game.blue.ato) + red_clients = self.client_slots_in_ato(self.game.red.ato) + return blue_clients + red_clients > 1 + + @staticmethod + def client_slots_in_ato(ato: AirTaskingOrder) -> int: + total = 0 + for package in ato.packages: + for flight in package.flights: + total += flight.client_count + return total + + def clear_parking_slots(self) -> None: + for cp in self.game.theater.controlpoints: + for parking_slot in cp.parking_slots: + parking_slot.unit_id = None + + def generate_flights( + self, + country: Country, + ato: AirTaskingOrder, + dynamic_runways: Dict[str, RunwayData], + ) -> None: + """Adds aircraft to the mission for every flight in the ATO. + + Aircraft generation is done by walking the ATO and spawning each flight in turn. + After the flight is generated the group is added to the UnitMap so aircraft + deaths can be tracked. + + Args: + country: The country from the mission to use for this ATO. + ato: The ATO to spawn aircraft for. + dynamic_runways: Runway data for carriers and FARPs. + """ + self._reserve_frequencies_and_tacan(ato) + + for package in reversed(sorted(ato.packages, key=lambda x: x.time_over_target)): + logging.info(f"Generating package for target: {package.target.name}") + if not package.flights: + continue + for flight in package.flights: + if flight.alive: + if not flight.squadron.location.runway_is_operational(): + logging.warning( + f"Runway not operational, skipping flight: {flight.flight_type}" + ) + flight.return_pilots_and_aircraft() + continue + logging.info(f"Generating flight: {flight.unit_type}") + group = self.create_and_configure_flight( + flight, country, dynamic_runways + ) + self.unit_map.add_aircraft(group, flight) + if ( + package.primary_flight is not None + and package.primary_flight.flight_plan.is_formation( + package.primary_flight.flight_plan + ) + ): + splittrigger = TriggerOnce(Event.NoEvent, f"Split-{id(package)}") + splittrigger.add_condition(FlagIsTrue(flag=f"split-{id(package)}")) + splittrigger.add_condition(Or()) + splittrigger.add_condition(FlagIsFalse(flag=f"split-{id(package)}")) + splittrigger.add_condition(GroupDead(package.primary_flight.group_id)) + for flight in package.flights: + if flight.flight_type in [ + FlightType.ESCORT, + FlightType.SEAD_ESCORT, + ]: + splittrigger.add_action(AITaskPush(flight.group_id, 1)) + if len(splittrigger.actions) > 0: + self.mission.triggerrules.triggers.append(splittrigger) + + def spawn_unused_aircraft( + self, player_country: Country, enemy_country: Country + ) -> None: + for control_point in self.game.theater.controlpoints: + if not ( + isinstance(control_point, Airfield) or isinstance(control_point, Fob) + ): + continue + + if control_point.captured: + country = player_country + else: + country = enemy_country + + for squadron in control_point.squadrons: + try: + self._spawn_unused_for(squadron, country) + except NoParkingSlotError: + # If we run out of parking, stop spawning aircraft at this base. + break + + def _spawn_unused_for(self, squadron: Squadron, country: Country) -> None: + assert isinstance(squadron.location, Airfield) or isinstance( + squadron.location, Fob + ) + if ( + squadron.coalition.player + and self.game.settings.perf_disable_untasked_blufor_aircraft + ): + return + elif ( + not squadron.coalition.player + and self.game.settings.perf_disable_untasked_opfor_aircraft + ): + return + + for _ in range(squadron.untasked_aircraft): + # Creating a flight even those this isn't a fragged mission lets us + # reuse the existing debriefing code. + # TODO: Special flight type? + flight = Flight( + Package(squadron.location, self.game.db.flights), + squadron, + 1, + FlightType.BARCAP, + StartType.COLD, + divert=None, + claim_inv=False, + ) + flight.state = Completed(flight, self.game.settings) + + group = FlightGroupSpawner( + flight, + country, + self.mission, + self.helipads, + self.ground_spawns_roadbase, + self.ground_spawns, + self.mission_data, + ).create_idle_aircraft() + if group: + if ( + not squadron.coalition.player + and squadron.aircraft.flyable + and ( + self.game.settings.enable_squadron_pilot_limits + or squadron.number_of_available_pilots > 0 + ) + and self.game.settings.untasked_opfor_client_slots + ): + flight.state = WaitingForStart( + flight, self.game.settings, self.game.conditions.start_time + ) + group.uncontrolled = False + group.units[0].skill = Skill.Client + AircraftPainter(flight, group).apply_livery() + self.unit_map.add_aircraft(group, flight) + + def create_and_configure_flight( + self, flight: Flight, country: Country, dynamic_runways: Dict[str, RunwayData] + ) -> FlyingGroup[Any]: + """Creates and configures the flight group in the mission.""" + group = FlightGroupSpawner( + flight, + country, + self.mission, + self.helipads, + self.ground_spawns_roadbase, + self.ground_spawns, + self.mission_data, + ).create_flight_group() + self.flights.append( + FlightGroupConfigurator( + flight, + group, + self.game, + self.mission, + self.time, + self.radio_registry, + self.tacan_registy, + self.laser_code_registry, + self.mission_data, + dynamic_runways, + self.use_client, + ).configure() + ) + + if self.ewrj: + self._track_ewrj_flight(flight, group) + + return group + + def _track_ewrj_flight(self, flight: Flight, group: FlyingGroup[Any]) -> None: + if not self.ewrj_package_dict.get(id(flight.package)): + self.ewrj_package_dict[id(flight.package)] = [] + if ( + flight.package.primary_flight + and flight is flight.package.primary_flight + or flight.client_count + and ( + not self.need_ecm + or flight.loadout.has_weapon_of_type(WeaponType.JAMMER) + ) + ): + self.ewrj_package_dict[id(flight.package)].append(group) + + def _reserve_frequencies_and_tacan(self, ato: AirTaskingOrder) -> None: + for package in ato.packages: + if package.frequency is None: + continue + if package.frequency not in self.radio_registry.allocated_channels: + self.radio_registry.reserve(package.frequency) + for f in package.flights: + if ( + f.frequency + and f.frequency not in self.radio_registry.allocated_channels + ): + self.radio_registry.reserve(f.frequency) + if f.tacan and f.tacan not in self.tacan_registy.allocated_channels: + self.tacan_registy.mark_unavailable(f.tacan) diff --git a/game/pretense/pretensemissiongenerator.py b/game/pretense/pretensemissiongenerator.py new file mode 100644 index 00000000..85c5e5c9 --- /dev/null +++ b/game/pretense/pretensemissiongenerator.py @@ -0,0 +1,343 @@ +from __future__ import annotations + +import logging +from datetime import datetime +from pathlib import Path +from typing import TYPE_CHECKING, cast + +import dcs.lua +from dcs import Mission, Point +from dcs.coalition import Coalition +from dcs.countries import country_dict +from dcs.task import OptReactOnThreat + +from game.atcdata import AtcData +from game.dcs.beacons import Beacons +from game.dcs.helpers import unit_type_from_name +from game.missiongenerator.aircraft.aircraftgenerator import ( + AircraftGenerator, +) +from game.naming import namegen +from game.radio.radios import RadioFrequency, RadioRegistry, MHz +from game.radio.tacan import TacanRegistry +from game.theater import Airfield +from game.theater.bullseye import Bullseye +from game.unitmap import UnitMap +from .briefinggenerator import BriefingGenerator, MissionInfoGenerator +from .cargoshipgenerator import CargoShipGenerator +from .convoygenerator import ConvoyGenerator +from .drawingsgenerator import DrawingsGenerator +from .environmentgenerator import EnvironmentGenerator +from .flotgenerator import FlotGenerator +from .forcedoptionsgenerator import ForcedOptionsGenerator +from .frontlineconflictdescription import FrontLineConflictDescription +from .kneeboard import KneeboardGenerator +from .lasercoderegistry import LaserCodeRegistry +from .luagenerator import LuaGenerator +from .missiondata import MissionData +from .tgogenerator import TgoGenerator +from .triggergenerator import TriggerGenerator +from .visualsgenerator import VisualsGenerator +from ..radio.TacanContainer import TacanContainer + +if TYPE_CHECKING: + from game import Game + + +class MissionGenerator: + def __init__(self, game: Game, time: datetime) -> None: + self.game = game + self.time = time + self.mission = Mission(game.theater.terrain) + self.unit_map = UnitMap() + + self.mission_data = MissionData() + + self.laser_code_registry = LaserCodeRegistry() + self.radio_registry = RadioRegistry() + self.tacan_registry = TacanRegistry() + + self.generation_started = False + + self.p_country = country_dict[self.game.blue.faction.country.id]() + self.e_country = country_dict[self.game.red.faction.country.id]() + + with open("resources/default_options.lua", "r", encoding="utf-8") as f: + options = dcs.lua.loads(f.read())["options"] + ext_view = game.settings.external_views_allowed + options["miscellaneous"]["f11_free_camera"] = ext_view + options["difficulty"]["spectatorExternalViews"] = ext_view + self.mission.options.load_from_dict(options) + + def generate_miz(self, output: Path) -> UnitMap: + if self.generation_started: + raise RuntimeError( + "Mission has already begun generating. To reset, create a new " + "MissionSimulation." + ) + self.generation_started = True + + self.setup_mission_coalitions() + self.add_airfields_to_unit_map() + self.initialize_registries() + + EnvironmentGenerator(self.mission, self.game.conditions, self.time).generate() + + tgo_generator = TgoGenerator( + self.mission, + self.game, + self.radio_registry, + self.tacan_registry, + self.unit_map, + self.mission_data, + ) + tgo_generator.generate() + + ConvoyGenerator(self.mission, self.game, self.unit_map).generate() + CargoShipGenerator(self.mission, self.game, self.unit_map).generate() + + self.generate_destroyed_units() + + # Generate ground conflicts first so the JTACs get the first laser code (1688) + # rather than the first player flight with a TGP. + self.generate_ground_conflicts() + self.generate_air_units(tgo_generator) + + TriggerGenerator(self.mission, self.game).generate() + ForcedOptionsGenerator(self.mission, self.game).generate() + VisualsGenerator(self.mission, self.game).generate() + LuaGenerator(self.game, self.mission, self.mission_data).generate() + DrawingsGenerator(self.mission, self.game).generate() + + self.setup_combined_arms() + + self.notify_info_generators() + + # TODO: Shouldn't this be first? + namegen.reset_numbers() + self.mission.save(output) + + return self.unit_map + + @staticmethod + def _configure_ewrj(gen: AircraftGenerator) -> None: + for groups in gen.ewrj_package_dict.values(): + optrot = groups[0].points[0].tasks[0] + assert isinstance(optrot, OptReactOnThreat) + if ( + len(groups) == 1 + and optrot.value != OptReactOnThreat.Values.PassiveDefense + ): + # primary flight with no EWR-Jamming capability + continue + for group in groups: + group.points[0].tasks[0] = OptReactOnThreat( + OptReactOnThreat.Values.PassiveDefense + ) + + def setup_mission_coalitions(self) -> None: + self.mission.coalition["blue"] = Coalition( + "blue", bullseye=self.game.blue.bullseye.to_pydcs() + ) + self.mission.coalition["red"] = Coalition( + "red", bullseye=self.game.red.bullseye.to_pydcs() + ) + self.mission.coalition["neutrals"] = Coalition( + "neutrals", bullseye=Bullseye(Point(0, 0, self.mission.terrain)).to_pydcs() + ) + + self.mission.coalition["blue"].add_country(self.p_country) + self.mission.coalition["red"].add_country(self.e_country) + + belligerents = {self.p_country.id, self.e_country.id} + for country_id in country_dict.keys(): + if country_id not in belligerents: + c = country_dict[country_id]() + self.mission.coalition["neutrals"].add_country(c) + + def add_airfields_to_unit_map(self) -> None: + for control_point in self.game.theater.controlpoints: + if isinstance(control_point, Airfield): + self.unit_map.add_airfield(control_point) + + def initialize_registries(self) -> None: + unique_map_frequencies: set[RadioFrequency] = set() + self.initialize_tacan_registry(unique_map_frequencies) + self.initialize_radio_registry(unique_map_frequencies) + # Allocate UHF/VHF Guard Freq first! + unique_map_frequencies.add(MHz(243)) + unique_map_frequencies.add(MHz(121, 500)) + for frequency in unique_map_frequencies: + self.radio_registry.reserve(frequency) + + def initialize_tacan_registry( + self, unique_map_frequencies: set[RadioFrequency] + ) -> None: + """ + Dedup beacon/radio frequencies, since some maps have some frequencies + used multiple times. + """ + for beacon in Beacons.iter_theater(self.game.theater): + unique_map_frequencies.add(beacon.frequency) + if beacon.is_tacan: + if beacon.channel is None: + logging.warning(f"TACAN beacon has no channel: {beacon.callsign}") + else: + self.tacan_registry.mark_unavailable(beacon.tacan_channel) + for cp in self.game.theater.controlpoints: + if isinstance(cp, TacanContainer) and cp.tacan is not None: + self.tacan_registry.mark_unavailable(cp.tacan) + + def initialize_radio_registry( + self, unique_map_frequencies: set[RadioFrequency] + ) -> None: + for airport in self.game.theater.terrain.airport_list(): + if (atc := AtcData.from_pydcs(airport)) is not None: + unique_map_frequencies.add(atc.hf) + unique_map_frequencies.add(atc.vhf_fm) + unique_map_frequencies.add(atc.vhf_am) + unique_map_frequencies.add(atc.uhf) + # No need to reserve ILS or TACAN because those are in the + # beacon list. + + def generate_ground_conflicts(self) -> None: + """Generate FLOTs and JTACs for each active front line.""" + for front_line in self.game.theater.conflicts(): + player_cp = front_line.blue_cp + enemy_cp = front_line.red_cp + conflict = FrontLineConflictDescription.frontline_cas_conflict( + front_line, self.game.theater, self.game.settings + ) + # Generate frontline ops + player_gp = self.game.ground_planners[player_cp.id].units_per_cp[ + enemy_cp.id + ] + enemy_gp = self.game.ground_planners[enemy_cp.id].units_per_cp[player_cp.id] + ground_conflict_gen = FlotGenerator( + self.mission, + conflict, + self.game, + player_gp, + enemy_gp, + player_cp.stances[enemy_cp.id], + enemy_cp.stances[player_cp.id], + self.unit_map, + self.radio_registry, + self.mission_data, + self.laser_code_registry, + ) + ground_conflict_gen.generate() + + def generate_air_units(self, tgo_generator: TgoGenerator) -> None: + """Generate the air units for the Operation""" + + # Generate Aircraft Activity on the map + aircraft_generator = AircraftGenerator( + self.mission, + self.game.settings, + self.game, + self.time, + self.radio_registry, + self.tacan_registry, + self.laser_code_registry, + self.unit_map, + mission_data=self.mission_data, + helipads=tgo_generator.helipads, + ground_spawns_roadbase=tgo_generator.ground_spawns_roadbase, + ground_spawns=tgo_generator.ground_spawns, + ) + + aircraft_generator.clear_parking_slots() + + aircraft_generator.generate_flights( + self.p_country, + self.game.blue.ato, + tgo_generator.runways, + ) + aircraft_generator.generate_flights( + self.e_country, + self.game.red.ato, + tgo_generator.runways, + ) + aircraft_generator.spawn_unused_aircraft( + self.p_country, + self.e_country, + ) + + self.mission_data.flights = aircraft_generator.flights + + for flight in aircraft_generator.flights: + if not flight.client_units: + continue + flight.aircraft_type.assign_channels_for_flight(flight, self.mission_data) + + if self.game.settings.plugins.get("ewrj"): + self._configure_ewrj(aircraft_generator) + + def generate_destroyed_units(self) -> None: + """Add destroyed units to the Mission""" + if not self.game.settings.perf_destroyed_units: + return + + for d in self.game.get_destroyed_units(): + try: + type_name = d["type"] + if not isinstance(type_name, str): + raise TypeError( + "Expected the type of the destroyed static to be a string" + ) + utype = unit_type_from_name(type_name) + except KeyError: + logging.warning(f"Destroyed unit has no type: {d}") + continue + + pos = Point(cast(float, d["x"]), cast(float, d["z"]), self.mission.terrain) + if utype is not None and not self.game.position_culled(pos): + self.mission.static_group( + country=self.p_country, + name="", + _type=utype, + hidden=True, + position=pos, + heading=d["orientation"], + dead=True, + ) + + def notify_info_generators( + self, + ) -> None: + """Generates subscribed MissionInfoGenerator objects.""" + mission_data = self.mission_data + gens: list[MissionInfoGenerator] = [ + KneeboardGenerator(self.mission, self.game), + BriefingGenerator(self.mission, self.game), + ] + for gen in gens: + for dynamic_runway in mission_data.runways: + gen.add_dynamic_runway(dynamic_runway) + + for tanker in mission_data.tankers: + if tanker.blue: + gen.add_tanker(tanker) + + for aewc in mission_data.awacs: + if aewc.blue: + gen.add_awacs(aewc) + + for jtac in mission_data.jtacs: + if jtac.blue: + gen.add_jtac(jtac) + + for flight in mission_data.flights: + gen.add_flight(flight) + gen.generate() + + def setup_combined_arms(self) -> None: + settings = self.game.settings + commanders = settings.tactical_commander_count + self.mission.groundControl.pilot_can_control_vehicles = commanders > 0 + + self.mission.groundControl.blue_game_masters = settings.game_masters_count + self.mission.groundControl.blue_tactical_commander = commanders + self.mission.groundControl.blue_jtac = settings.jtac_count + self.mission.groundControl.blue_observer = settings.observer_count diff --git a/game/pretense/pretensetriggergenerator.py b/game/pretense/pretensetriggergenerator.py new file mode 100644 index 00000000..cc31b4e5 --- /dev/null +++ b/game/pretense/pretensetriggergenerator.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, List + +from dcs import Point +from dcs.action import ( + ClearFlag, + DoScript, + MarkToAll, + SetFlag, + RemoveSceneObjects, + RemoveSceneObjectsMask, + SceneryDestructionZone, + Smoke, +) +from dcs.condition import ( + AllOfCoalitionOutsideZone, + FlagIsFalse, + FlagIsTrue, + PartOfCoalitionInZone, + TimeAfter, + TimeSinceFlag, +) +from dcs.mission import Mission +from dcs.task import Option +from dcs.translation import String +from dcs.triggers import Event, TriggerCondition, TriggerOnce +from dcs.unit import Skill + +from game.theater import Airfield +from game.theater.controlpoint import Fob, TRIGGER_RADIUS_CAPTURE + +if TYPE_CHECKING: + from game.game import Game + +PUSH_TRIGGER_SIZE = 3000 +PUSH_TRIGGER_ACTIVATION_AGL = 25 + +REGROUP_ZONE_DISTANCE = 12000 +REGROUP_ALT = 5000 + +TRIGGER_WAYPOINT_OFFSET = 2 +TRIGGER_MIN_DISTANCE_FROM_START = 10000 +# modified since we now have advanced SAM units +TRIGGER_RADIUS_MINIMUM = 3000000 + +TRIGGER_RADIUS_SMALL = 50000 +TRIGGER_RADIUS_MEDIUM = 100000 +TRIGGER_RADIUS_LARGE = 150000 +TRIGGER_RADIUS_ALL_MAP = 3000000 +TRIGGER_RADIUS_CLEAR_SCENERY = 1000 + + +class Silence(Option): + Key = 7 + + +class TriggerGenerator: + capture_zone_types = (Fob, Airfield) + capture_zone_flag = 600 + + def __init__(self, mission: Mission, game: Game) -> None: + self.mission = mission + self.game = game + + def _set_allegiances(self, player_coalition: str, enemy_coalition: str) -> None: + """ + Set airbase initial coalition + """ + + # Empty neutrals airports + airfields = [ + cp for cp in self.game.theater.controlpoints if isinstance(cp, Airfield) + ] + airport_ids = {cp.airport.id for cp in airfields} + for airport in self.mission.terrain.airport_list(): + if airport.id not in airport_ids: + airport.unlimited_fuel = False + airport.unlimited_munitions = False + airport.unlimited_aircrafts = False + airport.gasoline_init = 0 + airport.methanol_mixture_init = 0 + airport.diesel_init = 0 + airport.jet_init = 0 + airport.operating_level_air = 0 + airport.operating_level_equipment = 0 + airport.operating_level_fuel = 0 + + for airport in self.mission.terrain.airport_list(): + if airport.id not in airport_ids: + airport.unlimited_fuel = True + airport.unlimited_munitions = True + airport.unlimited_aircrafts = True + + for airfield in airfields: + cp_airport = self.mission.terrain.airport_by_id(airfield.airport.id) + if cp_airport is None: + raise RuntimeError( + f"Could not find {airfield.airport.name} in the mission" + ) + cp_airport.set_coalition( + airfield.captured and player_coalition or enemy_coalition + ) + + def _set_skill(self, player_coalition: str, enemy_coalition: str) -> None: + """ + Set skill level for all aircraft in the mission + """ + for coalition_name, coalition in self.mission.coalition.items(): + if coalition_name == player_coalition: + skill_level = Skill(self.game.settings.player_skill) + elif coalition_name == enemy_coalition: + skill_level = Skill(self.game.settings.enemy_vehicle_skill) + else: + continue + + for country in coalition.countries.values(): + for vehicle_group in country.vehicle_group: + vehicle_group.set_skill(skill_level) + + def _gen_markers(self) -> None: + """ + Generate markers on F10 map for each existing objective + """ + if self.game.settings.generate_marks: + mark_trigger = TriggerOnce(Event.NoEvent, "Marks generator") + mark_trigger.add_condition(TimeAfter(1)) + v = 10 + for cp in self.game.theater.controlpoints: + seen = set() + for ground_object in cp.ground_objects: + if ground_object.obj_name in seen: + continue + + seen.add(ground_object.obj_name) + for location in ground_object.mark_locations: + zone = self.mission.triggers.add_triggerzone( + location, radius=10, hidden=True, name="MARK" + ) + if cp.captured: + name = ground_object.obj_name + " [ALLY]" + else: + name = ground_object.obj_name + " [ENEMY]" + mark_trigger.add_action(MarkToAll(v, zone.id, String(name))) + v += 1 + self.mission.triggerrules.triggers.append(mark_trigger) + + def _generate_clear_statics_trigger(self, scenery_clear_zones: List[Point]) -> None: + for zone_center in scenery_clear_zones: + trigger_zone = self.mission.triggers.add_triggerzone( + zone_center, + radius=TRIGGER_RADIUS_CLEAR_SCENERY, + hidden=False, + name="CLEAR", + ) + clear_trigger = TriggerCondition(Event.NoEvent, "Clear Trigger") + clear_flag = self.get_capture_zone_flag() + clear_trigger.add_condition(TimeSinceFlag(clear_flag, 30)) + clear_trigger.add_action(ClearFlag(clear_flag)) + clear_trigger.add_action(SetFlag(clear_flag)) + clear_trigger.add_action( + RemoveSceneObjects( + objects_mask=RemoveSceneObjectsMask.OBJECTS_ONLY, + zone=trigger_zone.id, + ) + ) + clear_trigger.add_action( + SceneryDestructionZone(destruction_level=100, zone=trigger_zone.id) + ) + self.mission.triggerrules.triggers.append(clear_trigger) + + enable_clear_trigger = TriggerOnce(Event.NoEvent, "Enable Clear Trigger") + enable_clear_trigger.add_condition(TimeAfter(30)) + enable_clear_trigger.add_action(ClearFlag(clear_flag)) + enable_clear_trigger.add_action(SetFlag(clear_flag)) + # clear_trigger.add_action(MessageToAll(text=String("Enable clear trigger"),)) + self.mission.triggerrules.triggers.append(enable_clear_trigger) + + def _generate_capture_triggers( + self, player_coalition: str, enemy_coalition: str + ) -> None: + """Creates a pair of triggers for each control point of `cls.capture_zone_types`. + One for the initial capture of a control point, and one if it is recaptured. + Directly appends to the global `base_capture_events` var declared by `dcs_libaration.lua` + """ + for cp in self.game.theater.controlpoints: + if isinstance(cp, self.capture_zone_types) and not cp.is_carrier: + if cp.captured: + attacking_coalition = enemy_coalition + attack_coalition_int = 1 # 1 is the Event int for Red + defending_coalition = player_coalition + defend_coalition_int = 2 # 2 is the Event int for Blue + else: + attacking_coalition = player_coalition + attack_coalition_int = 2 + defending_coalition = enemy_coalition + defend_coalition_int = 1 + + trigger_zone = self.mission.triggers.add_triggerzone( + cp.position, + radius=TRIGGER_RADIUS_CAPTURE, + hidden=False, + name="CAPTURE", + ) + flag = self.get_capture_zone_flag() + capture_trigger = TriggerCondition(Event.NoEvent, "Capture Trigger") + capture_trigger.add_condition( + AllOfCoalitionOutsideZone( + defending_coalition, trigger_zone.id, unit_type="GROUND" + ) + ) + capture_trigger.add_condition( + PartOfCoalitionInZone( + attacking_coalition, trigger_zone.id, unit_type="GROUND" + ) + ) + capture_trigger.add_condition(FlagIsFalse(flag=flag)) + script_string = String( + f'base_capture_events[#base_capture_events + 1] = "{cp.id}||{attack_coalition_int}||{cp.full_name}"' + ) + capture_trigger.add_action(DoScript(script_string)) + capture_trigger.add_action(SetFlag(flag=flag)) + self.mission.triggerrules.triggers.append(capture_trigger) + + recapture_trigger = TriggerCondition(Event.NoEvent, "Capture Trigger") + recapture_trigger.add_condition( + AllOfCoalitionOutsideZone( + attacking_coalition, trigger_zone.id, unit_type="GROUND" + ) + ) + recapture_trigger.add_condition( + PartOfCoalitionInZone( + defending_coalition, trigger_zone.id, unit_type="GROUND" + ) + ) + recapture_trigger.add_condition(FlagIsTrue(flag=flag)) + script_string = String( + f'base_capture_events[#base_capture_events + 1] = "{cp.id}||{defend_coalition_int}||{cp.full_name}"' + ) + recapture_trigger.add_action(DoScript(script_string)) + recapture_trigger.add_action(ClearFlag(flag=flag)) + self.mission.triggerrules.triggers.append(recapture_trigger) + + def generate(self) -> None: + player_coalition = "blue" + enemy_coalition = "red" + + self._set_skill(player_coalition, enemy_coalition) + self._set_allegiances(player_coalition, enemy_coalition) + self._gen_markers() + self._generate_capture_triggers(player_coalition, enemy_coalition) + if self.game.settings.ground_start_scenery_remove_triggers: + try: + self._generate_clear_statics_trigger(self.game.scenery_clear_zones) + self.game.scenery_clear_zones.clear() + except AttributeError: + logging.info(f"Unable to create Clear Statics triggers") + + @classmethod + def get_capture_zone_flag(cls) -> int: + flag = cls.capture_zone_flag + cls.capture_zone_flag += 1 + return flag