Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ public Result getResult() {
}

/** A result object from prepare flush cache stage */
static class PrepareFlushResult {
protected static class PrepareFlushResult {
final FlushResultImpl result; // indicating a failure result from prepare
final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
final TreeMap<byte[], List<Path>> committedFiles;
Expand Down Expand Up @@ -724,7 +724,7 @@ void sawNoSuchFamily() {

private final StoreHotnessProtector storeHotnessProtector;

private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
protected Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();

/**
* HRegion constructor. This constructor should only be used for testing and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Threads;
Expand Down Expand Up @@ -140,4 +141,16 @@ synchronized void recordFlush(long sequenceId) {
pendingFlushRequest = null;
}
}

/**
 * Test-only accessor: the flush {@code Timeout} currently scheduled, or {@code null} when no
 * flush request is pending. Synchronized to pair with the writers of {@code pendingFlushRequest}.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
synchronized Timeout getPendingFlushRequest() {
  return pendingFlushRequest;
}

/**
 * Test-only accessor: the nanosecond timestamp recorded for the most recent flush request.
 * Synchronized to pair with the writers of {@code lastRequestNanos}.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
synchronized long getLastRequestNanos() {
  return lastRequestNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -98,7 +99,7 @@ public class RegionReplicationSink {

public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;

private static final class SinkEntry {
static final class SinkEntry {

final WALKeyImpl key;

Expand Down Expand Up @@ -170,7 +171,7 @@ void replicated() {

private volatile long pendingSize;

private long lastFlushedSequenceId;
private volatile long lastFlushedSequenceId;

private boolean sending;

Expand Down Expand Up @@ -215,31 +216,40 @@ private void onComplete(List<SinkEntry> sent,
}
manager.decrease(toReleaseSize);
Set<Integer> failed = new HashSet<>();
long lastFlushedSequenceIdToUse = this.lastFlushedSequenceId;
for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
Integer replicaId = entry.getKey();
Throwable error = entry.getValue().getValue();
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
if (maxSequenceId > lastFlushedSequenceIdToUse) {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is greater than the last flush SN {}," +
" we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
replicaId, primary, maxSequenceId, lastFlushedSequenceIdToUse, error);
failed.add(replicaId);
} else {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
" we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
replicaId, primary, maxSequenceId, lastFlushedSequenceIdToUse, error);
}
}
}
checkFailedReplicaAndSend(failed, toReleaseSize, maxSequenceId);
}

protected void checkFailedReplicaAndSend(Set<Integer> failed, long toReleaseSize,
long maxSequenceId) {
synchronized (entries) {
pendingSize -= toReleaseSize;
if (!failed.isEmpty()) {
failedReplicas.addAll(failed);
flushRequester.requestFlush(maxSequenceId);
// double check
if (maxSequenceId > lastFlushedSequenceId) {
if (!failed.isEmpty()) {
failedReplicas.addAll(failed);
flushRequester.requestFlush(maxSequenceId);
}
}
sending = false;
if (stopping) {
Expand All @@ -253,7 +263,7 @@ private void onComplete(List<SinkEntry> sent,
}
}

private void send() {
void send() {
List<SinkEntry> toSend = new ArrayList<>();
long totalSize = 0L;
boolean hasMetaEdit = false;
Expand Down Expand Up @@ -448,4 +458,35 @@ public void waitUntilStopped() throws InterruptedException {
}
}
}

/** Test-only accessor: the {@link RegionReplicationFlushRequester} used by this sink. */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
RegionReplicationFlushRequester getFlushRequester() {
  return flushRequester;
}

/**
 * Test-only accessor: the ids of replicas we have stopped replicating to.
 * <p>
 * Reads under the {@code entries} monitor, the same lock guarding mutation of
 * {@code failedReplicas}. Note the internal set itself is returned, not a copy — callers in
 * tests must not mutate it.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
IntHashSet getFailedReplicas() {
  synchronized (entries) {
    return failedReplicas;
  }
}

/**
 * Test-only accessor: whether a replication batch is currently in flight.
 * Reads {@code sending} under the {@code entries} monitor, matching where it is written.
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
  allowedOnPath = ".*/src/test/.*")
boolean isSending() {
  synchronized (entries) {
    return sending;
  }
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
Queue<SinkEntry> getEntries() {
synchronized (entries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just return the entries out so I do not get the point on 'synchronized (entries)' here...

Copy link
Contributor Author

@comnetwork comnetwork Feb 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just return the entries out so I do not get the point on 'synchronized (entries)' here...

That method is only for test, because entries is mutated under synchronized, so here I get entries to check it size also synchronized, but remove the synchronized is also ok, because when I check entries in the UTs, there is no operations, use synchronized here just for absolute safety, I could remove it.

return this.entries;
}
}

}
Loading