Add fuzz testing for waypoint solvers.

This fuzz test generates random inputs for waypoint solvers to check if
they can find a solution. If they can't, the debug info for the solver
is dumped to the testcases directory. Another test loads those test
cases, creates a solver from them, and checks that a solution is found.
Obviously it won't be immediately, but it's a starting point for fixing
the bug and serves as a regression test afterward.
This commit is contained in:
Dan Albert 2023-08-03 00:47:19 -07:00
parent 6b6c4f4112
commit 1708baf772
7 changed files with 428 additions and 0 deletions

View File

@ -0,0 +1,79 @@
import json
from functools import cached_property
from pathlib import Path
from typing import Any
from dcs.mapping import Point as DcsPoint, LatLng
from dcs.terrain import Terrain
from numpy import float64, array
from numpy._typing import NDArray
from shapely import transform
from shapely.geometry import shape
from shapely.geometry.base import BaseGeometry
from game.data.doctrine import Doctrine, ALL_DOCTRINES
from .ipsolver import IpSolver
from .waypointsolver import WaypointSolver
from ..theater.theaterloader import TERRAINS_BY_NAME
def doctrine_from_name(name: str) -> Doctrine:
for doctrine in ALL_DOCTRINES:
if doctrine.name == name:
return doctrine
raise KeyError
def geometry_ll_to_xy(geometry: BaseGeometry, terrain: Terrain) -> BaseGeometry:
if geometry.is_empty:
return geometry
def ll_to_xy(points: NDArray[float64]) -> NDArray[float64]:
ll_points = []
for point in points:
# Longitude is unintuitively first because it's the "X" coordinate:
# https://datatracker.ietf.org/doc/html/rfc7946#section-3.1.1
p = DcsPoint.from_latlng(LatLng(point[1], point[0]), terrain)
ll_points.append([p.x, p.y])
return array(ll_points)
return transform(geometry, ll_to_xy)
class WaypointSolverLoader:
def __init__(self, debug_info_path: Path) -> None:
self.debug_info_path = debug_info_path
def load_data(self) -> dict[str, Any]:
with self.debug_info_path.open(encoding="utf-8") as debug_info_file:
return json.load(debug_info_file)
@staticmethod
def load_geometries(
feature_collection: dict[str, Any], terrain: Terrain
) -> dict[str, BaseGeometry]:
geometries = {}
for feature in feature_collection["features"]:
description = feature["properties"]["description"]
geometry = shape(feature["geometry"])
geometries[description] = geometry_ll_to_xy(geometry, terrain)
return geometries
@cached_property
def terrain(self) -> Terrain:
return TERRAINS_BY_NAME[self.load_data()["metadata"]["terrain"]]
def load(self) -> WaypointSolver:
data = self.load_data()
metadata = data["metadata"]
name = metadata.pop("name")
terrain_name = metadata.pop("terrain")
terrain = TERRAINS_BY_NAME[terrain_name]
if "doctrine" in metadata:
metadata["doctrine"] = doctrine_from_name(metadata["doctrine"])
geometries = self.load_geometries(data, terrain)
builder: type[WaypointSolver] = {
"IpSolver": IpSolver,
}[name]
metadata.update(geometries)
return builder(**metadata)

9
pytest.ini Normal file
View File

@ -0,0 +1,9 @@
[pytest]
markers =
fuzztest: marks tests as fuzz tests
# Disable fuzz tests by default. They're randomized so flaky by nature. They
# are typically run manually after making changes to fuzzed code to generate
# new regression tests.
addopts =
-m "not fuzztest"

View File

View File

@ -0,0 +1,101 @@
import random
import uuid
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
import pytest
from dcs.terrain import Caucasus
from shapely import Point, MultiPolygon, Polygon, unary_union
from game.data.doctrine import ALL_DOCTRINES
from game.flightplan.ipsolver import IpSolver
from game.flightplan.waypointsolver import NoSolutionsError
from game.flightplan.waypointstrategy import point_at_heading
from game.utils import Heading, nautical_miles, feet, Distance, meters
from tests.flightplan.waypointsolvertestcasereducer import WaypointSolverTestCaseReducer
# The Falklands is nearly 1000 nmi diagonal. We'll use that as a radius for a bit of
# overkill.
MAP_RADIUS = nautical_miles(1000)
# Aircraft like the B-1 have a combat range closer to 3000 nmi, but we don't have
# maps big enough for that to matter. 600 nmi is still a *very* distant target for
# our campaigns.
MAX_TARGET_DISTANCE = nautical_miles(500)
# 200 nmi is roughly the max range of the SA-5, which has the greatest range of
# anything in DCS.
MAX_THREAT_RANGE = nautical_miles(200)
MAX_THREAT_DISTANCE = MAX_TARGET_DISTANCE + MAX_THREAT_RANGE
THIS_DIR = Path(__file__).parent
TEST_CASE_DIRECTORY = THIS_DIR / "testcases"
def fuzz_threat() -> Polygon:
threat_range_m = random.triangular(
feet(500).meters, MAX_THREAT_RANGE.meters, nautical_miles(40).meters
)
threat_distance = meters(
random.triangular(0, MAX_THREAT_DISTANCE.meters, nautical_miles(100).meters)
)
threat_position = point_at_heading(Point(0, 0), Heading.random(), threat_distance)
return threat_position.buffer(threat_range_m)
@pytest.fixture(name="fuzzed_target_distance")
def fuzzed_target_distance_fixture() -> Distance:
return meters(
random.triangular(0, MAX_TARGET_DISTANCE.meters, nautical_miles(100).meters)
)
@pytest.fixture(name="fuzzed_threat_poly")
def fuzzed_threat_poly_fixture() -> MultiPolygon:
number_of_threats = random.randint(0, 100)
polys = unary_union([fuzz_threat() for _ in range(number_of_threats)])
if isinstance(polys, MultiPolygon):
return polys
return MultiPolygon([polys])
@pytest.fixture(name="fuzzed_solver")
def fuzzed_solver_fixture(
fuzzed_target_distance: Distance, fuzzed_threat_poly: MultiPolygon, tmp_path: Path
) -> IpSolver:
target_heading = Heading.from_degrees(random.uniform(0, 360))
departure = Point(0, 0)
target = point_at_heading(departure, target_heading, fuzzed_target_distance)
solver = IpSolver(
departure, target, random.choice(ALL_DOCTRINES), fuzzed_threat_poly
)
solver.set_debug_properties(tmp_path, Caucasus())
return solver
@contextmanager
def capture_fuzz_failures(solver: IpSolver) -> Iterator[None]:
try:
yield
except NoSolutionsError as ex:
test_case_directory = TEST_CASE_DIRECTORY / str(uuid.uuid4())
assert solver.debug_output_directory
WaypointSolverTestCaseReducer(
solver.debug_output_directory, test_case_directory
).reduce()
ex.add_note(f"Reduced test case was written to {test_case_directory}")
raise
@pytest.mark.fuzztest
@pytest.mark.parametrize("run_number", range(500))
def test_fuzz_ipsolver(fuzzed_solver: IpSolver, run_number: int) -> None:
with capture_fuzz_failures(fuzzed_solver):
fuzzed_solver.solve()
def test_can_construct_solver_with_empty_threat() -> None:
IpSolver(Point(0, 0), Point(0, 0), ALL_DOCTRINES[0], MultiPolygon([]))

View File

@ -0,0 +1,23 @@
from pathlib import Path
import pytest
from game.flightplan.waypointsolverloader import WaypointSolverLoader
THIS_DIR = Path(__file__).parent
TEST_CASES_DIR = THIS_DIR / "testcases"
# Set to True to regenerate the debug files for each test case. After doing this, format
# the test cases with `npx prettier -w tests/flightplan/testcases` for readability.
UPDATE_TEST_CASES = False
@pytest.mark.parametrize("test_case", TEST_CASES_DIR.glob("**/solver.json"))
def test_waypoint_solver_regression_tests(test_case: Path) -> None:
loader = WaypointSolverLoader(test_case)
solver = loader.load()
if UPDATE_TEST_CASES:
solver.set_debug_properties(test_case.parent, loader.terrain)
solver.solve()
if UPDATE_TEST_CASES:
solver.dump_debug_info()

View File

@ -0,0 +1,54 @@
import json
from pathlib import Path
import pytest
from game.flightplan.ipsolver import IpSolver
from game.flightplan.waypointsolverloader import WaypointSolverLoader
def test_waypointsolverloader(tmp_path: Path) -> None:
debug_info_path = tmp_path / "solver.json"
debug_info_path.write_text(
json.dumps(
{
"type": "FeatureCollection",
"metadata": {
"name": "IpSolver",
"terrain": "Falklands",
"doctrine": "coldwar",
},
"features": [
{
"type": "Feature",
"properties": {"description": "departure"},
"geometry": {
"type": "Point",
"coordinates": [-59.17351849883801, -52.46892777233296],
},
},
{
"type": "Feature",
"properties": {"description": "target"},
"geometry": {
"type": "Point",
"coordinates": [-59.12970828579045, -52.51860490233211],
},
},
{
"type": "Feature",
"properties": {"description": "threat_zones"},
"geometry": {"type": "MultiPolygon", "coordinates": []},
},
],
}
)
)
solver = WaypointSolverLoader(debug_info_path).load()
assert isinstance(solver, IpSolver)
assert solver.doctrine.name == "coldwar"
assert solver.threat_zones.is_empty
assert solver.departure.x == pytest.approx(0, abs=1e-8)
assert solver.departure.y == pytest.approx(0, abs=1e-8)
assert solver.target.x == pytest.approx(-5436.058, abs=0.001)
assert solver.target.y == pytest.approx(3138.51, abs=0.001)

View File

@ -0,0 +1,162 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Iterator
from pathlib import Path
from typing import Any, TypeVar, Generic
from shapely import Point, MultiPolygon
from shapely.geometry import shape
from game.data.doctrine import Doctrine, ALL_DOCTRINES
from game.flightplan.ipsolver import IpSolver
from game.flightplan.waypointsolver import WaypointSolver, NoSolutionsError
from game.flightplan.waypointsolverloader import WaypointSolverLoader
from game.theater.theaterloader import TERRAINS_BY_NAME
ReducerT = TypeVar("ReducerT")
def doctrine_from_name(name: str) -> Doctrine:
for doctrine in ALL_DOCTRINES:
if doctrine.name == name:
return doctrine
raise KeyError
class Reducer(Generic[ReducerT], Iterator[ReducerT], ABC):
@abstractmethod
def accept(self) -> None:
...
@abstractmethod
def reject(self) -> None:
...
class MultiPolyReducer(Reducer[MultiPolygon]):
def __init__(self, multipoly: MultiPolygon) -> None:
self._multipoly: MultiPolygon | None = multipoly
self._previous_poly: MultiPolygon | None = None
self._remove_index = 0
def __next__(self) -> MultiPolygon:
if self._multipoly is None:
raise StopIteration
return self._multipoly
def _reduce_poly(self) -> None:
assert self._multipoly is not None
polys = list(self._multipoly.geoms)
if not polys or self._remove_index >= len(polys):
self._multipoly = None
return
del polys[self._remove_index]
self._previous_poly = self._multipoly
self._multipoly = MultiPolygon(polys)
def accept(self) -> None:
self._reduce_poly()
def reject(self) -> None:
self._multipoly = self._previous_poly
self._previous_poly = None
self._remove_index += 1
self._reduce_poly()
class IpSolverReducer(Reducer[IpSolver]):
def __init__(
self,
departure: Point,
target: Point,
doctrine: Doctrine,
threat_zones: MultiPolygon,
) -> None:
self.departure = departure
self.target = target
self.doctrine = doctrine
self.threat_zones_reducer = MultiPolyReducer(threat_zones)
@staticmethod
def from_metadata_and_feature_collection(
metadata: dict[str, Any], feature_collection: dict[str, Any]
) -> IpSolverReducer:
departure: Point | None = None
target: Point | None = None
threat_zones: MultiPolygon | None = None
for feature in feature_collection["features"]:
description = feature["properties"]["description"]
geometry = feature["geometry"]
match description:
case "departure":
departure = shape(geometry)
case "target":
target = shape(geometry)
case "threat_zones":
threat_zones = shape(geometry)
if departure is None:
raise KeyError("feature collection has no departure point")
if target is None:
raise KeyError("feature collection has no target point")
if threat_zones is None:
raise KeyError("feature collection has no threat zones")
return IpSolverReducer(
departure,
target,
doctrine_from_name(metadata["doctrine"]),
threat_zones,
)
def __next__(self) -> IpSolver:
return IpSolver(
self.departure, self.target, self.doctrine, next(self.threat_zones_reducer)
)
def accept(self) -> None:
self.threat_zones_reducer.accept()
def reject(self) -> None:
self.threat_zones_reducer.reject()
class WaypointSolverTestCaseReducer:
def __init__(self, debug_directory: Path, out_dir: Path) -> None:
self.debug_directory = debug_directory
self.out_dir = out_dir
if self.out_dir.exists():
raise ValueError(f"out_dir {out_dir} already exists")
@staticmethod
def _reducer_from_solver(solver: WaypointSolver) -> Reducer[Any]:
if isinstance(solver, IpSolver):
return IpSolverReducer(
solver.departure, solver.target, solver.doctrine, solver.threat_zones
)
else:
raise KeyError(f"Unhandled waypoint solver {solver.__class__.__name__}")
def reduce(self) -> None:
loader = WaypointSolverLoader(self.debug_directory / "solver.json")
solver = loader.load()
last_broken: WaypointSolver | None = None
reducer = self._reducer_from_solver(solver)
for solver in reducer:
try:
solver.solve()
reducer.reject()
except NoSolutionsError:
last_broken = solver
reducer.accept()
if last_broken is None:
raise RuntimeError("all cases succeeded, nothing to reduce")
self.out_dir.mkdir(parents=True)
last_broken.set_debug_properties(
self.out_dir, TERRAINS_BY_NAME[loader.load_data()["metadata"]["terrain"]]
)
last_broken.dump_debug_info()