Skip to content

Commit 471538c

Browse files
committed
HBASE-23221 Polish the WAL interface after HBASE-23181 (#774)
Removes the closeRegion flag added by HBASE-23181 and instead relies on reading meta WALEdit content. Modified how qualifier is written when the meta WALEdit is for a RegionEventDescriptor so the 'type' is added to the qualifer so can figure type w/o having to deserialize protobuf value content: e.g. HBASE::REGION_EVENT::REGION_CLOSE Added doc on WALEdit and tried to formalize the 'meta' WALEdit type and how it works. Needs complete redo in part as suggested by HBASE-8457. Meantime, some doc and cleanup. Also changed the LogRoller constructor to remove redundant param. Because of constructor change, need to change also TestFailedAppendAndSync, TestWALLockup, TestAsyncFSWAL & WALPerformanceEvaluation.java Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Lijin Bin <binlijin@apache.org>
1 parent 9ab0489 commit 471538c

28 files changed

Lines changed: 315 additions & 238 deletions

hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ public static boolean matchingRow(final Cell left, final Cell right) {
756756

757757
/**
758758
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Instead use
759-
* {@link #matchingRows(Cell, byte[]))}
759+
* {@link #matchingRows(Cell, byte[])}
760760
*/
761761
@Deprecated
762762
public static boolean matchingRow(final Cell left, final byte[] buf) {
@@ -894,8 +894,15 @@ public static boolean matchingQualifier(final Cell left, final byte[] buf, final
894894
}
895895

896896
public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
897-
if (!matchingFamily(left, fam)) return false;
898-
return matchingQualifier(left, qual);
897+
return matchingFamily(left, fam) && matchingQualifier(left, qual);
898+
}
899+
900+
/**
901+
* @return True if matching column family and the qualifier starts with <code>qual</code>
902+
*/
903+
public static boolean matchingColumnFamilyAndQualifierPrefix(final Cell left, final byte[] fam,
904+
final byte[] qual) {
905+
return matchingFamily(left, fam) && PrivateCellUtil.qualifierStartsWith(left, qual);
899906
}
900907

901908
/**

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,10 @@ public void map(WALKey key, WALEdit value, Context context)
199199
Delete del = null;
200200
Cell lastCell = null;
201201
for (Cell cell : value.getCells()) {
202-
// filtering WAL meta entries
202+
// Filtering WAL meta marker entries.
203203
if (WALEdit.isMetaEditFamily(cell)) {
204204
continue;
205205
}
206-
207206
// Allow a subclass filter out this cell.
208207
if (filter(context, cell)) {
209208
// A WALEdit may contain multiple operations (HBASE-3584) and/or

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3566,7 +3566,7 @@ public List<ReplicationPeerDescription> listReplicationPeers(String regex)
35663566
if (cpHost != null) {
35673567
cpHost.preListReplicationPeers(regex);
35683568
}
3569-
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
3569+
LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
35703570
Pattern pattern = regex == null ? null : Pattern.compile(regex);
35713571
List<ReplicationPeerDescription> peers =
35723572
this.replicationPeerManager.listPeers(pattern);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,7 +1725,7 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {
17251725

17261726
status.setStatus("Writing region close event to WAL");
17271727
// Always write close marker to wal even for read only table. This is not a big problem as we
1728-
// do not write any data into the region.
1728+
// do not write any data into the region; it is just a meta edit in the WAL file.
17291729
if (!abort && wal != null && getRegionServerServices() != null &&
17301730
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
17311731
writeRegionCloseMarker(wal);
@@ -2691,7 +2691,8 @@ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId
26912691
}
26922692
}
26932693
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
2694-
LOG.info("Flushing " + storesToFlush.size() + "/" + stores.size() + " column families," +
2694+
LOG.info("Flushing " + this.getRegionInfo().getEncodedName() + " " +
2695+
storesToFlush.size() + "/" + stores.size() + " column families," +
26952696
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
26962697
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
26972698
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
@@ -4817,7 +4818,7 @@ private long replayRecoveredEdits(final Path edits,
48174818
for (Cell cell: val.getCells()) {
48184819
// Check this edit is for me. Also, guard against writing the special
48194820
// METACOLUMN info such as HBASE::CACHEFLUSH entries
4820-
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
4821+
if (WALEdit.isMetaEditFamily(cell)) {
48214822
// if region names don't match, skipp replaying compaction marker
48224823
if (!checkRowWithinBoundary) {
48234824
//this is a special edit, we should handle it

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1926,7 +1926,7 @@ private void startServices() throws IOException {
19261926
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
19271927
}
19281928

1929-
this.walRoller = new LogRoller(this, this);
1929+
this.walRoller = new LogRoller(this);
19301930
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
19311931
this.procedureResultReporter = new RemoteProcedureResultReporter(this);
19321932

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
*
33
* Licensed to the Apache Software Foundation (ASF) under one
44
* or more contributor license agreements. See the NOTICE file
@@ -28,7 +28,6 @@
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.ConcurrentMap;
3030
import org.apache.hadoop.hbase.HConstants;
31-
import org.apache.hadoop.hbase.Server;
3231
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
3332
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
3433
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -56,7 +55,6 @@
5655
public class LogRoller extends HasThread implements Closeable {
5756
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
5857
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
59-
private final Server server;
6058
protected final RegionServerServices services;
6159
private volatile long lastRollTime = System.currentTimeMillis();
6260
// Period to roll log.
@@ -99,16 +97,14 @@ public void requestRollAll() {
9997
}
10098
}
10199

102-
/** @param server */
103-
public LogRoller(final Server server, final RegionServerServices services) {
100+
public LogRoller(RegionServerServices services) {
104101
super("LogRoller");
105-
this.server = server;
106102
this.services = services;
107-
this.rollPeriod = this.server.getConfiguration().
103+
this.rollPeriod = this.services.getConfiguration().
108104
getLong("hbase.regionserver.logroll.period", 3600000);
109-
this.threadWakeFrequency = this.server.getConfiguration().
105+
this.threadWakeFrequency = this.services.getConfiguration().
110106
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
111-
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
107+
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
112108
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
113109
}
114110

@@ -144,7 +140,7 @@ private void abort(String reason, Throwable cause) {
144140
LOG.warn("Failed to shutdown wal", e);
145141
}
146142
}
147-
server.abort(reason, cause);
143+
this.services.abort(reason, cause);
148144
}
149145

150146
@Override
@@ -156,7 +152,7 @@ public void run() {
156152
periodic = (now - this.lastRollTime) > this.rollPeriod;
157153
if (periodic) {
158154
// Time for periodic roll, fall through
159-
LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
155+
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
160156
} else {
161157
synchronized (this) {
162158
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
@@ -183,9 +179,9 @@ public void run() {
183179
WAL wal = entry.getKey();
184180
// reset the flag in front to avoid missing roll request before we return from rollWriter.
185181
walNeedsRoll.put(wal, Boolean.FALSE);
186-
// Force the roll if the logroll.period is elapsed or if a roll was requested.
187-
// The returned value is an array of actual region names.
188-
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
182+
// Force the roll if the logroll.period is elapsed or if a roll was requested.
183+
// The returned value is an array of actual region names.
184+
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
189185
if (regionsToFlush != null) {
190186
for (byte[] r : regionsToFlush) {
191187
scheduleFlush(Bytes.toString(r));

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
261261
private static final class WalProps {
262262

263263
/**
264-
* Map the encoded region name to the highest sequence id. Contain all the regions it has
265-
* entries of
264+
* Map the encoded region name to the highest sequence id.
265+
* <p/>Contains all the regions it has an entry for.
266266
*/
267267
public final Map<byte[], Long> encodedName2HighestSequenceId;
268268

@@ -610,9 +610,9 @@ public int getNumLogFiles() {
610610
}
611611

612612
/**
613-
* If the number of un-archived WAL files is greater than maximum allowed, check the first
614-
* (oldest) WAL file, and returns those regions which should be flushed so that it can be
615-
* archived.
613+
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
614+
* check the first (oldest) WAL, and return those regions which should be flushed so that
615+
* it can be let-go/'archived'.
616616
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
617617
*/
618618
byte[][] findRegionsToForceFlush() throws IOException {
@@ -888,10 +888,6 @@ public void close() throws IOException {
888888
/**
889889
* updates the sequence number of a specific store. depending on the flag: replaces current seq
890890
* number if the given seq id is bigger, or even if it is lower than existing one
891-
* @param encodedRegionName
892-
* @param familyName
893-
* @param sequenceid
894-
* @param onlyIfGreater
895891
*/
896892
@Override
897893
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
@@ -1015,7 +1011,7 @@ protected final void postSync(final long timeInNanos, final int handlerSyncs) {
10151011
}
10161012

10171013
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1018-
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
1014+
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
10191015
throws IOException {
10201016
if (this.closed) {
10211017
throw new IOException(
@@ -1029,7 +1025,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
10291025
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
10301026
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
10311027
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
1032-
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
1028+
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
10331029
entry.stampRegionSequenceId(we);
10341030
ringBuffer.get(txid).load(entry);
10351031
} finally {
@@ -1067,13 +1063,13 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
10671063

10681064
@Override
10691065
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1070-
return append(info, key, edits, true, false);
1066+
return append(info, key, edits, true);
10711067
}
10721068

10731069
@Override
1074-
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
1070+
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
10751071
throws IOException {
1076-
return append(info, key, edits, false, closeRegion);
1072+
return append(info, key, edits, false);
10771073
}
10781074

10791075
/**
@@ -1097,17 +1093,17 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean
10971093
* @param key Modified by this call; we add to it this edits region edit/sequence id.
10981094
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
10991095
* sequence id that is after all currently appended edits.
1100-
* @param inMemstore Always true except for case where we are writing a region event marker, for
1101-
* example, a compaction completion record into the WAL; in this case the entry is just
1102-
* so we can finish an unfinished compaction -- it is not an edit for memstore.
1103-
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
1104-
* region on this region server. The WAL implementation should remove all the related
1105-
* stuff, for example, the sequence id accounting.
1096+
* @param inMemstore Always true except for case where we are writing a region event meta
1097+
* marker edit, for example, a compaction completion record into the WAL or noting a
1098+
* Region Open event. In these cases the entry is just so we can finish an unfinished
1099+
* compaction after a crash when the new Server reads the WAL on recovery, etc. These
1100+
* transition event 'Markers' do not go via the memstore. When memstore is false,
1101+
* we presume a Marker event edit.
11061102
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
11071103
* in it.
11081104
*/
1109-
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
1110-
boolean closeRegion) throws IOException;
1105+
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1106+
throws IOException;
11111107

11121108
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
11131109

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -564,9 +564,9 @@ private boolean shouldScheduleConsumer() {
564564
}
565565

566566
@Override
567-
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
568-
boolean closeRegion) throws IOException {
569-
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
567+
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
568+
throws IOException {
569+
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
570570
waitingConsumePayloads);
571571
if (shouldScheduleConsumer()) {
572572
consumeExecutor.execute(consumer);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,8 @@ protected void doShutdown() throws IOException {
436436

437437
@Override
438438
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
439-
final boolean inMemstore, boolean closeRegion) throws IOException {
440-
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
439+
final boolean inMemstore) throws IOException {
440+
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
441441
disruptor.getRingBuffer());
442442
}
443443

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
/**
4141
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
4242
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
43-
* region sequence id (we want to use this later, just before we write the WAL to ensure region
43+
* region sequenceid (we want to use this later, just before we write the WAL to ensure region
4444
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
4545
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
4646
* the assign of the region sequence id. See #stampRegionSequenceId().
@@ -50,25 +50,40 @@ class FSWALEntry extends Entry {
5050
// The below data members are denoted 'transient' just to highlight these are not persisted;
5151
// they are only in memory and held here while passing over the ring buffer.
5252
private final transient long txid;
53+
54+
/**
55+
* If false, means this is a meta edit written by the hbase system itself. It was not in
56+
* memstore. HBase uses these edit types to note in the log operational transitions such
57+
* as compactions, flushes, or region open/closes.
58+
*/
5359
private final transient boolean inMemstore;
60+
61+
/**
62+
* Set if this is a meta edit and it is of close region type.
63+
*/
5464
private final transient boolean closeRegion;
65+
5566
private final transient RegionInfo regionInfo;
5667
private final transient Set<byte[]> familyNames;
5768
private final transient ServerCall<?> rpcCall;
5869

70+
/**
71+
* @param inMemstore If true, then this is a data edit, one that came from client. If false, it
72+
* is a meta edit made by the hbase system itself and is for the WAL only.
73+
*/
5974
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
60-
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
75+
final boolean inMemstore, ServerCall<?> rpcCall) {
6176
super(key, edit);
6277
this.inMemstore = inMemstore;
63-
this.closeRegion = closeRegion;
78+
this.closeRegion = !inMemstore && edit.isRegionCloseMarker();
6479
this.regionInfo = regionInfo;
6580
this.txid = txid;
6681
if (inMemstore) {
6782
// construct familyNames here to reduce the work of log sinker.
6883
Set<byte[]> families = edit.getFamilies();
6984
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
7085
} else {
71-
this.familyNames = Collections.<byte[]> emptySet();
86+
this.familyNames = Collections.emptySet();
7287
}
7388
this.rpcCall = rpcCall;
7489
if (rpcCall != null) {
@@ -83,7 +98,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
8398
} else {
8499
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
85100
for (Cell cell: cells) {
86-
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
101+
if (!WALEdit.isMetaEditFamily(cell)) {
87102
set.add(CellUtil.cloneFamily(cell));
88103
}
89104
}
@@ -94,7 +109,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
94109
@Override
95110
public String toString() {
96111
return "sequence=" + this.txid + ", " + super.toString();
97-
};
112+
}
98113

99114
boolean isInMemStore() {
100115
return this.inMemstore;

0 commit comments

Comments
 (0)