Agent Development Kit (ADK) Reference

ADK vs SDK

ADK (Agent Development Kit) - Use within agent code to interact with Agentex infrastructure (streaming, tracing, state management, etc.)

SDK (Software Development Kit) - Use to make requests to the Agentex server via its REST API. View SDK Docs →

This is the API reference for the Agentex Python ADK. The ADK provides convenient abstractions for interacting with clients through Agentex's agentic infrastructure, and it simplifies streaming, tracing, tool calling, and asynchronous communication with clients.

Agent Development Kit (ADK)

Use the following modules to interact with the core functionality of Agentex.

agentex.lib.adk.acp

Use the following functions to interact with Agentex agents through the Agent to Client Protocol (ACP). Each agent handles the standard ACP functions and can be reached through these methods.

Module for managing Agent to Client Protocol (ACP) agent operations in Agentex.

This interface provides high-level methods for interacting with the agent through the ACP.

Initialize the ACP module.

Parameters:

Name Type Description Default
acp_activities Optional[ACPActivities]

Optional pre-configured ACP activities. If None, will be auto-initialized.

required

cancel_task async

cancel_task(
    task_id: str | None = None,
    task_name: str | None = None,
    agent_id: str | None = None,
    agent_name: str | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    request: dict[str, Any] | None = None,
) -> Task

Cancel a task by sending a cancel request to the agent that owns the task.

Parameters:

Name Type Description Default
task_id str | None

ID of the task to cancel.

None
task_name str | None

Name of the task to cancel.

None
agent_id str | None

ID of the agent that owns the task.

None
agent_name str | None

Name of the agent that owns the task.

None
trace_id str | None

The trace ID for the task.

None
parent_span_id str | None

The parent span ID for the task.

None
start_to_close_timeout timedelta

The start to close timeout for the task.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the task.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy for the task.

DEFAULT_RETRY_POLICY
request dict[str, Any] | None

Additional request context including headers to forward to the agent.

None

Returns:

Type Description
Task

The task entry.

Raises:

Type Description
ValueError

If neither agent_name nor agent_id is provided, or if neither task_name nor task_id is provided
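The ValueError condition above amounts to requiring at least one identifier for the agent and at least one for the task. A minimal sketch of that rule follows. The helper `check_cancel_args` is hypothetical and not part of the ADK; it only mirrors the documented condition so callers can see which argument combinations are accepted.

```python
from typing import Optional


def check_cancel_args(
    task_id: Optional[str] = None,
    task_name: Optional[str] = None,
    agent_id: Optional[str] = None,
    agent_name: Optional[str] = None,
) -> None:
    """Hypothetical helper mirroring the documented ValueError conditions."""
    # The agent must be identified by ID or by name.
    if agent_id is None and agent_name is None:
        raise ValueError("Either agent_id or agent_name must be provided")
    # The task must be identified by ID or by name.
    if task_id is None and task_name is None:
        raise ValueError("Either task_id or task_name must be provided")


# Accepted: one task identifier plus one agent identifier.
check_cancel_args(task_id="task-123", agent_name="report-agent")

# Rejected: the agent is missing.
try:
    check_cancel_args(task_id="task-123")
except ValueError as exc:
    print("rejected:", exc)
```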

create_task async

create_task(
    name: str | None = None,
    agent_id: str | None = None,
    agent_name: str | None = None,
    params: dict[str, Any] | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    request: dict[str, Any] | None = None,
) -> Task

Create a new task.

Parameters:

Name Type Description Default
name str | None

The name of the task.

None
agent_id str | None

The ID of the agent to create the task for.

None
agent_name str | None

The name of the agent to create the task for.

None
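params dict[str, Any] | None

The parameters for the task.

None
trace_id str | None

The trace ID for the task.

None
parent_span_id str | None

The parent span ID for the task.

None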
start_to_close_timeout timedelta

The start to close timeout for the task.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the task.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy for the task.

DEFAULT_RETRY_POLICY
request dict[str, Any] | None

Additional request context including headers to forward to the agent.

None

Returns:

Type Description
Task

The task entry.
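The call shape of create_task can be sketched as follows. `FakeACP` is a stand-in written for this example so the block runs without Agentex installed; it accepts a subset of the documented keyword arguments and echoes them back instead of contacting a server. The task name, agent name, and params values are illustrative only.

```python
import asyncio
from datetime import timedelta
from typing import Any, Dict, Optional


class FakeACP:
    """Stand-in exposing a create_task with a subset of the documented keywords."""

    async def create_task(
        self,
        name: Optional[str] = None,
        agent_id: Optional[str] = None,
        agent_name: Optional[str] = None,
        params: Optional[Dict[str, Any]] = None,
        start_to_close_timeout: timedelta = timedelta(seconds=5),
        heartbeat_timeout: timedelta = timedelta(seconds=5),
    ) -> Dict[str, Any]:
        # A real call would reach the Agentex server; the fake just echoes.
        return {
            "name": name,
            "agent": agent_name or agent_id,
            "params": params or {},
            "timeout_s": start_to_close_timeout.total_seconds(),
        }


async def main() -> Dict[str, Any]:
    acp = FakeACP()  # assumption: the real ACP module is used the same way
    return await acp.create_task(
        name="summarize-report",
        agent_name="report-agent",
        params={"report_id": "r-42"},
        # Override the 5 second default for a slower operation.
        start_to_close_timeout=timedelta(seconds=30),
    )


task = asyncio.run(main())
print(task["timeout_s"])  # 30.0
```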

send_event async

send_event(
    task_id: str,
    content: TaskMessageContent,
    agent_id: str | None = None,
    agent_name: str | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    request: dict[str, Any] | None = None,
) -> Event

Send an event to a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task to send the event to.

required
content TaskMessageContent

The content to send with the event.

required
agent_id str | None

The ID of the agent to send the event to.

None
agent_name str | None

The name of the agent to send the event to.

None
trace_id str | None

The trace ID for the event.

None
parent_span_id str | None

The parent span ID for the event.

None
start_to_close_timeout timedelta

The start to close timeout for the event.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the event.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy for the event.

DEFAULT_RETRY_POLICY
request dict[str, Any] | None

Additional request context including headers to forward to the agent.

None

Returns:

Type Description
Event

The event entry.

send_message async

send_message(
    content: TaskMessageContent,
    task_id: str | None = None,
    agent_id: str | None = None,
    agent_name: str | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    request: dict[str, Any] | None = None,
) -> List[TaskMessage]

Send a message to a task.

Parameters:

Name Type Description Default
content TaskMessageContent

The task message content to send to the task.

required
task_id str | None

The ID of the task to send the message to.

None
agent_id str | None

The ID of the agent to send the message to.

None
agent_name str | None

The name of the agent to send the message to.

None
trace_id str | None

The trace ID for the message.

None
parent_span_id str | None

The parent span ID for the message.

None
start_to_close_timeout timedelta

The start to close timeout for the message.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the message.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy for the message.

DEFAULT_RETRY_POLICY
request dict[str, Any] | None

Additional request context including headers to forward to the agent.

None

Returns:

Type Description
List[TaskMessage]

The message entries.

agentex.lib.adk.tasks

Use the following functions to perform CRUD operations on tasks in Agentex.

Warning

Task creation is handled by the agentex.lib.adk.acp module, but you can use this module to get and delete tasks.

Module for managing tasks in Agentex. Provides high-level async methods for retrieving, listing, and deleting tasks.

delete async

delete(
    *,
    task_id: str | None = None,
    task_name: str | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> Task

Delete a task by ID or name.

Parameters:

Name Type Description Default
task_id str | None

The ID of the task to delete.

None
task_name str | None

The name of the task to delete.

None

Returns:

Type Description
Task

The deleted task entry.

get async

get(
    *,
    task_id: str | None = None,
    task_name: str | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> TaskRetrieveResponse | TaskRetrieveByNameResponse

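Get a task by ID or name.

Parameters:

Name Type Description Default
task_id str | None

The ID of the task to retrieve.

None
task_name str | None

The name of the task to retrieve.

None

Returns:

Type Description
TaskRetrieveResponse | TaskRetrieveByNameResponse

The task entry.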
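The leading `*` in the delete and get signatures makes every argument keyword-only, so a task ID cannot be passed positionally. The sketch below shows that behavior with `get_task`, a stand-in written for this example that has the same keyword-only shape but performs no lookup.

```python
import asyncio
from typing import Dict, Optional


async def get_task(
    *,
    task_id: Optional[str] = None,
    task_name: Optional[str] = None,
) -> Dict[str, Optional[str]]:
    """Stand-in with the same keyword-only shape as the documented get()."""
    if task_id is not None:
        return {"lookup": "id", "value": task_id}
    return {"lookup": "name", "value": task_name}


# Keyword arguments work as expected.
print(asyncio.run(get_task(task_id="task-123")))

# A positional argument is rejected when the call is made,
# before any coroutine is created.
try:
    get_task("task-123")
except TypeError as exc:
    print("rejected:", exc)
```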

agentex.lib.adk.messages

Use the following functions to perform CRUD operations on messages in Agentex.

Warning

Message creation here is pure CRUD. To send a message to an agent, use the agentex.lib.adk.acp module instead.

Module for managing task messages in Agentex. Provides high-level async methods for creating, retrieving, updating, and deleting messages.

create async

create(
    task_id: str,
    content: TaskMessageContent,
    emit_updates: bool = True,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> TaskMessage

Create a new message for a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
content TaskMessageContent

The content of the message to create.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
TaskMessageEntity TaskMessage

The created message.

create_batch async

create_batch(
    task_id: str,
    contents: list[TaskMessageContent],
    emit_updates: bool = True,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> list[TaskMessage]

Create a batch of messages for a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
contents list[TaskMessageContent]

The contents of the messages to create.

required
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
list[TaskMessage]

The created messages.

list async

list(
    task_id: str,
    limit: int | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> list[TaskMessage]

List messages for a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
limit Optional[int]

The maximum number of messages to return.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
list[TaskMessage]

The list of messages.

update async

update(
    task_id: str,
    message_id: str,
    content: TaskMessageContent,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> TaskMessage

Update a message for a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
message_id str

The ID of the message.

required
content TaskMessageContent

The new content for the message.

required
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
TaskMessageEntity TaskMessage

The updated message.

update_batch async

update_batch(
    task_id: str,
    updates: dict[str, TaskMessageContent],
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> list[TaskMessage]

Update a batch of messages for a task.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
updates dict[str, TaskMessageContent]

The updates to apply to the messages.

required
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
list[TaskMessage]

The updated messages.

agentex.lib.adk.state

Use the following functions to perform CRUD operations on state in Agentex. A state is uniquely identified by its task and the agent that created it. The idea is that each agent works on the task independently and stores its working context in a state object that serves as long-term memory.

Module for managing task state in Agentex. Provides high-level async methods for creating, retrieving, updating, and deleting state.

create async

create(
    task_id: str,
    agent_id: str,
    state: dict[str, Any] | BaseModel,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> State

Create a new state for a task and agent.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
agent_id str

The ID of the agent.

required
state dict[str, Any] | BaseModel

The state to create.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
State State

The created state.

delete async

delete(
    state_id: str,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> State

Delete a state by ID.

Parameters:

Name Type Description Default
state_id str

The ID of the state.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
State State

The deleted state.

get async

get(
    state_id: str,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> State | None

Get a state by ID.

Parameters:

Name Type Description Default
state_id str

The ID of the state.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
State | None

The state if found, None otherwise.

get_by_task_and_agent async

get_by_task_and_agent(
    task_id: str,
    agent_id: str,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> State | None

Get a state by task and agent ID. A state is uniquely identified by task and the agent that created it.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
agent_id str

The ID of the agent.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
State | None

The state if found, None otherwise.
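Because get_by_task_and_agent returns None when no state exists, a common way to use it is a get-or-create step at the start of each turn. The sketch below illustrates that pattern under stated assumptions: `FakeStateClient` and `get_or_create_state` are hypothetical names written for this example, and the fake keeps states in a dictionary keyed by (task_id, agent_id) instead of calling Agentex.

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


class FakeStateClient:
    """In-memory stand-in: one state record per (task_id, agent_id) pair."""

    def __init__(self) -> None:
        self._by_key: Dict[Tuple[str, str], Dict[str, Any]] = {}

    async def get_by_task_and_agent(
        self, task_id: str, agent_id: str
    ) -> Optional[Dict[str, Any]]:
        # Mirrors the documented behavior: None when nothing is stored.
        return self._by_key.get((task_id, agent_id))

    async def create(
        self, task_id: str, agent_id: str, state: Dict[str, Any]
    ) -> Dict[str, Any]:
        record = {
            "id": f"state-{len(self._by_key) + 1}",
            "task_id": task_id,
            "agent_id": agent_id,
            "state": dict(state),
        }
        self._by_key[(task_id, agent_id)] = record
        return record


async def get_or_create_state(
    client: FakeStateClient, task_id: str, agent_id: str, default: Dict[str, Any]
) -> Dict[str, Any]:
    """Return the existing state for this task and agent, or create it."""
    existing = await client.get_by_task_and_agent(task_id, agent_id)
    if existing is not None:
        return existing
    return await client.create(task_id, agent_id, default)


async def demo():
    client = FakeStateClient()
    first = await get_or_create_state(client, "task-1", "agent-a", {"turns": 0})
    again = await get_or_create_state(client, "task-1", "agent-a", {"turns": 99})
    other = await get_or_create_state(client, "task-1", "agent-b", {"turns": 0})
    return first, again, other


first, again, other = asyncio.run(demo())
print(first is again)  # True: the same task and agent share one state
print(other["id"])     # state-2: a second agent on the task gets its own state
```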

update async

update(
    state_id: str,
    task_id: str,
    agent_id: str,
    state: dict[str, Any] | BaseModel,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> State

Update a state by ID.

Parameters:

Name Type Description Default
state_id str

The ID of the state.

required
task_id str

The ID of the task.

required
agent_id str

The ID of the agent.

required
state dict[str, Any] | BaseModel

The state to update.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
State State

The updated state.

agentex.lib.adk.streaming

This is a low-level module that allows you to stream intermediate results to clients. Most high-level agentex.lib.adk.providers modules handle streaming for you, but you can use this module if you are using an unsupported provider or want to stream something that the high-level modules don't support.

Module for streaming content to clients in Agentex.

This interface wraps around the StreamingService and provides a high-level API for streaming events to clients, supporting both synchronous and asynchronous (Temporal workflow) contexts.

Initialize the streaming interface.

Parameters:

Name Type Description Default
streaming_service Optional[StreamingService]

Optional StreamingService instance. If not provided, a new service will be created with default parameters.

None

streaming_task_message_context

streaming_task_message_context(
    task_id: str, initial_content: TaskMessageContent
) -> StreamingTaskMessageContext

Create a streaming context for managing TaskMessage lifecycle.

This is a context manager that automatically creates a TaskMessage, sends a START event, and sends a DONE event when the context exits. It is well suited to simple streaming scenarios.

Parameters:

Name Type Description Default
task_id str

The ID of the task

required
initial_content TaskMessageContent

The initial content for the TaskMessage

required
agentex_client

The agentex client for creating/updating messages

required

Returns:

Name Type Description
StreamingTaskMessageContext StreamingTaskMessageContext

Context manager for streaming operations
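The lifecycle described above (START on entry, DONE on exit) can be sketched with a toy context manager. Everything in this block is an assumption made for illustration: `ToyStreamingContext`, its `push` method, and the event tuples are not ADK names, the toy is written as an async context manager although the source does not say which kind the real one is, and it always emits DONE on exit, including after an exception.

```python
import asyncio
from typing import List, Tuple

Event = Tuple[str, str]  # (kind, payload)


class ToyStreamingContext:
    """Toy stand-in that records the events a streaming context would send."""

    def __init__(self, task_id: str, initial_content: str, sink: List[Event]) -> None:
        self.task_id = task_id
        self.initial_content = initial_content
        self.sink = sink

    async def __aenter__(self) -> "ToyStreamingContext":
        # Entering the context announces the new message.
        self.sink.append(("START", self.initial_content))
        return self

    async def push(self, delta: str) -> None:
        # Intermediate results are sent while the context is open.
        self.sink.append(("DELTA", delta))

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Leaving the context closes the message; exceptions still propagate.
        self.sink.append(("DONE", ""))
        return False


async def demo() -> List[Event]:
    events: List[Event] = []
    async with ToyStreamingContext("task-123", "", events) as ctx:
        for chunk in ("Hel", "lo"):
            await ctx.push(chunk)
    return events


events = asyncio.run(demo())
print(events)
# [('START', ''), ('DELTA', 'Hel'), ('DELTA', 'lo'), ('DONE', '')]
```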

agentex.lib.adk.tracing

This is a low-level module that allows you to start and end spans in a trace. Most high-level agentex.lib.adk modules handle tracing for you at the lowest levels. To build a hierarchy of trace spans (for better readability), create your own spans with this module and pass their IDs as the parent_span_id of downstream functions, which groups the lower-level traces under them.

Module for managing tracing and span operations in Agentex. Provides high-level async methods for starting, ending, and managing spans for distributed tracing.
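The grouping idea can be sketched without the ADK: a parent span is recorded first, and its ID is passed down as parent_span_id so that the downstream spans attach to it. The names `record_span`, `fetch_documents`, `summarize`, and `handle_request` are hypothetical and exist only for this illustration; the list `SPANS` stands in for the tracing backend.

```python
import itertools
from typing import Dict, List, Optional

_ids = itertools.count(1)
SPANS: List[Dict[str, Optional[str]]] = []  # stand-in for the tracing backend


def record_span(name: str, parent_span_id: Optional[str] = None) -> str:
    """Record a span and return its ID."""
    span_id = f"span-{next(_ids)}"
    SPANS.append({"id": span_id, "name": name, "parent_id": parent_span_id})
    return span_id


def fetch_documents(parent_span_id: Optional[str] = None) -> None:
    # Stand-in for a downstream call that traces itself.
    record_span("fetch_documents", parent_span_id)


def summarize(parent_span_id: Optional[str] = None) -> None:
    record_span("summarize", parent_span_id)


def handle_request() -> str:
    # Create one parent span, then hand its ID to each downstream call.
    root = record_span("handle_request")
    fetch_documents(parent_span_id=root)
    summarize(parent_span_id=root)
    return root


def children_of(span_id: str) -> List[Optional[str]]:
    return [s["name"] for s in SPANS if s["parent_id"] == span_id]


root_id = handle_request()
print(children_of(root_id))  # ['fetch_documents', 'summarize']
```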

Initialize the tracing interface.

Parameters:

Name Type Description Default
tracing_service Optional[TracingService]

Optional pre-configured tracing service. If None, will be lazily created on first use so the httpx client is bound to the correct running event loop.

None

end_span async

end_span(
    trace_id: str,
    span: Span,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=1),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> Span

End an existing span in the trace.

Parameters:

Name Type Description Default
trace_id str

The trace ID for the span.

required
span Span

The span to end.

required
start_to_close_timeout timedelta

The start to close timeout for the span.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the span.

timedelta(seconds=1)
retry_policy RetryPolicy

The retry policy for the span.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
Span Span

The ended span object.

span async

span(
    trace_id: str,
    name: str,
    input: list[Any]
    | dict[str, Any]
    | BaseModel
    | None = None,
    data: list[Any]
    | dict[str, Any]
    | BaseModel
    | None = None,
    parent_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> AsyncGenerator[Span | None, None]

Async context manager for creating and automatically closing a span. Yields the started span object. The span is automatically ended when the context exits.

If trace_id is falsy, acts as a no-op context manager.

Parameters:

Name Type Description Default
trace_id str

The trace ID for the span.

required
name str

The name of the span.

required
input Union[List, Dict, BaseModel]

The input for the span.

None
parent_id Optional[str]

The parent span ID for the span.

None
data Optional[Union[List, Dict, BaseModel]]

The data for the span.

None
start_to_close_timeout timedelta

The start to close timeout for the span.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the span.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy for the span.

DEFAULT_RETRY_POLICY

Returns:

Type Description
AsyncGenerator[Span | None, None]

An async generator that yields the started span object.
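The no-op behavior for a falsy trace_id means callers must be prepared to receive None instead of a span. The sketch below reproduces that contract with `toy_span`, a stand-in built on `contextlib.asynccontextmanager`; it is not the ADK implementation, and closing the span in a `finally` block is a choice made for this toy.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, List, Optional

ENDED: List[str] = []  # names of spans that were started and then ended


@asynccontextmanager
async def toy_span(trace_id: str, name: str) -> AsyncIterator[Optional[Dict[str, str]]]:
    """Toy stand-in for the documented span() context manager."""
    if not trace_id:
        # Falsy trace_id: do nothing and hand the caller None.
        yield None
        return
    span = {"trace_id": trace_id, "name": name}
    try:
        yield span
    finally:
        # Runs on normal exit and on exceptions, so the span is always closed.
        ENDED.append(name)


async def demo() -> List[Optional[str]]:
    seen: List[Optional[str]] = []
    for trace_id in ("trace-1", ""):
        async with toy_span(trace_id, "load_data") as span:
            # Callers should tolerate span being None.
            seen.append(span["name"] if span is not None else None)
    return seen


seen = asyncio.run(demo())
print(seen)   # ['load_data', None]
print(ENDED)  # ['load_data']
```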

start_span async

start_span(
    trace_id: str,
    name: str,
    input: list[Any]
    | dict[str, Any]
    | BaseModel
    | None = None,
    parent_id: str | None = None,
    data: list[Any]
    | dict[str, Any]
    | BaseModel
    | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=1),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> Span | None

Start a new span in the trace.

Parameters:

Name Type Description Default
trace_id str

The trace ID for the span.

required
name str

The name of the span.

required
input Union[List, Dict, BaseModel]

The input for the span.

None
parent_id Optional[str]

The parent span ID for the span.

None
data Optional[Union[List, Dict, BaseModel]]

The data for the span.

None
start_to_close_timeout timedelta

The start to close timeout for the span.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout for the span.

timedelta(seconds=1)
retry_policy RetryPolicy

The retry policy for the span.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
Span Span | None

The started span object.

agentex.lib.adk.events

Use the following functions to retrieve and list events in Agentex. Events represent activity within tasks and are useful for tracking agent interactions and system events.

Module for managing events in Agentex. Provides high-level async methods for retrieving and listing events.

get async

get(
    event_id: str,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> Event | None

Get an event by ID.

Parameters:

Name Type Description Default
event_id str

The ID of the event.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
Event | None

The event if found, None otherwise.

list_events async

list_events(
    task_id: str,
    agent_id: str,
    last_processed_event_id: str | None = None,
    limit: int | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> list[Event]

List events for a specific task and agent.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
agent_id str

The ID of the agent.

required
last_processed_event_id Optional[str]

Optional event ID to get events after this ID.

None
limit Optional[int]

Optional limit on number of results.

None
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Type Description
list[Event]

List of events ordered by sequence_id.
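Together, last_processed_event_id and limit allow cursor-style paging: request a page, remember the ID of the last event in it, and pass that ID on the next call. The sketch below models this with an in-memory list. `ToyEvent`, `list_after`, and `drain` are hypothetical names for this example, and treating an unknown cursor ID as "no cursor" is a choice made for the toy, not documented behavior.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ToyEvent:
    id: str
    sequence_id: int


def list_after(
    events: List[ToyEvent],
    last_processed_event_id: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[ToyEvent]:
    """Return events after the cursor, ordered by sequence_id."""
    ordered = sorted(events, key=lambda e: e.sequence_id)
    if last_processed_event_id is not None:
        cursor = next(
            (e.sequence_id for e in ordered if e.id == last_processed_event_id),
            None,
        )
        if cursor is not None:  # toy choice: unknown IDs are ignored
            ordered = [e for e in ordered if e.sequence_id > cursor]
    if limit is not None:
        ordered = ordered[:limit]
    return ordered


def drain(events: List[ToyEvent], page_size: int = 2) -> List[str]:
    """Read every event in pages, advancing the cursor after each page."""
    cursor: Optional[str] = None
    seen: List[str] = []
    while True:
        page = list_after(events, cursor, page_size)
        if not page:
            break
        seen.extend(e.id for e in page)
        cursor = page[-1].id
    return seen


stored = [ToyEvent("e3", 3), ToyEvent("e1", 1), ToyEvent("e2", 2)]
print(drain(stored))  # ['e1', 'e2', 'e3']
```

In an agent, the cursor kept in `drain` would typically be persisted between runs, for example as the last processed event ID on an agent task tracker.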

agentex.lib.adk.agent_task_tracker

Use the following functions to manage agent task trackers in Agentex. Agent task trackers help monitor the processing status of agents working on specific tasks, including the last processed event ID and current status.

Module for managing agent task trackers in Agentex. Provides high-level async methods for retrieving, filtering, and updating agent task trackers.

get async

get(
    tracker_id: str,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> AgentTaskTracker

Get an agent task tracker by ID.

Parameters:

Name Type Description Default
tracker_id str

The ID of the tracker.

required
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
AgentTaskTracker AgentTaskTracker

The agent task tracker.

get_by_task_and_agent async

get_by_task_and_agent(
    task_id: str,
    agent_id: str,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> AgentTaskTracker | None

Get an agent task tracker by task ID and agent ID.

update async

update(
    tracker_id: str,
    last_processed_event_id: str | None = None,
    status: str | None = None,
    status_reason: str | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=5
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=5),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> AgentTaskTracker

Update an agent task tracker.

Parameters:

Name Type Description Default
tracker_id str

The ID of the tracker to update.

required
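last_processed_event_id str | None

The ID of the last event the agent has processed.

None
status str | None

The new status of the tracker.

None
status_reason str | None

The reason for the status.

None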
trace_id Optional[str]

The trace ID for tracing.

None
parent_span_id Optional[str]

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=5)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=5)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
AgentTaskTracker AgentTaskTracker

The updated agent task tracker.

External Providers

These are the modules that handle external provider functionality. They conveniently wrap external provider functionality in a high-level interface that is easy to use and already handles streaming, tracing, and other core functionality for you.

agentex.lib.adk.providers.openai

Use the following module to interact with common OpenAI functions.

Module for managing OpenAI agent operations in Agentex. Provides high-level methods for running agents with and without streaming.

run_agent async

run_agent(
    input_list: list[dict[str, Any]],
    agent_name: str,
    agent_instructions: str,
    mcp_server_params: list[StdioServerParameters]
    | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=600
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=600),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    handoff_description: str | None = None,
    handoffs: list[Agent] | None = None,
    model: str | None = None,
    model_settings: ModelSettings | None = None,
    tools: list[Tool] | None = None,
    output_type: type[Any]
    | AgentOutputSchemaBase
    | None = None,
    tool_use_behavior: Literal[
        "run_llm_again", "stop_on_first_tool"
    ]
    | StopAtTools
    | ToolsToFinalOutputFunction = "run_llm_again",
    mcp_timeout_seconds: int | None = None,
    input_guardrails: list[InputGuardrail] | None = None,
    output_guardrails: list[OutputGuardrail] | None = None,
    max_turns: int | None = None,
    previous_response_id: str | None = None,
) -> SerializableRunResult | RunResult

Run an agent without streaming or TaskMessage creation.

By default, no TaskMessage is created and only the result is returned.

Parameters:

Name Type Description Default
input_list list[dict[str, Any]]

List of input data for the agent.

required
mcp_server_params list[StdioServerParameters] | None

MCP server parameters for the agent.

None
agent_name str

The name of the agent to run.

required
agent_instructions str

Instructions for the agent.

required
trace_id str | None

Optional trace ID for tracing.

None
parent_span_id str | None

Optional parent span for tracing.

None
start_to_close_timeout timedelta

Maximum time allowed for the operation.

timedelta(seconds=600)
heartbeat_timeout timedelta

Maximum time between heartbeats.

timedelta(seconds=600)
retry_policy RetryPolicy

Policy for retrying failed operations.

DEFAULT_RETRY_POLICY
handoff_description str | None

Optional description of the handoff.

None
handoffs list[Agent] | None

Optional list of handoffs.

None
model str | None

Optional model to use.

None
model_settings ModelSettings | None

Optional model settings.

None
tools list[Tool] | None

Optional list of tools.

None
output_type type[Any] | AgentOutputSchemaBase | None

Optional output type.

None
tool_use_behavior Literal['run_llm_again', 'stop_on_first_tool'] | StopAtTools | ToolsToFinalOutputFunction

Optional tool use behavior.

'run_llm_again'
mcp_timeout_seconds int | None

Timeout, in seconds, for the MCP servers. If None, a default of 5 seconds applies.

None
input_guardrails list[InputGuardrail] | None

Optional list of input guardrails to run on initial user input.

None
output_guardrails list[OutputGuardrail] | None

Optional list of output guardrails to run on final agent output.

None
max_turns int | None

Maximum number of turns the agent can take. Uses Runner's default if None.

None
previous_response_id str | None

Optional previous response ID for conversation continuity.

None

Returns:

Type Description
SerializableRunResult | RunResult

SerializableRunResult when called from a Temporal workflow, RunResult otherwise.
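As a rough illustration of the call shape, the sketch below passes the three required arguments plus two optional ones. It takes the coroutine as a parameter so that it runs without Agentex installed: `ask`, `fake_run_agent`, the agent name, the instructions, and the role/content layout of the input items are all hypothetical stand-ins. In agent code you would pass the ADK's own `run_agent` instead.

```python
import asyncio
from datetime import timedelta
from typing import Any, Awaitable, Callable


async def ask(run_agent: Callable[..., Awaitable[Any]], question: str) -> Any:
    """Run one non-streaming agent turn; no TaskMessage is created."""
    return await run_agent(
        # Chat-style input items; the role/content keys are an assumption.
        input_list=[{"role": "user", "content": question}],
        agent_name="helper",  # hypothetical agent name
        agent_instructions="Answer in one short sentence.",
        max_turns=3,  # cap the number of turns instead of using the Runner default
        start_to_close_timeout=timedelta(seconds=120),  # tighter than the 600 s default
    )


async def fake_run_agent(**kwargs: Any) -> dict[str, Any]:
    """Stand-in for the ADK coroutine: echoes the keyword arguments it received."""
    return kwargs


result = asyncio.run(ask(fake_run_agent, "What is ACP?"))
print(result["agent_name"], result["max_turns"])
```

Injecting the coroutine also makes the calling code easy to unit test, since the stand-in can be swapped for the real method without changing `ask`.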

run_agent_auto_send async

run_agent_auto_send(
    task_id: str,
    input_list: list[dict[str, Any]],
    agent_name: str,
    agent_instructions: str,
    mcp_server_params: list[StdioServerParameters]
    | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=600
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=600),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    handoff_description: str | None = None,
    handoffs: list[Agent] | None = None,
    model: str | None = None,
    model_settings: ModelSettings | None = None,
    tools: list[Tool] | None = None,
    output_type: type[Any]
    | AgentOutputSchemaBase
    | None = None,
    tool_use_behavior: Literal[
        "run_llm_again", "stop_on_first_tool"
    ]
    | StopAtTools
    | ToolsToFinalOutputFunction = "run_llm_again",
    mcp_timeout_seconds: int | None = None,
    input_guardrails: list[InputGuardrail] | None = None,
    output_guardrails: list[OutputGuardrail] | None = None,
    max_turns: int | None = None,
    previous_response_id: str | None = None,
) -> SerializableRunResult | RunResult

Run an agent with automatic TaskMessage creation.

Parameters:

Name Type Description Default
task_id str

The ID of the task to run the agent for.

required
input_list list[dict[str, Any]]

List of input data for the agent.

required
mcp_server_params list[StdioServerParameters] | None

MCP server parameters for the agent.

None
agent_name str

The name of the agent to run.

required
agent_instructions str

Instructions for the agent.

required
trace_id str | None

Optional trace ID for tracing.

None
parent_span_id str | None

Optional parent span for tracing.

None
start_to_close_timeout timedelta

Maximum time allowed for the operation.

timedelta(seconds=600)
heartbeat_timeout timedelta

Maximum time between heartbeats.

timedelta(seconds=600)
retry_policy RetryPolicy

Policy for retrying failed operations.

DEFAULT_RETRY_POLICY
handoff_description str | None

Optional description of the handoff.

None
handoffs list[Agent] | None

Optional list of handoffs.

None
model str | None

Optional model to use.

None
model_settings ModelSettings | None

Optional model settings.

None
tools list[Tool] | None

Optional list of tools.

None
output_type type[Any] | AgentOutputSchemaBase | None

Optional output type.

None
tool_use_behavior Literal['run_llm_again', 'stop_on_first_tool'] | StopAtTools | ToolsToFinalOutputFunction

Optional tool use behavior.

'run_llm_again'
mcp_timeout_seconds int | None

Timeout, in seconds, for the MCP servers. If None, a default of 5 seconds applies.

None
input_guardrails list[InputGuardrail] | None

Optional list of input guardrails to run on initial user input.

None
output_guardrails list[OutputGuardrail] | None

Optional list of output guardrails to run on final agent output.

None
max_turns int | None

Maximum number of turns the agent can take. Uses Runner's default if None.

None
previous_response_id str | None

Optional previous response ID for conversation continuity.

None

Returns:

Type Description
SerializableRunResult | RunResult

SerializableRunResult when called from a Temporal workflow, RunResult otherwise.

run_agent_streamed async

run_agent_streamed(
    input_list: list[dict[str, Any]],
    agent_name: str,
    agent_instructions: str,
    mcp_server_params: list[StdioServerParameters]
    | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    handoff_description: str | None = None,
    handoffs: list[Agent] | None = None,
    model: str | None = None,
    model_settings: ModelSettings | None = None,
    tools: list[Tool] | None = None,
    output_type: type[Any]
    | AgentOutputSchemaBase
    | None = None,
    tool_use_behavior: Literal[
        "run_llm_again", "stop_on_first_tool"
    ]
    | StopAtTools
    | ToolsToFinalOutputFunction = "run_llm_again",
    mcp_timeout_seconds: int | None = None,
    input_guardrails: list[InputGuardrail] | None = None,
    output_guardrails: list[OutputGuardrail] | None = None,
    max_turns: int | None = None,
    previous_response_id: str | None = None,
) -> RunResultStreaming

Run an agent with streaming enabled but no TaskMessage creation.

DEFAULT: No TaskMessage creation, returns only the result.

NOTE: This method does NOT work in Temporal workflows! Use run_agent_streamed_auto_send() instead for Temporal workflows.

Parameters:

Name Type Description Default
input_list list[dict[str, Any]]

List of input data for the agent.

required
mcp_server_params list[StdioServerParameters] | None

MCP server parameters for the agent.

None
agent_name str

The name of the agent to run.

required
agent_instructions str

Instructions for the agent.

required
trace_id str | None

Optional trace ID for tracing.

None
parent_span_id str | None

Optional parent span for tracing.

None
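start_to_close_timeout

Maximum time allowed for the operation. Not accepted by this method: the signature above does not include it.

not applicable
heartbeat_timeout

Maximum time between heartbeats. Not accepted by this method: the signature above does not include it.

not applicable
retry_policy

Policy for retrying failed operations. Not accepted by this method: the signature above does not include it.

not applicable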
handoff_description str | None

Optional description of the handoff.

None
handoffs list[Agent] | None

Optional list of handoffs.

None
model str | None

Optional model to use.

None
model_settings ModelSettings | None

Optional model settings.

None
tools list[Tool] | None

Optional list of tools.

None
output_type type[Any] | AgentOutputSchemaBase | None

Optional output type.

None
tool_use_behavior Literal['run_llm_again', 'stop_on_first_tool'] | StopAtTools | ToolsToFinalOutputFunction

Optional tool use behavior.

'run_llm_again'
mcp_timeout_seconds int | None

Timeout, in seconds, for the MCP servers. If None, a default of 5 seconds applies.

None
input_guardrails list[InputGuardrail] | None

Optional list of input guardrails to run on initial user input.

None
output_guardrails list[OutputGuardrail] | None

Optional list of output guardrails to run on final agent output.

None
max_turns int | None

Maximum number of turns the agent can take. Uses Runner's default if None.

None
previous_response_id str | None

Optional previous response ID for conversation continuity.

None

Returns:

Name Type Description
RunResultStreaming RunResultStreaming

The result of the agent run with streaming.

Raises:

Type Description
ValueError

If called from within a Temporal workflow

run_agent_streamed_auto_send async

run_agent_streamed_auto_send(
    task_id: str,
    input_list: list[dict[str, Any]],
    agent_name: str,
    agent_instructions: str,
    mcp_server_params: list[StdioServerParameters]
    | None = None,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=600
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=600),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
    handoff_description: str | None = None,
    handoffs: list[Agent] | None = None,
    model: str | None = None,
    model_settings: ModelSettings | None = None,
    tools: list[Tool] | None = None,
    output_type: type[Any]
    | AgentOutputSchemaBase
    | None = None,
    tool_use_behavior: Literal[
        "run_llm_again", "stop_on_first_tool"
    ]
    | StopAtTools
    | ToolsToFinalOutputFunction = "run_llm_again",
    mcp_timeout_seconds: int | None = None,
    input_guardrails: list[InputGuardrail] | None = None,
    output_guardrails: list[OutputGuardrail] | None = None,
    max_turns: int | None = None,
    previous_response_id: str | None = None,
) -> SerializableRunResultStreaming | RunResultStreaming

Run an agent with streaming enabled and automatic TaskMessage creation.

Deprecated: Use the OpenAI Agents SDK integration with Temporal instead. See the examples in tutorials/10_async/10_temporal/ for migration guidance.

Parameters:

Name Type Description Default
task_id str

The ID of the task to run the agent for.

required
input_list list[dict[str, Any]]

List of input data for the agent.

required
mcp_server_params list[StdioServerParameters] | None

MCP server parameters for the agent.

None
agent_name str

The name of the agent to run.

required
agent_instructions str

Instructions for the agent.

required
trace_id str | None

Optional trace ID for tracing.

None
parent_span_id str | None

Optional parent span for tracing.

None
start_to_close_timeout timedelta

Maximum time allowed for the operation.

timedelta(seconds=600)
heartbeat_timeout timedelta

Maximum time between heartbeats.

timedelta(seconds=600)
retry_policy RetryPolicy

Policy for retrying failed operations.

DEFAULT_RETRY_POLICY
handoff_description str | None

Optional description of the handoff.

None
handoffs list[Agent] | None

Optional list of handoffs.

None
model str | None

Optional model to use.

None
model_settings ModelSettings | None

Optional model settings.

None
tools list[Tool] | None

Optional list of tools.

None
input_guardrails list[InputGuardrail] | None

Optional list of input guardrails to run on initial user input.

None
output_guardrails list[OutputGuardrail] | None

Optional list of output guardrails to run on final agent output.

None
output_type type[Any] | AgentOutputSchemaBase | None

Optional output type.

None
tool_use_behavior Literal['run_llm_again', 'stop_on_first_tool'] | StopAtTools | ToolsToFinalOutputFunction

Optional tool use behavior.

'run_llm_again'
mcp_timeout_seconds int | None

Timeout, in seconds, for the MCP servers. If None, a default of 5 seconds applies.

None
max_turns int | None

Maximum number of turns the agent can take. Uses Runner's default if None.

None
previous_response_id str | None

Optional previous response ID for conversation continuity.

None

Returns:

Type Description
SerializableRunResultStreaming | RunResultStreaming

SerializableRunResultStreaming when called from a Temporal workflow, RunResultStreaming otherwise.

agentex.lib.adk.providers.litellm

Use the following module to interact with common LiteLLM functions.

Module for managing LiteLLM agent operations in Agentex. Provides high-level methods for chat completion and streaming.

chat_completion async

chat_completion(
    llm_config: LLMConfig,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=120
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=120),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> Completion

Perform a chat completion using LiteLLM.

Parameters:

Name Type Description Default
llm_config LLMConfig

The configuration for the LLM.

required
trace_id str | None

The trace ID for tracing.

None
parent_span_id str | None

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=120)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=120)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
Completion Completion

An OpenAI-compatible Completion object.
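The following sketch shows one way a non-streaming completion might be requested and read. It runs without Agentex installed because the coroutine is injected: `FakeLLMConfig` and `fake_chat_completion` are hypothetical stand-ins, the `model`, `messages`, and `stream` field names are assumptions about `LLMConfig`, and the model name is only an illustrative value. The `choices[0].message.content` access follows the usual OpenAI-style completion shape.

```python
import asyncio
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, Awaitable, Callable


@dataclass
class FakeLLMConfig:
    """Hypothetical stand-in for LLMConfig; the real field names may differ."""

    model: str
    messages: list[dict[str, str]] = field(default_factory=list)
    stream: bool = False


async def fake_chat_completion(llm_config: FakeLLMConfig, **_: Any) -> SimpleNamespace:
    """Stand-in for chat_completion: returns an OpenAI-style completion shape."""
    reply = f"echo: {llm_config.messages[-1]['content']}"
    message = SimpleNamespace(role="assistant", content=reply)
    return SimpleNamespace(choices=[SimpleNamespace(message=message)])


async def first_reply(chat_completion: Callable[..., Awaitable[Any]], prompt: str) -> str:
    """Request one non-streaming completion and return the first choice's text."""
    config = FakeLLMConfig(
        model="gpt-4o-mini",  # illustrative model name
        messages=[{"role": "user", "content": prompt}],
    )
    completion = await chat_completion(llm_config=config, trace_id=None)
    return completion.choices[0].message.content


reply_text = asyncio.run(first_reply(fake_chat_completion, "ping"))
print(reply_text)
```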

chat_completion_auto_send async

chat_completion_auto_send(
    task_id: str,
    llm_config: LLMConfig,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=120
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=120),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> TaskMessage | None

Chat completion with automatic TaskMessage creation.

Parameters:

Name Type Description Default
task_id str

The ID of the task.

required
llm_config LLMConfig

The configuration for the LLM (must have stream=False).

required
trace_id str | None

The trace ID for tracing.

None
parent_span_id str | None

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=120)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=120)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
TaskMessage TaskMessage | None

The final TaskMessage

chat_completion_stream async

chat_completion_stream(
    llm_config: LLMConfig,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
) -> AsyncGenerator[Completion, None]

Stream chat completion chunks using LiteLLM.

DEFAULT: Returns raw streaming chunks for manual handling.

NOTE: This method does NOT work in Temporal workflows! Temporal activities cannot return generators. Use chat_completion_stream_auto_send() instead.

Parameters:

Name Type Description Default
llm_config LLMConfig

The configuration for the LLM (must have stream=True).

required
trace_id str | None

The trace ID for tracing.

None
parent_span_id str | None

The parent span ID for tracing.

None
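start_to_close_timeout timedelta

The start to close timeout. Not accepted by this method: the signature above does not include it.

not applicable
heartbeat_timeout timedelta

The heartbeat timeout. Not accepted by this method: the signature above does not include it.

not applicable
retry_policy RetryPolicy

The retry policy. Not accepted by this method: the signature above does not include it.

not applicable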

Returns:

Type Description
AsyncGenerator[Completion, None]

An async generator that yields completion chunks.

Raises:

Type Description
ValueError

If called from within a Temporal workflow
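Because the method returns an async generator, the chunks are consumed with `async for`. The sketch below accumulates the streamed text under the assumption that each chunk follows the OpenAI-style `choices[0].delta.content` layout. `fake_chat_completion_stream` and the dictionary passed as `llm_config` are hypothetical stand-ins so that the example runs outside Agentex.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncGenerator, Callable


async def fake_chat_completion_stream(
    llm_config: Any, **_: Any
) -> AsyncGenerator[SimpleNamespace, None]:
    """Stand-in stream: yields OpenAI-style chunks, ending with an empty delta."""
    for piece in ["Hel", "lo", None]:
        delta = SimpleNamespace(content=piece)
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


async def collect_text(
    stream_fn: Callable[..., AsyncGenerator[Any, None]], llm_config: Any
) -> str:
    """Iterate over the streamed chunks and join their text deltas."""
    parts: list[str] = []
    async for chunk in stream_fn(llm_config=llm_config):
        if not chunk.choices:
            continue  # skip chunks that carry no choices
        content = chunk.choices[0].delta.content
        if content:  # the final chunk usually has no text
            parts.append(content)
    return "".join(parts)


streamed_text = asyncio.run(
    collect_text(fake_chat_completion_stream, {"stream": True})
)
print(streamed_text)
```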

chat_completion_stream_auto_send async

chat_completion_stream_auto_send(
    task_id: str,
    llm_config: LLMConfig,
    trace_id: str | None = None,
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=120
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=120),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> TaskMessage | None

Stream chat completion with automatic TaskMessage creation and streaming.

Parameters:

Name Type Description Default
task_id str

The ID of the task to run the agent for.

required
llm_config LLMConfig

The configuration for the LLM (must have stream=True).

required
trace_id str | None

The trace ID for tracing.

None
parent_span_id str | None

The parent span ID for tracing.

None
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=120)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=120)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
TaskMessage TaskMessage | None

The final TaskMessage after streaming is complete

agentex.lib.adk.providers.sgp

Use the following module to interact with common SGP functions.

Module for managing SGP agent operations in Agentex. Provides high-level methods for chat completion, streaming, and message classification.

download_file_content async

download_file_content(
    params: DownloadFileParams,
    start_to_close_timeout: timedelta = timedelta(
        seconds=30
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=30),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> FileContentResponse

Download the content of a file from SGP.

Parameters:

Name Type Description Default
params DownloadFileParams

The parameters for the download file content activity.

required
start_to_close_timeout timedelta

The start to close timeout.

timedelta(seconds=30)
heartbeat_timeout timedelta

The heartbeat timeout.

timedelta(seconds=30)
retry_policy RetryPolicy

The retry policy.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
FileContentResponse FileContentResponse

The content of the file

Utilities

These utility modules support common tasks in agent development, such as formatting prompts.

agentex.lib.adk.utils.templating

Use the following module to render Jinja templates.

Module for managing templating operations in Agentex.

This interface provides high-level methods for rendering Jinja templates, abstracting away the underlying activity and workflow execution. It supports both synchronous and asynchronous (Temporal workflow) contexts.

Initialize the templating interface.

Parameters:

Name Type Description Default
templating_service Optional[TemplatingService]

Optional pre-configured templating service. If None, will be auto-initialized.

None

render_jinja async

render_jinja(
    trace_id: str,
    template: str,
    variables: dict[str, Any],
    parent_span_id: str | None = None,
    start_to_close_timeout: timedelta = timedelta(
        seconds=10
    ),
    heartbeat_timeout: timedelta = timedelta(seconds=10),
    retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
) -> str

Render a Jinja template.

Parameters:

Name Type Description Default
trace_id str

Unique identifier for tracing and correlation.

required
template str

The Jinja template string to render.

required
variables dict[str, Any]

Variables to use in the template.

required
parent_span_id str | None

Optional parent span for tracing.

None
start_to_close_timeout timedelta

Maximum time allowed for the operation.

timedelta(seconds=10)
heartbeat_timeout timedelta

Maximum time between heartbeats.

timedelta(seconds=10)
retry_policy RetryPolicy

Policy for retrying failed operations.

DEFAULT_RETRY_POLICY

Returns:

Name Type Description
str str

The rendered template as a string.
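A minimal sketch of the call shape, assuming the three required arguments are passed by keyword. The coroutine is injected so that the example runs without Agentex: `fake_render_jinja` is a hypothetical stand-in that only substitutes plain `{{ name }}` placeholders and is not a Jinja engine, and the template, variables, and trace ID are illustrative values.

```python
import asyncio
import re
from typing import Any, Awaitable, Callable


async def fake_render_jinja(
    trace_id: str, template: str, variables: dict[str, Any], **_: Any
) -> str:
    """Stand-in renderer: replaces plain {{ name }} placeholders only (not real Jinja)."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(variables[match.group(1)]),
        template,
    )


async def build_prompt(render_jinja: Callable[..., Awaitable[str]], trace_id: str) -> str:
    """Render a small prompt template from a dictionary of variables."""
    return await render_jinja(
        trace_id=trace_id,
        template="You are {{ role }}. Answer in {{ language }}.",
        variables={"role": "a support agent", "language": "English"},
    )


prompt = asyncio.run(build_prompt(fake_render_jinja, trace_id="trace-123"))
print(prompt)
```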