Dan Albert 783ac18222 Replace existing campaign planner with an HTN.
An HTN (https://en.wikipedia.org/wiki/Hierarchical_task_network) is
similar to a decision tree, but it is able to reset to an earlier stage
if a subtask fails and tasks are able to account for the changes in
world state caused by earlier tasks.

Currently this just uses exactly the same strategy as before so we can
prove the system, but it should make it simpler to improve on task
planning.
2021-07-12 13:02:23 -07:00

126 lines
4.7 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from collections import Iterator, deque
from dataclasses import dataclass
from typing import Any, Generic, Optional, TypeVar
WorldStateT = TypeVar("WorldStateT", bound="WorldState[Any]")
class WorldState(ABC, Generic[WorldStateT]):
@abstractmethod
def clone(self) -> WorldStateT:
...
class Task(Generic[WorldStateT]):
pass
Method = list[Task[WorldStateT]]
class PrimitiveTask(Task[WorldStateT], Generic[WorldStateT], ABC):
@abstractmethod
def preconditions_met(self, state: WorldStateT) -> bool:
...
@abstractmethod
def apply_effects(self, state: WorldStateT) -> None:
...
class CompoundTask(Task[WorldStateT], Generic[WorldStateT], ABC):
@abstractmethod
def each_valid_method(self, state: WorldStateT) -> Iterator[Method[WorldStateT]]:
...
PrimitiveTaskT = TypeVar("PrimitiveTaskT", bound=PrimitiveTask[Any])
@dataclass
class PlanningState(Generic[WorldStateT, PrimitiveTaskT]):
state: WorldStateT
tasks_to_process: deque[Task[WorldStateT]]
plan: list[PrimitiveTaskT]
methods: Optional[Iterator[Method[WorldStateT]]]
@dataclass(frozen=True)
class PlanningResult(Generic[WorldStateT, PrimitiveTaskT]):
tasks: list[PrimitiveTaskT]
end_state: WorldStateT
class PlanningHistory(Generic[WorldStateT, PrimitiveTaskT]):
def __init__(self) -> None:
self.states: list[PlanningState[WorldStateT, PrimitiveTaskT]] = []
def push(self, planning_state: PlanningState[WorldStateT, PrimitiveTaskT]) -> None:
self.states.append(planning_state)
def pop(self) -> PlanningState[WorldStateT, PrimitiveTaskT]:
return self.states.pop()
class Planner(Generic[WorldStateT, PrimitiveTaskT]):
def __init__(self, main_task: Task[WorldStateT]) -> None:
self.main_task = main_task
def plan(
self, initial_state: WorldStateT
) -> Optional[PlanningResult[WorldStateT, PrimitiveTaskT]]:
planning_state: PlanningState[WorldStateT, PrimitiveTaskT] = PlanningState(
initial_state, deque([self.main_task]), [], None
)
history: PlanningHistory[WorldStateT, PrimitiveTaskT] = PlanningHistory()
while planning_state.tasks_to_process:
task = planning_state.tasks_to_process.popleft()
if isinstance(task, PrimitiveTask):
if task.preconditions_met(planning_state.state):
task.apply_effects(planning_state.state)
# Ignore type erasure. We've already verified that this is a Planner
# with a WorldStateT and a PrimitiveTaskT, so we know that the task
# list is a list of CompoundTask[WorldStateT] and PrimitiveTaskT. We
# could scatter more unions throughout to be more explicit but
# there's no way around the type erasure that mypy uses for
# isinstance.
planning_state.plan.append(task) # type: ignore
else:
planning_state = history.pop()
else:
assert isinstance(task, CompoundTask)
# If the methods field of our current state is not None that means we're
# resuming a prior attempt to execute this task after a subtask of the
# previously selected method failed.
#
# Otherwise this is the first exectution of this task so we need to
# create the generator.
if planning_state.methods is None:
methods = task.each_valid_method(planning_state.state)
else:
methods = planning_state.methods
try:
method = next(methods)
history.push(
PlanningState(
planning_state.state.clone(),
# Push the current node back onto the stack so that we
# resume handling this task when we pop back to this state.
planning_state.tasks_to_process + deque([task]),
planning_state.plan,
methods,
)
)
planning_state.methods = None
planning_state.tasks_to_process.extend(method)
except StopIteration:
try:
planning_state = history.pop()
except IndexError:
# No valid plan was found.
return None
return PlanningResult(planning_state.plan, planning_state.state)