1- /**
1+ /*
22 * Licensed to the Apache Software Foundation (ASF) under one
33 * or more contributor license agreements. See the NOTICE file
44 * distributed with this work for additional information
2727import static org .apache .hadoop .hbase .client .ConnectionUtils .isRemote ;
2828import static org .apache .hadoop .hbase .client .ConnectionUtils .timelineConsistentRead ;
2929import static org .apache .hadoop .hbase .util .FutureUtils .addListener ;
30-
30+ import io .opentelemetry .api .trace .Span ;
31+ import io .opentelemetry .api .trace .StatusCode ;
32+ import io .opentelemetry .context .Scope ;
3133import java .io .IOException ;
3234import java .util .concurrent .CompletableFuture ;
3335import java .util .concurrent .TimeUnit ;
3436import java .util .concurrent .atomic .AtomicInteger ;
3537import org .apache .hadoop .hbase .HRegionLocation ;
3638import org .apache .hadoop .hbase .TableName ;
3739import org .apache .hadoop .hbase .client .metrics .ScanMetrics ;
40+ import org .apache .hadoop .hbase .client .trace .TableOperationSpanBuilder ;
3841import org .apache .hadoop .hbase .ipc .HBaseRpcController ;
42+ import org .apache .hadoop .hbase .trace .TraceUtil ;
3943import org .apache .yetus .audience .InterfaceAudience ;
40-
4144import org .apache .hbase .thirdparty .io .netty .util .Timer ;
42-
4345import org .apache .hadoop .hbase .shaded .protobuf .RequestConverter ;
4446import org .apache .hadoop .hbase .shaded .protobuf .generated .ClientProtos .ClientService ;
4547import org .apache .hadoop .hbase .shaded .protobuf .generated .ClientProtos .ClientService .Interface ;
@@ -85,6 +87,17 @@ class AsyncClientScanner {
8587
8688 private final ScanResultCache resultCache ;
8789
90+ /*
91+ * The `span` instance is accessed concurrently by several threads, but we use only basic
92+ * synchronization. The `Span` class claims to be `@ThreadSafe`, so we rely on its implementation to
93+ * correctly handle concurrent use. The instance itself is initialized in the `start` method,
94+ * so we cannot make it `final`. Because the instance is created before any consuming runnable
95+ * is submitted to a worker pool, it should be enough to mark it as `volatile`. To avoid over-
96+ * paying the price of the memory barrier, where a consumer makes multiple uses of the `span`
97+ * instance, we use a local final non-volatile reference.
98+ */
99+ private volatile Span span = null ;
100+
88101 public AsyncClientScanner (Scan scan , AdvancedScanResultConsumer consumer , TableName tableName ,
89102 AsyncConnectionImpl conn , Timer retryTimer , long pauseNs , long pauseForCQTBENs ,
90103 int maxAttempts , long scanTimeoutNs , long rpcTimeoutNs , int startLogErrorsCnt ) {
@@ -140,26 +153,37 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In
140153
141154 private CompletableFuture <OpenScannerResponse > callOpenScanner (HBaseRpcController controller ,
142155 HRegionLocation loc , ClientService .Interface stub ) {
143- boolean isRegionServerRemote = isRemote (loc .getHostname ());
144- incRPCCallsMetrics (scanMetrics , isRegionServerRemote );
145- if (openScannerTries .getAndIncrement () > 1 ) {
146- incRPCRetriesMetrics (scanMetrics , isRegionServerRemote );
147- }
148- CompletableFuture <OpenScannerResponse > future = new CompletableFuture <>();
149- try {
150- ScanRequest request = RequestConverter .buildScanRequest (loc .getRegion ().getRegionName (), scan ,
151- scan .getCaching (), false );
152- stub .scan (controller , request , resp -> {
153- if (controller .failed ()) {
154- future .completeExceptionally (controller .getFailed ());
155- return ;
156- }
157- future .complete (new OpenScannerResponse (loc , isRegionServerRemote , stub , controller , resp ));
158- });
159- } catch (IOException e ) {
160- future .completeExceptionally (e );
156+ final Span localSpan = span ;
157+ try (Scope ignored = localSpan .makeCurrent ()) {
158+ boolean isRegionServerRemote = isRemote (loc .getHostname ());
159+ incRPCCallsMetrics (scanMetrics , isRegionServerRemote );
160+ if (openScannerTries .getAndIncrement () > 1 ) {
161+ incRPCRetriesMetrics (scanMetrics , isRegionServerRemote );
162+ }
163+ CompletableFuture <OpenScannerResponse > future = new CompletableFuture <>();
164+ try {
165+ ScanRequest request = RequestConverter .buildScanRequest (
166+ loc .getRegion ().getRegionName (), scan , scan .getCaching (), false );
167+ stub .scan (controller , request , resp -> {
168+ try (Scope ignored1 = localSpan .makeCurrent ()) {
169+ if (controller .failed ()) {
170+ final IOException e = controller .getFailed ();
171+ future .completeExceptionally (e );
172+ TraceUtil .setError (localSpan , e );
173+ localSpan .end ();
174+ return ;
175+ }
176+ future .complete (new OpenScannerResponse (
177+ loc , isRegionServerRemote , stub , controller , resp ));
178+ }
179+ });
180+ } catch (IOException e ) {
181+ TraceUtil .setError (localSpan , e );
182+ localSpan .end ();
183+ future .completeExceptionally (e );
184+ }
185+ return future ;
161186 }
162- return future ;
163187 }
164188
165189 private void startScan (OpenScannerResponse resp ) {
@@ -173,25 +197,40 @@ private void startScan(OpenScannerResponse resp) {
173197 .pauseForCQTBE (pauseForCQTBENs , TimeUnit .NANOSECONDS ).maxAttempts (maxAttempts )
174198 .startLogErrorsCnt (startLogErrorsCnt ).start (resp .controller , resp .resp ),
175199 (hasMore , error ) -> {
176- if (error != null ) {
177- consumer .onError (error );
178- return ;
179- }
180- if (hasMore ) {
181- openScanner ();
182- } else {
183- consumer .onComplete ();
200+ final Span localSpan = span ;
201+ try (Scope ignored = localSpan .makeCurrent ()) {
202+ if (error != null ) {
203+ try {
204+ consumer .onError (error );
205+ return ;
206+ } finally {
207+ TraceUtil .setError (localSpan , error );
208+ localSpan .end ();
209+ }
210+ }
211+ if (hasMore ) {
212+ openScanner ();
213+ } else {
214+ try {
215+ consumer .onComplete ();
216+ } finally {
217+ localSpan .setStatus (StatusCode .OK );
218+ localSpan .end ();
219+ }
220+ }
184221 }
185222 });
186223 }
187224
188225 private CompletableFuture <OpenScannerResponse > openScanner (int replicaId ) {
189- return conn .callerFactory .<OpenScannerResponse > single ().table (tableName )
190- .row (scan .getStartRow ()).replicaId (replicaId ).locateType (getLocateType (scan ))
191- .rpcTimeout (rpcTimeoutNs , TimeUnit .NANOSECONDS )
192- .operationTimeout (scanTimeoutNs , TimeUnit .NANOSECONDS ).pause (pauseNs , TimeUnit .NANOSECONDS )
193- .pauseForCQTBE (pauseForCQTBENs , TimeUnit .NANOSECONDS ).maxAttempts (maxAttempts )
194- .startLogErrorsCnt (startLogErrorsCnt ).action (this ::callOpenScanner ).call ();
226+ try (Scope ignored = span .makeCurrent ()) {
227+ return conn .callerFactory .<OpenScannerResponse >single ().table (tableName )
228+ .row (scan .getStartRow ()).replicaId (replicaId ).locateType (getLocateType (scan ))
229+ .rpcTimeout (rpcTimeoutNs , TimeUnit .NANOSECONDS )
230+ .operationTimeout (scanTimeoutNs , TimeUnit .NANOSECONDS ).pause (pauseNs , TimeUnit .NANOSECONDS )
231+ .pauseForCQTBE (pauseForCQTBENs , TimeUnit .NANOSECONDS ).maxAttempts (maxAttempts )
232+ .startLogErrorsCnt (startLogErrorsCnt ).action (this ::callOpenScanner ).call ();
233+ }
195234 }
196235
197236 private long getPrimaryTimeoutNs () {
@@ -205,15 +244,34 @@ private void openScanner() {
205244 addListener (timelineConsistentRead (conn .getLocator (), tableName , scan , scan .getStartRow (),
206245 getLocateType (scan ), this ::openScanner , rpcTimeoutNs , getPrimaryTimeoutNs (), retryTimer ,
207246 conn .getConnectionMetrics ()), (resp , error ) -> {
208- if (error != null ) {
209- consumer .onError (error );
210- return ;
247+ final Span localSpan = span ;
248+ try (Scope ignored = localSpan .makeCurrent ()) {
249+ if (error != null ) {
250+ try {
251+ consumer .onError (error );
252+ return ;
253+ } finally {
254+ TraceUtil .setError (localSpan , error );
255+ localSpan .end ();
256+ }
257+ }
258+ startScan (resp );
211259 }
212- startScan (resp );
213260 });
214261 }
215262
216263 public void start () {
217- openScanner ();
264+ final Span localSpan = new TableOperationSpanBuilder (conn )
265+ .setTableName (tableName )
266+ .setOperation (scan )
267+ .build ();
268+ if (consumer instanceof AsyncTableResultScanner ) {
269+ AsyncTableResultScanner scanner = (AsyncTableResultScanner ) consumer ;
270+ scanner .setSpan (localSpan );
271+ }
272+ span = localSpan ;
273+ try (Scope ignored = localSpan .makeCurrent ()) {
274+ openScanner ();
275+ }
218276 }
219277}
0 commit comments