Skip to content

Commit ef7b9eb

Browse files
committed
HBASE-23157 WAL unflushed seqId tracking may wrong when Durability.ASYNC_WAL is used (#762)
Signed-off-by: stack <stack@apache.org>
1 parent 457234c commit ef7b9eb

11 files changed

Lines changed: 206 additions & 99 deletions

File tree

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
4848
return new ImmutableByteArray(b);
4949
}
5050

51+
@Override
5152
public String toString() {
5253
return Bytes.toStringBinary(b);
5354
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2953,7 +2953,7 @@ protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask sta
29532953

29542954
// If we get to here, the HStores have been written.
29552955
if (wal != null) {
2956-
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2956+
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
29572957
}
29582958

29592959
// Record latest flush time

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,8 +499,8 @@ public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyTo
499499
}
500500

501501
@Override
502-
public void completeCacheFlush(byte[] encodedRegionName) {
503-
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
502+
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
503+
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
504504
}
505505

506506
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,9 +351,36 @@ Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> fam
351351
return lowestUnflushedInRegion;
352352
}
353353

354-
void completeCacheFlush(final byte[] encodedRegionName) {
354+
void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
355+
// This is a simple hack to prevent maxFlushedSeqId from going backwards.
356+
// The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
357+
// to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may
358+
// still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and
359+
// then it will be recorded as the lowestUnflushedSeqId by the above update method, which is
360+
// less than the current maxFlushedSeqId. And if next time we only flush the family with this
361+
// unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
362+
// This is an unexpected behavior so we should fix it, otherwise it may cause unexpected
363+
// behavior in other areas.
364+
// The solution here is a bit hacky but fine. Just replace the lowestUnflushedSeqId with
365+
// maxFlushedSeqId + 1 if it is smaller. The meaning of maxFlushedSeqId is that all edits less
366+
// than or equal to it have been flushed, i.e., persisted to HFile, so setting
367+
// lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
368+
// And technically, using +1 is fine here. If the maxFlushedSeqId is just the flushOpSeqId, it
369+
// means we have flushed all the stores so the seq id for actual data should be at least plus 1.
370+
// And if we do not flush all the stores, then the maxFlushedSeqId is calculated by
371+
// lowestUnflushedSeqId - 1, so here let's add the 1 back.
372+
Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
355373
synchronized (tieLock) {
356374
this.flushingSequenceIds.remove(encodedRegionName);
375+
Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
376+
if (unflushed == null) {
377+
return;
378+
}
379+
for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
380+
if (e.getValue().longValue() <= maxFlushedSeqId) {
381+
e.setValue(wrappedSeqId);
382+
}
383+
}
357384
}
358385
}
359386

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedF
224224
}
225225

226226
@Override
227-
public void completeCacheFlush(final byte[] encodedRegionName) {
227+
public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
228228
}
229229

230230
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ default void sync(long txid, boolean forceSync) throws IOException {
185185
* being flushed; in other words, this is effectively same as a flush of all of the region
186186
* though we were passed a subset of regions. Otherwise, it returns the sequence id of the
187187
* oldest/lowest outstanding edit.
188-
* @see #completeCacheFlush(byte[])
188+
* @see #completeCacheFlush(byte[], long)
189189
* @see #abortCacheFlush(byte[])
190190
*/
191191
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@@ -195,10 +195,12 @@ default void sync(long txid, boolean forceSync) throws IOException {
195195
/**
196196
* Complete the cache flush.
197197
* @param encodedRegionName Encoded region name.
198+
* @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
199+
* less than this sequence id.
198200
* @see #startCacheFlush(byte[], Set)
199201
* @see #abortCacheFlush(byte[])
200202
*/
201-
void completeCacheFlush(final byte[] encodedRegionName);
203+
void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
202204

203205
/**
204206
* Abort a cache flush. Call if the flush fails. Note that the only recovery

0 commit comments

Comments
 (0)