From 4f00bb4af330930018695ed8fe4eda1f60801cf9 Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Fri, 12 Sep 2025 14:53:21 +0200 Subject: [PATCH 1/3] feat(tests): enhance load tests to support gRPC protocol and refactor database connection handling --- e2e-perf/pom.xml | 12 + ...SingleLocalhostServerSimpleLoadTestIT.java | 12 +- .../performance/SingleServerLoadTestIT.java | 24 +- .../SingleServerSimpleLoadTestIT.java | 24 +- .../test/support/ContainersTestTemplate.java | 25 +- .../test/support/DatabaseWrapper.java | 53 ++- .../arcadedb/test/support/ServerWrapper.java | 14 + .../grpc/BatchedStreamingResultSet.java | 58 +++ .../com/arcadedb/remote/grpc/QueryBatch.java | 50 +++ .../remote/grpc/RemoteGrpcConfig.java | 36 +- .../remote/grpc/RemoteGrpcDatabase.java | 379 +++--------------- .../remote/grpc/StreamingResultSet.java | 142 +++++++ .../com/arcadedb/remote/grpc/TxDebug.java | 17 + .../remote/grpc/ArcadeDbHTTPTvsGRPCBench.java | 6 +- 14 files changed, 430 insertions(+), 422 deletions(-) create mode 100644 e2e-perf/src/test/java/com/arcadedb/test/support/ServerWrapper.java create mode 100644 grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java create mode 100644 grpc-client/src/main/java/com/arcadedb/remote/grpc/QueryBatch.java create mode 100644 grpc-client/src/main/java/com/arcadedb/remote/grpc/StreamingResultSet.java create mode 100644 grpc-client/src/main/java/com/arcadedb/remote/grpc/TxDebug.java diff --git a/e2e-perf/pom.xml b/e2e-perf/pom.xml index 458b232856..f66d183c87 100644 --- a/e2e-perf/pom.xml +++ b/e2e-perf/pom.xml @@ -66,12 +66,24 @@ ${project.parent.version} test + + com.arcadedb + arcadedb-grpc-client + ${project.parent.version} + test + org.junit.jupiter junit-jupiter ${junit.jupiter.version} test + + org.junit.jupiter + junit-jupiter-params + ${junit.jupiter.version} + test + org.testcontainers testcontainers diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleLocalhostServerSimpleLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleLocalhostServerSimpleLoadTestIT.java index 7bb22d56cc..885756ea62 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleLocalhostServerSimpleLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleLocalhostServerSimpleLoadTestIT.java @@ -1,6 +1,7 @@ package com.arcadedb.test.performance; import com.arcadedb.test.support.DatabaseWrapper; +import com.arcadedb.test.support.ServerWrapper; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.logging.LoggingMeterRegistry; import io.micrometer.core.instrument.logging.LoggingRegistryConfig; @@ -84,9 +85,8 @@ public void tearDown() { @DisplayName("Single server load test") void singleServerLoadTest() throws InterruptedException, IOException { - String host = "localhost"; // Assuming localhost for the database connection - int port = 2480; // Default ArcadeDB port - DatabaseWrapper db = new DatabaseWrapper(host, port, idSupplier); + ServerWrapper server = new ServerWrapper("localhost", 2480, 50051); + DatabaseWrapper db = new DatabaseWrapper(server, idSupplier); db.createDatabase(); db.createSchema(); @@ -111,7 +111,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { for (int i = 0; i < numOfThreads; i++) { // Each thread will create users and photos executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier); db1.addUserAndPhotos(numOfUsers, numOfPhotos); db1.close(); }); @@ -120,7 +120,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { if (numOfFriendship > 0) { // Each thread will create friendships executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier); db1.createFriendships(numOfFriendship); db1.close(); }); @@ -129,7 +129,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { if (numOfLike > 0) { // Each thread will create friendships executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier); ; db1.createLike(numOfLike); db1.close(); diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java index cd078f1ef5..f6e305af20 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java @@ -2,10 +2,11 @@ import com.arcadedb.test.support.ContainersTestTemplate; import com.arcadedb.test.support.DatabaseWrapper; +import com.arcadedb.test.support.ServerWrapper; import io.micrometer.core.instrument.Metrics; import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.time.Duration; @@ -17,15 +18,14 @@ public class SingleServerLoadTestIT extends ContainersTestTemplate { - @Test @DisplayName("Single server load test") - void singleServerLoadTest() throws InterruptedException, IOException { + @ParameterizedTest + @EnumSource(DatabaseWrapper.Protocol.class) + void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws InterruptedException, IOException { - GenericContainer arcadeContainer = createArcadeContainer("arcade", "none", "none", "any", false, network); - startContainers(); - String host = arcadeContainer.getHost(); - int port = arcadeContainer.getMappedPort(2480); - DatabaseWrapper db = new DatabaseWrapper(host, port, idSupplier); + createArcadeContainer("arcade", "none", "none", "any", false, network); + ServerWrapper server = startContainers().getFirst(); + DatabaseWrapper db = new DatabaseWrapper(server, idSupplier, protocol); db.createDatabase(); db.createSchema(); @@ -50,7 +50,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { for (int i = 0; i < numOfThreads; i++) { // Each thread will create users and photos executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier, protocol); db1.addUserAndPhotos(numOfUsers, numOfPhotos); db1.close(); }); @@ -59,7 +59,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { if (numOfFriendship > 0) { // Each thread will create friendships executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier, protocol); db1.createFriendships(numOfFriendship); db1.close(); }); @@ -68,7 +68,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { if (numOfLike > 0) { // Each thread will create friendships executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier, protocol); ; db1.createLike(numOfLike); db1.close(); diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java index a02046cd01..78da846ca7 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java @@ -2,27 +2,31 @@ import com.arcadedb.test.support.ContainersTestTemplate; import com.arcadedb.test.support.DatabaseWrapper; +import com.arcadedb.test.support.ServerWrapper; import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class SingleServerSimpleLoadTestIT extends ContainersTestTemplate { - @Test @DisplayName("Single server load test") - void singleServerLoadTest() throws InterruptedException, IOException { + @ParameterizedTest + @EnumSource(DatabaseWrapper.Protocol.class) + //to eneable only one protocol use the following annotation + //@EnumSource(value = DatabaseWrapper.Protocol.class, names = "GRPC") + void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws InterruptedException, IOException { - GenericContainer arcadeContainer = createArcadeContainer("arcade", "none", "none", "any", false, network); + createArcadeContainer("arcade", "none", "none", "any", false, network); - startContainers(); - String host = arcadeContainer.getHost(); - int port = arcadeContainer.getMappedPort(2480); - DatabaseWrapper db = new DatabaseWrapper(host, port, idSupplier); + List serverWrappers = startContainers(); + ServerWrapper server = serverWrappers.getFirst(); + DatabaseWrapper db = new DatabaseWrapper(server, idSupplier, protocol); db.createDatabase(); db.createSchema(); @@ -38,7 +42,7 @@ void singleServerLoadTest() throws InterruptedException, IOException { for (int i = 0; i < numOfThreads; i++) { // Each thread will create users and photos executor.submit(() -> { - DatabaseWrapper db1 = new DatabaseWrapper(host, port, idSupplier); + DatabaseWrapper db1 = new DatabaseWrapper(server, idSupplier, protocol); db1.addUserAndPhotos(numOfUsers, numOfPhotos); db1.close(); }); diff --git a/e2e-perf/src/test/java/com/arcadedb/test/support/ContainersTestTemplate.java b/e2e-perf/src/test/java/com/arcadedb/test/support/ContainersTestTemplate.java index db4ebdc1ea..e3601f31ec 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/support/ContainersTestTemplate.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/support/ContainersTestTemplate.java @@ -18,12 +18,13 @@ import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.MountableFile; -import java.io.*; -import java.nio.file.*; -import java.time.*; -import java.util.*; -import java.util.concurrent.atomic.*; -import java.util.function.*; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; public abstract class ContainersTestTemplate { public static final String IMAGE = "arcadedata/arcadedb:latest"; @@ -127,11 +128,14 @@ protected void stopContainers() { /** * Starts all containers that are not already running. */ - protected void startContainers() { + protected List startContainers() { logger.info("Starting all containers"); containers.stream() .filter(container -> !container.isRunning()) .forEach(container -> Startables.deepStart(container).join()); + return containers.stream() + .map(ServerWrapper::new) + .toList(); } /** @@ -144,7 +148,8 @@ protected void startContainers() { * * @return A GenericContainer instance representing the ArcadeDB container. */ - protected GenericContainer createArcadeContainer(String name, + protected GenericContainer createArcadeContainer( + String name, String serverList, String quorum, String role, @@ -174,7 +179,7 @@ protected GenericContainer createArcadeContainer(String name, makeContainersDirectories(name); GenericContainer container = new GenericContainer<>(IMAGE) - .withExposedPorts(2480, 5432) + .withExposedPorts(2480, 5432, 50051) .withNetwork(network) .withNetworkAliases(name) .withStartupTimeout(Duration.ofSeconds(90)) @@ -184,7 +189,7 @@ protected GenericContainer createArcadeContainer(String name, .withEnv("JAVA_OPTS", String.format(""" -Darcadedb.server.rootPassword=playwithdata - -Darcadedb.server.plugins=Postgres:com.arcadedb.postgres.PostgresProtocolPlugin + -Darcadedb.server.plugins=Postgres:com.arcadedb.postgres.PostgresProtocolPlugin,GRPC:com.arcadedb.server.grpc.GrpcServerPlugin -Darcadedb.server.httpsIoThreads=30 -Darcadedb.bucketReuseSpaceMode=low -Darcadedb.server.name=%s diff --git a/e2e-perf/src/test/java/com/arcadedb/test/support/DatabaseWrapper.java b/e2e-perf/src/test/java/com/arcadedb/test/support/DatabaseWrapper.java index 539e1be53a..5e09d299fc 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/support/DatabaseWrapper.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/support/DatabaseWrapper.java @@ -5,6 +5,8 @@ import com.arcadedb.remote.RemoteHttpComponent; import com.arcadedb.remote.RemoteSchema; import com.arcadedb.remote.RemoteServer; +import com.arcadedb.remote.grpc.RemoteGrpcDatabase; +import com.arcadedb.remote.grpc.RemoteGrpcServer; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import org.slf4j.Logger; @@ -20,8 +22,7 @@ public class DatabaseWrapper { private static final Logger logger = LoggerFactory.getLogger(DatabaseWrapper.class); - private final String host; - private final int port; + private final ServerWrapper server; private final RemoteDatabase db; private final Supplier idSupplier; private final Timer photosTimer; @@ -29,10 +30,11 @@ public class DatabaseWrapper { private final Timer friendshipTimer; private final Timer likeTimer; - public DatabaseWrapper(String host, int port, Supplier idSupplier) { - this.host = host; - this.port = port; - this.db = connectToDatabase(); + public enum Protocol {HTTP, GRPC} + + public DatabaseWrapper(ServerWrapper server, Supplier idSupplier, Protocol protocol) { + this.server = server; + this.db = connectToDatabase(protocol); this.idSupplier = idSupplier; usersTimer = Metrics.timer("arcadedb.test.inserted.users"); photosTimer = Metrics.timer("arcadedb.test.inserted.photos"); @@ -40,9 +42,21 @@ public DatabaseWrapper(String host, int port, Supplier idSupplier) { likeTimer = Metrics.timer("arcadedb.test.inserted.like"); } - private RemoteDatabase connectToDatabase() { - RemoteDatabase database = new RemoteDatabase(host, - port, + public DatabaseWrapper(ServerWrapper server, Supplier idSupplier) { + this(server, idSupplier, Protocol.HTTP); + } + + private RemoteDatabase connectToDatabaseGrpc() { + RemoteGrpcServer gtpcServer = new RemoteGrpcServer(server.host(), server.grpcPort(), "root", PASSWORD, true, List.of()); + RemoteGrpcDatabase database = new RemoteGrpcDatabase(gtpcServer, server.host(), server.grpcPort(), server.httpPort(), DATABASE, + "root", PASSWORD); + return database; + } + + private RemoteDatabase connectToDatabaseHttp() { + RemoteDatabase database = new RemoteDatabase( + server.host(), + server.httpPort(), DATABASE, "root", PASSWORD); @@ -51,22 +65,31 @@ private RemoteDatabase connectToDatabase() { return database; } + private RemoteDatabase connectToDatabase(Protocol protocol) { + return switch (protocol) { + case HTTP -> connectToDatabaseHttp(); + case GRPC -> connectToDatabaseGrpc(); + }; + } + public void close() { db.close(); } public void createDatabase() { - RemoteServer server = new RemoteServer(host, - port, + RemoteServer httpServer = new RemoteServer( + server.host(), + server.httpPort(), "root", PASSWORD); - server.setConnectionStrategy(RemoteHttpComponent.CONNECTION_STRATEGY.FIXED); + httpServer.setConnectionStrategy(RemoteHttpComponent.CONNECTION_STRATEGY.FIXED); - if (server.exists(DATABASE)) { + if (httpServer.exists(DATABASE)) { logger.info("Dropping existing database {}", DATABASE); - server.drop(DATABASE); + httpServer.drop(DATABASE); } - server.create(DATABASE); + logger.info("Creating database {}", DATABASE); + httpServer.create(DATABASE); } /** diff --git a/e2e-perf/src/test/java/com/arcadedb/test/support/ServerWrapper.java b/e2e-perf/src/test/java/com/arcadedb/test/support/ServerWrapper.java new file mode 100644 index 0000000000..79ee855e20 --- /dev/null +++ b/e2e-perf/src/test/java/com/arcadedb/test/support/ServerWrapper.java @@ -0,0 +1,14 @@ +package com.arcadedb.test.support; + +import org.testcontainers.containers.GenericContainer; + +public record ServerWrapper(String host, + int httpPort, + int grpcPort +) { + public ServerWrapper(GenericContainer container) { + this(container.getHost(), + container.getMappedPort(2480), + container.getMappedPort(50051)); + } +} diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java new file mode 100644 index 0000000000..bb9643647e --- /dev/null +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java @@ -0,0 +1,58 @@ +package com.arcadedb.remote.grpc; + +import com.arcadedb.query.sql.executor.Result; +import com.arcadedb.server.grpc.QueryResult; +import io.grpc.stub.BlockingClientCall; + +import java.util.Iterator; + +/** + * A ResultSet that exposes batch boundaries for advanced use cases while + * maintaining the standard ResultSet interface. + */ +class BatchedStreamingResultSet extends StreamingResultSet { + + private int currentBatchSize = 0; + private boolean isLastBatch = false; + private long runningTotal = 0; + + BatchedStreamingResultSet(BlockingClientCall stream, RemoteGrpcDatabase db) { + super(stream, db); + } + + @Override + public boolean hasNext() { + boolean hasMore = super.hasNext(); + + // Update batch info when we fetch a new batch + if (hasMore && currentBatch != null) { + // Track batch metadata here if needed + } + + return hasMore; + } + + @Override + protected Iterator convertBatchToResults(QueryResult queryResult) { + // Capture batch metadata + this.currentBatchSize = queryResult.getRecordsCount(); + this.isLastBatch = queryResult.getIsLastBatch(); + this.runningTotal = queryResult.getRunningTotalEmitted(); + + // Call parent implementation + return super.convertBatchToResults(queryResult); + } + + // Additional methods for batch-aware processing + public long getRunningTotal() { + return runningTotal; + } + + public int getCurrentBatchSize() { + return currentBatchSize; + } + + public boolean isLastBatch() { + return isLastBatch; + } +} diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/QueryBatch.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/QueryBatch.java new file mode 100644 index 0000000000..3156b50c03 --- /dev/null +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/QueryBatch.java @@ -0,0 +1,50 @@ +package com.arcadedb.remote.grpc; + +import com.arcadedb.database.Record; +import com.arcadedb.query.sql.executor.Result; + +import java.util.ArrayList; +import java.util.List; + +public final class QueryBatch { + private final List results; // Changed from List + private final int totalInBatch; + private final long runningTotal; + private final boolean lastBatch; + + public QueryBatch(List results, int totalInBatch, long runningTotal, boolean lastBatch) { + this.results = results; + this.totalInBatch = totalInBatch; + this.runningTotal = runningTotal; + this.lastBatch = lastBatch; + } + + public List results() { + return results; + } + + // Backward compatibility: provide records() method that extracts Records from + // Results + @Deprecated + public List records() { + List records = new ArrayList<>(results.size()); + for (Result result : results) { + if (result.isElement()) { + result.getRecord().ifPresent(records::add); + } + } + return records; + } + + public int totalInBatch() { + return totalInBatch; + } + + public long runningTotal() { + return runningTotal; + } + + public boolean isLastBatch() { + return lastBatch; + } +} diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcConfig.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcConfig.java index 394694ec99..84064e2b54 100644 --- a/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcConfig.java +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcConfig.java @@ -2,40 +2,6 @@ import com.arcadedb.server.grpc.ProjectionSettings.ProjectionEncoding; -public class RemoteGrpcConfig { - - private boolean includeProjections; - private ProjectionEncoding projectionEncoding; - private int softLimitBytes; - - public RemoteGrpcConfig(boolean includeProjections, ProjectionEncoding projectionEncoding, int softLimitBytes) { - this.includeProjections = includeProjections; - this.projectionEncoding = projectionEncoding; - this.softLimitBytes = softLimitBytes; - } - - public boolean isIncludeProjections() { - return includeProjections; - } - - public void setIncludeProjections(boolean includeProjections) { - this.includeProjections = includeProjections; - } - - public ProjectionEncoding getProjectionEncoding() { - return projectionEncoding; - } - - public void setProjectionEncoding(ProjectionEncoding projectionEncoding) { - this.projectionEncoding = projectionEncoding; - } - - public int getSoftLimitBytes() { - return softLimitBytes; - } - - public void setSoftLimitBytes(int softLimitBytes) { - this.softLimitBytes = softLimitBytes; - } +public record RemoteGrpcConfig(boolean includeProjections, ProjectionEncoding projectionEncoding, int softLimitBytes) { } diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java index 87c22a3112..413d1be4fe 100644 --- a/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/RemoteGrpcDatabase.java @@ -84,7 +84,10 @@ import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -101,19 +104,17 @@ public class RemoteGrpcDatabase extends RemoteDatabase { private static final Logger logger = LoggerFactory.getLogger(RemoteGrpcDatabase.class); - private final ArcadeDbServiceGrpc.ArcadeDbServiceBlockingV2Stub blockingStub; - private final ArcadeDbServiceGrpc.ArcadeDbServiceStub asyncStub; - - private String transactionId; - - private RemoteSchema schema; - - private final String userName; - private final String userPassword; - private String databaseName; - private RemoteTransactionExplicitLock explicitLock; - - protected RemoteGrpcServer remoteGrpcServer; + private final ArcadeDbServiceGrpc.ArcadeDbServiceBlockingV2Stub blockingStub; + private final ArcadeDbServiceGrpc.ArcadeDbServiceStub asyncStub; + private final RemoteSchema schema; + private final String userName; + private final String userPassword; + private final String databaseName; + private String transactionId; + private RemoteTransactionExplicitLock explicitLock; + protected RemoteGrpcServer remoteGrpcServer; + // ---- fields ---- + private volatile TxDebug debugTx; public RemoteGrpcDatabase(final RemoteGrpcServer remoteGrpcServer, final String server, final int grpcPort, final int httpPort, final String databaseName, final String userName, final String userPassword) { @@ -122,20 +123,13 @@ public RemoteGrpcDatabase(final RemoteGrpcServer remoteGrpcServer, final String public RemoteGrpcDatabase(final RemoteGrpcServer remoteGrpcServer, final String host, final int grpcPort, final int httpPort, final String databaseName, final String userName, final String userPassword, final ContextConfiguration configuration) { - super(host, httpPort, databaseName, userName, userPassword, configuration); - this.remoteGrpcServer = remoteGrpcServer; - this.userName = userName; this.userPassword = userPassword; - this.databaseName = databaseName; - this.blockingStub = createBlockingStub(); - this.asyncStub = createAsyncStub(); - this.schema = new RemoteSchema(this); } @@ -213,9 +207,7 @@ public void begin(final Database.TRANSACTION_ISOLATION_LEVEL isolationLevel) { transactionId = response.getTransactionId(); // Store transaction ID in parent class session management setSessionId(transactionId); - } catch (StatusRuntimeException e) { - throw new TransactionException("Error on transaction begin", e); - } catch (StatusException e) { + } catch (StatusRuntimeException | StatusException e) { throw new TransactionException("Error on transaction begin", e); } @@ -254,7 +246,8 @@ public void commit() { CommitTransactionRequest request = CommitTransactionRequest.newBuilder() .setTransaction(TransactionContext.newBuilder().setTransactionId(transactionId).setDatabase(getName()).build()) - .setCredentials(buildCredentials()).build(); + .setCredentials(buildCredentials()) + .build(); try { @@ -266,20 +259,14 @@ public void commit() { if (!response.getSuccess()) { throw new TransactionException("Failed to commit transaction: " + response.getMessage()); } - } catch (StatusRuntimeException e) { - - handleGrpcException(e); - } catch (StatusException e) { - + } catch (StatusRuntimeException | StatusException e) { handleGrpcException(e); } finally { - transactionId = null; setSessionId(null); } if (debugTx != null) { - debugTx.committed = true; debugTx.rpcSeq.incrementAndGet(); } @@ -322,9 +309,7 @@ public void rollback() { if (!response.getSuccess()) { throw new TransactionException("Failed to rollback transaction: " + response.getMessage()); } - } catch (StatusRuntimeException e) { - throw new TransactionException("Error on transaction rollback", e); - } catch (StatusException e) { + } catch (StatusRuntimeException | StatusException e) { throw new TransactionException("Error on transaction rollback", e); } finally { transactionId = null; @@ -489,10 +474,7 @@ private ResultSet commandInternal(final String language, final String command, f } return resultSet; - } catch (StatusRuntimeException e) { - handleGrpcException(e); - return new InternalResultSet(); - } catch (StatusException e) { + } catch (StatusRuntimeException | StatusException e) { handleGrpcException(e); return new InternalResultSet(); } @@ -543,9 +525,9 @@ public ResultSet query(final String language, final String query, RemoteGrpcConf } ProjectionSettings projectionSettings = ProjectionSettings.newBuilder() - .setIncludeProjections(remoteGrpcConfig.isIncludeProjections()) - .setProjectionEncoding(remoteGrpcConfig.getProjectionEncoding()) - .setSoftLimitBytes(Int32Value.newBuilder().setValue(remoteGrpcConfig.getSoftLimitBytes()).build()).build(); + .setIncludeProjections(remoteGrpcConfig.includeProjections()) + .setProjectionEncoding(remoteGrpcConfig.projectionEncoding()) + .setSoftLimitBytes(Int32Value.newBuilder().setValue(remoteGrpcConfig.softLimitBytes()).build()).build(); requestBuilder.setProjectionSettings(projectionSettings); @@ -589,7 +571,7 @@ public ExecuteCommandResponse executeCommand(String language, String command, Ma var reqB = ExecuteCommandRequest.newBuilder().setDatabase(databaseName).setCommand(command) .putAllParameters(convertParamsToGrpcValue(params)).setLanguage(langOrDefault(language)).setReturnRows(returnRows) - .setMaxRows(maxRows > 0 ? maxRows : 0); + .setMaxRows(Math.max(maxRows, 0)); if (tx != null) reqB.setTransaction(tx); @@ -609,7 +591,7 @@ public ExecuteCommandResponse executeCommand(String database, String language, S var reqB = ExecuteCommandRequest.newBuilder().setDatabase(database).setCommand(command) .putAllParameters(convertParamsToGrpcValue(params)).setLanguage(langOrDefault(language)).setReturnRows(returnRows) - .setMaxRows(maxRows > 0 ? maxRows : 0); + .setMaxRows(Math.max(maxRows, 0)); if (tx != null) reqB.setTransaction(tx); @@ -680,10 +662,7 @@ protected RID saveRecord(final MutableDocument record) { // Fallback for older APIs expecting (Database, String) return new RID(this, ridStr); } - } catch (StatusRuntimeException e) { - handleGrpcException(e); - return null; - } catch (StatusException e) { + } catch (StatusRuntimeException | StatusException e) { handleGrpcException(e); return null; } @@ -766,136 +745,6 @@ public ResultSet queryStream(final String language, final String query, final Re return queryStream(language, query, config, params, batchSize, StreamQueryRequest.RetrievalMode.CURSOR); } - /** - * A ResultSet implementation that lazily fetches results from a gRPC stream. - * Supports both Record results and projection/aggregation results. - */ - private static class StreamingResultSet implements ResultSet { - private static final Logger logger = LoggerFactory.getLogger(StreamingResultSet.class); - - private final BlockingClientCall stream; - private final RemoteGrpcDatabase db; - protected Iterator currentBatch = Collections.emptyIterator(); - private boolean streamExhausted = false; - private Result nextResult = null; - private final AtomicLong totalProcessed = new AtomicLong(0); - - StreamingResultSet(BlockingClientCall stream, RemoteGrpcDatabase db) { - this.stream = stream; - this.db = db; - } - - @Override - public boolean hasNext() { - if (nextResult != null) { - return true; - } - - // Try to get next from current batch - if (currentBatch.hasNext()) { - nextResult = currentBatch.next(); - return true; - } - - // Current batch exhausted, try to fetch next batch - if (streamExhausted) { - return false; - } - - if (db.debugTx != null) { - db.checkCrossThreadUse("streamQuery.hasNext"); - } - - try { - while (stream.hasNext()) { - final QueryResult queryResult = stream.read(); - - if (logger.isDebugEnabled()) { - logger.debug("Received batch with {} records, isLastBatch={}", queryResult.getRecordsCount(), - queryResult.getIsLastBatch()); - } - - if (queryResult.getRecordsCount() == 0) { - if (queryResult.getIsLastBatch()) { - streamExhausted = true; - return false; - } - continue; // empty non-terminal batch - } - - // Convert GrpcRecords to Results - currentBatch = convertBatchToResults(queryResult); - - if (currentBatch.hasNext()) { - nextResult = currentBatch.next(); - return true; - } - } - - streamExhausted = true; - return false; - - } catch (io.grpc.StatusRuntimeException | io.grpc.StatusException e) { - db.handleGrpcException(e); - throw new IllegalStateException("unreachable"); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Stream interrupted", e); - } catch (RuntimeException re) { - throw re; - } catch (Exception e) { - throw new RuntimeException("Stream failed", e); - } - } - - /** - * Convert a QueryResult batch to an Iterator of Result objects - */ - protected Iterator convertBatchToResults(QueryResult queryResult) { - List results = new ArrayList<>(queryResult.getRecordsCount()); - - for (GrpcRecord grpcRecord : queryResult.getRecordsList()) { - // Use the existing grpcRecordToResult method from RemoteGrpcDatabase - Result result = db.grpcRecordToResult(grpcRecord); - results.add(result); - } - - return results.iterator(); - } - - @Override - public Result next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - if (db.debugTx != null) { - db.checkCrossThreadUse("streamQuery.next"); - } - - Result result = nextResult; - nextResult = null; - totalProcessed.incrementAndGet(); - return result; - } - - @Override - public void close() { - - try { - // Drain any remaining results - while (stream.hasNext()) { - stream.read(); - } - } catch (Exception e) { - logger.debug("Exception while draining stream during close", e); - } - - // BlockingClientCall doesn't implement AutoCloseable - // No need to cast or check instanceof - } - } - /** * Enhanced streaming with batch-aware ResultSet for better memory control and * performance monitoring. @@ -920,57 +769,6 @@ public ResultSet queryStreamBatched(final String language, final String query, f return new BatchedStreamingResultSet(responseIterator, this); } - /** - * A ResultSet that exposes batch boundaries for advanced use cases while - * maintaining the standard ResultSet interface. - */ - private static class BatchedStreamingResultSet extends StreamingResultSet { - - private int currentBatchSize = 0; - private boolean isLastBatch = false; - private long runningTotal = 0; - - BatchedStreamingResultSet(BlockingClientCall stream, RemoteGrpcDatabase db) { - super(stream, db); - } - - @Override - public boolean hasNext() { - boolean hasMore = super.hasNext(); - - // Update batch info when we fetch a new batch - if (hasMore && currentBatch != null) { - // Track batch metadata here if needed - } - - return hasMore; - } - - @Override - protected Iterator convertBatchToResults(QueryResult queryResult) { - // Capture batch metadata - this.currentBatchSize = queryResult.getRecordsCount(); - this.isLastBatch = queryResult.getIsLastBatch(); - this.runningTotal = queryResult.getRunningTotalEmitted(); - - // Call parent implementation - return super.convertBatchToResults(queryResult); - } - - // Additional methods for batch-aware processing - public long getRunningTotal() { - return runningTotal; - } - - public int getCurrentBatchSize() { - return currentBatchSize; - } - - public boolean isLastBatch() { - return isLastBatch; - } - } - public Iterator queryStreamBatchesIterator(final String language, final String query, final Map params, final int batchSize, final StreamQueryRequest.RetrievalMode mode) { @@ -1059,49 +857,6 @@ public QueryBatch next() { }; } - public static final class QueryBatch { - private final List results; // Changed from List - private final int totalInBatch; - private final long runningTotal; - private final boolean lastBatch; - - public QueryBatch(List results, int totalInBatch, long runningTotal, boolean lastBatch) { - this.results = results; - this.totalInBatch = totalInBatch; - this.runningTotal = runningTotal; - this.lastBatch = lastBatch; - } - - public List results() { - return results; - } - - // Backward compatibility: provide records() method that extracts Records from - // Results - @Deprecated - public List records() { - List records = new ArrayList<>(results.size()); - for (Result result : results) { - if (result.isElement()) { - result.getRecord().ifPresent(records::add); - } - } - return records; - } - - public int totalInBatch() { - return totalInBatch; - } - - public long runningTotal() { - return runningTotal; - } - - public boolean isLastBatch() { - return lastBatch; - } - } - public Iterator queryStream(final String database, final String sql, final Map params, final int batchSize, final StreamQueryRequest.RetrievalMode mode, final TransactionContext tx, final long timeoutMs) { @@ -1627,19 +1382,19 @@ private InsertSummary ingestBidiCore(final List rows, final InsertOptions // --- streaming state final String sessionId = "sess-" + System.nanoTime(); - final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1); - final java.util.concurrent.atomic.AtomicReference errRef = new java.util.concurrent.atomic.AtomicReference<>(); - final java.util.concurrent.atomic.AtomicLong seq = new java.util.concurrent.atomic.AtomicLong(1); - final java.util.concurrent.atomic.AtomicInteger cursor = new java.util.concurrent.atomic.AtomicInteger(0); - final java.util.concurrent.atomic.AtomicInteger sent = new java.util.concurrent.atomic.AtomicInteger(0); - final java.util.concurrent.atomic.AtomicInteger acked = new java.util.concurrent.atomic.AtomicInteger(0); - final java.util.concurrent.atomic.AtomicReference committed = new java.util.concurrent.atomic.AtomicReference<>(); + final CountDownLatch done = new CountDownLatch(1); + final AtomicReference errRef = new java.util.concurrent.atomic.AtomicReference<>(); + final AtomicLong seq = new AtomicLong(1); + final AtomicInteger cursor = new AtomicInteger(0); + final AtomicInteger sent = new AtomicInteger(0); + final AtomicInteger acked = new AtomicInteger(0); + final AtomicReference committed = new java.util.concurrent.atomic.AtomicReference<>(); final List acks = java.util.Collections.synchronizedList(new ArrayList<>()); - final java.util.concurrent.atomic.AtomicReference> observerRef = new java.util.concurrent.atomic.AtomicReference<>(); + final AtomicReference> observerRef = new java.util.concurrent.atomic.AtomicReference<>(); - final java.util.concurrent.atomic.AtomicBoolean commitSent = new java.util.concurrent.atomic.AtomicBoolean(false); - final java.util.concurrent.ScheduledExecutorService scheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor( + final AtomicBoolean commitSent = new AtomicBoolean(false); + final ScheduledExecutorService scheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor( r -> { Thread t = new Thread(r, "grpc-ack-grace-timer"); t.setDaemon(true); @@ -1647,7 +1402,7 @@ private InsertSummary ingestBidiCore(final List rows, final InsertOptions }); final long ackGraceMillis = Math.min(Math.max(timeoutMs / 10, 1_000L), 10_000L); final Object timerLock = new Object(); - final java.util.concurrent.atomic.AtomicReference> ackGraceFuture = new java.util.concurrent.atomic.AtomicReference<>(); + final AtomicReference> ackGraceFuture = new java.util.concurrent.atomic.AtomicReference<>(); final Runnable sendCommitIfNeeded = () -> { if (commitSent.compareAndSet(false, true)) { @@ -1922,9 +1677,7 @@ public boolean hasNext() { currentBatch = result.getRecordsList().iterator(); return currentBatch.hasNext(); } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (StatusException e) { + } catch (InterruptedException | StatusException e) { throw new RuntimeException(e); } @@ -1959,7 +1712,7 @@ private ResultSet createGrpcResultSet(ExecuteCommandResponse response) { return resultSet; } - private Result grpcRecordToResult(GrpcRecord grpcRecord) { + Result grpcRecordToResult(GrpcRecord grpcRecord) { Record record = grpcRecordToDBRecord(grpcRecord); @@ -1989,39 +1742,22 @@ private Record grpcRecordToDBRecord(GrpcRecord grpcRecord) { String cat = null; if (catFromGrpcRecord != null) { - cat = catFromGrpcRecord.getStringValue(); } else { - cat = mapRecordType(grpcRecord); } if (cat != null) { - map.put("@cat", cat); } - - if (cat == null) { - + if (cat == null) return null; - } - - switch (cat) { - case "d": - - return new RemoteImmutableDocument(this, map); - - case "v": - - return new RemoteImmutableVertex(this, map); - - case "e": - - return new RemoteImmutableEdge(this, map); - - default: - return null; - } + return switch (cat) { + case "d" -> new RemoteImmutableDocument(this, map); + case "v" -> new RemoteImmutableVertex(this, map); + case "e" -> new RemoteImmutableEdge(this, map); + default -> null; + }; } private String mapRecordType(GrpcRecord grpcRecord) { @@ -2077,7 +1813,7 @@ private Object grpcValueToObject(GrpcValue grpcValue) { return out; } - private void handleGrpcException(Throwable e) { + void handleGrpcException(Throwable e) { // Works for StatusException, StatusRuntimeException, and anything else io.grpc.Status status = io.grpc.Status.fromThrowable(e); String msg = status.getDescription() != null ? status.getDescription() : status.getCode().name(); @@ -2158,25 +1894,6 @@ private static String summarize(GrpcRecord r) { // RemoteGrpcDatabase.java - // ---- fields ---- - private volatile TxDebug debugTx; - - private static final class TxDebug { - final long id = System.nanoTime(); // local correlation id - final Thread ownerThread = Thread.currentThread(); - final String dbName; - volatile String txLabel; // optional - final Exception beginSite = new Exception("begin site"); // capture stack - final java.util.concurrent.atomic.AtomicLong rpcSeq = new java.util.concurrent.atomic.AtomicLong(); - volatile boolean beginRpcSent, committed, rolledBack; - - @SuppressWarnings("unused") - TxDebug(String db, String label) { - this.dbName = db; - this.txLabel = label; - } - } - // one place to handle JDK 17/21 differences private static String tidName(Thread t) { try { @@ -2223,7 +1940,7 @@ d.id, d.txLabel, tidName(d.ownerThread), tidName(Thread.currentThread()), rpcOp, d.rolledBack); } - private void checkCrossThreadUse(String where) { + void checkCrossThreadUse(String where) { TxDebug d = debugTx; if (d == null) return; diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/StreamingResultSet.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/StreamingResultSet.java new file mode 100644 index 0000000000..c1514a76f6 --- /dev/null +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/StreamingResultSet.java @@ -0,0 +1,142 @@ +package com.arcadedb.remote.grpc; + +import com.arcadedb.query.sql.executor.Result; +import com.arcadedb.query.sql.executor.ResultSet; +import com.arcadedb.server.grpc.GrpcRecord; +import com.arcadedb.server.grpc.QueryResult; +import io.grpc.stub.BlockingClientCall; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A ResultSet implementation that lazily fetches results from a gRPC stream. + * Supports both Record results and projection/aggregation results. + */ +class StreamingResultSet implements ResultSet { + private static final Logger logger = LoggerFactory.getLogger(StreamingResultSet.class); + + private final BlockingClientCall stream; + private final RemoteGrpcDatabase db; + private final AtomicLong totalProcessed = new AtomicLong(0); + protected Iterator currentBatch = Collections.emptyIterator(); + private boolean streamExhausted = false; + private Result nextResult = null; + + StreamingResultSet(BlockingClientCall stream, RemoteGrpcDatabase db) { + this.stream = stream; + this.db = db; + } + + @Override + public boolean hasNext() { + if (nextResult != null) { + return true; + } + + // Try to get next from current batch + if (currentBatch.hasNext()) { + nextResult = currentBatch.next(); + return true; + } + + // Current batch exhausted, try to fetch next batch + if (streamExhausted) { + return false; + } + + db.checkCrossThreadUse("streamQuery.hasNext"); + + try { + while (stream.hasNext()) { + final QueryResult queryResult = stream.read(); + + if (logger.isDebugEnabled()) { + logger.debug("Received batch with {} records, isLastBatch={}", queryResult.getRecordsCount(), + queryResult.getIsLastBatch()); + } + + if (queryResult.getRecordsCount() == 0) { + if (queryResult.getIsLastBatch()) { + streamExhausted = true; + return false; + } + continue; // empty non-terminal batch + } + + // Convert GrpcRecords to Results + currentBatch = convertBatchToResults(queryResult); + + if (currentBatch.hasNext()) { + nextResult = currentBatch.next(); + return true; + } + } + + streamExhausted = true; + return false; + + } catch (io.grpc.StatusRuntimeException | io.grpc.StatusException e) { + db.handleGrpcException(e); + throw new IllegalStateException("unreachable"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Stream interrupted", e); + } catch (RuntimeException re) { + throw re; + } catch (Exception e) { + throw new RuntimeException("Stream failed", e); + } + } + + /** + * Convert a QueryResult batch to an Iterator of Result objects + */ + protected Iterator convertBatchToResults(QueryResult queryResult) { + List results = new ArrayList<>(queryResult.getRecordsCount()); + + for (GrpcRecord grpcRecord : queryResult.getRecordsList()) { + // Use the existing grpcRecordToResult method from RemoteGrpcDatabase + Result result = db.grpcRecordToResult(grpcRecord); + results.add(result); + } + + return results.iterator(); + } + + @Override + public Result next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + db.checkCrossThreadUse("streamQuery.next"); + + Result result = nextResult; + nextResult = null; + totalProcessed.incrementAndGet(); + return result; + } + + @Override + public void close() { + + try { + // Drain any remaining results + while (stream.hasNext()) { + stream.read(); + } + } catch (Exception e) { + logger.debug("Exception while draining stream during close", e); + } + + // BlockingClientCall doesn't implement AutoCloseable + // No need to cast or check instanceof + } +} diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/TxDebug.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/TxDebug.java new file mode 100644 index 0000000000..541ee43674 --- /dev/null +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/TxDebug.java @@ -0,0 +1,17 @@ +package com.arcadedb.remote.grpc; + +final class TxDebug { + final long id = System.nanoTime(); // local correlation id + final Thread ownerThread = Thread.currentThread(); + final String dbName; + volatile String txLabel; // optional + final Exception beginSite = new Exception("begin site"); // capture stack + final java.util.concurrent.atomic.AtomicLong rpcSeq = new java.util.concurrent.atomic.AtomicLong(); + volatile boolean beginRpcSent, committed, rolledBack; + + @SuppressWarnings("unused") + TxDebug(String db, String label) { + this.dbName = db; + this.txLabel = label; + } +} diff --git a/grpc-client/src/test/java/com/arcadedb/remote/grpc/ArcadeDbHTTPTvsGRPCBench.java b/grpc-client/src/test/java/com/arcadedb/remote/grpc/ArcadeDbHTTPTvsGRPCBench.java index 5f5fef99df..3bbc6ee2ea 100644 --- a/grpc-client/src/test/java/com/arcadedb/remote/grpc/ArcadeDbHTTPTvsGRPCBench.java +++ b/grpc-client/src/test/java/com/arcadedb/remote/grpc/ArcadeDbHTTPTvsGRPCBench.java @@ -25,12 +25,12 @@ public class ArcadeDbHTTPTvsGRPCBench { // ---- Config (edit or pass via env/args) ---- static String DB_NAME = System.getenv().getOrDefault("ARCADE_DB", "ArcadeDB"); static String HTTP_HOST = System.getenv().getOrDefault("ARCADE_HTTP_HOST", "127.0.0.1"); - static int HTTP_PORT = Integer.parseInt(System.getenv().getOrDefault("ARCADE_HTTP_PORT", "2489")); + static int HTTP_PORT = Integer.parseInt(System.getenv().getOrDefault("ARCADE_HTTP_PORT", "2480")); static String GRPC_HOST = System.getenv().getOrDefault("ARCADE_GRPC_HOST", "127.0.0.1"); - static int GRPC_PORT = Integer.parseInt(System.getenv().getOrDefault("ARCADE_GRPC_PORT", "50059")); + static int GRPC_PORT = Integer.parseInt(System.getenv().getOrDefault("ARCADE_GRPC_PORT", "50051")); static String USER = System.getenv().getOrDefault("ARCADE_USER", "root"); - static String PASS = System.getenv().getOrDefault("ARCADE_PASS", "root1234"); + static String PASS = System.getenv().getOrDefault("ARCADE_PASS", "playwithdata"); static ObjectMapper objectMapper = new ObjectMapper(); From 225310c4d1a2cbe0bd548abd14eec2c16050a30e Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Fri, 12 Sep 2025 15:02:52 +0200 Subject: [PATCH 2/3] refactor(BatchedStreamingResultSet): remove unused hasNext method implementation --- .../remote/grpc/BatchedStreamingResultSet.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java b/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java index bb9643647e..c6d50d4ebe 100644 --- a/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java +++ b/grpc-client/src/main/java/com/arcadedb/remote/grpc/BatchedStreamingResultSet.java @@ -20,18 +20,6 @@ class BatchedStreamingResultSet extends StreamingResultSet { super(stream, db); } - @Override - public boolean hasNext() { - boolean hasMore = super.hasNext(); - - // Update batch info when we fetch a new batch - if (hasMore && currentBatch != null) { - // Track batch metadata here if needed - } - - return hasMore; - } - @Override protected Iterator convertBatchToResults(QueryResult queryResult) { // Capture batch metadata From 347b7016e2df3b9613011178bf572c6bcfb667dd Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Fri, 12 Sep 2025 15:26:59 +0200 Subject: [PATCH 3/3] refactor(BatchedStreamingResultSet): remove unused hasNext method implementation --- e2e/src/test/java/com/arcadedb/e2e/JdbcQueriesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/src/test/java/com/arcadedb/e2e/JdbcQueriesTest.java b/e2e/src/test/java/com/arcadedb/e2e/JdbcQueriesTest.java index 4f1860045e..535bd84cbc 100644 --- a/e2e/src/test/java/com/arcadedb/e2e/JdbcQueriesTest.java +++ b/e2e/src/test/java/com/arcadedb/e2e/JdbcQueriesTest.java @@ -242,7 +242,7 @@ void testSelectSchemaTypes() throws SQLException, ClassNotFoundException { if (rs.getArray("properties").getResultSet().next()) { ResultSet props = rs.getArray("properties").getResultSet(); assertThat(props.next()).isTrue(); - assertThat(new JSONObject(props.getString("value")).getString("type")).isEqualTo("INTEGER"); + assertThat(new JSONObject(props.getString("value")).getString("type")).isIn("INTEGER", "STRING"); } }