Migration Guide: Converting Between ACP Types¶
This guide provides comprehensive, step-by-step instructions for migrating agents between complexity levels as your requirements evolve. Each migration path includes real examples, common challenges, and testing strategies.
Migration Paths Overview¶
| From | To | When to Migrate | Complexity Increase |
|---|---|---|---|
| Sync ACP (~30 lines) | Base Async ACP (~80 lines) | Need state management, lifecycle control | Medium |
| Base Async ACP (~80 lines) | Temporal Async ACP (~150+ lines) | Production reliability, enterprise scale | High |
| Sync ACP (~30 lines) | Temporal Async ACP (~150+ lines) | Direct to production (skip intermediate) | Very High |
Part 1: Sync ACP → Base Async ACP¶
When to Migrate¶
Upgrade Triggers - Migrate when you need:
- ✅ Complex state management across multiple interactions
- ✅ Explicit lifecycle control (initialization, cleanup)
- ✅ Event-driven processing patterns
- ✅ Resource coordination between requests
- ✅ Advanced debugging and monitoring
Warning Signs in your Sync ACP:
- State management becoming complex with multiple
adk.state.update()calls - Need for initialization logic before first message
- Requirements for cleanup after task completion
- Multiple API calls that need coordination
- Growing handler function (>50 lines)
Step-by-Step Conversion Process¶
Step 1: Create the New Handler Structure¶
Before (Sync ACP):
from agentex.sdk.fastacp.fastacp import FastACP
from agentex.types.acp import SendMessageParams
from agentex.types.task_messages import TextContent, MessageAuthor
acp = FastACP.create(acp_type="sync")
@acp.on_message_send
async def handle_message_send(params: SendMessageParams):
# Your existing logic here
response = await process_message(params.content.content)
return TextContent(author=MessageAuthor.AGENT, content=response)
After (Base Async ACP):
from agentex.sdk.fastacp.fastacp import FastACP
from agentex.types.acp import CreateTaskParams, SendEventParams, CancelTaskParams
from agentex.types.task_messages import TextContent, MessageAuthor
from agentex import adk
acp = FastACP.create(
acp_type="async",
acp_config=AsyncACPConfig(acp_type="base")
)
@acp.on_task_create
async def handle_task_create(params: CreateTaskParams):
# Initialize state (new in Async)
await adk.state.create(
task_id=params.task.id,
agent_id=params.agent.id,
state={"initialized": True, "message_count": 0}
)
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
# Your migrated logic here
if params.event.content:
response = await process_message(params.event.content.content)
# Must manually create messages (new requirement)
await adk.messages.create(
task_id=params.task.id,
content=TextContent(author=MessageAuthor.AGENT, content=response)
)
@acp.on_task_cancel
async def handle_task_cancel(params: CancelTaskParams):
# Cleanup logic (new in Async)
await cleanup_resources(params.task.id)
Step 2: Handle State Migration¶
Key Changes:
- State Initialization: Move any setup logic to
on_task_create - Message Creation: Manually create all messages with
adk.messages.create() - State Updates: Update state in
on_task_event_sendas needed - Resource Cleanup: Add cleanup logic in
on_task_cancel
Example: Chat Bot Migration¶
Original Sync ACP (35 lines):
from agentex.sdk.fastacp.fastacp import FastACP
from agentex.types.acp import SendMessageParams
from agentex.types.task_messages import TextContent, MessageAuthor
from agentex import adk
acp = FastACP.create(acp_type="sync")
@acp.on_message_send
async def handle_message_send(params: SendMessageParams) -> TextContent:
"""Simple chatbot with conversation history"""
# Get conversation history
messages = await adk.messages.list(task_id=params.task.id)
conversation_context = build_conversation_context(messages)
# Generate response
response = await generate_ai_response(
user_input=params.content.content,
context=conversation_context
)
return TextContent(
author=MessageAuthor.AGENT,
content=response
)
def build_conversation_context(messages):
"""Helper function - stays the same"""
return [msg.content.content for msg in messages[-5:]]
async def generate_ai_response(user_input, context):
"""Helper function - stays the same"""
return f"You said: {user_input}. Context: {len(context)} previous messages"
Migrated Base Async ACP (85 lines):
from agentex.sdk.fastacp.fastacp import FastACP
from agentex.types.acp import CreateTaskParams, SendEventParams, CancelTaskParams
from agentex.types.task_messages import TextContent, MessageAuthor
from agentex.types.fastacp import AsyncACPConfig
from agentex import adk
acp = FastACP.create(
acp_type="async",
acp_config=AsyncACPConfig(acp_type="base")
)
@acp.on_task_create
async def handle_task_create(params: CreateTaskParams):
"""Initialize conversation state"""
await adk.state.create(
task_id=params.task.id,
agent_id=params.agent.id,
state={
"conversation_started": True,
"total_messages": 0,
"user_preferences": {}
}
)
# Send welcome message
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author=MessageAuthor.AGENT,
content="Hello! I'm ready to chat. How can I help you today?"
)
)
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
"""Process conversation events with enhanced state tracking"""
if not params.event.content:
return
# Update state
current_state = await adk.state.get(task_id=params.task.id)
current_state["total_messages"] += 1
await adk.state.update(
task_id=params.task.id,
state=current_state
)
# Get conversation history (same logic as before)
messages = await adk.messages.list(task_id=params.task.id)
conversation_context = build_conversation_context(messages)
# Generate response (same logic as before)
response = await generate_ai_response(
user_input=params.event.content.content,
context=conversation_context,
state=current_state # Now we can pass state!
)
# Manually create response message (new requirement)
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author=MessageAuthor.AGENT,
content=response
)
)
@acp.on_task_cancel
async def handle_task_cancel(params: CancelTaskParams):
"""Clean up conversation resources"""
# Log conversation summary
state = await adk.state.get(task_id=params.task.id)
print(f"Conversation ended. Total messages: {state.get('total_messages', 0)}")
# Could add: save conversation to database, cleanup external resources, etc.
# Helper functions (mostly unchanged)
def build_conversation_context(messages):
"""Helper function - same as before"""
return [msg.content.content for msg in messages[-5:]]
async def generate_ai_response(user_input, context, state=None):
"""Enhanced helper function - now uses state"""
message_count = state.get('total_messages', 0) if state else 0
return f"You said: {user_input}. Context: {len(context)} previous messages. Session: {message_count} total messages"
Key Migration Changes:
- Handler Structure: 1 handler → 3 handlers
- State Management: Optional → Required initialization
- Message Creation: Automatic → Manual with
adk.messages.create() - Lifecycle Control: None → Explicit create/cancel handlers
- Enhanced Capabilities: Can now track conversation state, send welcome messages, cleanup resources
Part 2: Base Async ACP → Temporal Async ACP¶
When to Migrate¶
Upgrade Triggers - Migrate when you need:
- ✅ Production reliability with automatic retries
- ✅ Long-running workflows (hours, days, weeks)
- ✅ Enterprise-grade durability that survives restarts
- ✅ Complex state coordination across multiple activities
- ✅ Guaranteed execution with distributed processing
Warning Signs in your Base Async ACP:
- Manual retry logic becoming complex
- State coordination across multiple async operations
- Need for workflow persistence across server restarts
- Requirements for enterprise SLAs and reliability
- Complex error handling and recovery scenarios
Step-by-Step Conversion Process¶
Step 1: Create Temporal Workflow Structure¶
Before (Base Async ACP):
acp = FastACP.create(
acp_type="async",
acp_config=AsyncACPConfig(acp_type="base")
)
@acp.on_task_create
async def handle_task_create(params: CreateTaskParams):
await adk.state.create(task_id=params.task.id, agent_id=params.agent.id, state={})
@acp.on_task_event_send
async def handle_event_send(params: SendEventParams):
# Process events directly
result = await some_api_call(params.event.content)
await adk.messages.create(task_id=params.task.id, content=result)
After (Temporal Async ACP):
from temporalio import workflow, activity
from agentex.core.temporal.workflow import AgentexWorkflow
acp = FastACP.create(
acp_type="async",
acp_config=AsyncACPConfig(acp_type="temporal")
)
@workflow.defn
class MyAgentWorkflow(AgentexWorkflow):
def __init__(self):
super().__init__()
self.state = {"initialized": False}
@workflow.run
async def run_workflow(self, params):
# Workflow orchestration logic
await workflow.execute_activity(
initialize_agent,
params,
start_to_close_timeout=timedelta(seconds=30)
)
# Process events through activities
async for event in self.events():
result = await workflow.execute_activity(
process_event_activity,
event,
start_to_close_timeout=timedelta(minutes=5)
)
@activity.defn
async def initialize_agent(params):
"""Temporal activity - automatically retried"""
await adk.state.create(task_id=params.task.id, agent_id=params.agent.id, state={})
@activity.defn
async def process_event_activity(event):
"""Temporal activity - automatically retried"""
result = await some_api_call(event.content)
await adk.messages.create(task_id=event.task.id, content=result)
return result
@acp.workflow(MyAgentWorkflow)
async def workflow_handler():
return MyAgentWorkflow()
Step 2: Wrap Operations in Activities¶
Key Principle: Any operation that could fail or has side effects should be wrapped in a Temporal activity.
Before (Direct calls):
# Direct API calls (can fail, no retries)
result = await external_api.call(data)
await database.save(result)
await send_notification(result)
After (Temporal activities):
# Wrapped in activities (automatic retries, durability)
result = await workflow.execute_activity(
external_api_activity,
data,
start_to_close_timeout=timedelta(minutes=2),
retry_policy=RetryPolicy(maximum_attempts=3)
)
await workflow.execute_activity(
database_save_activity,
result,
start_to_close_timeout=timedelta(seconds=30)
)
await workflow.execute_activity(
notification_activity,
result,
start_to_close_timeout=timedelta(seconds=10)
)
Key Migration Benefits:
- Automatic Retries: All activities retry automatically on failure
- Durable State: Workflow state persists across server restarts
- Enterprise Reliability: Guaranteed execution with Temporal's durability
- Complex Orchestration: Easy to coordinate multiple activities
- Fault Tolerance: Comprehensive error handling and recovery
Common Migration Challenges & Solutions¶
Challenge 1: State Management Changes¶
Problem: Different state APIs between ACP types
Solution: Create adapter functions:
# Migration helper
async def migrate_state(task_id: str, from_type: str, to_type: str):
if from_type == "sync" and to_type == "async":
# Sync uses automatic state, Async needs explicit creation
existing_messages = await adk.messages.list(task_id=task_id)
initial_state = {"migrated_message_count": len(existing_messages)}
await adk.state.create(task_id=task_id, state=initial_state)
Challenge 2: Message Creation Differences¶
Problem: Sync auto-creates messages, Async requires manual creation
Solution: Create message creation helpers:
# Migration helper
async def create_agent_message(task_id: str, content: str):
"""Standardized message creation for migrations"""
await adk.messages.create(
task_id=task_id,
content=TextContent(author=MessageAuthor.AGENT, content=content)
)
Summary¶
Migration between ACP types follows clear patterns. Test your migrated agent with the same inputs to ensure equivalent behavior before deploying to production.