Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
* A2A protocol client implementation
*/
public class A2AClient {

private final String baseUrl;
private final HttpClient httpClient;
private final ObjectMapper objectMapper;

/**
* Create a new A2A client
*
Expand All @@ -32,11 +32,11 @@ public class A2AClient {
public A2AClient(String baseUrl) {
this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(30))
.build();
.connectTimeout(Duration.ofSeconds(30))
.build();
this.objectMapper = new ObjectMapper();
}

/**
* Create a new A2A client with custom HTTP client
*
Expand All @@ -48,7 +48,7 @@ public A2AClient(String baseUrl, HttpClient httpClient) {
this.httpClient = httpClient;
this.objectMapper = new ObjectMapper();
}

/**
* Send a task message to the agent
*
Expand All @@ -58,15 +58,15 @@ public A2AClient(String baseUrl, HttpClient httpClient) {
*/
public JSONRPCResponse sendTask(TaskSendParams params) throws A2AClientException {
JSONRPCRequest request = new JSONRPCRequest(
generateRequestId(),
"2.0",
"message/send",
generateRequestId(),
"2.0",
"message/send",
params
);

return doRequest(request);
}

/**
* Get the status of a task
*
Expand All @@ -76,15 +76,15 @@ public JSONRPCResponse sendTask(TaskSendParams params) throws A2AClientException
*/
public JSONRPCResponse getTask(TaskQueryParams params) throws A2AClientException {
JSONRPCRequest request = new JSONRPCRequest(
generateRequestId(),
"2.0",
"tasks/get",
generateRequestId(),
"2.0",
"tasks/get",
params
);

return doRequest(request);
}

/**
* Cancel a task
*
Expand All @@ -94,15 +94,15 @@ public JSONRPCResponse getTask(TaskQueryParams params) throws A2AClientException
*/
public JSONRPCResponse cancelTask(TaskIDParams params) throws A2AClientException {
JSONRPCRequest request = new JSONRPCRequest(
generateRequestId(),
"2.0",
"tasks/cancel",
generateRequestId(),
"2.0",
"tasks/cancel",
params
);

return doRequest(request);
}

/**
* Send a task with streaming response
*
Expand All @@ -114,64 +114,72 @@ public CompletableFuture<Void> sendTaskStreaming(TaskSendParams params, Streamin
return CompletableFuture.runAsync(() -> {
try {
JSONRPCRequest request = new JSONRPCRequest(
generateRequestId(),
"2.0",
"message/send",
params
);
generateRequestId(),
"2.0",
"message/stream",
params
);

String requestBody = objectMapper.writeValueAsString(request);

HttpRequest httpRequest = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/a2a/stream"))
.header("Content-Type", "application/json")
.header("Accept", "text/event-stream")
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();
.uri(URI.create(baseUrl + "/a2a/stream"))
.header("Content-Type", "application/json")
.header("Accept", "text/event-stream")
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();

HttpResponse<String> response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation reads the entire response body into memory using HttpResponse.BodyHandlers.ofString() before processing. This can be inefficient for large streams and isn't a true streaming implementation. Consider using HttpResponse.BodyHandlers.ofLines() which provides a Stream<String> to process the response line-by-line as it arrives, without buffering the whole body in memory.


if (response.statusCode() != 200) {
listener.onError(new A2AClientException("HTTP " + response.statusCode() + ": " + response.body()));
return;
}

// Parse streaming response
String[] lines = response.body().split("\n");
for (String line : lines) {
if (line.trim().isEmpty()) continue;
try {
SendTaskStreamingResponse streamingResponse = objectMapper.readValue(line, SendTaskStreamingResponse.class);

if (streamingResponse.error() != null) {
A2AError error = streamingResponse.error();
Integer errorCode = error.code() != null ? error.code().getValue() : null;
listener.onError(new A2AClientException(
error.message(),
errorCode
));
return;
}

if (streamingResponse.result() != null) {
listener.onEvent(streamingResponse.result());
StringBuilder buf = new StringBuilder();

for (String raw : lines) {
String line = raw.trim();
if (line.isEmpty()) { // blank line = end of one SSE event
if (buf.length() > 0) {
String payload = buf.toString();
SendTaskStreamingResponse msg = objectMapper.readValue(payload,
SendTaskStreamingResponse.class);

if (msg.error() != null) {
Integer code = msg.error().code() != null ? msg.error().code().getValue() : null;
listener.onError(new A2AClientException(msg.error().message(), code));
return;
}
if (msg.result() != null)
listener.onEvent(msg.result());
Comment on lines +156 to +157
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low

For consistency and to prevent potential issues, it's a good practice in Java to always use braces {} with if statements, even for single-line blocks.

Suggested change
if (msg.result() != null)
listener.onEvent(msg.result());
if (msg.result() != null) {
listener.onEvent(msg.result());
}

buf.setLength(0);
}

} catch (Exception e) {
listener.onError(new A2AClientException("Failed to parse streaming response", e));
return;
continue;
}

if (line.startsWith("data:")) {
buf.append(line.substring(5).trim());
} // ignore event:, id:, retry:
}

// flush tail if no trailing blank line
if (buf.length() > 0) {
SendTaskStreamingResponse msg = objectMapper.readValue(buf.toString(),
SendTaskStreamingResponse.class);
if (msg.result() != null)
listener.onEvent(msg.result());
}
Comment on lines +169 to 174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This logic for handling the final part of the stream is missing error handling. If the last message from the server is an error, it will be ignored and onComplete() will be called, which is incorrect. You should add the same error handling logic that is present in the main loop (lines 151-155). Also, as a matter of style, it's good practice to use braces for the if statement on line 172.

                if (buf.length() > 0) {
                    SendTaskStreamingResponse msg = objectMapper.readValue(buf.toString(),
                            SendTaskStreamingResponse.class);
                    if (msg.error() != null) {
                        Integer code = msg.error().code() != null ? msg.error().code().getValue() : null;
                        listener.onError(new A2AClientException(msg.error().message(), code));
                        return;
                    }
                    if (msg.result() != null) {
                        listener.onEvent(msg.result());
                    }
                }


listener.onComplete();

} catch (Exception e) {
listener.onError(new A2AClientException("Streaming request failed", e));
}
});
}

/**
* Get agent card information
*
Expand All @@ -181,80 +189,80 @@ public CompletableFuture<Void> sendTaskStreaming(TaskSendParams params, Streamin
public AgentCard getAgentCard() throws A2AClientException {
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/.well-known/agent-card.json"))
.header("Accept", "application/json")
.GET()
.build();
.uri(URI.create(baseUrl + "/.well-known/agent-card.json"))
.header("Accept", "application/json")
.GET()
.build();

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

if (response.statusCode() != 200) {
throw new A2AClientException("HTTP " + response.statusCode() + ": " + response.body());
}

return objectMapper.readValue(response.body(), AgentCard.class);

} catch (IOException | InterruptedException e) {
throw new A2AClientException("Failed to get agent card", e);
}
}

/**
* Perform HTTP request and handle response
*/
private JSONRPCResponse doRequest(JSONRPCRequest request) throws A2AClientException {
try {
String requestBody = objectMapper.writeValueAsString(request);

HttpRequest httpRequest = HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/a2a"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();
.uri(URI.create(baseUrl + "/a2a"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();

HttpResponse<String> response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());

if (response.statusCode() != 200) {
throw new A2AClientException("HTTP " + response.statusCode() + ": " + response.body());
}

// Parse the response
JsonNode responseNode = objectMapper.readTree(response.body());

// Extract basic fields
Object id = responseNode.has("id") ? responseNode.get("id").asText() : null;
String jsonrpc = responseNode.get("jsonrpc").asText();

// Handle error
JSONRPCError error = null;
if (responseNode.has("error") && !responseNode.get("error").isNull()) {
error = objectMapper.treeToValue(responseNode.get("error"), JSONRPCError.class);
}

// Handle result
Object result = null;
if (responseNode.has("result") && !responseNode.get("result").isNull()) {
result = objectMapper.treeToValue(responseNode.get("result"), Task.class);
}

JSONRPCResponse jsonrpcResponse = new JSONRPCResponse(id, jsonrpc, result, error);

// Check for A2A errors
if (error != null) {
throw new A2AClientException(error.message(), error.code());
}

return jsonrpcResponse;

} catch (IOException | InterruptedException e) {
throw new A2AClientException("Request failed", e);
}
}

/**
* Generate a unique request ID
*/
private String generateRequestId() {
return UUID.randomUUID().toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public SseEmitter handleStreamingTask(@RequestBody JSONRPCRequest request) {
// Process task asynchronously
CompletableFuture.runAsync(() -> {
try {
if (!"message/send".equals(request.method())) {
if (!"message/stream".equals(request.method())) {
sendErrorEvent(emitter, request.id(), ErrorCode.METHOD_NOT_FOUND, "Method not found");
return;
}
Expand Down