@@ -206,73 +206,127 @@ private void setCurrentPath(Path path) {

   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
       justification = "HDFS-4380")
-  private HasNext tryAdvanceEntry() {
-    if (reader == null) {
-      // try open next WAL file
-      PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
-      Path nextPath = queue.peek();
-      if (nextPath != null) {
-        setCurrentPath(nextPath);
-        // we need to test this prior to creating the reader. If not, it is possible that, while
-        // opening the file, the file is still being written so its header is incomplete and we
-        // get a header EOF; but then, while we test whether it is still being written, the data
-        // may already have been flushed out, so we consider it not being written and just skip
-        // over the file, losing the data written after opening...
-        boolean beingWritten =
-          walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+  private HasNext prepareReader() {
+    if (reader != null) {
+      if (state != null && state != WALTailingReader.State.NORMAL) {
+        // reset before reading
+        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
+          currentPositionOfEntry, state.resetCompression());
         try {
-          reader = WALFactory.createTailingReader(fs, nextPath, conf,
-            currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
-        } catch (WALHeaderEOFException e) {
-          if (!eofAutoRecovery) {
-            // if we do not enable EOF auto recovery, just let the upper layer retry;
-            // replication will usually be stuck and needs to be fixed manually
-            return HasNext.RETRY;
-          }
-          // we hit EOF while reading the WAL header; usually this means we can just skip over
-          // this file, but we need to be careful about whether it is still being written: if so,
-          // we should retry instead of skipping.
-          LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
-          if (beingWritten) {
-            // just retry as the file is still being written, maybe next time we could read
-            // something
-            return HasNext.RETRY;
+          if (currentPositionOfEntry > 0) {
+            reader.resetTo(currentPositionOfEntry, state.resetCompression());
           } else {
-            // the file is not being written so we are safe to just skip over it
-            dequeueCurrentLog();
-            return HasNext.RETRY_IMMEDIATELY;
+            // we will read from the beginning so we should always clear the compression context
+            reader.resetTo(-1, true);
           }
-        } catch (LeaseNotRecoveredException e) {
-          // HBASE-15019 the WAL was not closed due to some hiccup.
-          LOG.warn("Try to recover the WAL lease " + nextPath, e);
-          AbstractFSWALProvider.recoverLease(conf, nextPath);
-          return HasNext.RETRY;
-        } catch (IOException | NullPointerException e) {
-          // For why we need to catch NPE here, see HDFS-4380 for more details
-          LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+        } catch (IOException e) {
+          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+            currentPositionOfEntry, state.resetCompression(), e);
+          // just leave the state as is, and try resetting next time
           return HasNext.RETRY;
         }
-      } else {
-        // no more files in queue; this could happen for a recovered queue, or for a wal group
-        // of a sync replication peer which has already been transitioned to DA or S.
-        setCurrentPath(null);
-        return HasNext.NO;
-      }
+      }
+      return HasNext.YES;
+    }
-    } else if (state != null && state != WALTailingReader.State.NORMAL) {
-      // reset before reading
-      try {
-        if (currentPositionOfEntry > 0) {
-          reader.resetTo(currentPositionOfEntry, state.resetCompression());
-        } else {
-          // we will read from the beginning so we should always clear the compression context
-          reader.resetTo(-1, true);
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-          currentPositionOfEntry, state.resetCompression(), e);
-        // just leave the state as is, and try resetting next time
-      }
-    }
+    // try open next WAL file
+    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+    Path nextPath = queue.peek();
+    if (nextPath == null) {
+      LOG.debug("No more WAL files in queue");
+      // no more files in queue; this could happen for a recovered queue, or for a wal group
+      // of a sync replication peer which has already been transitioned to DA or S.
+      setCurrentPath(null);
+      return HasNext.NO;
+    }
+    setCurrentPath(nextPath);
+    // we need to test this prior to creating the reader. If not, it is possible that, while
+    // opening the file, the file is still being written so its header is incomplete and we
+    // get a header EOF; but then, while we test whether it is still being written, the data
+    // may already have been flushed out, so we consider it not being written and just skip
+    // over the file, losing the data written after opening...
+    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
+      currentPositionOfEntry, beingWritten);
+    try {
+      reader = WALFactory.createTailingReader(fs, nextPath, conf,
+        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+      return HasNext.YES;
+    } catch (WALHeaderEOFException e) {
+      if (!eofAutoRecovery) {
+        // if we do not enable EOF auto recovery, just let the upper layer retry;
+        // replication will usually be stuck and needs to be fixed manually
+        return HasNext.RETRY;
+      }
+      // we hit EOF while reading the WAL header; usually this means we can just skip over
+      // this file, but we need to be careful about whether it is still being written: if so,
+      // we should retry instead of skipping.
+      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
+        currentPositionOfEntry, e);
+      if (beingWritten) {
+        // just retry as the file is still being written, maybe next time we could read
+        // something
+        return HasNext.RETRY;
+      } else {
+        // the file is not being written so we are safe to just skip over it
+        dequeueCurrentLog();
+        return HasNext.RETRY_IMMEDIATELY;
+      }
+    } catch (LeaseNotRecoveredException e) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + nextPath, e);
+      AbstractFSWALProvider.recoverLease(conf, nextPath);
+      return HasNext.RETRY;
+    } catch (IOException | NullPointerException e) {
+      // For why we need to catch NPE here, see HDFS-4380 for more details
+      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+      return HasNext.RETRY;
+    }
+  }

+  private HasNext lastAttempt() {
+    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
+      currentPositionOfEntry, state.resetCompression());
+    try {
+      reader.resetTo(currentPositionOfEntry, state.resetCompression());
+    } catch (IOException e) {
+      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+        currentPositionOfEntry, state.resetCompression(), e);
+      // just leave the state as is; next time we will try to reset it again, but the nasty
+      // problem is that we will still reach here eventually and try resetting again to see if
+      // the log has been fully replicated, which is redundant and can be optimized later
+      return HasNext.RETRY;
+    }
+    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
+    state = pair.getFirst();
+    // should not be written
+    assert !pair.getSecond();
+    if (!state.eof()) {
+      // we still have something to read after reopening, so return YES; or something is wrong
+      // and we need to retry
+      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
+    }
+    // No data available after reopen
+    if (checkAllBytesParsed()) {
+      // move to the next wal file and read
+      dequeueCurrentLog();
+      return HasNext.RETRY_IMMEDIATELY;
+    } else {
+      // see HBASE-15983: if checkAllBytesParsed returns false, we need to try reading from the
+      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
+      // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
+      // and read.
+      currentPositionOfEntry = 0;
+      currentPositionOfReader = 0;
+      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+      return HasNext.RETRY;
+    }
+  }

+  private HasNext tryAdvanceEntry() {
+    HasNext prepared = prepareReader();
+    if (prepared != HasNext.YES) {
+      return prepared;
+    }

     Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
@@ -292,46 +346,16 @@ private HasNext tryAdvanceEntry() {
         return HasNext.RETRY_IMMEDIATELY;
       case EOF_AND_RESET:
       case EOF_AND_RESET_COMPRESSION:
-        if (!beingWritten) {
-          // no more entries in this log file, and the file is already closed, i.e. rolled
-          // Before dequeuing, we should always get one more attempt at reading.
-          // This is in case more entries came in after we opened the reader, and the log is
-          // rolled while we were reading. See HBASE-6758
-          try {
-            reader.resetTo(currentPositionOfEntry, state.resetCompression());
-          } catch (IOException e) {
-            LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
-              currentPositionOfEntry, state.resetCompression(), e);
-            // just leave the state as is; next time we will try to reset it again, but the nasty
-            // problem is that we will still reach here eventually and try resetting again to see
-            // if the log has been fully replicated, which is redundant and can be optimized later
-            return HasNext.RETRY;
-          }
-          Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
-          state = pair.getFirst();
Comment from the contributor author:
There is a typo here; this may be the reason why some replication-related tests are unstable...
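The typo being pointed out is visible in the removed block above: the result of the extra read attempt is stored in a fresh local p, but the state update still reads the outer pair, so the EOF re-check runs against stale state. A minimal sketch of the old line versus the fixed form now in lastAttempt() (both lines appear in the diff):

    // old tryAdvanceEntry(): the result of the extra read attempt is effectively dropped
    Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
    state = pair.getFirst(); // typo: reads the stale outer "pair" instead of "p"

    // new lastAttempt(): a single local named "pair", so the fresh state is kept
    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
    state = pair.getFirst();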

-          // should not be written
-          assert !p.getSecond();
-          if (state.eof()) {
-            if (checkAllBytesParsed()) {
-              // move to the next wal file and read
-              dequeueCurrentLog();
-              return HasNext.RETRY_IMMEDIATELY;
-            } else {
-              // see HBASE-15983: if checkAllBytesParsed returns false, we need to try reading
-              // from the beginning again. Here we set position to 0 and state to
-              // ERROR_AND_RESET_COMPRESSION so when calling tryAdvanceEntry next time we will
-              // reset the reader to the beginning and read.
-              currentPositionOfEntry = 0;
-              currentPositionOfReader = 0;
-              state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
-              return HasNext.RETRY;
-            }
-          }
-        } else {
+        if (beingWritten) {
           // just sleep a bit and retry to see if there are new entries coming since the file is
           // still being written
           return HasNext.RETRY;
         }
+        // no more entries in this log file, and the file is already closed, i.e. rolled
+        // Before dequeuing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is
+        // rolled while we were reading. See HBASE-6758
+        return lastAttempt();
       case ERROR_AND_RESET:
       case ERROR_AND_RESET_COMPRESSION:
         // we have met an error, just sleep a bit and retry again
@@ -393,10 +417,8 @@ private boolean checkAllBytesParsed() {
         return false;
       }
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
-        stat == null ? "N/A" : stat.getLen());
-    }
+    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+      stat == null ? "N/A" : stat.getLen());
     metrics.incrCompletedWAL();
     return true;
   }