Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -211,6 +212,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
*/
protected final int maxLogs;

protected final boolean useHsync;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always get these confused? You can't just call it sync? Whats diff between hsync, sync, and fsync again? I forgot?


/**
* This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
* is held. We don't just use synchronized because that results in bogus and tedious findbugs
Expand Down Expand Up @@ -472,6 +475,7 @@ protected SyncFuture initialValue() {
this.implClassName = getClass().getSimpleName();
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
}

/**
Expand Down Expand Up @@ -937,8 +941,8 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen
sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
}

protected final SyncFuture getSyncFuture(long sequence) {
return cachedSyncFutures.get().reset(sequence);
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
}

protected boolean isLogRollRequested() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private void sync(AsyncWriter writer) {
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
final long startTimeNs = System.nanoTime();
final long epoch = (long) epochAndState >>> 2L;
addListener(writer.sync(), (result, error) -> {
addListener(writer.sync(useHsync), (result, error) -> {
if (error != null) {
syncFailed(epoch, error);
} else {
Expand Down Expand Up @@ -630,11 +630,21 @@ protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inM

@Override
public void sync() throws IOException {
sync(useHsync);
}

@Override
public void sync(long txid) throws IOException {
sync(txid, useHsync);
}

@Override
public void sync(boolean forceSync) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? The method is sync. We pass a forceSync flag. There are conditions under which we won't sync when we call sync such that we might need a force?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see you are just using what was there... moving it up into abstract wal that this wal impl uses. Ok. The doc of the sync types can be afterward.

try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
long txid = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid);
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(txid);
truck.load(future);
} finally {
Expand All @@ -648,7 +658,7 @@ public void sync() throws IOException {
}

@Override
public void sync(long txid) throws IOException {
public void sync(long txid, boolean forceSync) throws IOException {
if (highestSyncedTxid.get() >= txid) {
return;
}
Expand All @@ -657,7 +667,7 @@ public void sync(long txid) throws IOException {
long sequence = waitingConsumePayloads.next();
SyncFuture future;
try {
future = getSyncFuture(txid);
future = getSyncFuture(txid, forceSync);
RingBufferTruck truck = waitingConsumePayloads.get(sequence);
truck.load(future);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public void append(Entry entry) {
}

@Override
public CompletableFuture<Long> sync() {
return output.flush(false);
public CompletableFuture<Long> sync(boolean forceSync) {
return output.flush(forceSync);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public void append(Entry entry) {
}

@Override
public CompletableFuture<Long> sync() {
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> future = new CompletableFuture<>();
AtomicInteger remaining = new AtomicInteger(writers.size());
writers.forEach(w -> addListener(w.sync(), (length, error) -> {
writers.forEach(w -> addListener(w.sync(forceSync), (length, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
Expand Down Expand Up @@ -134,8 +133,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
private final int minTolerableReplication;

private final boolean useHsync;

// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
Expand Down Expand Up @@ -186,6 +183,7 @@ public void handleOnShutdownException(Throwable ex) {
* @param logDir dir where wals are stored
* @param conf configuration to use
*/
@VisibleForTesting
public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
throws IOException {
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
Expand Down Expand Up @@ -218,7 +216,6 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
5);
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);

// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
Expand Down Expand Up @@ -715,7 +712,7 @@ private SyncFuture publishSyncOnRingBuffer(boolean forceSync) {
@VisibleForTesting
protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) {
// here we use ring buffer sequence as transaction id
SyncFuture syncFuture = getSyncFuture(sequence).setForceSync(forceSync);
SyncFuture syncFuture = getSyncFuture(sequence, forceSync);
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
truck.load(syncFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ interface Writer extends WriterBase {
}

interface AsyncWriter extends WriterBase {
CompletableFuture<Long> sync();
CompletableFuture<Long> sync(boolean forceSync);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and WALProvider is audience private so this change is good.


void append(WAL.Entry entry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public long getLength() {
}

@Override
public CompletableFuture<Long> sync() {
CompletableFuture<Long> result = writer.sync();
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
if (failedCount.incrementAndGet() < 1000) {
CompletableFuture<Long> future = new CompletableFuture<>();
FutureUtils.addListener(result,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

@Category({ RegionServerServices.class, MediumTests.class })
public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncFSWAL> {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncFSWALDurability.class);

private static NioEventLoopGroup GROUP;

@BeforeClass
public static void setUpBeforeClass() {
GROUP = new NioEventLoopGroup();
}

@AfterClass
public static void tearDownAfterClass() {
GROUP.shutdownGracefully();
}

@Override
protected CustomAsyncFSWAL getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
CustomAsyncFSWAL wal =
new CustomAsyncFSWAL(fs, root, logDir, conf, GROUP, NioSocketChannel.class);
wal.init();
return wal;
}

@Override
protected void resetSyncFlag(CustomAsyncFSWAL wal) {
wal.resetSyncFlag();
}

@Override
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
return wal.getSyncFlag();
}
}

class CustomAsyncFSWAL extends AsyncFSWAL {
private Boolean syncFlag;

public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null,
eventLoopGroup, channelClass);
}

@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
}

@Override
public void sync(long txid, boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(txid, forceSync);
}

void resetSyncFlag() {
this.syncFlag = null;
}

Boolean getSyncFlag() {
return syncFlag;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

@Category({ RegionServerServices.class, MediumTests.class })
public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLogDurability.class);

@Override
protected CustomFSHLog getWAL(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
CustomFSHLog wal = new CustomFSHLog(fs, root, logDir, conf);
wal.init();
return wal;
}

@Override
protected void resetSyncFlag(CustomFSHLog wal) {
wal.resetSyncFlag();
}

@Override
protected Boolean getSyncFlag(CustomFSHLog wal) {
return wal.getSyncFlag();
}
}

class CustomFSHLog extends FSHLog {
private Boolean syncFlag;

public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
throws IOException {
super(fs, root, logDir, conf);
}

@Override
public void sync(boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(forceSync);
}

@Override
public void sync(long txid, boolean forceSync) throws IOException {
syncFlag = forceSync;
super.sync(txid, forceSync);
}

void resetSyncFlag() {
this.syncFlag = null;
}

Boolean getSyncFlag() {
return syncFlag;
}
}
Loading