@@ -684,17 +684,12 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();

for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if(worker.entryReader != null) {
if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
Contributor

If a worker is doing some async work when it is asked to stop, stopping it can take time. In that case I think we should keep the implementation as it was before: ask all workers to stop at once, and then wait. Otherwise, if the number of workers grows large due to backlog and someone changes the wait time config to tens of seconds, the removePeer command/procedure has to wait a long time (number of workers * (sleep time + time for clearWALEntryBatch)) to terminate the replication source.

Contributor Author

Sorry, I'm not following your concern here. I don't see how an extra loop in the same method, which just sets two flags (one in the shipper and another in the reader), helps with the contention scenario described. The terminate execution would be stuck in the second for loop anyway.

Contributor

Sure, let me try to explain again.
I was referring to restoring this loop:

for (ReplicationSourceShipper worker : workers) {	
      worker.stopWorker();	
      if(worker.entryReader != null) {	
        worker.entryReader.setReaderRunning(false);	
      }	
    }

Your current flow stops the workers sequentially:

  • stop a worker
  • wait for that worker thread to complete
  • stop another worker
  • wait for it to finish
  • continue for the others...

So in the worst case, you would have to wait for (number of workers) * min(time taken by a worker to finish, timeout).

By restoring the old loop, you parallelize the stopping of the workers:

  • ask all worker threads to finish their work by setting their state
  • then, in the second loop, wait for each worker to finish; while you are waiting for one worker, the others are completing their work in parallel
  • so by the time you are done with one worker, it is possible that all the other workers are done as well
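
The difference between the two flows can be illustrated with a small standalone sketch. It does not use the HBase classes: `TwoPhaseStop`, `Worker`, `startWorkers`, `stopSequentially` and `stopTwoPhase` are hypothetical names, and the simulated cleanup time stands in for whatever in-flight work a real shipper has when asked to stop.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseStop {

  /** Hypothetical worker: runs until asked to stop, then needs cleanupMillis to wind down. */
  static class Worker extends Thread {
    private volatile boolean running = true;
    private final long cleanupMillis;

    Worker(long cleanupMillis) {
      this.cleanupMillis = cleanupMillis;
    }

    void stopWorker() {
      running = false; // only sets a flag, returns immediately
    }

    @Override
    public void run() {
      try {
        while (running) {
          Thread.sleep(5);
        }
        Thread.sleep(cleanupMillis); // simulated in-flight work after the stop request
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  static List<Worker> startWorkers(int count, long cleanupMillis) {
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      Worker w = new Worker(cleanupMillis);
      w.start();
      workers.add(w);
    }
    return workers;
  }

  /** Signal one worker, wait for it, then move to the next. Returns elapsed milliseconds. */
  static long stopSequentially(List<Worker> workers, long timeoutMillis)
      throws InterruptedException {
    long start = System.nanoTime();
    for (Worker w : workers) {
      w.stopWorker();
      w.join(timeoutMillis);
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  /** Signal every worker first, then wait for each. Returns elapsed milliseconds. */
  static long stopTwoPhase(List<Worker> workers, long timeoutMillis)
      throws InterruptedException {
    long start = System.nanoTime();
    for (Worker w : workers) {
      w.stopWorker();
    }
    for (Worker w : workers) {
      w.join(timeoutMillis); // the other workers wind down while we wait on this one
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) throws InterruptedException {
    long sequential = stopSequentially(startWorkers(3, 150), 1000);
    long twoPhase = stopTwoPhase(startWorkers(3, 150), 1000);
    System.out.println("sequential ms: " + sequential + ", two-phase ms: " + twoPhase);
  }
}
```

With the sequential variant the waits add up across workers, while in the two-phase variant the total is roughly bounded by the slowest single worker, which is the point being made about restoring the first loop.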

Contributor Author

Got you, thanks for explaining in more detail. Will address it in the next commit.

}
}

if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
// Wait worker to stop
@@ -712,8 +707,14 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
worker.entryReader.interrupt();
}
}
//If worker is already stopped but there was still entries batched,
//we need to clear buffer used for non processed entries
worker.clearWALEntryBatch();
}

if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}

/**
* Attempts to properly update <code>ReplicationSourceManager.totalBufferUsed</code>,
* in case there were unprocessed entries batched by the reader to the shipper,
* but the shipper didn't manage to ship those because the replication source is being terminated.
* In that case, it iterates through the batched entries and decreases the pending
* entries size from <code>ReplicationSourceManager.totalBufferUsed</code>
* <p/>
* <b>NOTES</b>
* 1) This method should only be called upon replication source termination.
* It blocks waiting for both shipper and reader threads termination,
* to avoid race conditions
* when updating <code>ReplicationSourceManager.totalBufferUsed</code>.
*
* 2) It <b>does not</b> attempt to terminate reader and shipper threads. Their
* interruption/termination <b>must</b> have been triggered prior to calling this method.
*/
void clearWALEntryBatch() {
long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
while(this.isAlive() || this.entryReader.isAlive()){
try {
if (System.currentTimeMillis() >= timeout) {
LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+ "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ "thread to stop.", this.source.getPeerId());
Thread.currentThread().interrupt();
Contributor

Why do we need an additional interrupt here, when ReplicationSource.terminate() has already interrupted the worker thread prior to the clearWALEntryBatch method call?

Contributor Author

We only interrupt if either the shipper or the reader thread is still alive. We can't guarantee that the caller will always have stopped these threads, hence the extra check here.

Contributor

    1. This method should only be called upon replication source termination.

So what will this interrupt do, and how is it handled in the source?

LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
            + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
            + "thread to stop.", this.source.getPeerId());

Don't we need to return here, since we timed out and are not clearing the batch?
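
For reference on what `Thread.currentThread().interrupt()` does by itself, here is a minimal standalone sketch using only standard `java.lang.Thread` behaviour. The class and method names (`SelfInterruptDemo`, `sleepThrowsAfterSelfInterrupt`, `otherThreadStaysAlive`) are made up for illustration and are not part of the patch.

```java
public class SelfInterruptDemo {

  /**
   * Sets the interrupt flag on the calling thread, then tries to sleep.
   * Returns true if sleep() threw InterruptedException because of the pending flag.
   */
  static boolean sleepThrowsAfterSelfInterrupt() {
    Thread.currentThread().interrupt(); // affects the calling thread only
    try {
      Thread.sleep(1000);
      return false;
    } catch (InterruptedException e) {
      // sleep() clears the flag when it throws
      return true;
    }
  }

  /**
   * Shows that interrupting the current thread does not stop another thread.
   * Returns true if the other thread is still alive after the self-interrupt.
   */
  static boolean otherThreadStaysAlive() throws InterruptedException {
    Thread other = new Thread(() -> {
      try {
        Thread.sleep(10_000);
      } catch (InterruptedException e) {
        // exits when explicitly interrupted below
      }
    });
    other.start();

    Thread.currentThread().interrupt();
    boolean stillAlive = other.isAlive();

    Thread.interrupted(); // clear our own flag so join() below does not throw
    other.interrupt();    // now really stop the other thread
    other.join();
    return stillAlive;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("sleep throws after self-interrupt: " + sleepThrowsAfterSelfInterrupt());
    System.out.println("other thread unaffected: " + otherThreadStaysAlive());
  }
}
```

In other words, the call only marks the thread that is executing `clearWALEntryBatch`; unless the loop returns or the caller checks the flag, the effect is that the next blocking call on that same thread throws `InterruptedException`.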

Contributor Author

Right, it's not being handled. Changing it to simply log the exceptional condition and return to the source.

Contributor

If we return, then we do not clean the batch, so replication quota will be leaked.
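
The leak can be pictured with a toy model of the accounting. This is only a sketch under simplifying assumptions: `BufferQuotaSketch`, `enqueue`, `releaseUnshipped` and `used` are hypothetical names, a batch is reduced to a list of entry sizes, and a plain `AtomicLong` stands in for the shared buffer counter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BufferQuotaSketch {
  // Stand-in for the shared "total buffer used" counter (assumption for this sketch)
  private final AtomicLong totalBufferUsed = new AtomicLong();
  // Each batch is modelled as a list of entry sizes in bytes
  private final BlockingQueue<List<Long>> batchQueue = new LinkedBlockingQueue<>();

  /** Reader side: account for the batch size, then queue it for shipping. */
  public void enqueue(List<Long> entrySizes) {
    long batchSize = 0;
    for (long size : entrySizes) {
      batchSize += size;
    }
    totalBufferUsed.addAndGet(batchSize);
    batchQueue.add(entrySizes);
  }

  /** Termination side: drain whatever was never shipped and give the quota back. */
  public long releaseUnshipped() {
    long released = 0;
    List<Long> batch;
    while ((batch = batchQueue.poll()) != null) {
      for (long size : batch) {
        released += size;
      }
    }
    totalBufferUsed.addAndGet(-released);
    return released;
  }

  public long used() {
    return totalBufferUsed.get();
  }

  public static void main(String[] args) {
    BufferQuotaSketch sketch = new BufferQuotaSketch();
    sketch.enqueue(Arrays.asList(1000L, 10000L));
    sketch.enqueue(Arrays.asList(500L));
    System.out.println("used before release: " + sketch.used());
    System.out.println("released: " + sketch.releaseUnshipped());
    System.out.println("used after release: " + sketch.used());
  }
}
```

If the termination path returns before calling something like `releaseUnshipped`, `used()` never goes back down, which is the leaked quota described above.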

} else {
// Wait both shipper and reader threads to stop
Thread.sleep(this.sleepForRetries);
}
} catch (InterruptedException e) {
LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
this.source.getPeerId(), this.getName(), e);
Thread.currentThread().interrupt();
Contributor

Shouldn't this be just INFO? Also, I think it might be better to handle those InterruptedExceptions inside ReplicationSource.terminate().

Contributor Author

Left as WARN because it aborts the flow without effectively updating the buffer usage, which is the fundamental issue we are trying to solve here.

}
}
LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
entryReader.entryBatchQueue.forEach(w -> {
entryReader.entryBatchQueue.remove(w);
w.getWalEntries().forEach(e -> {
long entrySizeExcludeBulkLoad = entryReader.getEntrySizeExcludeBulkLoad(e);
totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
});
});

LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
totalToDecrement.longValue());
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
}
}
@@ -60,7 +60,8 @@ class ReplicationSourceWALReader extends Thread {
private final WALEntryFilter filter;
private final ReplicationSource source;

private final BlockingQueue<WALEntryBatch> entryBatchQueue;
@InterfaceAudience.Private
final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
@@ -22,7 +22,10 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -269,6 +272,47 @@ public void testTerminateTimeout() throws Exception {
}
}

@Test
public void testTerminateClearsBuffer() throws Exception {
ReplicationSource source = new ReplicationSource();
ReplicationSourceManager mockManager = mock(ReplicationSourceManager.class);
MetricsReplicationGlobalSourceSource mockMetrics =
mock(MetricsReplicationGlobalSourceSource.class);
AtomicLong buffer = new AtomicLong();
Mockito.when(mockManager.getTotalBufferUsed()).thenReturn(buffer);
Mockito.when(mockManager.getGlobalMetrics()).thenReturn(mockMetrics);
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
source.init(testConf, null, mockManager, null, mockPeer, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source);
ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;
source.workerThreads.put("testPeer", shipper);
WALEntryBatch batch = new WALEntryBatch(10, logDir);
WAL.Entry mockEntry = mock(WAL.Entry.class);
WALEdit mockEdit = mock(WALEdit.class);
WALKeyImpl mockKey = mock(WALKeyImpl.class);
when(mockEntry.getEdit()).thenReturn(mockEdit);
when(mockEdit.isEmpty()).thenReturn(false);
when(mockEntry.getKey()).thenReturn(mockKey);
when(mockKey.estimatedSerializedSizeOf()).thenReturn(1000L);
when(mockEdit.heapSize()).thenReturn(10000L);
when(mockEdit.size()).thenReturn(0);
ArrayList<Cell> cells = new ArrayList<>();
KeyValue kv = new KeyValue(Bytes.toBytes("0001"), Bytes.toBytes("f"),
Bytes.toBytes("1"), Bytes.toBytes("v1"));
cells.add(kv);
when(mockEdit.getCells()).thenReturn(cells);
reader.addEntryToBatch(batch, mockEntry);
reader.entryBatchQueue.put(batch);
source.terminate("test");
assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
}

/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
@@ -438,12 +482,12 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
Server server = Mockito.mock(Server.class);
RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
Server server = mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
Mockito.when(source.getServer()).thenReturn(server);
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
ReplicationQueueStorage storage = mock(ReplicationQueueStorage.class);
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L);
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))