Skip to content

event_bus

appimage_updater.events.event_bus

Event bus system for decoupled communication between components.

Event(source=None, **kwargs)

Base class for all events in the system.

Parameters:

Name Type Description Default
source str | None

Source component that generated the event

None
**kwargs Any

Additional event data

{}
Source code in src/appimage_updater/events/event_bus.py
@abstractmethod
def __init__(self, source: str | None = None, **kwargs: Any):
    """Initialize event.

    Args:
        source: Source component that generated the event
        **kwargs: Additional event data
    """
    self.source = source
    self.data = kwargs

data = kwargs instance-attribute

source = source instance-attribute

EventBus()

Event bus for managing event subscriptions and publishing.

Source code in src/appimage_updater/events/event_bus.py
def __init__(self) -> None:
    """Initialize event bus."""
    self._subscribers: dict[type[Event], list[Callable[[Event], None]]] = defaultdict(list)
    self._async_subscribers: dict[type[Event], list[Callable[[Event], Any]]] = defaultdict(list)

publish(event)

Publish an event to all subscribers.

Parameters:

Name Type Description Default
event Event

Event to publish

required
Source code in src/appimage_updater/events/event_bus.py
def publish(self, event: Event) -> None:
    """Publish an event to all subscribers.

    Args:
        event: Event to publish
    """
    event_type = type(event)

    # Call synchronous handlers
    for handler in self._subscribers[event_type]:
        with contextlib.suppress(Exception):
            handler(event)

    # Call async handlers
    if self._async_subscribers[event_type]:
        asyncio.create_task(self._publish_async(event))

get_event_bus()

Get the global event bus instance.

Returns:

Type Description
EventBus

Global event bus

Source code in src/appimage_updater/events/event_bus.py
def get_event_bus() -> EventBus:
    """Get the global event bus instance.

    Returns:
        Global event bus
    """
    return _global_event_bus