mirror of
https://github.com/dcs-retribution/dcs-retribution.git
synced 2025-11-10 15:41:24 +00:00
Add fuzz testing for waypoint solvers.
This fuzz test generates random inputs for waypoint solvers to check if they can find a solution. If they can't, the debug info for the solver is dumped to the testcases directory. Another test loads those test cases, creates a solver from them, and checks that a solution is found. Obviously it won't be immediately, but it's a starting point for fixing the bug and serves as a regression test afterward.
This commit is contained in:
parent
8b04dd878d
commit
98f92f9ab2
79
game/flightplan/waypointsolverloader.py
Normal file
79
game/flightplan/waypointsolverloader.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
import json
|
||||||
|
from functools import cached_property
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from dcs.mapping import Point as DcsPoint, LatLng
|
||||||
|
from dcs.terrain import Terrain
|
||||||
|
from numpy import float64, array
|
||||||
|
from numpy._typing import NDArray
|
||||||
|
from shapely import transform
|
||||||
|
from shapely.geometry import shape
|
||||||
|
from shapely.geometry.base import BaseGeometry
|
||||||
|
|
||||||
|
from game.data.doctrine import Doctrine, ALL_DOCTRINES
|
||||||
|
from .ipsolver import IpSolver
|
||||||
|
from .waypointsolver import WaypointSolver
|
||||||
|
from ..theater.theaterloader import TERRAINS_BY_NAME
|
||||||
|
|
||||||
|
|
||||||
|
def doctrine_from_name(name: str) -> Doctrine:
|
||||||
|
for doctrine in ALL_DOCTRINES:
|
||||||
|
if doctrine.name == name:
|
||||||
|
return doctrine
|
||||||
|
raise KeyError
|
||||||
|
|
||||||
|
|
||||||
|
def geometry_ll_to_xy(geometry: BaseGeometry, terrain: Terrain) -> BaseGeometry:
|
||||||
|
if geometry.is_empty:
|
||||||
|
return geometry
|
||||||
|
|
||||||
|
def ll_to_xy(points: NDArray[float64]) -> NDArray[float64]:
|
||||||
|
ll_points = []
|
||||||
|
for point in points:
|
||||||
|
# Longitude is unintuitively first because it's the "X" coordinate:
|
||||||
|
# https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.1
|
||||||
|
p = DcsPoint.from_latlng(LatLng(point[1], point[0]), terrain)
|
||||||
|
ll_points.append([p.x, p.y])
|
||||||
|
return array(ll_points)
|
||||||
|
|
||||||
|
return transform(geometry, ll_to_xy)
|
||||||
|
|
||||||
|
|
||||||
|
class WaypointSolverLoader:
|
||||||
|
def __init__(self, debug_info_path: Path) -> None:
|
||||||
|
self.debug_info_path = debug_info_path
|
||||||
|
|
||||||
|
def load_data(self) -> dict[str, Any]:
|
||||||
|
with self.debug_info_path.open(encoding="utf-8") as debug_info_file:
|
||||||
|
return json.load(debug_info_file)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def load_geometries(
|
||||||
|
feature_collection: dict[str, Any], terrain: Terrain
|
||||||
|
) -> dict[str, BaseGeometry]:
|
||||||
|
geometries = {}
|
||||||
|
for feature in feature_collection["features"]:
|
||||||
|
description = feature["properties"]["description"]
|
||||||
|
geometry = shape(feature["geometry"])
|
||||||
|
geometries[description] = geometry_ll_to_xy(geometry, terrain)
|
||||||
|
return geometries
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def terrain(self) -> Terrain:
|
||||||
|
return TERRAINS_BY_NAME[self.load_data()["metadata"]["terrain"]]
|
||||||
|
|
||||||
|
def load(self) -> WaypointSolver:
|
||||||
|
data = self.load_data()
|
||||||
|
metadata = data["metadata"]
|
||||||
|
name = metadata.pop("name")
|
||||||
|
terrain_name = metadata.pop("terrain")
|
||||||
|
terrain = TERRAINS_BY_NAME[terrain_name]
|
||||||
|
if "doctrine" in metadata:
|
||||||
|
metadata["doctrine"] = doctrine_from_name(metadata["doctrine"])
|
||||||
|
geometries = self.load_geometries(data, terrain)
|
||||||
|
builder: type[WaypointSolver] = {
|
||||||
|
"IpSolver": IpSolver,
|
||||||
|
}[name]
|
||||||
|
metadata.update(geometries)
|
||||||
|
return builder(**metadata)
|
||||||
9
pytest.ini
Normal file
9
pytest.ini
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
[pytest]
|
||||||
|
markers =
|
||||||
|
fuzztest: marks tests as fuzz tests
|
||||||
|
|
||||||
|
# Disable fuzz tests by default. They're randomized so flaky by nature. They
|
||||||
|
# are typically run manually after making changes to fuzzed code to generate
|
||||||
|
# new regression tests.
|
||||||
|
addopts =
|
||||||
|
-m "not fuzztest"
|
||||||
0
tests/flightplan/__init__.py
Normal file
0
tests/flightplan/__init__.py
Normal file
101
tests/flightplan/test_ipsolver.py
Normal file
101
tests/flightplan/test_ipsolver.py
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
import random
|
||||||
|
import uuid
|
||||||
|
from collections.abc import Iterator
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from dcs.terrain import Caucasus
|
||||||
|
from shapely import Point, MultiPolygon, Polygon, unary_union
|
||||||
|
|
||||||
|
from game.data.doctrine import ALL_DOCTRINES
|
||||||
|
from game.flightplan.ipsolver import IpSolver
|
||||||
|
from game.flightplan.waypointsolver import NoSolutionsError
|
||||||
|
from game.flightplan.waypointstrategy import point_at_heading
|
||||||
|
from game.utils import Heading, nautical_miles, feet, Distance, meters
|
||||||
|
from tests.flightplan.waypointsolvertestcasereducer import WaypointSolverTestCaseReducer
|
||||||
|
|
||||||
|
# The Falklands is nearly 1000 nmi diagonal. We'll use that as a radius for a bit of
|
||||||
|
# overkill.
|
||||||
|
MAP_RADIUS = nautical_miles(1000)
|
||||||
|
|
||||||
|
# Aircraft like the B-1 have a combat range closer to 3000 nmi, but we don't have
|
||||||
|
# maps big enough for that to matter. 600 nmi is still a *very* distant target for
|
||||||
|
# our campaigns.
|
||||||
|
MAX_TARGET_DISTANCE = nautical_miles(500)
|
||||||
|
|
||||||
|
# 200 nmi is roughly the max range of the SA-5, which has the greatest range of
|
||||||
|
# anything in DCS.
|
||||||
|
MAX_THREAT_RANGE = nautical_miles(200)
|
||||||
|
|
||||||
|
MAX_THREAT_DISTANCE = MAX_TARGET_DISTANCE + MAX_THREAT_RANGE
|
||||||
|
|
||||||
|
|
||||||
|
THIS_DIR = Path(__file__).parent
|
||||||
|
TEST_CASE_DIRECTORY = THIS_DIR / "testcases"
|
||||||
|
|
||||||
|
|
||||||
|
def fuzz_threat() -> Polygon:
|
||||||
|
threat_range_m = random.triangular(
|
||||||
|
feet(500).meters, MAX_THREAT_RANGE.meters, nautical_miles(40).meters
|
||||||
|
)
|
||||||
|
threat_distance = meters(
|
||||||
|
random.triangular(0, MAX_THREAT_DISTANCE.meters, nautical_miles(100).meters)
|
||||||
|
)
|
||||||
|
threat_position = point_at_heading(Point(0, 0), Heading.random(), threat_distance)
|
||||||
|
return threat_position.buffer(threat_range_m)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="fuzzed_target_distance")
|
||||||
|
def fuzzed_target_distance_fixture() -> Distance:
|
||||||
|
return meters(
|
||||||
|
random.triangular(0, MAX_TARGET_DISTANCE.meters, nautical_miles(100).meters)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="fuzzed_threat_poly")
|
||||||
|
def fuzzed_threat_poly_fixture() -> MultiPolygon:
|
||||||
|
number_of_threats = random.randint(0, 100)
|
||||||
|
polys = unary_union([fuzz_threat() for _ in range(number_of_threats)])
|
||||||
|
if isinstance(polys, MultiPolygon):
|
||||||
|
return polys
|
||||||
|
return MultiPolygon([polys])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(name="fuzzed_solver")
|
||||||
|
def fuzzed_solver_fixture(
|
||||||
|
fuzzed_target_distance: Distance, fuzzed_threat_poly: MultiPolygon, tmp_path: Path
|
||||||
|
) -> IpSolver:
|
||||||
|
target_heading = Heading.from_degrees(random.uniform(0, 360))
|
||||||
|
departure = Point(0, 0)
|
||||||
|
target = point_at_heading(departure, target_heading, fuzzed_target_distance)
|
||||||
|
solver = IpSolver(
|
||||||
|
departure, target, random.choice(ALL_DOCTRINES), fuzzed_threat_poly
|
||||||
|
)
|
||||||
|
solver.set_debug_properties(tmp_path, Caucasus())
|
||||||
|
return solver
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def capture_fuzz_failures(solver: IpSolver) -> Iterator[None]:
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except NoSolutionsError as ex:
|
||||||
|
test_case_directory = TEST_CASE_DIRECTORY / str(uuid.uuid4())
|
||||||
|
assert solver.debug_output_directory
|
||||||
|
WaypointSolverTestCaseReducer(
|
||||||
|
solver.debug_output_directory, test_case_directory
|
||||||
|
).reduce()
|
||||||
|
ex.add_note(f"Reduced test case was written to {test_case_directory}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.fuzztest
|
||||||
|
@pytest.mark.parametrize("run_number", range(500))
|
||||||
|
def test_fuzz_ipsolver(fuzzed_solver: IpSolver, run_number: int) -> None:
|
||||||
|
with capture_fuzz_failures(fuzzed_solver):
|
||||||
|
fuzzed_solver.solve()
|
||||||
|
|
||||||
|
|
||||||
|
def test_can_construct_solver_with_empty_threat() -> None:
|
||||||
|
IpSolver(Point(0, 0), Point(0, 0), ALL_DOCTRINES[0], MultiPolygon([]))
|
||||||
23
tests/flightplan/test_waypointsolver_regression_tests.py
Normal file
23
tests/flightplan/test_waypointsolver_regression_tests.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from game.flightplan.waypointsolverloader import WaypointSolverLoader
|
||||||
|
|
||||||
|
THIS_DIR = Path(__file__).parent
|
||||||
|
TEST_CASES_DIR = THIS_DIR / "testcases"
|
||||||
|
|
||||||
|
# Set to True to regenerate the debug files for each test case. After doing this, format
|
||||||
|
# the test cases with `npx prettier -w tests/flightplan/testcases` for readability.
|
||||||
|
UPDATE_TEST_CASES = False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("test_case", TEST_CASES_DIR.glob("**/solver.json"))
|
||||||
|
def test_waypoint_solver_regression_tests(test_case: Path) -> None:
|
||||||
|
loader = WaypointSolverLoader(test_case)
|
||||||
|
solver = loader.load()
|
||||||
|
if UPDATE_TEST_CASES:
|
||||||
|
solver.set_debug_properties(test_case.parent, loader.terrain)
|
||||||
|
solver.solve()
|
||||||
|
if UPDATE_TEST_CASES:
|
||||||
|
solver.dump_debug_info()
|
||||||
54
tests/flightplan/test_waypointsolverloader.py
Normal file
54
tests/flightplan/test_waypointsolverloader.py
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from game.flightplan.ipsolver import IpSolver
|
||||||
|
from game.flightplan.waypointsolverloader import WaypointSolverLoader
|
||||||
|
|
||||||
|
|
||||||
|
def test_waypointsolverloader(tmp_path: Path) -> None:
|
||||||
|
debug_info_path = tmp_path / "solver.json"
|
||||||
|
debug_info_path.write_text(
|
||||||
|
json.dumps(
|
||||||
|
{
|
||||||
|
"type": "FeatureCollection",
|
||||||
|
"metadata": {
|
||||||
|
"name": "IpSolver",
|
||||||
|
"terrain": "Falklands",
|
||||||
|
"doctrine": "coldwar",
|
||||||
|
},
|
||||||
|
"features": [
|
||||||
|
{
|
||||||
|
"type": "Feature",
|
||||||
|
"properties": {"description": "departure"},
|
||||||
|
"geometry": {
|
||||||
|
"type": "Point",
|
||||||
|
"coordinates": [-59.17351849883801, -52.46892777233296],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Feature",
|
||||||
|
"properties": {"description": "target"},
|
||||||
|
"geometry": {
|
||||||
|
"type": "Point",
|
||||||
|
"coordinates": [-59.12970828579045, -52.51860490233211],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Feature",
|
||||||
|
"properties": {"description": "threat_zones"},
|
||||||
|
"geometry": {"type": "MultiPolygon", "coordinates": []},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
solver = WaypointSolverLoader(debug_info_path).load()
|
||||||
|
assert isinstance(solver, IpSolver)
|
||||||
|
assert solver.doctrine.name == "coldwar"
|
||||||
|
assert solver.threat_zones.is_empty
|
||||||
|
assert solver.departure.x == pytest.approx(0, abs=1e-8)
|
||||||
|
assert solver.departure.y == pytest.approx(0, abs=1e-8)
|
||||||
|
assert solver.target.x == pytest.approx(-5436.058, abs=0.001)
|
||||||
|
assert solver.target.y == pytest.approx(3138.51, abs=0.001)
|
||||||
162
tests/flightplan/waypointsolvertestcasereducer.py
Normal file
162
tests/flightplan/waypointsolvertestcasereducer.py
Normal file
@ -0,0 +1,162 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from collections.abc import Iterator
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, TypeVar, Generic
|
||||||
|
|
||||||
|
from shapely import Point, MultiPolygon
|
||||||
|
from shapely.geometry import shape
|
||||||
|
|
||||||
|
from game.data.doctrine import Doctrine, ALL_DOCTRINES
|
||||||
|
from game.flightplan.ipsolver import IpSolver
|
||||||
|
from game.flightplan.waypointsolver import WaypointSolver, NoSolutionsError
|
||||||
|
from game.flightplan.waypointsolverloader import WaypointSolverLoader
|
||||||
|
from game.theater.theaterloader import TERRAINS_BY_NAME
|
||||||
|
|
||||||
|
ReducerT = TypeVar("ReducerT")
|
||||||
|
|
||||||
|
|
||||||
|
def doctrine_from_name(name: str) -> Doctrine:
|
||||||
|
for doctrine in ALL_DOCTRINES:
|
||||||
|
if doctrine.name == name:
|
||||||
|
return doctrine
|
||||||
|
raise KeyError
|
||||||
|
|
||||||
|
|
||||||
|
class Reducer(Generic[ReducerT], Iterator[ReducerT], ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def accept(self) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def reject(self) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class MultiPolyReducer(Reducer[MultiPolygon]):
|
||||||
|
def __init__(self, multipoly: MultiPolygon) -> None:
|
||||||
|
self._multipoly: MultiPolygon | None = multipoly
|
||||||
|
self._previous_poly: MultiPolygon | None = None
|
||||||
|
self._remove_index = 0
|
||||||
|
|
||||||
|
def __next__(self) -> MultiPolygon:
|
||||||
|
if self._multipoly is None:
|
||||||
|
raise StopIteration
|
||||||
|
return self._multipoly
|
||||||
|
|
||||||
|
def _reduce_poly(self) -> None:
|
||||||
|
assert self._multipoly is not None
|
||||||
|
polys = list(self._multipoly.geoms)
|
||||||
|
if not polys or self._remove_index >= len(polys):
|
||||||
|
self._multipoly = None
|
||||||
|
return
|
||||||
|
|
||||||
|
del polys[self._remove_index]
|
||||||
|
self._previous_poly = self._multipoly
|
||||||
|
self._multipoly = MultiPolygon(polys)
|
||||||
|
|
||||||
|
def accept(self) -> None:
|
||||||
|
self._reduce_poly()
|
||||||
|
|
||||||
|
def reject(self) -> None:
|
||||||
|
self._multipoly = self._previous_poly
|
||||||
|
self._previous_poly = None
|
||||||
|
self._remove_index += 1
|
||||||
|
self._reduce_poly()
|
||||||
|
|
||||||
|
|
||||||
|
class IpSolverReducer(Reducer[IpSolver]):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
departure: Point,
|
||||||
|
target: Point,
|
||||||
|
doctrine: Doctrine,
|
||||||
|
threat_zones: MultiPolygon,
|
||||||
|
) -> None:
|
||||||
|
self.departure = departure
|
||||||
|
self.target = target
|
||||||
|
self.doctrine = doctrine
|
||||||
|
self.threat_zones_reducer = MultiPolyReducer(threat_zones)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_metadata_and_feature_collection(
|
||||||
|
metadata: dict[str, Any], feature_collection: dict[str, Any]
|
||||||
|
) -> IpSolverReducer:
|
||||||
|
departure: Point | None = None
|
||||||
|
target: Point | None = None
|
||||||
|
threat_zones: MultiPolygon | None = None
|
||||||
|
for feature in feature_collection["features"]:
|
||||||
|
description = feature["properties"]["description"]
|
||||||
|
geometry = feature["geometry"]
|
||||||
|
match description:
|
||||||
|
case "departure":
|
||||||
|
departure = shape(geometry)
|
||||||
|
case "target":
|
||||||
|
target = shape(geometry)
|
||||||
|
case "threat_zones":
|
||||||
|
threat_zones = shape(geometry)
|
||||||
|
|
||||||
|
if departure is None:
|
||||||
|
raise KeyError("feature collection has no departure point")
|
||||||
|
if target is None:
|
||||||
|
raise KeyError("feature collection has no target point")
|
||||||
|
if threat_zones is None:
|
||||||
|
raise KeyError("feature collection has no threat zones")
|
||||||
|
|
||||||
|
return IpSolverReducer(
|
||||||
|
departure,
|
||||||
|
target,
|
||||||
|
doctrine_from_name(metadata["doctrine"]),
|
||||||
|
threat_zones,
|
||||||
|
)
|
||||||
|
|
||||||
|
def __next__(self) -> IpSolver:
|
||||||
|
return IpSolver(
|
||||||
|
self.departure, self.target, self.doctrine, next(self.threat_zones_reducer)
|
||||||
|
)
|
||||||
|
|
||||||
|
def accept(self) -> None:
|
||||||
|
self.threat_zones_reducer.accept()
|
||||||
|
|
||||||
|
def reject(self) -> None:
|
||||||
|
self.threat_zones_reducer.reject()
|
||||||
|
|
||||||
|
|
||||||
|
class WaypointSolverTestCaseReducer:
|
||||||
|
def __init__(self, debug_directory: Path, out_dir: Path) -> None:
|
||||||
|
self.debug_directory = debug_directory
|
||||||
|
self.out_dir = out_dir
|
||||||
|
if self.out_dir.exists():
|
||||||
|
raise ValueError(f"out_dir {out_dir} already exists")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _reducer_from_solver(solver: WaypointSolver) -> Reducer[Any]:
|
||||||
|
if isinstance(solver, IpSolver):
|
||||||
|
return IpSolverReducer(
|
||||||
|
solver.departure, solver.target, solver.doctrine, solver.threat_zones
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise KeyError(f"Unhandled waypoint solver {solver.__class__.__name__}")
|
||||||
|
|
||||||
|
def reduce(self) -> None:
|
||||||
|
loader = WaypointSolverLoader(self.debug_directory / "solver.json")
|
||||||
|
solver = loader.load()
|
||||||
|
last_broken: WaypointSolver | None = None
|
||||||
|
reducer = self._reducer_from_solver(solver)
|
||||||
|
for solver in reducer:
|
||||||
|
try:
|
||||||
|
solver.solve()
|
||||||
|
reducer.reject()
|
||||||
|
except NoSolutionsError:
|
||||||
|
last_broken = solver
|
||||||
|
reducer.accept()
|
||||||
|
|
||||||
|
if last_broken is None:
|
||||||
|
raise RuntimeError("all cases succeeded, nothing to reduce")
|
||||||
|
|
||||||
|
self.out_dir.mkdir(parents=True)
|
||||||
|
last_broken.set_debug_properties(
|
||||||
|
self.out_dir, TERRAINS_BY_NAME[loader.load_data()["metadata"]["terrain"]]
|
||||||
|
)
|
||||||
|
last_broken.dump_debug_info()
|
||||||
Loading…
x
Reference in New Issue
Block a user