Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
6f1f7bc
HBASE-23205 Correctly update the position of WALs currently being rep…
JeongDaeKim Sep 26, 2019
affc75a
fix checkstyle warnings
JeongDaeKim Oct 24, 2019
b6efe7e
Fix typo
JeongDaeKim Oct 24, 2019
114aa1b
(fix) close writer
Oct 30, 2019
4bcc397
HBASE-23229 Update branch-1 to 1.6.0-SNAPSHOT (#772)
busbey Oct 30, 2019
1dbf6f7
(fix) revert test for HBASE-18137
Oct 31, 2019
c0b8f7b
Revert unnecessary codes
Oct 31, 2019
577db5d
HBASE-23238 Additional test and checks for null references on Scanner…
wchevreuil Oct 31, 2019
3c7c1b5
HBASE-23185 Fix high cpu usage because getTable()#put() gets config v…
bitterfox Oct 31, 2019
2451023
HBASE-23219 Re-enable ZKLess tests for branch-1 (Revert HBASE-14622)
Oct 25, 2019
3f9ce86
HBASE-23246 Fix error prone warning in TestMetricsUserSourceImpl (#789)
apurtell Nov 4, 2019
75620b0
(fix) Change newly added method name
Nov 4, 2019
c92d79e
(fix) add getRecoveredQueueInfo() to make a test more recognizable
Nov 5, 2019
d3ed533
(fix) a check style warning
Nov 5, 2019
1360816
HBASE-23250 Log message about CleanerChore delegate initialization sh…
rabi-kumar Nov 5, 2019
a5f09cd
HBASE-23212 Dynamically reload configs for Region Recovery chore (#803)
virajjasani Nov 5, 2019
cf02e6f
HBASE-23236 Upgrade to yetus 0.11.1
Apache9 Nov 6, 2019
72d622b
HBASE-23228 Allow for jdk8 specific modules on branch-1 in precommit/…
busbey Nov 8, 2019
abf6ec0
HBASE-18439 Subclasses of o.a.h.h.chaos.actions.Action all use the sa…
rabi-kumar Nov 8, 2019
9b30df5
HBASE-23273 Fix table header display is incorrect on table.jsp when v…
guangxuCheng Nov 10, 2019
aa2e487
HBASE-23245 : MutableHistogram constructor changes and provide Histog…
virajjasani Nov 11, 2019
caef9f0
HBASE-23245 : Test Histogram Impl changes for histogram update (Adden…
virajjasani Nov 12, 2019
a154bd8
HBASE-23283 Provide clear and consistent logging about the period of …
liuml07 Nov 13, 2019
5130bc5
HBASE-23287 LogCleaner is not added to choreService
ZhaoBQ Nov 13, 2019
b566a4f
HBASE-22701 Disable the DynamicClassLoader when it fails to initialize
joshelser Jul 16, 2019
8e60b0c
HBASE-23261 Region stuck in transition while splitting
virajjasani Nov 11, 2019
0cae004
(fix) log a message even in empty batch case
Nov 18, 2019
5f36343
HBASE-23288 - Backport HBASE-23251 (Add Column Family and Table Names…
gjacoby126 Nov 19, 2019
eee337f
HBASE-23278 Add a table-level compaction progress display on the UI (…
ZhaoBQ Nov 19, 2019
eb5e94a
HBASE-23259: Populate master address end points in cluster/rs configs…
bharathv Nov 21, 2019
38ae0b5
HBASE-23234 Provide .editorconfig based on checkstyle configuration (…
ndimiduk Nov 21, 2019
da9f6bf
HBASE-23237 Prevent Negative values in metrics requestsPerSecond (#866)
Nov 23, 2019
af2ac03
HBASE-23337 Release scripts should rely on maven for deploy. (#887)
busbey Dec 2, 2019
737eaa6
HBASE-23359 RS going down with NPE when splitting a region with compa…
brfrn169 Dec 4, 2019
ec55c2a
HBASE-22096 /storeFile.jsp shows CorruptHFileException when the store…
brfrn169 Dec 4, 2019
9b10afd
HBASE-23364 HRegionServer sometimes does not shut down.
lhofhansl Dec 6, 2019
f5171b4
HBASE-23073 Add an optional costFunction to balance regions according…
PierreZ Dec 9, 2019
80c3581
HBASE-23552 Format Javadocs on ITBLL
ndimiduk Dec 9, 2019
67ca8db
(fix) Add a new test and expose an api
Dec 11, 2019
84c0a90
HBASE-23360 [CLI] Fix help command 'set_quota' for removing limits (#…
Dec 11, 2019
871e2ea
HBASE-23205 Correctly update the position of WALs currently being rep…
JeongDaeKim Sep 26, 2019
2142ded
fix checkstyle warnings
JeongDaeKim Oct 24, 2019
6a574ff
Fix typo
JeongDaeKim Oct 24, 2019
16d56dd
(fix) close writer
Oct 30, 2019
3e83af8
(fix) revert test for HBASE-18137
Oct 31, 2019
bb5492d
Revert unnecessary codes
Oct 31, 2019
b541c24
(fix) Change newly added method name
Nov 4, 2019
833467c
(fix) add getRecoveredQueueInfo() to make a test more recognizable
Nov 5, 2019
5cc0dca
(fix) a check style warning
Nov 5, 2019
9e08eea
(fix) log a message even in empty batch case
Nov 18, 2019
d6297a7
(fix) Add a new test and expose an api
Dec 11, 2019
572c73b
Merge branch 'HBASE-23205' of https://github.com/JeongDaeKim/hbase in…
wchevreuil Dec 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
Expand Down Expand Up @@ -439,14 +440,30 @@ public String getPeerClusterId() {
}

@Override
@VisibleForTesting
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
}

@VisibleForTesting
public Path getLastLoggedPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPath();
}
return null;
}

@VisibleForTesting
public long getLastLoggedPosition() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPosition();
}
return 0;
}

private boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
Expand Down Expand Up @@ -481,8 +498,8 @@ public String getStats() {
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue();
long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath();
long position = worker.getLastLoggedPosition();
Path currentPath = worker.getLastLoggedPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
Expand Down Expand Up @@ -517,7 +534,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
int queueSize = queues.get(walGroupId).size();
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
Path currentPath = worker.getCurrentPath();
Path currentPath = worker.getLastLoggedPath();
fileSize = -1;
if (currentPath != null) {
try {
Expand All @@ -535,7 +552,7 @@ public Map<String, ReplicationStatus> getWalGroupStatus() {
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
.withCurrentPath(currentPath)
.withCurrentPosition(worker.getCurrentPosition())
.withCurrentPosition(worker.getLastLoggedPosition())
.withFileSize(fileSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
.withReplicationDelay(replicationDelay);
Expand All @@ -555,7 +572,7 @@ public class ReplicationSourceShipperThread extends Thread {
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
private volatile Path lastLoggedPath;
// Current state of the worker thread
private WorkerState state;
ReplicationSourceWALReaderThread entryReader;
Expand Down Expand Up @@ -600,21 +617,19 @@ public void run() {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
releaseBufferQuota((int) entryBatch.getHeapSize());
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
if (!entryBatch.hasMoreEntries()) {
LOG.debug("Finished recovering queue for group "
+ walGroupId + " of peer " + peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
continue;
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}

if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
if (getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (this) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
Expand Down Expand Up @@ -694,15 +709,13 @@ private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
protected void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
lastLoggedPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
}
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
Expand Down Expand Up @@ -787,8 +800,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
}

private void updateLogPosition(long lastReadPosition) {
manager.setPendingShipment(false);
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
}
Expand All @@ -800,7 +812,7 @@ public void startup() {
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+ getCurrentPath(), e);
+ getLastLoggedPath(), e);
stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
}
};
Expand Down Expand Up @@ -941,8 +953,12 @@ public Path getCurrentPath() {
return this.entryReader.getCurrentPath();
}

public long getCurrentPosition() {
return this.lastLoggedPosition;
public Path getLastLoggedPath() {
return lastLoggedPath;
}

public long getLastLoggedPosition() {
return lastLoggedPosition;
}

private boolean isWorkerActive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ public class ReplicationSourceManager implements ReplicationListener {

private AtomicLong totalBufferUsed = new AtomicLong();

private boolean pendingShipment;

/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
Expand Down Expand Up @@ -191,19 +189,13 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
if (!this.pendingShipment) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
cleanOldLogs(fileName, id, queueRecovered);
boolean queueRecovered, boolean holdLogInZK) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
}

public synchronized void setPendingShipment(boolean pendingShipment) {
this.pendingShipment = pendingShipment;
cleanOldLogs(fileName, id, queueRecovered);
}

/**
Expand Down
Loading