Source code for thesis.core.output.events

"""
Event system for structured output in the thesis framework.

Provides an event bus that decouples event emission from rendering.
All executor/tool output is normalized into structured events, which
are then filtered by verbosity and rendered by the output layer.

Example:
    >>> from thesis.core.output.events import get_event_bus, EventLevel
    >>> bus = get_event_bus()
    >>> bus.emit("Workflow started", level=EventLevel.IMPORTANT)
"""

import threading
import time
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Callable, Deque, Dict, List, Optional

__all__ = [
    "EventLevel",
    "Event",
    "EventBus",
    "get_event_bus",
    "reset_event_bus",
]

# Upper bound on the number of events retained in memory. The bus keeps only
# the most recent events; older ones are dropped once the cap is reached. This
# prevents unbounded growth during long-running batch jobs that emit a large
# number of events.
_MAX_BUFFERED_EVENTS = 10000


[docs] class EventLevel(IntEnum): """Event importance levels, ordered from most to least critical. - ERROR: Failures that stop processing. - WARNING: Issues that may affect outcome but do not stop processing. - IMPORTANT: Key milestones -- tool starts/completions, file changes, build/test results, skipped actions. - INFO: Detailed informational messages (full tool logs, timing, etc.). - DEBUG: Internal implementation details. Default output mode shows ERROR, WARNING, IMPORTANT. Verbose mode shows everything. Quiet mode shows ERROR only plus the final result. """ ERROR = 50 WARNING = 40 IMPORTANT = 30 INFO = 20 DEBUG = 10
[docs] @dataclass(frozen=True) class Event: """A structured output event emitted during pipeline execution. Attributes: message: Human-readable event description. level: Importance level controlling visibility. timestamp: Unix timestamp when the event was created. category: Optional grouping tag (e.g. ``"workflow"``, ``"tool"``, ``"file"``, ``"test"``). patient_id: Patient this event relates to, if any. metadata: Arbitrary key-value data attached to the event. """ message: str level: EventLevel = EventLevel.INFO timestamp: float = field(default_factory=time.time) category: str = "" patient_id: str = "" metadata: Dict[str, Any] = field(default_factory=dict)
# Type alias for event listener callbacks. EventListener = Callable[[Event], None]
[docs] class EventBus: """Central event pipeline that decouples emission from rendering. Listeners subscribe to receive events and can filter by minimum level. The bus is thread-safe: events can be emitted from worker threads while the main thread renders output. Example: >>> bus = EventBus() >>> bus.subscribe(lambda e: print(e.message)) >>> bus.emit("hello") """
[docs] def __init__(self, max_events: int = _MAX_BUFFERED_EVENTS) -> None: self._listeners: List[EventListener] = [] self._lock = threading.Lock() # Bounded ring buffer: once full, appending drops the oldest event so # memory stays capped regardless of how many events are emitted. self._events: Deque[Event] = deque(maxlen=max_events)
# ------------------------------------------------------------------ # Subscription # ------------------------------------------------------------------
[docs] def subscribe(self, listener: EventListener) -> None: """Register a listener that will receive all emitted events. Args: listener: Callable that accepts an :class:`Event`. """ with self._lock: self._listeners.append(listener)
[docs] def unsubscribe(self, listener: EventListener) -> None: """Remove a previously registered listener. Args: listener: The listener to remove. No-op if not found. """ with self._lock: try: self._listeners.remove(listener) except ValueError: pass
# ------------------------------------------------------------------ # Emission # ------------------------------------------------------------------
[docs] def emit( self, message: str, *, level: EventLevel = EventLevel.INFO, category: str = "", patient_id: str = "", metadata: Optional[Dict[str, Any]] = None, ) -> Event: """Create and dispatch an event to all listeners. Args: message: Human-readable description of the event. level: Importance level. category: Optional grouping tag. patient_id: Patient identifier, if applicable. metadata: Extra structured data. Returns: The created :class:`Event` instance. """ event = Event( message=message, level=level, category=category, patient_id=patient_id, metadata=metadata or {}, ) with self._lock: self._events.append(event) listeners = list(self._listeners) for listener in listeners: try: listener(event) except Exception: # Listeners must not break the pipeline. pass return event
[docs] def emit_event(self, event: Event) -> None: """Dispatch a pre-built event to all listeners. Args: event: Event instance to dispatch. """ with self._lock: self._events.append(event) listeners = list(self._listeners) for listener in listeners: try: listener(event) except Exception: pass
# ------------------------------------------------------------------ # Query # ------------------------------------------------------------------
[docs] def get_events( self, min_level: EventLevel = EventLevel.DEBUG, category: Optional[str] = None, patient_id: Optional[str] = None, ) -> List[Event]: """Return stored events matching the given filters. Args: min_level: Minimum importance level to include. category: If set, only return events with this category. patient_id: If set, only return events for this patient. Returns: List of matching events in chronological order. """ with self._lock: events = list(self._events) result: List[Event] = [] for e in events: if e.level < min_level: continue if category is not None and e.category != category: continue if patient_id is not None and e.patient_id != patient_id: continue result.append(e) return result
[docs] def clear(self) -> None: """Remove all stored events.""" with self._lock: self._events.clear()
@property def event_count(self) -> int: """Number of stored events.""" with self._lock: return len(self._events)
# --------------------------------------------------------------------------- # Module-level singleton # --------------------------------------------------------------------------- _global_bus: Optional[EventBus] = None _bus_lock = threading.Lock()
[docs] def get_event_bus() -> EventBus: """Return the process-wide :class:`EventBus` singleton. The bus is created on first call and reused thereafter. Returns: The global EventBus instance. """ global _global_bus if _global_bus is None: with _bus_lock: if _global_bus is None: _global_bus = EventBus() return _global_bus
[docs] def reset_event_bus() -> None: """Reset the global event bus (primarily for testing).""" global _global_bus with _bus_lock: if _global_bus is not None: _global_bus.clear() _global_bus = None