Add additional information to multi-event tracing.

Show the number of times the event was logged as well as an average run-
time.
This commit is contained in:
Dan Albert 2022-09-10 15:58:27 -07:00 committed by Raffson
parent e503a8034c
commit daf565f11c
No known key found for this signature in database
GPG Key ID: B0402B2C9B764D99

View File

@ -4,6 +4,7 @@ import logging
import timeit import timeit
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass
from datetime import timedelta from datetime import timedelta
from types import TracebackType from types import TracebackType
from typing import Iterator, Optional, Type from typing import Iterator, Optional, Type
@ -11,15 +12,29 @@ from typing import Iterator, Optional, Type
@contextmanager @contextmanager
def logged_duration(event: str) -> Iterator[None]: def logged_duration(event: str) -> Iterator[None]:
start = timeit.default_timer() timer = Timer()
yield with timer:
end = timeit.default_timer() yield
logging.debug("%s took %s", event, timedelta(seconds=end - start)) logging.debug("%s took %s", event, timer.duration)
@dataclass
class CountedEvent:
count: int = 0
duration: timedelta = timedelta()
def increment(self, duration: timedelta) -> None:
self.count += 1
self.duration += duration
@property
def average(self) -> timedelta:
return self.duration / self.count
class MultiEventTracer: class MultiEventTracer:
def __init__(self) -> None: def __init__(self) -> None:
self.events: dict[str, timedelta] = defaultdict(timedelta) self.events: dict[str, CountedEvent] = defaultdict(CountedEvent)
def __enter__(self) -> MultiEventTracer: def __enter__(self) -> MultiEventTracer:
return self return self
@ -30,12 +45,46 @@ class MultiEventTracer:
exc_val: Optional[BaseException], exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType], exc_tb: Optional[TracebackType],
) -> None: ) -> None:
for event, duration in self.events.items(): for event, counter in self.events.items():
logging.debug("%s took %s", event, duration) logging.debug(
"%s %d times took %s (%s average)",
event,
counter.count,
counter.duration,
counter.average,
)
@contextmanager @contextmanager
def trace(self, event: str) -> Iterator[None]: def trace(self, event: str) -> Iterator[None]:
start = timeit.default_timer() timer = Timer()
yield with timer:
end = timeit.default_timer() yield
self.events[event] += timedelta(seconds=end - start) self.events[event].increment(timer.duration)
class Timer:
def __init__(self) -> None:
self._start_time: float | None = None
self._end_time: float | None = None
self._duration: timedelta | None = None
@property
def duration(self) -> timedelta:
if self._duration is None:
raise RuntimeError(
"Cannot query the duration of the timer before it has elapsed"
)
return self._duration
def __enter__(self) -> None:
self._start_time = timeit.default_timer()
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
assert self._start_time is not None
self._end_time = timeit.default_timer()
self._duration = timedelta(seconds=self._end_time - self._start_time)