Connect networks to enable multi-mode transfers.

Removing the per-transit type supply routes allows us to find the best
route from A to B even if the unit needs to switch transit modes along
the way.

The "best" route is the one that will generate better gameplay. That is,
convoys are preferred to ships (use cases for GMT are rare in DCS), and
ships are preferred to airlift (reasons to attack cargo ships are also
rare). Avoiding airlift is also a good strategic choice generally since
it consumes aircraft that could be performing other missions.

The extreme weight against airlift in the pathfinding algorithm could
probably be scaled way down so that airlift would be given preference
over a very long trip, possibly only for urgent transfers.

Later when we add rail that will probably be given the most preference,
but possibly between road and shipping.

https://github.com/Khopa/dcs_liberation/issues/823
This commit is contained in:
Dan Albert 2021-04-25 17:02:18 -07:00
parent a48ef69e41
commit 0779679b99
8 changed files with 248 additions and 221 deletions

View File

@ -33,6 +33,7 @@ from .navmesh import NavMesh
from .procurement import AircraftProcurementRequest, ProcurementAi
from .settings import Settings
from .theater import ConflictTheater
from .theater.transitnetwork import TransitNetwork, TransitNetworkBuilder
from .threatzones import ThreatZones
from .transfers import PendingTransfers
from .unitmap import UnitMap
@ -117,6 +118,9 @@ class Game:
self.conditions = self.generate_conditions()
self.blue_transit_network = self.compute_transit_network_for(player=True)
self.red_transit_network = self.compute_transit_network_for(player=False)
self.blue_procurement_requests: List[AircraftProcurementRequest] = []
self.red_procurement_requests: List[AircraftProcurementRequest] = []
@ -171,6 +175,11 @@ class Game:
return self.blue_procurement_requests
return self.red_procurement_requests
def transit_network_for(self, player: bool) -> TransitNetwork:
if player:
return self.blue_transit_network
return self.red_transit_network
def generate_conditions(self) -> Conditions:
return Conditions.generate(
self.theater, self.current_day, self.current_turn_time_of_day, self.settings
@ -346,6 +355,7 @@ class Game:
# Plan flights & combat for next turn
self.compute_conflicts_position()
self.compute_threat_zones()
self.compute_transit_networks()
self.ground_planners = {}
self.transfers.order_airlift_assets()
@ -417,6 +427,13 @@ class Game:
self.current_group_id += 1
return self.current_group_id
def compute_transit_networks(self) -> None:
self.blue_transit_network = self.compute_transit_network_for(player=True)
self.red_transit_network = self.compute_transit_network_for(player=False)
def compute_transit_network_for(self, player: bool) -> TransitNetwork:
return TransitNetworkBuilder(self.theater, player).build()
def compute_threat_zones(self) -> None:
self.blue_threat_zone = ThreatZones.for_faction(self, player=True)
self.red_threat_zone = ThreatZones.for_faction(self, player=False)

View File

@ -2,5 +2,4 @@ from .base import *
from .conflicttheater import *
from .controlpoint import *
from .missiontarget import MissionTarget
from .supplyroutes import SupplyRoute
from .theatergroundobject import SamGroundObject

View File

@ -466,6 +466,9 @@ class ControlPoint(MissionTarget, ABC):
def is_friendly(self, to_player: bool) -> bool:
return self.captured == to_player
def is_friendly_to(self, control_point: ControlPoint) -> bool:
return control_point.is_friendly(self.captured)
# TODO: Should be Airbase specific.
def clear_base_defenses(self) -> None:
for base_defense in self.base_defenses:

View File

@ -1,140 +0,0 @@
from __future__ import annotations
import heapq
import math
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator, List, Optional
from game.theater.controlpoint import ControlPoint
@dataclass(frozen=True, order=True)
class FrontierNode:
cost: float
point: ControlPoint = field(compare=False)
class Frontier:
def __init__(self) -> None:
self.nodes: List[FrontierNode] = []
def push(self, poly: ControlPoint, cost: float) -> None:
heapq.heappush(self.nodes, FrontierNode(cost, poly))
def pop(self) -> Optional[FrontierNode]:
try:
return heapq.heappop(self.nodes)
except IndexError:
return None
def __bool__(self) -> bool:
return bool(self.nodes)
# TODO: Build a single SupplyRoute for each coalition at the start of the turn.
# Supply routes need to cover the whole network to support multi-mode links.
#
# Traverse each friendly control point and build out a network from each. Nodes create
# connections to:
#
# 1. Bases connected by road
# 2. Bases connected by rail
# 3. Bases connected by shipping lane
# 4. Airports large enough to operate cargo planes connect to each other
# 5. Airports capable of operating helicopters connect to other airports within cargo
# helicopter range, and FOBs within half of the range (since they can't be refueled
# at the drop off).
#
# The costs of each link would be set such that the above order roughly corresponds to
# the prevalence of each type of transport. Most units should move by road, rail should
# be used a little less often than road, ships a bit less often than that, cargo planes
# infrequently, and helicopters rarely. Convoys, trains, and ships make the most
# interesting targets for players (and the easiest to generate AI flight plans for).
class SupplyRoute:
def __init__(self, control_points: List[ControlPoint]) -> None:
self.control_points = control_points
def __contains__(self, item: ControlPoint) -> bool:
return item in self.control_points
def __iter__(self) -> Iterator[ControlPoint]:
yield from self.control_points
def __len__(self) -> int:
return len(self.control_points)
def connections_from(self, control_point: ControlPoint) -> Iterable:
raise NotImplementedError
def shortest_path_between(
self, origin: ControlPoint, destination: ControlPoint
) -> List[ControlPoint]:
if origin not in self:
raise ValueError(f"{origin} is not in supply route to {destination}")
if destination not in self:
raise ValueError(f"{destination} is not in supply route from {origin}")
frontier = Frontier()
frontier.push(origin, 0)
came_from: Dict[ControlPoint, Optional[ControlPoint]] = {origin: None}
best_known: Dict[ControlPoint, float] = defaultdict(lambda: math.inf)
best_known[origin] = 0.0
while (node := frontier.pop()) is not None:
cost = node.cost
current = node.point
if cost > best_known[current]:
continue
for neighbor in self.connections_from(current):
if current.captured != neighbor.captured:
continue
new_cost = cost + 1
if new_cost < best_known[neighbor]:
best_known[neighbor] = new_cost
frontier.push(neighbor, new_cost)
came_from[neighbor] = current
# Reconstruct and reverse the path.
current = destination
path: List[ControlPoint] = []
while current != origin:
path.append(current)
previous = came_from[current]
if previous is None:
raise RuntimeError(
f"Could not reconstruct path to {destination} from {origin}"
)
current = previous
path.reverse()
return path
class RoadNetwork(SupplyRoute):
@classmethod
def for_control_point(cls, control_point: ControlPoint) -> RoadNetwork:
connected_friendly_points = control_point.transitive_connected_friendly_points()
if not connected_friendly_points:
return RoadNetwork([control_point])
return RoadNetwork([control_point] + connected_friendly_points)
def connections_from(self, control_point: ControlPoint) -> Iterable:
yield from control_point.connected_points
class ShippingNetwork(SupplyRoute):
@classmethod
def for_control_point(cls, control_point: ControlPoint) -> ShippingNetwork:
connected_friendly_points = (
control_point.transitive_friendly_shipping_destinations()
)
if not connected_friendly_points:
return ShippingNetwork([control_point])
return ShippingNetwork([control_point] + connected_friendly_points)
def connections_from(self, control_point: ControlPoint) -> Iterable:
yield from control_point.shipping_lanes

View File

@ -0,0 +1,178 @@
from __future__ import annotations
import heapq
import math
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Dict, Iterator, List, Optional, Set, Tuple
from game.theater import ConflictTheater
from game.theater.controlpoint import ControlPoint
class NoPathError(RuntimeError):
def __init__(self, origin: ControlPoint, destination: ControlPoint) -> None:
super().__init__(f"Could not reconstruct path to {destination} from {origin}")
@dataclass(frozen=True, order=True)
class FrontierNode:
cost: float
point: ControlPoint = field(compare=False)
class Frontier:
def __init__(self) -> None:
self.nodes: List[FrontierNode] = []
def push(self, poly: ControlPoint, cost: float) -> None:
heapq.heappush(self.nodes, FrontierNode(cost, poly))
def pop(self) -> Optional[FrontierNode]:
try:
return heapq.heappop(self.nodes)
except IndexError:
return None
def __bool__(self) -> bool:
return bool(self.nodes)
class TransitConnection(Enum):
Road = auto()
Shipping = auto()
Airlift = auto()
class TransitNetwork:
def __init__(self) -> None:
self.nodes: Dict[
ControlPoint, Dict[ControlPoint, TransitConnection]
] = defaultdict(dict)
def has_destinations(self, control_point: ControlPoint) -> bool:
return bool(self.nodes[control_point])
def has_link(self, a: ControlPoint, b: ControlPoint) -> bool:
return b in self.nodes[a]
def link_type(self, a: ControlPoint, b: ControlPoint) -> TransitConnection:
return self.nodes[a][b]
def link_with(
self, a: ControlPoint, b: ControlPoint, link_type: TransitConnection
) -> None:
self.nodes[a][b] = link_type
self.nodes[b][a] = link_type
def link_road(self, a: ControlPoint, b: ControlPoint) -> None:
self.link_with(a, b, TransitConnection.Road)
def link_shipping(self, a: ControlPoint, b: ControlPoint) -> None:
self.link_with(a, b, TransitConnection.Shipping)
def link_airport(self, a: ControlPoint, b: ControlPoint) -> None:
self.link_with(a, b, TransitConnection.Airlift)
def connections_from(self, control_point: ControlPoint) -> Iterator[ControlPoint]:
yield from self.nodes[control_point]
def cost(self, a: ControlPoint, b: ControlPoint) -> float:
return {
TransitConnection.Road: 1,
TransitConnection.Shipping: 3,
# Set arbitrarily high so that other methods are preferred, but still scaled
# by distance so that when we do need it we still pick the closest airfield.
# The units of distance are meters so there's no risk of these
TransitConnection.Airlift: a.position.distance_to_point(b.position),
}[self.link_type(a, b)]
def shortest_path_between(
self, origin: ControlPoint, destination: ControlPoint
) -> List[ControlPoint]:
return self.shortest_path_with_cost(origin, destination)[0]
def shortest_path_with_cost(
self, origin: ControlPoint, destination: ControlPoint
) -> Tuple[List[ControlPoint], float]:
if origin not in self.nodes:
raise ValueError(f"{origin} is not in the transit network.")
if destination not in self.nodes:
raise ValueError(f"{destination} is not in the transit network.")
frontier = Frontier()
frontier.push(origin, 0)
came_from: Dict[ControlPoint, Optional[ControlPoint]] = {origin: None}
best_known: Dict[ControlPoint, float] = defaultdict(lambda: math.inf)
best_known[origin] = 0.0
while (node := frontier.pop()) is not None:
cost = node.cost
current = node.point
if cost > best_known[current]:
continue
for neighbor in self.connections_from(current):
new_cost = cost + self.cost(node.point, neighbor)
if new_cost < best_known[neighbor]:
best_known[neighbor] = new_cost
frontier.push(neighbor, new_cost)
came_from[neighbor] = current
# Reconstruct and reverse the path.
current = destination
path: List[ControlPoint] = []
while current != origin:
path.append(current)
previous = came_from[current]
if previous is None:
raise NoPathError(origin, destination)
current = previous
path.reverse()
return path, best_known[destination]
class TransitNetworkBuilder:
def __init__(self, theater: ConflictTheater, for_player: bool) -> None:
self.control_points = list(theater.control_points_for(for_player))
self.network = TransitNetwork()
self.airports: Set[ControlPoint] = {
cp
for cp in self.control_points
if cp.is_friendly(for_player) and cp.runway_is_operational()
}
def build(self) -> TransitNetwork:
seen = set()
for control_point in self.control_points:
if control_point not in seen:
seen.add(control_point)
self.add_transit_links(control_point)
return self.network
def add_transit_links(self, control_point: ControlPoint) -> None:
# Prefer road connections.
for road_connection in control_point.connected_points:
if road_connection.is_friendly_to(control_point):
self.network.link_road(control_point, road_connection)
# Use sea connections if there's no road or rail connection.
for sea_connection in control_point.shipping_lanes:
if self.network.has_link(control_point, sea_connection):
continue
if sea_connection.is_friendly_to(control_point):
self.network.link_shipping(control_point, sea_connection)
# And use airports as a last resort.
if control_point in self.airports:
for airport in self.airports:
if control_point == airport:
continue
if self.network.has_link(control_point, airport):
continue
if not airport.is_friendly_to(control_point):
continue
self.network.link_airport(control_point, airport)

View File

@ -10,15 +10,18 @@ from dcs.mapping import Point
from dcs.unittype import FlyingType, VehicleType
from game.procurement import AircraftProcurementRequest
from game.theater import ControlPoint, MissionTarget
from game.theater.transitnetwork import (
TransitConnection,
TransitNetwork,
)
from game.utils import meters, nautical_miles
from gen.ato import Package
from gen.flights.ai_flight_planner_db import TRANSPORT_CAPABLE
from gen.flights.closestairfields import ObjectiveDistanceCache
from gen.flights.flightplan import FlightPlanBuilder
from game.theater import ControlPoint, MissionTarget
from game.theater.supplyroutes import RoadNetwork, ShippingNetwork, SupplyRoute
from gen.naming import namegen
from gen.flights.flight import Flight, FlightType
from gen.flights.flightplan import FlightPlanBuilder
from gen.naming import namegen
if TYPE_CHECKING:
from game import Game
@ -151,11 +154,14 @@ class AirliftPlanner:
#: maximum range.
HELO_MAX_RANGE = nautical_miles(100)
def __init__(self, game: Game, transfer: TransferOrder) -> None:
def __init__(
self, game: Game, transfer: TransferOrder, next_stop: ControlPoint
) -> None:
self.game = game
self.transfer = transfer
self.next_stop = next_stop
self.for_player = transfer.destination.captured
self.package = Package(target=transfer.destination, auto_asap=True)
self.package = Package(target=next_stop, auto_asap=True)
def compatible_with_mission(
self, unit_type: Type[FlyingType], airfield: ControlPoint
@ -164,7 +170,7 @@ class AirliftPlanner:
return False
if not self.transfer.origin.can_operate(unit_type):
return False
if not self.transfer.destination.can_operate(unit_type):
if not self.next_stop.can_operate(unit_type):
return False
# Cargo planes have no maximum range.
@ -395,20 +401,7 @@ class TransportMap(Generic[TransportType]):
transport.disband()
del self.transports[transport.origin][transport.destination]
def network_for(self, control_point: ControlPoint) -> SupplyRoute:
raise NotImplementedError
def path_for(self, transfer: TransferOrder) -> List[ControlPoint]:
supply_route = self.network_for(transfer.position)
return supply_route.shortest_path_between(
transfer.position, transfer.destination
)
def next_stop_for(self, transfer: TransferOrder) -> ControlPoint:
return self.path_for(transfer)[0]
def add(self, transfer: TransferOrder) -> None:
next_stop = self.next_stop_for(transfer)
def add(self, transfer: TransferOrder, next_stop: ControlPoint) -> None:
self.find_or_create_transport(transfer.position, next_stop).add_units(transfer)
def remove(self, transport: TransportType, transfer: TransferOrder) -> None:
@ -431,9 +424,6 @@ class ConvoyMap(TransportMap):
) -> Convoy:
return Convoy(origin, destination)
def network_for(self, control_point: ControlPoint) -> RoadNetwork:
return RoadNetwork.for_control_point(control_point)
class CargoShipMap(TransportMap):
def create_transport(
@ -441,9 +431,6 @@ class CargoShipMap(TransportMap):
) -> CargoShip:
return CargoShip(origin, destination)
def network_for(self, control_point: ControlPoint) -> ShippingNetwork:
return ShippingNetwork.for_control_point(control_point)
class PendingTransfers:
def __init__(self, game: Game) -> None:
@ -465,15 +452,22 @@ class PendingTransfers:
def index_of_transfer(self, transfer: TransferOrder) -> int:
return self.pending_transfers.index(transfer)
def network_for(self, control_point: ControlPoint) -> TransitNetwork:
return self.game.transit_network_for(control_point.captured)
def arrange_transport(self, transfer: TransferOrder) -> None:
if transfer.destination in RoadNetwork.for_control_point(transfer.position):
self.convoys.add(transfer)
elif transfer.destination in ShippingNetwork.for_control_point(
transfer.position
network = self.network_for(transfer.position)
path = network.shortest_path_between(transfer.position, transfer.destination)
next_stop = path[0]
if network.link_type(transfer.position, next_stop) == TransitConnection.Road:
self.convoys.add(transfer, next_stop)
elif (
network.link_type(transfer.position, next_stop)
== TransitConnection.Shipping
):
self.cargo_ships.add(transfer)
self.cargo_ships.add(transfer, next_stop)
else:
AirliftPlanner(self.game, transfer).create_package_for_airlift()
AirliftPlanner(self.game, transfer, next_stop).create_package_for_airlift()
def new_transfer(self, transfer: TransferOrder) -> None:
transfer.origin.base.commit_losses(transfer.units)

View File

@ -7,10 +7,12 @@ from typing import Dict, Optional, TYPE_CHECKING, Type
from dcs.unittype import UnitType, VehicleType
from game.theater import ControlPoint, SupplyRoute
from gen.flights.closestairfields import ObjectiveDistanceCache
from game.theater import ControlPoint
from .db import PRICES
from .theater.supplyroutes import RoadNetwork, ShippingNetwork
from .theater.transitnetwork import (
NoPathError,
TransitNetwork,
)
from .transfers import TransferOrder
if TYPE_CHECKING:
@ -125,28 +127,18 @@ class PendingUnitDeliveries:
if self.destination.can_recruit_ground_units(game):
return self.destination
by_road = self.find_ground_unit_source_in_supply_route(
RoadNetwork.for_control_point(self.destination), game
)
if by_road is not None:
return by_road
try:
return self.find_ground_unit_source_in_network(
game.transit_network_for(self.destination.captured), game
)
except NoPathError:
return None
by_ship = self.find_ground_unit_source_in_supply_route(
ShippingNetwork.for_control_point(self.destination), game
)
if by_ship is not None:
return by_ship
by_air = self.find_ground_unit_source_by_air(game)
if by_air is not None:
return by_air
return None
def find_ground_unit_source_in_supply_route(
self, supply_route: SupplyRoute, game: Game
def find_ground_unit_source_in_network(
self, network: TransitNetwork, game: Game
) -> Optional[ControlPoint]:
sources = []
for control_point in supply_route:
for control_point in game.theater.control_points_for(self.destination.captured):
if control_point.can_recruit_ground_units(game):
sources.append(control_point)
@ -158,23 +150,10 @@ class PendingUnitDeliveries:
return sources[0]
closest = sources[0]
distance = len(supply_route.shortest_path_between(self.destination, closest))
_, cost = network.shortest_path_with_cost(self.destination, closest)
for source in sources:
new_distance = len(
supply_route.shortest_path_between(self.destination, source)
)
if new_distance < distance:
_, new_cost = network.shortest_path_with_cost(self.destination, source)
if new_cost < cost:
closest = source
distance = new_distance
cost = new_cost
return closest
def find_ground_unit_source_by_air(self, game: Game) -> Optional[ControlPoint]:
closest_airfields = ObjectiveDistanceCache.get_closest_airfields(
self.destination
)
for airfield in closest_airfields.operational_airfields:
if airfield.is_friendly(
self.destination.captured
) and airfield.can_recruit_ground_units(game):
return airfield
return None

View File

@ -11,16 +11,15 @@ from PySide2.QtWidgets import (
)
from game import Game, db
from game.theater import ControlPoint, ControlPointType, SupplyRoute
from game.theater.supplyroutes import RoadNetwork, ShippingNetwork
from game.theater import ControlPoint, ControlPointType
from gen.flights.flight import FlightType
from qt_ui.dialogs import Dialog
from qt_ui.models import GameModel
from qt_ui.uiconstants import EVENT_ICONS
from qt_ui.windows.GameUpdateSignal import GameUpdateSignal
from qt_ui.windows.basemenu.NewUnitTransferDialog import NewUnitTransferDialog
from qt_ui.windows.basemenu.QBaseMenuTabs import QBaseMenuTabs
from qt_ui.windows.basemenu.QRecruitBehaviour import QRecruitBehaviour
from qt_ui.windows.basemenu.NewUnitTransferDialog import NewUnitTransferDialog
class QBaseMenu2(QDialog):
@ -106,11 +105,9 @@ class QBaseMenu2(QDialog):
@property
def has_transfer_destinations(self) -> bool:
return (
self.cp.runway_is_operational()
or len(RoadNetwork.for_control_point(self.cp)) > 1
or len(ShippingNetwork.for_control_point(self.cp)) > 1
)
return self.game_model.game.transit_network_for(
self.cp.captured
).has_destinations(self.cp)
@property
def can_repair_runway(self) -> bool: