Build common interface for waypoint geometry constraints.

This is a replacement for the existing "zone geometry" classes that are
currently used for choosing locations for IP, hold, and join points.
The older approach required the author to define the methods for
choosing locations at a rather low level using shapely APIs to merge or
mask geometries. Debug UIs had to be defined manually which was a great
deal of work. Worse, those debug UIs were only useable for *successful*
waypoint placement. If there was a bug in the solver (which was pretty
much unavoidable during development or tuning), it wasn't possible to
use the debug UI.

This new system adds a (very simple) geometric constraint solver to
allow the author to describe the requirements for a waypoint at a high
level. Each waypoint type will define a waypoint solver that defines one
or more waypoint strategies which will be tried in order. For example,
the IP solver might have the following strategies:

1. Safe IP
2. Threat tolerant IP
3. Unsafe IP
4. Safe backtracking IP
5. Unsafe backtracking IP

We prefer those in the order defined, but the preferred strategies won't
always have a valid solution. When that happens, the next one is tried.

The strategies define the constraints for the waypoint location. For
example, the safe IP strategy could be defined as (in pseudo code):

* At least 5 NM away from the departure airfield
* Not farther from the departure airfield than the target is
* Within 10 NM and 45 NM of the target (doctrine dependent)
* Safe
* Within the permissible region, select the point nearest the departure
  airfield

When a solver fails to find a solution using any strategy, debug
information is automatically written in a GeoJSON format which can be
viewed on geojson.io.

Fixes https://github.com/dcs-liberation/dcs_liberation/issues/3085.
This commit is contained in:
Dan Albert
2023-07-17 23:35:21 -07:00
committed by Raffson
parent bf879a6141
commit 643dafd2c8
4 changed files with 830 additions and 0 deletions

View File

@@ -0,0 +1,140 @@
from __future__ import annotations
import json
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any
from dcs import Point
from dcs.mapping import Point as DcsPoint
from dcs.terrain import Terrain
from numpy import float64, array
from numpy._typing import NDArray
from shapely import transform, to_geojson
from shapely.geometry.base import BaseGeometry
if TYPE_CHECKING:
from .waypointstrategy import WaypointStrategy
class NoSolutionsError(RuntimeError):
pass
class WaypointSolver:
def __init__(self) -> None:
self.strategies: list[WaypointStrategy] = []
self.debug_output_directory: Path | None = None
self._terrain: Terrain | None = None
def add_strategy(self, strategy: WaypointStrategy) -> None:
self.strategies.append(strategy)
def set_debug_properties(self, path: Path, terrain: Terrain) -> None:
self.debug_output_directory = path
self._terrain = terrain
def to_geojson(self, geometry: BaseGeometry) -> dict[str, Any]:
if geometry.is_empty:
return json.loads(to_geojson(geometry))
assert self._terrain is not None
origin = DcsPoint(0, 0, self._terrain)
def xy_to_ll(points: NDArray[float64]) -> NDArray[float64]:
ll_points = []
for point in points:
p = origin.new_in_same_map(point[0], point[1])
latlng = p.latlng()
# Longitude is unintuitively first because it's the "X" coordinate:
# https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.1
ll_points.append([latlng.lng, latlng.lat])
return array(ll_points)
transformed = transform(geometry, xy_to_ll)
return json.loads(to_geojson(transformed))
def describe_metadata(self) -> dict[str, Any]:
return {}
def describe_inputs(self) -> Iterator[tuple[str, BaseGeometry]]:
yield from []
def describe_debug(self) -> dict[str, Any]:
assert self._terrain is not None
metadata = {"name": self.__class__.__name__, "terrain": self._terrain.name}
metadata.update(self.describe_metadata())
return {
"type": "FeatureCollection",
# The GeoJSON spec forbids us from adding a "properties" field to a feature
# collection, but it doesn't restrict us from adding our own custom fields.
# https://gis.stackexchange.com/a/209263
#
# It's possible that some consumers won't work with this, but we don't read
# collections directly with shapely and geojson.io is happy with it, so it
# works where we need it to.
"metadata": metadata,
"features": list(self.describe_features()),
}
def describe_features(self) -> Iterator[dict[str, Any]]:
for description, geometry in self.describe_inputs():
yield {
"type": "Feature",
"properties": {
"description": description,
},
"geometry": self.to_geojson(geometry),
}
def dump_debug_info(self) -> None:
path = self.debug_output_directory
if path is None:
return
path.mkdir(exist_ok=True, parents=True)
inputs_path = path / "solver.json"
with inputs_path.open("w", encoding="utf-8") as inputs_file:
json.dump(self.describe_debug(), inputs_file)
features = list(self.describe_features())
for idx, strategy in enumerate(self.strategies):
strategy_path = path / f"{idx}.json"
with strategy_path.open("w", encoding="utf-8") as strategy_debug_file:
json.dump(
{
"type": "FeatureCollection",
"metadata": {
"name": strategy.__class__.__name__,
"prerequisites": [
p.describe_debug_info(self.to_geojson)
for p in strategy.prerequisites
],
},
# Include the solver's features in the strategy feature
# collection for easy copy/paste into geojson.io.
"features": features
+ [
d.to_geojson(self.to_geojson)
for d in strategy.iter_debug_info()
],
},
strategy_debug_file,
)
def solve(self) -> Point:
if not self.strategies:
raise ValueError(
"WaypointSolver.solve() called before any strategies were added"
)
for strategy in self.strategies:
if (point := strategy.find()) is not None:
return point
self.dump_debug_info()
debug_details = "No debug output directory set"
if (debug_path := self.debug_output_directory) is not None:
debug_details = f"Debug details written to {debug_path}"
raise NoSolutionsError(f"No solutions found for waypoint. {debug_details}")

View File

@@ -0,0 +1,269 @@
from __future__ import annotations
import math
from abc import abstractmethod, ABC
from collections.abc import Iterator, Callable
from dataclasses import dataclass
from typing import Any
from dcs.mapping import heading_between_points
from shapely.geometry import Point, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry as Geometry, BaseGeometry
from shapely.ops import nearest_points
from game.utils import Distance, nautical_miles, Heading
def angle_between_points(a: Point, b: Point) -> float:
return heading_between_points(a.x, a.y, b.x, b.y)
def point_at_heading(p: Point, heading: Heading, distance: Distance) -> Point:
rad_heading = heading.radians
return Point(
p.x + math.cos(rad_heading) * distance.meters,
p.y + math.sin(rad_heading) * distance.meters,
)
class Prerequisite(ABC):
@abstractmethod
def is_satisfied(self) -> bool:
...
@abstractmethod
def describe_debug_info(
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
) -> dict[str, Any]:
...
class DistancePrerequisite(Prerequisite):
def __init__(self, a: Point, b: Point, min_range: Distance) -> None:
self.a = a
self.b = b
self.min_range = min_range
def is_satisfied(self) -> bool:
return self.a.distance(self.b) >= self.min_range.meters
def describe_debug_info(
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
) -> dict[str, Any]:
return {
"requirement": f"at least {self.min_range} between",
"satisfied": self.is_satisfied(),
"subject": to_geojson(self.a),
"target": to_geojson(self.b),
}
class SafePrerequisite(Prerequisite):
def __init__(self, point: Point, threat_zones: MultiPolygon) -> None:
self.point = point
self.threat_zones = threat_zones
def is_satisfied(self) -> bool:
return not self.point.intersects(self.threat_zones)
def describe_debug_info(
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
) -> dict[str, Any]:
return {
"requirement": "is safe",
"satisfied": self.is_satisfied(),
"subject": to_geojson(self.point),
}
class PrerequisiteBuilder:
def __init__(
self, subject: Point, threat_zones: MultiPolygon, strategy: WaypointStrategy
) -> None:
self.subject = subject
self.threat_zones = threat_zones
self.strategy = strategy
def is_safe(self) -> None:
self.strategy.add_prerequisite(
SafePrerequisite(self.subject, self.threat_zones)
)
def min_distance_from(self, target: Point, distance: Distance) -> None:
self.strategy.add_prerequisite(
DistancePrerequisite(self.subject, target, distance)
)
@dataclass(frozen=True)
class ThreatTolerance:
target: Point
target_buffer: Distance
tolerance: Distance
class RequirementBuilder:
def __init__(self, threat_zones: MultiPolygon, strategy: WaypointStrategy) -> None:
self.threat_zones = threat_zones
self.strategy = strategy
def safe(self) -> None:
self.strategy.exclude_threat_zone()
def at_least(self, distance: Distance) -> DistanceRequirementBuilder:
return DistanceRequirementBuilder(self.strategy, min_distance=distance)
def at_most(self, distance: Distance) -> DistanceRequirementBuilder:
return DistanceRequirementBuilder(self.strategy, max_distance=distance)
def maximum_turn_to(
self, turn_point: Point, next_point: Point, turn_limit: Heading
) -> None:
large_distance = nautical_miles(400)
next_heading = Heading.from_degrees(
angle_between_points(next_point, turn_point)
)
limit_ccw = point_at_heading(
turn_point, next_heading - turn_limit, large_distance
)
limit_cw = point_at_heading(
turn_point, next_heading + turn_limit, large_distance
)
allowed_wedge = Polygon([turn_point, limit_ccw, limit_cw])
self.strategy.exclude(
f"restrict turn from {turn_point} to {next_point} to {turn_limit}",
turn_point.buffer(large_distance.meters).difference(allowed_wedge),
)
class DistanceRequirementBuilder:
def __init__(
self,
strategy: WaypointStrategy,
min_distance: Distance | None = None,
max_distance: Distance | None = None,
) -> None:
if min_distance is None and max_distance is None:
raise ValueError
self.strategy = strategy
self.min_distance = min_distance
self.max_distance = max_distance
def away_from(self, target: Point, description: str | None = None) -> None:
if description is None:
description = str(target)
if self.min_distance is not None:
self.strategy.exclude(
f"at least {self.min_distance} away from {description}",
target.buffer(self.min_distance.meters),
)
if self.max_distance is not None:
self.strategy.exclude_beyond(
f"at most {self.max_distance} away from {description}",
target.buffer(self.max_distance.meters),
)
@dataclass(frozen=True)
class WaypointDebugInfo:
description: str
geometry: BaseGeometry
def to_geojson(
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
) -> dict[str, Any]:
return {
"type": "Feature",
"properties": {
"description": self.description,
},
"geometry": to_geojson(self.geometry),
}
class WaypointStrategy:
def __init__(self, threat_zones: MultiPolygon) -> None:
self.threat_zones = threat_zones
self.prerequisites: list[Prerequisite] = []
self._max_area = Point(0, 0).buffer(1_000_000)
self.allowed_area = self._max_area.buffer(0)
self.debug_infos: list[WaypointDebugInfo] = []
self._threat_tolerance: ThreatTolerance | None = None
self.point_for_nearest_solution: Point | None = None
def add_prerequisite(self, prerequisite: Prerequisite) -> None:
self.prerequisites.append(prerequisite)
def prerequisite(self, subject: Point) -> PrerequisiteBuilder:
return PrerequisiteBuilder(subject, self.threat_zones, self)
def exclude(self, description: str, geometry: Geometry) -> None:
self.debug_infos.append(WaypointDebugInfo(description, geometry))
self.allowed_area = self.allowed_area.difference(geometry)
def exclude_beyond(self, description: str, geometry: Geometry) -> None:
self.exclude(description, self._max_area.difference(geometry))
def exclude_threat_zone(self) -> None:
if (tolerance := self._threat_tolerance) is not None:
description = (
f"safe with a {tolerance.tolerance} tolerance to a "
f"{tolerance.target_buffer} radius about {tolerance.target}"
)
else:
description = "safe"
self.exclude(description, self.threat_zones)
def prerequisites_are_satisfied(self) -> bool:
for prereq in self.prerequisites:
if not prereq.is_satisfied():
return False
return True
def require(self) -> RequirementBuilder:
return RequirementBuilder(self.threat_zones, self)
def threat_tolerance(
self, target: Point, target_size: Distance, wiggle: Distance
) -> None:
if self.threat_zones.is_empty:
return
min_distance_from_threat_to_target_buffer = target.buffer(
target_size.meters
).distance(self.threat_zones.boundary)
threat_mask = self.threat_zones.buffer(
-min_distance_from_threat_to_target_buffer - wiggle.meters
)
self._threat_tolerance = ThreatTolerance(target, target_size, wiggle)
self.threat_zones = self.threat_zones.difference(threat_mask)
def nearest(self, point: Point) -> None:
if self.point_for_nearest_solution is not None:
raise RuntimeError("WaypointStrategy.nearest() called more than once")
self.point_for_nearest_solution = point
def find(self) -> Point | None:
if self.point_for_nearest_solution is None:
raise RuntimeError(
"Must call WaypointStrategy.nearest() before WaypointStrategy.find()"
)
if not self.prerequisites_are_satisfied():
return None
try:
return nearest_points(self.allowed_area, self.point_for_nearest_solution)[0]
except ValueError:
# No solutions.
return None
def iter_debug_info(self) -> Iterator[WaypointDebugInfo]:
yield from self.debug_infos
solution = self.find()
if solution is None:
return
yield WaypointDebugInfo("solution", solution)