Skip to content

Commit bfb169f

Browse files
comnetwork
authored and ndimiduk committed
HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (apache#1970)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 5d5b156 commit bfb169f

14 files changed

Lines changed: 123 additions & 24 deletions

File tree

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
8989
*/
9090
@Override
9191
void close() throws IOException;
92+
93+
/**
94+
* @return byteSize successfully synced to underlying filesystem.
95+
*/
96+
long getSyncedLength();
9297
}

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,4 +574,9 @@ public void close() throws IOException {
574574
public boolean isBroken() {
575575
return state == State.BROKEN;
576576
}
577+
578+
@Override
579+
public long getSyncedLength() {
580+
return this.ackedBlockLength;
581+
}
577582
}

hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
4545

4646
private final ExecutorService executor;
4747

48+
private volatile long syncedLength = 0;
49+
4850
public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
4951
this.out = out;
5052
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
@@ -91,7 +93,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
9193
out.hflush();
9294
}
9395
}
94-
future.complete(out.getPos());
96+
long pos = out.getPos();
97+
if(pos > this.syncedLength) {
98+
this.syncedLength = pos;
99+
}
100+
future.complete(pos);
95101
} catch (IOException e) {
96102
future.completeExceptionally(e);
97103
return;
@@ -124,4 +130,9 @@ public void close() throws IOException {
124130
public boolean isBroken() {
125131
return false;
126132
}
133+
134+
@Override
135+
public long getSyncedLength() {
136+
return this.syncedLength;
137+
}
127138
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
10611061
Path currentPath = getOldPath();
10621062
if (path.equals(currentPath)) {
10631063
W writer = this.writer;
1064-
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
1064+
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
10651065
} else {
10661066
return OptionalLong.empty();
10671067
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws
231231
protected OutputStream getOutputStreamForCellEncoder() {
232232
return asyncOutputWrapper;
233233
}
234+
235+
@Override
236+
public long getSyncedLength() {
237+
return this.output.getSyncedLength();
238+
}
234239
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919

2020
import java.io.IOException;
2121
import java.io.OutputStream;
22+
import java.util.concurrent.atomic.AtomicLong;
23+
2224
import org.apache.hadoop.fs.FSDataOutputStream;
2325
import org.apache.hadoop.fs.FileSystem;
2426
import org.apache.hadoop.fs.Path;
2527
import org.apache.hadoop.fs.StreamCapabilities;
2628
import org.apache.hadoop.hbase.Cell;
29+
import org.apache.hadoop.hbase.util.AtomicUtils;
2730
import org.apache.hadoop.hbase.util.CommonFSUtils;
2831
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
2932
import org.apache.hadoop.hbase.wal.FSHLogProvider;
@@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
4649

4750
protected FSDataOutputStream output;
4851

52+
private final AtomicLong syncedLength = new AtomicLong(0);
53+
4954
@Override
5055
public void append(Entry entry) throws IOException {
5156
entry.getKey().getBuilder(compressor).
@@ -85,6 +90,12 @@ public void sync(boolean forceSync) throws IOException {
8590
} else {
8691
fsdos.hflush();
8792
}
93+
AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
94+
}
95+
96+
@Override
97+
public long getSyncedLength() {
98+
return this.syncedLength.get();
8899
}
89100

90101
public FSDataOutputStream getStream() {

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import org.apache.hadoop.conf.Configuration;
2727
import org.apache.hadoop.hbase.client.RegionInfo;
28+
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
2829
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
2930
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
3031
import org.apache.yetus.audience.InterfaceAudience;
@@ -74,6 +75,22 @@ public interface WALProvider {
7475

7576
interface WriterBase extends Closeable {
7677
long getLength();
78+
/**
79+
* NOTE: We add this method for {@link WALFileLengthProvider} used for replication,
80+
* considering the case if we use {@link AsyncFSWAL}, we write to 3 DNs concurrently,
81+
* according to the visibility guarantee of HDFS, the data will be available immediately
82+
* when arriving at DN since all the DNs will be considered as the last one in pipeline.
83+
* This means replication may read uncommitted data and replicate it to the remote cluster
84+
* and cause data inconsistency.
85+
* The method {@link WriterBase#getLength} may return a length which is just in the hdfs client
86+
* buffer and not successfully synced to HDFS, so we use this method to return the length
87+
* successfully synced to HDFS and replication thread could only read writing WAL file
88+
* limited by this length.
89+
* see also HBASE-14004 and this document for more details:
90+
* https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
91+
* @return byteSize successfully synced to underlying filesystem.
92+
*/
93+
long getSyncedLength();
7794
}
7895

7996
// Writers are used internally. Users outside of the WAL should be relying on the

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -130,35 +130,40 @@ public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOExce
130130
@Override
131131
protected Writer createWriterInstance(Path path) throws IOException {
132132
final Writer w = super.createWriterInstance(path);
133-
return new Writer() {
134-
@Override
135-
public void close() throws IOException {
136-
w.close();
137-
}
133+
return new Writer() {
134+
@Override
135+
public void close() throws IOException {
136+
w.close();
137+
}
138138

139-
@Override
140-
public void sync(boolean forceSync) throws IOException {
141-
if (throwSyncException) {
142-
throw new IOException("FAKE! Failed to replace a bad datanode...");
143-
}
144-
w.sync(forceSync);
139+
@Override
140+
public void sync(boolean forceSync) throws IOException {
141+
if (throwSyncException) {
142+
throw new IOException("FAKE! Failed to replace a bad datanode...");
145143
}
144+
w.sync(forceSync);
145+
}
146146

147-
@Override
148-
public void append(Entry entry) throws IOException {
149-
if (throwAppendException) {
150-
throw new IOException("FAKE! Failed to replace a bad datanode...");
151-
}
152-
w.append(entry);
147+
@Override
148+
public void append(Entry entry) throws IOException {
149+
if (throwAppendException) {
150+
throw new IOException("FAKE! Failed to replace a bad datanode...");
153151
}
152+
w.append(entry);
153+
}
154154

155-
@Override
156-
public long getLength() {
157-
return w.getLength();
158-
}
159-
};
155+
@Override
156+
public long getLength() {
157+
return w.getLength();
160158
}
159+
160+
@Override
161+
public long getSyncedLength() {
162+
return w.getSyncedLength();
163+
}
164+
};
161165
}
166+
}
162167

163168
// Make up mocked server and services.
164169
RegionServerServices services = mock(RegionServerServices.class);

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,6 +1251,11 @@ public void append(Entry entry) throws IOException {
12511251
public long getLength() {
12521252
return w.getLength();
12531253
}
1254+
1255+
@Override
1256+
public long getSyncedLength() {
1257+
return w.getSyncedLength();
1258+
}
12541259
};
12551260
}
12561261
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException {
190190
public long getLength() {
191191
return w.getLength();
192192
}
193+
194+
@Override
195+
public long getSyncedLength() {
196+
return w.getSyncedLength();
197+
}
193198
};
194199
}
195200
}
@@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException {
374379
public long getLength() {
375380
return w.getLength();
376381
}
382+
383+
@Override
384+
public long getSyncedLength() {
385+
return w.getSyncedLength();
386+
}
377387
};
378388
}
379389
}

0 commit comments

Comments
 (0)