Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
Expand Down Expand Up @@ -436,14 +437,30 @@ public String getPeerClusterId() {
}

@Override
@VisibleForTesting
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
}

@VisibleForTesting
public Path getLastLoggedPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPath();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the workerThreads sorted so we get last logged first?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sorted, It's only for testing in non-multi wal environment, as same as ReplicationSource.getCurrentPath().

}
return null;
}

@VisibleForTesting
public long getLastLoggedPosition() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPosition();
}
return 0;
}

private boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
Expand Down Expand Up @@ -478,8 +495,8 @@ public String getStats() {
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue();
long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath();
long position = worker.getLastLoggedPosition();
Path currentPath = worker.getLastLoggedPath();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not for test, right? We iterate the workers twice -- once to get path and then again to get position. Don't want to return a Pair with Path and offset so just iterate once?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... reading later in the patch, I see need to have them distinct. For sure we'll get the right offset for the selected path?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, not for test and not the same one. each thread have it's own path and position. no need to iterate

sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
Expand Down Expand Up @@ -513,7 +530,7 @@ public class ReplicationSourceShipperThread extends Thread {
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
private volatile Path lastLoggedPath;
// Current state of the worker thread
private WorkerState state;
ReplicationSourceWALReaderThread entryReader;
Expand Down Expand Up @@ -553,21 +570,19 @@ public void run() {
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
&& entryBatch.getLastSeqIds().isEmpty()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
if (!entryBatch.hasMoreEntries()) {
LOG.debug("Finished recovering queue for group "
+ walGroupId + " of peer " + peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
continue;
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}

if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
if (getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (this) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
Expand Down Expand Up @@ -635,15 +650,13 @@ private void checkBandwidthChangeAndResetThrottler() {
protected void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
currentPath = entryBatch.getLastWalPath();
lastLoggedPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
}
updateLogPosition(lastReadPosition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we be doing a bunch of spinning updating same value over and over w/o these checks in place that look to see if position has changed? Will we be burning CPU to no end?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't be updated with the same value repeatably. entryReader will put a batch only when read position is changed.

// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
Expand Down Expand Up @@ -727,8 +740,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
}

private void updateLogPosition(long lastReadPosition) {
manager.setPendingShipment(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its ok to remove this setting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do pending flag anymore?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we don't need the flag anymore. it was required because not only shipper but also reader thread can update log position, but it had some issues about concurrency and high update frequency. With this PR, when wal rolled or log position changed, reader will put a batch for shipper to update log position.

manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
}
Expand All @@ -740,7 +752,7 @@ public void startup() {
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+ getCurrentPath(), e);
+ getLastLoggedPath(), e);
stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
}
};
Expand Down Expand Up @@ -881,8 +893,12 @@ public Path getCurrentPath() {
return this.entryReader.getCurrentPath();
}

public long getCurrentPosition() {
return this.lastLoggedPosition;
public Path getLastLoggedPath() {
return lastLoggedPath;
}

public long getLastLoggedPosition() {
return lastLoggedPosition;
}

private boolean isWorkerActive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;

private boolean pendingShipment;

/**
* Creates a replication manager and sets the watch on all the other registered region servers
Expand Down Expand Up @@ -188,19 +187,13 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
if (!this.pendingShipment) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
cleanOldLogs(fileName, id, queueRecovered);
boolean queueRecovered, boolean holdLogInZK) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
}

public synchronized void setPendingShipment(boolean pendingShipment) {
this.pendingShipment = pendingShipment;
cleanOldLogs(fileName, id, queueRecovered);
}

/**
Expand Down
Loading