diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java index 6af7c42c26dd..92405fbc06b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java @@ -82,7 +82,7 @@ public boolean equals(Object o) { } SlowLogParams that = (SlowLogParams) o; return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params) - .append("scan", scan).isEquals(); + .append(scan, that.scan).isEquals(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index eb35d886bbb0..235d82302d64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -21,7 +21,10 @@ import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hbase.thirdparty.com.google.protobuf.Message; /** @@ -32,8 +35,10 @@ public class RpcLogDetails extends NamedQueuePayload { public static final int SLOW_LOG_EVENT = 0; + private static final Logger LOG = LoggerFactory.getLogger(RpcLogDetails.class.getName()); + private final RpcCall rpcCall; - private final Message param; + private Message param; private final String clientAddress; private final long responseSize; private final long blockBytesScanned; @@ -47,7 +52,6 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) { super(SLOW_LOG_EVENT); this.rpcCall = rpcCall; - this.param = param; this.clientAddress = clientAddress; this.responseSize = responseSize; this.blockBytesScanned = blockBytesScanned; @@ -60,6 +64,16 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long // would result in corrupted attributes this.connectionAttributes = rpcCall.getConnectionAttributes(); this.requestAttributes = rpcCall.getRequestAttributes(); + + // We also need to deep copy the message because the CodedInputStream may be + // overwritten before this slow log is consumed. Such overwriting could + // cause the slow log payload to be corrupt + try { + this.param = param.newBuilderForType().mergeFrom(param.toByteArray()).build(); + } catch (InvalidProtocolBufferException e) { + LOG.error("Failed to parse protobuf for message {}", param, e); + this.param = param; + } } public RpcCall getRpcCall() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java new file mode 100644 index 000000000000..8a93f2d0ff54 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestRpcLogDetails.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestRpcLogDetails { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRpcLogDetails.class); + + private final ClientProtos.Scan scan = + ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("abc"))) + .setStopRow(ByteString.copyFrom(Bytes.toBytes("xyz"))).build(); + private final ClientProtos.Scan otherScan = + ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("def"))) + .setStopRow(ByteString.copyFrom(Bytes.toBytes("uvw"))).build(); + private final ClientProtos.ScanRequest scanRequest = ClientProtos.ScanRequest + .newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(scan).build(); + private final ClientProtos.ScanRequest otherScanRequest = ClientProtos.ScanRequest + .newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(otherScan).build(); + + @Test + public void itDeepCopiesRpcLogDetailsParams() throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(scanRequest.toByteArray().length); + CodedInputStream cis = UnsafeByteOperations.unsafeWrap(buffer).newCodedInput(); + cis.enableAliasing(true); + buffer.put(scanRequest.toByteArray()); + Message.Builder messageBuilder = ClientProtos.ScanRequest.newBuilder(); + ProtobufUtil.mergeFrom(messageBuilder, cis, buffer.capacity()); + Message message = messageBuilder.build(); + RpcLogDetails rpcLogDetails = + new RpcLogDetails(getRpcCall(message), message, null, 0L, 0L, null, true, false); + + // log's scan should be equal + ClientProtos.Scan logScan = ((ClientProtos.ScanRequest) rpcLogDetails.getParam()).getScan(); + assertEquals(logScan, scan); + + // ensure we have a different byte array for testing + assertFalse(Arrays.equals(scanRequest.toByteArray(), otherScanRequest.toByteArray())); + + // corrupt the underlying buffer + buffer.position(0); + buffer.put(otherScanRequest.toByteArray(), 0, otherScanRequest.toByteArray().length); + assertArrayEquals(otherScanRequest.toByteArray(), buffer.array()); + + // log scan should still be original scan + assertEquals(logScan, scan); + } + + @SuppressWarnings("checkstyle:methodlength") + private static RpcCall getRpcCall(Message message) { + RpcCall rpcCall = new RpcCall() { + @Override + public BlockingService getService() { + return null; + } + + @Override + public Descriptors.MethodDescriptor getMethod() { + return null; + } + + @Override + public Message getParam() { + return message; + } + + @Override + public CellScanner getCellScanner() { + return null; + } + + @Override + public long getReceiveTime() { + return 0; + } + + @Override + public long getStartTime() { + return 0; + } + + @Override + public void setStartTime(long startTime) { + } + + @Override + public int getTimeout() { + return 0; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public long getDeadline() { + return 0; + } + + @Override + public long getSize() { + return 0; + } + + @Override + public RPCProtos.RequestHeader getHeader() { + return null; + } + + @Override + public Map getConnectionAttributes() { + return Collections.emptyMap(); + } + + @Override + public Map getRequestAttributes() { + return Collections.emptyMap(); + } + + @Override + public byte[] getRequestAttribute(String key) { + return null; + } + + @Override + public int getRemotePort() { + return 0; + } + + @Override + public void setResponse(Message param, CellScanner cells, Throwable errorThrowable, + String error) { + } + + @Override + public void sendResponseIfReady() throws IOException { + } + + @Override + public void cleanup() { + } + + @Override + public String toShortString() { + return null; + } + + @Override + public long disconnectSince() { + return 0; + } + + @Override + public boolean isClientCellBlockSupported() { + return false; + } + + @Override + public Optional getRequestUser() { + return null; + } + + @Override + public InetAddress getRemoteAddress() { + return null; + } + + @Override + public HBaseProtos.VersionInfo getClientVersionInfo() { + return null; + } + + @Override + public void setCallBack(RpcCallback callback) { + } + + @Override + public boolean isRetryImmediatelySupported() { + return false; + } + + @Override + public long getResponseCellSize() { + return 0; + } + + @Override + public void incrementResponseCellSize(long cellSize) { + } + + @Override + public long getBlockBytesScanned() { + return 0; + } + + @Override + public void incrementBlockBytesScanned(long blockSize) { + } + + @Override + public long getResponseExceptionSize() { + return 0; + } + + @Override + public void incrementResponseExceptionSize(long exceptionSize) { + } + }; + return rpcCall; + } + +}