A2A Communication: A Practical Implementation Guide for A2A Protocols

Effective agent communication is the backbone of any production-ready multi-agent system. This guide focuses exclusively on the how—implementing reliable, secure, and observable message exchange between AI agents using Agent-to-Agent (A2A) protocols. You won’t find distributed systems theory or high-level architectural patterns here. Instead, you’ll get code-ready payload designs, validation strategies, error handling patterns, and operational best practices that you can apply immediately.

What Is Agent Communication

Agent communication is the structured exchange of information, requests, and responses between two or more autonomous AI agents. Unlike simple API calls, agent communication carries semantic meaning—context, goals, partial results, and coordination directives—that allows each agent to act intelligently on the received data.

In A2A (Agent-to-Agent) communication, each agent plays a well-defined role in a conversation:

  • Sender Agent – initiates the exchange by composing and transmitting a message
  • Receiver Agent – accepts, validates, processes, and responds to the message

The communication happens over a defined A2A communication channel using a mutually understood agent communication protocol that governs message structure, delivery guarantees, and error handling.

Why Agent Communication Matters

Without robust agent communication, agents become isolated silos. Here’s why implementing proper A2A communication is non‑negotiable in production systems.

Use Case | Why Communication Matters
Task coordination | Agent A cannot delegate subtasks to Agent B without a reliable request–response cycle.
Information sharing | Intermediate results (e.g., a retrieved document, a calculated vector) must move between agents with full context.
Tool result exchange | One agent might own a tool (e.g., a database query engine). Others must send requests and receive structured results.
Workflow execution | Multi‑step workflows require agents to signal completion, failures, or the need for human‑in‑the‑loop.

Practical example: A Research Agent needs to fetch customer data from a Data Agent. Without proper agent message exchange, the Research Agent cannot:

  • Verify that the Data Agent received the request
  • Handle temporary unavailability of the Data Agent
  • Correlate the response with the original request (important when many requests are in flight)

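A proper A2A communication layer addresses each of these: acknowledgements confirm receipt, timeouts and retries cover temporary unavailability, and message IDs correlate each response with its request.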

Communication Flow Overview

At its simplest, agent-to-agent communication follows a synchronous request‑response pattern: the sender transmits a request and waits for the receiver's reply.

Even in asynchronous setups, the same logical flow exists—only the delivery mechanism changes (queue, event bus, callback).

Communication diagram:

┌─────────────┐      Request       ┌─────────────┐
│   Agent A   │ ──────────────────►│   Agent B   │
│  (Sender)   │                    │ (Receiver)  │
│             │      Response      │             │
│             │◄────────────────── │             │
└─────────────┘                    └─────────────┘

The channel can be HTTP, gRPC, message broker (RabbitMQ, Kafka), or even a file‑based dropbox. The protocol defines what travels over that channel, not the channel itself.
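To make the split between channel and protocol concrete, here is a minimal sketch. The names `Channel`, `InMemoryChannel`, `echo_agent`, and `ask` are hypothetical and introduced only for illustration; they are not part of any A2A library. The idea is that sender code depends on a small interface, so swapping HTTP for a broker should not change the message itself.

```python
import asyncio
from typing import Awaitable, Callable, Protocol


class Channel(Protocol):
    """Anything that can carry a message dict and return a reply dict."""

    async def request(self, message: dict, timeout: float) -> dict: ...


class InMemoryChannel:
    """Hypothetical in-process channel, useful for local experiments."""

    def __init__(self, handler: Callable[[dict], Awaitable[dict]]):
        self._handler = handler  # stands in for the receiver agent

    async def request(self, message: dict, timeout: float) -> dict:
        # Enforce the timeout here so callers never wait indefinitely.
        return await asyncio.wait_for(self._handler(message), timeout)


async def echo_agent(message: dict) -> dict:
    # Toy receiver: wraps the incoming payload in a response message.
    return {"type": "response", "payload": {"result": message["payload"], "error": None}}


async def ask(channel: Channel, payload: dict) -> dict:
    # The sender depends only on the Channel interface, not on the transport.
    return await channel.request({"type": "request", "payload": payload}, timeout=1.0)
```

Calling `asyncio.run(ask(InMemoryChannel(echo_agent), {"operation": "ping"}))` returns a response message whose result echoes the payload. An HTTP or queue-backed class with the same `request` method could be passed to `ask` unchanged.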

Core Communication Components

Every agent communication implementation must define five core components.

Component | Responsibility | Implementation Example
Sender Agent | Composes messages, handles timeouts, applies retries | Python class with send_request() method
Receiver Agent | Listens for messages, validates, routes to handlers | FastAPI endpoint / message consumer
Message Payload | Carries the actual data + metadata | JSON object with type, data, context
Communication Channel | Transfers bytes between agents | HTTP POST, Redis pub/sub, NATS
Response Handler | Processes async/sync replies, maps to original request | Correlator map (messageId → future/promise)

Sender Agent (pseudo‑code):

class AgentClient:
    def __init__(self, channel: Channel, timeout_seconds: int = 30,
                 agent_id: str = "agent/research/v1"):
        self.id = agent_id  # logical name placed in each message's sender field
        self.channel = channel
        self.timeout = timeout_seconds

    async def send_request(self, target_agent_id: str, payload: dict) -> Response:
        # Build the envelope, then await the paired response (or a timeout)
        msg = (MessageBuilder().with_sender(self.id)
               .with_receiver(target_agent_id).with_payload(payload).build())
        return await self.channel.request(msg, timeout=self.timeout)

Receiver Agent (pseudo‑code):

class AgentServer:
    def __init__(self, message_handlers: dict):
        self.handlers = message_handlers  # message_type -> callable

    async def on_message(self, raw_msg: dict):
        msg = Message.parse(raw_msg)
        if not msg.is_valid():
            return self._error_response("invalid_message")
        handler = self.handlers.get(msg.type)
        if not handler:
            return self._error_response("unsupported_type")
        result = await handler(msg.payload, msg.context)
        return self._success_response(result)

Communication Lifecycle

The complete agent communication lifecycle consists of six stages. Every production implementation should account for each.

Stage details:

  1. Message Creation – Sender constructs a message with unique ID, timestamp, type, payload, and context.
  2. Validation – Sender validates the message against a schema before transmission (fail fast).
  3. Transmission – Message is serialized (JSON, Protobuf, etc.) and sent over the channel.
  4. Processing – Receiver deserializes, validates again (never trust the sender), and routes to the appropriate business logic.
  5. Response Generation – Handler produces a result or error, wrapped in a standard response message.
  6. Response Handling – Sender’s response handler correlates the reply to the original request and unblocks the waiting coroutine/callback.
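The six stages can be traced in a single process. The sketch below is illustrative only: `create_message`, `validate`, `receive`, and `round_trip` are hypothetical helpers, the "channel" is a plain function call, `uuid4` stands in for a time-ordered ID, and the `correlationId` field is one possible way to point a response back at its request.

```python
import json
import uuid
from datetime import datetime, timezone

REQUIRED = ("messageId", "timestamp", "sender", "receiver", "type", "payload")


def create_message(sender: str, receiver: str, payload: dict) -> dict:
    # Stage 1: message creation (uuid4 used here for simplicity)
    return {
        "messageId": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sender": sender,
        "receiver": receiver,
        "type": "request",
        "payload": payload,
    }


def validate(message: dict) -> None:
    # Stages 2 and 4: the same check runs on both sides
    missing = [field for field in REQUIRED if field not in message]
    if missing:
        raise ValueError(f"missing fields: {missing}")


def receive(wire_bytes: bytes) -> bytes:
    # Stage 4: deserialize, validate again, run the handler
    request = json.loads(wire_bytes)
    validate(request)
    result = {"echo": request["payload"]}  # placeholder business logic
    # Stage 5: wrap the result in a response that points back at the request
    response = {
        "type": "response",
        "correlationId": request["messageId"],
        "payload": {"result": result, "error": None},
    }
    return json.dumps(response).encode()


def round_trip(payload: dict) -> dict:
    request = create_message("agent/research/v1", "agent/data/v1", payload)
    validate(request)                      # Stage 2: fail fast on the sender
    wire = json.dumps(request).encode()    # Stage 3: serialize and "send"
    response = json.loads(receive(wire))
    # Stage 6: correlate the reply with the original request
    if response["correlationId"] != request["messageId"]:
        raise RuntimeError("response does not match request")
    return response["payload"]["result"]
```

In a real deployment, the call to `receive` would be replaced by a network hop, and stage 6 would typically run in a separate response handler.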

Agent Message Structure

A well‑designed agent message exchange uses a consistent, self‑describing structure. The JSON envelope below is the format used throughout this guide.

{
  "version": "1.0",
  "messageId": "a1b2c3d4-5678-90ab-cdef-1234567890ab",
  "timestamp": "2025-03-15T10:30:00.123Z",
  "sender": "agent/research/v1",
  "receiver": "agent/data/v1",
  "type": "request",
  "context": {
    "conversationId": "conv_xyz_123",
    "taskId": "task_retrieve_customers",
    "replyTo": "agent/research/v1/callbacks",
    "ttl": "2025-03-15T10:30:30.123Z"
  },
  "payload": {
    "operation": "query",
    "parameters": {
      "table": "customers",
      "filter": { "status": "active" }
    }
  },
  "signature": "sha256$...",
  "metadata": {
    "priority": 5,
    "retryCount": 0
  }
}

Field | Required | Description
messageId | Yes | Unique UUIDv7 (sortable, time‑ordered) for correlation
timestamp | Yes | ISO 8601 UTC – helps with ordering and debugging
sender / receiver | Yes | Logical agent names or full addresses
type | Yes | request, response, event, notification
context | Recommended | Conversation state, task ID, TTL, callback address
payload | Yes | The actual data (operation, parameters, result)
signature | For security | HMAC or digital signature for integrity
metadata | Optional | Retry count, priority, trace ID

Keep message size under 256 KB for most channels. For larger data, use references: {"type": "reference", "uri": "s3://bucket/result.parquet"}.
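One way to enforce the size guideline is to measure the serialized message and swap the payload for a reference when it is too large. This is a sketch under stated assumptions: `message_size` and `with_reference_if_too_large` are hypothetical helpers, and the caller is assumed to have already uploaded the payload to the given URI.

```python
import json

MAX_MESSAGE_BYTES = 256 * 1024  # limit suggested in this guide


def message_size(message: dict) -> int:
    # Size of the UTF-8 encoded JSON, which is what travels over the channel
    return len(json.dumps(message).encode("utf-8"))


def with_reference_if_too_large(message: dict, uri: str) -> dict:
    """Return the message unchanged if small, else swap the payload for a reference.

    `uri` must point at a copy of the payload the caller has already uploaded.
    """
    if message_size(message) <= MAX_MESSAGE_BYTES:
        return message
    slim = dict(message)  # shallow copy; the original message is left untouched
    slim["payload"] = {"type": "reference", "uri": uri}
    return slim
```

The receiver then checks whether `payload["type"]` is `"reference"` and fetches the data itself.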

Communication Types

A2A communication distinguishes between four primary message types. Each serves a different role in agent conversation.

Request Messages

A directive that expects a paired response. Design requests to be idempotent where possible, so that a retried request does not repeat its side effects.

{
  "type": "request",
  "payload": {
    "operation": "vector_search",
    "parameters": { "embedding": [0.1, 0.2, ...], "topK": 10 }
  }
}

Response Messages

Direct reply to a request. Contains either result or error.

{
  "type": "response",
  "payload": {
    "result": { "documents": [...] },
    "error": null
  }
}

Event Messages

One‑way notifications that do not expect a response. Used for state changes, progress updates, or log entries.

{
  "type": "event",
  "payload": {
    "eventType": "processing_progress",
    "data": { "step": 3, "total": 10, "percent": 30 }
  }
}

Notification Messages

Specialised events that require acknowledgements at the transport level (e.g., message received) but no business response.

{
  "type": "notification",
  "payload": {
    "severity": "info",
    "message": "Data Agent completed cache warm-up"
  }
}
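The four types differ mainly in what the receiver owes the sender. A small lookup can make that explicit in code. `MessageType` and `obligations` are hypothetical names; the mapping simply restates the descriptions above.

```python
from enum import Enum


class MessageType(str, Enum):
    REQUEST = "request"
    RESPONSE = "response"
    EVENT = "event"
    NOTIFICATION = "notification"


# What the receiver owes the sender for each type, as described above.
EXPECTS_RESPONSE = {MessageType.REQUEST}
EXPECTS_TRANSPORT_ACK = {MessageType.NOTIFICATION}


def obligations(raw_type: str) -> dict:
    msg_type = MessageType(raw_type)  # raises ValueError for unknown types
    return {
        "business_response": msg_type in EXPECTS_RESPONSE,
        "transport_ack": msg_type in EXPECTS_TRANSPORT_ACK,
    }
```

A receiver could call `obligations(msg["type"])` before dispatching, and reject unknown types with an unsupported-type error when `ValueError` is raised.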

Synchronous Communication

Synchronous agent communication is the simplest to implement. Agent A sends a request and blocks (or awaits) until Agent B returns a response.

Request‑Response Flow

# Synchronous (blocking) example with timeout
async def sync_agent_call():
    request = create_request()
    try:
        response = await channel.request(request, timeout=30.0)
        return response.payload["result"]
    except TimeoutError:
        # Handle timeout – see retry strategies below
        log.error("Data Agent did not respond within 30s")
        raise

Timeout Handling

Always set a timeout per request. Never rely on indefinite waits.

Timeout Type | Value (example) | Action
Connection timeout | 5 seconds | Fail fast if agent unreachable
Request timeout | 30 seconds | Cancel request, attempt retry or fallback
End‑to‑end deadline | 60 seconds | Overall workflow deadline, propagates via context
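The request timeout and the end-to-end deadline interact: each call should wait for whichever is shorter. Below is a minimal sketch, assuming a hypothetical helper `call_within_deadline` and a deadline expressed as a `time.monotonic()` value within one process.

```python
import asyncio
import time

REQUEST_TIMEOUT = 30.0  # per-request timeout from the table above


def remaining_budget(deadline: float) -> float:
    # deadline is an absolute time.monotonic() value shared by the whole workflow
    return deadline - time.monotonic()


async def call_within_deadline(make_call, deadline: float):
    """Await make_call() without exceeding the request timeout or the deadline."""
    budget = remaining_budget(deadline)
    if budget <= 0:
        raise TimeoutError("end-to-end deadline already exceeded")
    # Never wait longer than either the request timeout or what is left overall
    return await asyncio.wait_for(make_call(), timeout=min(REQUEST_TIMEOUT, budget))
```

Here `make_call` is a zero-argument function that returns a coroutine, for example `lambda: channel.request(msg, timeout=30)`. Monotonic clocks are not comparable across machines, so a deadline carried in the message context would more likely be an absolute UTC timestamp that each agent converts locally.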

Retry Strategies

Not every failure should be retried. Use this decision table:

Failure Type | Retry? | Strategy
Network error / timeout | Yes | Exponential backoff (1s, 2s, 4s, max 3 retries)
5xx (server error) | Yes | Same as above
Other 4xx (client error, e.g., invalid payload) | No | Fix the request, then resend manually
Rate limiting (429) | Yes | Wait for the Retry-After interval, plus jitter

Implementation snippet (exponential backoff):

import asyncio
import random

async def send_with_retry(request, max_retries=3):
    # One initial attempt plus up to max_retries retries
    for attempt in range(max_retries + 1):
        try:
            return await channel.request(request, timeout=30)
        except (NetworkError, TimeoutError):
            if attempt == max_retries:
                raise  # retries exhausted; surface the last error
            # Backoff of 1s, 2s, 4s, plus up to 1s of jitter
            wait = (2 ** attempt) + random.uniform(0, 1)
            await asyncio.sleep(wait)
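The decision table can also be expressed as one function that returns how long to wait, or `None` when a retry is pointless. This is a sketch: `retry_delay` is a hypothetical helper, and it assumes failures are reported as HTTP-style status codes, with `None` standing for a network error or timeout.

```python
import random
from typing import Optional


def retry_delay(status: Optional[int], attempt: int,
                retry_after: Optional[float] = None,
                max_retries: int = 3) -> Optional[float]:
    """Return seconds to wait before retrying, or None if no retry should happen.

    status is an HTTP-style status code, or None for a network error or timeout.
    attempt is the number of retries already performed (0 for the first failure).
    """
    if attempt >= max_retries:
        return None
    jitter = random.uniform(0, 1)
    if status == 429:
        # Rate limited: honour Retry-After when present, else fall back to backoff
        base = retry_after if retry_after is not None else 2 ** attempt
        return base + jitter
    if status is None or 500 <= status < 600:
        return (2 ** attempt) + jitter  # 1s, 2s, 4s plus jitter
    return None  # other 4xx: the request itself must be fixed
```

For example, `retry_delay(400, 0)` returns `None`, while `retry_delay(503, 1)` returns a value between 2 and 3 seconds.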

Asynchronous Communication

Asynchronous agent message exchange decouples sender and receiver in time. It’s essential for long‑running operations or when the receiver may be temporarily offline.

Event‑Driven Communication

Agents publish events to a topic or stream; other agents subscribe and react.

# Publisher (Agent A)
await event_bus.publish("data.updated", {"record_id": 123, "new_value": "xyz"})

# Subscriber (Agent B)
@event_bus.subscribe("data.updated")
async def handle_data_update(event):
    record_id = event.payload["record_id"]
    await process_update(record_id)

Channel options: Redis pub/sub, Kafka, NATS JetStream, Amazon SNS.

Queue‑Based Communication

Use a message queue (RabbitMQ, SQS) for point‑to‑point asynchronous requests. Each message goes to a single consumer, but these brokers typically guarantee at‑least‑once delivery, so the same message can arrive more than once and handlers must be idempotent.

# Sender enqueues a request on the receiver's queue
await queue.send("data.requests", {
    "messageId": str(uuid7()),  # uuid7 is an assumed helper (e.g. uuid.uuid7 on Python 3.14+)
    "replyTo": "agent/research/responses",  # queue name for replies
    "payload": {"operation": "train_model", "data_uri": "s3://..."}
})

# Receiver processes and sends response to replyTo queue
async def worker():
    async for msg in queue.consume("data.requests"):
        result = await process(msg["payload"])
        # Echo the request's messageId so the sender can correlate the reply
        await queue.send(msg["replyTo"], {"messageId": msg["messageId"], "result": result})
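Because a queue may deliver the same message twice, the worker needs a way to recognise duplicates. The sketch below keeps handled `messageId` values in memory for a retention window. `ProcessedIds` is a hypothetical class, and a shared store such as Redis would be needed once more than one worker consumes the queue.

```python
import time


class ProcessedIds:
    """In-memory record of handled messageIds with a retention window."""

    def __init__(self, retention_seconds: float = 3600.0):
        self._retention = retention_seconds
        self._seen = {}  # messageId -> (time first handled, cached result)

    def _evict_expired(self, now: float) -> None:
        expired = [mid for mid, (t, _) in self._seen.items() if now - t > self._retention]
        for mid in expired:
            del self._seen[mid]

    def handle_once(self, message: dict, handler):
        now = time.monotonic()
        self._evict_expired(now)
        message_id = message["messageId"]
        if message_id in self._seen:
            # Duplicate delivery: return the earlier result instead of redoing the work
            return self._seen[message_id][1]
        result = handler(message["payload"])
        self._seen[message_id] = (now, result)
        return result
```

Calling `handle_once` twice with the same message runs the handler once and returns the cached result the second time. Note that a handler that raises is not recorded, so a redelivered message is processed again.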

Callback Handling

In asynchronous patterns, the sender must correlate responses with original requests. Maintain an in‑memory or Redis‑backed correlator.

import asyncio

class AsyncCorrelator:
    def __init__(self):
        self._futures = {}  # messageId -> asyncio.Future

    def register(self, message_id: str) -> asyncio.Future:
        # Must be called from inside a running event loop
        fut = asyncio.get_running_loop().create_future()
        self._futures[message_id] = fut
        return fut

    def resolve(self, message_id: str, result):
        # Unknown IDs and cancelled futures (e.g. after a timeout) are ignored
        fut = self._futures.pop(message_id, None)
        if fut is not None and not fut.done():
            fut.set_result(result)
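A correlator also has to cope with replies that never arrive, otherwise its map grows without bound. The following variant is a sketch, not a drop-in replacement: `TimedCorrelator` and `demo` are hypothetical names, and the reply is simulated with `loop.call_later` rather than a real channel.

```python
import asyncio


class TimedCorrelator:
    """Correlator variant that forgets requests whose reply never arrives."""

    def __init__(self):
        self._futures = {}  # messageId -> asyncio.Future

    async def wait_for_reply(self, message_id: str, timeout: float):
        future = asyncio.get_running_loop().create_future()
        self._futures[message_id] = future
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Runs on success, timeout, and cancellation, so entries never leak
            self._futures.pop(message_id, None)

    def resolve(self, message_id: str, result) -> bool:
        future = self._futures.get(message_id)
        if future is None or future.done():
            return False  # late or unknown reply; the caller may log and drop it
        future.set_result(result)
        return True


async def demo():
    correlator = TimedCorrelator()
    loop = asyncio.get_running_loop()
    # Simulate the reply arriving shortly after the request is sent
    loop.call_later(0.01, correlator.resolve, "msg-1", {"status": "ready"})
    reply = await correlator.wait_for_reply("msg-1", timeout=1.0)
    late = correlator.resolve("msg-1", {"status": "duplicate"})
    return reply, late
```

Running `asyncio.run(demo())` yields the first reply and `False` for the late duplicate, which shows that the entry was removed once the reply was consumed.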

Context Sharing

Robust AI agent communication requires carrying context across message boundaries. Context answers: “Why is this message being sent?” and “What has already happened?”

Conversation Context

Maintains a shared session across multiple exchanges.

{
  "context": {
    "conversationId": "conv_20250315_abc",
    "turn": 3,
    "previousMessages": ["msg_001", "msg_002"]
  }
}

Task Context

Carries workflow‑specific data such as a task ID, parent task, and partial results.

{
  "context": {
    "taskId": "task_xyz",
    "workflowType": "customer_onboarding",
    "artifacts": {
      "extracted_name": "John Doe",
      "credit_score": 720
    }
  }
}

Workflow Context

Used for long‑running processes. Pass a serializable dictionary that each agent can extend.

Implementation example:

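import copy

def propagate_context(parent_context: dict, agent_specific_data: dict) -> dict:
    # Deep copy: a shallow copy would share (and mutate) the parent's history list
    new_context = copy.deepcopy(parent_context)
    new_context.setdefault("history", []).append({
        "agent": current_agent_id,  # assumed to be defined by the running agent
        "timestamp": utc_now(),     # assumed helper returning an ISO 8601 UTC string
        "data": agent_specific_data
    })
    return new_context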

Limit context size to < 10 KB. Store large context in object storage and pass a reference.
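The size limit can be enforced at the point where context is attached to a message. In the sketch below, `compact_context` is a hypothetical helper, a plain dict stands in for object storage, and the `memory://` scheme is invented for the example. The `contextUri` field name follows the reference-passing approach this guide recommends for long workflows.

```python
import json

MAX_CONTEXT_BYTES = 10 * 1024  # limit suggested above


def compact_context(context: dict, store: dict, key: str) -> dict:
    """Return context unchanged if small; otherwise park it in `store` and return a reference."""
    encoded = json.dumps(context).encode("utf-8")
    if len(encoded) < MAX_CONTEXT_BYTES:
        return context
    store[key] = encoded  # stand-in for an upload to object storage
    # Keep the IDs that receivers need for correlation alongside the reference
    slim = {k: context[k] for k in ("conversationId", "taskId") if k in context}
    slim["contextUri"] = f"memory://{key}"
    return slim
```

A receiver that sees `contextUri` would fetch and decode the full context before processing.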

Communication Validation

Every incoming message must be validated. Never trust the sender, even if both agents are under your control.

Schema Validation

Use JSON Schema (or Protobuf) to define the expected structure of every message type.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["messageId", "sender", "receiver", "type", "payload"],
  "properties": {
    "messageId": {"type": "string", "format": "uuid"},
    "type": {"enum": ["request", "response", "event", "notification"]},
    "payload": {"type": "object"}
  }
}

Python validation (using jsonschema):

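from jsonschema import validate, ValidationError

def validate_message(raw: dict) -> Message:
    try:
        # Note: "format" keywords such as uuid are only enforced when a
        # format_checker is supplied to the validator
        validate(instance=raw, schema=MESSAGE_SCHEMA)
    except ValidationError as e:
        # Chain the original exception so the failing path stays in the traceback
        raise InvalidMessageError(f"Schema violation: {e.message}") from e
    return Message(raw)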

Payload Validation

Beyond the envelope, validate the specific payload based on type and operation.

def validate_payload(msg_type: str, payload: dict):
    if msg_type == "request":
        operation = payload.get("operation")
        if operation not in ["query", "update", "delete"]:
            raise InvalidPayloadError(f"Unknown operation: {operation}")
        # Additional field‑level validation
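Field-level checks tend to be easier to maintain as a table of rules than as nested conditionals. The sketch below assumes a hypothetical `OPERATION_RULES` mapping and a `validate_parameters` helper. The parameter names `values` and `filter` are illustrative and not taken from any specification.

```python
# Hypothetical per-operation rules: required parameter name -> expected type
OPERATION_RULES = {
    "query": {"table": str},
    "update": {"table": str, "values": dict},
    "delete": {"table": str, "filter": dict},
}


def validate_parameters(payload: dict) -> list:
    """Return a list of human-readable problems; an empty list means the payload is valid."""
    operation = payload.get("operation")
    rules = OPERATION_RULES.get(operation)
    if rules is None:
        return [f"unknown operation: {operation!r}"]
    parameters = payload.get("parameters", {})
    problems = []
    for name, expected in rules.items():
        if name not in parameters:
            problems.append(f"missing parameter: {name}")
        elif not isinstance(parameters[name], expected):
            problems.append(f"parameter {name} must be {expected.__name__}")
    return problems
```

Returning every problem at once, instead of raising on the first, lets the receiver put the full list into the `details` field of an error response.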

Context Validation

Ensure that context.ttl (time‑to‑live) is not expired, and that required correlation IDs are present.

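from datetime import datetime, timezone

def validate_context(context: dict):
    if "ttl" in context:
        # ttl is read as an absolute ISO 8601 expiry time; "Z" is normalised
        # because datetime.fromisoformat() rejects it before Python 3.11
        expiry = datetime.fromisoformat(context["ttl"].replace("Z", "+00:00"))
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=timezone.utc)  # assume UTC if no offset given
        if datetime.now(timezone.utc) > expiry:
            raise ContextExpiredError("Message TTL exceeded")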

Error Handling

Errors are inevitable. Define a standard error envelope so senders can react predictably.

Error Response Structure

{
  "type": "response",
  "payload": {
    "result": null,
    "error": {
      "code": "PAYLOAD_VALIDATION_FAILED",
      "message": "Missing required field 'operation'",
      "details": {
        "field": "payload.operation",
        "provided": null
      },
      "retryable": false,
      "timestamp": "2025-03-15T10:31:00Z"
    }
  }
}

Common Error Codes

Code | Meaning | Retryable?
INVALID_MESSAGE_SCHEMA | Message does not conform to base schema | No
UNSUPPORTED_MESSAGE_TYPE | type not recognised | No
PAYLOAD_VALIDATION_FAILED | Payload missing fields or invalid values | No
CONTEXT_EXPIRED | TTL exceeded | No
PROCESSING_TIMEOUT | Internal handler timed out | Yes
AGENT_UNAVAILABLE | Receiver agent down or overloaded | Yes (with backoff)
RATE_LIMITED | Sender hit rate limit | Yes (after delay)

Handling in Sender

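async def handle_response(response: dict):
    error = response["payload"].get("error")
    if error is None:
        return response["payload"]["result"]
    if error.get("retryable"):
        # Hand off to the retry scheduler and return its outcome
        return await schedule_retry(response["messageId"])
    # Nest the error under one key: passing it directly as `extra` would clash
    # with the reserved LogRecord attribute "message"
    log.critical("Non-retryable error", extra={"error": error})
    raise AgentCommunicationError(error["code"], error["message"])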

Communication Security

Secure agent communication is mandatory when agents cross trust boundaries (e.g., different microservices, external agents).

Security Checklist

  • Authentication – Each message must prove the sender’s identity.
    • Option A: Mutual TLS (mTLS) between agents
    • Option B: Bearer tokens (JWT) with short expiration
    • Option C: Pre‑shared symmetric keys (for internal, high‑performance scenarios)
  • Authorization – Verify that the sender is allowed to perform the requested operation.
    • Implement policy checks (e.g., OPA, custom RBAC) before processing the payload.
  • Message Integrity – Prevent tampering in transit.
    • Use TLS 1.3 for the channel.
    • Optionally add an HMAC in the signature field.
  • Confidentiality – Encrypt sensitive payload fields.
    • Use envelope encryption: generate a per‑message AES key, encrypt the payload, and encrypt the AES key with the receiver’s public key.
  • Sensitive Data Handling – Never log full messages containing PII, credentials, or API keys.
    • Redact fields like password, authorization, credit_card before logging.

Example of adding an HMAC signature:

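import hashlib
import hmac
import json

def sign_message(message: dict, secret_key: bytes) -> str:
    # Leave out any existing signature so signing and verifying see the same input
    unsigned = {k: v for k, v in message.items() if k != "signature"}
    # Sort keys for deterministic serialisation
    canonical = json.dumps(unsigned, sort_keys=True, separators=(',', ':'))
    signature = hmac.new(secret_key, canonical.encode(), hashlib.sha256).hexdigest()
    return f"sha256${signature}"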
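The receiver needs the matching check. The sketch below recomputes the HMAC over the message without its `signature` field and compares it in constant time. `compute_signature` and `verify_message` are hypothetical names, and the scheme assumes both agents share `secret_key` and serialise the message identically.

```python
import hashlib
import hmac
import json


def compute_signature(message: dict, secret_key: bytes) -> str:
    # Sign everything except the signature field itself
    unsigned = {k: v for k, v in message.items() if k != "signature"}
    canonical = json.dumps(unsigned, sort_keys=True, separators=(",", ":"))
    digest = hmac.new(secret_key, canonical.encode("utf-8"), hashlib.sha256).hexdigest()
    return f"sha256${digest}"


def verify_message(message: dict, secret_key: bytes) -> bool:
    provided = message.get("signature")
    if not isinstance(provided, str):
        return False  # missing or malformed signature
    expected = compute_signature(message, secret_key)
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(expected.encode("utf-8"), provided.encode("utf-8"))
```

A receiver would call `verify_message` before schema validation and reject the message on `False`. An HMAC only proves possession of the shared key, so it complements sender authentication without replacing it.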

Monitoring Agent Communication

You cannot improve what you do not measure. Every production agent communication layer must expose the following metrics.

Metric | Type | Aggregation | Alert Threshold
agent_messages_sent_total | Counter | Sum over time | none
agent_messages_received_total | Counter | Sum over time | none
agent_message_latency_seconds | Histogram | p50, p95, p99 | p99 > 5s
agent_message_success_rate | Gauge (ratio) | Rolling 5 min | < 99%
agent_message_errors_total | Counter | Label by error code | Any spike
agent_retries_total | Counter | Label by reason | Retries > 10% of total

Instrumentation Example (Prometheus)

import time

from prometheus_client import Counter, Histogram

messages_sent = Counter('agent_messages_sent_total', 'Messages sent', ['type'])
errors = Counter('agent_message_errors_total', 'Message errors', ['error_code'])
latency = Histogram('agent_message_latency_seconds', 'End-to-end latency', ['sender', 'receiver'])

async def send_with_monitoring(msg):
    start = time.monotonic()
    messages_sent.labels(type=msg.type).inc()
    try:
        response = await channel.send(msg)
        # Record latency only for successful round trips
        latency.labels(sender=msg.sender, receiver=msg.receiver).observe(time.monotonic() - start)
        return response
    except Exception:
        errors.labels(error_code='send_failed').inc()
        raise

Logging Best Practices

  • Structured logs (JSON) with messageId, sender, receiver, duration_ms, status
  • Correlation: inject trace ID from context into all logs
  • Sampling: for high‑volume systems, log only errors and 1% of successful messages
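These three practices can be combined in a small logging helper. The sketch below is illustrative: `redact`, `should_log`, and `log_line` are hypothetical names, the set of sensitive keys is an example to extend for your own payloads, and the trace ID is assumed to live under `metadata.traceId`.

```python
import json
import random

SENSITIVE_KEYS = {"password", "authorization", "credit_card"}


def redact(value):
    """Recursively replace the values of sensitive keys with a placeholder."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


def should_log(status: str, sample_rate: float = 0.01) -> bool:
    # Always keep errors; keep only a sample of successful messages
    return status != "ok" or random.random() < sample_rate


def log_line(message: dict, status: str, duration_ms: float) -> str:
    record = {
        "messageId": message.get("messageId"),
        "sender": message.get("sender"),
        "receiver": message.get("receiver"),
        "traceId": message.get("metadata", {}).get("traceId"),
        "duration_ms": duration_ms,
        "status": status,
        "payload": redact(message.get("payload", {})),
    }
    return json.dumps(record)
```

A caller would typically write `if should_log(status): logger.info(log_line(msg, status, duration_ms))`. Redaction works on a copy, so the message that is actually sent is left untouched.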

Communication Testing

Test agent communication at three levels.

Unit Testing

Mock the channel and validate that messages are built correctly and timeouts trigger.

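import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_sender_timeout():
    mock_channel = AsyncMock()
    mock_channel.request.side_effect = TimeoutError()
    agent = AgentClient(mock_channel, timeout_seconds=1)

    with pytest.raises(TimeoutError):
        await agent.send_request("data_agent", {"op": "ping"})

    # The client should have tried exactly once before surfacing the timeout
    mock_channel.request.assert_called_once()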

Integration Testing

Spin up real agent instances (in test containers) and send a known message.

async def test_request_response_integration():
    # Start receiver agent in background
    receiver = DataAgent()
    await receiver.start(port=8888)
    try:
        client = AgentClient(Channel.http("http://localhost:8888"))
        response = await client.send_request("data_agent", {"operation": "status"})
        assert response.payload["result"]["status"] == "ready"
    finally:
        # Stop the receiver even when the assertion fails
        await receiver.stop()

End‑to‑End Testing

Test the entire workflow with real dependencies (database, queues) but in a disposable environment.

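# e2e test using testcontainers
from testcontainers.core.container import DockerContainer

async def test_research_to_data_flow():
    with DockerContainer("data-agent:latest") as data_agent:
        with DockerContainer("research-agent:latest") as research:
            # for_container is an assumed helper that builds a client from the container address
            research_client = AgentClient.for_container(research)
            result = await research_client.send_request(
                "data_agent",
                {"operation": "query", "parameters": {"sql": "SELECT count(*) FROM users"}},
            )
            assert result.payload["result"]["count"] > 0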

Communication Best Practices

Adopt these 12 implementation guidelines for production‑grade A2A communication.

  1. Keep messages small – Under 256 KB. Use references (S3, GCS) for blobs.
  2. Use structured payloads – JSON Schema or Protobuf. Avoid ad‑hoc stringly‑typed data.
  3. Validate every message – At both sender (fail fast) and receiver (defence in depth).
  4. Always include a messageId – UUIDv7 enables correlation and ordering.
  5. Set explicit timeouts – No unbounded waits. Propagate deadlines via context.
  6. Implement idempotency – Store processed messageIds (at least for a retention window) to safely retry.
  7. Monitor communication failures – Expose metrics for latency, errors, and retries.
  8. Use exponential backoff with jitter – Prevent thundering herds on retry storms.
  9. Design for backward compatibility – Use version field; never remove required fields.
  10. Log responsibly – Redact secrets and PII; include messageId and traceId.
  11. Separate control from data – Large data goes to object storage, only metadata in messages.
  12. Support asynchronous replies – Even for “synchronous” APIs, use a callback queue for long‑running ops.

Common Communication Mistakes

Avoid these frequently seen anti‑patterns in agent communication implementations.

Mistake | Why It’s Harmful | Solution
Missing validation | Malformed messages crash receivers or cause silent data corruption. | Always validate against a schema before processing.
Large payloads (>1 MB) | Clogs network, increases latency, and may exceed broker limits. | Use chunking or reference‑based payloads.
No retry mechanism | Temporary network glitches or agent restarts cause permanent failures. | Implement exponential backoff with max retries.
No monitoring | You cannot debug failures or prove SLAs. | Export metrics and logs as described above.
Unstructured messages | Every sender invents its own format; receivers become brittle. | Adopt a shared message schema (e.g., the JSON structure in this article).
Synchronous blocking without timeout | A stuck receiver blocks the sender indefinitely. | Always use asyncio.wait_for or equivalent.
Ignoring message order | When using queues, out‑of‑order delivery can break stateful workflows. | Use sequence numbers or idempotent handlers.
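For the ordering problem in the last row, a receiver can hold back messages that arrive early and release them in sequence. The sketch below assumes each message carries an integer sequence number assigned by the sender. `ReorderBuffer` is a hypothetical class, and it has no gap timeout, which a production version would need.

```python
class ReorderBuffer:
    """Releases messages in sequence order, holding back any that arrive early."""

    def __init__(self, first_sequence: int = 1):
        self._next = first_sequence
        self._pending = {}  # sequence number -> message

    def push(self, sequence: int, message) -> list:
        if sequence < self._next:
            return []  # duplicate or already-delivered message; drop it
        self._pending[sequence] = message
        ready = []
        # Drain every message that is now contiguous with what was delivered
        while self._next in self._pending:
            ready.append(self._pending.pop(self._next))
            self._next += 1
        return ready
```

Pushing sequence 2 before sequence 1 returns an empty list first, then both messages in order once 1 arrives.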

Case Study: Research Agent ↔ Data Agent Communication

Scenario: A Research Agent needs to analyse customer churn. It asks a Data Agent for transaction data from 1 January to 15 March 2025. The Data Agent produces a large result (a 500 MB Parquet file). The system must handle timeouts, partial failures, and monitoring.

Step 1 – Request (Research Agent)

{
  "messageId": "0194f0a2-9e8c-7a3b-8b2a-1c3d5e7f9a2b",
  "timestamp": "2025-03-15T10:00:00Z",
  "sender": "agent/research/alpha",
  "receiver": "agent/data/v1",
  "type": "request",
  "context": {
    "taskId": "churn_analysis_2025q1",
    "conversationId": "conv_churn_001",
    "replyTo": "queue://research/responses",
    "ttl": "2025-03-15T10:10:00Z"
  },
  "payload": {
    "operation": "export",
    "parameters": {
      "table": "transactions",
      "date_range": ["2025-01-01", "2025-03-15"],
      "format": "parquet",
      "destination": "s3://research-bucket/intermediate/"
    }
  }
}

Step 2 – Processing (Data Agent)

  • Validates message schema and payload (allowed operations: query, export, stats).
  • Checks authorization: Research Agent has read permission on transactions table.
  • Initiates async export to S3.
  • Immediately returns an accepted response with a jobId to the replyTo queue.

{
  "type": "response",
  "payload": {
    "result": {
      "status": "accepted",
      "jobId": "job_export_789",
      "estimatedCompletion": "2025-03-15T10:05:00Z"
    },
    "error": null
  }
}

Step 3 – Polling / Callback

Research Agent listens on its response queue. After 3 minutes, it receives:

{
  "messageId": "0194f0a2-9e8c-7a3b-8b2a-1c3d5e7f9a2c",
  "correlationId": "0194f0a2-9e8c-7a3b-8b2a-1c3d5e7f9a2b",
  "type": "response",
  "payload": {
    "result": {
      "status": "completed",
      "data_uri": "s3://research-bucket/intermediate/transactions_2025q1.parquet",
      "row_count": 15423000,
      "size_bytes": 524288000
    }
  }
}

Step 4 – Error Handling (Simulated Failure)

Imagine the Data Agent’s database is temporarily overloaded. The Research Agent receives:

{
  "payload": {
    "error": {
      "code": "TEMPORARY_DATABASE_TIMEOUT",
      "message": "Query timed out after 30 seconds",
      "retryable": true
    }
  }
}

The Research Agent retries with exponential backoff (1s, 2s, 4s). If all 3 retries fail, it routes the request to a dead‑letter queue and alerts an operator.
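The retry-then-escalate behaviour might look like the sketch below. `call_with_dead_letter` is a hypothetical helper, `send` is any coroutine function that returns a response dict in the envelope format shown above, and a plain list stands in for the dead-letter queue. Jitter and operator alerting are omitted for brevity.

```python
import asyncio


async def call_with_dead_letter(send, request: dict, dead_letters: list,
                                max_retries: int = 3, base_delay: float = 1.0):
    """Retry retryable errors with exponential backoff, then park the request."""
    for attempt in range(max_retries + 1):
        response = await send(request)
        error = response["payload"].get("error")
        if error is None:
            return response["payload"]["result"]
        if not error.get("retryable") or attempt == max_retries:
            # Give up: keep the request and last error for an operator to inspect
            dead_letters.append({"request": request, "error": error})
            return None
        await asyncio.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s by default
```

With the defaults, a persistently failing request is sent four times (one initial attempt plus three retries) before it lands in `dead_letters`. A non-retryable error is parked after the first attempt.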

Step 5 – Monitoring

Prometheus metrics from this interaction:

agent_messages_sent_total{type="request"} 1
agent_messages_received_total{type="response"} 1
agent_message_latency_seconds{sender="research", receiver="data"} 180.2
agent_message_success_rate 1.0

If the retry scenario occurred, agent_retries_total{reason="TEMPORARY_DATABASE_TIMEOUT"} increments.

FAQ

1. What is agent communication?
Agent communication is the structured exchange of requests, responses, events, and notifications between autonomous AI agents using a defined protocol (A2A). It includes message formatting, validation, delivery, and error handling.

2. How should messages be structured for A2A communication?
Use a consistent JSON envelope containing messageId, timestamp, sender, receiver, type, context, and payload. Provide a JSON Schema for validation. For high‑performance scenarios, use Protobuf.

3. What’s the difference between synchronous and asynchronous agent communication?
Synchronous: sender waits for a direct response (timeout limited). Asynchronous: sender continues after sending; response arrives later via callback, queue, or event bus.

4. How do agents share context across multiple messages?
Include a context object with conversationId, taskId, and accumulated data. For long workflows, store context externally and pass a reference (contextUri).

5. How should communication failures be handled?
Categorise errors as retryable (network, timeout, 5xx) or non‑retryable (schema violation, authorisation). Use exponential backoff with jitter for retries. Log all failures with messageId.

6. What’s the recommended message size limit?
Keep messages under 256 KB. For larger data, send a reference (URI to blob storage) and have the receiver fetch it.

7. How do I ensure message ordering?
Use a single message queue with a single consumer per agent (or partition key). For unordered protocols, embed a sequence number and handle reordering on the receiver side.

8. Do I need to implement idempotency?
Yes, if you use at‑least‑once delivery (most queues). Store processed messageIds in a Redis set with TTL = maximum retry window.

9. How can I test agent communication locally?
Use test containers to spin up real agent instances, or mock the channel with an in‑memory queue. Write unit tests for message construction and integration tests for the full round trip.

10. What security is required for agent communication?
At minimum: mTLS or JWTs for authentication, TLS 1.3 for encryption in transit, and HMAC or digital signatures for integrity. Authorise every request based on sender identity.

11. How do I monitor agent communication in production?
Export metrics (count, latency, success rate, errors) to Prometheus, and structured logs to ELK or Loki. Create alerts for high error rates (>1%) or high p99 latency (>5s).

12. Can agents communicate across different programming languages?
Yes. A2A protocols are language‑agnostic. Use JSON + HTTP/gRPC or a message broker. The message schema acts as the contract.

13. What’s the role of a message broker in agent communication?
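Brokers (RabbitMQ, Kafka, NATS) provide durability and configurable delivery guarantees (most commonly at‑least‑once; exactly‑once processing is only available in specific configurations), and they decouple senders from receivers in time and space.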

14. How do I handle versioning of agent communication schemas?
Include a version field. Never remove fields – only add optional ones. Use separate message types for breaking changes (e.g., export_v2 operation).

15. What is the best way to correlate responses with requests in async patterns?
Generate a UUIDv7 as messageId. The receiver copies this ID into the response’s correlationId field (or reuses messageId). The sender maintains a dictionary mapping messageId to a future or callback.

This article is part of the AgentDevPro Handbook – practical, engineering‑focused guides for building production AI agent systems.