mirror of
https://github.com/RafaelSolVargas/Vulkan.git
synced 2025-10-29 16:57:23 +00:00
51 lines
1.7 KiB
Python
51 lines
1.7 KiB
Python
import asyncio
|
|
from asyncio import AbstractEventLoop
|
|
from threading import Thread
|
|
from typing import Any, Coroutine, List
|
|
|
|
|
|
class LoopRunner(Thread):
|
|
"""
|
|
Class to help deal with asyncio coroutines and loops
|
|
Copyright: https://agariinc.medium.com/advanced-strategies-for-testing-async-code-in-python-6196a032d8d7
|
|
"""
|
|
|
|
def __init__(self, loop: AbstractEventLoop) -> None:
|
|
# We ensure to always use the same loop
|
|
self.loop = loop
|
|
Thread.__init__(self, name='runner')
|
|
|
|
def run(self) -> None:
|
|
asyncio.set_event_loop(self.loop)
|
|
try:
|
|
self.loop.run_forever()
|
|
finally:
|
|
if self.loop.is_running():
|
|
self.loop.close()
|
|
|
|
def run_coroutine(self, coroutine: Coroutine) -> Any:
|
|
"""Run a coroutine inside the loop and return the result, doesn't allow concurrency"""
|
|
result = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
|
|
return result.result()
|
|
|
|
def _stop(self):
|
|
self.loop.stop()
|
|
|
|
def run_in_thread(self, callback, *args):
|
|
return self.loop.call_soon_threadsafe(callback, *args)
|
|
|
|
def stop(self):
|
|
return self.run_in_thread(self._stop)
|
|
|
|
def run_coroutines_list(self, coroutineList: List[Coroutine]) -> None:
|
|
"""Create multiple tasks in the loop and wait for them, use concurrency"""
|
|
tasks = []
|
|
for coroutine in coroutineList:
|
|
tasks.append(self.loop.create_task(coroutine))
|
|
|
|
self.run_coroutine(self.__waitForMultipleTasks(tasks))
|
|
|
|
async def __waitForMultipleTasks(self, coroutines: List[Coroutine]) -> None:
|
|
"""Function to trigger the await for asyncio.wait coroutines"""
|
|
await asyncio.wait(coroutines)
|