Skip to content

Commit 8dbbd9e

Browse files
rmdmattinglybbeaudreault
authored andcommitted
HBASE-28175 Deep copy RpcLogDetails' param field (#5481)
Signed-off-by: Viraj Jasani <[email protected]> Signed-off-by: Bryan Beaudreault <[email protected]>
1 parent ce309a7 commit 8dbbd9e

2 files changed

Lines changed: 265 additions & 2 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import org.apache.commons.lang3.builder.ToStringBuilder;
2121
import org.apache.hadoop.hbase.ipc.RpcCall;
2222
import org.apache.yetus.audience.InterfaceAudience;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325

26+
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
2427
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
2528

2629
/**
@@ -31,8 +34,10 @@ public class RpcLogDetails extends NamedQueuePayload {
3134

3235
public static final int SLOW_LOG_EVENT = 0;
3336

37+
private static final Logger LOG = LoggerFactory.getLogger(RpcLogDetails.class.getName());
38+
3439
private final RpcCall rpcCall;
35-
private final Message param;
40+
private Message param;
3641
private final String clientAddress;
3742
private final long responseSize;
3843
private final String className;
@@ -43,12 +48,21 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long
4348
String className, boolean isSlowLog, boolean isLargeLog) {
4449
super(SLOW_LOG_EVENT);
4550
this.rpcCall = rpcCall;
46-
this.param = param;
4751
this.clientAddress = clientAddress;
4852
this.responseSize = responseSize;
4953
this.className = className;
5054
this.isSlowLog = isSlowLog;
5155
this.isLargeLog = isLargeLog;
56+
57+
// We need to deep copy the message because the CodedInputStream may be
58+
// overwritten before this slow log is consumed. Such overwriting could
59+
// cause the slow log payload to be corrupt
60+
try {
61+
this.param = param.newBuilderForType().mergeFrom(param.toByteArray()).build();
62+
} catch (InvalidProtocolBufferException e) {
63+
LOG.error("Failed to parse protobuf for message {}", param, e);
64+
this.param = param;
65+
}
5266
}
5367

5468
public RpcCall getRpcCall() {
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.namequeues;
19+
20+
import static org.junit.Assert.assertArrayEquals;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
24+
import java.io.IOException;
25+
import java.net.InetAddress;
26+
import java.nio.ByteBuffer;
27+
import java.util.Arrays;
28+
import java.util.Optional;
29+
import org.apache.hadoop.hbase.CellScanner;
30+
import org.apache.hadoop.hbase.HBaseClassTestRule;
31+
import org.apache.hadoop.hbase.ipc.RpcCall;
32+
import org.apache.hadoop.hbase.ipc.RpcCallback;
33+
import org.apache.hadoop.hbase.security.User;
34+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
35+
import org.apache.hadoop.hbase.testclassification.SmallTests;
36+
import org.apache.hadoop.hbase.util.Bytes;
37+
import org.junit.ClassRule;
38+
import org.junit.Test;
39+
import org.junit.experimental.categories.Category;
40+
41+
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
42+
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
43+
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
44+
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
45+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
46+
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
47+
48+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
49+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
50+
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
51+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
52+
53+
@Category({ RegionServerTests.class, SmallTests.class })
54+
public class TestRpcLogDetails {
55+
56+
@ClassRule
57+
public static final HBaseClassTestRule CLASS_RULE =
58+
HBaseClassTestRule.forClass(TestRpcLogDetails.class);
59+
60+
private final ClientProtos.Scan scan =
61+
ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("abc")))
62+
.setStopRow(ByteString.copyFrom(Bytes.toBytes("xyz"))).build();
63+
private final ClientProtos.Scan otherScan =
64+
ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("def")))
65+
.setStopRow(ByteString.copyFrom(Bytes.toBytes("uvw"))).build();
66+
private final ClientProtos.ScanRequest scanRequest = ClientProtos.ScanRequest
67+
.newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(scan).build();
68+
private final ClientProtos.ScanRequest otherScanRequest = ClientProtos.ScanRequest
69+
.newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(otherScan).build();
70+
71+
@Test
72+
public void itDeepCopiesRpcLogDetailsParams() throws IOException {
73+
ByteBuffer buffer = ByteBuffer.allocate(scanRequest.toByteArray().length);
74+
CodedInputStream cis = UnsafeByteOperations.unsafeWrap(buffer).newCodedInput();
75+
cis.enableAliasing(true);
76+
buffer.put(scanRequest.toByteArray());
77+
Message.Builder messageBuilder = ClientProtos.ScanRequest.newBuilder();
78+
ProtobufUtil.mergeFrom(messageBuilder, cis, buffer.capacity());
79+
Message message = messageBuilder.build();
80+
RpcLogDetails rpcLogDetails =
81+
new RpcLogDetails(getRpcCall(message), message, null, 0L, null, true, false);
82+
83+
// log's scan should be equal
84+
ClientProtos.Scan logScan = ((ClientProtos.ScanRequest) rpcLogDetails.getParam()).getScan();
85+
assertEquals(logScan, scan);
86+
87+
// ensure we have a different byte array for testing
88+
assertFalse(Arrays.equals(scanRequest.toByteArray(), otherScanRequest.toByteArray()));
89+
90+
// corrupt the underlying buffer
91+
buffer.position(0);
92+
buffer.put(otherScanRequest.toByteArray(), 0, otherScanRequest.toByteArray().length);
93+
assertArrayEquals(otherScanRequest.toByteArray(), buffer.array());
94+
95+
// log scan should still be original scan
96+
assertEquals(logScan, scan);
97+
}
98+
99+
@SuppressWarnings("checkstyle:methodlength")
100+
private static RpcCall getRpcCall(Message message) {
101+
RpcCall rpcCall = new RpcCall() {
102+
@Override
103+
public BlockingService getService() {
104+
return null;
105+
}
106+
107+
@Override
108+
public Descriptors.MethodDescriptor getMethod() {
109+
return null;
110+
}
111+
112+
@Override
113+
public Message getParam() {
114+
return message;
115+
}
116+
117+
@Override
118+
public CellScanner getCellScanner() {
119+
return null;
120+
}
121+
122+
@Override
123+
public long getReceiveTime() {
124+
return 0;
125+
}
126+
127+
@Override
128+
public long getStartTime() {
129+
return 0;
130+
}
131+
132+
@Override
133+
public void setStartTime(long startTime) {
134+
}
135+
136+
@Override
137+
public int getTimeout() {
138+
return 0;
139+
}
140+
141+
@Override
142+
public int getPriority() {
143+
return 0;
144+
}
145+
146+
@Override
147+
public long getDeadline() {
148+
return 0;
149+
}
150+
151+
@Override
152+
public long getSize() {
153+
return 0;
154+
}
155+
156+
@Override
157+
public RPCProtos.RequestHeader getHeader() {
158+
return null;
159+
}
160+
161+
@Override
162+
public int getRemotePort() {
163+
return 0;
164+
}
165+
166+
@Override
167+
public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
168+
String error) {
169+
}
170+
171+
@Override
172+
public void sendResponseIfReady() throws IOException {
173+
}
174+
175+
@Override
176+
public void cleanup() {
177+
}
178+
179+
@Override
180+
public String toShortString() {
181+
return null;
182+
}
183+
184+
@Override
185+
public long disconnectSince() {
186+
return 0;
187+
}
188+
189+
@Override
190+
public boolean isClientCellBlockSupported() {
191+
return false;
192+
}
193+
194+
@Override
195+
public Optional<User> getRequestUser() {
196+
return null;
197+
}
198+
199+
@Override
200+
public InetAddress getRemoteAddress() {
201+
return null;
202+
}
203+
204+
@Override
205+
public HBaseProtos.VersionInfo getClientVersionInfo() {
206+
return null;
207+
}
208+
209+
@Override
210+
public void setCallBack(RpcCallback callback) {
211+
}
212+
213+
@Override
214+
public boolean isRetryImmediatelySupported() {
215+
return false;
216+
}
217+
218+
@Override
219+
public long getResponseCellSize() {
220+
return 0;
221+
}
222+
223+
@Override
224+
public void incrementResponseCellSize(long cellSize) {
225+
}
226+
227+
@Override
228+
public long getResponseBlockSize() {
229+
return 0;
230+
}
231+
232+
@Override
233+
public void incrementResponseBlockSize(long blockSize) {
234+
235+
}
236+
237+
@Override
238+
public long getResponseExceptionSize() {
239+
return 0;
240+
}
241+
242+
@Override
243+
public void incrementResponseExceptionSize(long exceptionSize) {
244+
}
245+
};
246+
return rpcCall;
247+
}
248+
249+
}

0 commit comments

Comments
 (0)