Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
*/
@Override
void close() throws IOException;

/**
* @return byteSize success synced to underlying filesystem.
*/
long getSyncedLength();
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,4 +575,9 @@ public void close() throws IOException {
public boolean isBroken() {
return state == State.BROKEN;
}

@Override
public long getSyncedLength() {
return this.ackedBlockLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {

private final ExecutorService executor;

private volatile long syncedLength = 0;

public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
Expand Down Expand Up @@ -91,7 +93,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
out.hflush();
}
}
future.complete(out.getPos());
long pos = out.getPos();
if(pos > this.syncedLength) {
this.syncedLength = pos;
}
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
return;
Expand Down Expand Up @@ -124,4 +130,9 @@ public void close() throws IOException {
public boolean isBroken() {
return false;
}

@Override
public long getSyncedLength() {
return this.syncedLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,9 @@ protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws
protected OutputStream getOutputStreamForCellEncoder() {
return asyncOutputWrapper;
}

@Override
public long getSyncedLength() {
return this.output.getSyncedLength();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public long getLength() {
return writers.get(0).getLength();
}

@Override
public long getSyncedLength() {
return writers.get(0).getSyncedLength();
}

@Override
public void close() throws IOException {
Exception error = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter

protected FSDataOutputStream output;

private volatile long syncedLength = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an AtomicLong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that using AtomicLong is unnecessary, because AtomicLong could not provide update if greater than semantics, so I used synchronized keyword here when updating the syncedLength for simplicity

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use AtomicUtils.updateMax. It is a util class in hbase-common.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used AtomicUtils to replace synchronized, thank you very much.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why do we have AtomicUtils.updateMax? It seems getAndAccumulate is designed for this use case, i.e., syncedLength.getAndAccumulate(fsdos.getPos(), Math::max)


private final Object syncedLengthMonitor = new Object();

@Override
public void append(Entry entry) throws IOException {
entry.getKey().getBuilder(compressor).
Expand Down Expand Up @@ -85,6 +89,20 @@ public void sync(boolean forceSync) throws IOException {
} else {
fsdos.hflush();
}
this.updateSyncedLength(fsdos.getPos());
}

private void updateSyncedLength(long lengthToUse) {
synchronized (this.syncedLengthMonitor) {
if(lengthToUse > this.syncedLength) {
this.syncedLength = lengthToUse;
}
}
}

@Override
public long getSyncedLength() {
return this.syncedLength;
}

public FSDataOutputStream getStream() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public interface WALProvider {

interface WriterBase extends Closeable {
long getLength();
/**
* @return byteSize success synced to underlying filesystem.
*/
long getSyncedLength();
}

// Writers are used internally. Users outside of the WAL should be relying on the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down Expand Up @@ -374,6 +379,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return w.getLength();
}

@Override
public long getSyncedLength() {
return w.getSyncedLength();
}
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public long getLength() {
return writer.getLength();
}

@Override
public long getSyncedLength() {
return writer.getSyncedLength();
}

@Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return oldWriter1.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter1.getSyncedLength();
}
};
log.setWriter(newWriter1);

Expand Down Expand Up @@ -231,6 +236,11 @@ public void append(Entry entry) throws IOException {
public long getLength() {
return oldWriter2.getLength();
}

@Override
public long getSyncedLength() {
return oldWriter2.getSyncedLength();
}
};
log.setWriter(newWriter2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public long getLength() {
return asyncWriter.getLength();
}

@Override
public long getSyncedLength() {
return asyncWriter.getSyncedLength();
}

@Override
public void append(Entry entry) throws IOException {
asyncWriter.append(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public long getLength() {
return localWriter.getLength();
}

@Override
public long getSyncedLength() {
return this.localWriter.getSyncedLength();
}

@Override
public void close() throws IOException {
Closeables.close(localWriter, true);
Expand Down