observationProvider() {
+ return Optional.ofNullable(observationProvider);
+ }
+
/**
* Used to build new config instances
*/
@@ -425,9 +423,9 @@ public static final class ConfigBuilder {
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis(30);
private long maxTransactionRetryTimeMillis = ExponentialBackoffRetryLogic.DEFAULT_MAX_RETRY_TIME_MS;
private ServerAddressResolver resolver;
- private MetricsAdapter metricsAdapter = MetricsAdapter.DEV_NULL;
private long fetchSize = 1000;
private int eventLoopThreads = 0;
+ private ObservationProvider observationProvider;
@SuppressWarnings("deprecation")
private NotificationConfig notificationConfig = NotificationConfig.defaultConfig();
@@ -744,47 +742,21 @@ public ConfigBuilder withResolver(ServerAddressResolver resolver) {
}
/**
- * Enable driver metrics backed by internal basic implementation. The metrics can be obtained afterwards via {@link Driver#metrics()}.
- *
- * @return this builder.
- */
- public ConfigBuilder withDriverMetrics() {
- return withMetricsEnabled(true);
- }
-
- /**
- * Disable driver metrics. When disabled, driver metrics cannot be accessed via {@link Driver#metrics()}.
- *
- * @return this builder.
+ * Sets the {@link ObservationProvider} that the driver should use.
+ * @param observationProvider the {@link ObservationProvider} or {@code null} to disable
+ * @return this builder
+ * @since 6.0.0
*/
- public ConfigBuilder withoutDriverMetrics() {
- return withMetricsEnabled(false);
- }
-
- private ConfigBuilder withMetricsEnabled(boolean enabled) {
- if (!enabled) {
- withMetricsAdapter(MetricsAdapter.DEV_NULL);
- } else if (this.metricsAdapter == null || this.metricsAdapter == MetricsAdapter.DEV_NULL) {
- withMetricsAdapter(MetricsAdapter.DEFAULT);
+ @Preview(name = "Observability")
+ public ConfigBuilder withObservationProvider(ObservationProvider observationProvider) {
+ this.observationProvider =
+ Objects.requireNonNullElseGet(observationProvider, NoopObservationProvider::getInstance);
+ if (!(observationProvider instanceof DriverObservationProvider)) {
+ throw new IllegalArgumentException("Unssupported observation provider");
}
return this;
}
- /**
- * Enable driver metrics with given {@link MetricsAdapter}.
- *
- * {@link MetricsAdapter#MICROMETER} enables implementation based on Micrometer. The metrics can be obtained
- * afterwards via Micrometer means and {@link Driver#metrics()}. Micrometer must be on classpath when using this option.
- *
- * @param metricsAdapter the metrics adapter to use. Use {@link MetricsAdapter#DEV_NULL} to disable metrics.
- * @return this builder.
- */
- @Experimental
- public ConfigBuilder withMetricsAdapter(MetricsAdapter metricsAdapter) {
- this.metricsAdapter = Objects.requireNonNull(metricsAdapter, "metricsAdapter");
- return this;
- }
-
/**
* Configure the event loop thread count. This specifies how many threads the driver can use to handle network I/O events
* and user's events in driver's I/O threads. By default, 2 * NumberOfProcessors amount of threads will be used instead.
diff --git a/driver/src/main/java/org/neo4j/driver/Driver.java b/driver/src/main/java/org/neo4j/driver/Driver.java
index 5b9841c89e..d9e1faf93e 100644
--- a/driver/src/main/java/org/neo4j/driver/Driver.java
+++ b/driver/src/main/java/org/neo4j/driver/Driver.java
@@ -17,7 +17,6 @@
package org.neo4j.driver;
import java.util.concurrent.CompletionStage;
-import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.UnsupportedFeatureException;
/**
@@ -256,22 +255,6 @@ default T session(Class sessionClass, SessionConfig s
*/
CompletionStage closeAsync();
- /**
- * Returns the driver metrics if metrics reporting is enabled via {@link Config.ConfigBuilder#withDriverMetrics()}.
- * Otherwise, a {@link ClientException} will be thrown.
- *
- * @return the driver metrics if enabled.
- * @throws ClientException if the driver metrics reporting is not enabled.
- */
- Metrics metrics();
-
- /**
- * Returns true if the driver metrics reporting is enabled via {@link Config.ConfigBuilder#withDriverMetrics()}, otherwise false.
- *
- * @return true if the metrics reporting is enabled.
- */
- boolean isMetricsEnabled();
-
/**
* This verifies if the driver can connect to a remote server or a cluster
* by establishing a network connection with the remote and possibly exchanging a few data before closing the connection.
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index f0ade37812..5a81378818 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -40,7 +40,6 @@
import org.neo4j.bolt.connection.DefaultDomainNameResolver;
import org.neo4j.bolt.connection.DomainNameResolver;
import org.neo4j.bolt.connection.LoggingProvider;
-import org.neo4j.bolt.connection.MetricsListener;
import org.neo4j.bolt.connection.NotificationConfig;
import org.neo4j.bolt.connection.RoutedBoltConnectionParameters;
import org.neo4j.bolt.connection.pooled.PooledBoltConnectionSource;
@@ -53,20 +52,18 @@
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Logging;
-import org.neo4j.driver.MetricsAdapter;
import org.neo4j.driver.exceptions.AuthTokenManagerExecutionException;
import org.neo4j.driver.internal.adaptedbolt.AdaptingDriverBoltConnectionSource;
import org.neo4j.driver.internal.adaptedbolt.BoltAuthTokenManager;
import org.neo4j.driver.internal.adaptedbolt.BoltConnectionProviderFactoryLoader;
+import org.neo4j.driver.internal.adaptedbolt.BoltObservationProvider;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnectionSource;
import org.neo4j.driver.internal.adaptedbolt.ErrorMapper;
import org.neo4j.driver.internal.adaptedbolt.SingleRoutedBoltConnectionSource;
import org.neo4j.driver.internal.boltlistener.BoltConnectionListener;
import org.neo4j.driver.internal.homedb.HomeDatabaseCache;
-import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
-import org.neo4j.driver.internal.metrics.InternalMetricsProvider;
-import org.neo4j.driver.internal.metrics.MetricsProvider;
-import org.neo4j.driver.internal.metrics.MicrometerMetricsProvider;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.NoopObservationProvider;
import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.security.BoltSecurityPlanManager;
@@ -132,14 +129,11 @@ public final Driver newInstance(
@SuppressWarnings("deprecation")
var retryLogic = createRetryLogic(config.maxTransactionRetryTimeMillis(), retryExecutor, config.logging());
- var metricsProvider = getOrCreateMetricsProvider(config, createClock());
-
return createDriver(
uri,
securityPlanManager,
eventLoopGroup,
retryLogic,
- metricsProvider,
config,
authTokenManager,
rediscoverySupplier,
@@ -157,27 +151,12 @@ private static ScheduledExecutorService retryScheduledExecutorService() {
});
}
- @SuppressWarnings("deprecation")
- protected static MetricsProvider getOrCreateMetricsProvider(Config config, Clock clock) {
- var metricsAdapter = config.metricsAdapter();
- // This can actually only happen when someone mocks the config
- if (metricsAdapter == null) {
- metricsAdapter = config.isMetricsEnabled() ? MetricsAdapter.DEFAULT : MetricsAdapter.DEV_NULL;
- }
- return switch (metricsAdapter) {
- case DEV_NULL -> DevNullMetricsProvider.INSTANCE;
- case DEFAULT -> new InternalMetricsProvider(clock, config.logging());
- case MICROMETER -> MicrometerMetricsProvider.forGlobalRegistry();
- };
- }
-
@SuppressWarnings("deprecation")
private InternalDriver createDriver(
URI uri,
BoltSecurityPlanManager securityPlanManager,
ScheduledExecutorService eventLoopGroup,
RetryLogic retryLogic,
- MetricsProvider metricsProvider,
Config config,
AuthTokenManager authTokenManager,
Supplier rediscoverySupplier,
@@ -186,6 +165,8 @@ private InternalDriver createDriver(
try {
var homeDatabaseCache = HomeDatabaseCache.newInstance(Scheme.isRoutingScheme(uri.getScheme()));
var valueFactory = BoltValueFactory.getInstance();
+ var observationProvider = (DriverObservationProvider)
+ config.observationProvider().orElseGet(NoopObservationProvider::getInstance);
boltConnectionProvider = createDriverBoltConnectionProvider(
uri,
config,
@@ -195,7 +176,7 @@ private InternalDriver createDriver(
DriverInfoUtil.boltAgent(),
config.userAgent(),
config.connectionTimeoutMillis(),
- metricsProvider.metricsListener(),
+ observationProvider,
authTokenManager,
securityPlanManager::plan,
NotificationConfigMapper.map(config.notificationConfig()),
@@ -206,8 +187,9 @@ private InternalDriver createDriver(
retryLogic,
config,
authTokenManager,
- homeDatabaseCache);
- var driver = createDriver(securityPlanManager, sessionFactory, metricsProvider, config);
+ homeDatabaseCache,
+ observationProvider);
+ var driver = createDriver(securityPlanManager, sessionFactory, config);
var log = config.logging().getLog(getClass());
log.info("Driver instance %s created for server uri '%s'", driver.hashCode(), uri);
return driver;
@@ -236,7 +218,7 @@ private DriverBoltConnectionSource createDriverBoltConnectionProvider(
BoltAgent boltAgent,
String userAgent,
int connectTimeoutMillis,
- MetricsListener metricsListener,
+ DriverObservationProvider observationProvider,
AuthTokenManager authTokenManager,
SecurityPlanSupplier securityPlanSupplier,
NotificationConfig notificationConfig,
@@ -254,14 +236,18 @@ private DriverBoltConnectionSource createDriverBoltConnectionProvider(
boltAgent,
userAgent,
connectTimeoutMillis,
- metricsListener,
+ new BoltObservationProvider(observationProvider),
clock,
boltAuthTokenManager,
securityPlanSupplier,
notificationConfig,
boltConnectionProviderFactory);
return new AdaptingDriverBoltConnectionSource(
- boltConnectionProvider, errorMapper, boltValueFactory, Scheme.isRoutingScheme(uri.getScheme()));
+ boltConnectionProvider,
+ errorMapper,
+ boltValueFactory,
+ Scheme.isRoutingScheme(uri.getScheme()),
+ observationProvider);
}
@SuppressWarnings("deprecation")
@@ -274,7 +260,7 @@ protected BoltConnectionSource createBoltConnect
BoltAgent boltAgent,
String userAgent,
int connectTimeoutMillis,
- MetricsListener metricsListener,
+ BoltObservationProvider observationProvider,
Clock clock,
org.neo4j.bolt.connection.pooled.AuthTokenManager authTokenManager,
SecurityPlanSupplier securityPlanSupplier,
@@ -302,7 +288,7 @@ protected BoltConnectionSource createBoltConnect
boltAgent,
userAgent,
connectTimeoutMillis,
- metricsListener,
+ observationProvider,
authTokenManager,
securityPlanSupplier,
notificationConfig,
@@ -319,7 +305,7 @@ protected BoltConnectionSource createBoltConnect
boltAgent,
userAgent,
connectTimeoutMillis,
- metricsListener);
+ observationProvider);
} else {
boltConnectionSource = new SingleRoutedBoltConnectionSource(pooledSourceSupplierFactory.create(uri, null));
}
@@ -337,7 +323,7 @@ private RoutedBoltConnectionSource createRoutedBoltConnectionProvider(
BoltAgent boltAgent,
String userAgent,
int connectTimeoutMillis,
- MetricsListener metricsListener) {
+ BoltObservationProvider observationProvider) {
var boltServerAddressResolver = createBoltServerAddressResolver(config);
var rediscovery = rediscoverySupplier != null ? rediscoverySupplier.get() : null;
return new RoutedBoltConnectionSource(
@@ -349,7 +335,8 @@ private RoutedBoltConnectionSource createRoutedBoltConnectionProvider(
clock,
loggingProvider,
uri,
- List.of(AuthTokenManagerExecutionException.class));
+ List.of(AuthTokenManagerExecutionException.class),
+ observationProvider);
}
private BoltConnectionSourceFactory createPooledBoltConnectionSource(
@@ -362,7 +349,7 @@ private BoltConnectionSourceFactory createPooledBoltConnectionSource(
BoltAgent boltAgent,
String userAgent,
int connectTimeoutMillis,
- MetricsListener metricsListener,
+ BoltObservationProvider observationProvider,
org.neo4j.bolt.connection.pooled.AuthTokenManager authTokenManager,
SecurityPlanSupplier securityPlanSupplier,
NotificationConfig notificationConfig,
@@ -374,6 +361,7 @@ private BoltConnectionSourceFactory createPooledBoltConnectionSource(
clock,
loggingProvider,
config.eventLoopThreads(),
+ observationProvider,
boltConnectionProviderFactory);
var listeningBoltConnectionProvider = BoltConnectionListener.listeningBoltConnectionProvider(
boltConnectionProvider, boltConnectionListener);
@@ -388,7 +376,7 @@ private BoltConnectionSourceFactory createPooledBoltConnectionSource(
config.connectionAcquisitionTimeoutMillis(),
config.maxConnectionLifetimeMillis(),
config.idleTimeBeforeConnectionTest(),
- metricsListener,
+ observationProvider,
routingContextAddress,
boltAgent,
userAgent,
@@ -403,6 +391,7 @@ private BoltConnectionProvider createBoltConnectionProvider(
Clock clock,
LoggingProvider loggingProvider,
int eventLoopThreads,
+ BoltObservationProvider observationProvider,
BoltConnectionProviderFactory boltConnectionProviderFactory) {
var additionalConfig = new HashMap();
additionalConfig.put("clock", clock);
@@ -416,7 +405,7 @@ private BoltConnectionProvider createBoltConnectionProvider(
additionalConfig.put("localAddress", localAddress);
}
return boltConnectionProviderFactory.create(
- loggingProvider, BoltValueFactory.getInstance(), null, additionalConfig);
+ loggingProvider, BoltValueFactory.getInstance(), observationProvider, additionalConfig);
}
@SuppressWarnings("SameReturnValue")
@@ -431,12 +420,14 @@ protected SocketAddress localAddress() {
*/
@SuppressWarnings("deprecation")
protected InternalDriver createDriver(
- BoltSecurityPlanManager securityPlanManager,
- SessionFactory sessionFactory,
- MetricsProvider metricsProvider,
- Config config) {
+ BoltSecurityPlanManager securityPlanManager, SessionFactory sessionFactory, Config config) {
return new InternalDriver(
- securityPlanManager, sessionFactory, metricsProvider, config.isTelemetryDisabled(), config.logging());
+ securityPlanManager,
+ sessionFactory,
+ config.isTelemetryDisabled(),
+ config.logging(),
+ (DriverObservationProvider)
+ config.observationProvider().orElseGet(NoopObservationProvider::getInstance));
}
/**
@@ -457,9 +448,16 @@ protected SessionFactory createSessionFactory(
RetryLogic retryLogic,
Config config,
AuthTokenManager authTokenManager,
- HomeDatabaseCache homeDatabaseCache) {
+ HomeDatabaseCache homeDatabaseCache,
+ DriverObservationProvider observationProvider) {
return new SessionFactoryImpl(
- securityPlanManager, connectionProvider, retryLogic, config, authTokenManager, homeDatabaseCache);
+ securityPlanManager,
+ connectionProvider,
+ retryLogic,
+ config,
+ authTokenManager,
+ homeDatabaseCache,
+ observationProvider);
}
/**
diff --git a/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java b/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java
index 0c59a0644b..b830735d13 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/FailableCursor.java
@@ -17,17 +17,18 @@
package org.neo4j.driver.internal;
import java.util.concurrent.CompletionStage;
+import org.neo4j.driver.internal.observation.Observation;
public interface FailableCursor {
/**
* Discarding all unconsumed records and returning failure if there is any pull errors.
*/
- CompletionStage discardAllFailureAsync();
+ CompletionStage discardAllFailureAsync(Observation parentObservation);
/**
* Pulling all unconsumed records into memory and returning failure if there is any pull errors.
*/
- CompletionStage pullAllFailureAsync();
+ CompletionStage pullAllFailureAsync(Observation parentObservation);
CompletionStage consumed();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
index a656d92073..3092a46645 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
@@ -19,6 +19,7 @@
import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,7 +33,6 @@
import org.neo4j.driver.ExecutableQuery;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
-import org.neo4j.driver.Metrics;
import org.neo4j.driver.Query;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.Session;
@@ -42,8 +42,7 @@
import org.neo4j.driver.exceptions.UnsupportedFeatureException;
import org.neo4j.driver.internal.async.InternalAsyncSession;
import org.neo4j.driver.internal.async.NetworkSession;
-import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
-import org.neo4j.driver.internal.metrics.MetricsProvider;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
import org.neo4j.driver.internal.security.BoltSecurityPlanManager;
import org.neo4j.driver.internal.util.Futures;
@@ -57,6 +56,7 @@ public class InternalDriver implements Driver {
BookmarkManagers.defaultManager(BookmarkManagerConfig.builder().build());
private final BoltSecurityPlanManager securityPlanManager;
private final SessionFactory sessionFactory;
+ private final DriverObservationProvider observationProvider;
@SuppressWarnings("deprecation")
private final Logger log;
@@ -64,19 +64,18 @@ public class InternalDriver implements Driver {
private final boolean telemetryDisabled;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final MetricsProvider metricsProvider;
InternalDriver(
BoltSecurityPlanManager securityPlanManager,
SessionFactory sessionFactory,
- MetricsProvider metricsProvider,
boolean telemetryDisabled,
- @SuppressWarnings("deprecation") Logging logging) {
+ @SuppressWarnings("deprecation") Logging logging,
+ DriverObservationProvider observationProvider) {
this.securityPlanManager = securityPlanManager;
this.sessionFactory = sessionFactory;
- this.metricsProvider = metricsProvider;
this.log = logging.getLog(getClass());
this.telemetryDisabled = telemetryDisabled;
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@Override
@@ -97,15 +96,15 @@ public T session(
requireNonNull(sessionClass, "sessionConfig must not be null");
T session;
if (Session.class.isAssignableFrom(sessionClass)) {
- session = (T) new InternalSession(newSession(sessionConfig, sessionAuthToken));
+ session = (T) new InternalSession(newSession(sessionConfig, sessionAuthToken), observationProvider);
} else if (AsyncSession.class.isAssignableFrom(sessionClass)) {
- session = (T) new InternalAsyncSession(newSession(sessionConfig, sessionAuthToken));
+ session = (T) new InternalAsyncSession(newSession(sessionConfig, sessionAuthToken), observationProvider);
} else if (org.neo4j.driver.reactive.ReactiveSession.class.isAssignableFrom(sessionClass)) {
session = (T) new org.neo4j.driver.internal.reactive.InternalReactiveSession(
- newSession(sessionConfig, sessionAuthToken));
+ newSession(sessionConfig, sessionAuthToken), observationProvider);
} else if (org.neo4j.driver.reactivestreams.ReactiveSession.class.isAssignableFrom(sessionClass)) {
session = (T) new org.neo4j.driver.internal.reactivestreams.InternalReactiveSession(
- newSession(sessionConfig, sessionAuthToken));
+ newSession(sessionConfig, sessionAuthToken), observationProvider);
} else {
throw new IllegalArgumentException(
String.format("Unsupported session type '%s'", sessionClass.getCanonicalName()));
@@ -113,16 +112,6 @@ public T session(
return session;
}
- @Override
- public Metrics metrics() {
- return metricsProvider.metrics();
- }
-
- @Override
- public boolean isMetricsEnabled() {
- return metricsProvider != DevNullMetricsProvider.INSTANCE;
- }
-
@Override
public boolean isEncrypted() {
assertOpen();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
index 918bf47a52..121708606e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java
@@ -17,8 +17,10 @@
package org.neo4j.driver.internal;
import static java.util.Collections.emptyMap;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.observe;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import org.neo4j.bolt.connection.TelemetryApi;
import org.neo4j.driver.AccessMode;
@@ -32,14 +34,18 @@
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
import org.neo4j.driver.internal.async.NetworkSession;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.util.Futures;
public class InternalSession extends AbstractQueryRunner implements Session {
private final NetworkSession session;
+ private final DriverObservationProvider observationProvider;
- public InternalSession(NetworkSession session) {
+ public InternalSession(NetworkSession session, DriverObservationProvider observationProvider) {
this.session = session;
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@Override
@@ -59,13 +65,16 @@ public Result run(String query, Map parameters, TransactionConfi
@Override
public Result run(Query query, TransactionConfig config) {
- var cursor = Futures.blockingGet(
- session.runAsync(query, config),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while running query in session"));
-
- // query executed, it is safe to obtain a connection in a blocking way
- var connection = Futures.getNow(session.connectionAsync());
- return new InternalResult(connection, cursor);
+ var runObservation = observationProvider.sessionRun(Session.class, query.text(), query.parameters());
+ return observe(runObservation, () -> {
+ var cursor = Futures.blockingGet(
+ session.runAsync(query, config, runObservation, Result.class),
+ () -> terminateConnectionOnThreadInterrupt("Thread interrupted while running query in session"));
+
+ // query executed, it is safe to obtain a connection in a blocking way
+ var connection = Futures.getNow(session.connectionAsync());
+ return new InternalResult(connection, cursor);
+ });
}
@Override
@@ -75,9 +84,12 @@ public boolean isOpen() {
@Override
public void close() {
- Futures.blockingGet(
- session.closeAsync(),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while closing the session"));
+ var closeObservation = observationProvider.sessionClose(Session.class);
+ observe(
+ closeObservation,
+ () -> Futures.blockingGet(
+ session.closeAsync(closeObservation),
+ () -> terminateConnectionOnThreadInterrupt("Thread interrupted while closing the session")));
}
@Override
@@ -91,10 +103,14 @@ public Transaction beginTransaction(TransactionConfig config) {
}
public Transaction beginTransaction(TransactionConfig config, String txType) {
- var tx = Futures.blockingGet(
- session.beginTransactionAsync(config, txType, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION)),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
- return new InternalTransaction(tx);
+ var beginTransaction = observationProvider.beginTransaction(Transaction.class);
+ return observe(beginTransaction, () -> {
+ var tx = Futures.blockingGet(
+ session.beginTransactionAsync(
+ config, txType, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION), beginTransaction),
+ () -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
+ return new InternalTransaction(tx, observationProvider, null);
+ });
}
@Override
@@ -140,8 +156,9 @@ private T transaction(
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
var apiTelemetryWork = new ApiTelemetryWork(telemetryApi);
- return session.retryLogic().retry(() -> {
- try (var tx = beginTransaction(mode, config, apiTelemetryWork, flush)) {
+ var executeObservation = observationProvider.sessionExecute(Session.class, mode);
+ return observe(executeObservation, () -> session.retryLogic().retry(() -> {
+ try (var tx = beginTransaction(mode, config, apiTelemetryWork, flush, executeObservation)) {
var result = work.execute(new DelegatingTransactionContext(tx));
if (result instanceof Result) {
@@ -162,15 +179,19 @@ private T transaction(
}
return result;
}
- });
+ }));
}
- private Transaction beginTransaction(
- AccessMode mode, TransactionConfig config, ApiTelemetryWork apiTelemetryWork, boolean flush) {
+ private InternalTransaction beginTransaction(
+ AccessMode mode,
+ TransactionConfig config,
+ ApiTelemetryWork apiTelemetryWork,
+ boolean flush,
+ Observation parentObservation) {
var tx = Futures.blockingGet(
- session.beginTransactionAsync(mode, config, null, apiTelemetryWork, flush),
+ session.beginTransactionAsync(mode, config, null, apiTelemetryWork, flush, parentObservation),
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while starting a transaction"));
- return new InternalTransaction(tx);
+ return new InternalTransaction(tx, observationProvider, parentObservation);
}
private void terminateConnectionOnThreadInterrupt(String reason) {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
index ef328e5378..f8babc3cf3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java
@@ -16,46 +16,74 @@
*/
package org.neo4j.driver.internal;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.observe;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.neo4j.driver.Query;
import org.neo4j.driver.Result;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.util.Futures;
public class InternalTransaction extends AbstractQueryRunner implements Transaction {
private final UnmanagedTransaction tx;
+ private final DriverObservationProvider observationProvider;
+ private final Observation parentObservation;
- public InternalTransaction(UnmanagedTransaction tx) {
+ public InternalTransaction(
+ UnmanagedTransaction tx, DriverObservationProvider observationProvider, Observation parentObservation) {
this.tx = tx;
+ this.observationProvider = Objects.requireNonNull(observationProvider);
+ this.parentObservation = parentObservation;
}
@Override
public void commit() {
- Futures.blockingGet(
- tx.commitAsync(),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while committing the transaction"));
+ observeWithParentOrSupplier(
+ parentObservation,
+ () -> observationProvider.transactionCommit(Transaction.class),
+ (observation) -> Futures.blockingGet(
+ tx.commitAsync(observation),
+ () -> terminateConnectionOnThreadInterrupt(
+ "Thread interrupted while committing the transaction")));
}
@Override
public void rollback() {
- Futures.blockingGet(
- tx.rollbackAsync(),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while rolling back the transaction"));
+ observeWithParentOrSupplier(
+ parentObservation,
+ () -> observationProvider.transactionRollback(Transaction.class),
+ (observation) -> Futures.blockingGet(
+ tx.rollbackAsync(observation),
+ () -> terminateConnectionOnThreadInterrupt(
+ "Thread interrupted while rolling back the transaction")));
}
@Override
public void close() {
- Futures.blockingGet(
- tx.closeAsync(),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while closing the transaction"));
+ observeWithParentOrSupplier(
+ parentObservation,
+ () -> observationProvider.transactionClose(Transaction.class),
+ (observation) -> Futures.blockingGet(
+ tx.closeAsync(observation),
+ () -> terminateConnectionOnThreadInterrupt(
+ "Thread interrupted while closing the transaction")));
}
@Override
public Result run(Query query) {
- var cursor = Futures.blockingGet(
- tx.runAsync(query),
- () -> terminateConnectionOnThreadInterrupt("Thread interrupted while running query in transaction"));
- return new InternalResult(null, cursor);
+ var runObservation = observationProvider.transactionRun(Transaction.class, query.text(), query.parameters());
+ return observe(runObservation, () -> {
+ var cursor = Futures.blockingGet(
+ tx.runAsync(query, runObservation, Result.class),
+ () -> terminateConnectionOnThreadInterrupt(
+ "Thread interrupted while running query in transaction"));
+ return new InternalResult(null, cursor);
+ });
}
@Override
@@ -69,9 +97,9 @@ public boolean isOpen() {
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
* transaction has not already been terminated, is not closed or closing.
*
- * @since 5.11
* @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing
* @see org.neo4j.driver.exceptions.TransactionTerminatedException
+ * @since 5.11
*/
public void terminate() {
Futures.blockingGet(
@@ -82,4 +110,14 @@ public void terminate() {
private void terminateConnectionOnThreadInterrupt(String reason) {
tx.connection().forceClose(reason);
}
+
+ private static void observeWithParentOrSupplier(
+ Observation parentObservation, Supplier observationSupplier, Consumer runnable) {
+ if (parentObservation != null) {
+ runnable.accept(parentObservation);
+ } else {
+ var observation = observationSupplier.get();
+ observe(observation, () -> runnable.accept(observation));
+ }
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
index 720fef697b..af973099fb 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
@@ -39,6 +39,7 @@
import org.neo4j.driver.internal.async.LeakLoggingNetworkSession;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.homedb.HomeDatabaseCache;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.security.BoltSecurityPlanManager;
@@ -54,6 +55,7 @@ public class SessionFactoryImpl implements SessionFactory {
private final long defaultFetchSize;
private final AuthTokenManager authTokenManager;
private final HomeDatabaseCache homeDatabaseCache;
+ private final DriverObservationProvider observationProvider;
@SuppressWarnings("deprecation")
SessionFactoryImpl(
@@ -62,7 +64,8 @@ public class SessionFactoryImpl implements SessionFactory {
RetryLogic retryLogic,
Config config,
AuthTokenManager authTokenManager,
- HomeDatabaseCache homeDatabaseCache) {
+ HomeDatabaseCache homeDatabaseCache,
+ DriverObservationProvider observationProvider) {
this.securityPlanManager = Objects.requireNonNull(securityPlanManager);
this.connectionSource = connectionSource;
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
@@ -71,6 +74,7 @@ public class SessionFactoryImpl implements SessionFactory {
this.defaultFetchSize = config.fetchSize();
this.authTokenManager = authTokenManager;
this.homeDatabaseCache = Objects.requireNonNull(homeDatabaseCache);
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@SuppressWarnings("deprecation")
@@ -172,7 +176,8 @@ private NetworkSession createSession(
authToken,
telemetryDisabled,
authTokenManager,
- homeDatabaseCache)
+ homeDatabaseCache,
+ observationProvider)
: new NetworkSession(
connectionProvider,
retryLogic,
@@ -187,7 +192,8 @@ private NetworkSession createSession(
authToken,
telemetryDisabled,
authTokenManager,
- homeDatabaseCache);
+ homeDatabaseCache,
+ observationProvider);
}
public DriverBoltConnectionSource getConnectionSource() {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java
index 2140f1a07b..8a6dcd3782 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnection.java
@@ -16,37 +16,65 @@
*/
package org.neo4j.driver.internal.adaptedbolt;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.neo4j.bolt.connection.AuthInfo;
import org.neo4j.bolt.connection.BoltConnection;
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.BoltServerAddress;
import org.neo4j.bolt.connection.message.Message;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.Observation;
+import org.neo4j.driver.internal.observation.util.ObservationUtil;
import org.neo4j.driver.internal.value.BoltValueFactory;
final class AdaptingDriverBoltConnection implements DriverBoltConnection {
private final BoltConnection connection;
private final ErrorMapper errorMapper;
private final BoltValueFactory boltValueFactory;
+ private final List messageNames = new ArrayList<>();
+ private final DriverObservationProvider observationProvider;
AdaptingDriverBoltConnection(
- BoltConnection connection, ErrorMapper errorMapper, BoltValueFactory boltValueFactory) {
+ BoltConnection connection,
+ ErrorMapper errorMapper,
+ BoltValueFactory boltValueFactory,
+ DriverObservationProvider observationProvider) {
this.connection = Objects.requireNonNull(connection);
this.errorMapper = Objects.requireNonNull(errorMapper);
this.boltValueFactory = Objects.requireNonNull(boltValueFactory);
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@Override
- public CompletionStage writeAndFlush(DriverResponseHandler handler, List messages) {
+ public CompletionStage writeAndFlush(
+ DriverResponseHandler handler, List messages, Observation parentObservation) {
+ var handleObservation = ObservationUtil.scoped(observationProvider, parentObservation, () -> observationProvider
+ .boltHandle(appendAndClearMessages(messages))
+ .start());
return connection
- .writeAndFlush(new AdaptingDriverResponseHandler(handler, errorMapper, boltValueFactory), messages)
- .exceptionally(errorMapper::mapAndThrow);
+ .writeAndFlush(
+ new AdaptingDriverResponseHandler(handler, errorMapper, boltValueFactory, handleObservation),
+ messages,
+ new BoltObservation(handleObservation))
+ .exceptionally(throwable -> {
+ var mappedThrowable = errorMapper.map(throwable);
+ handleObservation.error(mappedThrowable);
+ handleObservation.stop();
+ if (mappedThrowable instanceof RuntimeException runtimeException) {
+ throw runtimeException;
+ } else {
+ throw new CompletionException(mappedThrowable);
+ }
+ });
}
@Override
public CompletionStage write(List messages) {
+ appendMessages(messages);
return connection.write(messages).exceptionally(errorMapper::mapAndThrow);
}
@@ -94,4 +122,15 @@ public boolean serverSideRoutingEnabled() {
public BoltValueFactory valueFactory() {
return boltValueFactory;
}
+
+ private synchronized List appendAndClearMessages(List messages) {
+ appendMessages(messages);
+ var messageNames = List.copyOf(this.messageNames);
+ this.messageNames.clear();
+ return messageNames;
+ }
+
+ private synchronized void appendMessages(List messages) {
+ messages.forEach(message -> messageNames.add(message.name()));
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionSource.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionSource.java
index 9a3a66524c..24da0d3590 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionSource.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverBoltConnectionSource.java
@@ -16,10 +16,14 @@
*/
package org.neo4j.driver.internal.adaptedbolt;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.scoped;
+
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import org.neo4j.bolt.connection.BoltConnectionSource;
import org.neo4j.bolt.connection.RoutedBoltConnectionParameters;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.value.BoltValueFactory;
public class AdaptingDriverBoltConnectionSource implements DriverBoltConnectionSource {
@@ -27,28 +31,33 @@ public class AdaptingDriverBoltConnectionSource implements DriverBoltConnectionS
private final ErrorMapper errorMapper;
private final BoltValueFactory boltValueFactory;
private final boolean routed;
+ private final DriverObservationProvider observationProvider;
public AdaptingDriverBoltConnectionSource(
BoltConnectionSource delegate,
ErrorMapper errorMapper,
BoltValueFactory boltValueFactory,
- boolean routed) {
+ boolean routed,
+ DriverObservationProvider observationProvider) {
this.delegate = Objects.requireNonNull(delegate);
this.errorMapper = Objects.requireNonNull(errorMapper);
this.boltValueFactory = Objects.requireNonNull(boltValueFactory);
this.routed = routed;
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@Override
- public CompletionStage getConnection(RoutedBoltConnectionParameters parameters) {
- return delegate.getConnection(parameters)
+ public CompletionStage getConnection(
+ RoutedBoltConnectionParameters parameters, Observation parentObservation) {
+ return scoped(observationProvider, parentObservation, () -> delegate.getConnection(parameters))
.exceptionally(errorMapper::mapAndThrow)
.thenApply(boltConnection -> new AdaptingDriverBoltConnection(
boltConnection,
routed || boltConnection.serverSideRoutingEnabled()
? new RoutedErrorMapper(boltConnection.serverAddress(), parameters.accessMode())
: errorMapper,
- boltValueFactory));
+ boltValueFactory,
+ observationProvider));
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverResponseHandler.java
index 91e1369c37..eb73213a0f 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/AdaptingDriverResponseHandler.java
@@ -20,6 +20,17 @@
import java.util.Map;
import java.util.Objects;
import org.neo4j.bolt.connection.ResponseHandler;
+import org.neo4j.bolt.connection.message.BeginMessage;
+import org.neo4j.bolt.connection.message.CommitMessage;
+import org.neo4j.bolt.connection.message.DiscardMessage;
+import org.neo4j.bolt.connection.message.LogoffMessage;
+import org.neo4j.bolt.connection.message.LogonMessage;
+import org.neo4j.bolt.connection.message.PullMessage;
+import org.neo4j.bolt.connection.message.ResetMessage;
+import org.neo4j.bolt.connection.message.RollbackMessage;
+import org.neo4j.bolt.connection.message.RouteMessage;
+import org.neo4j.bolt.connection.message.RunMessage;
+import org.neo4j.bolt.connection.message.TelemetryMessage;
import org.neo4j.bolt.connection.summary.BeginSummary;
import org.neo4j.bolt.connection.summary.CommitSummary;
import org.neo4j.bolt.connection.summary.DiscardSummary;
@@ -32,32 +43,42 @@
import org.neo4j.bolt.connection.summary.RunSummary;
import org.neo4j.bolt.connection.summary.TelemetrySummary;
import org.neo4j.driver.Value;
+import org.neo4j.driver.internal.observation.BoltHandleObservation;
import org.neo4j.driver.internal.value.BoltValueFactory;
final class AdaptingDriverResponseHandler implements ResponseHandler {
private final DriverResponseHandler delegate;
private final ErrorMapper errorMapper;
private final BoltValueFactory boltValueFactory;
+ private final BoltHandleObservation observation;
AdaptingDriverResponseHandler(
- DriverResponseHandler delegate, ErrorMapper errorMapper, BoltValueFactory boltValueFactory) {
+ DriverResponseHandler delegate,
+ ErrorMapper errorMapper,
+ BoltValueFactory boltValueFactory,
+ BoltHandleObservation observation) {
this.delegate = Objects.requireNonNull(delegate);
this.errorMapper = Objects.requireNonNull(errorMapper);
this.boltValueFactory = Objects.requireNonNull(boltValueFactory);
+ this.observation = Objects.requireNonNull(observation);
}
@Override
public void onError(Throwable throwable) {
- delegate.onError(errorMapper.map(throwable));
+ throwable = errorMapper.map(throwable);
+ observation.error(throwable);
+ delegate.onError(throwable);
}
@Override
public void onBeginSummary(BeginSummary summary) {
+ observation.onSummary(BeginMessage.NAME);
delegate.onBeginSummary(summary);
}
@Override
public void onRunSummary(RunSummary summary) {
+ observation.onSummary(RunMessage.NAME);
delegate.onRunSummary(summary);
}
@@ -68,6 +89,7 @@ public void onRecord(List fields) {
@Override
public void onPullSummary(PullSummary summary) {
+ observation.onSummary(PullMessage.NAME);
delegate.onPullSummary(new org.neo4j.driver.internal.adaptedbolt.summary.PullSummary() {
@Override
public boolean hasMore() {
@@ -83,51 +105,61 @@ public Map metadata() {
@Override
public void onDiscardSummary(DiscardSummary summary) {
+ observation.onSummary(DiscardMessage.NAME);
delegate.onDiscardSummary(() -> boltValueFactory.toDriverMap(summary.metadata()));
}
@Override
public void onCommitSummary(CommitSummary summary) {
+ observation.onSummary(CommitMessage.NAME);
delegate.onCommitSummary(summary);
}
@Override
public void onRollbackSummary(RollbackSummary summary) {
+ observation.onSummary(RollbackMessage.NAME);
delegate.onRollbackSummary(summary);
}
@Override
public void onResetSummary(ResetSummary summary) {
+ observation.onSummary(ResetMessage.NAME);
delegate.onResetSummary(summary);
}
@Override
public void onRouteSummary(RouteSummary summary) {
+ observation.onSummary(RouteMessage.NAME);
delegate.onRouteSummary(summary);
}
@Override
public void onLogoffSummary(LogoffSummary summary) {
+ observation.onSummary(LogoffMessage.NAME);
delegate.onLogoffSummary(summary);
}
@Override
public void onLogonSummary(LogonSummary summary) {
+ observation.onSummary(LogonMessage.NAME);
delegate.onLogonSummary(summary);
}
@Override
public void onTelemetrySummary(TelemetrySummary summary) {
+ observation.onSummary(TelemetryMessage.NAME);
delegate.onTelemetrySummary(summary);
}
@Override
public void onIgnored() {
+ observation.onIgnored();
delegate.onIgnored();
}
@Override
public void onComplete() {
+ observation.stop();
delegate.onComplete();
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltExchangeObservationImpl.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltExchangeObservationImpl.java
new file mode 100644
index 0000000000..788f1f1acd
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltExchangeObservationImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.adaptedbolt;
+
+import org.neo4j.bolt.connection.observation.BoltExchangeObservation;
+
+final class BoltExchangeObservationImpl extends BoltObservation implements BoltExchangeObservation {
+ private final org.neo4j.driver.internal.observation.BoltExchangeObservation delegate;
+
+ BoltExchangeObservationImpl(org.neo4j.driver.internal.observation.BoltExchangeObservation delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public BoltExchangeObservation onWrite(String messageName) {
+ delegate.onWrite(messageName);
+ return this;
+ }
+
+ @Override
+ public BoltExchangeObservation onRecord() {
+ delegate.onRecord();
+ return this;
+ }
+
+ @Override
+ public BoltExchangeObservation onSummary(String messageName) {
+ delegate.onSummary(messageName);
+ return this;
+ }
+
+ @Override
+ public BoltExchangeObservation error(Throwable error) {
+ delegate.error(error);
+ return this;
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltObservation.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltObservation.java
new file mode 100644
index 0000000000..24721992a9
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltObservation.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.adaptedbolt;
+
+import java.util.Objects;
+import org.neo4j.bolt.connection.observation.Observation;
+
+class BoltObservation implements Observation {
+ private final org.neo4j.driver.internal.observation.Observation delegate;
+
+ BoltObservation(org.neo4j.driver.internal.observation.Observation delegate) {
+ this.delegate = Objects.requireNonNull(delegate);
+ }
+
+ @Override
+ public Observation error(Throwable error) {
+ delegate.error(error);
+ return this;
+ }
+
+ @Override
+ public void stop() {
+ delegate.stop();
+ }
+
+ org.neo4j.driver.internal.observation.Observation delegate() {
+ return delegate;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+ var that = (BoltObservation) o;
+ return Objects.equals(delegate, that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(delegate);
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltObservationProvider.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltObservationProvider.java
new file mode 100644
index 0000000000..a7dceb434d
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/BoltObservationProvider.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.adaptedbolt;
+
+import java.net.URI;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import org.neo4j.bolt.connection.BoltProtocolVersion;
+import org.neo4j.bolt.connection.observation.BoltExchangeObservation;
+import org.neo4j.bolt.connection.observation.HttpExchangeObservation;
+import org.neo4j.bolt.connection.observation.ImmutableObservation;
+import org.neo4j.bolt.connection.observation.Observation;
+import org.neo4j.bolt.connection.pooled.observation.PoolObservationProvider;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+
+public final class BoltObservationProvider implements PoolObservationProvider {
+ private final DriverObservationProvider delegate;
+
+ public BoltObservationProvider(DriverObservationProvider delegate) {
+ this.delegate = Objects.requireNonNull(delegate);
+ }
+
+ @Override
+ public Observation connectionPoolCreate(String id, URI uri, int maxSize) {
+ return new BoltObservation(
+ delegate.connectionPoolCreate(id, uri, maxSize).start());
+ }
+
+ @Override
+ public Observation connectionPoolClose(String id, URI uri) {
+ return new BoltObservation(delegate.connectionPoolClose(id, uri).start());
+ }
+
+ @Override
+ public Observation pooledConnectionCreate(String id, URI uri) {
+ return new BoltObservation(delegate.pooledConnectionCreate(id, uri).start());
+ }
+
+ @Override
+ public Observation pooledConnectionClose(String id, URI uri) {
+ return new BoltObservation(delegate.pooledConnectionClose(id, uri).start());
+ }
+
+ @Override
+ public Observation pooledConnectionAcquire(String id, URI uri) {
+ return new BoltObservation(delegate.pooledConnectionAcquire(id, uri).start());
+ }
+
+ @Override
+ public Observation pooledConnectionInUse(ImmutableObservation parentObsevation, String id, URI uri) {
+ return supplyInScope(
+ parentObsevation,
+ () -> new BoltObservation(
+ delegate.pooledConnectionInUse(id, uri).start()));
+ }
+
+ @Override
+ public BoltExchangeObservation boltExchange(
+ ImmutableObservation parentObsevation,
+ String host,
+ int port,
+ BoltProtocolVersion boltVersion,
+ BiConsumer setter) {
+ return supplyInScope(
+ parentObsevation,
+ () -> new BoltExchangeObservationImpl(
+ delegate.boltExchange(host, port, boltVersion, setter).start()));
+ }
+
+ @Override
+ public HttpExchangeObservation httpExchange(
+ ImmutableObservation parentObsevation,
+ URI uri,
+ String method,
+ String uriTemplate,
+ BiConsumer setter) {
+ return supplyInScope(
+ parentObsevation,
+ () -> new HttpExchangeObservationImpl(
+ delegate.httpExchange(uri, method, uriTemplate, setter).start()));
+ }
+
+ @Override
+ public Observation scopedObservation() {
+ var observation = delegate.scopedObservation();
+ return observation != null ? new BoltObservation(observation) : null;
+ }
+
+ @Override
+ public T supplyInScope(ImmutableObservation observation, Supplier supplier) {
+ if (observation instanceof BoltObservation boltObservation) {
+ var scopedObservation = scopedObservation();
+ if (observation.equals(scopedObservation)) {
+ return supplier.get();
+ } else {
+ try (var scope = boltObservation.delegate().openScope()) {
+ return supplier.get();
+ }
+ }
+ } else {
+ return supplier.get();
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java
index 1c995908b6..cf993bdea8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnection.java
@@ -22,14 +22,17 @@
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.BoltServerAddress;
import org.neo4j.bolt.connection.message.Message;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.value.BoltValueFactory;
public interface DriverBoltConnection {
- default CompletionStage writeAndFlush(DriverResponseHandler handler, Message messages) {
- return writeAndFlush(handler, List.of(messages));
+ default CompletionStage writeAndFlush(
+ DriverResponseHandler handler, Message messages, Observation parentObservation) {
+ return writeAndFlush(handler, List.of(messages), parentObservation);
}
- CompletionStage writeAndFlush(DriverResponseHandler handler, List messages);
+ CompletionStage writeAndFlush(
+ DriverResponseHandler handler, List messages, Observation parentObservation);
CompletionStage write(List messages);
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionSource.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionSource.java
index b0d9e4a0fe..74ad48fd46 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionSource.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/DriverBoltConnectionSource.java
@@ -18,9 +18,11 @@
import java.util.concurrent.CompletionStage;
import org.neo4j.bolt.connection.RoutedBoltConnectionParameters;
+import org.neo4j.driver.internal.observation.Observation;
public interface DriverBoltConnectionSource {
- CompletionStage getConnection(RoutedBoltConnectionParameters parameters);
+ CompletionStage getConnection(
+ RoutedBoltConnectionParameters parameters, Observation parentObservation);
CompletionStage verifyConnectivity();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/HttpExchangeObservationImpl.java b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/HttpExchangeObservationImpl.java
new file mode 100644
index 0000000000..245061db79
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/adaptedbolt/HttpExchangeObservationImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [https://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.adaptedbolt;
+
+import java.util.List;
+import java.util.Map;
+import org.neo4j.bolt.connection.observation.HttpExchangeObservation;
+
+final class HttpExchangeObservationImpl extends BoltObservation implements HttpExchangeObservation {
+ private final org.neo4j.driver.internal.observation.HttpExchangeObservation delegate;
+
+ HttpExchangeObservationImpl(org.neo4j.driver.internal.observation.HttpExchangeObservation delegate) {
+ super(delegate);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public HttpExchangeObservation onHeaders(Map> headers) {
+ delegate.onHeaders(headers);
+ return this;
+ }
+
+ @Override
+ public HttpExchangeObservation onResponse(Response response) {
+ delegate.onResponse(new org.neo4j.driver.internal.observation.HttpExchangeObservation.Response() {
+ @Override
+ public int statusCode() {
+ return response.statusCode();
+ }
+
+ @Override
+ public Map> headers() {
+ return response.headers();
+ }
+
+ @Override
+ public String httpVersion() {
+ return response.httpVersion();
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public HttpExchangeObservation error(Throwable error) {
+ delegate.error(error);
+ return this;
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/BoltConnectionWithAuthTokenManager.java b/driver/src/main/java/org/neo4j/driver/internal/async/BoltConnectionWithAuthTokenManager.java
index 9a53b00ba3..1dd1a35ea2 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/BoltConnectionWithAuthTokenManager.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/BoltConnectionWithAuthTokenManager.java
@@ -25,6 +25,7 @@
import org.neo4j.driver.exceptions.SecurityRetryableException;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
import org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.security.InternalAuthToken;
import org.neo4j.driver.internal.value.BoltValueFactory;
@@ -37,8 +38,10 @@ public BoltConnectionWithAuthTokenManager(DriverBoltConnection delegate, AuthTok
}
@Override
- public CompletionStage writeAndFlush(DriverResponseHandler handler, List messages) {
- return delegate.writeAndFlush(new ErrorMappingResponseHandler(handler, this::mapSecurityError), messages);
+ public CompletionStage writeAndFlush(
+ DriverResponseHandler handler, List messages, Observation parentObservation) {
+ return delegate.writeAndFlush(
+ new ErrorMappingResponseHandler(handler, this::mapSecurityError), messages, parentObservation);
}
private Throwable mapSecurityError(Throwable throwable) {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java
index 5b16db0a29..fe68463729 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingBoltConnection.java
@@ -25,6 +25,7 @@
import org.neo4j.bolt.connection.message.Message;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
import org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.value.BoltValueFactory;
public abstract class DelegatingBoltConnection implements DriverBoltConnection {
@@ -35,8 +36,9 @@ protected DelegatingBoltConnection(DriverBoltConnection delegate) {
}
@Override
- public CompletionStage writeAndFlush(DriverResponseHandler handler, List messages) {
- return delegate.writeAndFlush(handler, messages);
+ public CompletionStage writeAndFlush(
+ DriverResponseHandler handler, List messages, Observation parentObservation) {
+ return delegate.writeAndFlush(handler, messages, parentObservation);
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
index 0aa035dae3..ad5b268617 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java
@@ -17,10 +17,13 @@
package org.neo4j.driver.internal.async;
import static java.util.Collections.emptyMap;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.observeAsync;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.scoped;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -35,14 +38,18 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.GqlStatusError;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.util.Futures;
public class InternalAsyncSession extends AsyncAbstractQueryRunner implements AsyncSession {
private final NetworkSession session;
+ private final DriverObservationProvider observationProvider;
- public InternalAsyncSession(NetworkSession session) {
+ public InternalAsyncSession(NetworkSession session, DriverObservationProvider observationProvider) {
this.session = session;
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@Override
@@ -63,12 +70,14 @@ public CompletionStage runAsync(
@Override
public CompletionStage runAsync(Query query, TransactionConfig config) {
- return session.runAsync(query, config);
+ var runObservation = observationProvider.sessionRun(AsyncSession.class, query.text(), query.parameters());
+ return observeAsync(runObservation, () -> session.runAsync(query, config, runObservation, ResultCursor.class));
}
@Override
public CompletionStage closeAsync() {
- return session.closeAsync();
+ var closeObservation = observationProvider.sessionClose(AsyncSession.class);
+ return observeAsync(closeObservation, () -> session.closeAsync(closeObservation));
}
@Override
@@ -78,8 +87,10 @@ public CompletionStage beginTransactionAsync() {
@Override
public CompletionStage beginTransactionAsync(TransactionConfig config) {
- return session.beginTransactionAsync(config, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION))
- .thenApply(InternalAsyncTransaction::new);
+ var beginObservation = observationProvider.beginTransaction(AsyncTransaction.class);
+ return observeAsync(beginObservation, () -> session.beginTransactionAsync(
+ config, new ApiTelemetryWork(TelemetryApi.UNMANAGED_TRANSACTION), beginObservation)
+ .thenApply(tx -> new InternalAsyncTransaction(tx, observationProvider, null)));
}
@Override
@@ -102,32 +113,34 @@ public Set lastBookmarks() {
private CompletionStage transactionAsync(
AccessMode mode, AsyncTransactionCallback> work, TransactionConfig config) {
var apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
- return session.retryLogic().retryAsync(() -> {
+ var executeObservation = observationProvider.sessionExecute(AsyncSession.class, mode);
+ return observeAsync(executeObservation, () -> session.retryLogic().retryAsync(() -> {
var resultFuture = new CompletableFuture();
- var txFuture = session.beginTransactionAsync(mode, config, apiTelemetryWork);
+ var txFuture = session.beginTransactionAsync(mode, config, apiTelemetryWork, executeObservation);
txFuture.whenComplete((tx, completionError) -> {
var error = Futures.completionExceptionCause(completionError);
if (error != null) {
resultFuture.completeExceptionally(error);
} else {
- executeWork(resultFuture, tx, work);
+ executeWork(resultFuture, tx, work, executeObservation);
}
});
return resultFuture;
- });
+ }));
}
private void executeWork(
CompletableFuture resultFuture,
UnmanagedTransaction tx,
- AsyncTransactionCallback> work) {
- var workFuture = safeExecuteWork(tx, work);
+ AsyncTransactionCallback> work,
+ Observation parentObservation) {
+ var workFuture = safeExecuteWork(tx, work, parentObservation);
workFuture.whenComplete((result, completionError) -> {
var error = Futures.completionExceptionCause(completionError);
if (error != null) {
- closeTxAfterFailedTransactionWork(tx, resultFuture, error);
+ closeTxAfterFailedTransactionWork(tx, resultFuture, error, parentObservation);
} else if (result instanceof ResultCursor) {
var message = String.format(
"%s is not a valid return value, it should be consumed before producing a return value",
@@ -139,20 +152,25 @@ private void executeWork(
message,
GqlStatusError.DIAGNOSTIC_RECORD,
null);
- closeTxAfterFailedTransactionWork(tx, resultFuture, error);
+ closeTxAfterFailedTransactionWork(tx, resultFuture, error, parentObservation);
} else {
- closeTxAfterSucceededTransactionWork(tx, resultFuture, result);
+ closeTxAfterSucceededTransactionWork(tx, resultFuture, result, parentObservation);
}
});
}
private CompletionStage safeExecuteWork(
- UnmanagedTransaction tx, AsyncTransactionCallback> work) {
+ UnmanagedTransaction tx, AsyncTransactionCallback> work, Observation parentObservation) {
// given work might fail in both async and sync way
// async failure will result in a failed future being returned
// sync failure will result in an exception being thrown
try {
- var result = work.execute(new DelegatingAsyncTransactionContext(new InternalAsyncTransaction(tx)));
+ var result = scoped(
+ observationProvider,
+ parentObservation,
+ () -> work.execute(new DelegatingAsyncTransactionContext(
+ new InternalAsyncTransaction(tx, observationProvider, parentObservation))));
+ ;
// protect from given transaction function returning null
return result == null ? completedWithNull() : result;
@@ -163,8 +181,11 @@ private CompletionStage safeExecuteWork(
}
private void closeTxAfterFailedTransactionWork(
- UnmanagedTransaction tx, CompletableFuture resultFuture, Throwable error) {
- tx.closeAsync().whenComplete((ignored, rollbackError) -> {
+ UnmanagedTransaction tx,
+ CompletableFuture resultFuture,
+ Throwable error,
+ Observation parentObservation) {
+ tx.closeAsync(parentObservation).whenComplete((ignored, rollbackError) -> {
if (rollbackError != null) {
error.addSuppressed(rollbackError);
}
@@ -173,8 +194,8 @@ private void closeTxAfterFailedTransactionWork(
}
private void closeTxAfterSucceededTransactionWork(
- UnmanagedTransaction tx, CompletableFuture resultFuture, T result) {
- tx.closeAsync(true).whenComplete((ignored, completionError) -> {
+ UnmanagedTransaction tx, CompletableFuture resultFuture, T result, Observation parentObservation) {
+ tx.closeAsync(true, parentObservation).whenComplete((ignored, completionError) -> {
var commitError = Futures.completionExceptionCause(completionError);
if (commitError != null) {
resultFuture.completeExceptionally(commitError);
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java
index 9e3c78ca89..82d570b32d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java
@@ -16,32 +16,43 @@
*/
package org.neo4j.driver.internal.async;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.observeAsync;
+
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.Query;
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.ResultCursor;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.Observation;
public class InternalAsyncTransaction extends AsyncAbstractQueryRunner implements AsyncTransaction {
private final UnmanagedTransaction tx;
+ private final DriverObservationProvider observationProvider;
- public InternalAsyncTransaction(UnmanagedTransaction tx) {
+ public InternalAsyncTransaction(
+ UnmanagedTransaction tx, DriverObservationProvider observationProvider, Observation parentObservation) {
this.tx = tx;
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
@Override
public CompletionStage commitAsync() {
- return tx.commitAsync();
+ var commitObservation = observationProvider.transactionCommit(AsyncTransaction.class);
+ return observeAsync(commitObservation, () -> tx.commitAsync(commitObservation));
}
@Override
public CompletionStage rollbackAsync() {
- return tx.rollbackAsync();
+ var rollbackObservation = observationProvider.transactionRollback(AsyncTransaction.class);
+ return observeAsync(rollbackObservation, () -> tx.rollbackAsync(rollbackObservation));
}
@Override
public CompletionStage closeAsync() {
- return tx.closeAsync();
+ var closeObservation = observationProvider.transactionClose(AsyncTransaction.class);
+ return observeAsync(closeObservation, () -> tx.closeAsync(closeObservation));
}
@Override
@@ -51,7 +62,9 @@ public CompletionStage isOpenAsync() {
@Override
public CompletionStage runAsync(Query query) {
- return tx.runAsync(query);
+ var runObservation =
+ observationProvider.transactionRun(AsyncTransaction.class, query.text(), query.parameters());
+ return observeAsync(runObservation, () -> tx.runAsync(query, runObservation, ResultCursor.class));
}
public boolean isOpen() {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
index 05e4541d20..f16ce28a9e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java
@@ -31,6 +31,7 @@
import org.neo4j.driver.NotificationConfig;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnectionSource;
import org.neo4j.driver.internal.homedb.HomeDatabaseCache;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.util.Futures;
@@ -51,7 +52,8 @@ public LeakLoggingNetworkSession(
AuthToken overrideAuthToken,
boolean telemetryDisabled,
AuthTokenManager authTokenManager,
- HomeDatabaseCache homeDatabaseCache) {
+ HomeDatabaseCache homeDatabaseCache,
+ DriverObservationProvider observationProvider) {
super(
connectionProvider,
retryLogic,
@@ -66,7 +68,8 @@ public LeakLoggingNetworkSession(
overrideAuthToken,
telemetryDisabled,
authTokenManager,
- homeDatabaseCache);
+ homeDatabaseCache,
+ observationProvider);
this.stackTrace = captureStackTrace();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
index cf37c34c27..7cb0aa7420 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java
@@ -80,6 +80,9 @@
import org.neo4j.driver.internal.homedb.HomeDatabaseCache;
import org.neo4j.driver.internal.homedb.HomeDatabaseCacheKey;
import org.neo4j.driver.internal.logging.PrefixedLogger;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.NoopObservation;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.security.InternalAuthToken;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
@@ -112,6 +115,7 @@ public class NetworkSession {
private final AuthTokenManager authTokenManager;
private final HomeDatabaseCache homeDatabaseCache;
private final HomeDatabaseCacheKey homeDatabaseKey;
+ private final DriverObservationProvider observationProvider;
public NetworkSession(
DriverBoltConnectionSource boltConnectionProvider,
@@ -127,7 +131,8 @@ public NetworkSession(
AuthToken overrideAuthToken,
boolean telemetryDisabled,
AuthTokenManager authTokenManager,
- HomeDatabaseCache homeDatabaseCache) {
+ HomeDatabaseCache homeDatabaseCache,
+ DriverObservationProvider observationProvider) {
Objects.requireNonNull(bookmarks, "bookmarks may not be null");
Objects.requireNonNull(bookmarkManager, "bookmarkManager may not be null");
this.boltConnectionProvider = Objects.requireNonNull(boltConnectionProvider);
@@ -149,12 +154,14 @@ public NetworkSession(
this.authTokenManager = authTokenManager;
this.homeDatabaseCache = Objects.requireNonNull(homeDatabaseCache);
this.homeDatabaseKey = HomeDatabaseCacheKey.of(overrideAuthToken, impersonatedUser);
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
- public CompletionStage runAsync(Query query, TransactionConfig config) {
+ public CompletionStage runAsync(
+ Query query, TransactionConfig config, Observation parentObservation, Class> resultType) {
ensureSessionIsOpen();
var disposable = ensureNoOpenTxBeforeRunningQuery()
- .thenCompose(ignore -> acquireConnection(mode))
+ .thenCompose(ignore -> acquireConnection(mode, parentObservation))
.thenCompose(connection -> {
var parameters = query.parameters().asMap(Values::value);
var apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.AUTO_COMMIT_TRANSACTION);
@@ -167,7 +174,9 @@ public CompletionStage runAsync(Query query, TransactionConfig con
true,
null,
this::handleDatabaseName,
- null);
+ null,
+ observationProvider,
+ resultType);
var cursorStage = CompletableFuture.completedStage(null)
.thenCompose(ignored -> {
var messages = new ArrayList(3);
@@ -176,7 +185,7 @@ public CompletionStage runAsync(Query query, TransactionConfig con
.ifPresent(messages::add);
messages.add(newRunMessage(connection, query, parameters, config));
messages.add(Messages.pull(-1, fetchSize));
- return connection.writeAndFlush(resultCursor, messages);
+ return connection.writeAndFlush(resultCursor, messages, parentObservation);
})
.thenCompose(ignored -> resultCursor.resultCursor())
.handle((resultCursorImpl, throwable) -> {
@@ -205,10 +214,13 @@ public CompletionStage runAsync(Query query, TransactionConfig con
}
public CompletionStage runRx(
- Query query, TransactionConfig config, CompletionStage cursorPublishStage) {
+ Query query,
+ TransactionConfig config,
+ CompletionStage cursorPublishStage,
+ Observation parentObservation) {
ensureSessionIsOpen();
var newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
- .thenCompose(ignore -> acquireConnection(mode))
+ .thenCompose(ignore -> acquireConnection(mode, parentObservation))
.thenCompose(connection -> {
var parameters = query.parameters().asMap(Values::value);
var apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.AUTO_COMMIT_TRANSACTION);
@@ -223,7 +235,7 @@ public CompletionStage runRx(
.getTelemetryMessageIfEnabled(connection)
.ifPresent(messages::add);
messages.add(newRunMessage(connection, query, parameters, config));
- return connection.writeAndFlush(responseHandler, messages);
+ return connection.writeAndFlush(responseHandler, messages, parentObservation);
})
.thenCompose(ignored -> responseHandler.cursorFuture)
.handle((resultCursor, throwable) -> {
@@ -255,18 +267,21 @@ public CompletionStage runRx(
}
public CompletionStage beginTransactionAsync(
- TransactionConfig config, ApiTelemetryWork apiTelemetryWork) {
- return beginTransactionAsync(mode, config, null, apiTelemetryWork, true);
+ TransactionConfig config, ApiTelemetryWork apiTelemetryWork, Observation parentObservation) {
+ return beginTransactionAsync(mode, config, null, apiTelemetryWork, true, parentObservation);
}
public CompletionStage beginTransactionAsync(
- TransactionConfig config, String txType, ApiTelemetryWork apiTelemetryWork) {
- return this.beginTransactionAsync(mode, config, txType, apiTelemetryWork, true);
+ TransactionConfig config, String txType, ApiTelemetryWork apiTelemetryWork, Observation parentObservation) {
+ return this.beginTransactionAsync(mode, config, txType, apiTelemetryWork, true, parentObservation);
}
public CompletionStage beginTransactionAsync(
- org.neo4j.driver.AccessMode mode, TransactionConfig config, ApiTelemetryWork apiTelemetryWork) {
- return beginTransactionAsync(mode, config, null, apiTelemetryWork, true);
+ org.neo4j.driver.AccessMode mode,
+ TransactionConfig config,
+ ApiTelemetryWork apiTelemetryWork,
+ Observation parentObservation) {
+ return beginTransactionAsync(mode, config, null, apiTelemetryWork, true, parentObservation);
}
public CompletionStage beginTransactionAsync(
@@ -274,14 +289,15 @@ public CompletionStage beginTransactionAsync(
TransactionConfig config,
String txType,
ApiTelemetryWork apiTelemetryWork,
- boolean flush) {
+ boolean flush,
+ Observation parentObservation) {
ensureSessionIsOpen();
apiTelemetryWork.setEnabled(!telemetryDisabled);
// create a chain that acquires connection and starts a transaction
var newTransactionStage = ensureNoOpenTxBeforeStartingTx()
- .thenCompose(ignore -> acquireConnection(mode))
+ .thenCompose(ignore -> acquireConnection(mode, parentObservation))
.thenCompose(connection -> {
var tx = new UnmanagedTransaction(
connection,
@@ -293,8 +309,9 @@ public CompletionStage beginTransactionAsync(
notificationConfig,
apiTelemetryWork,
this::handleDatabaseName,
- logging);
- return tx.beginAsync(determineBookmarks(true), config, txType, flush);
+ logging,
+ observationProvider);
+ return tx.beginAsync(determineBookmarks(true), config, txType, flush, parentObservation);
});
// update the reference to the only known transaction
@@ -338,7 +355,8 @@ public void onComplete() {
future.complete(null);
}
},
- Messages.reset())
+ Messages.reset(),
+ NoopObservation.getInstance())
.thenCompose(ignored -> future);
} else {
return completedWithNull();
@@ -373,18 +391,18 @@ public boolean isOpen() {
return open.get();
}
- public CompletionStage closeAsync() {
+ public CompletionStage closeAsync(Observation parentObservation) {
if (open.compareAndSet(true, false)) {
return resultCursorStage
.thenCompose(cursor -> {
if (cursor != null) {
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
- return cursor.discardAllFailureAsync();
+ return cursor.discardAllFailureAsync(parentObservation);
}
// no result cursor exists so no error exists
return completedWithNull();
})
- .thenCompose(cursorError -> closeTransactionAndReleaseConnection()
+ .thenCompose(cursorError -> closeTransactionAndReleaseConnection(parentObservation)
.thenApply(txCloseError -> {
// now we have cursor error, active transaction has been closed and connection has been
// released
@@ -412,11 +430,12 @@ private void handleDatabaseName(String name) {
homeDatabaseCache.put(homeDatabaseKey, name);
}
- private CompletionStage acquireConnection(AccessMode mode) {
+ private CompletionStage acquireConnection(
+ AccessMode mode, Observation parentObservation) {
var overrideAuthToken = connectionContext.overrideAuthToken();
var authTokenManager = overrideAuthToken != null ? NoopAuthTokenManager.INSTANCE : this.authTokenManager;
- var newConnectionStage = pulledResultCursorStage(connectionStage)
- .thenCompose(ignored -> acquireAdaptedConnection(mode))
+ var newConnectionStage = pulledResultCursorStage(connectionStage, parentObservation)
+ .thenCompose(ignored -> acquireAdaptedConnection(mode, parentObservation))
.thenApply(connection ->
(DriverBoltConnection) new BoltConnectionWithAuthTokenManager(connection, authTokenManager))
.thenApply(BoltConnectionWithCloseTracking::new)
@@ -458,7 +477,8 @@ private BoltConnectionWithCloseTracking mapAcquisitionError(Throwable throwable)
}
}
- private CompletionStage acquireAdaptedConnection(AccessMode mode) {
+ private CompletionStage acquireAdaptedConnection(
+ AccessMode mode, Observation parentObservation) {
var databaseName = connectionContext.databaseNameFuture().getNow(null);
var impersonatedUser = connectionContext.impersonatedUser();
var minVersion = minBoltVersion(connectionContext);
@@ -491,18 +511,18 @@ private CompletionStage acquireAdaptedConnection(AccessMod
.withBookmarks(bookmarks)
.withImpersonatedUser(impersonatedUser)
.build();
- return boltConnectionProvider.getConnection(parameters);
+ return boltConnectionProvider.getConnection(parameters, parentObservation);
}
private CompletionStage pulledResultCursorStage(
- CompletionStage connectionStage) {
+ CompletionStage connectionStage, Observation parentObservation) {
return resultCursorStage
.thenCompose(cursor -> {
if (cursor == null) {
return completedWithNull();
}
// make sure previous result is fully consumed and connection is released back to the pool
- return cursor.pullAllFailureAsync();
+ return cursor.pullAllFailureAsync(parentObservation);
})
.thenCompose(error -> {
if (error == null) {
@@ -520,12 +540,12 @@ private CompletionStage pulledResultCursorStage(
});
}
- private CompletionStage closeTransactionAndReleaseConnection() {
+ private CompletionStage closeTransactionAndReleaseConnection(Observation parentObservation) {
return existingTransactionOrNull()
.thenCompose(tx -> {
if (tx != null) {
// there exists an open transaction, let's close it and propagate the error, if any
- return tx.closeAsync()
+ return tx.closeAsync(parentObservation)
.thenApply(ignore -> (Throwable) null)
.exceptionally(Function.identity());
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
index 3aee3200db..8a37766823 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ResultCursorsHolder.java
@@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.FailableCursor;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.util.Futures;
public class ResultCursorsHolder {
@@ -45,20 +46,20 @@ void add(CompletionStage extends FailableCursor> cursorStage) {
});
}
- CompletionStage retrieveNotConsumedError() {
+ CompletionStage retrieveNotConsumedError(Observation parentObservation) {
List> cursorStages;
synchronized (this) {
cursorStages = List.copyOf(this.cursorStages);
}
- var failures = retrieveAllFailures(cursorStages);
+ var failures = retrieveAllFailures(cursorStages, parentObservation);
return CompletableFuture.allOf(failures).thenApply(ignore -> findFirstFailure(failures));
}
@SuppressWarnings("unchecked")
private static CompletableFuture[] retrieveAllFailures(
- List> cursorStages) {
+ List> cursorStages, Observation parentObservation) {
return cursorStages.stream()
- .map(ResultCursorsHolder::retrieveFailure)
+ .map(result -> retrieveFailure(result, parentObservation))
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new);
}
@@ -73,9 +74,11 @@ private static Throwable findFirstFailure(CompletableFuture[] complet
.orElse(null);
}
- private static CompletionStage retrieveFailure(CompletionStage extends FailableCursor> cursorStage) {
+ private static CompletionStage retrieveFailure(
+ CompletionStage extends FailableCursor> cursorStage, Observation parentObservation) {
return cursorStage
.exceptionally(cursor -> null)
- .thenCompose(cursor -> cursor == null ? completedWithNull() : cursor.discardAllFailureAsync());
+ .thenCompose(cursor ->
+ cursor == null ? completedWithNull() : cursor.discardAllFailureAsync(parentObservation));
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java
index 53ea10ac93..6464b1d186 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/TerminationAwareBoltConnection.java
@@ -28,6 +28,7 @@
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
import org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.util.Futures;
final class TerminationAwareBoltConnection extends DelegatingBoltConnection {
@@ -52,10 +53,10 @@ public TerminationAwareBoltConnection(
this.throwableConsumer = Objects.requireNonNull(throwableConsumer);
}
- public CompletionStage reset() {
+ public CompletionStage reset(Observation parentObservation) {
var future = new CompletableFuture();
var thisVal = this;
- executor.execute(ignored -> resetBolt(future)).whenComplete((ignored, throwable) -> {
+ executor.execute(ignored -> resetBolt(future, parentObservation)).whenComplete((ignored, throwable) -> {
if (throwable != null) {
throwableConsumer.accept(throwable);
future.completeExceptionally(throwable);
@@ -64,7 +65,7 @@ public CompletionStage reset() {
return future;
}
- private CompletionStage resetBolt(CompletableFuture future) {
+ private CompletionStage resetBolt(CompletableFuture future, Observation parentObservation) {
return delegate.writeAndFlush(
new DriverResponseHandler() {
Throwable throwable = null;
@@ -85,21 +86,27 @@ public void onComplete() {
}
}
},
- List.of(Messages.reset()));
+ List.of(Messages.reset()),
+ parentObservation);
}
@Override
- public CompletionStage writeAndFlush(DriverResponseHandler handler, List messages) {
- return executor.execute(causeOfTermination -> flushBolt(causeOfTermination, handler, messages));
+ public CompletionStage writeAndFlush(
+ DriverResponseHandler handler, List messages, Observation parentObservation) {
+ return executor.execute(
+ causeOfTermination -> flushBolt(causeOfTermination, handler, messages, parentObservation));
}
private CompletionStage flushBolt(
- Throwable causeOfTermination, DriverResponseHandler handler, List messages) {
+ Throwable causeOfTermination,
+ DriverResponseHandler handler,
+ List messages,
+ Observation parentObservation) {
if (causeOfTermination == null) {
log.trace("This connection is active, will flush");
var terminationAwareResponseHandler =
new TerminationAwareResponseHandler(logging, handler, executor, throwableConsumer);
- return delegate.writeAndFlush(terminationAwareResponseHandler, messages)
+ return delegate.writeAndFlush(terminationAwareResponseHandler, messages, parentObservation)
.handle((ignored, flushThrowable) -> {
flushThrowable = Futures.completionExceptionCause(flushThrowable);
if (flushThrowable != null) {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
index 4d162331ec..9b7cbf23b2 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java
@@ -65,6 +65,9 @@
import org.neo4j.driver.internal.cursor.ResultCursorImpl;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.NoopObservation;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.internal.util.Futures;
@@ -125,6 +128,7 @@ private enum State {
private final ApiTelemetryWork apiTelemetryWork;
private final Consumer databaseNameConsumer;
+ private final DriverObservationProvider observationProvider;
private Message[] beginMessages;
public UnmanagedTransaction(
@@ -137,7 +141,8 @@ public UnmanagedTransaction(
NotificationConfig notificationConfig,
ApiTelemetryWork apiTelemetryWork,
Consumer databaseNameConsumer,
- @SuppressWarnings("deprecation") Logging logging) {
+ @SuppressWarnings("deprecation") Logging logging,
+ DriverObservationProvider observationProvider) {
this(
connection,
databaseName,
@@ -149,7 +154,8 @@ public UnmanagedTransaction(
notificationConfig,
apiTelemetryWork,
databaseNameConsumer,
- logging);
+ logging,
+ observationProvider);
}
protected UnmanagedTransaction(
@@ -163,7 +169,8 @@ protected UnmanagedTransaction(
NotificationConfig notificationConfig,
ApiTelemetryWork apiTelemetryWork,
Consumer databaseNameConsumer,
- @SuppressWarnings("deprecation") Logging logging) {
+ @SuppressWarnings("deprecation") Logging logging,
+ DriverObservationProvider observationProvider) {
this.logging = logging;
this.connection = new TerminationAwareBoltConnection(logging, connection, this, this::markTerminated);
this.databaseName = databaseName;
@@ -175,11 +182,16 @@ protected UnmanagedTransaction(
this.notificationConfig = notificationConfig;
this.apiTelemetryWork = apiTelemetryWork;
this.databaseNameConsumer = Objects.requireNonNull(databaseNameConsumer);
+ this.observationProvider = Objects.requireNonNull(observationProvider);
}
// flush = false is only supported for async mode with a single subsequent run
public CompletionStage beginAsync(
- Set initialBookmarks, TransactionConfig config, String txType, boolean flush) {
+ Set initialBookmarks,
+ TransactionConfig config,
+ String txType,
+ boolean flush,
+ Observation parentObservation) {
var bookmarks = initialBookmarks.stream().map(Bookmark::value).collect(Collectors.toSet());
return CompletableFuture.completedStage(null)
.thenApply(ignored -> {
@@ -200,7 +212,7 @@ public CompletionStage beginAsync(
if (flush) {
var responseHandler = new BeginResponseHandler(apiTelemetryWork, databaseNameConsumer);
connection
- .writeAndFlush(responseHandler, messages)
+ .writeAndFlush(responseHandler, messages, parentObservation)
.thenCompose(ignored -> responseHandler.summaryFuture)
.whenComplete((summary, throwable) -> {
if (throwable != null) {
@@ -230,23 +242,23 @@ public CompletionStage beginAsync(
});
}
- public CompletionStage closeAsync() {
- return closeAsync(false);
+ public CompletionStage closeAsync(Observation parentObservation) {
+ return closeAsync(false, parentObservation);
}
- public CompletionStage closeAsync(boolean commit) {
- return closeAsync(commit, true);
+ public CompletionStage closeAsync(boolean commit, Observation parentObservation) {
+ return closeAsync(commit, true, parentObservation);
}
- public CompletionStage commitAsync() {
- return closeAsync(true, false);
+ public CompletionStage commitAsync(Observation parentObservation) {
+ return closeAsync(true, false, parentObservation);
}
- public CompletionStage rollbackAsync() {
- return closeAsync(false, false);
+ public CompletionStage rollbackAsync(Observation parentObservation) {
+ return closeAsync(false, false, parentObservation);
}
- public CompletionStage runAsync(Query query) {
+ public CompletionStage runAsync(Query query, Observation parentObservation, Class> resultType) {
ensureCanRunQueries();
var parameters = query.parameters().asMap(Values::value);
var resultCursor = new ResultCursorImpl(
@@ -257,12 +269,14 @@ public CompletionStage runAsync(Query query) {
false,
beginFuture,
databaseNameConsumer,
- apiTelemetryWork);
+ apiTelemetryWork,
+ observationProvider,
+ resultType);
var flushStage = CompletableFuture.completedStage(null).thenCompose(ignored -> {
var messages = List.of(
Messages.run(query.text(), connection.valueFactory().toBoltMap(parameters)),
Messages.pull(-1, fetchSize));
- return connection.writeAndFlush(resultCursor, messages);
+ return connection.writeAndFlush(resultCursor, messages, parentObservation);
});
return beginFuture.thenCompose(ignored -> {
var cursorStage = flushStage
@@ -273,14 +287,15 @@ public CompletionStage runAsync(Query query) {
});
}
- public CompletionStage runRx(Query query) {
+ public CompletionStage runRx(Query query, Observation parentObservation) {
ensureCanRunQueries();
var parameters = query.parameters().asMap(Values::value);
var responseHandler = new RunRxResponseHandler(logging, apiTelemetryWork, beginFuture, connection, query);
var flushStage = CompletableFuture.completedStage(null)
.thenCompose(runMessage -> connection.writeAndFlush(
responseHandler,
- Messages.run(query.text(), connection.valueFactory().toBoltMap(parameters))));
+ Messages.run(query.text(), connection.valueFactory().toBoltMap(parameters)),
+ parentObservation));
return beginFuture.thenCompose(ignored -> {
var cursorStage = flushStage.thenCompose(flushResult -> responseHandler.cursorFuture);
resultCursors.add(cursorStage);
@@ -345,7 +360,7 @@ public CompletionStage terminateAsync() {
return terminationStage != null ? terminationStage : completedFuture(null);
} else {
markTerminated(null);
- terminationStage = connection.reset();
+ terminationStage = connection.reset(NoopObservation.getInstance());
return terminationStage;
}
}
@@ -408,7 +423,7 @@ private void ensureCanRunQueries() {
});
}
- private CompletionStage doCommitAsync(Throwable cursorFailure) {
+ private CompletionStage doCommitAsync(Throwable cursorFailure, Observation parentObservation) {
ClientException exception = executeWithLock(
lock,
() -> state == State.TERMINATED
@@ -428,7 +443,7 @@ private CompletionStage doCommitAsync(Throwable cursorFailure) {
var commitSummary = new CompletableFuture();
var responseHandler = new BasicResponseHandler();
connection
- .writeAndFlush(responseHandler, Messages.commit())
+ .writeAndFlush(responseHandler, Messages.commit(), parentObservation)
.thenCompose(ignored -> responseHandler.summaries())
.whenComplete((summaries, throwable) -> {
if (throwable != null) {
@@ -459,14 +474,14 @@ private CompletionStage doCommitAsync(Throwable cursorFailure) {
}
}
- private CompletionStage doRollbackAsync() {
+ private CompletionStage doRollbackAsync(Observation parentObservation) {
if (executeWithLock(lock, () -> state) == State.TERMINATED) {
return completedWithNull();
} else {
var rollbackFuture = new CompletableFuture();
var responseHandler = new BasicResponseHandler();
connection
- .writeAndFlush(responseHandler, Messages.rollback())
+ .writeAndFlush(responseHandler, Messages.rollback(), parentObservation)
.thenCompose(ignored -> responseHandler.summaries())
.whenComplete((summaries, throwable) -> {
if (throwable != null) {
@@ -526,7 +541,8 @@ private CompletionStage handleTransactionCompletion(boolean commitAttempt,
}
@SuppressWarnings("DuplicatedCode")
- private CompletionStage closeAsync(boolean commit, boolean completeWithNullIfNotOpen) {
+ private CompletionStage closeAsync(
+ boolean commit, boolean completeWithNullIfNotOpen, Observation parentObservation) {
var stage = executeWithLock(lock, () -> {
CompletionStage resultStage = null;
if (completeWithNullIfNotOpen && !isOpen()) {
@@ -588,13 +604,15 @@ private CompletionStage closeAsync(boolean commit, boolean completeWithNul
Function> targetAction;
if (commit) {
targetFuture = commitFuture;
- targetAction = throwable -> doCommitAsync(throwable).handle(handleCommitOrRollback(throwable));
+ targetAction = throwable ->
+ doCommitAsync(throwable, parentObservation).handle(handleCommitOrRollback(throwable));
} else {
targetFuture = rollbackFuture;
- targetAction = throwable -> doRollbackAsync().handle(handleCommitOrRollback(throwable));
+ targetAction =
+ throwable -> doRollbackAsync(parentObservation).handle(handleCommitOrRollback(throwable));
}
resultCursors
- .retrieveNotConsumedError()
+ .retrieveNotConsumedError(parentObservation)
.thenCompose(targetAction)
.handle((ignored, throwable) -> handleTransactionCompletion(commit, throwable))
.thenCompose(Function.identity())
diff --git a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java
index afa78ebf5a..45b241bd3d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnection.java
@@ -28,6 +28,7 @@
import org.neo4j.bolt.connection.BoltServerAddress;
import org.neo4j.bolt.connection.ResponseHandler;
import org.neo4j.bolt.connection.message.Message;
+import org.neo4j.bolt.connection.observation.ImmutableObservation;
final class ListeningBoltConnection implements BoltConnection {
private final BoltConnection delegate;
@@ -39,8 +40,9 @@ public ListeningBoltConnection(BoltConnection delegate, BoltConnectionListener b
}
@Override
- public CompletionStage writeAndFlush(ResponseHandler handler, List messages) {
- return delegate.writeAndFlush(handler, messages);
+ public CompletionStage writeAndFlush(
+ ResponseHandler handler, List messages, ImmutableObservation parentObservation) {
+ return delegate.writeAndFlush(handler, messages, parentObservation);
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java
index bdbf08db0e..af1b3b0170 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/boltlistener/ListeningBoltConnectionProvider.java
@@ -26,6 +26,7 @@
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.NotificationConfig;
import org.neo4j.bolt.connection.SecurityPlan;
+import org.neo4j.bolt.connection.observation.ImmutableObservation;
final class ListeningBoltConnectionProvider implements BoltConnectionProvider {
private final BoltConnectionProvider delegate;
@@ -47,7 +48,8 @@ public CompletionStage connect(
SecurityPlan securityPlan,
AuthToken authToken,
BoltProtocolVersion minVersion,
- NotificationConfig notificationConfig) {
+ NotificationConfig notificationConfig,
+ ImmutableObservation parentObservation) {
return delegate.connect(
uri,
routingContextAddress,
@@ -57,7 +59,8 @@ public CompletionStage connect(
securityPlan,
authToken,
minVersion,
- notificationConfig)
+ notificationConfig,
+ parentObservation)
.thenApply(boltConnection -> {
boltConnection = new ListeningBoltConnection(boltConnection, boltConnectionListener);
boltConnectionListener.onOpen(boltConnection);
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableResultCursorImpl.java
index f0e3927d1a..81e8d7e0a0 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableResultCursorImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/DisposableResultCursorImpl.java
@@ -28,6 +28,7 @@
import org.neo4j.driver.Record;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.internal.FailableCursor;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.summary.ResultSummary;
public class DisposableResultCursorImpl implements ResultCursor, FailableCursor {
@@ -96,14 +97,14 @@ boolean isDisposed() {
}
@Override
- public CompletionStage discardAllFailureAsync() {
+ public CompletionStage discardAllFailureAsync(Observation parentObservation) {
isDisposed = true;
- return delegate.discardAllFailureAsync();
+ return delegate.discardAllFailureAsync(parentObservation);
}
@Override
- public CompletionStage pullAllFailureAsync() {
- return delegate.pullAllFailureAsync();
+ public CompletionStage pullAllFailureAsync(Observation parentObservation) {
+ return delegate.pullAllFailureAsync(parentObservation);
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorImpl.java
index b948b07894..4070a3f73b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/ResultCursorImpl.java
@@ -17,6 +17,7 @@
package org.neo4j.driver.internal.cursor;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.neo4j.driver.internal.observation.util.ObservationUtil.observeAsyncStarted;
import static org.neo4j.driver.internal.types.InternalTypeSystem.TYPE_SYSTEM;
import java.util.ArrayDeque;
@@ -50,6 +51,9 @@
import org.neo4j.driver.internal.adaptedbolt.summary.DiscardSummary;
import org.neo4j.driver.internal.adaptedbolt.summary.PullSummary;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
+import org.neo4j.driver.internal.observation.DriverObservationProvider;
+import org.neo4j.driver.internal.observation.NoopObservation;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
@@ -77,6 +81,8 @@ public class ResultCursorImpl extends AbstractRecordStateResponseHandler
private final ApiTelemetryWork apiTelemetryWork;
private final CompletableFuture consumedFuture = new CompletableFuture<>();
private final Consumer databaseNameConsumer;
+ private final DriverObservationProvider observationProvider;
+ private final Class> resultType;
private RunSummary runSummary;
private State state;
@@ -90,6 +96,8 @@ public class ResultCursorImpl extends AbstractRecordStateResponseHandler
private ResultSummary summary;
private Throwable error;
private boolean errorExposed;
+ private Observation pendingObservation;
+ private boolean observeBoltOnly;
private enum State {
READY,
@@ -107,7 +115,9 @@ public ResultCursorImpl(
boolean closeOnSummary,
CompletableFuture beginFuture,
Consumer databaseNameConsumer,
- ApiTelemetryWork apiTelemetryWork) {
+ ApiTelemetryWork apiTelemetryWork,
+ DriverObservationProvider observationProvider,
+ Class> resultType) {
this.boltConnection = Objects.requireNonNull(boltConnection);
this.legacyNotifications = new BoltProtocolVersion(5, 5).compareTo(boltConnection.protocolVersion()) > 0;
updateRecordState(RecordState.REQUESTED);
@@ -119,6 +129,8 @@ public ResultCursorImpl(
this.beginFuture = beginFuture;
this.apiTelemetryWork = apiTelemetryWork;
this.databaseNameConsumer = Objects.requireNonNull(databaseNameConsumer);
+ this.observationProvider = Objects.requireNonNull(observationProvider);
+ this.resultType = Objects.requireNonNull(resultType);
}
public CompletionStage resultCursor() {
@@ -130,9 +142,12 @@ public synchronized List keys() {
return runSummary.keys();
}
- @SuppressWarnings("DuplicatedCode")
@Override
public synchronized CompletionStage consumeAsync() {
+ return consumeAsync(null);
+ }
+
+ private synchronized CompletionStage consumeAsync(Observation parentObservation) {
if (apiCallInProgress) {
var message = "API calls to result cursor must be sequential.";
return CompletableFuture.failedStage(new ClientException(
@@ -146,12 +161,23 @@ public synchronized CompletionStage consumeAsync() {
CompletionStage summaryFt =
switch (state) {
case READY -> {
+ Observation consumeObservation;
+ Observation boltObservation;
+ if (parentObservation != null) {
+ consumeObservation = NoopObservation.getInstance();
+ boltObservation = parentObservation;
+ } else {
+ consumeObservation = observationProvider
+ .resultConsume(resultType)
+ .start();
+ boltObservation = consumeObservation;
+ }
apiCallInProgress = true;
summaryFuture = new CompletableFuture<>();
var future = summaryFuture;
state = State.DISCARDING;
boltConnection
- .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1), boltObservation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
CompletableFuture summaryFuture;
@@ -166,9 +192,16 @@ public synchronized CompletionStage consumeAsync() {
summaryFuture.completeExceptionally(error);
}
});
- yield future;
+ yield observeAsyncStarted(consumeObservation, () -> future);
}
case STREAMING -> {
+ if (parentObservation != null) {
+ pendingObservation = parentObservation;
+ observeBoltOnly = true;
+ } else {
+ pendingObservation = observationProvider.resultConsume(resultType);
+ observeBoltOnly = false;
+ }
apiCallInProgress = true;
summaryFuture = new CompletableFuture<>();
yield summaryFuture;
@@ -224,13 +257,15 @@ var record = records.poll();
// buffer is empty
return switch (state) {
case READY -> {
+ var nextObservation =
+ observationProvider.resultNext(resultType).start();
apiCallInProgress = true;
recordFuture = new CompletableFuture<>();
var result = recordFuture;
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize), nextObservation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
CompletableFuture recordFuture;
@@ -245,9 +280,11 @@ var record = records.poll();
recordFuture.completeExceptionally(error);
}
});
- yield result;
+ yield observeAsyncStarted(nextObservation, () -> result);
}
case STREAMING -> {
+ pendingObservation = observationProvider.resultNext(resultType);
+ observeBoltOnly = false;
apiCallInProgress = true;
recordFuture = new CompletableFuture<>();
yield recordFuture;
@@ -288,13 +325,15 @@ var record = records.peek();
// buffer is empty
return switch (state) {
case READY -> {
+ var peekObservation =
+ observationProvider.resultPeek(resultType).start();
apiCallInProgress = true;
peekFuture = new CompletableFuture<>();
var future = peekFuture;
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize), peekObservation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
if (error != null) {
@@ -309,9 +348,11 @@ var record = records.peek();
recordFuture.completeExceptionally(error);
}
});
- yield future;
+ yield observeAsyncStarted(peekObservation, () -> future);
}
case STREAMING -> {
+ pendingObservation = observationProvider.resultPeek(resultType);
+ observeBoltOnly = false;
apiCallInProgress = true;
peekFuture = new CompletableFuture<>();
yield peekFuture;
@@ -355,6 +396,8 @@ public synchronized CompletionStage singleAsync() {
return switch (state) {
case READY -> {
if (records.isEmpty()) {
+ var singleObservation =
+ observationProvider.resultSingle(resultType).start();
apiCallInProgress = true;
recordFuture = new CompletableFuture<>();
secondRecordFuture = new CompletableFuture<>();
@@ -374,7 +417,7 @@ public synchronized CompletionStage singleAsync() {
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize), singleObservation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
if (error != null) {
@@ -393,7 +436,7 @@ public synchronized CompletionStage singleAsync() {
secondRecordFuture.completeExceptionally(error);
}
});
- yield singleFuture;
+ yield observeAsyncStarted(singleObservation, () -> singleFuture);
} else {
// records is not empty and the state is READY, meaning the result is not exhausted
yield CompletableFuture.failedStage(
@@ -402,6 +445,8 @@ public synchronized CompletionStage singleAsync() {
}
}
case STREAMING -> {
+ pendingObservation = observationProvider.resultSingle(resultType);
+ observeBoltOnly = false;
apiCallInProgress = true;
if (records.isEmpty()) {
recordFuture = new CompletableFuture<>();
@@ -496,13 +541,14 @@ public synchronized CompletionStage> listAsync() {
}
return switch (state) {
case READY -> {
+ var listObservation = observationProvider.resultList(resultType).start();
apiCallInProgress = true;
recordsFuture = new CompletableFuture<>();
var future = recordsFuture;
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), -1), listObservation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
CompletableFuture> recordsFuture;
@@ -517,9 +563,11 @@ public synchronized CompletionStage> listAsync() {
recordsFuture.completeExceptionally(error);
}
});
- yield future;
+ yield observeAsyncStarted(listObservation, () -> future);
}
case STREAMING -> {
+ pendingObservation = observationProvider.resultList(resultType);
+ observeBoltOnly = false;
apiCallInProgress = true;
recordsFuture = new CompletableFuture<>();
yield recordsFuture;
@@ -601,6 +649,7 @@ var record = new InternalRecord(runSummary.keys(), fields);
peekFuture = this.peekFuture;
this.peekFuture = null;
if (peekFuture != null) {
+ clearPendingObservation();
apiCallInProgress = false;
records.add(record);
} else {
@@ -610,11 +659,13 @@ var record = new InternalRecord(runSummary.keys(), fields);
secondRecordFuture = this.secondRecordFuture;
if (recordFuture == null) {
if (secondRecordFuture != null) {
+ clearPendingObservation();
apiCallInProgress = false;
this.secondRecordFuture = null;
}
records.add(record);
} else {
+ clearPendingObservation();
if (secondRecordFuture == null) {
apiCallInProgress = false;
}
@@ -760,10 +811,11 @@ public void onPullSummary(PullSummary summary) {
synchronized (this) {
if (this.peekFuture != null) {
// peek is pending, keep streaming
+ var observation = getAndClearPendingObservation(this.peekFuture);
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize), observation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
if (error != null) {
@@ -780,10 +832,11 @@ public void onPullSummary(PullSummary summary) {
});
} else if (this.recordFuture != null) {
// next is pending, keep streaming
+ var observation = getAndClearPendingObservation(this.recordFuture);
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), fetchSize), observation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
if (error != null) {
@@ -809,10 +862,11 @@ public void onPullSummary(PullSummary summary) {
} else {
if (this.recordsFuture != null) {
// list is pending, stream all
+ var observation = getAndClearPendingObservation(this.recordsFuture);
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), -1), observation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
if (error != null) {
@@ -829,9 +883,10 @@ public void onPullSummary(PullSummary summary) {
});
} else if (this.summaryFuture != null) {
// consume is pending, discard all
+ var observation = getAndClearPendingObservation(this.summaryFuture);
state = State.DISCARDING;
boltConnection
- .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1), observation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
CompletableFuture summaryFuture;
@@ -1172,13 +1227,13 @@ public void onComplete() {
}
@Override
- public synchronized CompletionStage discardAllFailureAsync() {
- return consumeAsync().handle((summary, error) -> error);
+ public synchronized CompletionStage discardAllFailureAsync(Observation parentObservation) {
+ return consumeAsync(parentObservation).handle((summary, error) -> error);
}
@SuppressWarnings("DuplicatedCode")
@Override
- public CompletionStage pullAllFailureAsync() {
+ public CompletionStage pullAllFailureAsync(Observation parentObservation) {
synchronized (this) {
if (apiCallInProgress) {
var message = "API calls to result cursor must be sequential.";
@@ -1197,7 +1252,7 @@ public CompletionStage pullAllFailureAsync() {
state = State.STREAMING;
updateRecordState(RecordState.NO_RECORD);
boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), -1), parentObservation)
.whenComplete((ignored, throwable) -> {
var error = Futures.completionExceptionCause(throwable);
CompletableFuture summaryFuture;
@@ -1247,4 +1302,28 @@ private CompletionStage stageExposingError(T value) {
}
return CompletableFuture.completedStage(value);
}
+
+ private synchronized void clearPendingObservation() {
+ pendingObservation = null;
+ }
+
+ private synchronized Observation getAndClearPendingObservation(CompletionStage> observable) {
+ if (pendingObservation == null) {
+ return NoopObservation.getInstance();
+ } else {
+ var observation = pendingObservation;
+ pendingObservation = null;
+ if (!observeBoltOnly) {
+ observation.start();
+ observable.whenComplete((ignored, throwable) -> {
+ if (throwable != null) {
+ observation.error(Futures.completionExceptionCause(throwable));
+ }
+ observation.stop();
+ });
+ }
+ observeBoltOnly = false;
+ return observation;
+ }
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java
index cc4ce9e7de..1d5010f2c9 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursor.java
@@ -21,15 +21,16 @@
import java.util.function.BiConsumer;
import org.neo4j.driver.Record;
import org.neo4j.driver.internal.FailableCursor;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Subscription;
public interface RxResultCursor extends Subscription, FailableCursor {
List keys();
- void installRecordConsumer(BiConsumer recordConsumer);
+ void installRecordConsumer(BiConsumer recordConsumer, Observation observation);
- CompletionStage summaryAsync();
+ CompletionStage summaryAsync(Observation observation);
boolean isDone();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java
index fe4824e0f2..d5c934dd7a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/cursor/RxResultCursorImpl.java
@@ -47,6 +47,8 @@
import org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler;
import org.neo4j.driver.internal.adaptedbolt.summary.DiscardSummary;
import org.neo4j.driver.internal.adaptedbolt.summary.PullSummary;
+import org.neo4j.driver.internal.observation.NoopObservation;
+import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.summary.GqlStatusObject;
@@ -108,6 +110,7 @@ public Optional databaseName() {
private PullSummary pullSummary;
private DiscardSummary discardSummary;
private Throwable error;
+ private Observation recordsObservation;
private enum State {
READY,
@@ -159,7 +162,7 @@ public boolean isDone() {
}
@Override
- public void installRecordConsumer(BiConsumer recordConsumer) {
+ public void installRecordConsumer(BiConsumer recordConsumer, Observation observation) {
Objects.requireNonNull(recordConsumer);
if (summaryExposed) {
throw newResultConsumedError();
@@ -168,6 +171,7 @@ public void installRecordConsumer(BiConsumer recordConsumer)
synchronized (this) {
if (this.recordConsumer == null) {
this.recordConsumer = safeRecordConsumer(recordConsumer);
+ this.recordsObservation = Objects.requireNonNull(observation);
log.trace("[%d] Record consumer installed", hashCode());
if (runError != null) {
handleError(runError);
@@ -192,7 +196,7 @@ public void request(long n) {
var request = appendDemand(n);
state = State.STREAMING;
runnable = () -> boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), request))
+ .writeAndFlush(this, Messages.pull(runSummary.queryId(), request), recordsObservation)
.whenComplete((ignored, throwable) -> {
throwable = Futures.completionExceptionCause(throwable);
if (throwable != null) {
@@ -217,7 +221,7 @@ public void cancel() {
synchronized (this) {
log.trace("[%d] Cancellation requested in %s state", hashCode(), state);
switch (state) {
- case READY -> runnable = setupDiscardRunnable();
+ case READY -> runnable = setupDiscardRunnable(recordsObservation);
case STREAMING -> discardPending = true;
case DISCARDING, FAILED, SUCCEEDED -> {}
}
@@ -226,7 +230,7 @@ public void cancel() {
}
@Override
- public CompletionStage summaryAsync() {
+ public CompletionStage summaryAsync(Observation observation) {
var runnable = NOOP_RUNNABLE;
synchronized (this) {
log.trace("[%d] Summary requested in %s state", hashCode(), state);
@@ -241,7 +245,7 @@ public CompletionStage summaryAsync() {
handleError(runError);
runnable = this::onComplete;
} else {
- runnable = setupDiscardRunnable();
+ runnable = setupDiscardRunnable(observation);
}
}
case STREAMING -> discardPending = true;
@@ -285,7 +289,8 @@ public void onComplete() {
}
}
},
- Messages.reset())
+ Messages.reset(),
+ NoopObservation.getInstance())
.whenComplete((ignored, throwable) -> {
throwable = Futures.completionExceptionCause(throwable);
if (throwable != null) {
@@ -354,17 +359,17 @@ public synchronized void onDiscardSummary(DiscardSummary summary) {
}
@Override
- public synchronized CompletionStage discardAllFailureAsync() {
+ public synchronized CompletionStage discardAllFailureAsync(Observation parentObservation) {
log.trace("[%d] Discard all requested", hashCode());
var summaryExposed = this.summaryExposed;
var runErrorExposed = this.runErrorExposed;
- return summaryAsync()
+ return summaryAsync(parentObservation)
.thenApply(ignored -> (Throwable) null)
.exceptionally(throwable -> runErrorExposed || summaryExposed ? null : throwable);
}
@Override
- public synchronized CompletionStage pullAllFailureAsync() {
+ public synchronized CompletionStage pullAllFailureAsync(Observation parentObservation) {
log.trace("[%d] Pull all failure requested", hashCode());
var unfinishedState =
switch (state) {
@@ -376,7 +381,7 @@ public synchronized CompletionStage pullAllFailureAsync() {
new TransactionNestingException(
"You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result."));
}
- return discardAllFailureAsync();
+ return discardAllFailureAsync(parentObservation);
}
private synchronized long appendDemand(long n) {
@@ -405,10 +410,10 @@ private synchronized void decrementDemand() {
log.trace("[%d] Decremented demand, outstanding is %d", hashCode(), outstandingDemand);
}
- private synchronized Runnable setupDiscardRunnable() {
+ private synchronized Runnable setupDiscardRunnable(Observation observation) {
state = State.DISCARDING;
return () -> boltConnection
- .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1), observation)
.whenComplete((ignored, throwable) -> {
throwable = Futures.completionExceptionCause(throwable);
if (throwable != null) {
@@ -427,7 +432,7 @@ private synchronized Runnable setupCompletionRunnableWithPullSummary() {
discardPending = false;
state = State.DISCARDING;
runnable = () -> boltConnection
- .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1))
+ .writeAndFlush(this, Messages.discard(runSummary.queryId(), -1), NoopObservation.getInstance())
.whenComplete((ignored, flushThrowable) -> {
var error = Futures.completionExceptionCause(flushThrowable);
if (error != null) {
@@ -440,7 +445,10 @@ private synchronized Runnable setupCompletionRunnableWithPullSummary() {
if (demand != 0) {
state = State.STREAMING;
runnable = () -> boltConnection
- .writeAndFlush(this, Messages.pull(runSummary.queryId(), demand > 0 ? demand : -1))
+ .writeAndFlush(
+ this,
+ Messages.pull(runSummary.queryId(), demand > 0 ? demand : -1),
+ recordsObservation)
.whenComplete((ignored, flushThrowable) -> {
var error = Futures.completionExceptionCause(flushThrowable);
if (error != null) {
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java
deleted file mode 100644
index ad9fc33fa6..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/ConnectionPoolMetricsListener.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import org.neo4j.bolt.connection.ListenerEvent;
-
-interface ConnectionPoolMetricsListener {
- /**
- * Invoked before a connection is creating.
- */
- void beforeCreating(ListenerEvent> listenerEvent);
-
- /**
- * Invoked after a connection is created successfully.
- */
- void afterCreated(ListenerEvent> listenerEvent);
-
- /**
- * Invoked after a connection is failed to create due to timeout, any kind of error.
- */
- void afterFailedToCreate();
-
- /**
- * Invoked after a connection is closed.
- */
- void afterClosed();
-
- /**
- * Invoked before acquiring or creating a connection.
- *
- * @param acquireEvent the event
- */
- void beforeAcquiringOrCreating(ListenerEvent> acquireEvent);
-
- /**
- * Invoked after a connection is being acquired or created regardless weather it is successful or not.
- */
- void afterAcquiringOrCreating();
-
- /**
- * Invoked after a connection is acquired or created successfully.
- *
- * @param acquireEvent the event
- */
- void afterAcquiredOrCreated(ListenerEvent> acquireEvent);
-
- /**
- * Invoked after it is timed out to acquire or create a connection.
- */
- void afterTimedOutToAcquireOrCreate();
-
- /**
- * After a connection is acquired from the pool.
- *
- * @param inUseEvent the event
- */
- void acquired(ListenerEvent> inUseEvent);
-
- /**
- * After a connection is released back to pool.
- *
- * @param inUseEvent the event
- */
- void released(ListenerEvent> inUseEvent);
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullMetricsListener.java
deleted file mode 100644
index 8112c0975c..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullMetricsListener.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import java.util.function.IntSupplier;
-import org.neo4j.bolt.connection.BoltServerAddress;
-import org.neo4j.bolt.connection.ListenerEvent;
-import org.neo4j.bolt.connection.MetricsListener;
-
-public enum DevNullMetricsListener implements MetricsListener {
- INSTANCE;
-
- @Override
- public void beforeCreating(String poolId, ListenerEvent> creatingEvent) {}
-
- @Override
- public void afterCreated(String poolId, ListenerEvent> creatingEvent) {}
-
- @Override
- public void afterFailedToCreate(String poolId) {}
-
- @Override
- public void afterClosed(String poolId) {}
-
- @Override
- public void beforeAcquiringOrCreating(String poolId, ListenerEvent> acquireEvent) {}
-
- @Override
- public void afterAcquiringOrCreating(String poolId) {}
-
- @Override
- public void afterAcquiredOrCreated(String poolId, ListenerEvent> acquireEvent) {}
-
- @Override
- public void afterTimedOutToAcquireOrCreate(String poolId) {}
-
- @Override
- public void afterConnectionCreated(String poolId, ListenerEvent> inUseEvent) {}
-
- @Override
- public void afterConnectionReleased(String poolId, ListenerEvent> inUseEvent) {}
-
- @Override
- public ListenerEvent> createListenerEvent() {
- return DevNullListenerEvent.INSTANCE;
- }
-
- @Override
- public void registerPoolMetrics(
- String poolId, BoltServerAddress serverAddress, IntSupplier inUseSupplier, IntSupplier idleSupplier) {}
-
- @Override
- public void removePoolMetrics(String poolId) {}
-
- @Override
- public String toString() {
- return "Driver metrics are not available if they are not enabled.";
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullMetricsProvider.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullMetricsProvider.java
deleted file mode 100644
index 903039bb12..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullMetricsProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import org.neo4j.bolt.connection.MetricsListener;
-import org.neo4j.driver.Metrics;
-import org.neo4j.driver.exceptions.ClientException;
-import org.neo4j.driver.internal.GqlStatusError;
-
-public enum DevNullMetricsProvider implements MetricsProvider {
- INSTANCE;
-
- @Override
- public Metrics metrics() {
- // To outside users, we forbid access to the metrics API
- var message =
- "Driver metrics are not enabled. You need to enable driver metrics in driver configuration in order to access them.";
- throw new ClientException(
- GqlStatusError.UNKNOWN.getStatus(),
- GqlStatusError.UNKNOWN.getStatusDescription(message),
- "N/A",
- message,
- GqlStatusError.DIAGNOSTIC_RECORD,
- null);
- }
-
- @Override
- public MetricsListener metricsListener() {
- // Internally we can still register callbacks to this empty metrics listener.
- return DevNullMetricsListener.INSTANCE;
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullPoolMetricsListener.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullPoolMetricsListener.java
deleted file mode 100644
index 945237acf0..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/DevNullPoolMetricsListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import org.neo4j.bolt.connection.ListenerEvent;
-
-enum DevNullPoolMetricsListener implements ConnectionPoolMetricsListener {
- INSTANCE;
-
- @Override
- public void beforeCreating(ListenerEvent> listenerEvent) {}
-
- @Override
- public void afterCreated(ListenerEvent> listenerEvent) {}
-
- @Override
- public void afterFailedToCreate() {}
-
- @Override
- public void afterClosed() {}
-
- @Override
- public void beforeAcquiringOrCreating(ListenerEvent> acquireEvent) {}
-
- @Override
- public void afterAcquiringOrCreating() {}
-
- @Override
- public void afterAcquiredOrCreated(ListenerEvent> acquireEvent) {}
-
- @Override
- public void afterTimedOutToAcquireOrCreate() {}
-
- @Override
- public void acquired(ListenerEvent> inUseEvent) {}
-
- @Override
- public void released(ListenerEvent> inUseEvent) {}
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java
deleted file mode 100644
index 5f55e20236..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalMetrics.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import static java.lang.String.format;
-import static java.util.Collections.unmodifiableCollection;
-
-import java.time.Clock;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.IntSupplier;
-import org.neo4j.bolt.connection.BoltServerAddress;
-import org.neo4j.bolt.connection.ListenerEvent;
-import org.neo4j.bolt.connection.MetricsListener;
-import org.neo4j.driver.ConnectionPoolMetrics;
-import org.neo4j.driver.Logger;
-import org.neo4j.driver.Logging;
-import org.neo4j.driver.Metrics;
-
-final class InternalMetrics implements Metrics, MetricsListener {
- private final Map connectionPoolMetrics;
- private final Clock clock;
-
- @SuppressWarnings("deprecation")
- private final Logger log;
-
- InternalMetrics(Clock clock, @SuppressWarnings("deprecation") Logging logging) {
- Objects.requireNonNull(clock);
- this.connectionPoolMetrics = new ConcurrentHashMap<>();
- this.clock = clock;
- this.log = logging.getLog(getClass());
- }
-
- @Override
- public void registerPoolMetrics(
- String poolId, BoltServerAddress serverAddress, IntSupplier inUseSupplier, IntSupplier idleSupplier) {
- this.connectionPoolMetrics.put(
- poolId, new InternalConnectionPoolMetrics(poolId, serverAddress, inUseSupplier, idleSupplier));
- }
-
- @Override
- public void removePoolMetrics(String id) {
- this.connectionPoolMetrics.remove(id);
- }
-
- @Override
- public void beforeCreating(String poolId, ListenerEvent> creatingEvent) {
- poolMetrics(poolId).beforeCreating(creatingEvent);
- }
-
- @Override
- public void afterCreated(String poolId, ListenerEvent> creatingEvent) {
- poolMetrics(poolId).afterCreated(creatingEvent);
- }
-
- @Override
- public void afterFailedToCreate(String poolId) {
- poolMetrics(poolId).afterFailedToCreate();
- }
-
- @Override
- public void afterClosed(String poolId) {
- poolMetrics(poolId).afterClosed();
- }
-
- @Override
- public void beforeAcquiringOrCreating(String poolId, ListenerEvent> acquireEvent) {
- poolMetrics(poolId).beforeAcquiringOrCreating(acquireEvent);
- }
-
- @Override
- public void afterAcquiringOrCreating(String poolId) {
- poolMetrics(poolId).afterAcquiringOrCreating();
- }
-
- @Override
- public void afterAcquiredOrCreated(String poolId, ListenerEvent> acquireEvent) {
- poolMetrics(poolId).afterAcquiredOrCreated(acquireEvent);
- }
-
- @Override
- public void afterConnectionCreated(String poolId, ListenerEvent> inUseEvent) {
- poolMetrics(poolId).acquired(inUseEvent);
- }
-
- @Override
- public void afterConnectionReleased(String poolId, ListenerEvent> inUseEvent) {
- poolMetrics(poolId).released(inUseEvent);
- }
-
- @Override
- public void afterTimedOutToAcquireOrCreate(String poolId) {
- poolMetrics(poolId).afterTimedOutToAcquireOrCreate();
- }
-
- @Override
- public ListenerEvent> createListenerEvent() {
- return new TimeRecorderListenerEvent(clock);
- }
-
- @Override
- public Collection connectionPoolMetrics() {
- return unmodifiableCollection(this.connectionPoolMetrics.values());
- }
-
- @Override
- public String toString() {
- return format("PoolMetrics=%s", connectionPoolMetrics);
- }
-
- private ConnectionPoolMetricsListener poolMetrics(String poolId) {
- var poolMetrics = (InternalConnectionPoolMetrics) this.connectionPoolMetrics.get(poolId);
- if (poolMetrics == null) {
- log.warn(format("Failed to find pool metrics with id `%s` in %s.", poolId, this.connectionPoolMetrics));
- return DevNullPoolMetricsListener.INSTANCE;
- }
- return poolMetrics;
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerConnectionPoolMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerConnectionPoolMetrics.java
deleted file mode 100644
index 6efa483d59..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerConnectionPoolMetrics.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import static java.lang.String.format;
-
-import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.Gauge;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.Tags;
-import io.micrometer.core.instrument.Timer;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.IntSupplier;
-import org.neo4j.bolt.connection.BoltServerAddress;
-import org.neo4j.bolt.connection.ListenerEvent;
-import org.neo4j.driver.ConnectionPoolMetrics;
-
-final class MicrometerConnectionPoolMetrics implements ConnectionPoolMetricsListener, ConnectionPoolMetrics {
- public static final String PREFIX = "neo4j.driver.connections";
- public static final String IN_USE = PREFIX + ".in.use";
- public static final String IDLE = PREFIX + ".idle";
- public static final String CREATING = PREFIX + ".creating";
- public static final String FAILED = PREFIX + ".failed";
- public static final String CLOSED = PREFIX + ".closed";
- public static final String ACQUIRING = PREFIX + ".acquiring";
- public static final String ACQUISITION_TIMEOUT = PREFIX + ".acquisition.timeout";
- public static final String ACQUISITION = PREFIX + ".acquisition";
- public static final String CREATION = PREFIX + ".creation";
- public static final String USAGE = PREFIX + ".usage";
-
- private final IntSupplier inUseSupplier;
- private final IntSupplier idleSupplier;
-
- private final String id;
-
- private final AtomicInteger creating = new AtomicInteger();
- private final Counter failedToCreate;
- private final Counter closed;
- private final AtomicInteger acquiring = new AtomicInteger();
- private final Counter timedOutToAcquire;
- private final Timer totalAcquisitionTimer;
- private final Timer totalConnectionTimer;
- private final Timer totalInUseTimer;
-
- MicrometerConnectionPoolMetrics(
- String poolId,
- BoltServerAddress address,
- IntSupplier inUseSupplier,
- IntSupplier idleSupplier,
- MeterRegistry registry) {
- this(poolId, address, inUseSupplier, idleSupplier, registry, Tags.empty());
- }
-
- MicrometerConnectionPoolMetrics(
- String poolId,
- BoltServerAddress address,
- IntSupplier inUseSupplier,
- IntSupplier idleSupplier,
- MeterRegistry registry,
- Iterable initialTags) {
- Objects.requireNonNull(poolId);
- Objects.requireNonNull(address);
- Objects.requireNonNull(inUseSupplier);
- Objects.requireNonNull(idleSupplier);
- Objects.requireNonNull(registry);
-
- this.id = poolId;
- this.inUseSupplier = inUseSupplier;
- this.idleSupplier = idleSupplier;
- Iterable tags =
- Tags.concat(initialTags, "address", String.format("%s:%d", address.connectionHost(), address.port()));
-
- Gauge.builder(IN_USE, this::inUse).tags(tags).register(registry);
- Gauge.builder(IDLE, this::idle).tags(tags).register(registry);
- Gauge.builder(CREATING, creating, AtomicInteger::get).tags(tags).register(registry);
- failedToCreate = Counter.builder(FAILED).tags(tags).register(registry);
- closed = Counter.builder(CLOSED).tags(tags).register(registry);
- Gauge.builder(ACQUIRING, acquiring, AtomicInteger::get).tags(tags).register(registry);
- timedOutToAcquire = Counter.builder(ACQUISITION_TIMEOUT).tags(tags).register(registry);
- totalAcquisitionTimer = Timer.builder(ACQUISITION).tags(tags).register(registry);
- totalConnectionTimer = Timer.builder(CREATION).tags(tags).register(registry);
- totalInUseTimer = Timer.builder(USAGE).tags(tags).register(registry);
- }
-
- @Override
- public void beforeCreating(ListenerEvent> connEvent) {
- creating.incrementAndGet();
- connEvent.start();
- }
-
- @Override
- public void afterFailedToCreate() {
- failedToCreate.increment();
- creating.decrementAndGet();
- }
-
- @Override
- public void afterCreated(ListenerEvent> connEvent) {
- creating.decrementAndGet();
- var sample = ((MicrometerTimerListenerEvent) connEvent).getSample();
- sample.stop(totalConnectionTimer);
- }
-
- @Override
- public void afterClosed() {
- closed.increment();
- }
-
- @Override
- public void beforeAcquiringOrCreating(ListenerEvent> acquireEvent) {
- acquireEvent.start();
- acquiring.incrementAndGet();
- }
-
- @Override
- public void afterAcquiringOrCreating() {
- acquiring.decrementAndGet();
- }
-
- @Override
- public void afterAcquiredOrCreated(ListenerEvent> acquireEvent) {
- var sample = ((MicrometerTimerListenerEvent) acquireEvent).getSample();
- sample.stop(totalAcquisitionTimer);
- }
-
- @Override
- public void afterTimedOutToAcquireOrCreate() {
- timedOutToAcquire.increment();
- }
-
- @Override
- public void acquired(ListenerEvent> inUseEvent) {
- inUseEvent.start();
- }
-
- @Override
- public void released(ListenerEvent> inUseEvent) {
- var sample = ((MicrometerTimerListenerEvent) inUseEvent).getSample();
- sample.stop(totalInUseTimer);
- }
-
- @Override
- public String id() {
- return this.id;
- }
-
- @Override
- public int inUse() {
- return inUseSupplier.getAsInt();
- }
-
- @Override
- public int idle() {
- return idleSupplier.getAsInt();
- }
-
- @Override
- public int creating() {
- return creating.get();
- }
-
- @Override
- public long created() {
- return totalConnectionTimer.count();
- }
-
- @Override
- public long failedToCreate() {
- return count(failedToCreate);
- }
-
- @Override
- public long closed() {
- return count(closed);
- }
-
- @Override
- public int acquiring() {
- return acquiring.get();
- }
-
- @Override
- public long acquired() {
- return totalAcquisitionTimer.count();
- }
-
- @Override
- public long timedOutToAcquire() {
- return count(timedOutToAcquire);
- }
-
- @Override
- public long totalAcquisitionTime() {
- return (long) totalAcquisitionTimer.totalTime(TimeUnit.MILLISECONDS);
- }
-
- @Override
- public long totalConnectionTime() {
- return (long) totalConnectionTimer.totalTime(TimeUnit.MILLISECONDS);
- }
-
- @Override
- public long totalInUseTime() {
- return (long) totalInUseTimer.totalTime(TimeUnit.MILLISECONDS);
- }
-
- @Override
- public long totalInUseCount() {
- return totalInUseTimer.count();
- }
-
- @Override
- public String toString() {
- return format(
- "%s=[created=%s, closed=%s, creating=%s, failedToCreate=%s, acquiring=%s, acquired=%s, "
- + "timedOutToAcquire=%s, inUse=%s, idle=%s, "
- + "totalAcquisitionTime=%s, totalConnectionTime=%s, totalInUseTime=%s, totalInUseCount=%s]",
- id(),
- created(),
- closed(),
- creating(),
- failedToCreate(),
- acquiring(),
- acquired(),
- timedOutToAcquire(),
- inUse(),
- idle(),
- totalAcquisitionTime(),
- totalConnectionTime(),
- totalInUseTime(),
- totalInUseCount());
- }
-
- private long count(Counter counter) {
- return (long) counter.count();
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerMetrics.java
deleted file mode 100644
index 17d14edd18..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerMetrics.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.IntSupplier;
-import org.neo4j.bolt.connection.BoltServerAddress;
-import org.neo4j.bolt.connection.ListenerEvent;
-import org.neo4j.bolt.connection.MetricsListener;
-import org.neo4j.driver.ConnectionPoolMetrics;
-import org.neo4j.driver.Metrics;
-
-final class MicrometerMetrics implements Metrics, MetricsListener {
- private final MeterRegistry meterRegistry;
- private final Map connectionPoolMetrics;
-
- public MicrometerMetrics(MeterRegistry meterRegistry) {
- this.meterRegistry = meterRegistry;
- this.connectionPoolMetrics = new ConcurrentHashMap<>();
- }
-
- @Override
- public Collection connectionPoolMetrics() {
- return Collections.unmodifiableCollection(this.connectionPoolMetrics.values());
- }
-
- @Override
- public void beforeCreating(String poolId, ListenerEvent> creatingEvent) {
- poolMetricsListener(poolId).beforeCreating(creatingEvent);
- }
-
- @Override
- public void afterCreated(String poolId, ListenerEvent> creatingEvent) {
- poolMetricsListener(poolId).afterCreated(creatingEvent);
- }
-
- @Override
- public void afterFailedToCreate(String poolId) {
- poolMetricsListener(poolId).afterFailedToCreate();
- }
-
- @Override
- public void afterClosed(String poolId) {
- poolMetricsListener(poolId).afterClosed();
- }
-
- @Override
- public void beforeAcquiringOrCreating(String poolId, ListenerEvent> acquireEvent) {
- poolMetricsListener(poolId).beforeAcquiringOrCreating(acquireEvent);
- }
-
- @Override
- public void afterAcquiringOrCreating(String poolId) {
- poolMetricsListener(poolId).afterAcquiringOrCreating();
- }
-
- @Override
- public void afterAcquiredOrCreated(String poolId, ListenerEvent> acquireEvent) {
- poolMetricsListener(poolId).afterAcquiredOrCreated(acquireEvent);
- }
-
- @Override
- public void afterTimedOutToAcquireOrCreate(String poolId) {
- poolMetricsListener(poolId).afterTimedOutToAcquireOrCreate();
- }
-
- @Override
- public void afterConnectionCreated(String poolId, ListenerEvent> inUseEvent) {
- poolMetricsListener(poolId).acquired(inUseEvent);
- }
-
- @Override
- public void afterConnectionReleased(String poolId, ListenerEvent> inUseEvent) {
- poolMetricsListener(poolId).released(inUseEvent);
- }
-
- @Override
- public ListenerEvent> createListenerEvent() {
- return new MicrometerTimerListenerEvent(this.meterRegistry);
- }
-
- @Override
- public void registerPoolMetrics(
- String poolId, BoltServerAddress address, IntSupplier inUseSupplier, IntSupplier idleSupplier) {
- this.connectionPoolMetrics.put(
- poolId,
- new MicrometerConnectionPoolMetrics(poolId, address, inUseSupplier, idleSupplier, this.meterRegistry));
- }
-
- // For testing purposes only
- void putPoolMetrics(String poolId, ConnectionPoolMetrics poolMetrics) {
- this.connectionPoolMetrics.put(poolId, poolMetrics);
- }
-
- @Override
- public void removePoolMetrics(String poolId) {
- this.connectionPoolMetrics.remove(poolId);
- }
-
- private ConnectionPoolMetricsListener poolMetricsListener(String poolId) {
- var poolMetrics = (ConnectionPoolMetricsListener) this.connectionPoolMetrics.get(poolId);
- if (poolMetrics == null) {
- return DevNullPoolMetricsListener.INSTANCE;
- }
- return poolMetrics;
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerMetricsProvider.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerMetricsProvider.java
deleted file mode 100644
index 5e0202407c..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerMetricsProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import org.neo4j.bolt.connection.MetricsListener;
-import org.neo4j.driver.Metrics;
-
-/**
- * An adapter to bridge between driver metrics and Micrometer {@link MeterRegistry meter registry}.
- */
-public final class MicrometerMetricsProvider implements MetricsProvider {
- private final MicrometerMetrics metrics;
-
- public static MetricsProvider forGlobalRegistry() {
- return of(io.micrometer.core.instrument.Metrics.globalRegistry);
- }
-
- public static MetricsProvider of(MeterRegistry meterRegistry) {
- return new MicrometerMetricsProvider(meterRegistry);
- }
-
- private MicrometerMetricsProvider(MeterRegistry meterRegistry) {
- this.metrics = new MicrometerMetrics(meterRegistry);
- }
-
- @Override
- public Metrics metrics() {
- return this.metrics;
- }
-
- @Override
- public MetricsListener metricsListener() {
- return this.metrics;
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerTimerListenerEvent.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerTimerListenerEvent.java
deleted file mode 100644
index 465031fb28..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/MicrometerTimerListenerEvent.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright (c) "Neo4j"
- * Neo4j Sweden AB [https://neo4j.com]
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.metrics;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Timer;
-import org.neo4j.bolt.connection.ListenerEvent;
-
-final class MicrometerTimerListenerEvent implements ListenerEvent {
- private final MeterRegistry meterRegistry;
- private Timer.Sample sample;
-
- public MicrometerTimerListenerEvent(MeterRegistry meterRegistry) {
- this.meterRegistry = meterRegistry;
- }
-
- @Override
- public void start() {
- this.sample = Timer.start(this.meterRegistry);
- }
-
- @Override
- public Timer.Sample getSample() {
- return this.sample;
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/TimeRecorderListenerEvent.java b/driver/src/main/java/org/neo4j/driver/internal/observation/BoltExchangeObservation.java
similarity index 58%
rename from driver/src/main/java/org/neo4j/driver/internal/metrics/TimeRecorderListenerEvent.java
rename to driver/src/main/java/org/neo4j/driver/internal/observation/BoltExchangeObservation.java
index cfc2a2bf30..c5292f0115 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/metrics/TimeRecorderListenerEvent.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/observation/BoltExchangeObservation.java
@@ -14,26 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.metrics;
+package org.neo4j.driver.internal.observation;
-import java.time.Clock;
-import org.neo4j.bolt.connection.ListenerEvent;
+public interface BoltExchangeObservation extends Observation {
-final class TimeRecorderListenerEvent implements ListenerEvent