mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
The purchase system will seek out a source for its units when the purchase is completed. If no source is available the order will be refunded. Orders with no source available are prevented, so this only happens when the source was cut off from the destination during the turn. There's still some funkiness going on with the first turn (but possibly only when the first turn includes a cheat to capture) where the AI buys a ton of units somewhere other than the front line. First turn behavior should probably be different anyway, with the first turn allowing purchases anywhere to avoid empty front lines while troops reinforce if the front line isn't a factory. https://github.com/Khopa/dcs_liberation/issues/986
573 lines
22 KiB
Python
573 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections import defaultdict
|
|
from typing import Dict, List, Optional, TYPE_CHECKING, Type
|
|
|
|
from dcs.mapping import Point
|
|
from dcs.task import Task
|
|
from dcs.unittype import UnitType, VehicleType
|
|
|
|
from game import persistency
|
|
from game.debriefing import AirLosses, Debriefing
|
|
from game.infos.information import Information
|
|
from game.operation.operation import Operation
|
|
from game.theater import ControlPoint, SupplyRoute
|
|
from gen import AirTaskingOrder
|
|
from gen.ground_forces.combat_stance import CombatStance
|
|
from ..db import PRICES
|
|
from ..transfers import RoadTransferOrder
|
|
from ..unitmap import UnitMap
|
|
|
|
if TYPE_CHECKING:
|
|
from ..game import Game
|
|
|
|
|
|
MINOR_DEFEAT_INFLUENCE = 0.1
|
|
DEFEAT_INFLUENCE = 0.3
|
|
STRONG_DEFEAT_INFLUENCE = 0.5
|
|
|
|
|
|
class Event:
|
|
silent = False
|
|
informational = False
|
|
|
|
game = None # type: Game
|
|
location = None # type: Point
|
|
from_cp = None # type: ControlPoint
|
|
to_cp = None # type: ControlPoint
|
|
difficulty = 1 # type: int
|
|
|
|
def __init__(
|
|
self,
|
|
game,
|
|
from_cp: ControlPoint,
|
|
target_cp: ControlPoint,
|
|
location: Point,
|
|
attacker_name: str,
|
|
defender_name: str,
|
|
):
|
|
self.game = game
|
|
self.from_cp = from_cp
|
|
self.to_cp = target_cp
|
|
self.location = location
|
|
self.attacker_name = attacker_name
|
|
self.defender_name = defender_name
|
|
|
|
@property
|
|
def is_player_attacking(self) -> bool:
|
|
return self.attacker_name == self.game.player_name
|
|
|
|
@property
|
|
def tasks(self) -> List[Type[Task]]:
|
|
return []
|
|
|
|
def generate(self) -> UnitMap:
|
|
Operation.prepare(self.game)
|
|
unit_map = Operation.generate()
|
|
Operation.current_mission.save(
|
|
persistency.mission_path_for("liberation_nextturn.miz")
|
|
)
|
|
return unit_map
|
|
|
|
@staticmethod
|
|
def _transfer_aircraft(
|
|
ato: AirTaskingOrder, losses: AirLosses, for_player: bool
|
|
) -> None:
|
|
for package in ato.packages:
|
|
for flight in package.flights:
|
|
# No need to transfer to the same location.
|
|
if flight.departure == flight.arrival:
|
|
continue
|
|
|
|
# Don't transfer to bases that were captured. Note that if the
|
|
# airfield was back-filling transfers it may overflow. We could
|
|
# attempt to be smarter in the future by performing transfers in
|
|
# order up a graph to prevent transfers to full airports and
|
|
# send overflow off-map, but overflow is fine for now.
|
|
if flight.arrival.captured != for_player:
|
|
logging.info(
|
|
f"Not transferring {flight} because {flight.arrival} "
|
|
"was captured"
|
|
)
|
|
continue
|
|
|
|
transfer_count = losses.surviving_flight_members(flight)
|
|
if transfer_count < 0:
|
|
logging.error(
|
|
f"{flight} had {flight.count} aircraft but "
|
|
f"{transfer_count} losses were recorded."
|
|
)
|
|
continue
|
|
|
|
aircraft = flight.unit_type
|
|
available = flight.departure.base.total_units_of_type(aircraft)
|
|
if available < transfer_count:
|
|
logging.error(
|
|
f"Found killed {aircraft} from {flight.departure} but "
|
|
f"that airbase has only {available} available."
|
|
)
|
|
continue
|
|
|
|
flight.departure.base.aircraft[aircraft] -= transfer_count
|
|
if aircraft not in flight.arrival.base.aircraft:
|
|
# TODO: Should use defaultdict.
|
|
flight.arrival.base.aircraft[aircraft] = 0
|
|
flight.arrival.base.aircraft[aircraft] += transfer_count
|
|
|
|
def complete_aircraft_transfers(self, debriefing: Debriefing) -> None:
|
|
self._transfer_aircraft(
|
|
self.game.blue_ato, debriefing.air_losses, for_player=True
|
|
)
|
|
self._transfer_aircraft(
|
|
self.game.red_ato, debriefing.air_losses, for_player=False
|
|
)
|
|
|
|
@staticmethod
|
|
def commit_air_losses(debriefing: Debriefing) -> None:
|
|
for loss in debriefing.air_losses.losses:
|
|
aircraft = loss.unit_type
|
|
cp = loss.departure
|
|
available = cp.base.total_units_of_type(aircraft)
|
|
if available <= 0:
|
|
logging.error(
|
|
f"Found killed {aircraft} from {cp} but that airbase has "
|
|
"none available."
|
|
)
|
|
continue
|
|
|
|
logging.info(f"{aircraft} destroyed from {cp}")
|
|
cp.base.aircraft[aircraft] -= 1
|
|
|
|
@staticmethod
|
|
def commit_front_line_losses(debriefing: Debriefing) -> None:
|
|
for loss in debriefing.front_line_losses:
|
|
unit_type = loss.unit_type
|
|
control_point = loss.origin
|
|
available = control_point.base.total_units_of_type(unit_type)
|
|
if available <= 0:
|
|
logging.error(
|
|
f"Found killed {unit_type} from {control_point} but that "
|
|
"airbase has none available."
|
|
)
|
|
continue
|
|
|
|
logging.info(f"{unit_type} destroyed from {control_point}")
|
|
control_point.base.armor[unit_type] -= 1
|
|
|
|
@staticmethod
|
|
def commit_convoy_losses(debriefing: Debriefing) -> None:
|
|
for loss in debriefing.convoy_losses:
|
|
unit_type = loss.unit_type
|
|
transfer = loss.transfer
|
|
available = loss.transfer.units.get(unit_type, 0)
|
|
convoy_name = f"convoy from {transfer.position} to {transfer.destination}"
|
|
if available <= 0:
|
|
logging.error(
|
|
f"Found killed {unit_type} in {convoy_name} but that convoy has "
|
|
"none available."
|
|
)
|
|
continue
|
|
|
|
logging.info(f"{unit_type} destroyed in {convoy_name}")
|
|
transfer.units[unit_type] -= 1
|
|
|
|
@staticmethod
|
|
def commit_ground_object_losses(debriefing: Debriefing) -> None:
|
|
for loss in debriefing.ground_object_losses:
|
|
# TODO: This should be stored in the TGO, not in the pydcs Group.
|
|
if not hasattr(loss.group, "units_losts"):
|
|
loss.group.units_losts = []
|
|
|
|
loss.group.units.remove(loss.unit)
|
|
loss.group.units_losts.append(loss.unit)
|
|
|
|
def commit_building_losses(self, debriefing: Debriefing) -> None:
|
|
for loss in debriefing.building_losses:
|
|
loss.ground_object.kill()
|
|
self.game.informations.append(
|
|
Information(
|
|
"Building destroyed",
|
|
f"{loss.ground_object.dcs_identifier} has been destroyed at "
|
|
f"location {loss.ground_object.obj_name}",
|
|
self.game.turn,
|
|
)
|
|
)
|
|
|
|
@staticmethod
|
|
def commit_damaged_runways(debriefing: Debriefing) -> None:
|
|
for damaged_runway in debriefing.damaged_runways:
|
|
damaged_runway.damage_runway()
|
|
|
|
def commit(self, debriefing: Debriefing):
|
|
logging.info("Committing mission results")
|
|
|
|
self.commit_air_losses(debriefing)
|
|
self.commit_front_line_losses(debriefing)
|
|
self.commit_convoy_losses(debriefing)
|
|
self.commit_ground_object_losses(debriefing)
|
|
self.commit_building_losses(debriefing)
|
|
self.commit_damaged_runways(debriefing)
|
|
|
|
# ------------------------------
|
|
# Captured bases
|
|
# if self.game.player_country in db.BLUEFOR_FACTIONS:
|
|
coalition = 2 # Value in DCS mission event for BLUE
|
|
# else:
|
|
# coalition = 1 # Value in DCS mission event for RED
|
|
|
|
for captured in debriefing.base_capture_events:
|
|
try:
|
|
id = int(captured.split("||")[0])
|
|
new_owner_coalition = int(captured.split("||")[1])
|
|
|
|
captured_cps = []
|
|
for cp in self.game.theater.controlpoints:
|
|
if cp.id == id:
|
|
|
|
if cp.captured and new_owner_coalition != coalition:
|
|
for_player = False
|
|
info = Information(
|
|
cp.name + " lost !",
|
|
"The ennemy took control of "
|
|
+ cp.name
|
|
+ "\nShame on us !",
|
|
self.game.turn,
|
|
)
|
|
self.game.informations.append(info)
|
|
captured_cps.append(cp)
|
|
elif not (cp.captured) and new_owner_coalition == coalition:
|
|
for_player = True
|
|
info = Information(
|
|
cp.name + " captured !",
|
|
"We took control of " + cp.name + "! Great job !",
|
|
self.game.turn,
|
|
)
|
|
self.game.informations.append(info)
|
|
captured_cps.append(cp)
|
|
else:
|
|
continue
|
|
|
|
cp.capture(self.game, for_player)
|
|
|
|
for cp in captured_cps:
|
|
logging.info("Will run redeploy for " + cp.name)
|
|
self.redeploy_units(cp)
|
|
except Exception:
|
|
logging.exception(f"Could not process base capture {captured}")
|
|
|
|
self.complete_aircraft_transfers(debriefing)
|
|
|
|
# Destroyed units carcass
|
|
# -------------------------
|
|
for destroyed_unit in debriefing.state_data.destroyed_statics:
|
|
self.game.add_destroyed_units(destroyed_unit)
|
|
|
|
# -----------------------------------
|
|
# Compute damage to bases
|
|
for cp in self.game.theater.player_points():
|
|
enemy_cps = [e for e in cp.connected_points if not e.captured]
|
|
for enemy_cp in enemy_cps:
|
|
print(
|
|
"Compute frontline progression for : "
|
|
+ cp.name
|
|
+ " to "
|
|
+ enemy_cp.name
|
|
)
|
|
|
|
delta = 0.0
|
|
player_won = True
|
|
ally_casualties = debriefing.casualty_count(cp)
|
|
enemy_casualties = debriefing.casualty_count(enemy_cp)
|
|
ally_units_alive = cp.base.total_armor
|
|
enemy_units_alive = enemy_cp.base.total_armor
|
|
|
|
print(ally_units_alive)
|
|
print(enemy_units_alive)
|
|
print(ally_casualties)
|
|
print(enemy_casualties)
|
|
|
|
ratio = (1.0 + enemy_casualties) / (1.0 + ally_casualties)
|
|
|
|
player_aggresive = cp.stances[enemy_cp.id] in [
|
|
CombatStance.AGGRESSIVE,
|
|
CombatStance.ELIMINATION,
|
|
CombatStance.BREAKTHROUGH,
|
|
]
|
|
|
|
if ally_units_alive == 0:
|
|
player_won = False
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
elif enemy_units_alive == 0:
|
|
player_won = True
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
elif cp.stances[enemy_cp.id] == CombatStance.RETREAT:
|
|
player_won = False
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
else:
|
|
if enemy_casualties > ally_casualties:
|
|
player_won = True
|
|
if cp.stances[enemy_cp.id] == CombatStance.BREAKTHROUGH:
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
else:
|
|
if ratio > 3:
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
elif ratio < 1.5:
|
|
delta = MINOR_DEFEAT_INFLUENCE
|
|
else:
|
|
delta = DEFEAT_INFLUENCE
|
|
elif ally_casualties > enemy_casualties:
|
|
|
|
if (
|
|
ally_units_alive > 2 * enemy_units_alive
|
|
and player_aggresive
|
|
):
|
|
# Even with casualties if the enemy is overwhelmed, they are going to lose ground
|
|
player_won = True
|
|
delta = MINOR_DEFEAT_INFLUENCE
|
|
elif (
|
|
ally_units_alive > 3 * enemy_units_alive
|
|
and player_aggresive
|
|
):
|
|
player_won = True
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
else:
|
|
# But is the enemy is not outnumbered, we lose
|
|
player_won = False
|
|
if cp.stances[enemy_cp.id] == CombatStance.BREAKTHROUGH:
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
else:
|
|
delta = STRONG_DEFEAT_INFLUENCE
|
|
|
|
# No progress with defensive strategies
|
|
if player_won and cp.stances[enemy_cp.id] in [
|
|
CombatStance.DEFENSIVE,
|
|
CombatStance.AMBUSH,
|
|
]:
|
|
print("Defensive stance, progress is limited")
|
|
delta = MINOR_DEFEAT_INFLUENCE
|
|
|
|
if player_won:
|
|
print(cp.name + " won ! factor > " + str(delta))
|
|
cp.base.affect_strength(delta)
|
|
enemy_cp.base.affect_strength(-delta)
|
|
info = Information(
|
|
"Frontline Report",
|
|
"Our ground forces from "
|
|
+ cp.name
|
|
+ " are making progress toward "
|
|
+ enemy_cp.name,
|
|
self.game.turn,
|
|
)
|
|
self.game.informations.append(info)
|
|
else:
|
|
print(cp.name + " lost ! factor > " + str(delta))
|
|
enemy_cp.base.affect_strength(delta)
|
|
cp.base.affect_strength(-delta)
|
|
info = Information(
|
|
"Frontline Report",
|
|
"Our ground forces from "
|
|
+ cp.name
|
|
+ " are losing ground against the enemy forces from "
|
|
+ enemy_cp.name,
|
|
self.game.turn,
|
|
)
|
|
self.game.informations.append(info)
|
|
|
|
def redeploy_units(self, cp: ControlPoint) -> None:
|
|
""" "
|
|
Auto redeploy units to newly captured base
|
|
"""
|
|
|
|
ally_connected_cps = [
|
|
ocp for ocp in cp.connected_points if cp.captured == ocp.captured
|
|
]
|
|
enemy_connected_cps = [
|
|
ocp for ocp in cp.connected_points if cp.captured != ocp.captured
|
|
]
|
|
|
|
# If the newly captured cp does not have enemy connected cp,
|
|
# then it is not necessary to redeploy frontline units there.
|
|
if len(enemy_connected_cps) == 0:
|
|
return
|
|
|
|
# From each ally cp, send reinforcements
|
|
for ally_cp in ally_connected_cps:
|
|
self.redeploy_between(cp, ally_cp)
|
|
|
|
def redeploy_between(self, destination: ControlPoint, source: ControlPoint) -> None:
|
|
total_units_redeployed = 0
|
|
moved_units = {}
|
|
|
|
if source.has_active_frontline or not destination.captured:
|
|
# If there are still active front lines to defend at the
|
|
# transferring CP we should not transfer all units.
|
|
#
|
|
# Opfor also does not transfer all of their units.
|
|
# TODO: Balance the CPs rather than moving half from everywhere.
|
|
move_factor = 0.5
|
|
else:
|
|
# Otherwise we can move everything.
|
|
move_factor = 1
|
|
|
|
for frontline_unit, count in source.base.armor.items():
|
|
moved_units[frontline_unit] = int(count * move_factor)
|
|
total_units_redeployed = total_units_redeployed + int(count * move_factor)
|
|
|
|
destination.base.commision_units(moved_units)
|
|
source.base.commit_losses(moved_units)
|
|
|
|
# Also transfer pending deliveries.
|
|
for unit_type, count in source.pending_unit_deliveries.units.items():
|
|
if not issubclass(unit_type, VehicleType):
|
|
continue
|
|
if count <= 0:
|
|
# Don't transfer *sales*...
|
|
continue
|
|
move_count = int(count * move_factor)
|
|
source.pending_unit_deliveries.sell({unit_type: move_count})
|
|
destination.pending_unit_deliveries.order({unit_type: move_count})
|
|
total_units_redeployed += move_count
|
|
|
|
if total_units_redeployed > 0:
|
|
text = (
|
|
f"{total_units_redeployed} units have been redeployed from "
|
|
f"{source.name} to {destination.name}"
|
|
)
|
|
info = Information("Units redeployed", text, self.game.turn)
|
|
self.game.informations.append(info)
|
|
logging.info(text)
|
|
|
|
|
|
class UnitsDeliveryEvent:
|
|
def __init__(self, destination: ControlPoint) -> None:
|
|
self.destination = destination
|
|
|
|
# Maps unit type to order quantity.
|
|
self.units: Dict[Type[UnitType], int] = defaultdict(int)
|
|
|
|
def __str__(self) -> str:
|
|
return f"Pending delivery to {self.destination}"
|
|
|
|
def order(self, units: Dict[Type[UnitType], int]) -> None:
|
|
for k, v in units.items():
|
|
self.units[k] += v
|
|
|
|
def sell(self, units: Dict[Type[UnitType], int]) -> None:
|
|
for k, v in units.items():
|
|
self.units[k] -= v
|
|
|
|
def refund_all(self, game: Game) -> None:
|
|
self.refund(game, self.units)
|
|
self.units = defaultdict(int)
|
|
|
|
def refund(self, game: Game, units: Dict[Type[UnitType], int]) -> None:
|
|
for unit_type, count in units.items():
|
|
try:
|
|
price = PRICES[unit_type]
|
|
except KeyError:
|
|
logging.error(f"Could not refund {unit_type.id}, price unknown")
|
|
continue
|
|
|
|
logging.info(f"Refunding {count} {unit_type.id} at {self.destination.name}")
|
|
game.adjust_budget(price * count, player=self.destination.captured)
|
|
|
|
def pending_orders(self, unit_type: Type[UnitType]) -> int:
|
|
pending_units = self.units.get(unit_type)
|
|
if pending_units is None:
|
|
pending_units = 0
|
|
return pending_units
|
|
|
|
def available_next_turn(self, unit_type: Type[UnitType]) -> int:
|
|
current_units = self.destination.base.total_units_of_type(unit_type)
|
|
return self.pending_orders(unit_type) + current_units
|
|
|
|
def process(self, game: Game) -> None:
|
|
ground_unit_source = self.find_ground_unit_source(game)
|
|
bought_units: Dict[Type[UnitType], int] = {}
|
|
units_needing_transfer: Dict[Type[VehicleType], int] = {}
|
|
sold_units: Dict[Type[UnitType], int] = {}
|
|
for unit_type, count in self.units.items():
|
|
coalition = "Ally" if self.destination.captured else "Enemy"
|
|
name = unit_type.id
|
|
|
|
if (
|
|
issubclass(unit_type, VehicleType)
|
|
and self.destination != ground_unit_source
|
|
):
|
|
source = ground_unit_source
|
|
d = units_needing_transfer
|
|
ground = True
|
|
else:
|
|
source = self.destination
|
|
d = bought_units
|
|
ground = False
|
|
|
|
if count >= 0:
|
|
# The destination dict will be set appropriately even if we have no
|
|
# source, and we'll refund later, buto nly emit the message when we're
|
|
# actually completing the purchase.
|
|
d[unit_type] = count
|
|
if ground or ground_unit_source is not None:
|
|
game.message(
|
|
f"{coalition} reinforcements: {name} x {count} at {source}"
|
|
)
|
|
else:
|
|
sold_units[unit_type] = -count
|
|
game.message(f"{coalition} sold: {name} x {-count} at {source}")
|
|
|
|
self.units = defaultdict(int)
|
|
self.destination.base.commision_units(bought_units)
|
|
self.destination.base.commit_losses(sold_units)
|
|
|
|
if ground_unit_source is None:
|
|
game.message(
|
|
f"{self.destination.name} lost its source for ground unit "
|
|
"reinforcements. Refunding purchase price."
|
|
)
|
|
self.refund(game, units_needing_transfer)
|
|
return
|
|
|
|
if units_needing_transfer:
|
|
ground_unit_source.base.commision_units(units_needing_transfer)
|
|
game.transfers.new_transfer(
|
|
RoadTransferOrder(
|
|
ground_unit_source,
|
|
self.destination,
|
|
self.destination.captured,
|
|
units_needing_transfer,
|
|
)
|
|
)
|
|
|
|
def find_ground_unit_source(self, game: Game) -> Optional[ControlPoint]:
|
|
# Fast path if the destination is a valid source.
|
|
if self.destination.can_recruit_ground_units(game):
|
|
return self.destination
|
|
|
|
supply_route = SupplyRoute.for_control_point(self.destination)
|
|
if supply_route is None:
|
|
return None
|
|
|
|
sources = []
|
|
for control_point in supply_route:
|
|
if control_point.can_recruit_ground_units(game):
|
|
sources.append(control_point)
|
|
|
|
if not sources:
|
|
return None
|
|
|
|
# Fast path to skip the distance calculation if we have only one option.
|
|
if len(sources) == 1:
|
|
return sources[0]
|
|
|
|
closest = sources[0]
|
|
distance = len(supply_route.shortest_path_between(self.destination, closest))
|
|
for source in sources:
|
|
new_distance = len(
|
|
supply_route.shortest_path_between(self.destination, source)
|
|
)
|
|
if new_distance < distance:
|
|
closest = source
|
|
distance = new_distance
|
|
return closest
|