From 580dfc2a060834986ca6a5f06a45da1d68c49279 Mon Sep 17 00:00:00 2001 From: Oleg Cohen Date: Wed, 10 Dec 2025 09:31:59 -0700 Subject: [PATCH 1/8] Revised Tx Logic --- .../server/grpc/ArcadeDbGrpcService.java | 219 +++++++++--------- 1 file changed, 108 insertions(+), 111 deletions(-) diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java index 48776a819f..2397b0f428 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java @@ -165,7 +165,24 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver commit > begin-only⇒commit - if (hasTx) { - + // Check if this transaction is managed externally via beginTransaction() + // If so, we must NOT commit/rollback here - let commitTransaction()/rollbackTransaction() handle it + final String txId = (hasTx && tx != null) ? tx.getTransactionId() : null; + final boolean managedExternally = txId != null && !txId.isBlank() && activeTransactions.containsKey(txId); + + if (managedExternally) { + // Transaction was started via beginTransaction() RPC - don't touch its lifecycle + LogManager.instance().log(this, Level.FINE, + "executeCommand(): after - external transaction %s managed by beginTransaction/commitTransaction, skipping auto-commit/rollback", + txId); + } else if (hasTx) { + // Transaction end — precedence: rollback > commit > begin-only⇒commit if (tx.getRollback()) { - LogManager.instance() .log(this, Level.FINE, "executeCommand(): after - rolling back db=%s tid=%s", db.getName(), tx.getTransactionId()); db.rollback(); @@ -253,13 +291,17 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver @rid = %s", grpcRecord.getRid()); + LogManager.instance().log(this, Level.FINE, "executeQuery(): isElement"); - resultBuilder.addRecords(grpcRecord); + Record dbRecord = result.getElement().get(); - count++; + LogManager.instance().log(this, Level.FINE, "executeQuery(): dbRecord -> @rid = %s", dbRecord.getIdentity().toString()); - // Apply limit if specified - if (request.getLimit() > 0 && count >= request.getLimit()) { - break; + GrpcRecord grpcRecord = convertToGrpcRecord(dbRecord, database); + + LogManager.instance().log(this, Level.FINE, "executeQuery(): grpcRecord -> @rid = %s", grpcRecord.getRid()); + + resultBuilder.addRecords(grpcRecord); + + count++; + + // Apply limit if specified + if (request.getLimit() > 0 && count >= request.getLimit()) { + break; + } + } else { + + LogManager.instance().log(this, Level.FINE, "executeQuery(): NOT isElement"); + + // Scalar / projection row (e.g., RETURN COUNT) + + GrpcRecord.Builder recB = GrpcRecord.newBuilder(); + + for (String p : result.getPropertyNames()) { + + recB.putProperties(p, convertPropToGrpcValue(p, result, projectionConfig)); + } + + resultBuilder.addRecords(recB.build()); } } @@ -1053,7 +1127,7 @@ private void streamMaterialized(Database db, StreamQueryRequest request, int bat } /** - * Mode 3: only fetch one page’s worth of rows per emission via LIMIT/SKIP. + * Mode 3: only fetch one page's worth of rows per emission via LIMIT/SKIP. * * @param projectionConfig */ @@ -1165,7 +1239,7 @@ private void safeOnNext(ServerCallStreamObserver scso, AtomicBoolea } private void waitUntilReady(ServerCallStreamObserver scso, AtomicBoolean cancelled) { - // Skip if you’re okay with best-effort pushes; otherwise honor transport + // Skip if you're okay with best-effort pushes; otherwise honor transport // readiness if (scso.isReady()) return; @@ -2046,7 +2120,7 @@ private GrpcValue toGrpcValue(Object o, ProjectionConfig pc) { } } - // Shouldn’t get here, but fall back + // Shouldn't get here, but fall back LogManager.instance() .log(this, Level.FINE, "GRPC-ENC [toGrpcValue] PROJECTION unknown encoding %s; falling back to LINK/STRING", enc.name()); @@ -2301,83 +2375,6 @@ private Map convertParameters(Map protoParams return params; } - /** - * Converts a Result to GrpcRecord, preserving all properties including aliases. - * This method works at the Result level (not Record level) to maintain alias information. - * - * @param result the Result object from a query execution - * @param db the database instance - * @param projectionConfig optional projection configuration - * - * @return GrpcRecord with all properties and aliases preserved - */ - private GrpcRecord convertResultToGrpcRecord(Result result, Database db, ProjectionConfig projectionConfig) { - GrpcRecord.Builder builder = GrpcRecord.newBuilder(); - - // If this result wraps an element (Document/Vertex/Edge), get its metadata - if (result.isElement()) { - Document dbRecord = result.toElement(); - - if (dbRecord.getIdentity() != null) { - builder.setRid(dbRecord.getIdentity().toString()); - } - - if (dbRecord.getType() != null) { - builder.setType(dbRecord.getTypeName()); - } - } - - // Iterate over ALL properties from the Result, including aliases - for (String propertyName : result.getPropertyNames()) { - Object value = result.getProperty(propertyName); - - if (value != null) { - LogManager.instance() - .log(this, Level.FINE, "convertResultToGrpcRecord(): Converting %s\n value = %s\n class = %s", - propertyName, value, value.getClass()); - - GrpcValue gv = projectionConfig != null ? - toGrpcValue(value, projectionConfig) : - toGrpcValue(value); - - LogManager.instance() - .log(this, Level.FINE, "ENC-RES %s: %s -> %s", propertyName, summarizeJava(value), summarizeGrpc(gv)); - - builder.putProperties(propertyName, gv); - } - } - - // Ensure @rid and @type are always in the properties map when there's an element - // This matches JsonSerializer behavior and works around client-side limitations - if (result.isElement()) { - final Document document = result.toElement(); - - if (!builder.getPropertiesMap().containsKey(Property.RID_PROPERTY) && document.getIdentity() != null) { - builder.putProperties(Property.RID_PROPERTY, toGrpcValue(document.getIdentity())); - } - - if (!builder.getPropertiesMap().containsKey(Property.TYPE_PROPERTY) && document instanceof Document doc - && doc.getType() != null) { - builder.putProperties(Property.TYPE_PROPERTY, toGrpcValue(doc.getTypeName())); - } - } - - // If this is an Edge and @out/@in are not already in properties, add them - if (result.isElement() && result.getElement().get() instanceof Edge edge) { - if (!builder.getPropertiesMap().containsKey("@out")) { - builder.putProperties("@out", toGrpcValue(edge.getOut().getIdentity())); - } - if (!builder.getPropertiesMap().containsKey("@in")) { - builder.putProperties("@in", toGrpcValue(edge.getIn().getIdentity())); - } - } - - LogManager.instance().log(this, Level.FINE, "ENC-RES DONE rid=%s type=%s props=%s", - builder.getRid(), builder.getType(), builder.getPropertiesCount()); - - return builder.build(); - } - private GrpcRecord convertToGrpcRecord(Record dbRecord, Database db) { GrpcRecord.Builder builder = GrpcRecord.newBuilder().setRid(dbRecord.getIdentity().toString()); @@ -2960,4 +2957,4 @@ public ProjectionEncoding getEnc() { return enc; } } -} +} \ No newline at end of file From fc7217980b059944b95330a6eff160e336b5eb15 Mon Sep 17 00:00:00 2001 From: Oleg Cohen Date: Wed, 24 Dec 2025 13:16:50 -0700 Subject: [PATCH 2/8] Revised Tx logic for gRPC service and pom.xml enhancements for gRPC projects --- grpc/pom.xml | 24 ++ .../server/grpc/ArcadeDbGrpcService.java | 285 ++++++++++++------ pom.xml | 2 +- 3 files changed, 226 insertions(+), 85 deletions(-) diff --git a/grpc/pom.xml b/grpc/pom.xml index 356f4410f8..eb5e7a69d8 100644 --- a/grpc/pom.xml +++ b/grpc/pom.xml @@ -185,6 +185,30 @@ + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + io.github.ascopes + protobuf-maven-plugin + [0,) + + generate + + + + + + + + + + diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java index 2397b0f428..f2ce9b35b8 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java @@ -70,8 +70,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -92,8 +97,33 @@ public class ArcadeDbGrpcService extends ArcadeDbServiceGrpc.ArcadeDbServiceImpl .setUseVertexEdgeSize(true) .setUseCollectionSizeForEdges(false).setUseCollectionSize(false); - // Transaction management - private final Map activeTransactions = new ConcurrentHashMap<>(); + // Transaction management - now stores TransactionContext with executor for thread affinity + private final Map activeTransactions = new ConcurrentHashMap<>(); + + /** + * Holds transaction state including a single-thread executor to ensure all + * transaction operations run on the same thread (required by ArcadeDB's thread-local transactions). + */ + private static final class TransactionContext { + final Database db; + final ExecutorService executor; + final String txId; + + TransactionContext(Database db, String txId) { + this.db = db; + this.txId = txId; + // Single-thread executor ensures all tx operations happen on the same thread + this.executor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "arcadedb-tx-" + txId); + t.setDaemon(true); + return t; + }); + } + + void shutdown() { + executor.shutdown(); + } + } // Database connection pool private final Map databasePool = new ConcurrentHashMap<>(); @@ -127,15 +157,22 @@ public void close() { } databasePool.clear(); - // Clean up transactions - for (Database db : activeTransactions.values()) { + // Clean up transactions - shutdown executors and rollback + for (TransactionContext txCtx : activeTransactions.values()) { try { - if (db != null && db.isOpen()) { - db.rollback(); - db.close(); + if (txCtx.db != null && txCtx.db.isOpen()) { + // Execute rollback on the transaction's thread + txCtx.executor.submit(() -> { + try { + txCtx.db.rollback(); + } catch (Exception ignore) { + } + }).get(); } } catch (Exception e) { - LogManager.instance().log(this, Level.SEVERE, "Error closing transaction database", e); + LogManager.instance().log(this, Level.SEVERE, "Error closing transaction", e); + } finally { + txCtx.shutdown(); } } activeTransactions.clear(); @@ -146,13 +183,80 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver future = txCtx.executor.submit(() -> + executeCommandInternal(req, t0, txCtx.db, true)); + + ExecuteCommandResponse response = future.get(); + resp.onNext(response); + resp.onCompleted(); + } else { + // No external transaction - execute on current thread + Database db = getDatabase(req.getDatabase(), req.getCredentials()); + ExecuteCommandResponse response = executeCommandInternal(req, t0, db, false); + resp.onNext(response); + resp.onCompleted(); + } + + } catch (ExecutionException e) { + // Unwrap the cause from the executor + Throwable cause = e.getCause() != null ? e.getCause() : e; + LogManager.instance().log(this, Level.SEVERE, "ERROR in executeCommand", cause); + + final long ms = (System.nanoTime() - t0) / 1_000_000L; + ExecuteCommandResponse err = ExecuteCommandResponse + .newBuilder() + .setSuccess(false) + .setMessage(cause.getMessage() == null ? cause.toString() : cause.getMessage()) + .setAffectedRecords(0L) + .setExecutionTimeMs(ms) + .build(); + resp.onNext(err); + resp.onCompleted(); + + } catch (Exception e) { + LogManager.instance().log(this, Level.SEVERE, "ERROR in executeCommand", e); + + final long ms = (System.nanoTime() - t0) / 1_000_000L; + ExecuteCommandResponse err = ExecuteCommandResponse + .newBuilder() + .setSuccess(false) + .setMessage(e.getMessage() == null ? e.toString() : e.getMessage()) + .setAffectedRecords(0L) + .setExecutionTimeMs(ms) + .build(); + resp.onNext(err); + resp.onCompleted(); + } + } + + /** + * Internal implementation of executeCommand that runs on the appropriate thread. + * + * @param req the command request + * @param t0 start time in nanos + * @param db the database to use + * @param isExternalTransaction true if this is part of an externally-managed transaction + * @return the response + */ + private ExecuteCommandResponse executeCommandInternal(ExecuteCommandRequest req, long t0, + Database db, boolean isExternalTransaction) { + boolean beganHere = false; try { - - // Resolve DB + params - db = getDatabase(req.getDatabase(), req.getCredentials()); final Map params = convertParameters(req.getParametersMap()); // Language defaults to "sql" when empty @@ -160,29 +264,26 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver commit > begin-only⇒commit if (tx.getRollback()) { LogManager.instance() - .log(this, Level.FINE, "executeCommand(): after - rolling back db=%s tid=%s", db.getName(), tx.getTransactionId()); + .log(this, Level.FINE, "executeCommandInternal(): rolling back db=%s tid=%s", db.getName(), tx.getTransactionId()); db.rollback(); } else if (tx.getCommit()) { LogManager.instance() - .log(this, Level.FINE, "executeCommand(): after - committing [tx.getCommit() == true] db=%s tid=%s", db.getName(), + .log(this, Level.FINE, "executeCommandInternal(): committing [tx.getCommit() == true] db=%s tid=%s", db.getName(), tx.getTransactionId()); db.commit(); } else if (beganHere && db.isTransactionActive()) { // Began but no explicit commit/rollback flag — default to commit (HTTP parity) LogManager.instance() - .log(this, Level.FINE, "executeCommand(): after - committing [beganHere == true] db=%s tid=%s", db.getName(), + .log(this, Level.FINE, "executeCommandInternal(): committing [beganHere == true] db=%s tid=%s", db.getName(), tx.getTransactionId()); db.commit(); } } else if (beganHere && db.isTransactionActive()) { // Auto-wrapped tx: ensure we commit after fully draining ResultSet - LogManager.instance().log(this, Level.FINE, "executeCommand(): after - committing [autoWrap] db=%s", db.getName()); + LogManager.instance().log(this, Level.FINE, "executeCommandInternal(): committing [autoWrap] db=%s", db.getName()); db.commit(); } final long ms = (System.nanoTime() - t0) / 1_000_000L; out.setAffectedRecords(affected).setExecutionTimeMs(ms); - resp.onNext(out.build()); - resp.onCompleted(); + return out.build(); } catch (Exception e) { - LogManager.instance().log(this, Level.SEVERE, "ERROR", e); - - // Best-effort rollback if we began here and failed - // But NOT if transaction is managed externally via beginTransaction() - final String txIdForError = (req.hasTransaction() && req.getTransaction() != null) - ? req.getTransaction().getTransactionId() : null; - final boolean managedExternallyForError = txIdForError != null && !txIdForError.isBlank() - && activeTransactions.containsKey(txIdForError); + LogManager.instance().log(this, Level.SEVERE, "ERROR in executeCommandInternal", e); + // Best-effort rollback if we began here and failed (only for non-external transactions) try { - if (beganHere && db != null && !managedExternallyForError) { + if (beganHere && db != null && !isExternalTransaction) { db.rollback(); - } else if (managedExternallyForError) { + } else if (isExternalTransaction) { LogManager.instance().log(this, Level.FINE, - "executeCommand(): error occurred but transaction %s is externally managed, skipping auto-rollback", txIdForError); + "executeCommandInternal(): error occurred but external transaction - skipping auto-rollback"); } } catch (Exception ignore) { /* no-op */ } final long ms = (System.nanoTime() - t0) / 1_000_000L; - ExecuteCommandResponse err = ExecuteCommandResponse + return ExecuteCommandResponse .newBuilder() .setSuccess(false) .setMessage(e.getMessage() == null ? e.toString() : e.getMessage()) .setAffectedRecords(0L) .setExecutionTimeMs(ms) .build(); - - // Prefer returning a structured response so clients always get timing/message - resp.onNext(err); - resp.onCompleted(); } } @@ -684,10 +769,11 @@ public void executeQuery(ExecuteQueryRequest request, StreamObserver"), (database != null ? database.getClass().getSimpleName() : ""), (database != null ? System.identityHashCode(database) : 0)); - LogManager.instance().log(this, Level.FINE, "beginTransaction(): calling database.begin()"); - - // Begin transaction - database.begin(); - // Generate transaction ID and register + // Generate transaction ID first so we can create the context final String transactionId = generateTransactionId(); - activeTransactions.put(transactionId, database); + + // Create transaction context with dedicated executor thread + final TransactionContext txCtx = new TransactionContext(database, transactionId); + + LogManager.instance().log(this, Level.FINE, "beginTransaction(): calling database.begin() on dedicated thread for txId=%s", transactionId); + + // Begin transaction ON THE DEDICATED THREAD - this is critical because ArcadeDB + // transactions are thread-local + Future beginFuture = txCtx.executor.submit(() -> { + database.begin(); + }); + beginFuture.get(); // Wait for begin to complete + + // Register the transaction context + activeTransactions.put(transactionId, txCtx); LogManager.instance() .log(this, Level.FINE, "beginTransaction(): started txId=%s for db=%s activeTxCount(after)=%s", transactionId, @@ -830,11 +926,12 @@ public void beginTransaction(BeginTransactionRequest request, StreamObserver"), - t.toString(), t); - LogManager.instance().log(this, Level.SEVERE, "Error beginning transaction: %s", t, t.getMessage()); - responseObserver.onError(Status.INTERNAL.withDescription("Failed to begin transaction: " + t.getMessage()).asException()); + cause.toString(), cause); + LogManager.instance().log(this, Level.SEVERE, "Error beginning transaction: %s", cause, cause.getMessage()); + responseObserver.onError(Status.INTERNAL.withDescription("Failed to begin transaction: " + cause.getMessage()).asException()); } } @@ -851,14 +948,14 @@ public void commitTransaction(CommitTransactionRequest req, StreamObserver commitFuture = txCtx.executor.submit(() -> { + txCtx.db.commit(); + }); + commitFuture.get(); // Wait for commit to complete + LogManager.instance().log(this, Level.FINE, "commitTransaction(): commit OK txId=%s", txId); rsp.onNext(CommitTransactionResponse.newBuilder().setSuccess(true).setCommitted(true).build()); rsp.onCompleted(); } catch (Throwable t) { - LogManager.instance().log(this, Level.FINE, "commitTransaction(): commit FAILED txId=%s err=%s", txId, t.toString(), t); + Throwable cause = (t instanceof ExecutionException && t.getCause() != null) ? t.getCause() : t; + LogManager.instance().log(this, Level.FINE, "commitTransaction(): commit FAILED txId=%s err=%s", txId, cause.toString(), cause); // tx is unusable; do not reinsert into the map - rsp.onError(Status.ABORTED.withDescription("Commit failed: " + t.getMessage()).asException()); + rsp.onError(Status.ABORTED.withDescription("Commit failed: " + cause.getMessage()).asException()); + } finally { + // Always shutdown the executor + txCtx.shutdown(); } } @@ -893,14 +1000,14 @@ public void rollbackTransaction(RollbackTransactionRequest req, StreamObserver rollbackFuture = txCtx.executor.submit(() -> { + txCtx.db.rollback(); + }); + rollbackFuture.get(); // Wait for rollback to complete + LogManager.instance().log(this, Level.FINE, "rollbackTransaction(): rollback OK txId=%s", txId); rsp.onNext(RollbackTransactionResponse.newBuilder().setSuccess(true).setRolledBack(true).build()); rsp.onCompleted(); } catch (Throwable t) { - LogManager.instance().log(this, Level.FINE, "rollbackTransaction(): rollback FAILED txId=%s err=%s", txId, t.toString(), t); - rsp.onError(Status.ABORTED.withDescription("Rollback failed: " + t.getMessage()).asException()); + Throwable cause = (t instanceof ExecutionException && t.getCause() != null) ? t.getCause() : t; + LogManager.instance().log(this, Level.FINE, "rollbackTransaction(): rollback FAILED txId=%s err=%s", txId, cause.toString(), cause); + rsp.onError(Status.ABORTED.withDescription("Rollback failed: " + cause.getMessage()).asException()); + } finally { + // Always shutdown the executor + txCtx.shutdown(); } } @@ -2957,4 +3074,4 @@ public ProjectionEncoding getEnc() { return enc; } } -} \ No newline at end of file +} diff --git a/pom.xml b/pom.xml index 7bd51ae269..73a2a6ce8b 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ 4.1.1 1.77.0 - 4.32.0 + 4.33.0 4.33.2 2.63.2 1.3.2 From f94ea7f0464ee4153a0065282ecf5dd65b550585 Mon Sep 17 00:00:00 2001 From: Oleg Cohen Date: Wed, 24 Dec 2025 13:17:15 -0700 Subject: [PATCH 3/8] .gitignore --- .mvn/.gitignore | 1 + grpc/.mvn/.gitignore | 1 + 2 files changed, 2 insertions(+) create mode 100644 .mvn/.gitignore create mode 100644 grpc/.mvn/.gitignore diff --git a/.mvn/.gitignore b/.mvn/.gitignore new file mode 100644 index 0000000000..9cf32eb36c --- /dev/null +++ b/.mvn/.gitignore @@ -0,0 +1 @@ +/extensions.xml diff --git a/grpc/.mvn/.gitignore b/grpc/.mvn/.gitignore new file mode 100644 index 0000000000..9cf32eb36c --- /dev/null +++ b/grpc/.mvn/.gitignore @@ -0,0 +1 @@ +/extensions.xml From 00918f99b1abff5d229fcfc99894c363a4910af6 Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Fri, 26 Dec 2025 18:52:50 +0100 Subject: [PATCH 4/8] fix pre-commit --- .../server/grpc/ArcadeDbGrpcService.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java index f2ce9b35b8..5cdf89f9b2 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java @@ -188,17 +188,17 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver future = txCtx.executor.submit(() -> + + Future future = txCtx.executor.submit(() -> executeCommandInternal(req, t0, txCtx.db, true)); - + ExecuteCommandResponse response = future.get(); resp.onNext(response); resp.onCompleted(); @@ -214,7 +214,7 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver commitFuture = txCtx.executor.submit(() -> { txCtx.db.commit(); }); commitFuture.get(); // Wait for commit to complete - + LogManager.instance().log(this, Level.FINE, "commitTransaction(): commit OK txId=%s", txId); rsp.onNext(CommitTransactionResponse.newBuilder().setSuccess(true).setCommitted(true).build()); rsp.onCompleted(); @@ -1018,13 +1018,13 @@ public void rollbackTransaction(RollbackTransactionRequest req, StreamObserver rollbackFuture = txCtx.executor.submit(() -> { txCtx.db.rollback(); }); rollbackFuture.get(); // Wait for rollback to complete - + LogManager.instance().log(this, Level.FINE, "rollbackTransaction(): rollback OK txId=%s", txId); rsp.onNext(RollbackTransactionResponse.newBuilder().setSuccess(true).setRolledBack(true).build()); rsp.onCompleted(); From 0cfc5b673f2e8d613d2912e2a5b17215d7523111 Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Fri, 26 Dec 2025 19:28:59 +0100 Subject: [PATCH 5/8] fix: address Gemini code review comments for gRPC transaction handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address three issues identified by Gemini Code Assist in PR #3074: 1. HIGH: Fix thread leak in beginTransaction() - Move TransactionContext declaration outside try block - Add txCtx.shutdown() in catch block to clean up executor on failure 2. MEDIUM: Add logging for rollback exceptions in close() - Replace silent catch with WARNING-level log message - Helps diagnose shutdown issues instead of hiding them 3. MEDIUM: Merge duplicate catch blocks in executeCommand() - Consolidate ExecutionException and Exception handlers - Single block handles cause unwrapping for both cases 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../server/grpc/ArcadeDbGrpcService.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java index 5cdf89f9b2..ce27ea7c97 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java @@ -165,7 +165,8 @@ public void close() { txCtx.executor.submit(() -> { try { txCtx.db.rollback(); - } catch (Exception ignore) { + } catch (Exception e) { + LogManager.instance().log(this, Level.WARNING, "Failed to rollback transaction %s during shutdown", e, txCtx.txId); } }).get(); } @@ -210,9 +211,12 @@ public void executeCommand(ExecuteCommandRequest req, StreamObserver"), activeTransactions.size()); + // Declare txCtx outside try block so we can clean it up on failure + TransactionContext txCtx = null; + try { final Database database = getDatabase(reqDb, request.getCredentials()); @@ -902,7 +895,7 @@ public void beginTransaction(BeginTransactionRequest request, StreamObserver"), From 901cf16876ecb2e02287f3dc8587bef6cfbb7b20 Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Sat, 27 Dec 2025 16:46:15 +0100 Subject: [PATCH 6/8] reverted mapping of record to fix failing tests --- .../remote/grpc/GrpcServerPluginIT.java | 5 + .../server/grpc/ArcadeDbGrpcService.java | 138 ++++++++++++------ 2 files changed, 97 insertions(+), 46 deletions(-) diff --git a/grpc-client/src/test/java/com/arcadedb/remote/grpc/GrpcServerPluginIT.java b/grpc-client/src/test/java/com/arcadedb/remote/grpc/GrpcServerPluginIT.java index 5797a5d8a9..da27c00096 100644 --- a/grpc-client/src/test/java/com/arcadedb/remote/grpc/GrpcServerPluginIT.java +++ b/grpc-client/src/test/java/com/arcadedb/remote/grpc/GrpcServerPluginIT.java @@ -24,6 +24,7 @@ import com.arcadedb.test.BaseGraphServerTest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.util.List; @@ -108,6 +109,7 @@ public void endTest() { } @Test + @DisplayName("Query should return aliases (@rid, @type) and custom projection (author AS _author)") void grpcQueryWithAliasesAndMetadata() { String query = "SELECT *, @rid, @type, author AS _author FROM article"; @@ -123,6 +125,7 @@ void grpcQueryWithAliasesAndMetadata() { } @Test + @DisplayName("UPDATE with RETURN AFTER should return modified record with alias") void grpcUpdateWithAlas() { String update = """ UPDATE article SET title = "My third article updated" RETURN AFTER *, author AS _author WHERE id = 3 @@ -140,6 +143,7 @@ void grpcUpdateWithAlas() { } @Test + @DisplayName("INSERT with RETURN @this should return the newly created record") void grpcInsertWithReturn() { String command = """ INSERT INTO article CONTENT { @@ -167,6 +171,7 @@ void grpcInsertWithReturn() { } @Test + @DisplayName("CREATE VERTEX should return the newly created vertex with all properties") void grpcCreateVertexWithReturn() { String command = """ CREATE VERTEX article CONTENT { diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java index ce27ea7c97..afb5723c9a 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/ArcadeDbGrpcService.java @@ -305,38 +305,30 @@ private ExecuteCommandResponse executeCommandInternal(ExecuteCommandRequest req, while (rs.hasNext()) { - Result r = rs.next(); + Result result = rs.next(); - if (r.isElement()) { - affected++; // count modified/returned records - if (emitted < maxRows) { - out.addRecords(convertToGrpcRecord(r.getElement().get(), db)); - emitted++; - } + if (result.isElement()) { + affected++; } else { - // Scalar / projection row (e.g., RETURN COUNT) + for (String p : result.getPropertyNames()) { + Object v = result.getProperty(p); + if (v instanceof Number n) { + affected += n.longValue(); + } + } + } if (emitted < maxRows) { - GrpcRecord.Builder recB = GrpcRecord.newBuilder(); - - for (String p : r.getPropertyNames()) { - - recB.putProperties(p, convertPropToGrpcValue(p, r)); - } - - out.addRecords(recB.build()); + // Convert Result to GrpcRecord, preserving aliases and all properties + GrpcRecord grpcRecord = convertResultToGrpcRecord(result, db, + new ProjectionConfig(true, ProjectionEncoding.PROJECTION_AS_JSON, 0)); + out.addRecords(grpcRecord); emitted++; } - for (String p : r.getPropertyNames()) { - Object v = r.getProperty(p); - if (v instanceof Number n) - affected += n.longValue(); - } - } } } else { @@ -790,15 +782,8 @@ public void executeQuery(ExecuteQueryRequest request, StreamObserver @rid = %s", dbRecord.getIdentity().toString()); - - GrpcRecord grpcRecord = convertToGrpcRecord(dbRecord, database); + // Convert Result to GrpcRecord, preserving aliases and all properties + GrpcRecord grpcRecord = convertResultToGrpcRecord(result, database, projectionConfig); LogManager.instance().log(this, Level.FINE, "executeQuery(): grpcRecord -> @rid = %s", grpcRecord.getRid()); @@ -810,21 +795,6 @@ public void executeQuery(ExecuteQueryRequest request, StreamObserver 0 && count >= request.getLimit()) { break; } - } else { - - LogManager.instance().log(this, Level.FINE, "executeQuery(): NOT isElement"); - - // Scalar / projection row (e.g., RETURN COUNT) - - GrpcRecord.Builder recB = GrpcRecord.newBuilder(); - - for (String p : result.getPropertyNames()) { - - recB.putProperties(p, convertPropToGrpcValue(p, result, projectionConfig)); - } - - resultBuilder.addRecords(recB.build()); - } } LogManager.instance().log(this, Level.FINE, "executeQuery(): count = %s", count); @@ -2489,6 +2459,82 @@ private Map convertParameters(Map protoParams return params; } + /** + * Converts a Result to GrpcRecord, preserving all properties including aliases. + * This method works at the Result level (not Record level) to maintain alias information. + * + * @param result the Result object from a query execution + * @param db the database instance + * @param projectionConfig optional projection configuration + * + * @return GrpcRecord with all properties and aliases preserved + */ + private GrpcRecord convertResultToGrpcRecord(Result result, Database db, ProjectionConfig projectionConfig) { + GrpcRecord.Builder builder = GrpcRecord.newBuilder(); + + // If this result wraps an element (Document/Vertex/Edge), get its metadata + if (result.isElement()) { + Document dbRecord = result.toElement(); + + if (dbRecord.getIdentity() != null) { + builder.setRid(dbRecord.getIdentity().toString()); + } + + if (dbRecord.getType() != null) { + builder.setType(dbRecord.getTypeName()); + } + } + + // Iterate over ALL properties from the Result, including aliases + for (String propertyName : result.getPropertyNames()) { + Object value = result.getProperty(propertyName); + + if (value != null) { + LogManager.instance() + .log(this, Level.FINE, "convertResultToGrpcRecord(): Converting %s\n value = %s\n class = %s", + propertyName, value, value.getClass()); + + GrpcValue gv = projectionConfig != null ? + toGrpcValue(value, projectionConfig) : + toGrpcValue(value); + + LogManager.instance() + .log(this, Level.FINE, "ENC-RES %s: %s -> %s", propertyName, summarizeJava(value), summarizeGrpc(gv)); + + builder.putProperties(propertyName, gv); + } + } + + // Ensure @rid and @type are always in the properties map when there's an element + // This matches JsonSerializer behavior and works around client-side limitations + if (result.isElement()) { + final Document document = result.toElement(); + + if (!builder.getPropertiesMap().containsKey(Property.RID_PROPERTY) && document.getIdentity() != null) { + builder.putProperties(Property.RID_PROPERTY, toGrpcValue(document.getIdentity())); + } + + if (!builder.getPropertiesMap().containsKey(Property.TYPE_PROPERTY) && document instanceof Document doc + && doc.getType() != null) { + builder.putProperties(Property.TYPE_PROPERTY, toGrpcValue(doc.getTypeName())); + } + } + + // If this is an Edge and @out/@in are not already in properties, add them + if (result.isElement() && result.getElement().get() instanceof Edge edge) { + if (!builder.getPropertiesMap().containsKey("@out")) { + builder.putProperties("@out", toGrpcValue(edge.getOut().getIdentity())); + } + if (!builder.getPropertiesMap().containsKey("@in")) { + builder.putProperties("@in", toGrpcValue(edge.getIn().getIdentity())); + } + } + + LogManager.instance().log(this, Level.FINE, "ENC-RES DONE rid=%s type=%s props=%s", + builder.getRid(), builder.getType(), builder.getPropertiesCount()); + + return builder.build(); + } private GrpcRecord convertToGrpcRecord(Record dbRecord, Database db) { GrpcRecord.Builder builder = GrpcRecord.newBuilder().setRid(dbRecord.getIdentity().toString()); From 9c20e58cec35ce715ccf312afff52c6db0d8626b Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Sat, 27 Dec 2025 23:19:14 +0100 Subject: [PATCH 7/8] add log to show protocol add configuration of netty on gRPC server --- .../performance/SingleServerLoadTestIT.java | 1 + .../SingleServerSimpleLoadTestIT.java | 1 + .../server/grpc/GrpcServerPlugin.java | 29 +++++++++++++------ studio/package-lock.json | 13 +-------- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java index ecc9fb66ed..1864c1fca1 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java @@ -58,6 +58,7 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception { int expectedFriendshipCount = numOfFriendship; int expectedLikeCount = numOfLike; LocalDateTime startedAt = LocalDateTime.now(); + logger.info("Starting load test on protocol {}", protocol.name()); logger.info("Creating {} users using {} threads", expectedUsersCount, numOfThreads); logger.info("Expected users: {} - photos: {} - friendships: {} - likes: {}", expectedUsersCount, expectedPhotoCount, expectedFriendshipCount, expectedLikeCount); diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java index 53ccc659eb..bb9e71f214 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java @@ -54,6 +54,7 @@ void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception { int expectedUsersCount = numOfUsers * numOfThreads; int expectedPhotoCount = expectedUsersCount * numOfPhotos; + logger.info("Starting load test on protocol {}", protocol.name()); logger.info("Creating {} users using {} threads", expectedUsersCount, numOfThreads); ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); for (int i = 0; i < numOfThreads; i++) { diff --git a/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java b/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java index 2adee720bc..ca2e92fd05 100644 --- a/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java +++ b/grpcw/src/main/java/com/arcadedb/server/grpc/GrpcServerPlugin.java @@ -33,6 +33,7 @@ import io.grpc.ServerCredentials; import io.grpc.TlsServerCredentials; import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.xds.XdsServerBuilder; @@ -126,15 +127,23 @@ private void startStandardServer(ContextConfiguration config) throws IOException int port = getConfigInt(config, CONFIG_PORT, 50051); String host = getConfigString(config, CONFIG_HOST, "0.0.0.0"); - ServerBuilder serverBuilder; + NettyServerBuilder serverBuilder; // Configure TLS if enabled if (getConfigBoolean(config, CONFIG_TLS_ENABLED, false)) { serverBuilder = configureStandardTls(port, config); } else { - serverBuilder = ServerBuilder.forPort(port); + serverBuilder = NettyServerBuilder.forPort(port); } + // Configure keepalive settings to prevent GOAWAY ENHANCE_YOUR_CALM errors + // Allow clients to send keepalive pings every 10 seconds (client sends every 30s) + serverBuilder + .permitKeepAliveTime(10, TimeUnit.SECONDS) + .permitKeepAliveWithoutCalls(true) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS); + // Configure the server configureServer(serverBuilder, config); @@ -244,14 +253,14 @@ private void configureServer(ServerBuilder serverBuilder, ContextConfiguratio } } - private ServerBuilder configureStandardTls(int port, ContextConfiguration config) { + private NettyServerBuilder configureStandardTls(int port, ContextConfiguration config) { String certPath = getConfigString(config, CONFIG_TLS_CERT, null); String keyPath = getConfigString(config, CONFIG_TLS_KEY, null); if (certPath == null || keyPath == null) { LogManager.instance() .log(this, Level.WARNING, "TLS enabled but certificate or key path not provided. Falling back to insecure."); - return ServerBuilder.forPort(port); + return NettyServerBuilder.forPort(port); } File certFile = new File(certPath); @@ -259,16 +268,18 @@ private ServerBuilder configureStandardTls(int port, ContextConfiguration con if (!certFile.exists() || !keyFile.exists()) { LogManager.instance().log(this, Level.WARNING, "TLS certificate or key file not found. Falling back to insecure."); - return ServerBuilder.forPort(port); + return NettyServerBuilder.forPort(port); } try { - ServerCredentials credentials = TlsServerCredentials.create(certFile, keyFile); - // Use Grpc.newServerBuilderForPort for TLS - return Grpc.newServerBuilderForPort(port, credentials); + // Configure Netty with TLS using SslContext + return NettyServerBuilder.forPort(port) + .sslContext(io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts + .forServer(certFile, keyFile) + .build()); } catch (Exception e) { LogManager.instance().log(this, Level.SEVERE, "Failed to configure TLS", e); - return ServerBuilder.forPort(port); + return NettyServerBuilder.forPort(port); } } diff --git a/studio/package-lock.json b/studio/package-lock.json index 0ee4c79a53..16b57fa2f9 100644 --- a/studio/package-lock.json +++ b/studio/package-lock.json @@ -469,7 +469,6 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -496,7 +495,6 @@ "integrity": "sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.3", "fast-uri": "^3.0.1", @@ -645,7 +643,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -962,7 +959,6 @@ "resolved": "https://registry.npmjs.org/cytoscape/-/cytoscape-3.33.1.tgz", "integrity": "sha512-iJc4TwyANnOGR1OmWhsS9ayRS3s+XQ185FmuHObThD+5AeJCakAAbWv8KimMTt08xCCLNgneQwFp+JRJOr9qGQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10" } @@ -1542,7 +1538,6 @@ "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.1", "fast-json-stable-stringify": "^2.0.0", @@ -2080,8 +2075,7 @@ "version": "3.7.1", "resolved": "https://registry.npmjs.org/jquery/-/jquery-3.7.1.tgz", "integrity": "sha512-m4avr8yL8kmFN8psrbFFFmB/If14iN5o9nw/NgnnM+kybDJpRsAynV2BsfpTYrTRysYUdADVD7CkUUizgkpLfg==", - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/json-parse-even-better-errors": { "version": "2.3.1", @@ -2459,7 +2453,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -2505,7 +2498,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", @@ -3358,7 +3350,6 @@ "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.1", "fast-json-stable-stringify": "^2.0.0", @@ -3451,7 +3442,6 @@ "integrity": "sha512-Qphch25abbMNtekmEGJmeRUhLDbe+QfiWTiqpKYkpCOWY64v9eyl+KRRLmqOFA2AvKPpc9DC6+u2n76tQLBoaA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/eslint-scope": "^3.7.7", "@types/estree": "^1.0.8", @@ -3501,7 +3491,6 @@ "integrity": "sha512-MfwFQ6SfwinsUVi0rNJm7rHZ31GyTcpVE5pgVA3hwFRb7COD4TzjUUwhGWKfO50+xdc2MQPuEBBJoqIMGt3JDw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@discoveryjs/json-ext": "^0.6.1", "@webpack-cli/configtest": "^3.0.1", From 45ddfe5a00d1719be0c9b68a9728b23ac0dd8a2e Mon Sep 17 00:00:00 2001 From: Roberto Franchini Date: Sat, 27 Dec 2025 23:24:17 +0100 Subject: [PATCH 8/8] add param to name --- .../com/arcadedb/test/performance/SingleServerLoadTestIT.java | 2 +- .../arcadedb/test/performance/SingleServerSimpleLoadTestIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java index 1864c1fca1..44f32ccbcc 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerLoadTestIT.java @@ -36,7 +36,7 @@ class SingleServerLoadTestIT extends ContainersTestTemplate { @DisplayName("Single server load test") - @ParameterizedTest + @ParameterizedTest(name = "Load test with {0} protocol") @EnumSource(DatabaseWrapper.Protocol.class) void singleServerLoadTest(DatabaseWrapper.Protocol protocol) throws Exception { diff --git a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java index bb9e71f214..46bc5c0f3c 100644 --- a/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java +++ b/e2e-perf/src/test/java/com/arcadedb/test/performance/SingleServerSimpleLoadTestIT.java @@ -33,7 +33,7 @@ class SingleServerSimpleLoadTestIT extends ContainersTestTemplate { @DisplayName("Single server load test") - @ParameterizedTest + @ParameterizedTest(name = "Load test with {0} protocol") @EnumSource(DatabaseWrapper.Protocol.class) //to eneable only one protocol use the following annotation //@EnumSource(value = DatabaseWrapper.Protocol.class, names = "GRPC")