Skip to content

Temporal Async ACP

Temporal Async ACP provides production-ready agent development with durable execution, fault tolerance, and automatic state management. You don't write the ACP handlers yourself - instead, you implement Temporal workflow methods and Agentex automatically handles the ACP mapping for you.

Core Characteristics

  • No explicit ACP handlers - You implement workflow methods instead:
    • @workflow.run (replaces @acp.on_task_create)
    • @workflow.signal (replaces @acp.on_task_event_send)
    • Task cancellation automatically handled (replaces @acp.on_task_cancel)
  • Automatic ACP mapping - Agentex handles the mapping for you, no manual configuration needed
  • Durable execution - Workflow state persisted automatically with event history
  • Fault tolerance - Built-in automatic retries and recovery
  • Production-ready - Designed for enterprise-grade reliability

Message Flow

sequenceDiagram
    participant Client
    participant Agentex
    participant TemporalACP
    participant Workflow

    Client->>Agentex: Create Task
    Agentex->>TemporalACP: on_task_create
    TemporalACP->>Workflow: @workflow.run(params)
    Workflow->>Workflow: Initialize State
    Workflow-->>TemporalACP: Workflow Running
    TemporalACP-->>Agentex: Task Created
    Agentex-->>Client: Task ID

    Client->>Agentex: Send Event
    Agentex->>TemporalACP: on_task_event_send
    TemporalACP->>Workflow: @workflow.signal(params)
    Workflow->>Workflow: Process Event
    Workflow->>Agentex: Create Messages
    Agentex-->>Client: Stream Response

    Client->>Agentex: Cancel Task
    Agentex->>TemporalACP: on_task_cancel
    TemporalACP->>Workflow: Cancel Signal
    Workflow->>Workflow: Cleanup
    Workflow-->>TemporalACP: Workflow Complete

Basic Implementation

Workflow Implementation

from temporalio import workflow
from agentex import adk
from agentex.lib.types.acp import CreateTaskParams, SendEventParams
from agentex.core.temporal.workflows.workflow import BaseWorkflow
from agentex.core.temporal.types.workflow import SignalName
from agentex.types.message_author import MessageAuthor
from agentex.types.text_content import TextContent

@workflow.defn(name="my-agent-workflow")
class MyAgentWorkflow(BaseWorkflow):
    def __init__(self):
        super().__init__(display_name="My Agent")
        self._complete_task = False

    @workflow.run
    async def on_task_create(self, params: CreateTaskParams) -> str:
        """
        Replaces @acp.on_task_create - Agentex maps this automatically
        Initialize new tasks - setup state, send welcome messages
        """

        # Send initial message
        await adk.messages.create(
            task_id=params.task.id,
            content=TextContent(
                author=MessageAuthor.AGENT,
                content="Hello! Task created."
            ),
        )

        # Wait for task completion
        await workflow.wait_condition(lambda: self._complete_task)
        return "Task completed"

    @workflow.signal(name=SignalName.RECEIVE_EVENT)
    async def on_task_event_send(self, params: SendEventParams) -> None:
        """
        Replaces @acp.on_task_event_send - Agentex maps this automatically
        Process events during task lifetime - core business logic
        """

        # Process event and send response
        await adk.messages.create(
            task_id=params.task.id,
            content=TextContent(
                author=MessageAuthor.AGENT,
                content=f"You said: {params.event.content.content}"
            ),
        )

    # Note: @acp.on_task_cancel is automatically handled by Agentex and Temporal
    # No implementation needed

ACP Configuration

import os
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.types.fastacp import TemporalACPConfig

# Create the ACP server
acp = FastACP.create(
    acp_type="async",
    config=TemporalACPConfig(
        type="temporal",
        temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
    )
)

# No handlers to register - Agentex automatically maps ACP to your workflow methods:
# @acp.on_task_create → @workflow.run
# @acp.on_task_event_send → @workflow.signal(name=SignalName.RECEIVE_EVENT)
# @acp.on_task_cancel → Automatically handled

Worker Configuration

import asyncio
from agentex.core.temporal.activities import get_all_activities
from agentex.core.temporal.workers.worker import AgentexWorker
from agentex.environment_variables import EnvironmentVariables
from workflow import MyAgentWorkflow

environment_variables = EnvironmentVariables.refresh()

async def main():
    task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
    if task_queue_name is None:
        raise ValueError("WORKFLOW_TASK_QUEUE is not set")

    worker = AgentexWorker(task_queue=task_queue_name)

    # get_all_activities() returns all temporal activities required for ADK
    await worker.run(
        activities=get_all_activities(),
        workflow=MyAgentWorkflow,
    )

if __name__ == "__main__":
    asyncio.run(main())

Workflow Parameters

CreateTaskParams

Used in @workflow.run (replaces @acp.on_task_create):

Bases: BaseModel

Parameters for task/create method.

Attributes:

Name Type Description
agent Agent

The agent that the task was sent to.

task Task

The task to be created.

params dict[str, Any] | None

The parameters for the task as inputted by the user.

request dict[str, Any] | None

Additional request context including headers forwarded to this agent.

SendEventParams

Used in @workflow.signal(name=SignalName.RECEIVE_EVENT) (replaces @acp.on_task_event_send):

Bases: BaseModel

Parameters for event/send method.

Attributes:

Name Type Description
agent Agent

The agent that the event was sent to.

task Task

The task that the message was sent to.

event Event

The event that was sent to the agent.

request dict[str, Any] | None

Additional request context including headers forwarded to this agent.