Events vs Messages¶
Critical for Async ACP
Events and TaskMessages serve different purposes and are stored in separate database tables. This distinction is fundamental to understanding how Async ACP works.
The Core Distinction¶
| Events | TaskMessages |
|---|---|
| ✅ Stored persistently in events table | ✅ Stored persistently in messages table |
| 🔄 Agent processing notifications | 💬 User-facing conversation history |
| ✅ Can be queried for processing | ✅ Can be retrieved for conversation context |
| 🚀 Written to DB BEFORE ACP delivery | 📝 Created by agents or users directly |
Understanding the Relationship¶
Events Are "Processing Queue" Items¶
Think of events like work items in a processing queue:
# Event = "Work item ready for processing"
# - Stored persistently in events table
# - Contains processing context and metadata
# - Written to database BEFORE being sent to agent
# - Can be queried and tracked for processing progress
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
# This event was already saved to DB before reaching here
event = params.event # ✅ Persistent in events table
# You can query all events for this task/agent
all_events = await adk.events.list_events(
task_id=params.task.id,
agent_id=params.agent.id
) # ✅ All processing history available
# TaskMessages are the user-facing conversation
messages = await adk.messages.list(task_id=params.task.id) # ✅ Conversation context
TaskMessages Are the "User Conversation"¶
# TaskMessage = User-facing conversation history
# - Permanently stored in messages table
# - Contains conversation content for users/clients
# - Can be retrieved anytime for chat history
# - Forms the visible conversation thread
# Access user-facing conversation history
task_messages = await adk.messages.list(task_id=task_id)
for message in task_messages:
print(f"User sees: {message.content}")
# Events are for agent processing coordination
events = await adk.events.list_events(task_id=task_id, agent_id=agent_id)
for event in events:
print(f"Agent processes: {event.id} at sequence {event.sequence_id}")
Event Processing Patterns¶
❌ Don't Process Only Current Event Content¶
# WRONG: Only processing the single event content
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
if params.event.content:
# Only processing the current event content
user_message = params.event.content.content
response = await process_message(user_message)
# Missing: No context from conversation history or other events!
✅ Use Events for Coordination, Process with Full Context¶
# CORRECT: Event triggers processing, get full context from both sources
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
# Event tells us "work is ready to be processed"
# (This event is already stored in DB before reaching here)
event = params.event
# Get conversation context from TaskMessages
conversation_messages = await adk.messages.list(task_id=params.task.id)
# Get processing context from Events (for coordination)
all_events = await adk.events.list_events(
task_id=params.task.id,
agent_id=params.agent.id
)
# Process with full context from both sources
response = await process_with_context(conversation_messages, all_events)
# Create new message for user conversation
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author=MessageAuthor.AGENT,
content=response
)
)
Why This Architecture Exists¶
Separation of Concerns: Processing vs Conversation¶
The dual-table architecture enables powerful patterns:
- Events Table: Tracks agent processing state and coordination
- Messages Table: Maintains user-facing conversation history
Enables Flexible Processing Strategies¶
# Strategy 1: Process events immediately
@acp.on_task_event_send
async def immediate_processing(params: SendEventParams):
# React to each event as it arrives
# Events are already in DB for coordination
response = await quick_response(params.event.content)
# Strategy 2: Batch process accumulated events
@acp.on_task_event_send
async def batch_processing(params: SendEventParams):
# Get current processing cursor from agent task tracker
tracker = await adk.agent_task_tracker.get_by_task_and_agent(
task_id=params.task.id,
agent_id=params.agent.id
)
# Get all unprocessed events since last cursor
unprocessed_events = await adk.events.list_events(
task_id=params.task.id,
agent_id=params.agent.id,
last_processed_event_id=tracker.last_processed_event_id,
limit=100
)
if len(unprocessed_events) >= 5: # Process batch of 5
await process_event_batch(unprocessed_events)
# Update cursor to track progress - ONLY after processing is complete
await adk.agent_task_tracker.update(
tracker_id=tracker.id,
request=UpdateAgentTaskTrackerRequest(
last_processed_event_id=unprocessed_events[-1].id,
status_reason=f"Processed batch of {len(unprocessed_events)} events"
)
)
Database Write Order Guarantees¶
Critical: Events are written to the database BEFORE being sent to the agent:
- User sends message → TaskMessage created in messages table
- Event created → Event written to events table
- Event delivered → Agent receives event via ACP
- Agent processes → Can query both events and messages tables
This ensures agents can always access the event data, even if there are delivery failures.