Vulkan/Tests/LoopRunner.py

51 lines
1.7 KiB
Python

import asyncio
from asyncio import AbstractEventLoop
from threading import Thread
from typing import Any, Coroutine, List
class LoopRunner(Thread):
    """
    Run an asyncio event loop on a dedicated thread so synchronous code
    (for example tests) can execute coroutines on it.

    Usage: build the runner with a loop, ``start()`` it, submit work through
    ``run_coroutine`` / ``run_coroutines_list``, then ``stop()`` and ``join()``.

    Copyright: https://agariinc.medium.com/advanced-strategies-for-testing-async-code-in-python-6196a032d8d7
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        """Store the loop that this thread will run; the thread is named 'runner'."""
        super().__init__(name='runner')
        # We ensure to always use the same loop
        self.loop = loop

    def run(self) -> None:
        """Thread body: bind the loop to this thread and run it until stopped."""
        asyncio.set_event_loop(self.loop)
        # The loop is supplied by the caller and is left open when it stops,
        # so the caller can reuse it and is responsible for closing it.
        self.loop.run_forever()

    def run_coroutine(self, coroutine: Coroutine) -> Any:
        """
        Run a coroutine inside the loop and return the result, doesn't allow concurrency.

        Blocks the calling thread until the coroutine finishes; an exception
        raised by the coroutine is re-raised here. Must be called from a thread
        other than the runner thread, otherwise the wait can never complete.
        """
        future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
        return future.result()

    def _stop_loop(self) -> None:
        """Stop the loop; must execute on the loop's own thread."""
        # NOTE: this helper must not be called `_stop`. Thread._stop is a
        # private CPython method used by join()/is_alive(); overriding it
        # leaves the thread reported as alive after it has finished.
        self.loop.stop()

    def run_in_thread(self, callback, *args) -> "asyncio.Handle":
        """Schedule `callback(*args)` on the loop thread and return its handle."""
        return self.loop.call_soon_threadsafe(callback, *args)

    def stop(self) -> "asyncio.Handle":
        """Ask the loop to stop (thread-safe); call ``join()`` to wait for the thread."""
        return self.run_in_thread(self._stop_loop)

    def run_coroutines_list(self, coroutineList: List[Coroutine]) -> None:
        """
        Create multiple tasks in the loop and wait for them, use concurrency.

        Blocks until every coroutine has finished. Exceptions raised by the
        individual coroutines are not re-raised here, because ``asyncio.wait``
        only waits for completion. An empty list is a no-op.
        """
        coroutines = list(coroutineList)
        if not coroutines:
            # asyncio.wait raises ValueError on an empty collection
            return
        self.run_coroutine(self.__waitForMultipleTasks(coroutines))

    async def __waitForMultipleTasks(self, coroutines: List[Coroutine]) -> None:
        """Wrap the coroutines in tasks and await them all; runs on the loop thread."""
        # Tasks are created here, inside the loop thread, because
        # loop.create_task is not safe to call from another thread.
        tasks = [self.loop.create_task(coroutine) for coroutine in coroutines]
        await asyncio.wait(tasks)