Control radio/TACAN allocation, set flight radios.

Add central registries for allocating TACAN/radio channels to the
Operation. These ensure that each channel is allocated uniquely, and
removes the caller's need to think about which frequency to use.

The registry allocates frequencies based on the radio it is given,
which ensures that the allocated frequency will be compatible with the
radio that needs it. A mapping from aircraft to the radio used by that
aircraft for intra-flight comms (i.e. the F-16 uses the AN/ARC-222)
exists for creating infra-flight channels appropriate for the
aircraft. Inter-flight channels are allocated by a generic UHF radio.

I've moved the inter-flight radio channels from the VHF to UHF range,
since that's the most easily allocated band, and inter-flight will be
in the highest demand.

Intra-flight radios are now generally not shared. For aircraft where
the radio type is not known we will still fall back to the shared
channel, but that will stop being the case as we gain more data.

Tankers have been moved to the Y TACAN band. Not completely needed,
but seems typical for most missions and deconflicts the tankers from
any unknown airfields (which always use the X band in DCS).
This commit is contained in:
Dan Albert 2020-08-30 17:46:02 -07:00
parent b4e3067718
commit af596c58c3
8 changed files with 960 additions and 535 deletions

View File

@ -4,8 +4,9 @@ from dcs.terrain import Terrain
from gen import *
from userdata.debriefing import *
TANKER_CALLSIGNS = ["Texaco", "Arco", "Shell"]
from gen.airfields import AIRFIELD_DATA
from gen.radios import RadioRegistry
from gen.tacan import TacanRegistry
class Operation:
@ -25,6 +26,8 @@ class Operation:
groundobjectgen = None # type: GroundObjectsGenerator
briefinggen = None # type: BriefingGenerator
forcedoptionsgen = None # type: ForcedOptionsGenerator
radio_registry: Optional[RadioRegistry] = None
tacan_registry: Optional[TacanRegistry] = None
environment_settings = None
trigger_radius = TRIGGER_RADIUS_MEDIUM
@ -63,8 +66,14 @@ class Operation:
def initialize(self, mission: Mission, conflict: Conflict):
self.current_mission = mission
self.conflict = conflict
self.airgen = AircraftConflictGenerator(mission, conflict, self.game.settings, self.game)
self.airsupportgen = AirSupportConflictGenerator(mission, conflict, self.game)
self.radio_registry = RadioRegistry()
self.tacan_registry = TacanRegistry()
self.airgen = AircraftConflictGenerator(
mission, conflict, self.game.settings, self.game,
self.radio_registry)
self.airsupportgen = AirSupportConflictGenerator(
mission, conflict, self.game, self.radio_registry,
self.tacan_registry)
self.triggersgen = TriggersGenerator(mission, conflict, self.game)
self.visualgen = VisualGenerator(mission, conflict, self.game)
self.envgen = EnviromentGenerator(mission, conflict, self.game)
@ -120,6 +129,17 @@ class Operation:
# Generate ground object first
self.groundobjectgen.generate()
for airfield, data in AIRFIELD_DATA.items():
if data.theater == self.game.theater.terrain.name:
self.radio_registry.reserve(data.atc.hf)
self.radio_registry.reserve(data.atc.vhf_fm)
self.radio_registry.reserve(data.atc.vhf_am)
self.radio_registry.reserve(data.atc.uhf)
for ils in data.ils.values():
self.radio_registry.reserve(ils)
if data.tacan is not None:
self.tacan_registry.reserve(data.tacan)
# Generate destroyed units
for d in self.game.get_destroyed_units():
try:
@ -224,21 +244,16 @@ class Operation:
kneeboard_generator = KneeboardGenerator(self.current_mission, self.game)
# Briefing Generation
for i, tanker_type in enumerate(self.airsupportgen.generated_tankers):
callsign = TANKER_CALLSIGNS[i]
tacan = f"{60 + i}X"
freq = f"{130 + i} MHz AM"
self.briefinggen.append_frequency(f"Tanker {callsign} ({tanker_type})", f"{tacan}/{freq}")
kneeboard_generator.add_tanker(callsign, tanker_type, freq, tacan)
for tanker in self.airsupportgen.air_support.tankers:
self.briefinggen.append_frequency(
f"Tanker {tanker.callsign} ({tanker.variant})",
f"{tanker.tacan}/{tanker.freq}")
kneeboard_generator.add_tanker(tanker)
if self.is_awacs_enabled:
callsign = "AWACS"
freq = "233 MHz AM"
self.briefinggen.append_frequency(callsign, freq)
kneeboard_generator.add_awacs(callsign, freq)
self.briefinggen.append_frequency("Flight", "251 MHz AM")
kneeboard_generator.add_comm("Flight", "251 MHz AM")
for awacs in self.airsupportgen.air_support.awacs:
self.briefinggen.append_frequency(awacs.callsign, awacs.freq)
kneeboard_generator.add_awacs(awacs)
# Generate the briefing
self.briefinggen.generate()

View File

@ -1,14 +1,32 @@
from dcs.action import ActivateGroup, AITaskPush, MessageToCoalition, MessageToAll
from dcs.condition import TimeAfter, CoalitionHasAirdrome, PartOfCoalitionInZone
from dcs.helicopters import UH_1H
from dcs.terrain.terrain import NoParkingSlotError
from dcs.triggers import TriggerOnce, Event
from dataclasses import dataclass
from typing import Dict
from game.data.cap_capabilities_db import GUNFIGHTERS
from game.settings import Settings
from game.utils import nm_to_meter
from gen.flights.ai_flight_planner import FlightPlanner
from gen.flights.flight import Flight, FlightType, FlightWaypointType
from gen.radios import get_radio, MHz, Radio, RadioFrequency, RadioRegistry
from pydcs.dcs import helicopters
from pydcs.dcs.action import ActivateGroup, AITaskPush, MessageToAll
from pydcs.dcs.condition import TimeAfter, CoalitionHasAirdrome, PartOfCoalitionInZone
from pydcs.dcs.helicopters import helicopter_map, UH_1H
from pydcs.dcs.mission import Mission, StartType
from pydcs.dcs.planes import (
Bf_109K_4,
FW_190A8,
FW_190D9,
I_16,
Ju_88A4,
P_47D_30,
P_51D,
P_51D_30_NA,
SpitfireLFMkIX,
SpitfireLFMkIXCW,
)
from pydcs.dcs.terrain.terrain import NoParkingSlotError
from pydcs.dcs.triggers import TriggerOnce, Event
from pydcs.dcs.unittype import UnitType
from .conflictgen import *
from .naming import *
@ -23,17 +41,83 @@ RTB_ALTITUDE = 800
RTB_DISTANCE = 5000
HELI_ALT = 500
# Note that fallback radio channels will *not* be reserved. It's possible that
# flights using these will overlap with other channels. This is because we would
# need to make sure we fell back to a frequency that is not used by any beacon
# or ATC, which we don't have the information to predict. Deal with the minor
# annoyance for now since we'll be fleshing out radio info soon enough.
ALLIES_WW2_CHANNEL = MHz(124)
GERMAN_WW2_CHANNEL = MHz(40)
HELICOPTER_CHANNEL = MHz(127)
UHF_FALLBACK_CHANNEL = MHz(251)
@dataclass(frozen=True)
class AircraftData:
"""Additional aircraft data not exposed by pydcs."""
#: The type of radio used for intra-flight communications.
intra_flight_radio: Radio
# Indexed by the id field of the pydcs PlaneType.
AIRCRAFT_DATA: Dict[str, AircraftData] = {
"A-10C": AircraftData(get_radio("AN/ARC-186(V) AM")),
"F-16C_50": AircraftData(get_radio("AN/ARC-222")),
"F/A-18C": AircraftData(get_radio("AN/ARC-210")),
}
# TODO: Get radio information for all the special cases.
def get_fallback_channel(unit_type: UnitType) -> RadioFrequency:
if unit_type in helicopter_map.values() and unit_type != UH_1H:
return HELICOPTER_CHANNEL
german_ww2_aircraft = [
Bf_109K_4,
FW_190A8,
FW_190D9,
Ju_88A4,
]
if unit_type in german_ww2_aircraft:
return GERMAN_WW2_CHANNEL
allied_ww2_aircraft = [
I_16,
P_47D_30,
P_51D,
P_51D_30_NA,
SpitfireLFMkIX,
SpitfireLFMkIXCW,
]
if unit_type in allied_ww2_aircraft:
return ALLIES_WW2_CHANNEL
return UHF_FALLBACK_CHANNEL
class AircraftConflictGenerator:
escort_targets = [] # type: typing.List[typing.Tuple[FlyingGroup, int]]
def __init__(self, mission: Mission, conflict: Conflict, settings: Settings, game):
def __init__(self, mission: Mission, conflict: Conflict, settings: Settings,
game, radio_registry: RadioRegistry):
self.m = mission
self.game = game
self.settings = settings
self.conflict = conflict
self.radio_registry = radio_registry
self.escort_targets = []
def get_intra_flight_channel(self, airframe: UnitType) -> RadioFrequency:
try:
aircraft_data = AIRCRAFT_DATA[airframe.id]
return self.radio_registry.alloc_for_radio(
aircraft_data.intra_flight_radio)
except KeyError:
return get_fallback_channel(airframe)
def _start_type(self) -> StartType:
return self.settings.cold_start and StartType.Cold or StartType.Warm
@ -90,19 +174,9 @@ class AircraftConflictGenerator:
group.points[0].tasks.append(OptReactOnThreat(OptReactOnThreat.Values.EvadeFire))
# TODO : refactor this following bad specific special case code :(
if unit_type in helicopters.helicopter_map.values() and unit_type not in [UH_1H]:
group.set_frequency(127.5)
else:
if unit_type not in [P_51D_30_NA, P_51D, SpitfireLFMkIX, SpitfireLFMkIXCW, P_47D_30, I_16, FW_190A8, FW_190D9, Bf_109K_4]:
group.set_frequency(251.0)
else:
# WW2
if unit_type in [FW_190A8, FW_190D9, Bf_109K_4, Ju_88A4]:
group.set_frequency(40)
else:
group.set_frequency(124.0)
channel = self.get_intra_flight_channel(unit_type)
group.set_frequency(channel.mhz)
flight.intra_flight_channel = channel
# Special case so Su 33 carrier take off
if unit_type is Su_33:

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +1,15 @@
from game import db
from typing import List
from dataclasses import dataclass, field
from .conflictgen import *
from .naming import *
from .radios import RadioFrequency, RadioRegistry
from .tacan import TacanBand, TacanChannel, TacanRegistry
from dcs.mission import *
from dcs.unitgroup import *
from dcs.unittype import *
from dcs.task import *
from dcs.terrain.terrain import NoParkingSlotError
TANKER_DISTANCE = 15000
TANKER_ALT = 4572
@ -15,15 +18,61 @@ TANKER_HEADING_OFFSET = 45
AWACS_DISTANCE = 150000
AWACS_ALT = 13000
AWACS_CALLSIGNS = [
"Overlord",
"Magic",
"Wizard",
"Focus",
"Darkstar",
]
@dataclass
class TankerCallsign:
full: str
short: str
TANKER_CALLSIGNS = [
TankerCallsign("Texaco", "TEX"),
TankerCallsign("Arco", "ARC"),
TankerCallsign("Shell", "SHL"),
]
@dataclass
class AwacsInfo:
"""AWACS information for the kneeboard."""
callsign: str
freq: RadioFrequency
@dataclass
class TankerInfo:
"""Tanker information for the kneeboard."""
callsign: str
variant: str
freq: RadioFrequency
tacan: TacanChannel
@dataclass
class AirSupport:
awacs: List[AwacsInfo] = field(default_factory=list)
tankers: List[TankerInfo] = field(default_factory=list)
class AirSupportConflictGenerator:
generated_tankers = None # type: typing.List[str]
def __init__(self, mission: Mission, conflict: Conflict, game):
def __init__(self, mission: Mission, conflict: Conflict, game,
radio_registry: RadioRegistry,
tacan_registry: TacanRegistry) -> None:
self.mission = mission
self.conflict = conflict
self.game = game
self.generated_tankers = []
self.air_support = AirSupport()
self.radio_registry = radio_registry
self.tacan_registry = tacan_registry
@classmethod
def support_tasks(cls) -> typing.Collection[typing.Type[MainTask]]:
@ -32,9 +81,11 @@ class AirSupportConflictGenerator:
def generate(self, is_awacs_enabled):
player_cp = self.conflict.from_cp if self.conflict.from_cp.captured else self.conflict.to_cp
CALLSIGNS = ["TKR", "TEX", "FUL", "FUE", ""]
for i, tanker_unit_type in enumerate(db.find_unittype(Refueling, self.conflict.attackers_side)):
self.generated_tankers.append(db.unit_type_name(tanker_unit_type))
callsign = TANKER_CALLSIGNS[i]
variant = db.unit_type_name(tanker_unit_type)
freq = self.radio_registry.alloc_uhf()
tacan = self.tacan_registry.alloc_for_band(TacanBand.Y)
tanker_heading = self.conflict.to_cp.position.heading_between_point(self.conflict.from_cp.position) + TANKER_HEADING_OFFSET * i
tanker_position = player_cp.position.point_from_heading(tanker_heading, TANKER_DISTANCE)
tanker_group = self.mission.refuel_flight(
@ -45,21 +96,26 @@ class AirSupportConflictGenerator:
position=tanker_position,
altitude=TANKER_ALT,
race_distance=58000,
frequency=130 + i,
frequency=freq.mhz,
start_type=StartType.Warm,
speed=574,
tacanchannel="{}X".format(60 + i),
tacanchannel=str(tacan),
)
if tanker_unit_type != IL_78M:
tanker_group.points[0].tasks.pop() # Override PyDCS tacan channel
tanker_group.points[0].tasks.append(ActivateBeaconCommand(60 + i, "X", CALLSIGNS[i], True, tanker_group.units[0].id, True))
tanker_group.points[0].tasks.append(ActivateBeaconCommand(
tacan.number, tacan.band.value, callsign.short, True, tanker_group.units[0].id, True))
tanker_group.points[0].tasks.append(SetInvisibleCommand(True))
tanker_group.points[0].tasks.append(SetImmortalCommand(True))
self.air_support.tankers.append(TankerInfo(callsign.full, variant, freq, tacan))
if is_awacs_enabled:
try:
callsign = AWACS_CALLSIGNS[0]
freq = self.radio_registry.alloc_uhf()
awacs_unit = db.find_unittype(AWACS, self.conflict.attackers_side)[0]
awacs_flight = self.mission.awacs_flight(
country=self.mission.country(self.game.player_country),
@ -68,11 +124,12 @@ class AirSupportConflictGenerator:
altitude=AWACS_ALT,
airport=None,
position=self.conflict.position.random_point_within(AWACS_DISTANCE, AWACS_DISTANCE),
frequency=233,
frequency=freq.mhz,
start_type=StartType.Warm,
)
awacs_flight.points[0].tasks.append(SetInvisibleCommand(True))
awacs_flight.points[0].tasks.append(SetImmortalCommand(True))
self.air_support.awacs.append(AwacsInfo(callsign, freq))
except:
print("No AWACS for faction")

View File

@ -1,10 +1,10 @@
from enum import Enum
from typing import List
from typing import List, Optional
from dcs.mission import StartType
from dcs.unittype import UnitType
from pydcs.dcs.unittype import UnitType
from game import db
from gen.radios import RadioFrequency
class FlightType(Enum):
@ -96,6 +96,13 @@ class Flight:
# How long before this flight should take off
scheduled_in = 0
# Populated during mission generation time by AircraftConflictGenerator.
# TODO: Decouple radio planning from the Flight.
# Make AircraftConflictGenerator generate a FlightData object that is
# returned to the Operation rather than relying on the Flight object, which
# represents a game UI flight rather than a fully planned flight.
intra_flight_channel: Optional[RadioFrequency]
def __init__(self, unit_type: UnitType, count: int, from_cp, flight_type: FlightType):
self.unit_type = unit_type
self.count = count
@ -105,6 +112,7 @@ class Flight:
self.targets = []
self.loadout = {}
self.start_type = "Runway"
self.intra_flight_channel = None
def __repr__(self):
return self.flight_type.name + " | " + str(self.count) + "x" + db.unit_type_name(self.unit_type) \
@ -113,10 +121,10 @@ class Flight:
# Test
if __name__ == '__main__':
from dcs.planes import A_10C
from pydcs.dcs.planes import A_10C
from theater import ControlPoint, Point, List
from_cp = ControlPoint(0, "AA", Point(0, 0), None, [], 0, 0)
f = Flight(A_10C, 4, from_cp, FlightType.CAS)
f = Flight(A_10C(), 4, from_cp, FlightType.CAS)
f.scheduled_in = 50
print(f)

View File

@ -33,9 +33,11 @@ from tabulate import tabulate
from pydcs.dcs.mission import Mission
from pydcs.dcs.terrain.terrain import Airport
from pydcs.dcs.unittype import FlyingType
from .airfields import AIRFIELD_DATA
from .flights.flight import Flight
from . import units
from .airfields import AIRFIELD_DATA
from .airsupportgen import AwacsInfo, TankerInfo
from .flights.flight import Flight
from .radios import RadioFrequency
class KneeboardPageWriter:
@ -116,23 +118,7 @@ class AirfieldInfo:
class CommInfo:
"""Communications information for the kneeboard."""
name: str
freq: str
@dataclass
class AwacsInfo:
"""AWACS information for the kneeboard."""
callsign: str
freq: str
@dataclass
class TankerInfo:
"""Tanker information for the kneeboard."""
callsign: str
variant: str
freq: str
tacan: str
freq: RadioFrequency
@dataclass
@ -153,6 +139,10 @@ class BriefingPage(KneeboardPage):
self.awacs = awacs
self.tankers = tankers
self.jtacs = jtacs
if self.flight.intra_flight_channel is not None:
self.comms.append(
CommInfo("Flight", self.flight.intra_flight_channel)
)
self.departure = flight.from_cp.airport
self.arrival = flight.from_cp.airport
self.divert: Optional[Airport] = None
@ -245,7 +235,7 @@ class KneeboardGenerator:
self.tankers: List[TankerInfo] = []
self.jtacs: List[JtacInfo] = []
def add_comm(self, name: str, freq: str) -> None:
def add_comm(self, name: str, freq: RadioFrequency) -> None:
"""Adds communications info to the kneeboard.
Args:
@ -254,26 +244,21 @@ class KneeboardGenerator:
"""
self.comms.append(CommInfo(name, freq))
def add_awacs(self, callsign: str, freq: str) -> None:
def add_awacs(self, awacs: AwacsInfo) -> None:
"""Adds an AWACS/GCI to the kneeboard.
Args:
callsign: Callsign of the AWACS/GCI.
freq: Radio frequency used by the AWACS/GCI.
awacs: AWACS information.
"""
self.awacs.append(AwacsInfo(callsign, freq))
self.awacs.append(awacs)
def add_tanker(self, callsign: str, variant: str, freq: str,
tacan: str) -> None:
def add_tanker(self, tanker: TankerInfo) -> None:
"""Adds a tanker to the kneeboard.
Args:
callsign: Callsign of the tanker.
variant: Aircraft type.
freq: Radio frequency used by the tanker.
tacan: TACAN channel of the tanker.
tanker: Tanker information.
"""
self.tankers.append(TankerInfo(callsign, variant, freq, tacan))
self.tankers.append(tanker)
def add_jtac(self, callsign: str, region: str, code: str) -> None:
"""Adds a JTAC to the kneeboard.

200
gen/radios.py Normal file
View File

@ -0,0 +1,200 @@
"""Radio frequency types and allocators."""
import itertools
from dataclasses import dataclass
from typing import Dict, Iterator, List, Set
@dataclass(frozen=True)
class RadioFrequency:
"""A radio frequency.
Not currently concerned with tracking modulation, just the frequency.
"""
#: The frequency in kilohertz.
hertz: int
def __str__(self):
if self.hertz >= 1000000:
return self.format("MHz", 1000000)
return self.format("kHz", 1000)
def format(self, units: str, divisor: int) -> str:
converted = self.hertz / divisor
if converted.is_integer():
return f"{int(converted)} {units}"
return f"{converted:0.3f} {units}"
@property
def mhz(self) -> float:
"""Returns the frequency in megahertz.
Returns:
The frequency in megahertz.
"""
return self.hertz / 1000000
def MHz(num: int, khz: int = 0) -> RadioFrequency:
return RadioFrequency(num * 1000000 + khz * 1000)
def kHz(num: int) -> RadioFrequency:
return RadioFrequency(num * 1000)
@dataclass(frozen=True)
class Radio:
"""A radio.
Defines the minimum (inclusive) and maximum (exclusive) range of the radio.
"""
#: The name of the radio.
name: str
#: The minimum (inclusive) frequency tunable by this radio.
minimum: RadioFrequency
#: The maximum (exclusive) frequency tunable by this radio.
maximum: RadioFrequency
#: The spacing between adjacent frequencies.
step: RadioFrequency
def __str__(self) -> str:
return self.name
def range(self) -> Iterator[RadioFrequency]:
"""Returns an iterator over the usable frequencies of this radio."""
return (RadioFrequency(x) for x in range(
self.minimum.hertz, self.maximum.hertz, self.step.hertz
))
class OutOfChannelsError(RuntimeError):
"""Raised when all channels usable by this radio have been allocated."""
def __init__(self, radio: Radio) -> None:
super().__init__(f"No available channels for {radio}")
class ChannelInUseError(RuntimeError):
"""Raised when attempting to reserve an in-use frequency."""
def __init__(self, frequency: RadioFrequency) -> None:
super().__init__(f"{frequency} is already in use")
# TODO: Figure out appropriate steps for each radio. These are just guesses.
#: List of all known radios used by aircraft in the game.
RADIOS: List[Radio] = [
Radio("AN/ARC-164", MHz(225), MHz(400), step=MHz(1)),
Radio("AN/ARC-186(V) AM", MHz(116), MHz(152), step=MHz(1)),
Radio("AN/ARC-186(V) FM", MHz(30), MHz(76), step=MHz(1)),
# The AN/ARC-210 can also use [30, 88) and [108, 118), but the current
# implementation can't implement the gap and the radio can't transmit on the
# latter. There's still plenty of channels between 118 MHz and 400 MHz, so
# not worth worrying about.
Radio("AN/ARC-210", MHz(118), MHz(400), step=MHz(1)),
Radio("AN/ARC-222", MHz(116), MHz(174), step=MHz(1)),
Radio("SCR-522", MHz(100), MHz(156), step=MHz(1)),
Radio("A.R.I. 1063", MHz(100), MHz(156), step=MHz(1)),
Radio("BC-1206", kHz(200), kHz(400), step=kHz(10)),
]
def get_radio(name: str) -> Radio:
"""Returns the radio with the given name.
Args:
name: Name of the radio to return.
Returns:
The radio matching name.
Raises:
KeyError: No matching radio was found.
"""
for radio in RADIOS:
if radio.name == name:
return radio
raise KeyError
class RadioRegistry:
"""Manages allocation of radio channels.
There's some room for improvement here. We could prefer to allocate
frequencies that are available to the fewest number of radios first, so
radios with wide bands like the AN/ARC-210 don't exhaust all the channels
available to narrower radios like the AN/ARC-186(V). In practice there are
probably plenty of channels, so we can deal with that later if we need to.
We could also allocate using a larger increment, returning to smaller
increments each time the range is exhausted. This would help with the
previous problem, as the AN/ARC-186(V) would still have plenty of 25 kHz
increment channels left after the AN/ARC-210 moved on to the higher
frequencies. This would also look a little nicer than having every flight
allocated in the 30 MHz range.
"""
# Not a real radio, but useful for allocating a channel usable for
# inter-flight communications.
BLUFOR_UHF = Radio("BLUFOR UHF", MHz(225), MHz(400), step=MHz(1))
def __init__(self) -> None:
self.allocated_channels: Set[RadioFrequency] = set()
self.radio_allocators: Dict[Radio, Iterator[RadioFrequency]] = {}
radios = itertools.chain(RADIOS, [self.BLUFOR_UHF])
for radio in radios:
self.radio_allocators[radio] = radio.range()
def alloc_for_radio(self, radio: Radio) -> RadioFrequency:
"""Allocates a radio channel tunable by the given radio.
Args:
radio: The radio to allocate a channel for.
Returns:
A radio channel compatible with the given radio.
Raises:
OutOfChannelsError: All channels compatible with the given radio are
already allocated.
"""
allocator = self.radio_allocators[radio]
try:
while (channel := next(allocator)) in self.allocated_channels:
pass
return channel
except StopIteration:
raise OutOfChannelsError(radio)
def alloc_uhf(self) -> RadioFrequency:
"""Allocates a UHF radio channel suitable for inter-flight comms.
Returns:
A UHF radio channel suitable for inter-flight comms.
Raises:
OutOfChannelsError: All channels compatible with the given radio are
already allocated.
"""
return self.alloc_for_radio(self.BLUFOR_UHF)
def reserve(self, frequency: RadioFrequency) -> None:
"""Reserves the given channel.
Reserving a channel ensures that it will not be allocated in the future.
Args:
frequency: The channel to reserve.
Raises:
ChannelInUseError: The given frequency is already in use.
"""
if frequency in self.allocated_channels:
raise ChannelInUseError(frequency)
self.allocated_channels.add(frequency)

83
gen/tacan.py Normal file
View File

@ -0,0 +1,83 @@
"""TACAN channel handling."""
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Iterator, Set
class TacanBand(Enum):
X = "X"
Y = "Y"
def range(self) -> Iterator["TacanChannel"]:
"""Returns an iterator over the channels in this band."""
return (TacanChannel(x, self) for x in range(1, 100))
@dataclass(frozen=True)
class TacanChannel:
number: int
band: TacanBand
def __str__(self) -> str:
return f"{self.number}{self.band.value}"
class OutOfTacanChannelsError(RuntimeError):
"""Raised when all channels in this band have been allocated."""
def __init__(self, band: TacanBand) -> None:
super().__init__(f"No available channels in TACAN {band.value} band")
class TacanChannelInUseError(RuntimeError):
"""Raised when attempting to reserve an in-use channel."""
def __init__(self, channel: TacanChannel) -> None:
super().__init__(f"{channel} is already in use")
class TacanRegistry:
"""Manages allocation of TACAN channels."""
def __init__(self) -> None:
self.allocated_channels: Set[TacanChannel] = set()
self.band_allocators: Dict[TacanBand, Iterator[TacanChannel]] = {}
for band in TacanBand:
self.band_allocators[band] = band.range()
def alloc_for_band(self, band: TacanBand) -> TacanChannel:
"""Allocates a TACAN channel in the given band.
Args:
band: The TACAN band to allocate a channel for.
Returns:
A TACAN channel in the given band.
Raises:
OutOfChannelsError: All channels compatible with the given radio are
already allocated.
"""
allocator = self.band_allocators[band]
try:
while (channel := next(allocator)) in self.allocated_channels:
pass
return channel
except StopIteration:
raise OutOfTacanChannelsError(band)
def reserve(self, channel: TacanChannel) -> None:
"""Reserves the given channel.
Reserving a channel ensures that it will not be allocated in the future.
Args:
channel: The channel to reserve.
Raises:
ChannelInUseError: The given frequency is already in use.
"""
if channel in self.allocated_channels:
raise TacanChannelInUseError(channel)
self.allocated_channels.add(channel)