"""
Event system for structured output in the thesis framework.
Provides an event bus that decouples event emission from rendering.
All executor/tool output is normalized into structured events, which
are then filtered by verbosity and rendered by the output layer.
Example:
>>> from thesis.core.output.events import get_event_bus, EventLevel
>>> bus = get_event_bus()
>>> bus.emit("Workflow started", level=EventLevel.IMPORTANT)
"""
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Callable, Deque, Dict, List, Optional
# Public API of this module: the names picked up by a star-import.
__all__ = [
    "EventLevel",
    "Event",
    "EventBus",
    "get_event_bus",
    "reset_event_bus",
]
# Default cap on how many events are held in memory. Only the newest events
# are retained; once the cap is reached the oldest ones are discarded, so a
# long-running batch job that emits a large number of events cannot grow
# memory without bound.
_MAX_BUFFERED_EVENTS = 10_000
class EventLevel(IntEnum):
    """Importance of an event; a larger value means a more critical event.

    Members, from most to least critical:

    - ERROR: Failures that stop processing.
    - WARNING: Issues that may affect outcome but do not stop processing.
    - IMPORTANT: Key milestones -- tool starts/completions, file changes,
      build/test results, skipped actions.
    - INFO: Detailed informational messages (full tool logs, timing, etc.).
    - DEBUG: Internal implementation details.

    Default output mode shows ERROR, WARNING, IMPORTANT.
    Verbose mode shows everything.
    Quiet mode shows ERROR only plus the final result.
    """

    # Integer values so that levels can be compared directly, e.g.
    # ``event.level >= EventLevel.IMPORTANT``.
    ERROR = 50
    WARNING = 40
    IMPORTANT = 30
    INFO = 20
    DEBUG = 10
@dataclass(frozen=True)
class Event:
    """A structured output event emitted during pipeline execution.

    Instances are frozen (fields cannot be reassigned) and hashable. The
    hash covers every field except ``metadata``, since a dict cannot be
    hashed; equality still compares all fields, ``metadata`` included.

    Attributes:
        message: Human-readable event description.
        level: Importance level controlling visibility.
        timestamp: Unix timestamp when the event was created.
        category: Optional grouping tag (e.g. ``"workflow"``, ``"tool"``,
            ``"file"``, ``"test"``).
        patient_id: Patient this event relates to, if any.
        metadata: Arbitrary key-value data attached to the event.
    """

    message: str
    level: EventLevel = EventLevel.INFO
    timestamp: float = field(default_factory=time.time)
    category: str = ""
    patient_id: str = ""
    # hash=False keeps the generated __hash__ usable: with the dict included,
    # hash(event) would always raise TypeError. The field still takes part
    # in __eq__, and equal events still hash equally because the hash is
    # built from a subset of the compared fields.
    metadata: Dict[str, Any] = field(default_factory=dict, hash=False)
# Type alias for event listener callbacks: a callable that takes the emitted
# Event as its only argument and returns nothing.
EventListener = Callable[[Event], None]
class EventBus:
    """Central event pipeline that decouples emission from rendering.

    Every emitted event is appended to a bounded in-memory history and then
    handed to each subscribed listener. Listeners receive *all* events; any
    filtering by level is up to the listener itself, while the stored
    history can be filtered through :meth:`get_events`.

    The listener list and the history are guarded by a lock, so events can
    be emitted from worker threads while the main thread renders output.
    Listeners are called outside that lock, on the emitting thread.

    Example:
        >>> bus = EventBus()
        >>> bus.subscribe(lambda e: print(e.message))
        >>> _ = bus.emit("hello")
        hello
    """

    def __init__(self, max_events: int = _MAX_BUFFERED_EVENTS) -> None:
        """Create a bus with no listeners and an empty history.

        Args:
            max_events: Maximum number of events kept in memory. Once the
                history is full, storing a new event discards the oldest.
        """
        self._listeners: List[EventListener] = []
        self._lock = threading.Lock()
        # Bounded ring buffer: once full, appending drops the oldest event so
        # memory stays capped regardless of how many events are emitted.
        self._events: Deque[Event] = deque(maxlen=max_events)

    # ------------------------------------------------------------------
    # Subscription
    # ------------------------------------------------------------------

    def subscribe(self, listener: EventListener) -> None:
        """Register a listener that will receive all emitted events.

        Args:
            listener: Callable that accepts an :class:`Event`.
        """
        with self._lock:
            self._listeners.append(listener)

    def unsubscribe(self, listener: EventListener) -> None:
        """Remove a previously registered listener.

        Args:
            listener: The listener to remove. No-op if not found.
        """
        with self._lock:
            try:
                self._listeners.remove(listener)
            except ValueError:
                # Not registered: documented as a no-op.
                pass

    # ------------------------------------------------------------------
    # Emission
    # ------------------------------------------------------------------

    def _dispatch(self, event: Event) -> None:
        """Store *event* in the history and deliver it to every listener."""
        with self._lock:
            self._events.append(event)
            # Snapshot the listeners so they can be called without holding
            # the (non-reentrant) lock: a listener may then subscribe,
            # unsubscribe or emit on this bus without deadlocking.
            listeners = list(self._listeners)
        for listener in listeners:
            try:
                listener(event)
            except Exception:
                # Deliberate best effort: a failing listener must not break
                # the pipeline or stop the remaining listeners from running.
                pass

    def emit(
        self,
        message: str,
        *,
        level: EventLevel = EventLevel.INFO,
        category: str = "",
        patient_id: str = "",
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Event:
        """Create and dispatch an event to all listeners.

        Args:
            message: Human-readable description of the event.
            level: Importance level.
            category: Optional grouping tag.
            patient_id: Patient identifier, if applicable.
            metadata: Extra structured data. A shallow copy is stored, so
                reusing or mutating the dict after this call does not alter
                the event kept in the history.

        Returns:
            The created :class:`Event` instance.
        """
        event = Event(
            message=message,
            level=level,
            category=category,
            patient_id=patient_id,
            metadata=dict(metadata) if metadata else {},
        )
        self._dispatch(event)
        return event

    def emit_event(self, event: Event) -> None:
        """Dispatch a pre-built event to all listeners.

        The event is stored and delivered as given.

        Args:
            event: Event instance to dispatch.
        """
        self._dispatch(event)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def get_events(
        self,
        min_level: EventLevel = EventLevel.DEBUG,
        category: Optional[str] = None,
        patient_id: Optional[str] = None,
    ) -> List[Event]:
        """Return stored events matching the given filters.

        Args:
            min_level: Minimum importance level to include.
            category: If set, only return events with this category.
            patient_id: If set, only return events for this patient.

        Returns:
            List of matching events in the order they were stored
            (oldest first).
        """
        # Copy under the lock, filter outside it, so emitters are not held
        # up while the history is being scanned.
        with self._lock:
            snapshot = list(self._events)
        return [
            event
            for event in snapshot
            if event.level >= min_level
            and (category is None or event.category == category)
            and (patient_id is None or event.patient_id == patient_id)
        ]

    def clear(self) -> None:
        """Remove all stored events. Registered listeners are kept."""
        with self._lock:
            self._events.clear()

    @property
    def event_count(self) -> int:
        """Number of stored events."""
        with self._lock:
            return len(self._events)
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
# Process-wide bus instance. Stays None until get_event_bus() creates it and
# is set back to None by reset_event_bus().
_global_bus: Optional[EventBus] = None
# Held while _global_bus is being created or reset.
_bus_lock = threading.Lock()
def get_event_bus() -> EventBus:
    """Return the process-wide :class:`EventBus` singleton.

    The bus is created on first call and reused thereafter.

    Returns:
        The global EventBus instance (never ``None``).
    """
    global _global_bus
    # Work on a local snapshot. Re-reading the global for the return value
    # after the lock has been released could yield None if
    # reset_event_bus() ran in between.
    bus = _global_bus
    if bus is None:
        with _bus_lock:
            # Re-check under the lock: another thread may have created the
            # bus while this one was waiting.
            bus = _global_bus
            if bus is None:
                bus = _global_bus = EventBus()
    return bus
def reset_event_bus() -> None:
    """Discard the global event bus (primarily for testing).

    If a bus exists, its stored events are cleared first; the global
    reference is then set to ``None``.
    """
    global _global_bus
    with _bus_lock:
        current = _global_bus
        if current is not None:
            current.clear()
        _global_bus = None