mirror of
https://github.com/dcs-liberation/dcs_liberation.git
synced 2025-11-10 14:22:26 +00:00
267 lines
8.8 KiB
Python
267 lines
8.8 KiB
Python
from __future__ import annotations
|
|
|
|
import math
|
|
from abc import abstractmethod, ABC
|
|
from collections.abc import Iterator, Callable
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from dcs.mapping import heading_between_points
|
|
from shapely.geometry import Point, MultiPolygon, Polygon
|
|
from shapely.geometry.base import BaseGeometry as Geometry, BaseGeometry
|
|
from shapely.ops import nearest_points
|
|
|
|
from game.utils import Distance, nautical_miles, Heading
|
|
|
|
|
|
def angle_between_points(a: Point, b: Point) -> float:
|
|
return heading_between_points(a.x, a.y, b.x, b.y)
|
|
|
|
|
|
def point_at_heading(p: Point, heading: Heading, distance: Distance) -> Point:
|
|
rad_heading = heading.radians
|
|
return Point(
|
|
p.x + math.cos(rad_heading) * distance.meters,
|
|
p.y + math.sin(rad_heading) * distance.meters,
|
|
)
|
|
|
|
|
|
class Prerequisite(ABC):
|
|
@abstractmethod
|
|
def is_satisfied(self) -> bool: ...
|
|
|
|
@abstractmethod
|
|
def describe_debug_info(
|
|
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
|
|
) -> dict[str, Any]: ...
|
|
|
|
|
|
class DistancePrerequisite(Prerequisite):
|
|
def __init__(self, a: Point, b: Point, min_range: Distance) -> None:
|
|
self.a = a
|
|
self.b = b
|
|
self.min_range = min_range
|
|
|
|
def is_satisfied(self) -> bool:
|
|
return self.a.distance(self.b) >= self.min_range.meters
|
|
|
|
def describe_debug_info(
|
|
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"requirement": f"at least {self.min_range} between",
|
|
"satisfied": self.is_satisfied(),
|
|
"subject": to_geojson(self.a),
|
|
"target": to_geojson(self.b),
|
|
}
|
|
|
|
|
|
class SafePrerequisite(Prerequisite):
|
|
def __init__(self, point: Point, threat_zones: MultiPolygon) -> None:
|
|
self.point = point
|
|
self.threat_zones = threat_zones
|
|
|
|
def is_satisfied(self) -> bool:
|
|
return not self.point.intersects(self.threat_zones)
|
|
|
|
def describe_debug_info(
|
|
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"requirement": "is safe",
|
|
"satisfied": self.is_satisfied(),
|
|
"subject": to_geojson(self.point),
|
|
}
|
|
|
|
|
|
class PrerequisiteBuilder:
|
|
def __init__(
|
|
self, subject: Point, threat_zones: MultiPolygon, strategy: WaypointStrategy
|
|
) -> None:
|
|
self.subject = subject
|
|
self.threat_zones = threat_zones
|
|
self.strategy = strategy
|
|
|
|
def is_safe(self) -> None:
|
|
self.strategy.add_prerequisite(
|
|
SafePrerequisite(self.subject, self.threat_zones)
|
|
)
|
|
|
|
def min_distance_from(self, target: Point, distance: Distance) -> None:
|
|
self.strategy.add_prerequisite(
|
|
DistancePrerequisite(self.subject, target, distance)
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ThreatTolerance:
|
|
target: Point
|
|
target_buffer: Distance
|
|
tolerance: Distance
|
|
|
|
|
|
class RequirementBuilder:
|
|
def __init__(self, threat_zones: MultiPolygon, strategy: WaypointStrategy) -> None:
|
|
self.threat_zones = threat_zones
|
|
self.strategy = strategy
|
|
|
|
def safe(self) -> None:
|
|
self.strategy.exclude_threat_zone()
|
|
|
|
def at_least(self, distance: Distance) -> DistanceRequirementBuilder:
|
|
return DistanceRequirementBuilder(self.strategy, min_distance=distance)
|
|
|
|
def at_most(self, distance: Distance) -> DistanceRequirementBuilder:
|
|
return DistanceRequirementBuilder(self.strategy, max_distance=distance)
|
|
|
|
def maximum_turn_to(
|
|
self, turn_point: Point, next_point: Point, turn_limit: Heading
|
|
) -> None:
|
|
large_distance = nautical_miles(400)
|
|
next_heading = Heading.from_degrees(
|
|
angle_between_points(next_point, turn_point)
|
|
)
|
|
limit_ccw = point_at_heading(
|
|
turn_point, next_heading - turn_limit, large_distance
|
|
)
|
|
limit_cw = point_at_heading(
|
|
turn_point, next_heading + turn_limit, large_distance
|
|
)
|
|
|
|
allowed_wedge = Polygon([turn_point, limit_ccw, limit_cw])
|
|
self.strategy.exclude(
|
|
f"restrict turn from {turn_point} to {next_point} to {turn_limit}",
|
|
turn_point.buffer(large_distance.meters).difference(allowed_wedge),
|
|
)
|
|
|
|
|
|
class DistanceRequirementBuilder:
|
|
def __init__(
|
|
self,
|
|
strategy: WaypointStrategy,
|
|
min_distance: Distance | None = None,
|
|
max_distance: Distance | None = None,
|
|
) -> None:
|
|
if min_distance is None and max_distance is None:
|
|
raise ValueError
|
|
self.strategy = strategy
|
|
self.min_distance = min_distance
|
|
self.max_distance = max_distance
|
|
|
|
def away_from(self, target: Point, description: str | None = None) -> None:
|
|
if description is None:
|
|
description = str(target)
|
|
|
|
if self.min_distance is not None:
|
|
self.strategy.exclude(
|
|
f"at least {self.min_distance} away from {description}",
|
|
target.buffer(self.min_distance.meters),
|
|
)
|
|
if self.max_distance is not None:
|
|
self.strategy.exclude_beyond(
|
|
f"at most {self.max_distance} away from {description}",
|
|
target.buffer(self.max_distance.meters),
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WaypointDebugInfo:
|
|
description: str
|
|
geometry: BaseGeometry
|
|
|
|
def to_geojson(
|
|
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"type": "Feature",
|
|
"properties": {
|
|
"description": self.description,
|
|
},
|
|
"geometry": to_geojson(self.geometry),
|
|
}
|
|
|
|
|
|
class WaypointStrategy:
|
|
def __init__(self, threat_zones: MultiPolygon) -> None:
|
|
self.threat_zones = threat_zones
|
|
self.prerequisites: list[Prerequisite] = []
|
|
self._max_area = Point(0, 0).buffer(2_000_000)
|
|
self.allowed_area = self._max_area.buffer(0)
|
|
self.debug_infos: list[WaypointDebugInfo] = []
|
|
self._threat_tolerance: ThreatTolerance | None = None
|
|
self.point_for_nearest_solution: Point | None = None
|
|
|
|
def add_prerequisite(self, prerequisite: Prerequisite) -> None:
|
|
self.prerequisites.append(prerequisite)
|
|
|
|
def prerequisite(self, subject: Point) -> PrerequisiteBuilder:
|
|
return PrerequisiteBuilder(subject, self.threat_zones, self)
|
|
|
|
def exclude(self, description: str, geometry: Geometry) -> None:
|
|
self.debug_infos.append(WaypointDebugInfo(description, geometry))
|
|
self.allowed_area = self.allowed_area.difference(geometry)
|
|
|
|
def exclude_beyond(self, description: str, geometry: Geometry) -> None:
|
|
self.exclude(description, self._max_area.difference(geometry))
|
|
|
|
def exclude_threat_zone(self) -> None:
|
|
if (tolerance := self._threat_tolerance) is not None:
|
|
description = (
|
|
f"safe with a {tolerance.tolerance} tolerance to a "
|
|
f"{tolerance.target_buffer} radius about {tolerance.target}"
|
|
)
|
|
else:
|
|
description = "safe"
|
|
self.exclude(description, self.threat_zones)
|
|
|
|
def prerequisites_are_satisfied(self) -> bool:
|
|
for prereq in self.prerequisites:
|
|
if not prereq.is_satisfied():
|
|
return False
|
|
return True
|
|
|
|
def require(self) -> RequirementBuilder:
|
|
return RequirementBuilder(self.threat_zones, self)
|
|
|
|
def threat_tolerance(
|
|
self, target: Point, target_size: Distance, wiggle: Distance
|
|
) -> None:
|
|
if self.threat_zones.is_empty:
|
|
return
|
|
|
|
min_distance_from_threat_to_target_buffer = target.buffer(
|
|
target_size.meters
|
|
).distance(self.threat_zones.boundary)
|
|
threat_mask = self.threat_zones.buffer(
|
|
-min_distance_from_threat_to_target_buffer - wiggle.meters
|
|
)
|
|
self._threat_tolerance = ThreatTolerance(target, target_size, wiggle)
|
|
self.threat_zones = self.threat_zones.difference(threat_mask)
|
|
|
|
def nearest(self, point: Point) -> None:
|
|
if self.point_for_nearest_solution is not None:
|
|
raise RuntimeError("WaypointStrategy.nearest() called more than once")
|
|
self.point_for_nearest_solution = point
|
|
|
|
def find(self) -> Point | None:
|
|
if self.point_for_nearest_solution is None:
|
|
raise RuntimeError(
|
|
"Must call WaypointStrategy.nearest() before WaypointStrategy.find()"
|
|
)
|
|
|
|
if not self.prerequisites_are_satisfied():
|
|
return None
|
|
|
|
try:
|
|
return nearest_points(self.allowed_area, self.point_for_nearest_solution)[0]
|
|
except ValueError:
|
|
# No solutions.
|
|
return None
|
|
|
|
def iter_debug_info(self) -> Iterator[WaypointDebugInfo]:
|
|
yield from self.debug_infos
|
|
solution = self.find()
|
|
if solution is None:
|
|
return
|
|
yield WaypointDebugInfo("solution", solution)
|