A2A Workflows: A Practical Implementation Guide for Agent-to-Agent Execution
A2A workflows are the backbone of production multi-agent systems. While individual agents can communicate and collaborate, a workflow defines the end‑to‑end flow of tasks – the sequence, parallelism, iteration, and aggregation rules that turn a set of independent agents into a reliable task execution engine.
This guide focuses exclusively on implementing A2A workflows: how to define tasks, delegate to workers, manage shared state, handle failures, and monitor execution. You will find no high‑level architectural patterns or distributed systems theory here – only concrete, code‑ready practices for building collaborative agent workflows.
What Are A2A Workflows
A2A workflows are collaborative execution flows where multiple AI agents work together through communication and task coordination to achieve a complex goal. A workflow defines:
- Which agents participate
- In what order they execute (sequential, parallel, iterative)
- What context is shared between steps
- How results are aggregated into a final output
In implementation terms, an A2A workflow is a directed graph of task assignments – each node is a subtask executed by a specific agent, and edges represent data flow or completion dependencies.
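To make the directed-graph view concrete, here is a minimal sketch built on Python's standard-library graphlib.TopologicalSorter (Python 3.9+). The subtask names are hypothetical. The function groups subtasks into stages. Every subtask in a stage has all of its dependencies satisfied, so the members of a stage could run in parallel.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each subtask maps to the set of subtasks it depends on.
dependencies: dict[str, set[str]] = {
    "fetch": set(),
    "clean": {"fetch"},
    "sentiment": {"clean"},
    "topics": {"clean"},
    "report": {"sentiment", "topics"},
}


def execution_stages(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group subtasks into stages; subtasks within one stage can run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every dependency of these is done
        stages.append(ready)
        sorter.done(*ready)  # mark the whole stage finished, unlocking dependents
    return stages


print(execution_stages(dependencies))
# [['fetch'], ['clean'], ['sentiment', 'topics'], ['report']]
```

A real orchestrator would usually call done() as each individual subtask finishes rather than once per stage. That way a fast branch can unlock its dependents early.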
Unlike a simple request‑response pair, workflows are:
- Multi‑step – involve two or more agent interactions
- Stateful – carry context across steps
- Resilient – can retry, fallback, or degrade on failure
- Observable – emit metrics and logs at each stage
Why A2A Workflows Matter
Without explicit workflow logic, agents can only perform single‑step requests. Workflows unlock real‑world automation.
| Requirement | Why Workflows Are Essential |
|---|---|
| Complex task execution | Querying a database and writing a report call for different tools and skills. Rather than building one agent that does everything, a workflow chains specialised agents. |
| Task decomposition | A high‑level goal (“analyse customer sentiment”) is broken into smaller steps (fetch data → clean → analyse → summarise). |
| Specialised agent capabilities | Each agent does one thing well (SQL, charting, summarisation). Workflows combine them. |
| Parallel task processing | Workflows can fan‑out to multiple agents simultaneously, reducing total execution time. |
Practical example: “Generate a weekly sales dashboard.” A workflow:
- Data Agent – pulls raw sales data
- Clean Agent – normalises and deduplicates
- Analytics Agent – computes KPIs
- Chart Agent – renders visualisations
- Report Agent – assembles email
Each agent is simple, independently testable, and reusable across workflows.
Workflow vs Communication vs Collaboration vs Messaging
These four concepts form a stack. Understanding the distinction ensures you implement the right layer.
| Concept | Focus | Unit of work | Example |
|---|---|---|---|
| Messaging | Transport & structure of individual messages | Single JSON/Protobuf message | {"type":"request", "payload":...} |
| Communication | Exchange patterns between agents | Request‑response, event, notification | Agent A sends request, Agent B replies |
| Collaboration | Joint execution of a shared task | Subtask assignment, context sharing, result return | Owner assigns subtask to worker |
| A2A Workflow | End‑to‑end task flow | Sequence/parallel/iterative graph of collaborations | Fetch → clean → analyse → report |
Rule of thumb:
- Messaging is how you send bytes.
- Communication is when and what you send.
- Collaboration is who does what.
- Workflow is the whole story – the end‑to‑end flow from goal to completion.
This article focuses on the workflow layer, assuming you have already implemented messaging and communication (see the Agent Messaging and Agent Communication guides). Collaboration (covered in Agent Collaboration) is the building block; workflow is the blueprint.
A2A Workflow Lifecycle
Every A2A workflow follows a standard lifecycle. Implement each stage explicitly.
Stage details:
- Task Creation – A user or parent workflow invokes the workflow orchestrator with a goal and parameters. A unique workflow_id is created.
- Task Decomposition – The orchestrator breaks the high‑level goal into a directed acyclic graph (DAG) of subtasks. Each subtask specifies a required agent capability and input parameters.
- Task Assignment – The orchestrator resolves the capability to a specific agent address and sends a request message (assignment).
- Agent Execution – The worker performs its subtask. For long‑running work, it may send periodic progress events.
- Result Collection – The worker returns a result (or error). The orchestrator stores it in the workflow state.
- Result Aggregation – After all subtasks of a stage are complete, the orchestrator aggregates intermediate results and may feed them into downstream subtasks.
- Completion – The final aggregated result is returned to the caller, and the workflow state is archived or deleted.
Core Workflow Components
Every workflow implementation requires these five components.
| Component | Responsibility | Implementation Example |
|---|---|---|
| Task Owner (Orchestrator) | Decomposes goal, manages state, assigns subtasks, aggregates results | A dedicated WorkflowOrchestrator class |
| Worker Agent | Executes a single subtask (pure function from input to output) | Agent with registered capability handlers |
| Shared Context | Carries conversation ID, task state, intermediate data across subtasks | Redis hash keyed by workflow_id |
| Workflow State | Persistent record of current status, completed subtasks, results, errors | Database table or Redis with TTL |
| Result Aggregator | Combines outputs from multiple workers | Could be the orchestrator or a dedicated aggregator agent |
Minimal orchestrator skeleton:
```python
from __future__ import annotations  # lets the hypothetical types below stay unresolved


class WorkflowOrchestrator:
    """Skeleton only: decompose(), execute_dag() and aggregate() are defined elsewhere."""

    def __init__(self, registry: CapabilityRegistry, state_store: WorkflowStateStore):
        self.registry = registry
        self.state = state_store

    async def run(self, workflow_id: str, goal: str, params: dict):
        # 1. Create workflow record
        await self.state.create(workflow_id, goal, params)
        # 2. Decompose (static or dynamic)
        subtasks = self.decompose(goal, params)
        # 3. Execute according to DAG
        results = await self.execute_dag(subtasks, workflow_id)
        # 4. Aggregate
        final = self.aggregate(results, goal)
        # 5. Complete
        await self.state.complete(workflow_id, final)
        return final
```
Sequential Workflows
The simplest workflow pattern: agents execute one after another, passing results forward.
When to use: Each step depends on the previous step’s output. No parallelisable work.
Implementation:
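```python
from __future__ import annotations

from typing import Any

# Subtask, assign_and_execute and state_store are assumed to be defined elsewhere


async def sequential_workflow(workflow_id: str, subtasks: list[Subtask]) -> Any:
    current_input = None
    for idx, subtask in enumerate(subtasks):
        # `is not None` so falsy results such as 0 or "" are still passed forward
        if current_input is not None:
            subtask.params["previous_result"] = current_input
        result = await assign_and_execute(subtask)
        current_input = result
        # Optionally persist intermediate result
        await state_store.set_subtask_result(workflow_id, idx, result)
    return current_input
```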
Example: Content generation workflow
- Outline Agent – generates a bullet‑point outline
- Draft Agent – writes a full draft from the outline
- Edit Agent – polishes grammar and style
- Format Agent – converts to final markdown/HTML
Each agent receives the previous agent’s output as input. The orchestrator chains them.
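The chaining itself is independent of any particular agent framework. The runnable sketch below replaces the four agents with hypothetical async stubs that wrap their input in a label, so you can see each step's output become the next step's input. A real orchestrator would send an assignment message and await the worker's response instead.

```python
import asyncio


# Hypothetical stand-ins for the Outline, Draft, Edit and Format agents.
async def outline_agent(text: str) -> str:
    return f"outline({text})"


async def draft_agent(text: str) -> str:
    return f"draft({text})"


async def edit_agent(text: str) -> str:
    return f"edit({text})"


async def format_agent(text: str) -> str:
    return f"format({text})"


async def run_chain(steps, initial_input):
    """Run each step in order, feeding one step's output into the next."""
    current = initial_input
    for step in steps:
        current = await step(current)
    return current


result = asyncio.run(run_chain([outline_agent, draft_agent, edit_agent, format_agent], "topic"))
print(result)  # format(edit(draft(outline(topic))))
```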
Parallel Workflows
Multiple agents execute independently on the same input or on different partitions of data.
When to use: Subtasks are independent and can be executed concurrently to reduce total latency.
Implementation:
```python
from __future__ import annotations

import asyncio


async def parallel_workflow(subtasks: list[Subtask], timeout_seconds: int = 30):
    # Execute all subtasks concurrently; return_exceptions=True turns failures
    # into returned exception objects instead of aborting the whole gather
    tasks = [assign_and_execute(st) for st in subtasks]
    # If the deadline passes, wait_for cancels unfinished subtasks and raises asyncio.TimeoutError
    results = await asyncio.wait_for(
        asyncio.gather(*tasks, return_exceptions=True), timeout=timeout_seconds
    )
    # Separate successes and failures
    successes = []
    failures = []
    for i, res in enumerate(results):
        if isinstance(res, Exception):
            failures.append((i, res))
        else:
            successes.append((i, res))
    # Decide how to handle partial failures (see Failure Handling section)
    return aggregate_parallel_results(successes, failures, strategy="fail_fast")
```
Example: Data enrichment workflow
A customer ID is sent to three agents simultaneously:
- Credit Agent – fetches credit score
- Address Agent – validates address
- Preference Agent – pulls marketing preferences
The orchestrator waits for all three (or a configurable quorum) and combines results into a single enriched customer profile.
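Below is a runnable sketch of the enrichment fan-out with a configurable quorum. The three agent functions and their return values are hypothetical stubs, and the preference agent fails deliberately. With quorum=2 the profile is still built from the two successful answers. With quorum=3 the workflow fails.

```python
import asyncio


# Hypothetical worker stubs; each returns one slice of the customer profile.
async def credit_agent(customer_id: str) -> dict:
    await asyncio.sleep(0.01)
    return {"credit_score": 712}


async def address_agent(customer_id: str) -> dict:
    await asyncio.sleep(0.01)
    return {"address_valid": True}


async def preference_agent(customer_id: str) -> dict:
    raise ConnectionError("preference service unavailable")  # simulated outage


async def enrich(customer_id: str, quorum: int) -> dict:
    """Call all agents at once; succeed if at least `quorum` of them answer."""
    agents = [credit_agent, address_agent, preference_agent]
    results = await asyncio.gather(*(a(customer_id) for a in agents), return_exceptions=True)
    successes = [r for r in results if not isinstance(r, BaseException)]
    if len(successes) < quorum:
        raise RuntimeError(f"only {len(successes)}/{len(agents)} agents succeeded")
    profile = {"customer_id": customer_id}
    for part in successes:
        profile.update(part)  # merge each agent's fields into one profile
    return profile


print(asyncio.run(enrich("cust_42", quorum=2)))
```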
Iterative Workflows
Agents refine a result through repeated cycles until a quality threshold is met.
When to use: The task has no single‑pass solution (e.g., creative writing, code generation, optimisation).
Implementation:
```python
from __future__ import annotations

from typing import Any


async def iterative_workflow(
    initial_input: Any,
    generator_capability: str,
    critic_capability: str,
    max_iterations: int = 5,
    quality_threshold: float = 0.9,
) -> Any:
    current = initial_input
    quality = 0.0  # defined up front so the error below works even if max_iterations == 0
    for _ in range(max_iterations):
        # Generate a candidate from the current input (or previous draft + feedback)
        generated = await assign_and_execute(Subtask(generator_capability, {"input": current}))
        # Critique it; the critic is assumed to return a quality_score in [0, 1]
        critique = await assign_and_execute(Subtask(critic_capability, {"draft": generated}))
        quality = critique.get("quality_score", 0.0)
        if quality >= quality_threshold:
            return generated
        # Prepare for next iteration
        current = {"previous_draft": generated, "feedback": critique.get("feedback")}
    raise MaxIterationsExceededError(
        f"Quality only reached {quality} after {max_iterations} iterations"
    )
```
Example: Code refinement workflow
- Generator Agent writes a Python function based on a spec.
- Reviewer Agent checks for bugs, style, performance.
- If score < 0.9, the orchestrator sends the review back to the generator for improvement.
- Loop until passing quality or max iterations.
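The loop above can be exercised without real agents. In this sketch, generator and critic are hypothetical stubs whose fake quality score rises with each revision (0.25, 0.5, 0.75, 1.0). With the default 0.9 threshold, the fourth draft is accepted. With only three iterations allowed, the loop gives up.

```python
import asyncio


async def generator(task: dict) -> dict:
    # Revision number grows by one each round (1 when there is no previous draft)
    revision = task.get("previous_draft", {}).get("revision", 0) + 1
    return {"code": f"def f(): ...  # revision {revision}", "revision": revision}


async def critic(draft: dict) -> dict:
    # Fake score: 0.25, 0.5, 0.75, 1.0 for revisions 1-4
    return {"quality_score": min(1.0, draft["revision"] / 4), "feedback": "add type hints"}


async def refine(max_iterations: int = 5, threshold: float = 0.9) -> dict:
    current: dict = {}
    quality = 0.0
    for _ in range(max_iterations):
        draft = await generator(current)
        review = await critic(draft)
        quality = review["quality_score"]
        if quality >= threshold:
            return draft
        # Feed the draft and its critique back into the next generation round
        current = {"previous_draft": draft, "feedback": review["feedback"]}
    raise RuntimeError(f"quality only reached {quality} after {max_iterations} iterations")


print(asyncio.run(refine())["revision"])  # 4
```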
Task Delegation
Task delegation is the mechanism by which the orchestrator assigns subtasks to workers.
Task Assignment Message
The orchestrator sends a request message that includes enough context for the worker to execute autonomously.
```json
{
  "message_id": "0194f0a2-...",
  "type": "request",
  "sender": "orchestrator/workflow/v1",
  "receiver": "data-agent/v1",
  "timestamp": "2025-06-10T14:30:00Z",
  "payload": {
    "operation": "query",
    "parameters": {
      "table": "sales",
      "date_range": {"start": "2025-01-01", "end": "2025-03-31"}
    }
  },
  "metadata": {
    "workflow_id": "wf_123",
    "subtask_id": "sub_1",
    "correlation_id": "msg_456",
    "ttl_ms": 30000
  }
}
```
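A small helper keeps every assignment consistent with the message layout shown. This is a sketch: build_assignment is a hypothetical name, message_id uses a random UUID4 (the example's ID format may differ), and the meaning given to ttl_ms is an assumption.

```python
import json
import uuid
from datetime import datetime, timezone


def build_assignment(sender: str, receiver: str, operation: str, parameters: dict,
                     workflow_id: str, subtask_id: str, correlation_id: str,
                     ttl_ms: int = 30_000) -> dict:
    """Build a request envelope with the same field layout as the JSON example."""
    return {
        "message_id": str(uuid.uuid4()),  # random UUID4
        "type": "request",
        "sender": sender,
        "receiver": receiver,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "payload": {"operation": operation, "parameters": parameters},
        "metadata": {
            "workflow_id": workflow_id,
            "subtask_id": subtask_id,
            "correlation_id": correlation_id,
            "ttl_ms": ttl_ms,  # assumed: worker discards the request after this many ms
        },
    }


msg = build_assignment(
    "orchestrator/workflow/v1", "data-agent/v1", "query",
    {"table": "sales", "date_range": {"start": "2025-01-01", "end": "2025-03-31"}},
    workflow_id="wf_123", subtask_id="sub_1", correlation_id="msg_456",
)
print(json.dumps(msg, indent=2))
```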
Task Ownership
After sending an assignment, the orchestrator marks the subtask as running and stores the worker’s address. It becomes responsible for:
- Monitoring progress (via heartbeat or explicit progress events)
- Handling timeouts
- Retrying on failure (if retryable)
Ownership table (stored in workflow state):
| Workflow ID | Subtask ID | Status | Assigned To | Started At | Attempts | Last Progress |
|---|---|---|---|---|---|---|
| wf_123 | sub_1 | running | data-agent-01 | 14:30:00 | 1 | 14:30:15 |
| wf_123 | sub_2 | pending | – | – | 0 | – |
Task Tracking
Track each subtask with a state machine:
```text
pending → assigned → running → completed
                        ↘ failed → retrying (back to assigned)
                                ↘ failed_permanent
```
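One way to enforce this state machine is an explicit transition table. The table rejects illegal status changes before they reach the state store. This sketch follows one reading of the diagram: it assumes a subtask can fail while assigned (worker unreachable) or while running. TRANSITIONS, InvalidTransition and transition() are hypothetical names.

```python
# Allowed subtask status changes; terminal states have no outgoing edges.
TRANSITIONS: dict[str, set[str]] = {
    "pending": {"assigned"},
    "assigned": {"running", "failed"},  # assumption: failure possible before running
    "running": {"completed", "failed"},
    "failed": {"retrying", "failed_permanent"},
    "retrying": {"assigned"},
    "completed": set(),
    "failed_permanent": set(),
}


class InvalidTransition(Exception):
    """Raised when a status change is not allowed by TRANSITIONS."""


def transition(current: str, new: str) -> str:
    if new not in TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"illegal subtask transition: {current} -> {new}")
    return new


status = "pending"
for step in ("assigned", "running", "failed", "retrying", "assigned"):
    status = transition(status, step)
print(status)  # assigned
```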
Implementation:
```python
from datetime import datetime, timezone

MAX_RETRIES = 3  # illustrative value


def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


class SubtaskTracker:
    def __init__(self, state_store):
        self.store = state_store

    async def assign(self, workflow_id, subtask_id, worker_addr):
        # The store is async, so its reads must be awaited like its writes
        attempts = await self.store.get_attempts(workflow_id, subtask_id)
        await self.store.update(workflow_id, subtask_id, {
            "status": "running",
            "assigned_to": worker_addr,
            "started_at": utc_now(),
            "attempts": attempts + 1,
        })

    async def complete(self, workflow_id, subtask_id, result):
        await self.store.update(workflow_id, subtask_id, {
            "status": "completed",
            "result": result,
            "completed_at": utc_now(),
        })

    async def fail(self, workflow_id, subtask_id, error, retryable):
        attempts = await self.store.get_attempts(workflow_id, subtask_id)
        if retryable and attempts < MAX_RETRIES:
            await self.store.update(workflow_id, subtask_id, {"status": "retrying", "error": error})
            # The orchestrator re-assigns the subtask after a backoff delay
        else:
            await self.store.update(workflow_id, subtask_id, {"status": "failed_permanent", "error": error})
```
Workflow State Management
Workflows are stateful. You must persist state across subtasks to survive crashes and enable resumption.
Shared State Storage
Use a fast, durable key‑value store (Redis, DynamoDB). Key by workflow_id.
State schema:
```json
{
  "workflow_id": "wf_123",
  "status": "running",
  "goal": "generate_sales_report",
  "params": {"region": "EMEA", "month": "2025-03"},
  "created_at": "2025-06-10T14:30:00Z",
  "subtasks": {
    "sub_1": {
      "capability": "data_query",
      "status": "completed",
      "result": {"data_uri": "s3://.../raw.parquet"}
    },
    "sub_2": {
      "capability": "aggregate",
      "status": "running",
      "assigned_to": "analytics-agent",
      "started_at": "2025-06-10T14:30:05Z"
    }
  },
  "context": {
    "conversation_id": "conv_abc",
    "intermediate": {
      "data_uri": "s3://.../raw.parquet"
    }
  }
}
```
Context Synchronization
Workers should not modify the shared state directly. Instead, they return deltas, and the orchestrator applies them.
Pattern:
- Orchestrator includes a context_ref (e.g., redis://wf_123/context) in the assignment.
- Worker reads the context, performs work, and returns a context_update object.
- Orchestrator merges the update into the shared state.
Example worker response:
```json
{
  "type": "response",
  "payload": {
    "result": {"aggregated_value": 12345},
    "context_update": {
      "intermediate.aggregated_value": 12345,
      "intermediate.last_updated": "2025-06-10T14:30:10Z"
    }
  }
}
```
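The context_update keys use dotted paths, so the orchestrator needs a small merge routine. The sketch below is one possible implementation (apply_context_update is a hypothetical name). It returns a new dict and never mutates the stored context. It assumes every parent segment of a path is either missing or already a dict.

```python
import copy


def apply_context_update(context: dict, update: dict) -> dict:
    """Return a copy of `context` with dotted-path keys from `update` applied.

    "intermediate.aggregated_value" sets context["intermediate"]["aggregated_value"],
    creating intermediate dicts as needed.
    """
    merged = copy.deepcopy(context)  # leave the caller's context untouched
    for dotted_key, value in update.items():
        *parents, leaf = dotted_key.split(".")
        node = merged
        for part in parents:
            node = node.setdefault(part, {})  # walk (or create) each nesting level
        node[leaf] = value
    return merged


context = {"conversation_id": "conv_abc", "intermediate": {"data_uri": "s3://.../raw.parquet"}}
update = {
    "intermediate.aggregated_value": 12345,
    "intermediate.last_updated": "2025-06-10T14:30:10Z",
}
merged = apply_context_update(context, update)
print(merged["intermediate"])
```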
Workflow Checkpoints
For long‑running workflows (minutes to hours), save checkpoints after each subtask. If the orchestrator crashes, it can resume from the last checkpoint.
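```python
async def checkpoint(workflow_id: str):
    # state_store and checkpoint_store are assumed async key-value stores
    state = await state_store.get(workflow_id)
    await checkpoint_store.save(f"checkpoint:{workflow_id}", state, ttl=86400)  # 24 hours, in seconds
```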
On restart, the orchestrator loads the latest checkpoint and continues from the next pending subtask.
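Resuming requires deciding which subtasks to schedule next from the checkpointed state. In this sketch, next_runnable is hypothetical. The dependency map is assumed to come from the workflow definition, because the state schema above does not record dependencies. A subtask still marked running at crash time is treated as lost and scheduled again, which is safe only if workers are idempotent.

```python
def next_runnable(state: dict, dependencies: dict[str, set[str]]) -> list[str]:
    """Subtasks to schedule after reloading a checkpoint.

    Completed subtasks are skipped. A subtask recorded as "running" is treated as
    lost in the crash and becomes runnable again, so workers must be idempotent.
    """
    subtasks = state.get("subtasks", {})
    done = {sid for sid, info in subtasks.items() if info.get("status") == "completed"}
    return sorted(
        sid for sid, deps in dependencies.items()
        if sid not in done and deps <= done  # all prerequisites finished
    )


checkpoint = {
    "workflow_id": "wf_123",
    "subtasks": {
        "sub_1": {"status": "completed"},
        "sub_2": {"status": "running", "assigned_to": "analytics-agent"},
    },
}
deps = {"sub_1": set(), "sub_2": {"sub_1"}, "sub_3": {"sub_2"}}
print(next_runnable(checkpoint, deps))  # ['sub_2']
```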
Result Aggregation
After workers finish, the orchestrator must combine their outputs into a final result.
Collecting Outputs
Store results keyed by subtask ID in the workflow state.
```python
from typing import Any


async def collect_result(workflow_id: str, subtask_id: str, result: Any):
    # Lock the workflow record so concurrent subtasks do not overwrite each other's writes
    async with state_store.lock(workflow_id):
        state = await state_store.get(workflow_id)
        state["subtasks"][subtask_id]["result"] = result
        await state_store.set(workflow_id, state)
```
Merging Outputs
Different workflow patterns require different aggregation strategies.
| Workflow Type | Aggregation Strategy | Example |
|---|---|---|
| Sequential | Pass output of step N as input to step N+1 | Report generation |
| Parallel (all) | Combine all results into a list or dict | Enrichment (credit + address + preferences) |
| Parallel (quorum) | Take majority vote or first K results | Ensemble classification |
| Iterative | Use final iteration’s output | Code refinement |
| Fan‑out / Fan‑in | Reduce (e.g., sum, average) across workers | Map‑reduce over partitions |
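The fan-out / fan-in row deserves one caution: when partitions differ in size, reduce sums and counts and only then divide. Averaging the per-partition averages gives the wrong answer. This is a runnable sketch with a hypothetical partition_worker standing in for the remote call.

```python
import asyncio


async def partition_worker(partition: list[int]) -> dict:
    """Hypothetical worker: returns a partial sum and count for one partition."""
    await asyncio.sleep(0)  # stands in for the real remote call
    return {"sum": sum(partition), "count": len(partition)}


async def fan_out_fan_in(partitions: list[list[int]]) -> dict:
    # Fan out: one worker call per partition, all running concurrently
    partials = await asyncio.gather(*(partition_worker(p) for p in partitions))
    # Fan in: reduce the partial sums and counts, then derive the mean
    total = sum(p["sum"] for p in partials)
    count = sum(p["count"] for p in partials)
    return {"sum": total, "count": count, "mean": total / count if count else None}


print(asyncio.run(fan_out_fan_in([[1, 2, 3], [4], [5, 6]])))
# {'sum': 21, 'count': 6, 'mean': 3.5}
```

For these partitions, averaging the per-partition means (2, 4 and 5.5) would give about 3.83 instead of the correct 3.5.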
Parallel aggregation example (merge dicts):
```python
def aggregate_parallel_dict(results: list[dict]) -> dict:
    merged = {}
    for r in results:
        merged.update(r)  # later keys overwrite earlier
    return merged
```
Quorum aggregation (majority vote):
```python
from collections import Counter
from typing import Any


def aggregate_majority_vote(results: list[Any]) -> Any:
    counter = Counter(results)
    # On a tie, most_common() returns the value that was encountered first
    return counter.most_common(1)[0][0]
```
Validation Before Aggregation
Never trust worker outputs blindly. Validate each result against an expected schema and business rules.
```python
from typing import Any

import jsonschema
from jsonschema import ValidationError


def validate_and_collect(result: Any, expected_schema: dict) -> Any:
    try:
        jsonschema.validate(result, expected_schema)
        return result
    except ValidationError as e:
        raise InvalidWorkerOutputError(f"Validation failed: {e.message}") from e
```
Failure Handling
Workflows fail. Your implementation must degrade gracefully.
Agent Failures
When a worker agent is unavailable or returns an error:
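```python
from __future__ import annotations

import asyncio

# registry, Subtask and the error classes are assumed to be defined elsewhere


async def execute_with_fallback(subtask: Subtask, retries: int = 2):
    last_error: Exception | None = None
    for attempt in range(retries + 1):
        try:
            worker = await registry.resolve(subtask.capability)
            return await worker.send_request(subtask.to_message())
        except (AgentUnavailableError, NetworkError) as e:
            last_error = e  # keep a reference; `e` is unbound after the except block
            if attempt < retries:
                await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
    # All attempts failed: try the fallback capability, if one is registered
    fallback_cap = registry.get_fallback(subtask.capability)
    if fallback_cap:
        worker = await registry.resolve(fallback_cap)
        return await worker.send_request(subtask.to_message())
    raise WorkflowSubtaskFailedError(subtask.id, str(last_error))
```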
Workflow Retries
For transient failures (network blip, temporary overload), retry the entire workflow or individual subtasks.
Idempotency requirement: Workflows must be idempotent – running the same workflow twice with the same workflow_id should produce the same final result and not cause duplicate side effects. Use workflow_id to deduplicate at the orchestrator entry point.
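```python
async def run_workflow(workflow_id: str, goal: str, params: dict):
    # Deduplication: never start a second copy of the same workflow_id
    existing = await state_store.get_status(workflow_id)
    if existing == "completed":
        return await state_store.get_result(workflow_id)
    if existing == "running":
        # No result exists yet; reject (or let the caller poll) instead of re-running
        raise WorkflowAlreadyRunningError(workflow_id)
    # First time or previously failed: claim the ID, then execute.
    # mark_running is hypothetical and should be an atomic compare-and-set
    # so two concurrent callers cannot both pass the checks above.
    await state_store.mark_running(workflow_id)
    try:
        result = await do_run(workflow_id, goal, params)
        await state_store.complete(workflow_id, result)
        return result
    except Exception as e:
        await state_store.fail(workflow_id, str(e))
        raise
```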
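Workflow-level deduplication does not protect individual side effects, such as a payment, when a subtask is retried or a message is redelivered. One way to protect them is to record each completed side effect under (workflow_id, subtask_id) and replay the stored result on duplicates. In the sketch below, IdempotencyStore and charge_card are hypothetical, and the in-memory dict stands in for a durable store with an atomic write (for example Redis SET with NX, or a DynamoDB conditional put).

```python
import asyncio
from typing import Any, Awaitable, Callable


class IdempotencyStore:
    """In-memory stand-in for a durable record of completed side effects."""

    def __init__(self) -> None:
        self._results: dict[tuple[str, str], Any] = {}
        self._locks: dict[tuple[str, str], asyncio.Lock] = {}

    async def run_once(self, workflow_id: str, subtask_id: str,
                       action: Callable[[], Awaitable[Any]]) -> Any:
        key = (workflow_id, subtask_id)
        # One lock per key: duplicates of the same subtask wait, others proceed
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            if key in self._results:
                return self._results[key]  # duplicate delivery: replay stored result
            result = await action()  # if this raises, nothing is stored and a retry re-runs it
            self._results[key] = result
            return result


calls = 0


async def charge_card() -> dict:
    global calls
    calls += 1  # counts how many times the side effect really happened
    return {"charge_id": "ch_001"}


async def main():
    store = IdempotencyStore()
    first = await store.run_once("wf_123", "sub_pay", charge_card)
    retry = await store.run_once("wf_123", "sub_pay", charge_card)  # e.g. a redelivered message
    return first, retry


first, retry = asyncio.run(main())
print(calls, first == retry)  # 1 True
```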
Partial Completion
In parallel workflows, you may choose to ignore failures of non‑critical workers.
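```python
from __future__ import annotations

import asyncio
import logging

log = logging.getLogger(__name__)


async def parallel_with_optional(subtasks: list[Subtask], critical_indices: set[int]) -> dict:
    # Run all subtasks concurrently; exceptions are returned instead of raised
    outcomes = await asyncio.gather(
        *(assign_and_execute(st) for st in subtasks), return_exceptions=True
    )
    results = {}
    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            if idx in critical_indices:
                raise outcome  # a critical failure fails the whole step
            log.warning("Optional subtask %d failed: %s", idx, outcome)
            results[idx] = None
        else:
            results[idx] = outcome
    return results
```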
Timeout Handling
Set a workflow‑level timeout (maximum wall‑clock time). If it is exceeded, cancel all pending subtasks and mark the workflow as failed.
```python
import asyncio


async def run_with_timeout(workflow_id, coro, timeout_seconds):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        await state_store.fail(workflow_id, "Workflow timeout")
        # Attempt to cancel running subtasks (if supported by workers)
        await cancel_pending_subtasks(workflow_id)
        raise WorkflowTimeoutError() from None
```
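Local cancellation comes from asyncio itself. When asyncio.wait_for times out, it cancels the coroutine it wraps, and a cancelled asyncio.gather cancels every child that has not finished yet. The self-contained sketch below, with hypothetical subtask names and durations, shows that propagation. The except block is where a real client would forward a cancel message to the remote worker.

```python
import asyncio

cancelled: list[str] = []


async def subtask(name: str, seconds: float) -> str:
    try:
        await asyncio.sleep(seconds)  # stands in for awaiting a worker's response
        return name
    except asyncio.CancelledError:
        cancelled.append(name)  # a real client would notify the worker here
        raise  # always re-raise so cancellation completes


async def workflow() -> list[str]:
    return await asyncio.gather(subtask("fast", 0.01), subtask("slow", 10))


async def main() -> None:
    try:
        await asyncio.wait_for(workflow(), timeout=0.1)
    except asyncio.TimeoutError:
        print("workflow timed out; cancelled:", cancelled)


asyncio.run(main())  # prints: workflow timed out; cancelled: ['slow']
```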
Workflow Monitoring
Observability is non‑negotiable. Export these metrics from your workflow orchestrator.
| Metric | Type | Labels | Alert When |
|---|---|---|---|
| workflow_starts_total | Counter | workflow_type | – |
| workflow_completions_total | Counter | workflow_type, status (success/failure) | – |
| workflow_duration_seconds | Histogram | workflow_type | p95 > expected SLA |
| subtask_duration_seconds | Histogram | capability | p99 > 30s |
| subtask_retries_total | Counter | capability, reason | Retry rate > 10% |
| workflow_partial_failures_total | Counter | workflow_type | Any increase (indicates degraded mode) |
| workflow_timeouts_total | Counter | workflow_type | Any |
| active_workflows | Gauge | workflow_type | > max_concurrent |
Prometheus instrumentation example:
```python
import time

from prometheus_client import Counter, Gauge, Histogram

workflow_duration = Histogram("workflow_duration_seconds", "Workflow execution time", ["workflow_type"])
subtask_retries = Counter("subtask_retries_total", "Retried subtasks", ["capability", "reason"])
active_workflows = Gauge("active_workflows", "Currently running workflows", ["workflow_type"])


async def monitored_workflow(workflow_type, coro):
    active_workflows.labels(workflow_type=workflow_type).inc()
    start = time.perf_counter()
    try:
        return await coro
    finally:
        # Record duration and decrement the gauge whether the workflow succeeds or fails
        workflow_duration.labels(workflow_type=workflow_type).observe(time.perf_counter() - start)
        active_workflows.labels(workflow_type=workflow_type).dec()
```
Logging: Emit structured logs at workflow start, subtask assignment, subtask completion, workflow completion, and any failure. Always include workflow_id.
```json
{"event": "workflow_started", "workflow_id": "wf_123", "type": "sales_report", "timestamp": "..."}
{"event": "subtask_assigned", "workflow_id": "wf_123", "subtask_id": "sub_1", "capability": "data_query", "worker": "data-agent-01"}
{"event": "subtask_completed", "workflow_id": "wf_123", "subtask_id": "sub_1", "duration_ms": 234}
{"event": "workflow_completed", "workflow_id": "wf_123", "status": "success", "duration_sec": 12.3}
```
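Python's standard logging module can produce log lines in this shape. This sketch uses a JSON formatter and a LoggerAdapter that stamps workflow_id onto every event. JsonFormatter, WorkflowLogger and the `fields` keyword are hypothetical conventions of this example, not part of the logging API. The StringIO stream stands in for stdout or a log shipper.

```python
import io
import json
import logging
import time


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object (one per line)."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "event": record.getMessage(),
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(record.created)),
        }
        entry.update(getattr(record, "fields", {}))  # structured fields from the adapter
        return json.dumps(entry)


class WorkflowLogger(logging.LoggerAdapter):
    """Attach workflow_id to every event so logs can be filtered per workflow."""

    def process(self, msg, kwargs):
        fields = {"workflow_id": self.extra["workflow_id"], **kwargs.pop("fields", {})}
        kwargs["extra"] = {"fields": fields}
        return msg, kwargs


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
base = logging.getLogger("workflow")
base.addHandler(handler)
base.setLevel(logging.INFO)
base.propagate = False

log = WorkflowLogger(base, {"workflow_id": "wf_123"})
log.info("subtask_completed", fields={"subtask_id": "sub_1", "duration_ms": 234})
print(stream.getvalue().strip())
```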
Workflow Testing
Test workflows at multiple levels. Do not rely on manual end‑to‑end tests only.
Unit Testing
Test decomposition, aggregation, and state transition logic in isolation (mock workers).
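```python
import pytest


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_sequential_aggregation():
    orchestrator = WorkflowOrchestrator(mock_registry, mock_state)
    subtasks = [Subtask("cap_a"), Subtask("cap_b")]

    # Mock worker returns its input + 1; the input arrives as previous_result
    async def mock_execute(st):
        return st.params.get("previous_result", 0) + 1

    orchestrator.assign_and_execute = mock_execute
    # execute_sequential is assumed to seed the first subtask with initial_value
    result = await orchestrator.execute_sequential(subtasks, initial_value=5)
    assert result == 7  # 5+1=6, then 6+1=7
```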
Integration Testing
Run real worker agents in test containers and verify a complete workflow.
```python
import pytest
from testcontainers.core.container import DockerContainer


@pytest.mark.asyncio
async def test_sales_report_workflow_integration():
    # One container per worker agent; all are stopped when the block exits
    with DockerContainer("data-agent:latest") as data_agent, \
            DockerContainer("analytics-agent:latest") as analytics, \
            DockerContainer("report-agent:latest") as report:
        orchestrator = WorkflowOrchestrator(...)
        result = await orchestrator.run(
            workflow_id="test_001",
            goal="sales_report",
            params={"date": "2025-03-01"},
        )
        assert "report_uri" in result
        assert result["status"] == "success"
```
DAG Validation Testing
Test that the workflow DAG (subtask dependencies) is correctly constructed and executed in order.
```python
def test_workflow_decomposition():
    orchestrator = WorkflowOrchestrator(...)
    dag = orchestrator.decompose("sales_report", {})
    # Compare as sets so the test does not depend on node or edge ordering
    assert set(dag.nodes) == {"data_query", "aggregate", "chart", "report"}
    # chart and report both depend on aggregate
    assert set(dag.edges) == {("data_query", "aggregate"), ("aggregate", "chart"), ("aggregate", "report")}
```
Failure Testing
Inject failures to verify retry, fallback, and partial completion logic.
```python
import pytest


@pytest.mark.asyncio
async def test_workflow_fallback():
    registry = CapabilityRegistry()
    registry.register("data_query", failing_worker)  # always fails
    registry.set_fallback("data_query", fallback_worker)  # succeeds
    orchestrator = WorkflowOrchestrator(registry, ...)
    result = await orchestrator.run_workflow("test", "query_data", {})
    assert result == "fallback_success"
    # With retries=2 the failing worker is tried 3 times, then the fallback once
    assert failing_worker.call_count == 3
    assert fallback_worker.call_count == 1
```
Workflow Security
When workflows involve multiple agents, especially across trust boundaries, secure the execution.
Access Control Checklist
- Workflow‑level authentication – Only authorised callers (users or parent workflows) can start a workflow.
- Per‑subtask authorisation – The orchestrator must verify that the caller has permission to invoke each worker capability.
- Least privilege for workers – Workers should receive only the data they need (no full database access).
- Isolated workflow state – Different workflow instances cannot read each other’s state unless explicitly allowed (e.g., via a shared conversation_id).
- Audit logging – Log every workflow start, subtask assignment, and state change with the workflow_id and caller identity.
Shared Context Protection
If using a shared state store (Redis), use separate keyspaces per workflow and enforce access via short‑lived tokens.
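```python
import time
import jwt  # PyJWT
# Orchestrator creates a short-lived token for the worker (SIGNING_KEY is a placeholder secret)
token = jwt.encode({"workflow_id": workflow_id, "subtask_id": subtask_id, "exp": int(time.time()) + 300}, SIGNING_KEY, algorithm="HS256")
# Worker uses the token to read/write context (http_get is a hypothetical async HTTP helper)
context = await http_get(f"https://state-store/context/{workflow_id}", headers={"Authorization": f"Bearer {token}"})
```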
Secure Aggregation
When aggregating results from multiple workers, ensure that one worker cannot influence the aggregation in a way that compromises security (e.g., vote stuffing). Use signed results if workers are in different trust domains.
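A minimal way to sign results is an HMAC over a canonical JSON encoding, with one secret per worker. This is a sketch: the key and result values are hypothetical. Asymmetric signatures (for example Ed25519) avoid sharing secrets across trust domains. The signed payload includes workflow_id and subtask_id so a valid signature cannot be replayed for a different subtask.

```python
import hashlib
import hmac
import json


def sign_result(result: dict, key: bytes) -> str:
    """HMAC-SHA256 over a canonical JSON encoding of the worker's result."""
    # Sorted keys and fixed separators make both sides hash identical bytes
    body = json.dumps(result, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(key, body, hashlib.sha256).hexdigest()


def verify_result(result: dict, signature: str, key: bytes) -> bool:
    expected = sign_result(result, key)
    return hmac.compare_digest(expected, signature)  # constant-time comparison


worker_key = b"credit-agent-secret"  # hypothetical per-worker secret
result = {"workflow_id": "wf_123", "subtask_id": "sub_1", "vote": "approve"}
sig = sign_result(result, worker_key)

print(verify_result(result, sig, worker_key))  # True
print(verify_result({**result, "vote": "reject"}, sig, worker_key))  # False
```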
A2A Workflow Best Practices
Adopt these 12 guidelines for production‑grade A2A workflows.
- Keep workflows simple – Each workflow should do one logical thing. Decompose large workflows into reusable sub‑workflows.
- Clearly define ownership – Every workflow has a single orchestrator. No two orchestrators manage the same workflow instance.
- Persist workflow state – Store state in a durable store (Redis with AOF, DynamoDB). Never rely on in‑memory state alone.
- Make workflows idempotent – Use workflow_id to deduplicate start requests. Rerunning a completed workflow should return the same result.
- Set timeouts at every level – Workflow‑level timeout, subtask‑level timeout, and per‑message TTL.
- Handle partial failures explicitly – Decide which subtasks are critical and which are optional. Log when you continue despite a failure.
- Monitor everything – Expose metrics for start, completion, duration, retries, and failures. Alert on anomalies.
- Use exponential backoff for retries – Add jitter to avoid thundering herds, and cap the delay at a reasonable maximum (e.g., 30 seconds).
- Validate all worker outputs – Apply schema validation and business rules before aggregation. Never trust workers.
- Design for resumability – After a crash, the orchestrator should reload workflow state and continue from the last checkpoint.
- Test failure scenarios – Simulate worker crashes, timeouts, invalid outputs, and network partitions.
- Version your workflows – When you change the DAG or input/output schemas, increment the workflow version (e.g., sales_report_v2). Support both versions during migration.
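The backoff guideline can be implemented with the "full jitter" strategy. Each retry sleeps for a random time between zero and an exponentially growing ceiling, and the ceiling is capped. This is a minimal sketch. backoff_delay is a hypothetical helper, and the 1-second base and 30-second cap are illustrative defaults.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: a random delay between 0 and min(cap, base * 2**attempt) seconds."""
    if rng is None:
        rng = random.Random()
    return rng.uniform(0, min(cap, base * 2 ** attempt))


rng = random.Random(7)  # seeded only to make the demo repeatable
for attempt in range(7):
    ceiling = min(30.0, 2 ** attempt)
    print(f"attempt {attempt}: sleep {backoff_delay(attempt, rng=rng):.2f}s (max {ceiling}s)")
```

Because every client draws an independent random delay, retries that failed at the same moment spread out instead of hitting the recovering worker together.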
Common Workflow Mistakes
| Mistake | Consequence | Solution |
|---|---|---|
| No clear task ownership – Two agents think they own the same subtask. | Race conditions, duplicate work. | Designate a single orchestrator per workflow. |
| No workflow state persistence – State only in memory. | Crashes lose progress; workflow cannot resume. | Persist state to Redis/DynamoDB. |
| Excessive communication – Workers send progress updates every millisecond. | Network congestion, high latency. | Batch updates or limit to ≤1 per second. |
| Missing retries – A single worker failure aborts the whole workflow. | Fragile system, low success rate. | Implement retries with backoff and fallback. |
| No partial failure handling – All subtasks must succeed, even optional ones. | Unnecessary failures. | Classify subtasks as critical vs optional. |
| Ignoring idempotency – Retries cause duplicate charges or data corruption. | Financial or data integrity issues. | Store processed workflow_id and subtask_id. |
| Poor monitoring – No metrics or logs. | Impossible to debug production issues. | Export Prometheus metrics and structured logs. |
| Overly complex workflows – One workflow does everything. | Hard to test, debug, reuse. | Break into smaller, composable sub‑workflows. |
Case Study: Multi‑Agent Research Workflow
Scenario: A user requests “Analyse customer feedback from Q2 2025 and produce a sentiment report with action items.”
Workflow definition (research_workflow_v1): fetch → clean → (sentiment_analysis and topic_modelling in parallel) → report.
Step 1 – Workflow Creation
Orchestrator receives request with workflow_id = "wf_789" and goal = "sentiment_report", parameters {"quarter": "2025Q2", "source": "support_tickets"}.
It creates a state record:
```json
{
  "workflow_id": "wf_789",
  "status": "running",
  "goal": "sentiment_report",
  "params": {"quarter": "2025Q2", "source": "support_tickets"},
  "subtasks": {},
  "context": {}
}
```
Step 2 – Decomposition and Assignment
The orchestrator’s decomposition function returns the following five-subtask DAG.
Subtask 1 (Fetch):
- Capability: data_fetch
- Parameters: {"source": "support_tickets", "date_range": "2025-04-01..2025-06-30"}
- Assigned to: Data Agent

Subtask 2 (Clean):
- Depends on subtask 1
- Capability: text_cleaner
- Parameters: {"input_uri": "$context.data_uri", "remove_pii": true}

Subtask 3 & 4 (Parallel):
- Both depend on subtask 2
- Capabilities: sentiment_analysis and topic_modelling
- Each receives the cleaned data URI

Subtask 5 (Report):
- Depends on both subtask 3 and 4
- Capability: report_generator
- Parameters: {"sentiment_result": "$context.sentiment", "topics": "$context.topics"}
Step 3 – Execution & State Tracking
Orchestrator executes the DAG using a topological scheduler.
```python
from __future__ import annotations

import asyncio
import logging

log = logging.getLogger(__name__)


async def execute_dag(dag: DAG, workflow_id: str):
    pending = set(dag.nodes)
    completed = set()
    results = {}
    while pending:
        # Find nodes whose dependencies are all completed
        ready = [n for n in pending if all(d in completed for d in dag.dependencies[n])]
        if not ready:
            raise CyclicDependencyError()
        # Execute ready subtasks in parallel
        tasks = [execute_subtask(workflow_id, n) for n in ready]
        subtask_results = await asyncio.gather(*tasks, return_exceptions=True)
        for node, res in zip(ready, subtask_results):
            if isinstance(res, Exception):
                # Handle failure based on node criticality
                if dag.is_critical(node):
                    raise WorkflowExecutionError(f"Critical node {node} failed: {res}")
                log.warning("Optional node %s failed, skipping", node)
                results[node] = None  # dependents must tolerate a missing input
            else:
                results[node] = res
                await state_store.set_subtask_result(workflow_id, node, res)
            # Mark the node done in both cases; otherwise dependents of a skipped
            # node would never become ready and the loop would report a false cycle
            completed.add(node)
            pending.remove(node)
    return results
```
Step 4 – Aggregation
After all subtasks complete, the orchestrator collects results:
```python
final_result = {
    "sentiment": results["sentiment_analysis"],
    "topics": results["topic_modelling"],
    "action_items": derive_action_items(
        results["sentiment_analysis"], results["topic_modelling"]
    ),
}
```
Step 5 – Completion
Orchestrator marks workflow as completed, stores final result, and returns it to the user.
Monitoring snapshot:
- Workflow duration: 8.3 seconds
- Subtask retries: 0
- Status: success
Logs:
```json
{"event": "workflow_started", "workflow_id": "wf_789", "type": "research_workflow_v1"}
{"event": "subtask_assigned", "workflow_id": "wf_789", "subtask": "fetch", "worker": "data-agent"}
{"event": "subtask_completed", "workflow_id": "wf_789", "subtask": "fetch", "duration_ms": 1200}
{"event": "subtask_assigned", "workflow_id": "wf_789", "subtask": "clean", "worker": "clean-agent"}
...
{"event": "workflow_completed", "workflow_id": "wf_789", "duration_sec": 8.3, "status": "success"}
```
FAQ
1. What are A2A workflows?
A2A workflows are end‑to‑end execution flows where multiple AI agents collaborate via messages to achieve a complex goal. They define task decomposition, ordering, state management, and result aggregation.
2. How is a workflow different from a collaboration?
Collaboration focuses on a single interaction (owner‑worker). A workflow is a composition of multiple collaborations in a sequence, parallel, or iterative pattern.
3. Who is responsible for workflow state persistence?
The workflow orchestrator (owner). It must persist state to a durable store (Redis, DynamoDB) to survive crashes and allow resumption.
4. How do I handle failures in parallel subtasks?
Decide per workflow: fail fast (any failure aborts all), continue with successes (ignore non‑critical failures), or require quorum (e.g., at least 2 out of 3 must succeed).
5. Can workflows be nested (a subtask that is itself a workflow)?
Yes. This is a powerful pattern for reusability. The parent orchestrator treats the child workflow as a “worker” with a capability like run_subworkflow.
6. How do I ensure idempotency in a workflow?
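Use a unique workflow_id. Before starting, check whether the workflow already exists. If it has completed, return the stored result. If it is still running, reject the duplicate request (or let the caller poll) rather than starting a second copy. If it failed, decide explicitly whether to re-execute it. Store processed subtask message_ids to avoid duplicate execution.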
7. What is the recommended way to pass data between sequential subtasks?
Use the shared workflow context (state store). The orchestrator writes the output of subtask A into context, then includes a context reference (e.g., context.data_uri) in subtask B’s assignment.
8. How long should a workflow be allowed to run?
Set a workflow‑level timeout (e.g., 5 minutes for interactive, 24 hours for batch). Use the orchestrator’s timeout mechanism to cancel long‑running workflows.
9. How do I test a workflow without real agents?
Mock the assign_and_execute method. Test decomposition logic and aggregation logic separately. Use dependency injection to swap real workers with mocks.
10. Can workflows be paused and resumed later?
Yes, if you persist state and implement a “pause” signal that stops assigning new subtasks. Resume by reloading state and continuing the DAG scheduler.
11. How do I monitor workflow health in production?
Export metrics (start, completion, duration, retries, failures) and logs. Set alerts for high failure rate or high p99 duration. Use distributed tracing to correlate workflow events.
12. What is the best way to version workflows?
Include a workflow_version field in the workflow definition. When you make a breaking change (different DAG, changed I/O schemas), create a new workflow type (e.g., sales_report_v2). Keep the old version running during migration.
13. How do I avoid overloading a worker agent with too many parallel subtasks?
Implement a semaphore in the orchestrator to limit concurrency per capability. Use a worker‑side rate limiter that returns AGENT_OVERLOADED; the orchestrator should then back off.
14. Can a worker agent be part of multiple workflows simultaneously?
Yes, if it is stateless or uses per‑workflow context isolation. Ensure the worker can handle concurrency (e.g., asyncio or threading).
15. How do I handle workflows that require human approval?
Insert a “human‑in‑the‑loop” subtask. The orchestrator sends a notification to an external system (e.g., Slack, email) and waits for a callback webhook. The workflow state is persisted while waiting.
16. What is the difference between orchestration and choreography in workflows?
Orchestration (this guide) has a central controller (orchestrator). Choreography has no central controller; agents communicate via events. This guide focuses on orchestration because it is easier to implement, monitor, and debug.
17. How do I clean up stale workflow state?
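Set a TTL on workflow state entries (e.g., 7 days for completed workflows). A TTL on running workflows (e.g., 1 hour) must be longer than the workflow‑level timeout; otherwise state can expire while the workflow is still executing. Implement a background sweeper that aborts workflows that have been running longer than their timeout.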
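Question 13 recommends an orchestrator-side concurrency limit per capability. Below is a minimal sketch using asyncio.Semaphore. CapabilityLimiter, fake_worker and the limit of 2 are hypothetical. A worker-side rate limiter is still needed when several orchestrators share the same workers.

```python
import asyncio
from collections import defaultdict


class CapabilityLimiter:
    """Cap the number of in-flight subtasks per capability."""

    def __init__(self, limits: dict[str, int], default_limit: int = 4):
        # Capabilities without an explicit limit get a semaphore of default_limit
        self._sems = defaultdict(lambda: asyncio.Semaphore(default_limit))
        for capability, limit in limits.items():
            self._sems[capability] = asyncio.Semaphore(limit)

    async def run(self, capability: str, worker_fn, *args):
        async with self._sems[capability]:  # waits while the capability is at its limit
            return await worker_fn(*args)


in_flight = 0
peak = 0


async def fake_worker(i: int) -> int:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)  # record the highest concurrency observed
    await asyncio.sleep(0.01)
    in_flight -= 1
    return i


async def main() -> list[int]:
    limiter = CapabilityLimiter({"sentiment_analysis": 2})
    calls = (limiter.run("sentiment_analysis", fake_worker, i) for i in range(6))
    return await asyncio.gather(*calls)


results = asyncio.run(main())
print(results, peak)  # [0, 1, 2, 3, 4, 5] 2
```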
Internal Linking Recommendations
Deepen your understanding of agent workflows with these related implementation guides:
- /guides/a2a/ – A2A protocol fundamentals
- /guides/a2a/agent-communication/ – Information exchange patterns
- /guides/a2a/agent-messaging/ – Message structure and delivery
- /guides/a2a/agent-collaboration/ – Single‑task collaboration between owner and worker
- /guides/agent-workflows/ – More advanced workflow orchestration patterns
- /guides/agent-memory/ – Long‑term shared memory across workflow steps
- /guides/agent-tools/ – How agents expose capabilities for workflows
This article is part of the AgentDevPro Handbook – practical, engineering‑focused guides for building production AI agent systems.