diff --git a/samples/java/custom_java_impl/client/src/main/java/com/google/a2a/client/A2AClient.java b/samples/java/custom_java_impl/client/src/main/java/com/google/a2a/client/A2AClient.java index 4fdb1c5a..690a286e 100644 --- a/samples/java/custom_java_impl/client/src/main/java/com/google/a2a/client/A2AClient.java +++ b/samples/java/custom_java_impl/client/src/main/java/com/google/a2a/client/A2AClient.java @@ -19,11 +19,11 @@ * A2A protocol client implementation */ public class A2AClient { - + private final String baseUrl; private final HttpClient httpClient; private final ObjectMapper objectMapper; - + /** * Create a new A2A client * @@ -32,11 +32,11 @@ public class A2AClient { public A2AClient(String baseUrl) { this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; this.httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(30)) - .build(); + .connectTimeout(Duration.ofSeconds(30)) + .build(); this.objectMapper = new ObjectMapper(); } - + /** * Create a new A2A client with custom HTTP client * @@ -48,7 +48,7 @@ public A2AClient(String baseUrl, HttpClient httpClient) { this.httpClient = httpClient; this.objectMapper = new ObjectMapper(); } - + /** * Send a task message to the agent * @@ -58,15 +58,15 @@ public A2AClient(String baseUrl, HttpClient httpClient) { */ public JSONRPCResponse sendTask(TaskSendParams params) throws A2AClientException { JSONRPCRequest request = new JSONRPCRequest( - generateRequestId(), - "2.0", - "message/send", + generateRequestId(), + "2.0", + "message/send", params ); - + return doRequest(request); } - + /** * Get the status of a task * @@ -76,15 +76,15 @@ public JSONRPCResponse sendTask(TaskSendParams params) throws A2AClientException */ public JSONRPCResponse getTask(TaskQueryParams params) throws A2AClientException { JSONRPCRequest request = new JSONRPCRequest( - generateRequestId(), - "2.0", - "tasks/get", + generateRequestId(), + "2.0", + "tasks/get", params ); - + return doRequest(request); } - + /** * Cancel a task * @@ -94,15 +94,15 @@ public JSONRPCResponse getTask(TaskQueryParams params) throws A2AClientException */ public JSONRPCResponse cancelTask(TaskIDParams params) throws A2AClientException { JSONRPCRequest request = new JSONRPCRequest( - generateRequestId(), - "2.0", - "tasks/cancel", + generateRequestId(), + "2.0", + "tasks/cancel", params ); - + return doRequest(request); } - + /** * Send a task with streaming response * @@ -114,64 +114,72 @@ public CompletableFuture sendTaskStreaming(TaskSendParams params, Streamin return CompletableFuture.runAsync(() -> { try { JSONRPCRequest request = new JSONRPCRequest( - generateRequestId(), - "2.0", - "message/send", - params - ); - + generateRequestId(), + "2.0", + "message/stream", + params + ); + String requestBody = objectMapper.writeValueAsString(request); - + HttpRequest httpRequest = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/a2a/stream")) - .header("Content-Type", "application/json") - .header("Accept", "text/event-stream") - .POST(HttpRequest.BodyPublishers.ofString(requestBody)) - .build(); - + .uri(URI.create(baseUrl + "/a2a/stream")) + .header("Content-Type", "application/json") + .header("Accept", "text/event-stream") + .POST(HttpRequest.BodyPublishers.ofString(requestBody)) + .build(); + HttpResponse response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString()); - + if (response.statusCode() != 200) { listener.onError(new A2AClientException("HTTP " + response.statusCode() + ": " + response.body())); return; } - + // Parse streaming response String[] lines = response.body().split("\n"); - for (String line : lines) { - if (line.trim().isEmpty()) continue; - - try { - SendTaskStreamingResponse streamingResponse = objectMapper.readValue(line, SendTaskStreamingResponse.class); - - if (streamingResponse.error() != null) { - A2AError error = streamingResponse.error(); - Integer errorCode = error.code() != null ? error.code().getValue() : null; - listener.onError(new A2AClientException( - error.message(), - errorCode - )); - return; - } - - if (streamingResponse.result() != null) { - listener.onEvent(streamingResponse.result()); + StringBuilder buf = new StringBuilder(); + + for (String raw : lines) { + String line = raw.trim(); + if (line.isEmpty()) { // blank line = end of one SSE event + if (buf.length() > 0) { + String payload = buf.toString(); + SendTaskStreamingResponse msg = objectMapper.readValue(payload, + SendTaskStreamingResponse.class); + + if (msg.error() != null) { + Integer code = msg.error().code() != null ? msg.error().code().getValue() : null; + listener.onError(new A2AClientException(msg.error().message(), code)); + return; + } + if (msg.result() != null) + listener.onEvent(msg.result()); + buf.setLength(0); } - - } catch (Exception e) { - listener.onError(new A2AClientException("Failed to parse streaming response", e)); - return; + continue; } + + if (line.startsWith("data:")) { + buf.append(line.substring(5).trim()); + } // ignore event:, id:, retry: + } + + // flush tail if no trailing blank line + if (buf.length() > 0) { + SendTaskStreamingResponse msg = objectMapper.readValue(buf.toString(), + SendTaskStreamingResponse.class); + if (msg.result() != null) + listener.onEvent(msg.result()); } - listener.onComplete(); - + } catch (Exception e) { listener.onError(new A2AClientException("Streaming request failed", e)); } }); } - + /** * Get agent card information * @@ -181,80 +189,80 @@ public CompletableFuture sendTaskStreaming(TaskSendParams params, Streamin public AgentCard getAgentCard() throws A2AClientException { try { HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/.well-known/agent-card.json")) - .header("Accept", "application/json") - .GET() - .build(); - + .uri(URI.create(baseUrl + "/.well-known/agent-card.json")) + .header("Accept", "application/json") + .GET() + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); - + if (response.statusCode() != 200) { throw new A2AClientException("HTTP " + response.statusCode() + ": " + response.body()); } - + return objectMapper.readValue(response.body(), AgentCard.class); - + } catch (IOException | InterruptedException e) { throw new A2AClientException("Failed to get agent card", e); } } - + /** * Perform HTTP request and handle response */ private JSONRPCResponse doRequest(JSONRPCRequest request) throws A2AClientException { try { String requestBody = objectMapper.writeValueAsString(request); - + HttpRequest httpRequest = HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/a2a")) - .header("Content-Type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(requestBody)) - .build(); - + .uri(URI.create(baseUrl + "/a2a")) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(requestBody)) + .build(); + HttpResponse response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString()); - + if (response.statusCode() != 200) { throw new A2AClientException("HTTP " + response.statusCode() + ": " + response.body()); } - + // Parse the response JsonNode responseNode = objectMapper.readTree(response.body()); - + // Extract basic fields Object id = responseNode.has("id") ? responseNode.get("id").asText() : null; String jsonrpc = responseNode.get("jsonrpc").asText(); - + // Handle error JSONRPCError error = null; if (responseNode.has("error") && !responseNode.get("error").isNull()) { error = objectMapper.treeToValue(responseNode.get("error"), JSONRPCError.class); } - + // Handle result Object result = null; if (responseNode.has("result") && !responseNode.get("result").isNull()) { result = objectMapper.treeToValue(responseNode.get("result"), Task.class); } - + JSONRPCResponse jsonrpcResponse = new JSONRPCResponse(id, jsonrpc, result, error); - + // Check for A2A errors if (error != null) { throw new A2AClientException(error.message(), error.code()); } - + return jsonrpcResponse; - + } catch (IOException | InterruptedException e) { throw new A2AClientException("Request failed", e); } } - + /** * Generate a unique request ID */ private String generateRequestId() { return UUID.randomUUID().toString(); } -} \ No newline at end of file +} \ No newline at end of file diff --git a/samples/java/custom_java_impl/server/src/main/java/com/google/a2a/server/A2AController.java b/samples/java/custom_java_impl/server/src/main/java/com/google/a2a/server/A2AController.java index 45c6c7a2..7421b2f4 100644 --- a/samples/java/custom_java_impl/server/src/main/java/com/google/a2a/server/A2AController.java +++ b/samples/java/custom_java_impl/server/src/main/java/com/google/a2a/server/A2AController.java @@ -101,7 +101,7 @@ public SseEmitter handleStreamingTask(@RequestBody JSONRPCRequest request) { // Process task asynchronously CompletableFuture.runAsync(() -> { try { - if (!"message/send".equals(request.method())) { + if (!"message/stream".equals(request.method())) { sendErrorEvent(emitter, request.id(), ErrorCode.METHOD_NOT_FOUND, "Method not found"); return; }