mirror of
https://github.com/dcs-liberation/dcs_liberation.git
synced 2025-11-10 14:22:26 +00:00
The UI won't stop you from aborting a flight that is already home, but that should also result in it re-completing again on the next tick. https://github.com/dcs-liberation/dcs_liberation/issues/1680
41 lines
1.1 KiB
Python
41 lines
1.1 KiB
Python
from __future__ import annotations
|
|
|
|
from asyncio import Queue
|
|
from collections.abc import Iterator
|
|
from contextlib import contextmanager
|
|
|
|
from game.sim import GameUpdateEvents
|
|
|
|
|
|
class EventStream:
|
|
_queue: Queue[GameUpdateEvents] = Queue()
|
|
|
|
@classmethod
|
|
def drain(cls) -> None:
|
|
while not cls._queue.empty():
|
|
cls._queue.get_nowait()
|
|
cls._queue.task_done()
|
|
|
|
@classmethod
|
|
async def put(cls, events: GameUpdateEvents) -> None:
|
|
await cls._queue.put(events)
|
|
|
|
@classmethod
|
|
def put_nowait(cls, events: GameUpdateEvents) -> None:
|
|
# The queue has infinite size so this should never need to block anyway. If for
|
|
# some reason the queue is full this will throw QueueFull.
|
|
cls._queue.put_nowait(events)
|
|
|
|
@classmethod
|
|
async def get(cls) -> GameUpdateEvents:
|
|
events = await cls._queue.get()
|
|
cls._queue.task_done()
|
|
return events
|
|
|
|
@staticmethod
|
|
@contextmanager
|
|
def event_context() -> Iterator[GameUpdateEvents]:
|
|
events = GameUpdateEvents()
|
|
yield events
|
|
EventStream.put_nowait(events)
|