Commit 7b2c3f6

HBASE-22539 WAL corruption due to early DBBs re-use when Durability.ASYNC_WAL is used (#437)
Signed-off-by: Zheng Hu <[email protected]>
1 parent be4cfa3 commit 7b2c3f6

File tree

11 files changed (+379, -33 lines)

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 42 additions & 7 deletions

@@ -23,7 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -51,7 +51,7 @@
  * the result.
  */
 @InterfaceAudience.Private
-abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
+public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
 
   protected final int id; // the client's call id
   protected final BlockingService service;
@@ -91,8 +91,14 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
   private long exceptionSize = 0;
   private final boolean retryImmediatelySupported;
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-      justification="Can't figure why this complaint is happening... see below")
+  // This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
+  // second bit is for WAL reference. We can only call release if both of them are zero. The reason
+  // why we can not use a general reference counting is that, we may call cleanup multiple times in
+  // the current implementation. We should fix this in the future.
+  private final AtomicInteger reference = new AtomicInteger(0b01);
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+      justification = "Can't figure why this complaint is happening... see below")
   ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
       Message param, CellScanner cellScanner, T connection, long size,
       InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
@@ -141,14 +147,43 @@ public void done() {
     cleanup();
   }
 
+  private void release(int mask) {
+    for (;;) {
+      int ref = reference.get();
+      if ((ref & mask) == 0) {
+        return;
+      }
+      int nextRef = ref & (~mask);
+      if (reference.compareAndSet(ref, nextRef)) {
+        if (nextRef == 0) {
+          if (this.reqCleanup != null) {
+            this.reqCleanup.run();
+          }
+        }
+        return;
+      }
+    }
+  }
+
   @Override
   public void cleanup() {
-    if (this.reqCleanup != null) {
-      this.reqCleanup.run();
-      this.reqCleanup = null;
+    release(0b01);
+  }
+
+  public void retainByWAL() {
+    for (;;) {
+      int ref = reference.get();
+      int nextRef = ref | 0b10;
+      if (reference.compareAndSet(ref, nextRef)) {
+        return;
+      }
     }
   }
 
+  public void releaseByWAL() {
+    release(0b10);
+  }
+
   @Override
   public String toString() {
     return toShortString() + " param: " +

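The two-bit scheme described in the comment above can be easier to follow outside the diff. Below is a minimal, self-contained sketch of the same idea with invented class and method names (it is not HBase code): bit 0 belongs to the normal RPC cleanup path, bit 1 to the WAL, and the request buffers are returned only when the last bit is cleared, so calling cleanup() more than once stays harmless.

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ServerCall's two-bit ownership tracking (illustration only).
public class TwoBitRefCount {
  private static final int RPC_BIT = 0b01;
  private static final int WAL_BIT = 0b10;

  // Starts with only the RPC bit set; the WAL bit is set on demand via retainByWAL().
  private final AtomicInteger reference = new AtomicInteger(RPC_BIT);
  private final Runnable reqCleanup;

  public TwoBitRefCount(Runnable reqCleanup) {
    this.reqCleanup = reqCleanup;
  }

  private void release(int mask) {
    for (;;) {
      int ref = reference.get();
      if ((ref & mask) == 0) {
        return; // this owner already released; releasing twice is a no-op
      }
      int nextRef = ref & ~mask;
      if (reference.compareAndSet(ref, nextRef)) {
        if (nextRef == 0) {
          reqCleanup.run(); // last owner gone, safe to return the ByteBuffers to the pool
        }
        return;
      }
    }
  }

  public void retainByWAL() {
    // Equivalent to the CAS loop in the patch: set the WAL bit atomically.
    reference.accumulateAndGet(WAL_BIT, (cur, bit) -> cur | bit);
  }

  public void cleanup() {      // called by the RPC layer, possibly more than once
    release(RPC_BIT);
  }

  public void releaseByWAL() { // called once the WAL has synced (or failed) the edit
    release(WAL_BIT);
  }

  public static void main(String[] args) {
    TwoBitRefCount call = new TwoBitRefCount(() -> System.out.println("buffers released"));
    call.retainByWAL();  // WAL takes a reference before the rpc handler returns
    call.cleanup();      // rpc done, but buffers stay alive because the WAL bit is still set
    call.releaseByWAL(); // WAL synced -> prints "buffers released"
  }
}
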
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 6 additions & 2 deletions

@@ -56,6 +56,8 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -894,7 +896,7 @@ boolean isUnflushedEntries() {
    * Exposed for testing only. Use to tricks like halt the ring buffer appending.
    */
   @VisibleForTesting
-  void atHeadOfRingBufferEventHandlerAppend() {
+  protected void atHeadOfRingBufferEventHandlerAppend() {
     // Noop
   }
 
@@ -970,8 +972,10 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
       txidHolder.setValue(ringBuffer.next());
     });
     long txid = txidHolder.longValue();
+    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
+      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
     try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
-      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
       entry.stampRegionSequenceId(we);
       ringBuffer.get(txid).load(entry);
     } finally {

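Note that the ServerCall is captured in stampSequenceIdAndPublishToRingBuffer, i.e. still on the RPC handler thread: RpcServer.getCurrentCall() is served from per-handler-thread state, which the ring buffer consumer thread does not see. The generic sketch below (invented names, not HBase API) only illustrates why the capture has to happen before the entry is published to another thread.

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration: capture a thread-local "current call" before handing work to another thread.
public class ThreadLocalCaptureDemo {
  private static final ThreadLocal<String> CURRENT_CALL = new ThreadLocal<>();

  // Mirrors the Optional-returning accessor style of RpcServer.getCurrentCall().
  static Optional<String> getCurrentCall() {
    return Optional.ofNullable(CURRENT_CALL.get());
  }

  static final class Entry {
    final String payload;
    final String capturedCall;
    Entry(String payload, String capturedCall) {
      this.payload = payload;
      this.capturedCall = capturedCall;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Entry> ringBuffer = new LinkedBlockingQueue<>();

    Thread consumer = new Thread(() -> {
      try {
        Entry entry = ringBuffer.take();
        // The consumer thread has no thread-local context of its own...
        System.out.println("thread-local on consumer: " + getCurrentCall());     // Optional.empty
        // ...so it can only use whatever the producer captured and attached.
        System.out.println("captured at append time: " + entry.capturedCall);    // call-42
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    // "RPC handler" thread: the thread-local is set here, so capture before publishing.
    CURRENT_CALL.set("call-42");
    String captured = getCurrentCall().orElse(null);
    ringBuffer.put(new Entry("edit", captured));
    consumer.join();
  }
}
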
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 3 additions & 1 deletion

@@ -322,7 +322,9 @@ private void syncFailed(long epochWhenSync, Throwable error) {
   private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
     highestSyncedTxid.set(processedTxid);
     for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
-      if (iter.next().getTxid() <= processedTxid) {
+      FSWALEntry entry = iter.next();
+      if (entry.getTxid() <= processedTxid) {
+        entry.release();
         iter.remove();
       } else {
         break;
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 3 additions & 3 deletions

@@ -24,7 +24,6 @@
 import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
@@ -34,7 +33,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,6 +56,7 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -951,7 +950,6 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
       //TODO handle htrace API change, see HBASE-18895
       //TraceScope scope = Trace.continueSpan(entry.detachSpan());
       try {
-
         if (this.exception != null) {
           // Return to keep processing events coming off the ringbuffer
           return;
@@ -968,6 +966,8 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
             : new DamagedWALException("On sync", this.exception));
           // Return to keep processing events coming off the ringbuffer
           return;
+        } finally {
+          entry.release();
         }
       } else {
         // What is this if not an append or sync. Fail all up to this!!!
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 23 additions & 15 deletions

@@ -17,19 +17,17 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static java.util.stream.Collectors.toCollection;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
-
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
@@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
   private final transient boolean inMemstore;
   private final transient RegionInfo regionInfo;
   private final transient Set<byte[]> familyNames;
+  private final transient Optional<ServerCall<?>> rpcCall;
 
-  FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
-      final RegionInfo regionInfo, final boolean inMemstore) {
+  FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
+      final boolean inMemstore, ServerCall<?> rpcCall) {
     super(key, edit);
     this.inMemstore = inMemstore;
     this.regionInfo = regionInfo;
    this.txid = txid;
     if (inMemstore) {
       // construct familyNames here to reduce the work of log sinker.
-      Set<byte []> families = edit.getFamilies();
-      this.familyNames = families != null? families: collectFamilies(edit.getCells());
+      Set<byte[]> families = edit.getFamilies();
+      this.familyNames = families != null ? families : collectFamilies(edit.getCells());
     } else {
-      this.familyNames = Collections.<byte[]>emptySet();
+      this.familyNames = Collections.<byte[]> emptySet();
+    }
+    this.rpcCall = Optional.ofNullable(rpcCall);
+    if (rpcCall != null) {
+      rpcCall.retainByWAL();
     }
   }
 
@@ -77,12 +80,13 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
     if (CollectionUtils.isEmpty(cells)) {
       return Collections.emptySet();
     } else {
-      return cells.stream()
-        .filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
-        .collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
-        .stream()
-        .map(CellUtil::cloneFamily)
-        .collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+      Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      for (Cell cell: cells) {
+        if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+          set.add(CellUtil.cloneFamily(cell));
+        }
+      }
+      return set;
     }
   }
 
@@ -129,4 +133,8 @@ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws
   Set<byte[]> getFamilyNames() {
     return familyNames;
   }
+
+  void release() {
+    rpcCall.ifPresent(ServerCall::releaseByWAL);
+  }
 }

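Taken together with the AsyncFSWAL and FSHLog changes above, the lifecycle is: the entry retains the originating call when it is constructed, and the WAL releases it once the edit has been synced (or has definitively failed), so the request ByteBuffers cannot be returned to the pool while the edit is still in flight. The compact, self-contained sketch below uses invented names standing in for ServerCall and FSWALEntry; the onSyncCompleted helper only mirrors the shape of the loop added to AsyncFSWAL.syncCompleted().

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Lifecycle sketch (illustration only, not HBase code): retain at append time,
// release once the write-ahead log has made the edit durable.
public class WalRetainReleaseDemo {

  interface RetainableCall {          // stand-in for ServerCall's retainByWAL()/releaseByWAL()
    void retainByWAL();
    void releaseByWAL();
  }

  static final class Entry {
    final long txid;
    final Optional<RetainableCall> rpcCall;

    Entry(long txid, RetainableCall rpcCall) {
      this.txid = txid;
      this.rpcCall = Optional.ofNullable(rpcCall);
      this.rpcCall.ifPresent(RetainableCall::retainByWAL); // pin buffers at append time
    }

    void release() {                  // called once the edit is synced (or will never be)
      rpcCall.ifPresent(RetainableCall::releaseByWAL);
    }
  }

  // Release and drop every unacked entry covered by the highest synced txid,
  // stopping at the first one that is not yet covered.
  static void onSyncCompleted(Queue<Entry> unackedAppends, long processedTxid) {
    while (!unackedAppends.isEmpty() && unackedAppends.peek().txid <= processedTxid) {
      unackedAppends.poll().release();
    }
  }

  public static void main(String[] args) {
    Queue<Entry> unacked = new ArrayDeque<>();
    RetainableCall call = new RetainableCall() {
      @Override public void retainByWAL() { System.out.println("retained by WAL"); }
      @Override public void releaseByWAL() { System.out.println("released by WAL"); }
    };
    unacked.add(new Entry(1, call));   // prints "retained by WAL"
    unacked.add(new Entry(2, null));   // edit with no associated rpc call (e.g. from a test)
    onSyncCompleted(unacked, 2L);      // prints "released by WAL" exactly once
  }
}
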
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java

Lines changed: 2 additions & 3 deletions

@@ -1155,9 +1155,8 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ
   private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
       byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
       int index, NavigableMap<byte[], Integer> scopes) throws IOException {
-    FSWALEntry entry =
-        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
-            rowName, family, ee, index), hri, true);
+    FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
+      createWALEdit(rowName, family, ee, index), hri, true, null);
     entry.stampRegionSequenceId(mvcc.begin());
     return entry;
   }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java

Lines changed: 1 addition & 1 deletion

@@ -111,7 +111,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir
         suffix, GROUP, CHANNEL_CLASS) {
 
       @Override
-      void atHeadOfRingBufferEventHandlerAppend() {
+      protected void atHeadOfRingBufferEventHandlerAppend() {
        action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java

Lines changed: 1 addition & 1 deletion

@@ -87,7 +87,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir
         suffix) {
 
       @Override
-      void atHeadOfRingBufferEventHandlerAppend() {
+      protected void atHeadOfRingBufferEventHandlerAppend() {
         action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestAsyncFSWALCorruptionDueToDanglingByteBuffer.java (new file)

Lines changed: 111 additions & 0 deletions

@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * Testcase for HBASE-22539
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestAsyncFSWALCorruptionDueToDanglingByteBuffer
+    extends WALCorruptionDueToDanglingByteBufferTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncFSWALCorruptionDueToDanglingByteBuffer.class);
+
+  public static final class PauseWAL extends AsyncFSWAL {
+
+    public PauseWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
+        Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
+        String prefix, String suffix, EventLoopGroup eventLoopGroup,
+        Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+      super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
+        eventLoopGroup, channelClass);
+    }
+
+    @Override
+    protected void atHeadOfRingBufferEventHandlerAppend() {
+      if (ARRIVE != null) {
+        ARRIVE.countDown();
+        try {
+          RESUME.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  public static final class PauseWALProvider extends AbstractFSWALProvider<PauseWAL> {
+
+    private EventLoopGroup eventLoopGroup;
+
+    private Class<? extends Channel> channelClass;
+
+    @Override
+    protected PauseWAL createWAL() throws IOException {
+      return new PauseWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
+        getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+        channelClass);
+    }
+
+    @Override
+    protected void doInit(Configuration conf) throws IOException {
+      Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+      channelClass = eventLoopGroupAndChannelClass.getSecond();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, PauseWALProvider.class,
+      WALProvider.class);
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    UTIL.waitTableAvailable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}

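PauseWAL parks the ring buffer consumer at the head of the append using an ARRIVE/RESUME latch pair, presumably defined in WALCorruptionDueToDanglingByteBufferTestBase, which belongs to this commit but is not shown in this view. As a rough, self-contained illustration of that handshake only (not the actual test base class), the sketch below shows a worker announcing that it reached the pause point and waiting for the test to let it continue.

import java.util.concurrent.CountDownLatch;

// Sketch of the ARRIVE/RESUME handshake pattern used by PauseWAL (illustration only).
public class PauseHandshakeDemo {
  static volatile CountDownLatch ARRIVE;
  static volatile CountDownLatch RESUME;

  public static void main(String[] args) throws InterruptedException {
    ARRIVE = new CountDownLatch(1);
    RESUME = new CountDownLatch(1);

    Thread consumer = new Thread(() -> {
      // Same shape as PauseWAL.atHeadOfRingBufferEventHandlerAppend().
      if (ARRIVE != null) {
        ARRIVE.countDown();           // tell the test we reached the append head
        try {
          RESUME.await();             // block until the test is done poking at the buffers
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
      System.out.println("append proceeds");
    });
    consumer.start();

    ARRIVE.await();                   // test thread: wait for the WAL consumer to park
    System.out.println("WAL consumer is paused; the test can now reuse request buffers");
    RESUME.countDown();               // let the WAL continue and then verify nothing corrupted
    consumer.join();
  }
}
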