mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
505 lines
18 KiB
Python
505 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import random
|
|
from collections.abc import Iterable
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Any, Union
|
|
from typing import Optional, Sequence, TYPE_CHECKING
|
|
from uuid import uuid4, UUID
|
|
|
|
from dcs.country import Country
|
|
from faker import Faker
|
|
|
|
from game.ato import Flight, FlightType, Package
|
|
from game.settings import AutoAtoBehavior, Settings
|
|
from game.theater import ParkingType
|
|
from .pilot import Pilot, PilotStatus
|
|
from ..db.database import Database
|
|
from ..radio.radios import RadioFrequency
|
|
from ..utils import meters
|
|
|
|
if TYPE_CHECKING:
|
|
from game import Game
|
|
from game.coalition import Coalition
|
|
from game.dcs.aircrafttype import AircraftType
|
|
from game.theater import ControlPoint, MissionTarget
|
|
from .operatingbases import OperatingBases
|
|
from .squadrondef import SquadronDef
|
|
|
|
|
|
@dataclass
|
|
class Squadron:
|
|
id: UUID = field(init=False, default_factory=uuid4)
|
|
|
|
name: str
|
|
nickname: Optional[str]
|
|
country: Country
|
|
role: str
|
|
aircraft: AircraftType
|
|
max_size: int
|
|
livery: Optional[str]
|
|
primary_task: FlightType
|
|
auto_assignable_mission_types: set[FlightType]
|
|
radio_presets: dict[Union[str, int], list[RadioFrequency]]
|
|
operating_bases: OperatingBases
|
|
female_pilot_percentage: int
|
|
|
|
#: The pool of pilots that have not yet been assigned to the squadron. This only
|
|
#: happens when a preset squadron defines more preset pilots than the squadron limit
|
|
#: allows. This pool will be consumed before random pilots are generated.
|
|
pilot_pool: list[Pilot]
|
|
|
|
current_roster: list[Pilot] = field(default_factory=list, init=False, hash=False)
|
|
available_pilots: list[Pilot] = field(
|
|
default_factory=list, init=False, hash=False, compare=False
|
|
)
|
|
|
|
coalition: Coalition = field(hash=False, compare=False)
|
|
flight_db: Database[Flight] = field(hash=False, compare=False)
|
|
settings: Settings = field(hash=False, compare=False)
|
|
|
|
location: ControlPoint
|
|
destination: Optional[ControlPoint] = field(
|
|
init=False, hash=False, compare=False, default=None
|
|
)
|
|
|
|
owned_aircraft: int = field(init=False, hash=False, compare=False, default=0)
|
|
untasked_aircraft: int = field(init=False, hash=False, compare=False, default=0)
|
|
pending_deliveries: int = field(init=False, hash=False, compare=False, default=0)
|
|
|
|
def __setstate__(self, state: dict[str, Any]) -> None:
|
|
if "id" not in state:
|
|
state["id"] = uuid4()
|
|
self.__dict__.update(state)
|
|
|
|
def __str__(self) -> str:
|
|
if self.nickname is None:
|
|
return self.name
|
|
return f'{self.name} "{self.nickname}"'
|
|
|
|
def __hash__(self) -> int:
|
|
return hash(self.id)
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
if not isinstance(other, Squadron):
|
|
return False
|
|
return self.id == other.id
|
|
|
|
@property
|
|
def player(self) -> bool:
|
|
return self.coalition.player
|
|
|
|
def assign_to_base(self, base: ControlPoint) -> None:
|
|
self.location = base
|
|
logging.debug(f"Assigned {self} to {base}")
|
|
|
|
@property
|
|
def pilot_limits_enabled(self) -> bool:
|
|
return self.settings.enable_squadron_pilot_limits
|
|
|
|
def set_auto_assignable_mission_types(
|
|
self, mission_types: Iterable[FlightType]
|
|
) -> None:
|
|
self.auto_assignable_mission_types = {
|
|
t for t in mission_types if self.capable_of(t)
|
|
}
|
|
|
|
def claim_new_pilot_if_allowed(self) -> Optional[Pilot]:
|
|
if self.pilot_limits_enabled:
|
|
return None
|
|
self._recruit_pilots(1)
|
|
return self.available_pilots.pop()
|
|
|
|
def claim_available_pilot(self) -> Optional[Pilot]:
|
|
if not self.available_pilots:
|
|
return self.claim_new_pilot_if_allowed()
|
|
|
|
# For opfor, so player/AI option is irrelevant.
|
|
if not self.player:
|
|
return self.available_pilots.pop()
|
|
|
|
preference = self.settings.auto_ato_behavior
|
|
|
|
# No preference, so the first pilot is fine.
|
|
if preference is AutoAtoBehavior.Default:
|
|
return self.available_pilots.pop()
|
|
|
|
prefer_players = preference is AutoAtoBehavior.Prefer
|
|
for pilot in self.available_pilots:
|
|
if pilot.player == prefer_players:
|
|
self.available_pilots.remove(pilot)
|
|
return pilot
|
|
|
|
# No pilot was found that matched the user's preference.
|
|
#
|
|
# If they chose to *never* assign players and only players remain in the pool,
|
|
# we cannot fill the slot with the available pilots.
|
|
#
|
|
# If they only *prefer* players and we're out of players, just return an AI
|
|
# pilot.
|
|
if not prefer_players:
|
|
return self.claim_new_pilot_if_allowed()
|
|
return self.available_pilots.pop()
|
|
|
|
def claim_pilot(self, pilot: Pilot) -> None:
|
|
if pilot not in self.available_pilots:
|
|
raise ValueError(
|
|
f"Cannot assign {pilot} to {self} because they are not available"
|
|
)
|
|
self.available_pilots.remove(pilot)
|
|
|
|
def return_pilot(self, pilot: Pilot) -> None:
|
|
self.available_pilots.append(pilot)
|
|
|
|
def return_pilots(self, pilots: Sequence[Pilot]) -> None:
|
|
# Return in reverse so that returning two pilots and then getting two more
|
|
# results in the same ordering. This happens commonly when resetting rosters in
|
|
# the UI, when we clear the roster because the UI is updating, then end up
|
|
# repopulating the same size flight from the same squadron.
|
|
self.available_pilots.extend(reversed(pilots))
|
|
|
|
def _recruit_pilots(self, count: int) -> None:
|
|
new_pilots = self.pilot_pool[:count]
|
|
self.pilot_pool = self.pilot_pool[count:]
|
|
count -= len(new_pilots)
|
|
for _ in range(count):
|
|
if random.randint(1, 100) > self.female_pilot_percentage:
|
|
new_pilots.append(Pilot(self.faker.name_male()))
|
|
else:
|
|
new_pilots.append(Pilot(self.faker.name_female()))
|
|
self.current_roster.extend(new_pilots)
|
|
self.available_pilots.extend(new_pilots)
|
|
|
|
def populate_for_turn_0(self, squadrons_start_full: bool) -> None:
|
|
if any(p.status is not PilotStatus.Active for p in self.pilot_pool):
|
|
raise ValueError("Squadrons can only be created with active pilots.")
|
|
self._recruit_pilots(self.settings.squadron_pilot_limit)
|
|
if squadrons_start_full:
|
|
parking_type = ParkingType().from_squadron(self)
|
|
self.owned_aircraft = min(
|
|
self.max_size, self.location.unclaimed_parking(parking_type)
|
|
)
|
|
|
|
def end_turn(self) -> None:
|
|
if self.destination is not None:
|
|
self.relocate_to(self.destination)
|
|
self.replenish_lost_pilots()
|
|
self.deliver_orders()
|
|
|
|
def replenish_lost_pilots(self) -> None:
|
|
if self.pilot_limits_enabled and self.replenish_count > 0:
|
|
self._recruit_pilots(self.replenish_count)
|
|
|
|
def return_all_pilots_and_aircraft(self) -> None:
|
|
self.available_pilots = list(self.active_pilots)
|
|
self.untasked_aircraft = self.owned_aircraft
|
|
|
|
@staticmethod
|
|
def send_on_leave(pilot: Pilot) -> None:
|
|
pilot.send_on_leave()
|
|
|
|
def return_from_leave(self, pilot: Pilot) -> None:
|
|
if not self.has_unfilled_pilot_slots:
|
|
raise RuntimeError(
|
|
f"Cannot return {pilot} from leave because {self} is full"
|
|
)
|
|
pilot.return_from_leave()
|
|
|
|
@property
|
|
def faker(self) -> Faker:
|
|
return self.coalition.faker
|
|
|
|
def _pilots_with_status(self, status: PilotStatus) -> list[Pilot]:
|
|
return [p for p in self.current_roster if p.status == status]
|
|
|
|
def _pilots_without_status(self, status: PilotStatus) -> list[Pilot]:
|
|
return [p for p in self.current_roster if p.status != status]
|
|
|
|
@property
|
|
def pilot_limit(self) -> int:
|
|
return self.settings.squadron_pilot_limit
|
|
|
|
@property
|
|
def expected_pilots_next_turn(self) -> int:
|
|
return len(self.active_pilots) + self.replenish_count
|
|
|
|
@property
|
|
def replenish_count(self) -> int:
|
|
return min(
|
|
self.settings.squadron_replenishment_rate,
|
|
self._number_of_unfilled_pilot_slots,
|
|
)
|
|
|
|
@property
|
|
def active_pilots(self) -> list[Pilot]:
|
|
return self._pilots_with_status(PilotStatus.Active)
|
|
|
|
@property
|
|
def pilots_on_leave(self) -> list[Pilot]:
|
|
return self._pilots_with_status(PilotStatus.OnLeave)
|
|
|
|
@property
|
|
def number_of_pilots_including_inactive(self) -> int:
|
|
return len(self.current_roster)
|
|
|
|
@property
|
|
def _number_of_unfilled_pilot_slots(self) -> int:
|
|
return self.pilot_limit - len(self.active_pilots)
|
|
|
|
@property
|
|
def number_of_available_pilots(self) -> int:
|
|
return len(self.available_pilots)
|
|
|
|
def can_provide_pilots(self, count: int) -> bool:
|
|
return not self.pilot_limits_enabled or self.number_of_available_pilots >= count
|
|
|
|
@property
|
|
def has_available_pilots(self) -> bool:
|
|
return not self.pilot_limits_enabled or bool(self.available_pilots)
|
|
|
|
@property
|
|
def has_unfilled_pilot_slots(self) -> bool:
|
|
return not self.pilot_limits_enabled or self._number_of_unfilled_pilot_slots > 0
|
|
|
|
def capable_of(self, task: FlightType) -> bool:
|
|
"""Returns True if the squadron is capable of performing the given task.
|
|
|
|
A squadron may be capable of performing a task even if it will not be
|
|
automatically assigned to it.
|
|
"""
|
|
return self.aircraft.capable_of(task)
|
|
|
|
def can_auto_assign(self, task: FlightType) -> bool:
|
|
return task in self.auto_assignable_mission_types
|
|
|
|
def can_auto_assign_mission(
|
|
self,
|
|
location: MissionTarget,
|
|
task: FlightType,
|
|
size: int,
|
|
heli: bool,
|
|
this_turn: bool,
|
|
ignore_range: bool = False,
|
|
) -> bool:
|
|
if (
|
|
self.location.cptype.name in ["FOB", "FARP"]
|
|
and not self.aircraft.helicopter
|
|
):
|
|
# AI harriers can't handle FOBs/FARPs
|
|
# AI has a hard time taking off and will not land back at FOB/FARP
|
|
# thus, disable auto-planning
|
|
return False
|
|
if not self.can_auto_assign(task):
|
|
return False
|
|
if this_turn and not self.can_fulfill_flight(size):
|
|
return False
|
|
|
|
if task in [FlightType.ESCORT, FlightType.SEAD_ESCORT]:
|
|
if heli and not self.aircraft.helicopter and not self.aircraft.lha_capable:
|
|
return False
|
|
if not heli and self.aircraft.helicopter:
|
|
return False
|
|
|
|
if heli and task == FlightType.REFUELING:
|
|
return False
|
|
|
|
if ignore_range:
|
|
return True
|
|
|
|
distance_to_target = meters(location.distance_to(self.location))
|
|
return distance_to_target <= self.aircraft.max_mission_range
|
|
|
|
def operates_from(self, control_point: ControlPoint) -> bool:
|
|
if not control_point.can_operate(self.aircraft):
|
|
return False
|
|
if control_point.is_carrier:
|
|
return self.operating_bases.carrier
|
|
elif control_point.is_lha:
|
|
return self.operating_bases.lha
|
|
else:
|
|
return self.operating_bases.shore
|
|
|
|
def pilot_at_index(self, index: int) -> Pilot:
|
|
return self.current_roster[index]
|
|
|
|
def claim_inventory(self, count: int) -> None:
|
|
if self.untasked_aircraft < count:
|
|
raise ValueError(
|
|
f"Cannot remove {count} from {self.name}. Only have "
|
|
f"{self.untasked_aircraft}."
|
|
)
|
|
self.untasked_aircraft -= count
|
|
|
|
def can_fulfill_flight(self, count: int) -> bool:
|
|
return self.can_provide_pilots(count) and self.untasked_aircraft >= count
|
|
|
|
def refund_orders(self, count: Optional[int] = None) -> None:
|
|
if count is None:
|
|
count = self.pending_deliveries
|
|
self.coalition.adjust_budget(self.aircraft.price * count)
|
|
self.pending_deliveries -= count
|
|
|
|
def deliver_orders(self) -> None:
|
|
self.cancel_overflow_orders()
|
|
self.owned_aircraft += self.pending_deliveries
|
|
self.pending_deliveries = 0
|
|
|
|
def relocate_to(self, destination: ControlPoint) -> None:
|
|
self.location = destination
|
|
if self.location == self.destination:
|
|
self.destination = None
|
|
|
|
def cancel_overflow_orders(self) -> None:
|
|
from game.theater import ParkingType
|
|
|
|
if self.pending_deliveries <= 0:
|
|
return
|
|
parking_type = ParkingType().from_aircraft(
|
|
self.aircraft, self.coalition.game.settings.ground_start_ai_planes
|
|
)
|
|
overflow = -self.location.unclaimed_parking(parking_type)
|
|
if overflow > 0:
|
|
sell_count = min(overflow, self.pending_deliveries)
|
|
logging.debug(
|
|
f"{self.location} is overfull by {overflow} aircraft. Cancelling "
|
|
f"orders for {sell_count} aircraft to make room."
|
|
)
|
|
self.refund_orders(sell_count)
|
|
|
|
@property
|
|
def max_fulfillable_aircraft(self) -> int:
|
|
return max(self.number_of_available_pilots, self.untasked_aircraft)
|
|
|
|
@property
|
|
def expected_size_next_turn(self) -> int:
|
|
return self.owned_aircraft + self.pending_deliveries
|
|
|
|
def has_aircraft_capacity_for(self, n: int) -> bool:
|
|
if not self.settings.enable_squadron_aircraft_limits:
|
|
return True
|
|
remaining = self.max_size - self.owned_aircraft - self.pending_deliveries
|
|
return remaining >= n
|
|
|
|
@property
|
|
def arrival(self) -> ControlPoint:
|
|
return self.location if self.destination is None else self.destination
|
|
|
|
def plan_relocation(self, destination: ControlPoint, now: datetime) -> None:
|
|
from game.theater import ParkingType
|
|
|
|
if destination == self.location:
|
|
logging.warning(
|
|
f"Attempted to plan relocation of {self} to current location "
|
|
f"{destination}. Ignoring."
|
|
)
|
|
return
|
|
if destination == self.destination:
|
|
logging.warning(
|
|
f"Attempted to plan relocation of {self} to current destination "
|
|
f"{destination}. Ignoring."
|
|
)
|
|
return
|
|
|
|
parking_type = ParkingType().from_squadron(self)
|
|
if self.expected_size_next_turn > destination.unclaimed_parking(parking_type):
|
|
raise RuntimeError(f"Not enough parking for {self} at {destination}.")
|
|
if not destination.can_operate(self.aircraft):
|
|
raise RuntimeError(f"{self} cannot operate at {destination}.")
|
|
self.destination = destination
|
|
self.replan_ferry_flights(now)
|
|
|
|
def cancel_relocation(self) -> None:
|
|
from game.theater import ParkingType
|
|
|
|
if self.destination is None:
|
|
logging.warning(
|
|
f"Attempted to cancel relocation of squadron with no transfer order. "
|
|
"Ignoring."
|
|
)
|
|
return
|
|
|
|
parking_type = ParkingType().from_squadron(self)
|
|
if self.expected_size_next_turn > self.location.unclaimed_parking(parking_type):
|
|
raise RuntimeError(f"Not enough parking for {self} at {self.location}.")
|
|
self.destination = None
|
|
self.cancel_ferry_flights()
|
|
|
|
def replan_ferry_flights(self, now: datetime) -> None:
|
|
self.cancel_ferry_flights()
|
|
self.plan_ferry_flights(now)
|
|
|
|
def cancel_ferry_flights(self) -> None:
|
|
for package in self.coalition.ato.packages:
|
|
# Copy the list so our iterator remains consistent throughout the removal.
|
|
for flight in list(package.flights):
|
|
if flight.squadron == self and flight.flight_type is FlightType.FERRY:
|
|
package.remove_flight(flight)
|
|
if not package.flights:
|
|
self.coalition.ato.remove_package(package)
|
|
|
|
def plan_ferry_flights(self, now: datetime) -> None:
|
|
if self.destination is None:
|
|
raise RuntimeError(
|
|
f"Cannot plan ferry flights for {self} because there is no destination."
|
|
)
|
|
remaining = self.untasked_aircraft
|
|
if not remaining:
|
|
return
|
|
|
|
package = Package(self.destination, self.flight_db)
|
|
while remaining:
|
|
size = min(remaining, self.aircraft.max_group_size)
|
|
self.plan_ferry_flight(package, size)
|
|
remaining -= size
|
|
package.set_tot_asap(now)
|
|
self.coalition.ato.add_package(package)
|
|
|
|
def plan_ferry_flight(self, package: Package, size: int) -> None:
|
|
start_type = self.location.required_aircraft_start_type
|
|
if start_type is None:
|
|
start_type = self.settings.default_start_type
|
|
|
|
flight = Flight(
|
|
package,
|
|
self,
|
|
size,
|
|
FlightType.FERRY,
|
|
start_type,
|
|
divert=None,
|
|
)
|
|
package.add_flight(flight)
|
|
flight.recreate_flight_plan()
|
|
|
|
@classmethod
|
|
def create_from(
|
|
cls,
|
|
squadron_def: SquadronDef,
|
|
primary_task: FlightType,
|
|
max_size: int,
|
|
base: ControlPoint,
|
|
coalition: Coalition,
|
|
game: Game,
|
|
) -> Squadron:
|
|
squadron_def.claimed = True
|
|
return Squadron(
|
|
squadron_def.name,
|
|
squadron_def.nickname,
|
|
squadron_def.country,
|
|
squadron_def.role,
|
|
squadron_def.aircraft,
|
|
max_size,
|
|
squadron_def.livery,
|
|
primary_task,
|
|
squadron_def.auto_assignable_mission_types,
|
|
squadron_def.radio_presets,
|
|
squadron_def.operating_bases,
|
|
squadron_def.female_pilot_percentage,
|
|
squadron_def.pilot_pool,
|
|
coalition,
|
|
game.db.flights,
|
|
game.settings,
|
|
base,
|
|
)
|