State Machines in Agentex
State machines are a commonly used pattern in Agentex for managing complex workflows with multiple states and transitions. This document explains the core concepts and structure of the state machine SDK.
Overview
A state machine in Agentex consists of:
- States: Defined as enums or strings that represent different phases of execution
- Workflows: The logic that executes when a state is active
- Transitions: Movement between states based on workflow execution results
- Data: Persistent data that flows through the state machine
Core Classes
StateMachine
The StateMachine class is the main orchestrator that manages state transitions and workflow execution.
class StateMachine(ABC, Generic[T]):
    def __init__(
        self,
        initial_state: str,
        states: list[State],
        task_id: str | None = None,
        state_machine_data: T | None = None,
        trace_transitions: bool = False,
    ):
        ...
Key Components:
- initial_state: The starting state name
- states: List of all possible states with their associated workflows
- task_id: Required identifier for tracing and debugging (can be set later)
- state_machine_data: Generic data model that persists throughout execution
- trace_transitions: Enables detailed logging of state transitions
Execution Flow:
- The state machine starts in the initial_state
- It repeatedly calls step() until terminal_condition() returns True
- Each step() executes the current state's workflow and transitions to the next state
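The execution flow above can be sketched as a simple loop. This is a minimal, self-contained stand-in and not the SDK's implementation: MiniStateMachine, run(), and the workflows dictionary are hypothetical names, and each workflow is reduced to a plain async function that returns the next state name.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical simplification: a workflow is an async function returning the next state name.
Workflow = Callable[[], Awaitable[str]]


class MiniStateMachine:
    """Illustrative stand-in for the loop described above (not the Agentex class)."""

    def __init__(self, initial_state: str, workflows: dict[str, Workflow], terminal: set[str]):
        self.current_state = initial_state
        self.workflows = workflows
        self.terminal = terminal

    async def terminal_condition(self) -> bool:
        return self.current_state in self.terminal

    async def step(self) -> str:
        # Execute the current state's workflow, then move to the state it names.
        next_state = await self.workflows[self.current_state]()
        self.current_state = next_state
        return next_state

    async def run(self) -> list[str]:
        # Call step() repeatedly until terminal_condition() returns True.
        visited = [self.current_state]
        while not await self.terminal_condition():
            visited.append(await self.step())
        return visited


async def start() -> str:
    return "processing"


async def processing() -> str:
    return "complete"


machine = MiniStateMachine("start", {"start": start, "processing": processing}, {"complete"})
visited = asyncio.run(machine.run())
print(visited)
```

Note that the terminal state needs no entry in the workflows dictionary here, because the loop checks terminal_condition() before each step.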
State
A State represents a single phase in the state machine with an associated workflow:
class State(BaseModel):
    name: str
    workflow: StateWorkflow
Components:
- name: Unique identifier for the state
- workflow: The StateWorkflow instance that contains the execution logic
StateWorkflow
StateWorkflow is an abstract base class that defines the interface for state execution logic:
class StateWorkflow(ABC):
    @abstractmethod
    async def execute(
        self, state_machine: "StateMachine", state_machine_data: BaseModel | None = None
    ) -> str:
        pass
Key Methods:
execute(): Runs the workflow logic and returns the name of the next state to transition to
State Transitions and Execution
How State Transitions Work
- Current State Retrieval: The state machine gets the current state
- Workflow Execution: Calls execute() on the current state's workflow
- Next State Determination: The workflow returns the name of the next state
- Transition: The state machine moves to the new state
async def step(self) -> str:
    current_state_name = self.get_current_state()
    current_workflow = self.get_current_workflow()
    next_state_name = await current_workflow.execute(
        state_machine=self, state_machine_data=self.state_machine_data
    )
    await self.transition(next_state_name)
    return next_state_name
Workflow Execution
Each workflow's execute() method:
- Receives the state machine instance and current data
- Performs the state-specific logic
- Returns the name of the next state to transition to
Example Workflow:
class MyWorkflow(StateWorkflow):
    async def execute(
        self, state_machine: "StateMachine", state_machine_data: BaseModel | None = None
    ) -> str:
        # Perform state-specific logic
        result = await self.process_data(state_machine_data)
        # Determine next state based on result
        if result.is_success():
            return "next_state"
        else:
            return "error_state"
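The example above leaves process_data() and its result type undefined. One way to fill them in is sketched below. ProcessResult, the body of process_data(), and the stand-in StateWorkflow base class are all hypothetical, chosen so the block runs without the SDK installed.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


class StateWorkflow(ABC):
    """Stand-in with the same execute() shape as the interface shown earlier."""

    @abstractmethod
    async def execute(self, state_machine: Any, state_machine_data: Any = None) -> str: ...


@dataclass
class ProcessResult:
    """Hypothetical result type."""

    ok: bool

    def is_success(self) -> bool:
        return self.ok


class MyWorkflow(StateWorkflow):
    async def process_data(self, data: Any) -> ProcessResult:
        # Hypothetical rule: any non-empty input counts as success.
        return ProcessResult(ok=bool(data))

    async def execute(self, state_machine: Any, state_machine_data: Any = None) -> str:
        result = await self.process_data(state_machine_data)
        return "next_state" if result.is_success() else "error_state"


# The state machine argument is unused by this workflow, so None is passed.
on_data = asyncio.run(MyWorkflow().execute(None, {"input": "x"}))
on_empty = asyncio.run(MyWorkflow().execute(None, None))
print(on_data, on_empty)
```

Because a workflow only receives its arguments and returns a state name, it can be exercised in isolation like this, without a running state machine.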
State Definition and Mapping
Defining States
States are typically defined as enums or constants and mapped to workflows:
from enum import Enum


class MyStates(str, Enum):
    START = "start"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"


# Define workflows for each state
start_workflow = StartWorkflow()
processing_workflow = ProcessingWorkflow()
complete_workflow = NoOpWorkflow()  # Terminal state
error_workflow = ErrorWorkflow()

# Create State objects
states = [
    State(name=MyStates.START, workflow=start_workflow),
    State(name=MyStates.PROCESSING, workflow=processing_workflow),
    State(name=MyStates.COMPLETE, workflow=complete_workflow),
    State(name=MyStates.ERROR, workflow=error_workflow),
]
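When states are declared in an enum and mapped to workflows in a separate list, the two can drift apart. The following sketch checks that every enum member has a State entry. The missing_states() helper is hypothetical, and State is a dataclass stand-in for the SDK model so the block is self-contained.

```python
from dataclasses import dataclass
from enum import Enum


class MyStates(str, Enum):
    START = "start"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"


@dataclass
class State:
    """Stand-in for the SDK's State model: a name plus a workflow object."""

    name: str
    workflow: object


def missing_states(states: list[State]) -> set[str]:
    """Return the enum values that have no State entry (hypothetical helper)."""
    # MyStates(...) accepts either a member or its string value and
    # raises ValueError for a name that is not in the enum.
    defined = {MyStates(state.name).value for state in states}
    return {member.value for member in MyStates} - defined


# Deliberately incomplete list: COMPLETE and ERROR are not mapped.
states = [
    State(name=MyStates.START, workflow=object()),
    State(name="processing", workflow=object()),
]
missing = missing_states(states)
print(sorted(missing))
```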
State Machine Initialization
class MyStateMachine(StateMachine[MyDataModel]):
    async def terminal_condition(self) -> bool:
        return self.get_current_state() == MyStates.COMPLETE


# Initialize the state machine
state_machine = MyStateMachine(
    initial_state=MyStates.START,
    states=states,
    # input_data is a required field of MyDataModel (defined under Data Flow)
    state_machine_data=MyDataModel(input_data="example input"),
    trace_transitions=True,
)
Data Flow
State Machine Data
The state_machine_data parameter is a generic Pydantic model that persists throughout the entire state machine execution:
class MyDataModel(BaseModel):
    input_data: str
    processed_results: list[str] = []
    error_message: str | None = None
Data Access in Workflows
Workflows can access and modify the state machine data:
class ProcessingWorkflow(StateWorkflow):
    async def execute(
        self, state_machine: "StateMachine", state_machine_data: MyDataModel | None = None
    ) -> str:
        if state_machine_data:
            # Access and modify the data
            result = await self.process(state_machine_data.input_data)
            state_machine_data.processed_results.append(result)
            return "complete"
        return "error"
Persistence and Serialization
Saving State
The state machine can be serialized to a dictionary for persistence:
saved_state = state_machine.dump()
# Returns: {
#     "task_id": "task_123",
#     "current_state": "processing",
#     "initial_state": "start",
#     "state_machine_data": {...},
#     "trace_transitions": True
# }
Loading State
A state machine can be restored from saved data:
restored_machine = await MyStateMachine.load(saved_state, states)
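Because dump() returns a dictionary, one option for storing it between runs is JSON. The sketch below assumes the dictionary has the shape shown under Saving State and contains only JSON-serializable values. The helper names save_snapshot() and load_snapshot(), the file name, and the sample values are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def save_snapshot(snapshot: dict, path: Path) -> None:
    """Write a dumped state machine dictionary to disk as JSON."""
    path.write_text(json.dumps(snapshot), encoding="utf-8")


def load_snapshot(path: Path) -> dict:
    """Read a previously saved dictionary back from disk."""
    return json.loads(path.read_text(encoding="utf-8"))


# Illustrative dictionary in the shape shown under Saving State.
saved_state = {
    "task_id": "task_123",
    "current_state": "processing",
    "initial_state": "start",
    "state_machine_data": {
        "input_data": "hello",
        "processed_results": [],
        "error_message": None,
    },
    "trace_transitions": True,
}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "state_machine.json"
    save_snapshot(saved_state, path)
    restored = load_snapshot(path)

print(restored == saved_state)
```

The restored dictionary would then be passed to load() together with the states list, as in the line above.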
Tracing and Debugging
Transition Tracing
When trace_transitions=True, the state machine logs detailed information about each transition:
- Input state and data
- Output state and data
- Transition timing
- Task correlation
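As a rough picture of what such a trace entry captures, the sketch below wraps a single step and records the four items listed above in simplified form. TransitionRecord and traced_step() are hypothetical names for illustration only, and the SDK's actual trace format may differ.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class TransitionRecord:
    """Hypothetical trace entry, not the SDK's format."""

    task_id: str       # task correlation
    from_state: str    # input state
    to_state: str      # output state
    duration_s: float  # transition timing


async def traced_step(
    task_id: str, current_state: str, workflow: Callable[[], Awaitable[str]]
) -> TransitionRecord:
    # Time the workflow call and record where the transition started and ended.
    started = time.perf_counter()
    next_state = await workflow()
    elapsed = time.perf_counter() - started
    return TransitionRecord(task_id, current_state, next_state, elapsed)


async def processing() -> str:
    await asyncio.sleep(0)  # placeholder for real work
    return "complete"


record = asyncio.run(traced_step("task_123", "processing", processing))
print(record.from_state, "->", record.to_state)
```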
Task ID Management
The task_id is used for correlating traces and debugging. In Temporal workflows, the task_id is often not known until the task handler function receives the task, so you can initialize the state machine without it and set it later:
# Initialize state machine without task_id
state_machine = MyStateMachine(
    initial_state=MyStates.START,
    states=states,
    # input_data is a required field of MyDataModel
    state_machine_data=MyDataModel(input_data="example input"),
    trace_transitions=True,
)


# Set task_id when you receive the task in your handler
async def handle_task(task: Task) -> TaskResult:
    state_machine.set_task_id(task.task_id)
    # ... rest of your task handling logic
Important: The task_id is required for tracing to work properly. If you enable trace_transitions=True but don't set a task_id, the state machine will raise a ClientError when trying to trace transitions.
Terminal Conditions
The state machine runs until terminal_condition() returns True. This method must be implemented by subclasses:
class MyStateMachine(StateMachine[MyDataModel]):
    async def terminal_condition(self) -> bool:
        current_state = self.get_current_state()
        return current_state in ["complete", "error"]
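When states are defined in an enum, the terminal check can refer to enum members instead of repeating string literals. This is a sketch of that variation. TERMINAL_STATES and is_terminal() are hypothetical names, and the body of terminal_condition() would call a helper like this with the result of get_current_state().

```python
from enum import Enum


class MyStates(str, Enum):
    START = "start"
    PROCESSING = "processing"
    COMPLETE = "complete"
    ERROR = "error"


# Hypothetical constant: the states in which execution should stop.
TERMINAL_STATES = frozenset({MyStates.COMPLETE, MyStates.ERROR})


def is_terminal(current_state: str) -> bool:
    # MyStates(...) normalizes a plain string or a member to the member itself
    # and raises ValueError for an unknown state name, which surfaces typos early.
    return MyStates(current_state) in TERMINAL_STATES


print(is_terminal("complete"), is_terminal(MyStates.ERROR), is_terminal("processing"))
```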
Common Patterns
Terminal States
Use NoOpWorkflow for states that don't perform any action:
complete_state = State(name="complete", workflow=NoOpWorkflow())
Error Handling
Create dedicated error states and workflows:
class ErrorWorkflow(StateWorkflow):
    async def execute(self, state_machine, state_machine_data):
        # Log error, cleanup, etc.
        return "complete"  # or stay in error state
Conditional Transitions
Workflows can implement complex logic to determine the next state:
class DecisionWorkflow(StateWorkflow):
    async def execute(self, state_machine, state_machine_data):
        if state_machine_data.needs_more_processing():
            return "processing"
        elif state_machine_data.has_errors():
            return "error"
        else:
            return "complete"
This foundation provides a robust, type-safe, and traceable framework for building complex workflows in Agentex applications.