Skip to content

Commit 791ad84

Browse files
committed
HBASE-20461 Implement fsync for AsyncFSWAL (#947)
Signed-off-by: stack <stack@apache.org>
1 parent 683ce0c commit 791ad84

10 files changed

Lines changed: 254 additions & 74 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.hadoop.hbase.ipc.RpcServer;
6060
import org.apache.hadoop.hbase.ipc.ServerCall;
6161
import org.apache.hadoop.hbase.log.HBaseMarkers;
62+
import org.apache.hadoop.hbase.regionserver.HRegion;
6263
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
6364
import org.apache.hadoop.hbase.trace.TraceUtil;
6465
import org.apache.hadoop.hbase.util.Bytes;
@@ -188,6 +189,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
188189
*/
189190
protected final int maxLogs;
190191

192+
protected final boolean useHsync;
193+
191194
/**
192195
* This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
193196
* is held. We don't just use synchronized because that results in bogus and tedious findbugs
@@ -429,6 +432,7 @@ protected SyncFuture initialValue() {
429432
}
430433
};
431434
this.implClassName = getClass().getSimpleName();
435+
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
432436
}
433437

434438
/**
@@ -867,8 +871,8 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen
867871
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
868872
}
869873

870-
protected final SyncFuture getSyncFuture(long sequence) {
871-
return cachedSyncFutures.get().reset(sequence);
874+
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
875+
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
872876
}
873877

874878
protected boolean isLogRollRequested() {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ private void sync(AsyncWriter writer) {
347347
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
348348
final long startTimeNs = System.nanoTime();
349349
final long epoch = (long) epochAndState >>> 2L;
350-
addListener(writer.sync(), (result, error) -> {
350+
addListener(writer.sync(useHsync), (result, error) -> {
351351
if (error != null) {
352352
syncFailed(epoch, error);
353353
} else {
@@ -570,11 +570,21 @@ protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inM
570570

571571
@Override
572572
public void sync() throws IOException {
573+
sync(useHsync);
574+
}
575+
576+
@Override
577+
public void sync(long txid) throws IOException {
578+
sync(txid, useHsync);
579+
}
580+
581+
@Override
582+
public void sync(boolean forceSync) throws IOException {
573583
try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
574584
long txid = waitingConsumePayloads.next();
575585
SyncFuture future;
576586
try {
577-
future = getSyncFuture(txid);
587+
future = getSyncFuture(txid, forceSync);
578588
RingBufferTruck truck = waitingConsumePayloads.get(txid);
579589
truck.load(future);
580590
} finally {
@@ -588,7 +598,7 @@ public void sync() throws IOException {
588598
}
589599

590600
@Override
591-
public void sync(long txid) throws IOException {
601+
public void sync(long txid, boolean forceSync) throws IOException {
592602
if (highestSyncedTxid.get() >= txid) {
593603
return;
594604
}
@@ -597,7 +607,7 @@ public void sync(long txid) throws IOException {
597607
long sequence = waitingConsumePayloads.next();
598608
SyncFuture future;
599609
try {
600-
future = getSyncFuture(txid);
610+
future = getSyncFuture(txid, forceSync);
601611
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
602612
truck.load(future);
603613
} finally {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ public void append(Entry entry) {
140140
}
141141

142142
@Override
143-
public CompletableFuture<Long> sync() {
144-
return output.flush(false);
143+
public CompletableFuture<Long> sync(boolean forceSync) {
144+
return output.flush(forceSync);
145145
}
146146

147147
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.hadoop.fs.Path;
4040
import org.apache.hadoop.hbase.HConstants;
4141
import org.apache.hadoop.hbase.client.RegionInfo;
42-
import org.apache.hadoop.hbase.regionserver.HRegion;
4342
import org.apache.hadoop.hbase.trace.TraceUtil;
4443
import org.apache.hadoop.hbase.util.Bytes;
4544
import org.apache.hadoop.hbase.util.ClassSize;
@@ -129,8 +128,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
129128
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
130129
private final int minTolerableReplication;
131130

132-
private final boolean useHsync;
133-
134131
// If live datanode count is lower than the default replicas value,
135132
// RollWriter will be triggered in each sync(So the RollWriter will be
136133
// triggered one by one in a short time). Using it as a workaround to slow
@@ -181,6 +178,7 @@ public void handleOnShutdownException(Throwable ex) {
181178
* @param logDir dir where wals are stored
182179
* @param conf configuration to use
183180
*/
181+
@VisibleForTesting
184182
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
185183
throws IOException {
186184
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
@@ -214,8 +212,6 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
214212
5);
215213
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
216214

217-
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
218-
219215
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
220216
// put on the ring buffer.
221217
String hostingThreadName = Thread.currentThread().getName();
@@ -693,7 +689,7 @@ private SyncFuture publishSyncOnRingBuffer(boolean forceSync) {
693689
@VisibleForTesting
694690
protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) {
695691
// here we use ring buffer sequence as transaction id
696-
SyncFuture syncFuture = getSyncFuture(sequence).setForceSync(forceSync);
692+
SyncFuture syncFuture = getSyncFuture(sequence, forceSync);
697693
try {
698694
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
699695
truck.load(syncFuture);

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ interface Writer extends WriterBase {
8585
}
8686

8787
interface AsyncWriter extends WriterBase {
88-
CompletableFuture<Long> sync();
88+
CompletableFuture<Long> sync(boolean forceSync);
8989

9090
void append(WAL.Entry entry);
9191
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ public long getLength() {
156156
}
157157

158158
@Override
159-
public CompletableFuture<Long> sync() {
160-
CompletableFuture<Long> result = writer.sync();
159+
public CompletableFuture<Long> sync(boolean forceSync) {
160+
CompletableFuture<Long> result = writer.sync(forceSync);
161161
if (failedCount.incrementAndGet() < 1000) {
162162
CompletableFuture<Long> future = new CompletableFuture<>();
163163
FutureUtils.addListener(result,
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver.wal;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.hbase.HBaseClassTestRule;
25+
import org.apache.hadoop.hbase.HConstants;
26+
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
27+
import org.apache.hadoop.hbase.testclassification.MediumTests;
28+
import org.junit.AfterClass;
29+
import org.junit.BeforeClass;
30+
import org.junit.ClassRule;
31+
import org.junit.experimental.categories.Category;
32+
33+
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
34+
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
35+
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
36+
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
37+
38+
@Category({ RegionServerServices.class, MediumTests.class })
39+
public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> {
40+
41+
@ClassRule
42+
public static final HBaseClassTestRule CLASS_RULE =
43+
HBaseClassTestRule.forClass(TestAsyncFSWALDurability.class);
44+
45+
private static NioEventLoopGroup GROUP;
46+
47+
@BeforeClass
48+
public static void setUpBeforeClass() {
49+
GROUP = new NioEventLoopGroup();
50+
}
51+
52+
@AfterClass
53+
public static void tearDownAfterClass() {
54+
GROUP.shutdownGracefully();
55+
}
56+
57+
@Override
58+
protected CustomAsyncFSWAL getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
59+
throws IOException {
60+
CustomAsyncFSWAL wal =
61+
new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class);
62+
wal.init();
63+
return wal;
64+
}
65+
66+
@Override
67+
protected void resetSyncFlag(CustomAsyncFSWAL wal) {
68+
wal.resetSyncFlag();
69+
}
70+
71+
@Override
72+
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
73+
return wal.getSyncFlag();
74+
}
75+
}
76+
77+
class CustomAsyncFSWAL extends AsyncFSWAL {
78+
private Boolean syncFlag;
79+
80+
public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
81+
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
82+
throws FailedLogCloseException, IOException {
83+
super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
84+
eventLoopGroup, channelClass);
85+
}
86+
87+
@Override
88+
public void sync(boolean forceSync) throws IOException {
89+
syncFlag = forceSync;
90+
super.sync(forceSync);
91+
}
92+
93+
@Override
94+
public void sync(long txid, boolean forceSync) throws IOException {
95+
syncFlag = forceSync;
96+
super.sync(txid, forceSync);
97+
}
98+
99+
void resetSyncFlag() {
100+
this.syncFlag = null;
101+
}
102+
103+
Boolean getSyncFlag() {
104+
return syncFlag;
105+
}
106+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected void append(AsyncWriter writer, Entry entry) throws IOException {
7777
@Override
7878
protected void sync(AsyncWriter writer) throws IOException {
7979
try {
80-
writer.sync().get();
80+
writer.sync(false).get();
8181
} catch (InterruptedException e) {
8282
throw new InterruptedIOException();
8383
} catch (ExecutionException e) {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver.wal;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
import org.apache.hadoop.hbase.HBaseClassTestRule;
25+
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
26+
import org.apache.hadoop.hbase.testclassification.MediumTests;
27+
import org.junit.ClassRule;
28+
import org.junit.experimental.categories.Category;
29+
30+
@Category({ RegionServerServices.class, MediumTests.class })
31+
public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {
32+
33+
@ClassRule
34+
public static final HBaseClassTestRule CLASS_RULE =
35+
HBaseClassTestRule.forClass(TestFSHLogDurability.class);
36+
37+
@Override
38+
protected CustomFSHLog getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
39+
throws IOException {
40+
CustomFSHLog wal = new CustomFSHLog(fs, root, logDir, conf);
41+
wal.init();
42+
return wal;
43+
}
44+
45+
@Override
46+
protected void resetSyncFlag(CustomFSHLog wal) {
47+
wal.resetSyncFlag();
48+
}
49+
50+
@Override
51+
protected Boolean getSyncFlag(CustomFSHLog wal) {
52+
return wal.getSyncFlag();
53+
}
54+
}
55+
56+
class CustomFSHLog extends FSHLog {
57+
private Boolean syncFlag;
58+
59+
public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
60+
throws IOException {
61+
super(fs, root, logDir, conf);
62+
}
63+
64+
@Override
65+
public void sync(boolean forceSync) throws IOException {
66+
syncFlag = forceSync;
67+
super.sync(forceSync);
68+
}
69+
70+
@Override
71+
public void sync(long txid, boolean forceSync) throws IOException {
72+
syncFlag = forceSync;
73+
super.sync(txid, forceSync);
74+
}
75+
76+
void resetSyncFlag() {
77+
this.syncFlag = null;
78+
}
79+
80+
Boolean getSyncFlag() {
81+
return syncFlag;
82+
}
83+
}

0 commit comments

Comments
 (0)