Task Concepts
Tasks are the central entity in Agentex, representing a conversation or workflow instance between a client and an agent. Understanding tasks is fundamental to building effective agents.
What is a Task?
A Task represents a stateful conversation or workflow session. It's the top-level container that holds:
- Messages exchanged between client and agent
- Persistent state managed by the agent
- Lifecycle information (creation, status, completion)
- Metadata about the conversation
Think of a task like a chat session, customer support ticket, or workflow execution that can span multiple interactions over time.
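As a rough mental model only, the four parts listed above can be pictured as fields on a single container. The `TaskSketch` class, its field names, and the `"RUNNING"` status value are hypothetical illustrations, not the actual Agentex task schema.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class TaskSketch:
    """Hypothetical stand-in for a task; not the real Agentex model."""

    id: str
    status: str = "RUNNING"  # lifecycle information (assumed status name)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    messages: list[dict[str, str]] = field(default_factory=list)  # message ledger
    state: dict[str, dict[str, Any]] = field(default_factory=dict)  # keyed by agent id
    metadata: dict[str, Any] = field(default_factory=dict)  # info about the conversation


# One task accumulates messages across several interactions.
task = TaskSketch(id="task_123", metadata={"channel": "support"})
task.messages.append({"author": "USER", "content": "I need help with analysis"})
task.messages.append({"author": "AGENT", "content": "I'll analyze this for you"})
```

In a real agent you would not build this object yourself; the point is only that messages, state, lifecycle, and metadata all hang off one task identifier.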
Task Relationships
For Detailed Implementation
This section explains the architectural relationships between core Agentex entities. For specific implementation patterns, refer to the Agent Client Protocol guides.
Task ↔ Messages (One-to-Many)
Tasks maintain a flat ledger of messages. This flat design is an intentional architectural decision that enables flexible multi-actor communication patterns.
# A single task's message ledger might include:
messages = [
    {"author": "USER", "content": "I need help with analysis"},  # Human user
    {"author": "AGENT", "content": "I'll analyze this for you"},  # Primary agent
    {"author": "AGENT", "content": "Analysis complete: ..."},  # Primary agent result
    {"author": "USER", "content": "Can you also check the data?"},  # Human user follow-up
    {"author": "AGENT", "content": "Data validation in progress..."},  # Secondary agent (if multi-agent)
    {"author": "AGENT", "content": "Data is valid"},  # Secondary agent result
]
Why This Design?
This flat structure allows:
- Multi-agent systems where different agents contribute to the same conversation
- Complex user interactions where multiple users (or user systems) participate
- Simple message retrieval without needing to understand actor hierarchies
- Chronological ordering that preserves the natural flow of communication
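To illustrate the "simple message retrieval" point, here is a minimal sketch that works on plain dictionaries shaped like the ledger example above. The `by_author` helper is a hypothetical name introduced for this illustration and is not part of the Agentex SDK.

```python
# Plain-dict ledger shaped like the example above (oldest message first).
ledger = [
    {"author": "USER", "content": "I need help with analysis"},
    {"author": "AGENT", "content": "I'll analyze this for you"},
    {"author": "USER", "content": "Can you also check the data?"},
    {"author": "AGENT", "content": "Data is valid"},
]


def by_author(messages, author):
    """Return messages from one author type, preserving chronological order."""
    return [m for m in messages if m["author"] == author]


# Filtering needs no knowledge of which agent or user produced each entry.
agent_replies = by_author(ledger, "AGENT")

# A full transcript is just the ledger read top to bottom.
transcript = "\n".join(f'{m["author"]}: {m["content"]}' for m in ledger)
```

Because the ledger is flat, both operations are a single pass over one list; no tree traversal or per-actor lookup is involved.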
Task ↔ Events (One-to-Many)
Events are not persisted objects - they are notifications that wrap task message content. When a new message arrives in the task message ledger, an event is generated to notify agents, but you should always operate on the actual TaskMessages.
Key Characteristics:
- Notification System: Events signal that new messages have arrived, like "new mail in your mailbox"
- Content Wrapper: Events contain TaskMessageContent but are not the source of truth
- Ephemeral: Events are notifications, not stored entities you query later
- Triggering Mechanism: In Async Agents, events trigger your @acp.on_task_event_send handlers
Processing Strategies:
You have two approaches when handling events:
- Process Event Content Directly: Handle the TaskMessageContent wrapped in the event
- Process All New Messages: Use the event as a trigger to fetch all new messages since your last cursor
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
    # Approach 1: Process the event's message content directly
    if params.event.content:
        user_message = params.event.content.content
        await process_single_message(user_message)  # your own processing logic

    # Approach 2: Treat the event as a "you have new mail" notification
    # and fetch all new messages since the last one you processed.
    # `last_cursor` and `get_unprocessed_messages` are application-defined.
    all_messages = await adk.messages.list(task_id=params.task.id)
    new_messages = get_unprocessed_messages(all_messages, last_cursor)
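The `get_unprocessed_messages` helper used in Approach 2 is left to the application. One possible sketch is shown here, under the assumptions that messages are plain dictionaries with an `"id"` key, that the list is ordered oldest first, and that the cursor is the id of the last message already handled. None of these details are confirmed by the SDK, so adapt them to the actual message objects you receive.

```python
from typing import Optional


def get_unprocessed_messages(
    all_messages: list[dict], last_cursor: Optional[str]
) -> list[dict]:
    """Return the messages that come after the one whose id equals last_cursor.

    Assumes all_messages is ordered oldest-first and each entry has an "id" key.
    """
    if last_cursor is None:
        return list(all_messages)  # nothing processed yet
    ids = [m["id"] for m in all_messages]
    if last_cursor not in ids:
        return list(all_messages)  # unknown cursor: reprocess rather than skip
    return all_messages[ids.index(last_cursor) + 1:]


history = [
    {"id": "m1", "content": "first"},
    {"id": "m2", "content": "second"},
    {"id": "m3", "content": "third"},
]
pending = get_unprocessed_messages(history, "m1")
```

Falling back to the full list when the cursor is unknown favors reprocessing over silently dropping messages; whether that is the right trade-off depends on how idempotent your processing is.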
Using Agent Task Tracker for Coordinated Processing
For Approach 2, you can use Agent Task Tracker as a cursor system to track which events have been processed:
@acp.on_task_event_send
async def handle_event_send_with_cursor(params: SendEventParams):
    # Get current processing cursor from Agent Task Tracker
    tracker = await adk.agent_task_tracker.get_by_task_and_agent(
        task_id=params.task.id,
        agent_id=params.agent.id
    )
    # Get all unprocessed events since last cursor
    unprocessed_events = await adk.events.list_events(
        task_id=params.task.id,
        agent_id=params.agent.id,
        last_processed_event_id=tracker.last_processed_event_id,
        limit=50
    )
    if not unprocessed_events:
        return  # No new events to process
    # Process batch of events
    for event in unprocessed_events:
        # Process each event and corresponding task messages
        if event.content:
            await process_event_content(event.content)
    # Update cursor ONLY after all processing is complete
    await adk.agent_task_tracker.update(
        tracker_id=tracker.id,
        request=UpdateAgentTaskTrackerRequest(
            last_processed_event_id=unprocessed_events[-1].id,
            status_reason=f"Processed {len(unprocessed_events)} events"
        )
    )
Benefits of cursor-based processing:
- Resumable: Pick up where you left off after restarts
- Batch Processing: Process multiple events together efficiently
- Progress Tracking: Know exactly which events have been handled
- Race Reduction: Coordinate processing across instances
Optional Coordination
Agent Task Tracker cursors are optional - only use them when you need sophisticated processing coordination patterns. For simple event handling, processing events individually works fine.
Cursor Safety
Cursors can only move forward - never backward. Only update last_processed_event_id after processing is completely finished.
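The two rules above (forward-only movement, update only after processing finishes) can be sketched with a small in-memory model. `ForwardOnlyCursor` and `process_batch` are hypothetical names, and the cursor here is an integer list index rather than an event id, purely to keep the example self-contained; the real tracker is managed by Agentex.

```python
class ForwardOnlyCursor:
    """Hypothetical in-memory cursor; not the Agent Task Tracker API."""

    def __init__(self) -> None:
        self.position = -1  # index of the last fully processed event

    def advance(self, new_position: int) -> None:
        if new_position < self.position:
            raise ValueError("cursor cannot move backward")
        self.position = new_position


def process_batch(events: list[str], cursor: ForwardOnlyCursor, handle) -> None:
    """Handle every event after the cursor, then advance the cursor once."""
    start = cursor.position + 1
    batch = events[start:]
    if not batch:
        return  # nothing new to process
    for event in batch:
        handle(event)  # if this raises, the cursor stays where it was
    cursor.advance(start + len(batch) - 1)


handled: list[str] = []
cursor = ForwardOnlyCursor()
process_batch(["e0", "e1", "e2"], cursor, handled.append)
```

Because the cursor moves only after the loop completes, a failure partway through a batch means the whole batch is retried on the next run, so handlers should tolerate seeing an event more than once.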
Task ↔ State (One-to-One per Agent)
State is scoped to the combination of a task and an agent. Each agent maintains its own isolated state when working on a specific task.
Key Characteristics:
- Agent-Scoped State: Each (task_id, agent_id) pair gets its own state
- Isolation: Agents don't interfere with each other's state, even on the same task
- Simplicity: Individual agents only need to manage their own state
- Parallel Safety: Multiple agents can work on the same task without state conflicts
Why This Design?
# Task "task_123" with multiple agents:
# Agent A's state (focused on analysis)
agent_a_state = {
    "analysis_stage": "data_processing",
    "processed_rows": 1500,
    "findings": ["anomaly_1", "pattern_2"]
}
# Agent B's state (focused on reporting)
agent_b_state = {
    "report_format": "executive_summary",
    "sections_completed": ["intro", "methodology"],
    "pending_charts": ["trend_analysis", "comparison"]
}
# Agent C's state (focused on validation)
agent_c_state = {
    "validation_rules": ["rule_1", "rule_2", "rule_3"],
    "validated_items": 45,
    "errors_found": []
}
This design enables:
- Simple reasoning: Each agent only considers its own state and responsibilities
- Parallel execution: Multiple agents work simultaneously without coordination overhead
- Code maintainability: Agent logic remains focused and doesn't need to understand other agents
- System reliability: One agent's state issues don't affect other agents
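The scoping rule can be illustrated with a toy store keyed by the `(task_id, agent_id)` pair. `InMemoryStateStore` is a hypothetical class written for this example; it is not the `adk.state` API and makes no claim about how Agentex stores state internally.

```python
from typing import Any


class InMemoryStateStore:
    """Hypothetical store illustrating (task_id, agent_id) scoping."""

    def __init__(self) -> None:
        self._states: dict[tuple[str, str], dict[str, Any]] = {}

    def get(self, task_id: str, agent_id: str) -> dict[str, Any]:
        # Each pair lazily gets its own independent dict.
        return self._states.setdefault((task_id, agent_id), {})


store = InMemoryStateStore()

# Two agents write to the same task without seeing each other's keys.
store.get("task_123", "agent_a")["processed_rows"] = 1500
store.get("task_123", "agent_b")["report_format"] = "executive_summary"
```

Since the key includes the agent id, neither agent can overwrite the other's entries by accident, which is the isolation property described above.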
Task Handling by Agent Type
The way you work with tasks depends heavily on which agent type you choose. Agentex supports three agent types: Sync Agents, Async Agents (Base), and Async Agents (Temporal).
Choosing Your Agent Type
Not sure which agent type is right for you? See the Choose Your Agent Type guide for a detailed comparison and decision matrix.
Sync Agents - Simple Request-Response
Sync Agents treat tasks as lightweight conversation containers where the Agentex service handles all task and message lifecycle management automatically.
What Agentex Handles (Automatic):
- Creates or retrieves tasks (auto-created for single-turn interactions, or reused if you pass a task_id for multi-turn conversations)
- Saves all incoming messages to conversation history
- Routes requests to your handler
- Persists all your response messages to conversation history
- Returns responses to clients
What You Implement (Your Code):
- Process the incoming message from the TaskMessageContent payload
- Return or yield your agent's response (return for blocking responses, yield for streaming)
Why This Works:
Because Agentex persists both the inputs and outputs, you can focus entirely on business logic - just transform inputs into outputs. No need to manage tasks, save messages, or handle lifecycle. You write pure processing code: receive message → generate response → return or yield.
Key Characteristics:
- One handler: @acp.on_message_send
- Tasks and messages managed automatically
- Stateless by default (though you can add state via adk.state API)
- Blocking, synchronous execution
Task Interaction Pattern:
@acp.on_message_send
async def handle_message_send(params: SendMessageParams):
    """Only handler needed for Sync Agents"""
    task = params.task  # Task context provided automatically
    user_message = params.content.content
    # Process the message and return response
    response = await process_user_message(user_message)
    return TextContent(
        author=MessageAuthor.AGENT,
        content=response
    )
Lifecycle Overview:
# 1. Task Creation - Handled automatically by Agentex
# 2. Message Processing - Your responsibility
@acp.on_message_send
async def handle_message_send(params: SendMessageParams):
    pass
# 3. Task Completion - Unnecessary in Sync Agents
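The difference between returning and yielding a response can be seen with plain Python coroutines and async generators. This sketch uses strings instead of Agentex content types, and the handler names are hypothetical; what a real streaming handler must yield is defined by the SDK and is not shown here.

```python
import asyncio


async def blocking_handler(message: str) -> str:
    """Return the whole response at once."""
    return f"Echo: {message}"


async def streaming_handler(message: str):
    """Yield the response in chunks as they become available."""
    for word in f"Echo: {message}".split():
        yield word


async def collect(message: str) -> tuple[str, list[str]]:
    # A caller awaits a returned value but iterates over yielded chunks.
    full = await blocking_handler(message)
    chunks = [chunk async for chunk in streaming_handler(message)]
    return full, chunks


full, chunks = asyncio.run(collect("hello world"))
```

With `return`, the client sees nothing until the handler finishes; with `yield`, partial output can be forwarded as soon as each chunk is produced.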
Async Agents - Full Lifecycle Management
Async Agents (both Base and Temporal) give you complete control over task lifecycle with event-driven handlers. Unlike Sync Agents, you explicitly manage when tasks start, how they process events, and when they clean up.
What Agentex Handles (Automatic):
- Routes lifecycle events to your three handlers (on_task_create, on_task_event_send, on_task_cancel)
- Delivers events asynchronously (events are notifications, not persisted messages - you decide what to persist)
- Returns responses to clients
What You Implement (Your Code):
- Initialize tasks in @acp.on_task_create (send welcome messages, set up state)
- Process events in @acp.on_task_event_send (handle user messages, business logic)
- Clean up in @acp.on_task_cancel (archive data, release resources)
- Explicitly persist ALL messages using adk.messages.create() - including incoming user messages (this gives you control to preprocess or filter ephemeral events like webhooks before saving them)
- Manage your own state using adk.state API (Base) or workflow variables (Temporal)
Why This Works:
You have full control over the task lifecycle, allowing complex workflows like multi-step processes, conditional logic, and long-running operations. The trade-off is you must explicitly manage message creation and state - Agentex provides the infrastructure but you orchestrate the flow.
Lifecycle Overview:
# 1. Task Creation - Initialize whatever you need
@acp.on_task_create
async def handle_task_create(params: CreateTaskParams):
    # Set up initial state, send welcome messages
    pass

# 2. Task Processing - Handle ongoing interactions
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
    # Handle user messages, business logic, update state, send responses
    pass

# 3. Task Completion - Clean up and archive
@acp.on_task_cancel
async def handle_task_cancel(params: CancelTaskParams):
    # Archive data, release resources
    pass
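To make the "explicitly persist ALL messages" responsibility concrete, the following simulation uses an in-memory list in place of the task's message ledger. `save_message`, `on_create`, and `on_event` are hypothetical stand-ins for `adk.messages.create()` and the decorated handlers; the sketch only shows the order of operations, not real SDK calls.

```python
import asyncio

ledger: list[dict[str, str]] = []  # stand-in for the task's message ledger


async def save_message(author: str, content: str) -> None:
    """Hypothetical stand-in for persisting a message."""
    ledger.append({"author": author, "content": content})


async def on_create() -> None:
    await save_message("AGENT", "Welcome! How can I help?")


async def on_event(content: str, ephemeral: bool = False) -> None:
    if ephemeral:
        return  # e.g. a webhook ping the agent chooses not to store
    await save_message("USER", content)  # incoming messages are saved explicitly
    await save_message("AGENT", f"Received: {content}")


async def main() -> None:
    await on_create()
    await on_event("heartbeat", ephemeral=True)
    await on_event("Analyze the data")


asyncio.run(main())
```

The ephemeral event never reaches the ledger because the handler decides what to persist; if the handler forgot to save the user message, the conversation history would contain only agent replies.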
Base vs Temporal: State Persistence
With Temporal: Your workflow state (instance attributes on your workflow class) persists automatically across restarts and failures. No manual state management needed.
With Base: You must manually persist state using the adk.state API.
Learn more in the Temporal Development Overview.