Deal Store¶
The DealStore is the buyer agent's SQLite-backed persistence layer for deal lifecycle state, negotiation history, booking records, job tracking, and status transitions. It provides crash resilience, a full audit trail, and the foundation for portfolio tracking across sellers.
Why Persistence?¶
Without persistent storage, all deal state lives in memory and is lost on process restart. The DealStore solves three problems:
- Crash resilience --- A server restart mid-negotiation no longer means lost deals. The buyer can resume from the last known state.
- Audit trail --- Every status change and negotiation round is recorded in append-only tables. This supports post-campaign analysis and dispute resolution.
- Portfolio tracking --- The buyer can query all active deals across sellers, filter by status, and calculate aggregate spend --- enabling informed budget allocation decisions.
Synchronous by design
The DealStore uses synchronous sqlite3 (not aiosqlite) because CrewAI runs flows in worker threads that may not have an asyncio event loop. Thread safety is provided by check_same_thread=False and a threading.Lock().
SQLite Schema¶
The schema consists of five relational tables managed by a versioned migration system. All tables use ISO 8601 timestamps and are created automatically on first connection.
Entity-Relationship Diagram¶
erDiagram
deals ||--o{ negotiation_rounds : "has"
deals ||--o{ booking_records : "booked as"
deals ||--o{ status_transitions : "tracked by"
booking_records ||--o{ status_transitions : "tracked by"
jobs ||--o{ status_transitions : "tracked by"
deals {
TEXT id PK
TEXT seller_url
TEXT seller_deal_id
TEXT product_id
TEXT product_name
TEXT deal_type
TEXT status
REAL price
REAL original_price
INTEGER impressions
TEXT flight_start
TEXT flight_end
TEXT buyer_context
TEXT metadata
TEXT created_at
TEXT updated_at
}
negotiation_rounds {
INTEGER id PK
TEXT deal_id FK
TEXT proposal_id
INTEGER round_number
REAL buyer_price
REAL seller_price
TEXT action
TEXT rationale
TEXT created_at
}
booking_records {
INTEGER id PK
TEXT deal_id FK
TEXT order_id
TEXT line_id
TEXT channel
INTEGER impressions
REAL cost
TEXT booking_status
TEXT booked_at
TEXT metadata
}
jobs {
TEXT id PK
TEXT status
REAL progress
TEXT brief
INTEGER auto_approve
TEXT budget_allocs
TEXT recommendations
TEXT booked_lines
TEXT errors
TEXT created_at
TEXT updated_at
}
status_transitions {
INTEGER id PK
TEXT entity_type
TEXT entity_id
TEXT from_status
TEXT to_status
TEXT triggered_by
TEXT notes
TEXT created_at
}
Table Descriptions¶
deals¶
The central table tracking every deal the buyer initiates. Each row represents a single deal with a seller for a specific product.
| Column | Type | Description |
|---|---|---|
id |
TEXT PK |
UUID, generated client-side or provided |
seller_url |
TEXT NOT NULL |
Seller endpoint URL |
seller_deal_id |
TEXT |
Seller-assigned deal ID (populated after booking) |
product_id |
TEXT NOT NULL |
Product being purchased |
product_name |
TEXT |
Human-readable product name |
deal_type |
TEXT |
PG (guaranteed), PD (preferred), or PA (auction) |
status |
TEXT |
Current lifecycle status (see Deal Lifecycle) |
price |
REAL |
Current/final CPM |
original_price |
REAL |
Pre-discount CPM (for savings tracking) |
impressions |
INTEGER |
Contracted impression volume |
flight_start |
TEXT |
Flight start date (ISO 8601) |
flight_end |
TEXT |
Flight end date (ISO 8601) |
buyer_context |
TEXT |
JSON-serialized BuyerContext (seat, agency, advertiser) |
metadata |
TEXT |
Extensible JSON field for additional data |
created_at |
TEXT |
Creation timestamp (UTC) |
updated_at |
TEXT |
Last modification timestamp (UTC) |
Indexes: status, seller_url, seller_deal_id, created_at, composite (status, created_at).
negotiation_rounds¶
Append-only log of every negotiation round for a deal. Each row captures the buyer's offer, the seller's ask, and the action taken.
| Column | Type | Description |
|---|---|---|
id |
INTEGER PK |
Auto-increment row ID |
deal_id |
TEXT FK |
References deals(id) with CASCADE delete |
proposal_id |
TEXT |
Seller's proposal identifier |
round_number |
INTEGER |
Sequential round (1, 2, 3, ...) |
buyer_price |
REAL |
Buyer's offered CPM |
seller_price |
REAL |
Seller's asking CPM |
action |
TEXT |
counter, accept, reject, or final_offer |
rationale |
TEXT |
Explanation for the action taken |
created_at |
TEXT |
Round timestamp (UTC) |
Constraint: UNIQUE(deal_id, round_number) prevents duplicate rounds.
booking_records¶
Tracks booked line items resulting from completed deals. Links deals to OpenDirect orders and lines.
| Column | Type | Description |
|---|---|---|
id |
INTEGER PK |
Auto-increment row ID |
deal_id |
TEXT FK |
References deals(id) with CASCADE delete |
order_id |
TEXT |
OpenDirect order ID |
line_id |
TEXT |
OpenDirect line ID |
channel |
TEXT |
Channel name (branding, ctv, mobile, performance) |
impressions |
INTEGER |
Contracted impressions for this line |
cost |
REAL |
Line cost |
booking_status |
TEXT |
pending, confirmed, cancelled |
booked_at |
TEXT |
Booking timestamp (UTC) |
metadata |
TEXT |
Extensible JSON field |
Constraint: UNIQUE(deal_id, line_id) prevents duplicate line bookings.
jobs¶
Tracks API-initiated booking jobs. Replaces the in-memory job dictionary used in earlier versions. Each job corresponds to one POST /bookings request.
| Column | Type | Description |
|---|---|---|
id |
TEXT PK |
Job UUID |
status |
TEXT |
pending, running, awaiting_approval, completed, failed |
progress |
REAL |
0.0 to 1.0 progress indicator |
brief |
TEXT |
JSON-serialized campaign brief |
auto_approve |
INTEGER |
Boolean flag (0 or 1) |
budget_allocs |
TEXT |
JSON budget allocations |
recommendations |
TEXT |
JSON recommendation list |
booked_lines |
TEXT |
JSON booked line items |
errors |
TEXT |
JSON error list |
created_at |
TEXT |
Job creation timestamp (UTC) |
updated_at |
TEXT |
Last update timestamp (UTC) |
The save_job method uses INSERT ... ON CONFLICT DO UPDATE (upsert) so that job state can be updated incrementally as the flow progresses.
status_transitions¶
Append-only audit log for all status changes across deals and bookings. Every call to update_deal_status automatically writes a row here.
| Column | Type | Description |
|---|---|---|
id |
INTEGER PK |
Auto-increment row ID |
entity_type |
TEXT |
deal or booking |
entity_id |
TEXT |
The entity's primary key |
from_status |
TEXT |
Previous status (NULL for creation) |
to_status |
TEXT |
New status |
triggered_by |
TEXT |
system, seller_push, user, or agent |
notes |
TEXT |
Free-text explanation |
created_at |
TEXT |
Transition timestamp (UTC) |
Schema Versioning¶
The schema_version table tracks which migrations have been applied. The initialize_schema() function runs on every DealStore.connect() call:
- Creates all tables and indexes (idempotent via
IF NOT EXISTS). - Checks the current schema version.
- Applies any pending migrations sequentially.
- Records the new version.
This ensures forward-compatible schema evolution without manual migration steps.
DealStore API¶
All public methods are synchronous and thread-safe. The store is initialized with a SQLite connection string and must be explicitly connected before use.
Lifecycle¶
from ad_buyer.storage.deal_store import DealStore
store = DealStore("sqlite:///./ad_buyer.db")
store.connect() # Opens connection, sets pragmas, initializes schema
# ... use the store ...
store.disconnect() # Closes connection
On connect(), the store sets three SQLite pragmas:
| Pragma | Value | Purpose |
|---|---|---|
journal_mode |
WAL |
Write-ahead logging for concurrent read/write |
foreign_keys |
ON |
Enforce referential integrity |
busy_timeout |
5000 |
Wait up to 5 seconds for locks instead of failing immediately |
Singleton Factory¶
For application-wide access, use the get_deal_store() factory which returns a module-level singleton:
from ad_buyer.storage import get_deal_store
store = get_deal_store() # Uses default: sqlite:///./ad_buyer.db
store = get_deal_store("sqlite:///./custom.db") # Custom path (first call only)
First-call semantics
The database_url parameter is only used on the first call. Subsequent calls return the cached instance regardless of the URL passed.
Deal Methods¶
save_deal(**kwargs) -> str¶
Insert a new deal record. Returns the deal ID (auto-generated UUID if not provided).
deal_id = store.save_deal(
seller_url="http://seller.example.com:8001",
product_id="prod-ctv-sports-001",
product_name="CTV Sports Premium",
deal_type="PD",
status="draft",
price=14.50,
original_price=18.00,
impressions=500_000,
flight_start="2026-07-01",
flight_end="2026-09-30",
)
Automatically records an initial status transition (None -> draft) in the audit log.
get_deal(deal_id) -> dict | None¶
Retrieve a single deal by ID, or None if not found.
list_deals(*, status, seller_url, created_after, limit) -> list[dict]¶
Query deals with optional filters. Results ordered by created_at descending.
# All booked deals for a seller
deals = store.list_deals(status="booked", seller_url="http://seller:8001")
# Recent deals (last 24 hours)
deals = store.list_deals(created_after="2026-03-09T00:00:00Z", limit=100)
update_deal_status(deal_id, new_status, *, triggered_by, notes) -> bool¶
Update a deal's status and automatically record the transition in the audit log.
store.update_deal_status(
"deal-uuid",
"booked",
triggered_by="agent",
notes="Deal confirmed by seller after negotiation",
)
Negotiation Methods¶
save_negotiation_round(**kwargs) -> int¶
Record a negotiation round. Returns the auto-generated row ID.
store.save_negotiation_round(
deal_id="deal-uuid",
proposal_id="prop-001",
round_number=1,
buyer_price=10.0,
seller_price=15.0,
action="counter",
rationale="Countered at $10 CPM based on historical rates",
)
get_negotiation_history(deal_id) -> list[dict]¶
Retrieve all negotiation rounds for a deal, ordered by round number ascending.
Booking Methods¶
save_booking_record(**kwargs) -> int¶
Record a booked line item linked to a deal. Returns the auto-generated row ID.
store.save_booking_record(
deal_id="deal-uuid",
order_id="order-123",
line_id="line-456",
channel="ctv",
impressions=500_000,
cost=7250.00,
booking_status="confirmed",
)
get_booking_records(deal_id) -> list[dict]¶
Retrieve all booking records for a deal.
Job Methods¶
save_job(**kwargs) -> str¶
Insert or update a job record (upsert). JSON fields (brief, budget_allocs, recommendations, booked_lines, errors) are stored as serialized strings.
get_job(job_id) -> dict | None¶
Retrieve a job by ID. JSON fields are automatically deserialized into Python objects. The auto_approve integer is converted to a boolean.
list_jobs(*, status, limit) -> list[dict]¶
List jobs with optional status filter, ordered by created_at descending.
Audit Methods¶
record_status_transition(**kwargs) -> int¶
Write a row to the status_transitions audit table. Called automatically by save_deal and update_deal_status, but can also be called directly for booking status changes.
get_status_history(entity_type, entity_id) -> list[dict]¶
Retrieve the full status history for a deal or booking, ordered chronologically.
history = store.get_status_history("deal", "deal-uuid")
for t in history:
print(f"{t['from_status']} -> {t['to_status']} ({t['triggered_by']})")
Deal Lifecycle Tracking¶
The DealStore tracks deals through a well-defined status progression. Every transition is recorded in the status_transitions table.
Status Flow¶
stateDiagram-v2
[*] --> draft: save_deal()
draft --> quoted: request_quote() via DealsClient
quoted --> negotiating: negotiation begins
negotiating --> negotiating: counter / final_offer
negotiating --> booked: book_deal() via DealsClient
quoted --> booked: book_deal() (no negotiation)
booked --> active: seller activates
active --> completed: impressions fulfilled
booked --> rejected: seller rejects
quoted --> expired: quote expires
draft --> cancelled: buyer cancels
Status Descriptions¶
| Status | Meaning | Triggered By |
|---|---|---|
draft |
Deal created locally, not yet sent to seller | save_deal() |
quoted |
Price quote received from seller | DealsClient.request_quote() |
negotiating |
Active price negotiation in progress | Negotiation rounds |
booked |
Deal confirmed with seller | DealsClient.book_deal() |
active |
Deal is live, accepting bids | Seller status push |
completed |
Impression target fulfilled | Seller status push |
rejected |
Seller rejected the deal | Seller status push |
expired |
Quote or deal expired | Seller status push |
cancelled |
Buyer cancelled before booking | Buyer action |
Negotiation Tracking¶
Each negotiation round captures both sides of the conversation:
sequenceDiagram
participant Buyer as Buyer Agent
participant Store as DealStore
participant Seller as Seller Agent
Note over Store: Round 1
Seller->>Buyer: Quote at $18 CPM
Buyer->>Store: save_negotiation_round(action=counter, buyer=12, seller=18)
Buyer->>Seller: Counter at $12 CPM
Note over Store: Round 2
Seller->>Buyer: Counter at $15 CPM
Buyer->>Store: save_negotiation_round(action=counter, buyer=13.5, seller=15)
Buyer->>Seller: Counter at $13.50 CPM
Note over Store: Round 3
Seller->>Buyer: Final offer at $14 CPM
Buyer->>Store: save_negotiation_round(action=accept, buyer=14, seller=14)
Buyer->>Seller: Accept at $14 CPM
The action field captures the buyer's decision at each round:
| Action | Meaning |
|---|---|
counter |
Buyer counters with a different price |
accept |
Buyer accepts the seller's price |
reject |
Buyer walks away from the deal |
final_offer |
Buyer's last offer (take it or leave it) |
Integration with DealsClient and Negotiation¶
The DealStore is an optional dependency of the DealsClient. When attached, the client automatically persists deal state at key moments in the quote-then-book flow.
Automatic Persistence Points¶
graph LR
A["request_quote()"] -->|"save_deal(status=quoted)"| Store["DealStore"]
B["book_deal()"] -->|"update status to booked"| Store
C["get_deal()"] -->|"sync status from seller"| Store
D["negotiate()"] -->|"save_negotiation_round()"| Store
| Client Method | DealStore Action | Notes |
|---|---|---|
request_quote() |
save_deal() with status quoted |
Creates the deal record |
book_deal() |
update_deal_status() to booked |
Records seller-issued deal ID |
get_deal() |
update_deal_status() if changed |
Syncs status from seller |
| Negotiation flow | save_negotiation_round() per round |
Records full price history |
Non-fatal persistence
DealStore failures are logged but never raised to the caller. The API call succeeds even if local persistence fails. This prevents storage issues from blocking deal execution.
Wiring in Application Code¶
The FastAPI application creates the DealStore singleton from the database_url setting and injects it into the DealsClient:
# In interfaces/api/main.py
from ad_buyer.storage.deal_store import DealStore
store = DealStore(settings.database_url)
store.connect()
client = DealsClient(
seller_url=settings.seller_url,
api_key=settings.api_key,
deal_store=store,
)
Configuration¶
The DealStore's database path is controlled by the database_url setting in the buyer agent's configuration.
Settings¶
| Setting | Default | Environment Variable | Description |
|---|---|---|---|
database_url |
sqlite:///./ad_buyer.db |
DATABASE_URL |
SQLite connection string |
The DealStore._parse_url() method handles the sqlite:/// prefix:
| Input | Resolved Path |
|---|---|
sqlite:///./ad_buyer.db |
./ad_buyer.db |
sqlite:///:memory: |
:memory: (in-memory, for testing) |
sqlite:///path/to/db |
path/to/db |
/absolute/path.db |
/absolute/path.db (pass-through) |
Testing Configuration¶
Use an in-memory database for tests to avoid file system side effects:
Related¶
- Deals API Client ---
DealsClientmethods, request/response models, and error handling - Booking Flow --- End-to-end sequence diagram for the
DealBookingFlow - Negotiation Guide --- How to negotiate pricing with sellers
- Models --- Pydantic data model reference
- Seller Quotes API --- Seller-side quote endpoints
- Seller Orders API --- Seller-side deal/order endpoints