From 1708baf7722339a3edff2c855d0c8fa085bf0cf0 Mon Sep 17 00:00:00 2001 From: Dan Albert Date: Thu, 3 Aug 2023 00:47:19 -0700 Subject: [PATCH] Add fuzz testing for waypoint solvers. This fuzz test generates random inputs for waypoint solvers to check if they can find a solution. If they can't, the debug info for the solver is dumped to the testcases directory. Another test loads those test cases, creates a solver from them, and checks that a solution is found. Obviously it won't be immediately, but it's a starting point for fixing the bug and serves as a regression test afterward. --- game/flightplan/waypointsolverloader.py | 79 +++++++++ pytest.ini | 9 + tests/flightplan/__init__.py | 0 tests/flightplan/test_ipsolver.py | 101 +++++++++++ .../test_waypointsolver_regression_tests.py | 23 +++ tests/flightplan/test_waypointsolverloader.py | 54 ++++++ .../waypointsolvertestcasereducer.py | 162 ++++++++++++++++++ 7 files changed, 428 insertions(+) create mode 100644 game/flightplan/waypointsolverloader.py create mode 100644 pytest.ini create mode 100644 tests/flightplan/__init__.py create mode 100644 tests/flightplan/test_ipsolver.py create mode 100644 tests/flightplan/test_waypointsolver_regression_tests.py create mode 100644 tests/flightplan/test_waypointsolverloader.py create mode 100644 tests/flightplan/waypointsolvertestcasereducer.py diff --git a/game/flightplan/waypointsolverloader.py b/game/flightplan/waypointsolverloader.py new file mode 100644 index 00000000..83852b65 --- /dev/null +++ b/game/flightplan/waypointsolverloader.py @@ -0,0 +1,79 @@ +import json +from functools import cached_property +from pathlib import Path +from typing import Any + +from dcs.mapping import Point as DcsPoint, LatLng +from dcs.terrain import Terrain +from numpy import float64, array +from numpy._typing import NDArray +from shapely import transform +from shapely.geometry import shape +from shapely.geometry.base import BaseGeometry + +from game.data.doctrine import Doctrine, ALL_DOCTRINES +from .ipsolver import IpSolver +from .waypointsolver import WaypointSolver +from ..theater.theaterloader import TERRAINS_BY_NAME + + +def doctrine_from_name(name: str) -> Doctrine: + for doctrine in ALL_DOCTRINES: + if doctrine.name == name: + return doctrine + raise KeyError + + +def geometry_ll_to_xy(geometry: BaseGeometry, terrain: Terrain) -> BaseGeometry: + if geometry.is_empty: + return geometry + + def ll_to_xy(points: NDArray[float64]) -> NDArray[float64]: + ll_points = [] + for point in points: + # Longitude is unintuitively first because it's the "X" coordinate: + # https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.1 + p = DcsPoint.from_latlng(LatLng(point[1], point[0]), terrain) + ll_points.append([p.x, p.y]) + return array(ll_points) + + return transform(geometry, ll_to_xy) + + +class WaypointSolverLoader: + def __init__(self, debug_info_path: Path) -> None: + self.debug_info_path = debug_info_path + + def load_data(self) -> dict[str, Any]: + with self.debug_info_path.open(encoding="utf-8") as debug_info_file: + return json.load(debug_info_file) + + @staticmethod + def load_geometries( + feature_collection: dict[str, Any], terrain: Terrain + ) -> dict[str, BaseGeometry]: + geometries = {} + for feature in feature_collection["features"]: + description = feature["properties"]["description"] + geometry = shape(feature["geometry"]) + geometries[description] = geometry_ll_to_xy(geometry, terrain) + return geometries + + @cached_property + def terrain(self) -> Terrain: + return TERRAINS_BY_NAME[self.load_data()["metadata"]["terrain"]] + + def load(self) -> WaypointSolver: + data = self.load_data() + metadata = data["metadata"] + name = metadata.pop("name") + terrain_name = metadata.pop("terrain") + terrain = TERRAINS_BY_NAME[terrain_name] + if "doctrine" in metadata: + metadata["doctrine"] = doctrine_from_name(metadata["doctrine"]) + geometries = self.load_geometries(data, terrain) + builder: type[WaypointSolver] = { + "IpSolver": IpSolver, + }[name] + metadata.update(geometries) + return builder(**metadata) diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..ea7db674 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,9 @@ +[pytest] +markers = + fuzztest: marks tests as fuzz tests + +# Disable fuzz tests by default. They're randomized so flaky by nature. They +# are typically run manually after making changes to fuzzed code to generate +# new regression tests. +addopts = + -m "not fuzztest" diff --git a/tests/flightplan/__init__.py b/tests/flightplan/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/flightplan/test_ipsolver.py b/tests/flightplan/test_ipsolver.py new file mode 100644 index 00000000..ec03c9a4 --- /dev/null +++ b/tests/flightplan/test_ipsolver.py @@ -0,0 +1,101 @@ +import random +import uuid +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path + +import pytest +from dcs.terrain import Caucasus +from shapely import Point, MultiPolygon, Polygon, unary_union + +from game.data.doctrine import ALL_DOCTRINES +from game.flightplan.ipsolver import IpSolver +from game.flightplan.waypointsolver import NoSolutionsError +from game.flightplan.waypointstrategy import point_at_heading +from game.utils import Heading, nautical_miles, feet, Distance, meters +from tests.flightplan.waypointsolvertestcasereducer import WaypointSolverTestCaseReducer + +# The Falklands is nearly 1000 nmi diagonal. We'll use that as a radius for a bit of +# overkill. +MAP_RADIUS = nautical_miles(1000) + +# Aircraft like the B-1 have a combat range closer to 3000 nmi, but we don't have +# maps big enough for that to matter. 600 nmi is still a *very* distant target for +# our campaigns. +MAX_TARGET_DISTANCE = nautical_miles(500) + +# 200 nmi is roughly the max range of the SA-5, which has the greatest range of +# anything in DCS. +MAX_THREAT_RANGE = nautical_miles(200) + +MAX_THREAT_DISTANCE = MAX_TARGET_DISTANCE + MAX_THREAT_RANGE + + +THIS_DIR = Path(__file__).parent +TEST_CASE_DIRECTORY = THIS_DIR / "testcases" + + +def fuzz_threat() -> Polygon: + threat_range_m = random.triangular( + feet(500).meters, MAX_THREAT_RANGE.meters, nautical_miles(40).meters + ) + threat_distance = meters( + random.triangular(0, MAX_THREAT_DISTANCE.meters, nautical_miles(100).meters) + ) + threat_position = point_at_heading(Point(0, 0), Heading.random(), threat_distance) + return threat_position.buffer(threat_range_m) + + +@pytest.fixture(name="fuzzed_target_distance") +def fuzzed_target_distance_fixture() -> Distance: + return meters( + random.triangular(0, MAX_TARGET_DISTANCE.meters, nautical_miles(100).meters) + ) + + +@pytest.fixture(name="fuzzed_threat_poly") +def fuzzed_threat_poly_fixture() -> MultiPolygon: + number_of_threats = random.randint(0, 100) + polys = unary_union([fuzz_threat() for _ in range(number_of_threats)]) + if isinstance(polys, MultiPolygon): + return polys + return MultiPolygon([polys]) + + +@pytest.fixture(name="fuzzed_solver") +def fuzzed_solver_fixture( + fuzzed_target_distance: Distance, fuzzed_threat_poly: MultiPolygon, tmp_path: Path +) -> IpSolver: + target_heading = Heading.from_degrees(random.uniform(0, 360)) + departure = Point(0, 0) + target = point_at_heading(departure, target_heading, fuzzed_target_distance) + solver = IpSolver( + departure, target, random.choice(ALL_DOCTRINES), fuzzed_threat_poly + ) + solver.set_debug_properties(tmp_path, Caucasus()) + return solver + + +@contextmanager +def capture_fuzz_failures(solver: IpSolver) -> Iterator[None]: + try: + yield + except NoSolutionsError as ex: + test_case_directory = TEST_CASE_DIRECTORY / str(uuid.uuid4()) + assert solver.debug_output_directory + WaypointSolverTestCaseReducer( + solver.debug_output_directory, test_case_directory + ).reduce() + ex.add_note(f"Reduced test case was written to {test_case_directory}") + raise + + +@pytest.mark.fuzztest +@pytest.mark.parametrize("run_number", range(500)) +def test_fuzz_ipsolver(fuzzed_solver: IpSolver, run_number: int) -> None: + with capture_fuzz_failures(fuzzed_solver): + fuzzed_solver.solve() + + +def test_can_construct_solver_with_empty_threat() -> None: + IpSolver(Point(0, 0), Point(0, 0), ALL_DOCTRINES[0], MultiPolygon([])) diff --git a/tests/flightplan/test_waypointsolver_regression_tests.py b/tests/flightplan/test_waypointsolver_regression_tests.py new file mode 100644 index 00000000..40de20b2 --- /dev/null +++ b/tests/flightplan/test_waypointsolver_regression_tests.py @@ -0,0 +1,23 @@ +from pathlib import Path + +import pytest + +from game.flightplan.waypointsolverloader import WaypointSolverLoader + +THIS_DIR = Path(__file__).parent +TEST_CASES_DIR = THIS_DIR / "testcases" + +# Set to True to regenerate the debug files for each test case. After doing this, format +# the test cases with `npx prettier -w tests/flightplan/testcases` for readability. +UPDATE_TEST_CASES = False + + +@pytest.mark.parametrize("test_case", TEST_CASES_DIR.glob("**/solver.json")) +def test_waypoint_solver_regression_tests(test_case: Path) -> None: + loader = WaypointSolverLoader(test_case) + solver = loader.load() + if UPDATE_TEST_CASES: + solver.set_debug_properties(test_case.parent, loader.terrain) + solver.solve() + if UPDATE_TEST_CASES: + solver.dump_debug_info() diff --git a/tests/flightplan/test_waypointsolverloader.py b/tests/flightplan/test_waypointsolverloader.py new file mode 100644 index 00000000..c8fe8386 --- /dev/null +++ b/tests/flightplan/test_waypointsolverloader.py @@ -0,0 +1,54 @@ +import json +from pathlib import Path + +import pytest + +from game.flightplan.ipsolver import IpSolver +from game.flightplan.waypointsolverloader import WaypointSolverLoader + + +def test_waypointsolverloader(tmp_path: Path) -> None: + debug_info_path = tmp_path / "solver.json" + debug_info_path.write_text( + json.dumps( + { + "type": "FeatureCollection", + "metadata": { + "name": "IpSolver", + "terrain": "Falklands", + "doctrine": "coldwar", + }, + "features": [ + { + "type": "Feature", + "properties": {"description": "departure"}, + "geometry": { + "type": "Point", + "coordinates": [-59.17351849883801, -52.46892777233296], + }, + }, + { + "type": "Feature", + "properties": {"description": "target"}, + "geometry": { + "type": "Point", + "coordinates": [-59.12970828579045, -52.51860490233211], + }, + }, + { + "type": "Feature", + "properties": {"description": "threat_zones"}, + "geometry": {"type": "MultiPolygon", "coordinates": []}, + }, + ], + } + ) + ) + solver = WaypointSolverLoader(debug_info_path).load() + assert isinstance(solver, IpSolver) + assert solver.doctrine.name == "coldwar" + assert solver.threat_zones.is_empty + assert solver.departure.x == pytest.approx(0, abs=1e-8) + assert solver.departure.y == pytest.approx(0, abs=1e-8) + assert solver.target.x == pytest.approx(-5436.058, abs=0.001) + assert solver.target.y == pytest.approx(3138.51, abs=0.001) diff --git a/tests/flightplan/waypointsolvertestcasereducer.py b/tests/flightplan/waypointsolvertestcasereducer.py new file mode 100644 index 00000000..366ee86c --- /dev/null +++ b/tests/flightplan/waypointsolvertestcasereducer.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Iterator +from pathlib import Path +from typing import Any, TypeVar, Generic + +from shapely import Point, MultiPolygon +from shapely.geometry import shape + +from game.data.doctrine import Doctrine, ALL_DOCTRINES +from game.flightplan.ipsolver import IpSolver +from game.flightplan.waypointsolver import WaypointSolver, NoSolutionsError +from game.flightplan.waypointsolverloader import WaypointSolverLoader +from game.theater.theaterloader import TERRAINS_BY_NAME + +ReducerT = TypeVar("ReducerT") + + +def doctrine_from_name(name: str) -> Doctrine: + for doctrine in ALL_DOCTRINES: + if doctrine.name == name: + return doctrine + raise KeyError + + +class Reducer(Generic[ReducerT], Iterator[ReducerT], ABC): + @abstractmethod + def accept(self) -> None: + ... + + @abstractmethod + def reject(self) -> None: + ... + + +class MultiPolyReducer(Reducer[MultiPolygon]): + def __init__(self, multipoly: MultiPolygon) -> None: + self._multipoly: MultiPolygon | None = multipoly + self._previous_poly: MultiPolygon | None = None + self._remove_index = 0 + + def __next__(self) -> MultiPolygon: + if self._multipoly is None: + raise StopIteration + return self._multipoly + + def _reduce_poly(self) -> None: + assert self._multipoly is not None + polys = list(self._multipoly.geoms) + if not polys or self._remove_index >= len(polys): + self._multipoly = None + return + + del polys[self._remove_index] + self._previous_poly = self._multipoly + self._multipoly = MultiPolygon(polys) + + def accept(self) -> None: + self._reduce_poly() + + def reject(self) -> None: + self._multipoly = self._previous_poly + self._previous_poly = None + self._remove_index += 1 + self._reduce_poly() + + +class IpSolverReducer(Reducer[IpSolver]): + def __init__( + self, + departure: Point, + target: Point, + doctrine: Doctrine, + threat_zones: MultiPolygon, + ) -> None: + self.departure = departure + self.target = target + self.doctrine = doctrine + self.threat_zones_reducer = MultiPolyReducer(threat_zones) + + @staticmethod + def from_metadata_and_feature_collection( + metadata: dict[str, Any], feature_collection: dict[str, Any] + ) -> IpSolverReducer: + departure: Point | None = None + target: Point | None = None + threat_zones: MultiPolygon | None = None + for feature in feature_collection["features"]: + description = feature["properties"]["description"] + geometry = feature["geometry"] + match description: + case "departure": + departure = shape(geometry) + case "target": + target = shape(geometry) + case "threat_zones": + threat_zones = shape(geometry) + + if departure is None: + raise KeyError("feature collection has no departure point") + if target is None: + raise KeyError("feature collection has no target point") + if threat_zones is None: + raise KeyError("feature collection has no threat zones") + + return IpSolverReducer( + departure, + target, + doctrine_from_name(metadata["doctrine"]), + threat_zones, + ) + + def __next__(self) -> IpSolver: + return IpSolver( + self.departure, self.target, self.doctrine, next(self.threat_zones_reducer) + ) + + def accept(self) -> None: + self.threat_zones_reducer.accept() + + def reject(self) -> None: + self.threat_zones_reducer.reject() + + +class WaypointSolverTestCaseReducer: + def __init__(self, debug_directory: Path, out_dir: Path) -> None: + self.debug_directory = debug_directory + self.out_dir = out_dir + if self.out_dir.exists(): + raise ValueError(f"out_dir {out_dir} already exists") + + @staticmethod + def _reducer_from_solver(solver: WaypointSolver) -> Reducer[Any]: + if isinstance(solver, IpSolver): + return IpSolverReducer( + solver.departure, solver.target, solver.doctrine, solver.threat_zones + ) + else: + raise KeyError(f"Unhandled waypoint solver {solver.__class__.__name__}") + + def reduce(self) -> None: + loader = WaypointSolverLoader(self.debug_directory / "solver.json") + solver = loader.load() + last_broken: WaypointSolver | None = None + reducer = self._reducer_from_solver(solver) + for solver in reducer: + try: + solver.solve() + reducer.reject() + except NoSolutionsError: + last_broken = solver + reducer.accept() + + if last_broken is None: + raise RuntimeError("all cases succeeded, nothing to reduce") + + self.out_dir.mkdir(parents=True) + last_broken.set_debug_properties( + self.out_dir, TERRAINS_BY_NAME[loader.load_data()["metadata"]["terrain"]] + ) + last_broken.dump_debug_info()