diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java index aec96897179b..f71defb329ee 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.is; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -54,6 +55,10 @@ public static Matcher containsEntry(String key, String value) { return containsEntry(AttributeKey.stringKey(key), value); } + public static Matcher containsEntry(String key, long value) { + return containsEntry(AttributeKey.longKey(key), value); + } + public static Matcher containsEntryWithStringValuesOf(String key, String... values) { return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values)); } @@ -63,6 +68,10 @@ public static Matcher containsEntryWithStringValuesOf(String key, return new IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher); } + public static Matcher isEmpty() { + return hasProperty("empty", is(true)); + } + private static final class IsAttributesContaining extends TypeSafeMatcher { private final Matcher> keyMatcher; private final Matcher valueMatcher; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java new file mode 100644 index 000000000000..c10b001c59dd --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CHECKSUM_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.COMPRESSION_ALGORITHM_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DATA_BLOCK_ENCODING_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ENCRYPTION_CIPHER_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HFILE_NAME_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.READ_TYPE_KEY; + +import io.opentelemetry.api.common.AttributesBuilder; +import java.util.Objects; +import java.util.function.Consumer; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; +import org.apache.hadoop.hbase.util.ChecksumType; +import org.apache.yetus.audience.InterfaceAudience; + +/** + *
<p>
+ * Populate fields on an {@link AttributesBuilder} based on an {@link HFileContext}. The class is + * designed such that calls to the {@link #accept(AttributesBuilder)} method are idempotent with + * regard to the instance state of this class. + *
</p>
+ *
<p>
+ * The true and truly ridiculous class name should be something more like + * {@code HFileContext_ContextAttributes_AttributesBuilder_Consumer}. + *
</p>
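+ * <p> + * For example, to populate a tracing span with attributes derived from an {@code HFileContext} -- a minimal sketch of the usage seen in {@code HFileBlock}, where {@code fileContext} and {@code span} are assumed to be in scope, and the attributes are only resolved when the span is actually recording: + * </p> + * <pre> + * if (span.isRecording()) { + *   final AttributesBuilder builder = Attributes.builder(); + *   new HFileContextAttributesBuilderConsumer(fileContext).accept(builder); + *   span.setAllAttributes(builder.build()); + * } + * </pre>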
+ */ +@InterfaceAudience.Private +public class HFileContextAttributesBuilderConsumer implements Consumer<AttributesBuilder> { + + private final HFileContext hFileContext; + + private boolean skipChecksum = false; + private ReadType readType = null; + + public HFileContextAttributesBuilderConsumer(final HFileContext hFileContext) { + this.hFileContext = Objects.requireNonNull(hFileContext); + } + + /** + * Specify that the {@link ChecksumType} should not be included in the attributes. + */ + public HFileContextAttributesBuilderConsumer setSkipChecksum(final boolean skipChecksum) { + this.skipChecksum = skipChecksum; + return this; + } + + /** + * Specify the {@link ReadType} involved in this IO operation. + */ + public HFileContextAttributesBuilderConsumer setReadType(final ReadType readType) { + // TODO: this is not a part of the HFileBlock; it is part of the context of the operation. Should track this + // detail elsewhere. + this.readType = readType; + return this; + } + + @Override + public void accept(AttributesBuilder builder) { + if (hFileContext.getHFileName() != null) { + builder.put(HFILE_NAME_KEY, hFileContext.getHFileName()); + } + if (hFileContext.getCompression() != null) { + builder.put(COMPRESSION_ALGORITHM_KEY, hFileContext.getCompression().getName()); + } + if (hFileContext.getDataBlockEncoding() != null) { + builder.put(DATA_BLOCK_ENCODING_KEY, hFileContext.getDataBlockEncoding().name()); + } + if ( + hFileContext.getEncryptionContext() != null + && hFileContext.getEncryptionContext().getCipher() != null + ) { + builder.put(ENCRYPTION_CIPHER_KEY, hFileContext.getEncryptionContext().getCipher().getName()); + } + if (!skipChecksum && hFileContext.getChecksumType() != null) { + builder.put(CHECKSUM_KEY, hFileContext.getChecksumType().getName()); + } + if (readType != null) { + builder.put(READ_TYPE_KEY, readType.name()); + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java index 1720cae2300c..0463b2ee122a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java @@ -17,6 +17,12 @@ */ package org.apache.hadoop.hbase.io.util; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; @@ -76,33 +82,53 @@ public static boolean isByteBufferReadable(FSDataInputStream is) { * @throws IOException exception to throw if any error happen */ public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { + final Span span = Span.current(); if (!isByteBufferReadable(dis)) { // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to // the destination ByteBuff.
byte[] heapBuf = new byte[length]; IOUtils.readFully(dis, heapBuf, 0, length); + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateHeapBytesRead(attributesBuilder, length); + span.addEvent("BlockIOUtils.readFully", attributesBuilder.build()); + } copyToByteBuff(heapBuf, 0, length, buf); return; } + int directBytesRead = 0, heapBytesRead = 0; ByteBuffer[] buffers = buf.nioByteBuffers(); int remain = length; int idx = 0; ByteBuffer cur = buffers[idx]; - while (remain > 0) { - while (!cur.hasRemaining()) { - if (++idx >= buffers.length) { + try { + while (remain > 0) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + int bytesRead = dis.read(cur); + if (bytesRead < 0) { throw new IOException( - "Not enough ByteBuffers to read the reminding " + remain + " " + "bytes"); + "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); + } + remain -= bytesRead; + if (cur.isDirect()) { + directBytesRead += bytesRead; + } else { + heapBytesRead += bytesRead; } - cur = buffers[idx]; } - cur.limit(cur.position() + Math.min(remain, cur.remaining())); - int bytesRead = dis.read(cur); - if (bytesRead < 0) { - throw new IOException( - "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); + } finally { + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); + span.addEvent("BlockIOUtils.readFully", attributesBuilder.build()); } - remain -= bytesRead; } } @@ -116,19 +142,30 @@ public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) th */ public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length) throws IOException { - byte[] buffer = new byte[1024]; if (length < 0) { throw new IllegalArgumentException("Length must not be negative: " + length); } + int heapBytesRead = 0; int remain = length, count; - while (remain > 0) { - count = in.read(buffer, 0, Math.min(remain, buffer.length)); - if (count < 0) { - throw new IOException( - "Premature EOF from inputStream, but still need " + remain + " bytes"); + byte[] buffer = new byte[1024]; + try { + while (remain > 0) { + count = in.read(buffer, 0, Math.min(remain, buffer.length)); + if (count < 0) { + throw new IOException( + "Premature EOF from inputStream, but still need " + remain + " bytes"); + } + out.put(buffer, 0, count); + remain -= count; + heapBytesRead += count; + } + } finally { + final Span span = Span.current(); + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateHeapBytesRead(attributesBuilder, heapBytesRead); + span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", attributesBuilder.build()); } - out.put(buffer, 0, count); - remain -= count; } } @@ -147,20 +184,31 @@ public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int len */ private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset, int necessaryLen, int extraLen) throws IOException { + int heapBytesRead = 0; int bytesRemaining = necessaryLen + extraLen; - while (bytesRemaining > 0) { - int ret = in.read(buf, bufOffset, bytesRemaining); - if (ret < 0) { - if (bytesRemaining <= extraLen) { - // We could not read the "extra data", but that 
is OK. - break; + try { + while (bytesRemaining > 0) { + int ret = in.read(buf, bufOffset, bytesRemaining); + if (ret < 0) { + if (bytesRemaining <= extraLen) { + // We could not read the "extra data", but that is OK. + break; + } + throw new IOException("Premature EOF from inputStream (read " + "returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); } - throw new IOException("Premature EOF from inputStream (read " + "returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); + bufOffset += ret; + bytesRemaining -= ret; + heapBytesRead += ret; + } + } finally { + final Span span = Span.current(); + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateHeapBytesRead(attributesBuilder, heapBytesRead); + span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build()); } - bufOffset += ret; - bytesRemaining -= ret; } return bytesRemaining <= 0; } @@ -186,27 +234,43 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec copyToByteBuff(heapBuf, 0, heapBuf.length, buf); return ret; } + int directBytesRead = 0, heapBytesRead = 0; ByteBuffer[] buffers = buf.nioByteBuffers(); int bytesRead = 0; int remain = necessaryLen + extraLen; int idx = 0; ByteBuffer cur = buffers[idx]; - while (bytesRead < necessaryLen) { - while (!cur.hasRemaining()) { - if (++idx >= buffers.length) { - throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + try { + while (bytesRead < necessaryLen) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + int ret = dis.read(cur); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + if (cur.isDirect()) { + directBytesRead += ret; + } else { + heapBytesRead += ret; } - cur = buffers[idx]; } - cur.limit(cur.position() + Math.min(remain, cur.remaining())); - int ret = dis.read(cur); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (read returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, successfully read " + bytesRead); + } finally { + final Span span = Span.current(); + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); + span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build()); } - bytesRead += ret; - remain -= ret; } return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); } @@ -264,15 +328,24 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis byte[] buf = new byte[remain]; int bytesRead = 0; int lengthMustRead = readAllBytes ? 
remain : necessaryLen; - while (bytesRead < lengthMustRead) { - int ret = dis.read(position + bytesRead, buf, bytesRead, remain); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (positional read returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, successfully read " + bytesRead); + try { + while (bytesRead < lengthMustRead) { + int ret = dis.read(position + bytesRead, buf, bytesRead, remain); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (positional read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + } + } finally { + final Span span = Span.current(); + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateHeapBytesRead(attributesBuilder, bytesRead); + span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build()); } - bytesRead += ret; - remain -= ret; } copyToByteBuff(buf, 0, bytesRead, buff); return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); @@ -280,39 +353,55 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position, int necessaryLen, int extraLen, boolean readAllBytes) throws IOException { + int directBytesRead = 0, heapBytesRead = 0; int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0; ByteBuffer[] buffers = buff.nioByteBuffers(); ByteBuffer cur = buffers[idx]; int lengthMustRead = readAllBytes ? remain : necessaryLen; - while (bytesRead < lengthMustRead) { - int ret; - while (!cur.hasRemaining()) { - if (++idx >= buffers.length) { - throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + try { + while (bytesRead < lengthMustRead) { + int ret; + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + try { + ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); + } catch (IllegalAccessException e) { + throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read " + + bytesRead + " bytes from position " + position, e); + } catch (InvocationTargetException e) { + throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" + + " when trying to read " + bytesRead + " bytes from position " + position, e); + } catch (NullPointerException e) { + throw new IOException("something is null"); + } catch (Exception e) { + throw e; + } + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (positional read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + if (cur.isDirect()) { + directBytesRead += bytesRead; + } else { + heapBytesRead += bytesRead; } - cur = buffers[idx]; - } - cur.limit(cur.position() + Math.min(remain, cur.remaining())); - try { - ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur); - } catch (IllegalAccessException e) { - throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read 
" - + bytesRead + " bytes from position " + position, e); - } catch (InvocationTargetException e) { - throw new IOException("Encountered an exception when invoking ByteBuffer positioned read" - + " when trying to read " + bytesRead + " bytes from position " + position, e); - } catch (NullPointerException e) { - throw new IOException("something is null"); - } catch (Exception e) { - throw e; } - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (positional read returned " + ret - + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen - + " extra bytes, successfully read " + bytesRead); + } finally { + final Span span = Span.current(); + if (span.isRecording()) { + final AttributesBuilder attributesBuilder = Attributes.builder(); + annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead); + span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build()); } - bytesRead += ret; - remain -= ret; } return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); @@ -340,4 +429,26 @@ private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out) } return len; } + + /** + * Conditionally annotate {@code span} with the appropriate attribute when value is non-zero. + */ + private static void annotateHeapBytesRead(AttributesBuilder attributesBuilder, + int heapBytesRead) { + annotateBytesRead(attributesBuilder, 0, heapBytesRead); + } + + /** + * Conditionally annotate {@code attributesBuilder} with appropriate attributes when values are + * non-zero. + */ + private static void annotateBytesRead(AttributesBuilder attributesBuilder, long directBytesRead, + long heapBytesRead) { + if (directBytesRead > 0) { + attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead); + } + if (heapBytesRead > 0) { + attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead); + } + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 40dfc1dce4f7..2cf350ba6404 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -19,6 +19,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.ByteBuffer; import java.util.List; import org.apache.yetus.audience.InterfaceAudience; @@ -56,6 +57,86 @@ public final class HBaseSemanticAttributes { AttributeKey.booleanKey("db.hbase.rowlock.readlock"); public static final AttributeKey WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl"); + /** + * Indicates the amount of data was read into a {@link ByteBuffer} of type + * {@link ByteBuffer#isDirect() direct}. + */ + public static final AttributeKey DIRECT_BYTES_READ_KEY = + AttributeKey.longKey("db.hbase.io.direct_bytes_read"); + /** + * Indicates the amount of data was read into a {@link ByteBuffer} not of type + * {@link ByteBuffer#isDirect() direct}. + */ + public static final AttributeKey HEAP_BYTES_READ_KEY = + AttributeKey.longKey("db.hbase.io.heap_bytes_read"); + /** + * Indicates the {@link org.apache.hadoop.hbase.io.compress.Compression.Algorithm} used to encode + * an HFile. 
+ */ + public static final AttributeKey<String> COMPRESSION_ALGORITHM_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.compression_algorithm"); + /** + * Indicates the {@link org.apache.hadoop.hbase.io.encoding.DataBlockEncoding} algorithm used to + * encode this HFile. + */ + public static final AttributeKey<String> DATA_BLOCK_ENCODING_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.data_block_encoding"); + /** + * Indicates the {@link org.apache.hadoop.hbase.io.crypto.Cipher} used to encrypt this HFile. + */ + public static final AttributeKey<String> ENCRYPTION_CIPHER_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.encryption_cipher"); + /** + * Indicates the {@link org.apache.hadoop.hbase.util.ChecksumType} used to encode this HFile. + */ + public static final AttributeKey<String> CHECKSUM_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.checksum_type"); + /** + * Indicates the name of the HFile accessed. + */ + public static final AttributeKey<String> HFILE_NAME_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.file_name"); + /** + * Indicates the type of read. + */ + public static final AttributeKey<String> READ_TYPE_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.read_type"); + /** + * Identifies an entry in the Block Cache. + */ + public static final AttributeKey<String> BLOCK_CACHE_KEY_KEY = + AttributeKey.stringKey("db.hbase.io.hfile.block_cache_key"); + /** + * Indicates the name of the column family accessed. + */ + public static final AttributeKey<String> COLUMN_FAMILY_NAME_KEY = + AttributeKey.stringKey("db.hbase.io.store.column_family_name"); + /** + * Records the {@code ScanQueryMatcher.MatchCode} produced during a scan. + */ + public static final AttributeKey<String> QUERY_MATCHER_MATCH_CODE_KEY = + AttributeKey.stringKey("db.hbase.io.store.query_matcher.match_code"); + /** + * Records the {@code DeleteTracker.DeleteResult} produced during a scan. + */ + public static final AttributeKey<String> QUERY_MATCHER_DELETE_CODE_KEY = + AttributeKey.stringKey("db.hbase.io.store.query_matcher.delete_result"); + /** + * Records the {@code Filter.ReturnCode} produced during a scan. + */ + public static final AttributeKey<String> QUERY_MATCHER_FILTER_CODE_KEY = + AttributeKey.stringKey("db.hbase.io.store.query_matcher.filter_result"); + + /** + * These values represent the different IO read strategies HBase may employ for accessing + * filesystem data. + */ + public enum ReadType { + // TODO: promote this to the FSReader#readBlockData API. Or somehow instead use Scan.ReadType. + POSITIONAL_READ, + SEEK_PLUS_READ, + } + /** * These are values used with {@link #DB_OPERATION}.
They correspond with the implementations of * {@code org.apache.hadoop.hbase.client.Operation}, as well as diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 68c80fd95129..1d2eed1cdc5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -19,6 +19,11 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -46,11 +51,14 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; +import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer; import org.apache.hadoop.hbase.io.util.BlockIOUtils; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.regionserver.ShipperListener; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -636,7 +644,14 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException HFileBlock unpacked = shallowClone(this, newBuf); boolean succ = false; - try { + final Span span = TraceUtil.createSpan("HFileBlock.unpack"); + if (span.isRecording()) { + // don't bother resolving all the metadata if the span isn't being collected. + final AttributesBuilder builder = Attributes.builder(); + new HFileContextAttributesBuilderConsumer(fileContext).accept(builder); + span.setAllAttributes(builder.build()); + } + try (Scope ignored = span.makeCurrent()) { HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); @@ -653,6 +668,7 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException if (!succ) { unpacked.release(); } + span.end(); } } @@ -1498,59 +1514,77 @@ public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean boolean updateMetrics, boolean intoHeap) throws IOException { // Get a copy of the current state of whether to validate // hbase checksums or not for this read call. This is not - // thread-safe but the one constaint is that if we decide + // thread-safe but the one constraint is that if we decide // to skip hbase checksum verification then we are // guaranteed to use hdfs checksum verification. 
boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); - - HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, - doVerificationThruHBaseChecksum, updateMetrics, intoHeap); - if (blk == null) { - HFile.LOG.warn("HBase checksum verification failed for file " + pathName + " at offset " - + offset + " filesize " + fileSize + ". Retrying read with HDFS checksums turned on..."); - - if (!doVerificationThruHBaseChecksum) { - String msg = "HBase checksum verification failed for file " + pathName + " at offset " - + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " - + doVerificationThruHBaseChecksum; - HFile.LOG.warn(msg); - throw new IOException(msg); // cannot happen case here - } - HFile.CHECKSUM_FAILURES.increment(); // update metrics - - // If we have a checksum failure, we fall back into a mode where - // the next few reads use HDFS level checksums. We aim to make the - // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid - // hbase checksum verification, but since this value is set without - // holding any locks, it can so happen that we might actually do - // a few more than precisely this number. - is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); - doVerificationThruHBaseChecksum = false; - blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, + final Span span = TraceUtil.createSpan("FSReaderImpl.readBlockData"); + if (span.isRecording()) { + // don't bother resolving all the metadata if the span isn't being collected. + final AttributesBuilder builder = Attributes.builder(); + new HFileContextAttributesBuilderConsumer(fileContext) + .setSkipChecksum(doVerificationThruHBaseChecksum) + .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ).accept(builder); + span.setAllAttributes(builder.build()); + } + try (Scope ignored = span.makeCurrent()) { + HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, doVerificationThruHBaseChecksum, updateMetrics, intoHeap); - if (blk != null) { - HFile.LOG.warn("HDFS checksum verification succeeded for file " + pathName + " at offset " - + offset + " filesize " + fileSize); + if (blk == null) { + HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}." + + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize); + + if (!doVerificationThruHBaseChecksum) { + String msg = "HBase checksum verification failed for file " + pathName + " at offset " + + offset + " filesize " + fileSize + " but this cannot happen because doVerify is " + + doVerificationThruHBaseChecksum; + HFile.LOG.warn(msg); + final IOException e = new IOException(msg); + TraceUtil.setError(span, e); + throw e; // cannot happen case here + } + HFile.CHECKSUM_FAILURES.increment(); // update metrics + + // If we have a checksum failure, we fall back into a mode where + // the next few reads use HDFS level checksums. We aim to make the + // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid + // hbase checksum verification, but since this value is set without + // holding any locks, it can so happen that we might actually do + // a few more than precisely this number. 
+ is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); + doVerificationThruHBaseChecksum = false; + blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, + doVerificationThruHBaseChecksum, updateMetrics, intoHeap); + if (blk != null) { + HFile.LOG.warn( + "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}", + pathName, offset, fileSize); + } + } + if (blk == null && !doVerificationThruHBaseChecksum) { + String msg = + "readBlockData failed, possibly due to " + "checksum verification failed for file " + + pathName + " at offset " + offset + " filesize " + fileSize; + HFile.LOG.warn(msg); + final IOException e = new IOException(msg); + TraceUtil.setError(span, e); + throw e; } - } - if (blk == null && !doVerificationThruHBaseChecksum) { - String msg = - "readBlockData failed, possibly due to " + "checksum verification failed for file " - + pathName + " at offset " + offset + " filesize " + fileSize; - HFile.LOG.warn(msg); - throw new IOException(msg); - } - // If there is a checksum mismatch earlier, then retry with - // HBase checksums switched off and use HDFS checksum verification. - // This triggers HDFS to detect and fix corrupt replicas. The - // next checksumOffCount read requests will use HDFS checksums. - // The decrementing of this.checksumOffCount is not thread-safe, - // but it is harmless because eventually checksumOffCount will be - // a negative number. - streamWrapper.checksumOk(); - return blk; + // If there is a checksum mismatch earlier, then retry with + // HBase checksums switched off and use HDFS checksum verification. + // This triggers HDFS to detect and fix corrupt replicas. The + // next checksumOffCount read requests will use HDFS checksums. + // The decrementing of this.checksumOffCount is not thread-safe, + // but it is harmless because eventually checksumOffCount will be + // a negative number. + streamWrapper.checksumOk(); + span.setStatus(StatusCode.OK); + return blk; + } finally { + span.end(); + } } /** @@ -1648,6 +1682,8 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); } + + final Span span = Span.current(); int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize); // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 // and will save us having to seek the stream backwards to reread the header we @@ -1672,8 +1708,9 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, // in a LOG every time we seek. See HBASE-17072 for more detail. 
if (headerBuf == null) { if (LOG.isTraceEnabled()) { - LOG.trace("Extra see to get block size!", new RuntimeException()); + LOG.trace("Extra seek to get block size!", new RuntimeException()); } + span.addEvent("Extra seek to get block size!"); headerBuf = HEAP.allocate(hdrSize); readAtOffset(is, headerBuf, hdrSize, false, offset, pread); headerBuf.rewind(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index c74bbb1bdb7e..552f4188cbd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -17,7 +17,12 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DATA_BLOCK_ENCODING_KEY; + +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; import java.io.DataInput; import java.io.IOException; @@ -522,85 +527,103 @@ private void _readMvccVersion(int offsetFromPos) { * block(e.g. using a faked index key) */ protected int blockSeek(Cell key, boolean seekBefore) { - int klen, vlen, tlen = 0; - int lastKeyValueSize = -1; - int offsetFromPos; - do { - offsetFromPos = 0; - // Better to ensure that we use the BB Utils here - long ll = blockBuffer.getLongAfterPosition(offsetFromPos); - klen = (int) (ll >> Integer.SIZE); - vlen = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll); - if (checkKeyLen(klen) || checkLen(vlen)) { - throw new IllegalStateException( - "Invalid klen " + klen + " or vlen " + vlen + ". Block offset: " + curBlock.getOffset() - + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() - + " (without header)." + " path=" + reader.getPath()); - } - offsetFromPos += Bytes.SIZEOF_LONG; - this.rowLen = blockBuffer.getShortAfterPosition(offsetFromPos); - blockBuffer.asSubByteBuffer(blockBuffer.position() + offsetFromPos, klen, pair); - bufBackedKeyOnlyKv.setKey(pair.getFirst(), pair.getSecond(), klen, rowLen); - int comp = - PrivateCellUtil.compareKeyIgnoresMvcc(reader.getComparator(), key, bufBackedKeyOnlyKv); - offsetFromPos += klen + vlen; - if (this.reader.getFileContext().isIncludesTags()) { - // Read short as unsigned, high byte first - tlen = ((blockBuffer.getByteAfterPosition(offsetFromPos) & 0xff) << 8) - ^ (blockBuffer.getByteAfterPosition(offsetFromPos + 1) & 0xff); - if (checkLen(tlen)) { - throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " - + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " - + blockBuffer.position() + " (without header)." + " path=" + reader.getPath()); + final Span span = TraceUtil.createSpan("HFileScannerImpl.blockSeek"); + try (Scope ignored = span.makeCurrent()) { + int klen, vlen, tlen = 0; + int lastKeyValueSize = -1; + int offsetFromPos; + do { + offsetFromPos = 0; + // Better to ensure that we use the BB Utils here + long ll = blockBuffer.getLongAfterPosition(offsetFromPos); + klen = (int) (ll >> Integer.SIZE); + vlen = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll); + if (checkKeyLen(klen) || checkLen(vlen)) { + final IllegalStateException e = new IllegalStateException( + "Invalid klen " + klen + " or vlen " + vlen + ". 
Block offset: " + + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)." + " path=" + reader.getPath()); + TraceUtil.setError(span, e); + throw e; } - // add the two bytes read for the tags. - offsetFromPos += tlen + (Bytes.SIZEOF_SHORT); - } - if (this.reader.getHFileInfo().shouldIncludeMemStoreTS()) { - // Directly read the mvcc based on current position - readMvccVersion(offsetFromPos); - } - if (comp == 0) { - if (seekBefore) { - if (lastKeyValueSize < 0) { - throw new IllegalStateException("blockSeek with seekBefore " - + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key) - + ", blockOffset=" + curBlock.getOffset() + ", onDiskSize=" - + curBlock.getOnDiskSizeWithHeader() + ", path=" + reader.getPath()); + offsetFromPos += Bytes.SIZEOF_LONG; + this.rowLen = blockBuffer.getShortAfterPosition(offsetFromPos); + blockBuffer.asSubByteBuffer(blockBuffer.position() + offsetFromPos, klen, pair); + bufBackedKeyOnlyKv.setKey(pair.getFirst(), pair.getSecond(), klen, rowLen); + int comp = + PrivateCellUtil.compareKeyIgnoresMvcc(reader.getComparator(), key, bufBackedKeyOnlyKv); + offsetFromPos += klen + vlen; + if (this.reader.getFileContext().isIncludesTags()) { + // Read short as unsigned, high byte first + tlen = ((blockBuffer.getByteAfterPosition(offsetFromPos) & 0xff) << 8) + ^ (blockBuffer.getByteAfterPosition(offsetFromPos + 1) & 0xff); + if (checkLen(tlen)) { + final IllegalStateException e = + new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " + + curBlock.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + + blockBuffer.position() + " (without header)." + " path=" + reader.getPath()); + TraceUtil.setError(span, e); + throw e; } - blockBuffer.moveBack(lastKeyValueSize); - readKeyValueLen(); - return 1; // non exact match. + // add the two bytes read for the tags. + offsetFromPos += tlen + (Bytes.SIZEOF_SHORT); } - currKeyLen = klen; - currValueLen = vlen; - currTagsLen = tlen; - return 0; // indicate exact match - } else if (comp < 0) { - if (lastKeyValueSize > 0) { - blockBuffer.moveBack(lastKeyValueSize); + if (this.reader.getHFileInfo().shouldIncludeMemStoreTS()) { + // Directly read the mvcc based on current position + readMvccVersion(offsetFromPos); } - readKeyValueLen(); - if (lastKeyValueSize == -1 && blockBuffer.position() == 0) { - return HConstants.INDEX_KEY_MAGIC; + if (comp == 0) { + if (seekBefore) { + if (lastKeyValueSize < 0) { + final IllegalStateException e = + new IllegalStateException("blockSeek with seekBefore " + + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key) + + ", blockOffset=" + curBlock.getOffset() + ", onDiskSize=" + + curBlock.getOnDiskSizeWithHeader() + ", path=" + reader.getPath()); + TraceUtil.setError(span, e); + throw e; + } + blockBuffer.moveBack(lastKeyValueSize); + readKeyValueLen(); + span.setStatus(StatusCode.OK); + return 1; // non exact match. + } + currKeyLen = klen; + currValueLen = vlen; + currTagsLen = tlen; + span.setStatus(StatusCode.OK); + return 0; // indicate exact match + } else if (comp < 0) { + if (lastKeyValueSize > 0) { + blockBuffer.moveBack(lastKeyValueSize); + } + readKeyValueLen(); + if (lastKeyValueSize == -1 && blockBuffer.position() == 0) { + span.setStatus(StatusCode.OK); + return HConstants.INDEX_KEY_MAGIC; + } + span.setStatus(StatusCode.OK); + return 1; } - return 1; - } - // The size of this key/value tuple, including key/value length fields. 
- lastKeyValueSize = klen + vlen + currMemstoreTSLen + KEY_VALUE_LEN_SIZE; - // include tag length also if tags included with KV - if (reader.getFileContext().isIncludesTags()) { - lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT; - } - blockBuffer.skip(lastKeyValueSize); - } while (blockBuffer.hasRemaining()); + // The size of this key/value tuple, including key/value length fields. + lastKeyValueSize = klen + vlen + currMemstoreTSLen + KEY_VALUE_LEN_SIZE; + // include tag length also if tags included with KV + if (reader.getFileContext().isIncludesTags()) { + lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT; + } + blockBuffer.skip(lastKeyValueSize); + } while (blockBuffer.hasRemaining()); - // Seek to the last key we successfully read. This will happen if this is - // the last key/value pair in the file, in which case the following call - // to next() has to return false. - blockBuffer.moveBack(lastKeyValueSize); - readKeyValueLen(); - return 1; // didn't exactly find it. + // Seek to the last key we successfully read. This will happen if this is + // the last key/value pair in the file, in which case the following call + // to next() has to return false. + blockBuffer.moveBack(lastKeyValueSize); + readKeyValueLen(); + span.setStatus(StatusCode.OK); + return 1; // didn't exactly find it. + } finally { + span.end(); + } } @Override @@ -1089,71 +1112,80 @@ public void setConf(Configuration conf) { private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { - // Check cache for block. If found return. - BlockCache cache = cacheConf.getBlockCache().orElse(null); - if (cache != null) { - HFileBlock cachedBlock = - (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics); - if (cachedBlock != null) { - if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { - HFileBlock compressedBlock = cachedBlock; - cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader); - // In case of compressed block after unpacking we can release the compressed block - if (compressedBlock != cachedBlock) { - compressedBlock.release(); + final Span span = TraceUtil.createSpan("HFileReaderImpl.getCachedBlock"); + try (Scope ignored = span.makeCurrent()) { + // Check cache for block. If found return. 
+ BlockCache cache = cacheConf.getBlockCache().orElse(null); + if (cache != null) { + HFileBlock cachedBlock = + (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics); + if (cachedBlock != null) { + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { + HFileBlock compressedBlock = cachedBlock; + cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader); + // In case of compressed block after unpacking we can release the compressed block + if (compressedBlock != cachedBlock) { + compressedBlock.release(); + } + } + try { + validateBlockType(cachedBlock, expectedBlockType); + } catch (IOException e) { + returnAndEvictBlock(cache, cacheKey, cachedBlock); + TraceUtil.setError(span, e); + throw e; } - } - try { - validateBlockType(cachedBlock, expectedBlockType); - } catch (IOException e) { - returnAndEvictBlock(cache, cacheKey, cachedBlock); - throw e; - } - if (expectedDataBlockEncoding == null) { - return cachedBlock; - } - DataBlockEncoding actualDataBlockEncoding = cachedBlock.getDataBlockEncoding(); - // Block types other than data blocks always have - // DataBlockEncoding.NONE. To avoid false negative cache misses, only - // perform this check if cached block is a data block. - if ( - cachedBlock.getBlockType().isData() - && !actualDataBlockEncoding.equals(expectedDataBlockEncoding) - ) { - // This mismatch may happen if a Scanner, which is used for say a - // compaction, tries to read an encoded block from the block cache. - // The reverse might happen when an EncodedScanner tries to read - // un-encoded blocks which were cached earlier. - // - // Because returning a data block with an implicit BlockType mismatch - // will cause the requesting scanner to throw a disk read should be - // forced here. This will potentially cause a significant number of - // cache misses, so update so we should keep track of this as it might - // justify the work on a CompoundScanner. + if (expectedDataBlockEncoding == null) { + span.setStatus(StatusCode.OK); + return cachedBlock; + } + DataBlockEncoding actualDataBlockEncoding = cachedBlock.getDataBlockEncoding(); + // Block types other than data blocks always have + // DataBlockEncoding.NONE. To avoid false negative cache misses, only + // perform this check if cached block is a data block. if ( - !expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) - && !actualDataBlockEncoding.equals(DataBlockEncoding.NONE) + cachedBlock.getBlockType().isData() + && !actualDataBlockEncoding.equals(expectedDataBlockEncoding) ) { - // If the block is encoded but the encoding does not match the - // expected encoding it is likely the encoding was changed but the - // block was not yet evicted. Evictions on file close happen async - // so blocks with the old encoding still linger in cache for some - // period of time. This event should be rare as it only happens on - // schema definition change. - LOG.info( - "Evicting cached block with key {} because data block encoding mismatch; " - + "expected {}, actual {}, path={}", - cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding, path); - // This is an error scenario. so here we need to release the block. - returnAndEvictBlock(cache, cacheKey, cachedBlock); + // This mismatch may happen if a Scanner, which is used for say a + // compaction, tries to read an encoded block from the block cache. + // The reverse might happen when an EncodedScanner tries to read + // un-encoded blocks which were cached earlier. 
+ // + // Because returning a data block with an implicit BlockType mismatch + // will cause the requesting scanner to throw a disk read should be + // forced here. This will potentially cause a significant number of + // cache misses, so update so we should keep track of this as it might + // justify the work on a CompoundScanner. + if ( + !expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) + && !actualDataBlockEncoding.equals(DataBlockEncoding.NONE) + ) { + // If the block is encoded but the encoding does not match the + // expected encoding it is likely the encoding was changed but the + // block was not yet evicted. Evictions on file close happen async + // so blocks with the old encoding still linger in cache for some + // period of time. This event should be rare as it only happens on + // schema definition change. + LOG.info( + "Evicting cached block with key {} because data block encoding mismatch; " + + "expected {}, actual {}, path={}", + cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding, path); + // This is an error scenario. so here we need to release the block. + returnAndEvictBlock(cache, cacheKey, cachedBlock); + } + span.setStatus(StatusCode.OK); + return null; } - return null; + span.setStatus(StatusCode.OK); + return cachedBlock; } - return cachedBlock; } + return null; + } finally { + span.end(); } - return null; } private void returnAndEvictBlock(BlockCache cache, BlockCacheKey cacheKey, Cacheable block) { @@ -1256,11 +1288,12 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType); + Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString()); boolean useLock = false; IdLock.Entry lockEntry = null; - Span span = TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan(); - try (Scope traceScope = span.makeCurrent()) { + final Span span = Span.current(); + try { while (true) { // Check cache for block. If found return. if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) { @@ -1273,9 +1306,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { if (LOG.isTraceEnabled()) { - LOG.trace("From Cache " + cachedBlock); + LOG.trace("From Cache {}", cachedBlock); } - span.addEvent("blockCacheHit"); + span.addEvent("block cache hit", attributes); assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { if (updateCacheMetrics) { @@ -1305,7 +1338,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo // Carry on, please load. } - span.addEvent("blockCacheMiss"); + span.addEvent("block cache miss", attributes); // Load block from filesystem. 
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction, shouldUseHeap(expectedBlockType)); @@ -1335,7 +1368,6 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo if (lockEntry != null) { offsetLock.releaseLockEntry(lockEntry); } - span.end(); } } @@ -1478,17 +1510,21 @@ protected boolean processFirstDataBlock() throws IOException { @Override public boolean next() throws IOException { - boolean isValid = seeker.next(); - if (!isValid) { - HFileBlock newBlock = readNextDataBlock(); - isValid = newBlock != null; - if (isValid) { - updateCurrentBlock(newBlock); - } else { - setNonSeekedState(); + return TraceUtil.trace(() -> { + final Span span = Span.current(); + span.setAttribute(DATA_BLOCK_ENCODING_KEY, getEffectiveDataBlockEncoding().name()); + boolean isValid = seeker.next(); + if (!isValid) { + HFileBlock newBlock = readNextDataBlock(); + isValid = newBlock != null; + if (isValid) { + updateCurrentBlock(newBlock); + } else { + setNonSeekedState(); + } } - } - return isValid; + return isValid; + }, "EncodedScanner.next"); } @Override @@ -1540,6 +1576,8 @@ protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, updateCurrentBlock(seekToBlock); } else if (rewind) { seeker.rewind(); + final Span span = Span.current(); + span.addEvent("EncodedSeeker.rewind"); } this.nextIndexedKey = nextIndexedKey; return seeker.seekToKeyInBlock(key, seekBefore); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index 8561fc1c8939..8bf63909fcb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -88,23 +89,22 @@ public static void request(Path path, Runnable runnable) { delay = 0; } try { - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " ms"); - } - prefetchFutures.put(path, - prefetchExecutorPool.schedule(runnable, delay, TimeUnit.MILLISECONDS)); + LOG.debug("Prefetch requested for {}, delay={} ms", path, delay); + final Runnable tracedRunnable = + TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request"); + final Future future = + prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS); + prefetchFutures.put(path, future); } catch (RejectedExecutionException e) { prefetchFutures.remove(path); - LOG.warn("Prefetch request rejected for " + path); + LOG.warn("Prefetch request rejected for {}", path); } } } public static void complete(Path path) { prefetchFutures.remove(path); - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch completed for " + path); - } + LOG.debug("Prefetch completed for {}", path); } public static void cancel(Path path) { @@ -113,9 +113,7 @@ public static void cancel(Path path) { // ok to race with other cancellation attempts future.cancel(true); prefetchFutures.remove(path); - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch cancelled for " + path); - } + LOG.debug("Prefetch cancelled for {}", path); } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 2bbf5bfb90f9..d09c5717db48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import io.opentelemetry.api.trace.Span;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -249,6 +250,8 @@ void clearProgress() {
    * state that was passed in.
    */
   NextState setScannerState(NextState state) {
+    final Span span = Span.current();
+    span.addEvent("ScannerContext.setScannerState");
     if (!NextState.isValidState(state)) {
       throw new IllegalArgumentException("Cannot set to invalid state: " + state);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 7781e5e3c52a..25100954d2c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -93,12 +94,14 @@ public Cell peek() { // sanity check, the current should be always valid
    */
   @Override
   public Cell next() throws IOException {
-    if (closed) {
-      return null;
-    }
-    Cell oldCurrent = current;
-    updateCurrent(); // update the currently observed Cell
-    return oldCurrent;
+    return TraceUtil.trace(() -> {
+      if (closed) {
+        return null;
+      }
+      Cell oldCurrent = current;
+      updateCurrent(); // update the currently observed Cell
+      return oldCurrent;
+    }, "SegmentScanner.next");
   }
 
   /**
@@ -108,19 +111,21 @@ public Cell next() throws IOException {
    */
   @Override
   public boolean seek(Cell cell) throws IOException {
-    if (closed) {
-      return false;
-    }
-    if (cell == null) {
-      close();
-      return false;
-    }
-    // restart the iterator from new key
-    iter = getIterator(cell);
-    // last is going to be reinitialized in the next getNext() call
-    last = null;
-    updateCurrent();
-    return (current != null);
+    return TraceUtil.trace(() -> {
+      if (closed) {
+        return false;
+      }
+      if (cell == null) {
+        close();
+        return false;
+      }
+      // restart the iterator from new key
+      iter = getIterator(cell);
+      // last is going to be reinitialized in the next getNext() call
+      last = null;
+      updateCurrent();
+      return (current != null);
+    }, "SegmentScanner.seek");
   }
 
   protected Iterator<Cell> getIterator(Cell cell) {
@@ -136,19 +141,21 @@ protected Iterator<Cell> getIterator(Cell cell) {
    */
   @Override
   public boolean reseek(Cell cell) throws IOException {
-    if (closed) {
-      return false;
-    }
-    /*
-     * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. This code
-     * is executed concurrently with flush and puts, without locks. The ideal implementation for
-     * performance would use the sub skip list implicitly pointed by the iterator. Unfortunately the
-     * Java API does not offer a method to get it. So we remember the last keys we iterated to and
-     * restore the reseeked set to at least that point.
-     */
-    iter = getIterator(getHighest(cell, last));
-    updateCurrent();
-    return (current != null);
+    return TraceUtil.trace(() -> {
+      if (closed) {
+        return false;
+      }
+      /*
+       * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. This
+       * code is executed concurrently with flush and puts, without locks. The ideal implementation
+       * for performance would use the sub skip list implicitly pointed by the iterator.
+       * Unfortunately the Java API does not offer a method to get it. So we remember the last keys
+       * we iterated to and restore the reseeked set to at least that point.
+       */
+      iter = getIterator(getHighest(cell, last));
+      updateCurrent();
+      return (current != null);
+    }, "SegmentScanner.reseek");
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index ce2a3d6f249a..0a3125401793 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,7 +40,9 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -186,77 +193,113 @@ public Cell peek() {
 
   @Override
   public Cell next() throws IOException {
-    Cell retKey = cur;
+    return TraceUtil.trace(() -> {
+      final Span span = Span.current();
+      if (span.isRecording()) {
+        final AttributesBuilder builder = Attributes.builder();
+        new HFileContextAttributesBuilderConsumer(hfs.getReader().getFileContext()).accept(builder);
+        span.setAllAttributes(builder.build());
+      }
+      Cell retKey = cur;
 
-    try {
-      // only seek if we aren't at the end. cur == null implies 'end'.
-      if (cur != null) {
-        hfs.next();
-        setCurrentCell(hfs.getCell());
-        if (hasMVCCInfo || this.reader.isBulkLoaded()) {
-          skipKVsNewerThanReadpoint();
+      try {
+        // only seek if we aren't at the end. cur == null implies 'end'.
+        if (cur != null) {
+          hfs.next();
+          setCurrentCell(hfs.getCell());
+          if (hasMVCCInfo || this.reader.isBulkLoaded()) {
+            skipKVsNewerThanReadpoint();
+          }
         }
+      } catch (FileNotFoundException e) {
+        throw e;
+      } catch (IOException e) {
+        throw new IOException("Could not iterate " + this, e);
       }
-    } catch (FileNotFoundException e) {
-      throw e;
-    } catch (IOException e) {
-      throw new IOException("Could not iterate " + this, e);
-    }
-    return retKey;
+      return retKey;
+    }, "StoreFileScanner.next");
   }
 
   @Override
   public boolean seek(Cell key) throws IOException {
-    if (seekCount != null) seekCount.increment();
+    final Span span = TraceUtil.createSpan("StoreFileScanner.seek");
+    if (span.isRecording()) {
+      final AttributesBuilder builder = Attributes.builder();
+      new HFileContextAttributesBuilderConsumer(hfs.getReader().getFileContext()).accept(builder);
+      span.setAllAttributes(builder.build());
+    }
+
+    try (Scope ignored = span.makeCurrent()) {
+      if (seekCount != null) seekCount.increment();
 
-    try {
       try {
         if (!seekAtOrAfter(hfs, key)) {
           this.cur = null;
+          span.setStatus(StatusCode.OK);
           return false;
         }
         setCurrentCell(hfs.getCell());
         if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
+          span.setStatus(StatusCode.OK);
          return skipKVsNewerThanReadpoint();
        } else {
+          span.setStatus(StatusCode.OK);
          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
        }
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
+      TraceUtil.setError(span, e);
      throw e;
    } catch (IOException ioe) {
-      throw new IOException("Could not seek " + this + " to key " + key, ioe);
+      final IOException e = new IOException("Could not seek " + this + " to key " + key, ioe);
+      TraceUtil.setError(span, e);
+      throw e;
+    } finally {
+      span.end();
    }
  }
 
   @Override
   public boolean reseek(Cell key) throws IOException {
-    if (seekCount != null) seekCount.increment();
-
-    try {
+    final Span span = TraceUtil.createSpan("StoreFileScanner.reseek");
+    if (span.isRecording()) {
+      final AttributesBuilder builder = Attributes.builder();
+      new HFileContextAttributesBuilderConsumer(hfs.getReader().getFileContext()).accept(builder);
+      span.setAllAttributes(builder.build());
+    }
+    try (Scope ignored = span.makeCurrent()) {
+      if (seekCount != null) seekCount.increment();
      try {
        if (!reseekAtOrAfter(hfs, key)) {
          this.cur = null;
+          span.setStatus(StatusCode.OK);
          return false;
        }
        setCurrentCell(hfs.getCell());
        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
+          span.setStatus(StatusCode.OK);
          return skipKVsNewerThanReadpoint();
        } else {
+          span.setStatus(StatusCode.OK);
          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
        }
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
+      TraceUtil.setError(span, e);
      throw e;
    } catch (IOException ioe) {
-      throw new IOException("Could not reseek " + this + " to key " + key, ioe);
+      final IOException e = new IOException("Could not reseek " + this + " to key " + key, ioe);
+      TraceUtil.setError(span, e);
+      throw e;
+    } finally {
+      span.end();
    }
  }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 8e6d7fd00a99..3cedf15ac9a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -17,9 +17,17 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.COLUMN_FAMILY_NAME_KEY;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.QUERY_MATCHER_MATCH_CODE_KEY;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.CountDownLatch;
@@ -523,6 +531,8 @@ private void close(boolean withDelayedScannersClose) {
 
   @Override
   public boolean seek(Cell key) throws IOException {
+    final Span span = Span.current();
+    span.addEvent("StoreScanner.seek");
     if (checkFlushed()) {
       reopenAfterFlush();
     }
@@ -614,6 +624,15 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
       scannerContext.setLastPeekedCell(cell);
       topChanged = false;
       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
+      final Span span = Span.current();
+      if (span.isRecording()) {
+        final AttributesBuilder builder = Attributes.builder();
+        builder.put(REGION_NAMES_KEY,
+          Collections.singletonList(store.getRegionInfo().getRegionNameAsString()));
+        builder.put(COLUMN_FAMILY_NAME_KEY, store.getColumnFamilyName());
+        builder.put(QUERY_MATCHER_MATCH_CODE_KEY, qcode.name());
+        span.addEvent("ScanQueryMatcher.match");
+      }
       switch (qcode) {
         case INCLUDE:
         case INCLUDE_AND_SEEK_NEXT_ROW:
@@ -1075,6 +1094,8 @@ protected boolean seekAsDirection(Cell kv) throws IOException {
 
   @Override
   public boolean reseek(Cell kv) throws IOException {
+    final Span span = Span.current();
+    span.addEvent("StoreScanner.reseek");
     if (checkFlushed()) {
       reopenAfterFlush();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
index 77f4dcee7c9c..1cbd08b56b57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.querymatcher;
 
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.QUERY_MATCHER_DELETE_CODE_KEY;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.NavigableSet;
@@ -204,6 +208,11 @@ protected final MatchCode checkDeleted(DeleteTracker deletes, Cell cell) {
     }
     // MvccSensitiveTracker always need check all cells to save some infos.
     DeleteResult deleteResult = deletes.isDeleted(cell);
+    final Span span = Span.current();
+    if (span.isRecording()) {
+      span.addEvent("ScanQueryMatcher.checkDeleted",
+        Attributes.of(QUERY_MATCHER_DELETE_CODE_KEY, deleteResult.name()));
+    }
     switch (deleteResult) {
       case FAMILY_DELETED:
       case COLUMN_DELETED:
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
index 6c3d002b0929..c3a671cdd7a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.querymatcher;
 
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.QUERY_MATCHER_FILTER_CODE_KEY;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.QUERY_MATCHER_MATCH_CODE_KEY;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.util.NavigableSet;
 import org.apache.hadoop.hbase.Cell;
@@ -122,6 +127,7 @@ public void beforeShipped() throws IOException {
 
   protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte)
     throws IOException {
+    final Span span = Span.current();
     int tsCmp = tr.compare(timestamp);
     if (tsCmp > 0) {
       return MatchCode.SKIP;
@@ -131,6 +137,10 @@ protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte)
     }
     // STEP 1: Check if the column is part of the requested columns
     MatchCode matchCode = columns.checkColumn(cell, typeByte);
+    if (span.isRecording()) {
+      span.addEvent("UserScanQueryMatcher.matchColumn column_hint",
+        Attributes.of(QUERY_MATCHER_MATCH_CODE_KEY, matchCode.name()));
+    }
     if (matchCode != MatchCode.INCLUDE) {
       return matchCode;
     }
@@ -139,6 +149,10 @@ protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte)
      * INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, or INCLUDE_AND_SEEK_NEXT_ROW.
      */
     matchCode = columns.checkVersions(cell, timestamp, typeByte, false);
+    if (span.isRecording()) {
+      span.addEvent("UserScanQueryMatcher.matchColumn version_hint",
+        Attributes.of(QUERY_MATCHER_MATCH_CODE_KEY, matchCode.name()));
+    }
     switch (matchCode) {
       case SKIP:
         return MatchCode.SKIP;
@@ -151,9 +165,15 @@ protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte)
         break;
     }
 
-    return filter == null
-      ? matchCode
-      : mergeFilterResponse(cell, matchCode, filter.filterCell(cell));
+    if (filter != null) {
+      ReturnCode filterResponse = filter.filterCell(cell);
+      if (span.isRecording()) {
+        span.addEvent("UserScanQueryMatcher.matchColumn filter_hint",
+          Attributes.of(QUERY_MATCHER_FILTER_CODE_KEY, filterResponse.name()));
+      }
+      return mergeFilterResponse(cell, matchCode, filterResponse);
+    }
+    return matchCode;
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index efc66111a9f5..cf61d574e389 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -17,6 +17,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -29,11 +38,17 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +57,9 @@
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MatcherPredicate;
+import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers;
+import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -52,6 +70,7 @@
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.ClassRule;
@@ -74,6 +93,9 @@ public class TestBlockIOUtils {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
+  @Rule
+  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
+
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static final int NUM_TEST_BLOCKS = 2;
@@ -93,20 +115,29 @@ public void testIsByteBufferReadable() throws IOException {
 
   @Test
   public void testReadFully() throws IOException {
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
-    String s = "hello world";
-    try (FSDataOutputStream out = fs.create(p)) {
-      out.writeBytes(s);
-    }
-    ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
-    try (FSDataInputStream in = fs.open(p)) {
-      BlockIOUtils.readFully(buf, in, 11);
-    }
-    buf.rewind();
-    byte[] heapBuf = new byte[s.length()];
-    buf.get(heapBuf, 0, heapBuf.length);
-    assertArrayEquals(Bytes.toBytes(s), heapBuf);
+    TraceUtil.trace(() -> {
+      FileSystem fs = TEST_UTIL.getTestFileSystem();
+      Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
+      String s = "hello world";
+      try (FSDataOutputStream out = fs.create(p)) {
+        out.writeBytes(s);
+      }
+      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
+      try (FSDataInputStream in = fs.open(p)) {
+        BlockIOUtils.readFully(buf, in, 11);
+      }
+      buf.rewind();
+      byte[] heapBuf = new byte[s.length()];
+      buf.get(heapBuf, 0, heapBuf.length);
+      assertArrayEquals(Bytes.toBytes(s), heapBuf);
+    }, testName.getMethodName());
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11))))))));
   }
 
   @Test
@@ -214,33 +245,69 @@ public void testReadWithExtra() throws IOException {
     try (FSDataOutputStream out = fs.create(p)) {
       out.writeBytes(s);
     }
-    ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
-    try (FSDataInputStream in = fs.open(p)) {
-      assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
+
+    Span span = TraceUtil.createSpan(testName.getMethodName());
+    try (Scope ignored = span.makeCurrent()) {
+      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
+      try (FSDataInputStream in = fs.open(p)) {
+        assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
+      }
+      buf.rewind();
+      byte[] heapBuf = new byte[buf.capacity()];
+      buf.get(heapBuf, 0, heapBuf.length);
+      assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
+    } finally {
+      span.end();
     }
-    buf.rewind();
-    byte[] heapBuf = new byte[buf.capacity()];
-    buf.get(heapBuf, 0, heapBuf.length);
-    assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
-
-    buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
-    try (FSDataInputStream in = fs.open(p)) {
-      assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L))))))));
+
+    otelRule.clearSpans();
+    span = TraceUtil.createSpan(testName.getMethodName());
+    try (Scope ignored = span.makeCurrent()) {
+      ByteBuff buf =
+        new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
+      try (FSDataInputStream in = fs.open(p)) {
+        assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
+      }
+      buf.rewind();
+      byte[] heapBuf = new byte[11];
+      buf.get(heapBuf, 0, heapBuf.length);
+      assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
+    } finally {
+      span.end();
     }
-    buf.rewind();
-    heapBuf = new byte[11];
-    buf.get(heapBuf, 0, heapBuf.length);
-    assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
-
-    buf.position(0).limit(12);
-    try (FSDataInputStream in = fs.open(p)) {
-      try {
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
+
+    otelRule.clearSpans();
+    span = TraceUtil.createSpan(testName.getMethodName());
+    try (Scope ignored = span.makeCurrent()) {
+      ByteBuff buf =
+        new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
+      buf.position(0).limit(12);
+      exception.expect(IOException.class);
+      try (FSDataInputStream in = fs.open(p)) {
         BlockIOUtils.readWithExtra(buf, in, 12, 0);
         fail("Should only read 11 bytes");
-      } catch (IOException e) {
-      }
+      } finally {
+        span.end();
       }
     }
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
   }
 
   @Test
@@ -255,11 +322,20 @@ public void testPositionalReadNoExtra() throws IOException {
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
     when(in.hasCapability(anyString())).thenReturn(false);
-    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+    boolean ret =
+      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
+        testName.getMethodName());
     assertFalse("Expect false return when no extra bytes requested", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
   }
 
   @Test
@@ -275,12 +351,21 @@ public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
     when(in.read(5, buf, 5, 5)).thenReturn(5);
     when(in.hasCapability(anyString())).thenReturn(false);
-    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+    boolean ret =
+      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
+        testName.getMethodName());
     assertFalse("Expect false return when no extra bytes requested", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).read(5, buf, 5, 5);
    verify(in).hasCapability(anyString());
    verifyNoMoreInteractions(in);
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
   }
 
   @Test
@@ -295,11 +380,20 @@ public void testPositionalReadExtraSucceeded() throws IOException {
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
     when(in.hasCapability(anyString())).thenReturn(false);
-    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+    boolean ret =
+      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
+        testName.getMethodName());
     assertTrue("Expect true return when reading extra bytes succeeds", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
   }
 
   @Test
@@ -314,11 +408,20 @@ public void testPositionalReadExtraFailed() throws IOException {
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
     when(in.hasCapability(anyString())).thenReturn(false);
-    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+    boolean ret =
+      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
+        testName.getMethodName());
     assertFalse("Expect false return when reading extra bytes fails", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen))))))));
   }
 
   @Test
@@ -334,12 +437,21 @@ public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes() throws
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
     when(in.read(5, buf, 5, 10)).thenReturn(10);
     when(in.hasCapability(anyString())).thenReturn(false);
-    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+    boolean ret =
+      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
+        testName.getMethodName());
     assertTrue("Expect true return when reading extra bytes succeeds", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).read(5, buf, 5, 10);
     verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+    assertThat(otelRule.getSpans(),
+      hasItems(allOf(hasName(testName.getMethodName()),
+        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
   }
 
   @Test
@@ -357,7 +469,23 @@ public void testPositionalReadPrematureEOF() throws IOException {
     when(in.hasCapability(anyString())).thenReturn(false);
     exception.expect(IOException.class);
     exception.expectMessage("EOF");
-    BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+    Span span = TraceUtil.createSpan(testName.getMethodName());
+    try (Scope ignored = span.makeCurrent()) {
+      BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+      span.setStatus(StatusCode.OK);
+    } catch (IOException e) {
+      TraceUtil.setError(span, e);
+      throw e;
+    } finally {
+      span.end();
+
+      TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
+        otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
+      assertThat(otelRule.getSpans(),
+        hasItems(allOf(hasName(testName.getMethodName()),
+          hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+            hasAttributes(AttributesMatchers.isEmpty())))))));
+    }
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 0657799a5f9d..1e4f675b2382 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -17,12 +17,23 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,21 +41,28 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MatcherPredicate;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category({ IOTests.class, MediumTests.class })
 public class TestPrefetch {
+  private static final Logger LOG = LoggerFactory.getLogger(TestPrefetch.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
@@ -61,6 +79,9 @@ public class TestPrefetch {
   private FileSystem fs;
   private BlockCache blockCache;
 
+  @Rule
+  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
+
   @Before
   public void setUp() throws IOException {
     conf = TEST_UTIL.getConfiguration();
@@ -82,8 +103,23 @@ public void testPrefetchSetInHCDWorks() {
 
   @Test
   public void testPrefetch() throws Exception {
-    Path storeFile = writeStoreFile("TestPrefetch");
-    readStoreFile(storeFile);
+    TraceUtil.trace(() -> {
+      Path storeFile = writeStoreFile("TestPrefetch");
+      readStoreFile(storeFile);
+    }, "testPrefetch");
+
+    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<>(otelRule::getSpans,
+      hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
+    final List<SpanData> spans = otelRule.getSpans();
+    if (LOG.isDebugEnabled()) {
+      StringTraceRenderer renderer = new StringTraceRenderer(spans);
+      renderer.render(LOG::debug);
+    }
+
+    final SpanData testSpan = spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
+      .orElseThrow(AssertionError::new);
+    assertThat("prefetch spans happen on their own threads, detached from file open.", spans,
+      hasItem(allOf(hasName("PrefetchExecutor.request"), not(hasParentSpanId(testSpan)))));
   }
 
   @Test