mirror of
https://github.com/dcs-liberation/dcs_liberation.git
synced 2025-11-10 14:22:26 +00:00
113 lines
3.4 KiB
Python
113 lines
3.4 KiB
Python
"""TACAN channel handling."""
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Dict, Iterator, Set
|
|
|
|
|
|
class TacanUsage(Enum):
    """The purposes a TACAN channel can be requested for.

    The usage is one of the two keys (together with the band) that select
    which channel numbers are excluded when valid channels are enumerated.
    """

    # The string values are the members' identities; do not reword them.
    TransmitReceive = "transmit receive"
    AirToAir = "air to air"
|
|
|
|
|
|
class TacanBand(Enum):
    """A TACAN band, identified by its letter."""

    X = "X"
    Y = "Y"

    def range(self) -> Iterator["TacanChannel"]:
        """Returns an iterator over the channels in this band.

        Channels are produced in ascending order, numbered 1 through 126
        inclusive, each paired with this band.
        """
        lowest, highest = 1, 126
        # Inside a method body the bare name `range` resolves to the builtin,
        # not to this method (class attributes are not in a function's scope).
        return (
            TacanChannel(number, self) for number in range(lowest, highest + 1)
        )

    def valid_channels(self, usage: TacanUsage) -> Iterator["TacanChannel"]:
        """Yields the channels of this band that may be used for `usage`.

        Channels come in the same order as `range()`; any channel whose number
        is listed in UNAVAILABLE for this usage and band is skipped.

        Args:
            usage: What the channel is going to be used for.
        """
        yield from (
            channel
            for channel in self.range()
            if channel.number not in UNAVAILABLE[usage][self]
        )
|
|
|
|
|
|
# Avoid certain TACAN channels for various reasons; background:
# https://forums.eagle.ru/topic/276390-datalink-issue/
#
# Maps usage -> band -> set of channel numbers that are never offered as valid.
# Every range below is inclusive, hence the "+ 1" on each upper bound.
UNAVAILABLE: Dict[TacanUsage, Dict[TacanBand, Set[int]]] = {
    TacanUsage.TransmitReceive: {
        TacanBand.X: {*range(2, 30 + 1), *range(47, 63 + 1)},
        TacanBand.Y: {*range(2, 30 + 1), *range(64, 92 + 1)},
    },
    TacanUsage.AirToAir: {
        TacanBand.X: {*range(1, 36 + 1), *range(64, 99 + 1)},
        TacanBand.Y: {*range(1, 36 + 1), *range(64, 99 + 1)},
    },
}
|
|
|
|
|
|
@dataclass(frozen=True)
class TacanChannel:
    """A TACAN channel: a channel number within a band.

    The dataclass is frozen, so instances are immutable and hashable and can
    be stored in sets.
    """

    # Channel number within the band.
    number: int
    # Band this channel belongs to.
    band: TacanBand

    def __str__(self) -> str:
        """Returns the channel number immediately followed by the band value."""
        band_letter = self.band.value
        return f"{self.number}{band_letter}"
|
|
|
|
|
|
class OutOfTacanChannelsError(RuntimeError):
    """Raised when all channels in this band have been allocated."""

    def __init__(self, band: TacanBand) -> None:
        """Builds the error message from the exhausted band.

        Args:
            band: The band for which no channel could be allocated.
        """
        message = f"No available channels in TACAN {band.value} band"
        super().__init__(message)
|
|
|
|
|
|
class TacanChannelInUseError(RuntimeError):
    """Raised when attempting to reserve an in-use channel."""

    def __init__(self, channel: TacanChannel) -> None:
        """Builds the error message from the conflicting channel.

        Args:
            channel: The channel that was already in use; its string form is
                embedded in the message.
        """
        message = f"{channel} is already in use"
        super().__init__(message)
|
|
|
|
|
|
class TacanRegistry:
    """Manages allocation of TACAN channels.

    The registry keeps one lazily consumed iterator of valid channels per
    (band, usage) pair, plus a single set holding every channel that has been
    allocated or reserved. The set is shared by all usages because the valid
    channels of different usages within one band overlap.
    """

    def __init__(self) -> None:
        """Creates a registry in which no channel is allocated yet."""
        self.allocated_channels: Set[TacanChannel] = set()
        self.allocators: Dict[TacanBand, Dict[TacanUsage, Iterator[TacanChannel]]] = {
            band: {usage: band.valid_channels(usage) for usage in TacanUsage}
            for band in TacanBand
        }

    def alloc_for_band(
        self, band: TacanBand, intended_usage: TacanUsage
    ) -> TacanChannel:
        """Allocates a TACAN channel in the given band.

        The returned channel is recorded as allocated, so it is never handed
        out again (for any usage) and a later attempt to reserve it with
        mark_unavailable raises TacanChannelInUseError.

        Args:
            band: The TACAN band to allocate a channel for.
            intended_usage: What the caller intends to use the tacan channel for.

        Returns:
            A TACAN channel in the given band.

        Raises:
            OutOfTacanChannelsError: All channels valid for the given band and
                usage are already allocated.
        """
        # The allocator is a stateful iterator stored on the registry: leaving
        # the loop early keeps its position for the next call, and once it is
        # exhausted every later call falls straight through to the raise.
        for channel in self.allocators[band][intended_usage]:
            if channel not in self.allocated_channels:
                # Record the allocation; without this the other usage's
                # allocator for the same band could return the same channel.
                self.allocated_channels.add(channel)
                return channel
        raise OutOfTacanChannelsError(band)

    def mark_unavailable(self, channel: TacanChannel) -> None:
        """Reserves the given channel.

        Reserving a channel ensures that it will not be allocated in the future.

        Args:
            channel: The channel to reserve.

        Raises:
            TacanChannelInUseError: The given channel is already in use.
        """
        if channel in self.allocated_channels:
            raise TacanChannelInUseError(channel)
        self.allocated_channels.add(channel)
|