Event Bus
The event bus provides structured, append-only event logging for all significant actions in the buyer agent. Every quote request, negotiation round, deal booking, and budget allocation is recorded as a typed event --- giving operators full observability into what the buyer did, when, and why.
Why an Event Bus
Standard Python logging captures text messages. The event bus captures structured records that can be filtered, queried, and subscribed to programmatically:
- Auditability --- Regulators, advertisers, and agency partners can trace every action the buyer agent took on their behalf.
- Debugging --- Reconstruct the exact sequence of events that led to a deal outcome by querying `flow_id` or `deal_id`.
- Analytics --- Query historical events for reporting (e.g., average negotiation rounds, win rates by seller, time-to-book).
- Integration --- External systems can subscribe to events for real-time dashboards, alerting, or downstream processing.
Fail-open by design
Event emission never blocks or breaks the calling flow. If the bus is unavailable, the helpers log a warning and return None. Deal execution always takes priority over observability.
Event Model
Every event is a Pydantic Event instance with the following fields:
| Field | Type | Default | Description |
|---|---|---|---|
| `event_id` | `str` | Auto-generated UUID | Unique identifier for this event |
| `event_type` | `EventType` | (required) | Enum value identifying what happened |
| `timestamp` | `datetime` | `datetime.utcnow()` | When the event was created |
| `flow_id` | `str` | `""` | ID of the flow instance that produced this event |
| `flow_type` | `str` | `""` | Type of flow (e.g., "dsp_deal", "deal_booking") |
| `deal_id` | `str` | `""` | Associated deal ID, if applicable |
| `session_id` | `str` | `""` | Associated session ID, if applicable |
| `payload` | `dict[str, Any]` | `{}` | Event-specific data (see per-type examples below) |
| `metadata` | `dict[str, Any]` | `{}` | Additional context passed via `**kwargs` to helpers |
Correlation
Events can be correlated across a workflow using three keys:
- `flow_id` --- Groups all events from a single flow execution.
- `deal_id` --- Groups all events related to a specific deal across multiple flows.
- `session_id` --- Groups all events within a user session.
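As a rough illustration of how the three keys partition the same stream of events, the sketch below groups a few hand-written events by each key. `EventStub` and `group_by` are hypothetical names used only here; the stub is a plain dataclass standing in for the real Pydantic `Event`, and treating an empty string as "key not set" is an assumption based on the field defaults listed above.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any


@dataclass
class EventStub:
    """Hypothetical stand-in carrying only the fields used for correlation."""

    event_type: str
    flow_id: str = ""
    deal_id: str = ""
    session_id: str = ""
    payload: dict[str, Any] = field(default_factory=dict)


def group_by(events: list[EventStub], key: str) -> dict[str, list[EventStub]]:
    """Group events by a correlation key, skipping events where it is empty."""
    groups: dict[str, list[EventStub]] = defaultdict(list)
    for event in events:
        value = getattr(event, key)
        if value:  # assumption: "" means the key does not apply to this event
            groups[value].append(event)
    return dict(groups)


events = [
    EventStub("quote.requested", flow_id="flow-1", session_id="sess-1"),
    EventStub("deal.booked", flow_id="flow-1", deal_id="deal-9", session_id="sess-1"),
    EventStub("deal.cancelled", flow_id="flow-2", deal_id="deal-9", session_id="sess-1"),
]

by_flow = group_by(events, "flow_id")  # one flow execution per group
by_deal = group_by(events, "deal_id")  # one deal per group, possibly spanning flows
```

Here `deal-9` appears in two different flows, which is the case `deal_id` correlation is meant to cover.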
Event Types
The EventType enum defines 13 event types organized by domain.
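Quote Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `QUOTE_REQUESTED` | `quote.requested` | Quote request sent to seller | `{"request": "...", "deal_type": "PD"}` |
| `QUOTE_RECEIVED` | `quote.received` | Pricing response received | `{"product_id": "prod-ctv-001"}` |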
QUOTE_RECEIVED |
quote.received |
Pricing response received | {"product_id": "prod-ctv-001"} |
Deal Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `DEAL_BOOKED` | `deal.booked` | Deal booking confirmed | `{"product_id": "prod-001", "deal_id": "..."}` |
| `DEAL_CANCELLED` | `deal.cancelled` | Deal cancelled | `{"deal_id": "...", "reason": "..."}` |
Campaign Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `CAMPAIGN_CREATED` | `campaign.created` | Campaign brief parsed | `{"name": "Q3 CTV", "budget": 50000}` |
Budget Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `BUDGET_ALLOCATED` | `budget.allocated` | Budget distributed across channels | `{"channels": ["ctv", "mobile"], "total_budget": 50000}` |
Booking Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `BOOKING_SUBMITTED` | `booking.submitted` | Booking request sent to seller | `{"order_id": "...", "line_id": "..."}` |
Inventory Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `INVENTORY_DISCOVERED` | `inventory.discovered` | Seller inventory query returned results | `{"query": "..."}` |
Negotiation Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `NEGOTIATION_STARTED` | `negotiation.started` | Negotiation session opened | `{"proposal_id": "...", "strategy": "anchor_high"}` |
| `NEGOTIATION_ROUND` | `negotiation.round` | A negotiation round completed | `{"round": 2, "buyer_price": 12.0, "seller_price": 15.0}` |
| `NEGOTIATION_CONCLUDED` | `negotiation.concluded` | Negotiation session ended | `{"outcome": "accepted", "final_price": 14.0}` |
Session Lifecycle
| Event Type | Value | Emitted When | Example Payload |
|---|---|---|---|
| `SESSION_CREATED` | `session.created` | New buyer session started | `{"session_id": "..."}` |
| `SESSION_CLOSED` | `session.closed` | Buyer session ended | `{"session_id": "...", "deals_count": 3}` |
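Collected in one place, the thirteen types from the tables above might look like the enum below. The member names and values are copied from the tables; deriving from `str` and the `domain()` helper are assumptions made for this sketch, not something this page states about the real `EventType`.

```python
from enum import Enum


class EventType(str, Enum):
    """Reconstruction from the tables above; the base class is an assumption."""

    QUOTE_REQUESTED = "quote.requested"
    QUOTE_RECEIVED = "quote.received"
    DEAL_BOOKED = "deal.booked"
    DEAL_CANCELLED = "deal.cancelled"
    CAMPAIGN_CREATED = "campaign.created"
    BUDGET_ALLOCATED = "budget.allocated"
    BOOKING_SUBMITTED = "booking.submitted"
    INVENTORY_DISCOVERED = "inventory.discovered"
    NEGOTIATION_STARTED = "negotiation.started"
    NEGOTIATION_ROUND = "negotiation.round"
    NEGOTIATION_CONCLUDED = "negotiation.concluded"
    SESSION_CREATED = "session.created"
    SESSION_CLOSED = "session.closed"


def domain(event_type: EventType) -> str:
    """Hypothetical helper: the part of the value before the dot."""
    return event_type.value.split(".", 1)[0]


negotiation_types = [t for t in EventType if domain(t) == "negotiation"]
```

The dotted values make it straightforward to filter by domain prefix, for example to select every negotiation event.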
Architecture
```mermaid
graph TB
    subgraph Flows
        DSP["DSPDealFlow"]
        DBF["DealBookingFlow"]
    end
    subgraph "Event Helpers"
        ES["emit_event_sync()"]
        EA["emit_event()"]
    end
    subgraph "Event Bus"
        BUS["InMemoryEventBus"]
        SUBS["Subscribers"]
    end
    subgraph "Persistence"
        STORE["DealStore.save_event()"]
        DB[("SQLite events table")]
    end
    subgraph "API Layer"
        LIST["/events"]
        GET["/events/{event_id}"]
    end
    DSP -->|sync calls| ES
    DBF -->|sync calls| ES
    EA --> BUS
    ES -->|async bridge| BUS
    BUS -->|notify| SUBS
    BUS --> LIST
    BUS --> GET
    STORE --> DB
```
InMemoryEventBus
The InMemoryEventBus is the default (and currently only) backend. It stores events in a Python list and dispatches to subscribers synchronously during publish().
Methods
| Method | Signature | Description |
|---|---|---|
| `publish` | `async (event: Event) -> None` | Append event to store, notify subscribers |
| `subscribe` | `async (event_type: str, callback: Subscriber) -> None` | Register a callback for a specific event type or `"*"` for all |
| `get_event` | `async (event_id: str) -> Event \| None` | Look up a single event by ID |
| `list_events` | `async (flow_id?, event_type?, session_id?, limit=50) -> list[Event]` | Query events with optional filters |
Subscriber Dispatch
When publish() is called:
- The event is appended to the internal list.
- All callbacks registered for the event's type are invoked.
- All callbacks registered for `"*"` (wildcard) are invoked.
- If a subscriber raises an exception, it is logged and the next subscriber is called. One failing subscriber never blocks others.
```python
from ad_buyer.events import get_event_bus, EventType, Event

bus = await get_event_bus()

# Subscribe to all deal.booked events
async def on_deal_booked(event: Event):
    print(f"Deal booked: {event.deal_id}")

await bus.subscribe("deal.booked", on_deal_booked)

# Subscribe to ALL events
async def audit_logger(event: Event):
    print(f"[AUDIT] {event.event_type}: {event.payload}")

await bus.subscribe("*", audit_logger)
```
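The dispatch order and error isolation described above can be sketched with a small stand-in bus. This is not the `InMemoryEventBus` source: `SketchBus` is a hypothetical class, events are plain dicts rather than `Event` instances, and subscribers are assumed to be async callables as in the example above.

```python
import asyncio
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable

logger = logging.getLogger("event_bus_sketch")

Event = dict[str, Any]  # stand-in for the real Event model
Subscriber = Callable[[Event], Awaitable[None]]  # assumed callback shape


class SketchBus:
    """Hypothetical bus mirroring the documented publish() steps."""

    def __init__(self) -> None:
        self._events: list[Event] = []
        self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)

    async def subscribe(self, event_type: str, callback: Subscriber) -> None:
        self._subscribers[event_type].append(callback)

    async def publish(self, event: Event) -> None:
        self._events.append(event)  # 1. append to the internal list
        typed = self._subscribers.get(event["event_type"], [])  # 2. type subscribers
        wildcard = self._subscribers.get("*", [])  # 3. wildcard subscribers
        for callback in typed + wildcard:
            try:
                await callback(event)
            except Exception:
                # 4. log the failure and continue with the next subscriber
                logger.exception("subscriber failed for %s", event["event_type"])


async def demo() -> list[str]:
    calls: list[str] = []

    async def broken(event: Event) -> None:
        raise RuntimeError("boom")

    async def typed(event: Event) -> None:
        calls.append("typed")

    async def wildcard(event: Event) -> None:
        calls.append("wildcard")

    bus = SketchBus()
    await bus.subscribe("deal.booked", broken)
    await bus.subscribe("deal.booked", typed)
    await bus.subscribe("*", wildcard)
    await bus.publish({"event_type": "deal.booked", "deal_id": "deal-456"})
    await bus.publish({"event_type": "quote.requested"})
    return calls


calls = asyncio.run(demo())
```

The `broken` subscriber raises on the first publish, yet `typed` and `wildcard` still run; the second publish reaches only the wildcard subscriber.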
Singleton Access
The module provides a singleton factory:
```python
from ad_buyer.events.bus import get_event_bus, close_event_bus

bus = await get_event_bus()   # Creates InMemoryEventBus on first call
bus2 = await get_event_bus()  # Returns same instance
assert bus is bus2

await close_event_bus()       # Resets the singleton (for testing)
```
No persistence across restarts
InMemoryEventBus stores events in a Python list. Events are lost when the process exits. For durable storage, events are separately persisted to the SQLite events table via DealStore.save_event().
Emitting Events
Two helper functions make it easy to emit events from any code path. Both follow the fail-open pattern: they catch all exceptions, log a warning, and return None.
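The fail-open pattern itself is simple to express. The decorator below is a generic sketch of the idea (catch everything, warn, return None) and is not taken from the helpers' source; `fail_open` and the two demo functions are hypothetical names.

```python
import functools
import logging
from typing import Callable, Optional, TypeVar

logger = logging.getLogger("fail_open_sketch")
T = TypeVar("T")


def fail_open(func: Callable[..., T]) -> Callable[..., Optional[T]]:
    """Wrap func so that any exception becomes a logged warning and None."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Observability must never break the calling flow.
            logger.warning("event emission failed: %s", exc)
            return None

    return wrapper


@fail_open
def emit_to_broken_bus(event_type: str) -> dict:
    raise ConnectionError("bus unavailable")


@fail_open
def emit_to_healthy_bus(event_type: str) -> dict:
    return {"event_type": event_type}


failed = emit_to_broken_bus("deal.booked")
succeeded = emit_to_healthy_bus("deal.booked")
```

Callers that need to know whether emission succeeded can check the return value for None; callers that do not care can ignore it entirely.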
emit_event() --- Async
Use in async code (FastAPI endpoints, async handlers):
```python
from ad_buyer.events.helpers import emit_event
from ad_buyer.events.models import EventType

event = await emit_event(
    EventType.DEAL_BOOKED,
    flow_id="flow-abc-123",
    flow_type="dsp_deal",
    deal_id="deal-456",
    payload={
        "product_id": "prod-ctv-sports-001",
        "final_cpm": 14.50,
    },
)
# event is an Event instance, or None if the bus was unavailable
```
emit_event_sync() --- Synchronous
Use in synchronous code, particularly CrewAI flow methods that run in worker threads without an asyncio event loop:
```python
from ad_buyer.events.helpers import emit_event_sync
from ad_buyer.events.models import EventType

event = emit_event_sync(
    EventType.CAMPAIGN_CREATED,
    flow_type="deal_booking",
    payload={"name": "Q3 CTV Campaign", "budget": 50000},
)
```
When to use which
Use emit_event_sync() in CrewAI flow methods (@listen, @start, @router decorated methods). Use emit_event() in FastAPI endpoints and other async contexts. When in doubt, emit_event_sync() works everywhere.
How emit_event_sync() Works
CrewAI runs flow steps in worker threads that may not have an asyncio event loop. The sync helper handles three scenarios:
- Running event loop detected --- Schedules the publish as a future on the existing loop.
- Event loop exists but is not running --- Calls `loop.run_until_complete()`.
- No event loop at all --- Creates a new loop via `asyncio.run()`.
If the singleton bus does not exist yet, emit_event_sync() creates an InMemoryEventBus directly (bypassing the async get_event_bus() factory).
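The three scenarios can be sketched with the standard `asyncio` API. This is an illustration of the branching, not the helper's actual code: `run_from_sync` is a hypothetical name, and the idle loop is passed in explicitly because how a thread's existing loop is discovered varies between Python versions.

```python
import asyncio
from typing import Any, Callable, Coroutine, Optional


def run_from_sync(
    make_coro: Callable[[], Coroutine[Any, Any, Any]],
    idle_loop: Optional[asyncio.AbstractEventLoop] = None,
) -> Any:
    """Run or schedule a coroutine from synchronous code."""
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # no loop is running in this thread

    if running is not None:
        # Scenario 1: a loop is already running, so schedule and return at once.
        return running.create_task(make_coro())
    if idle_loop is not None and not idle_loop.is_closed():
        # Scenario 2: a loop exists but is idle, so drive it to completion.
        return idle_loop.run_until_complete(make_coro())
    # Scenario 3: no loop at all, so let asyncio create and close one.
    return asyncio.run(make_coro())


async def publish_stub() -> str:
    """Stands in for the bus publish coroutine."""
    return "published"


no_loop_result = run_from_sync(publish_stub)

loop = asyncio.new_event_loop()
idle_loop_result = run_from_sync(publish_stub, idle_loop=loop)
loop.close()


async def inside_running_loop() -> str:
    task = run_from_sync(publish_stub)  # returns a Task without blocking
    return await task


running_loop_result = asyncio.run(inside_running_loop())
```

Note that in the first scenario the caller gets back a scheduled task rather than a finished result, so the publish completes only when the running loop next gets control.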
Helper Parameters
Both emit_event() and emit_event_sync() accept the same parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `event_type` | `EventType` | (required) | The type of event to emit |
| `flow_id` | `str` | `""` | Flow instance identifier |
| `flow_type` | `str` | `""` | Flow type name |
| `deal_id` | `str` | `""` | Associated deal ID |
| `session_id` | `str` | `""` | Associated session ID |
| `payload` | `dict` | `None` | Event-specific data |
| `**kwargs` | `Any` | --- | Extra key-value pairs stored in `metadata` |
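To make the parameter-to-field mapping concrete, here is a small sketch of how the arguments could be folded into event fields. `build_event_fields` is a hypothetical function; in particular, turning a `None` payload into an empty dict is an assumption inferred from the `Event.payload` default, not confirmed behavior.

```python
from typing import Any, Optional


def build_event_fields(
    event_type: str,
    flow_id: str = "",
    flow_type: str = "",
    deal_id: str = "",
    session_id: str = "",
    payload: Optional[dict] = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """Map helper arguments onto event fields (illustrative only)."""
    return {
        "event_type": event_type,
        "flow_id": flow_id,
        "flow_type": flow_type,
        "deal_id": deal_id,
        "session_id": session_id,
        "payload": payload if payload is not None else {},  # assumed default
        "metadata": dict(kwargs),  # extra keyword arguments land in metadata
    }


fields = build_event_fields(
    "campaign.created",
    flow_type="deal_booking",
    source="cli",  # hypothetical extra key
    attempt=2,     # hypothetical extra key
)
```

Any keyword that is not one of the named parameters, such as `source` and `attempt` here, ends up under `metadata`.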
SQLite Persistence
Events are persisted to a SQLite events table managed by the DealStore. This provides durability across process restarts, independent of the in-memory bus.
Events Table Schema
```sql
CREATE TABLE IF NOT EXISTS events (
    id TEXT PRIMARY KEY,
    event_type TEXT NOT NULL,
    flow_id TEXT NOT NULL DEFAULT '',
    flow_type TEXT NOT NULL DEFAULT '',
    deal_id TEXT NOT NULL DEFAULT '',
    session_id TEXT NOT NULL DEFAULT '',
    payload TEXT DEFAULT '{}',
    metadata TEXT DEFAULT '{}',
    created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
```
Indexes
| Index | Column(s) | Purpose |
|---|---|---|
| `idx_events_type` | `event_type` | Filter by event type |
| `idx_events_flow_id` | `flow_id` | Correlate events within a flow |
| `idx_events_deal_id` | `deal_id` | Find all events for a deal |
| `idx_events_session_id` | `session_id` | Find all events in a session |
| `idx_events_created_at` | `created_at` | Time-range queries |
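The schema and indexes can be tried out with Python's built-in `sqlite3` module. The sketch below uses an in-memory database and hand-written rows; the `CREATE INDEX` statements are assumed from the index names in the table above, and the queries are examples of the lookups those indexes are meant to serve, not queries taken from the `DealStore`.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE IF NOT EXISTS events (
        id TEXT PRIMARY KEY,
        event_type TEXT NOT NULL,
        flow_id TEXT NOT NULL DEFAULT '',
        flow_type TEXT NOT NULL DEFAULT '',
        deal_id TEXT NOT NULL DEFAULT '',
        session_id TEXT NOT NULL DEFAULT '',
        payload TEXT DEFAULT '{}',
        metadata TEXT DEFAULT '{}',
        created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
    );
    -- Index definitions assumed from the documented index names.
    CREATE INDEX IF NOT EXISTS idx_events_deal_id ON events(deal_id);
    CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at);
    """
)

rows = [
    ("evt-1", "negotiation.round", "flow-abc", "deal-456",
     json.dumps({"round": 1}), "2026-03-11T14:00:00.000Z"),
    ("evt-2", "deal.booked", "flow-abc", "deal-456",
     json.dumps({"final_cpm": 14.5}), "2026-03-11T14:30:00.000Z"),
    ("evt-3", "quote.requested", "flow-xyz", "",
     json.dumps({"deal_type": "PD"}), "2026-03-11T14:10:00.000Z"),
]
conn.executemany(
    "INSERT INTO events (id, event_type, flow_id, deal_id, payload, created_at) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    rows,
)

# All events for one deal, newest first.
deal_events = conn.execute(
    "SELECT event_type FROM events WHERE deal_id = ? ORDER BY created_at DESC",
    ("deal-456",),
).fetchall()

# Time-range query; ISO-8601 strings in one format sort chronologically.
window = conn.execute(
    "SELECT id FROM events WHERE created_at >= ? AND created_at < ? ORDER BY created_at",
    ("2026-03-11T14:05:00.000Z", "2026-03-11T15:00:00.000Z"),
).fetchall()
```

Because `created_at` is stored as text, range filters rely on every row using the same timestamp format as the column default.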
DealStore Event Methods
| Method | Signature | Description |
|---|---|---|
| `save_event` | `(**kwargs) -> str` | Insert an event row. Returns the event ID (auto-generated if not provided). |
| `get_event` | `(event_id: str) -> dict \| None` | Retrieve a single event by ID. |
| `list_events` | `(event_type?, flow_id?, session_id?, limit=50) -> list[dict]` | Query events with optional filters, ordered by `created_at` descending. |
```python
import json

from ad_buyer.storage.deal_store import DealStore

store = DealStore("sqlite:///./ad_buyer.db")
store.connect()

# Persist an event (payload is stored as a JSON string)
event_id = store.save_event(
    event_type="deal.booked",
    flow_id="flow-abc",
    deal_id="deal-456",
    payload=json.dumps({"product_id": "prod-001", "cpm": 14.50}),
)

# Query events for a flow (list_events filters by event type, flow, or session)
events = store.list_events(flow_id="flow-abc")
```
API Endpoints
Two REST endpoints expose events from the in-memory bus.
GET /events
List events with optional filters.
Query Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `event_type` | `str` | `None` | Filter by event type value (e.g., "deal.booked") |
| `flow_id` | `str` | `None` | Filter by flow ID |
| `session_id` | `str` | `None` | Filter by session ID |
| `limit` | `int` | `50` | Maximum number of events to return |
Response:
```json
{
  "events": [
    {
      "event_id": "a1b2c3d4-...",
      "event_type": "deal.booked",
      "timestamp": "2026-03-11T14:30:00Z",
      "flow_id": "flow-abc-123",
      "flow_type": "dsp_deal",
      "deal_id": "deal-456",
      "session_id": "",
      "payload": {"product_id": "prod-001", "final_cpm": 14.50},
      "metadata": {}
    }
  ],
  "total": 1
}
```
GET /events/{event_id}
Retrieve a single event by ID.
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| `event_id` | `str` | The event's UUID |
Response: The event object (same shape as array elements above).
Errors: Returns 404 if the event ID is not found.
In-memory only
These endpoints query the InMemoryEventBus, not the SQLite events table. Events are available only for the lifetime of the current process. For historical queries across restarts, query the DealStore directly.
Event Flow Diagram
This sequence diagram shows how events flow through the system during a typical DSP deal:
```mermaid
sequenceDiagram
    participant Flow as DSPDealFlow
    participant Helper as emit_event_sync()
    participant Bus as InMemoryEventBus
    participant Sub as Subscribers
    participant API as GET /events
    Flow->>Helper: emit_event_sync(QUOTE_REQUESTED, ...)
    Helper->>Bus: publish(event)
    Bus->>Bus: Append to event list
    Bus->>Sub: Notify type subscribers
    Bus->>Sub: Notify wildcard subscribers
    Note over Flow: Seller responds...
    Flow->>Helper: emit_event_sync(QUOTE_RECEIVED, ...)
    Helper->>Bus: publish(event)
    Note over Flow: Deal negotiated and booked...
    Flow->>Helper: emit_event_sync(DEAL_BOOKED, ...)
    Helper->>Bus: publish(event)
    API->>Bus: list_events(flow_id=...)
    Bus-->>API: [event1, event2, event3]
```
Usage Examples
Emitting Events from a Flow
```python
from ad_buyer.events.helpers import emit_event_sync
from ad_buyer.events.models import EventType

# In a CrewAI flow method
def negotiate_price(self):
    # ... negotiation logic ...
    emit_event_sync(
        EventType.NEGOTIATION_ROUND,
        flow_id=self.state.flow_id,
        flow_type="dsp_deal",
        deal_id=self.state.deal_id,
        payload={
            "round": round_number,
            "buyer_price": our_offer,
            "seller_price": their_ask,
            "action": "counter",
        },
    )
```
Subscribing to Events
```python
from ad_buyer.events import get_event_bus, Event

bus = await get_event_bus()

# Declared async to match the subscriber examples in "Subscriber Dispatch"
async def on_deal_booked(event: Event):
    """React to deal bookings (e.g., update a dashboard)."""
    print(f"New deal: {event.deal_id} at {event.payload.get('final_cpm')}")

await bus.subscribe("deal.booked", on_deal_booked)
```
Querying Events via API
```bash
# All events for a specific flow
curl "http://localhost:8002/events?flow_id=flow-abc-123"

# All deal.booked events
curl "http://localhost:8002/events?event_type=deal.booked"

# A specific event
curl "http://localhost:8002/events/a1b2c3d4-5678-..."
```
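The same queries can be issued from Python with only the standard library. This is a sketch under assumptions: `events_url` and `fetch_events` are hypothetical helpers, the base URL is taken from the curl examples, and `fetch_events` assumes the response shape shown under GET /events. Only the URL builder runs at import time, so the block works without a live server.

```python
import json
from typing import Optional
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://localhost:8002"  # host and port as used in the curl examples


def events_url(
    base_url: str,
    event_type: Optional[str] = None,
    flow_id: Optional[str] = None,
    session_id: Optional[str] = None,
    limit: int = 50,
) -> str:
    """Build a GET /events URL, omitting filters that were not given."""
    params = {
        "event_type": event_type,
        "flow_id": flow_id,
        "session_id": session_id,
        "limit": limit,
    }
    query = urlencode({k: v for k, v in params.items() if v is not None})
    return f"{base_url}/events?{query}"


def fetch_events(url: str) -> list[dict]:
    """Fetch and decode the "events" array; requires a running server."""
    with urlopen(url, timeout=5) as response:
        return json.load(response)["events"]


flow_url = events_url(BASE_URL, flow_id="flow-abc-123", limit=10)
booked_url = events_url(BASE_URL, event_type="deal.booked")
```

With the API running, `fetch_events(flow_url)` would return the list of event objects for that flow.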
Related
- Deal Store --- SQLite persistence layer including the `events` table
- Order State Machine --- State transitions emit events for observability
- Architecture Overview --- System architecture context
- Booking Flow --- End-to-end workflow that emits campaign and deal events
- Seller Event Bus --- Seller-side event bus implementation