Implementation Guide¶
Critical Implementation Considerations¶
Deterministic Replay Requirements¶
❌ Avoid in workflows:
@workflow.signal(name=SignalName.RECEIVE_EVENT)
async def bad_workflow_handler(self, params):
current_time = datetime.now() # Non-deterministic!
random_id = uuid.uuid4() # Non-deterministic!
api_response = requests.get() # Side effect!
✅ Correct approach:
@workflow.signal(name=SignalName.RECEIVE_EVENT)
async def good_workflow_handler(self, params):
current_time = workflow.now() # Deterministic
random_id = workflow.uuid4() # Deterministic
# Side effects go through Agentex ADK (activities)
await adk.messages.create(...)
Activity vs Workflow Boundaries¶
- Workflows: Pure, deterministic business logic
- Activities: All external interactions (APIs, databases, file I/O)
Common Implementation Issues¶
Data Serialization Limits¶
# ❌ Don't pass huge objects between activities
@activity.defn
async def process_data(giant_dataset: List[Dict]): # Can hit 2MB limit
...
# ✅ Pass references instead
@activity.defn
async def process_data(dataset_id: str): # Load data inside activity
dataset = await load_from_database(dataset_id)
Why: Temporal has a 2MB limit per workflow event. Large data structures will fail.
Workflow State Size Explosion¶
# ❌ Don't accumulate unbounded state
async def bad_workflow():
all_responses = [] # This grows forever!
while True:
response = await get_response()
all_responses.append(response) # Memory leak in workflow state
Why: Workflow state is replayed from the beginning. Growing state = slower replays.
Activity Retry Idempotency¶
# ❌ Non-idempotent operations will duplicate
@activity.defn
async def send_email(user_id: str):
await email_service.send(f"Welcome {user_id}!") # Sends multiple emails on retry
# ✅ Make activities idempotent
@activity.defn
async def send_email(user_id: str, idempotency_key: str):
await email_service.send_once(key=idempotency_key, ...)
Why: Activities retry automatically. Non-idempotent operations will run multiple times.
Async/Await in Wrong Places¶
# ❌ Don't await non-Temporal async calls in workflows
@workflow.signal(name=SignalName.RECEIVE_EVENT)
async def bad_workflow_handler(self, params):
await asyncio.sleep(60) # Will break replay!
response = requests.get("...") # Non-deterministic!
# ✅ Use AgentEx ADK for external operations
@workflow.signal(name=SignalName.RECEIVE_EVENT)
async def good_workflow_handler(self, params):
# All external operations go through ADK (activities)
await adk.messages.create(...) # Replay-safe
test_agent = Agent(...)
result = await Runner.run(...) # Replay-safe
Why: Non-Agentex async operations aren't tracked in event history and break deterministic replay.