Skip to content

Commit a62a4b1

Browse files
infraiosunxin
authored and committed
HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent 5fa15cf commit a62a4b1

File tree

4 files changed

+48
-60
lines changed

4 files changed

+48
-60
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Collections;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Set;
3029
import java.util.TreeMap;
3130
import java.util.UUID;
3231
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +34,7 @@
3534
import java.util.concurrent.atomic.AtomicBoolean;
3635
import java.util.concurrent.atomic.AtomicLong;
3736
import java.util.function.Predicate;
37+
3838
import org.apache.commons.lang3.StringUtils;
3939
import org.apache.hadoop.conf.Configuration;
4040
import org.apache.hadoop.fs.FileSystem;
@@ -44,21 +44,17 @@
4444
import org.apache.hadoop.hbase.Server;
4545
import org.apache.hadoop.hbase.ServerName;
4646
import org.apache.hadoop.hbase.TableDescriptors;
47-
import org.apache.hadoop.hbase.TableName;
4847
import org.apache.hadoop.hbase.regionserver.HRegionServer;
4948
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
5049
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
5150
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
5251
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
5352
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
54-
import org.apache.hadoop.hbase.replication.ReplicationException;
5553
import org.apache.hadoop.hbase.replication.ReplicationPeer;
5654
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
5755
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
5856
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
5957
import org.apache.hadoop.hbase.replication.WALEntryFilter;
60-
import org.apache.hadoop.hbase.util.Bytes;
61-
import org.apache.hadoop.hbase.util.Pair;
6258
import org.apache.hadoop.hbase.util.Threads;
6359
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
6460
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -260,38 +256,6 @@ public void enqueueLog(Path wal) {
260256
}
261257
}
262258

263-
@Override
264-
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
265-
throws ReplicationException {
266-
String peerId = replicationPeer.getId();
267-
Set<String> namespaces = replicationPeer.getNamespaces();
268-
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
269-
if (tableCFMap != null) { // All peers with TableCFs
270-
List<String> tableCfs = tableCFMap.get(tableName);
271-
if (tableCFMap.containsKey(tableName)
272-
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
273-
this.queueStorage.addHFileRefs(peerId, pairs);
274-
metrics.incrSizeOfHFileRefsQueue(pairs.size());
275-
} else {
276-
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
277-
tableName, Bytes.toString(family), peerId);
278-
}
279-
} else if (namespaces != null) { // Only for set NAMESPACES peers
280-
if (namespaces.contains(tableName.getNamespaceAsString())) {
281-
this.queueStorage.addHFileRefs(peerId, pairs);
282-
metrics.incrSizeOfHFileRefsQueue(pairs.size());
283-
} else {
284-
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
285-
tableName, Bytes.toString(family), peerId);
286-
}
287-
} else {
288-
// user has explicitly not defined any table cfs for replication, means replicate all the
289-
// data
290-
this.queueStorage.addHFileRefs(peerId, pairs);
291-
metrics.incrSizeOfHFileRefsQueue(pairs.size());
292-
}
293-
}
294-
295259
private ReplicationEndpoint createReplicationEndpoint()
296260
throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
297261
RegionServerCoprocessorHost rsServerHost = null;

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,9 @@
2828
import org.apache.hadoop.fs.Path;
2929
import org.apache.hadoop.hbase.Server;
3030
import org.apache.hadoop.hbase.ServerName;
31-
import org.apache.hadoop.hbase.TableName;
3231
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
33-
import org.apache.hadoop.hbase.replication.ReplicationException;
3432
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3533
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
36-
import org.apache.hadoop.hbase.util.Pair;
3734
import org.apache.hadoop.hbase.wal.WAL.Entry;
3835
import org.apache.yetus.audience.InterfaceAudience;
3936

@@ -60,17 +57,6 @@ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
6057
*/
6158
void enqueueLog(Path log);
6259

63-
/**
64-
* Add hfile names to the queue to be replicated.
65-
* @param tableName Name of the table these files belongs to
66-
* @param family Name of the family these files belong to
67-
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
68-
* will be added in the queue for replication}
69-
* @throws ReplicationException If failed to add hfile references
70-
*/
71-
void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
72-
throws ReplicationException;
73-
7460
/**
7561
* Start the replication
7662
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.hadoop.hbase.replication.ReplicationTracker;
6767
import org.apache.hadoop.hbase.replication.ReplicationUtils;
6868
import org.apache.hadoop.hbase.replication.SyncReplicationState;
69+
import org.apache.hadoop.hbase.util.Bytes;
6970
import org.apache.hadoop.hbase.util.Pair;
7071
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
7172
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -198,6 +199,8 @@ public class ReplicationSourceManager implements ReplicationListener {
198199
*/
199200
AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
200201

202+
private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
203+
201204
/**
202205
* Creates a replication manager and sets the watch on all the other registered region servers
203206
* @param queueStorage the interface for manipulating replication queues
@@ -1154,7 +1157,49 @@ public String getStats() {
11541157
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
11551158
throws IOException {
11561159
for (ReplicationSourceInterface source : this.sources.values()) {
1157-
throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
1160+
throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
1161+
}
1162+
}
1163+
1164+
/**
1165+
* Add hfile names to the queue to be replicated.
1166+
* @param peerId the replication peer id
1167+
* @param tableName Name of the table these files belongs to
1168+
* @param family Name of the family these files belong to
1169+
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
1170+
* will be added in the queue for replication}
1171+
* @throws ReplicationException If failed to add hfile references
1172+
*/
1173+
private void addHFileRefs(String peerId, TableName tableName, byte[] family,
1174+
List<Pair<Path, Path>> pairs) throws ReplicationException {
1175+
// Only the normal replication source update here, its peerId is equals to queueId.
1176+
MetricsSource metrics = sourceMetrics.get(peerId);
1177+
ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
1178+
Set<String> namespaces = replicationPeer.getNamespaces();
1179+
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
1180+
if (tableCFMap != null) { // All peers with TableCFs
1181+
List<String> tableCfs = tableCFMap.get(tableName);
1182+
if (tableCFMap.containsKey(tableName)
1183+
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
1184+
this.queueStorage.addHFileRefs(peerId, pairs);
1185+
metrics.incrSizeOfHFileRefsQueue(pairs.size());
1186+
} else {
1187+
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
1188+
tableName, Bytes.toString(family), peerId);
1189+
}
1190+
} else if (namespaces != null) { // Only for set NAMESPACES peers
1191+
if (namespaces.contains(tableName.getNamespaceAsString())) {
1192+
this.queueStorage.addHFileRefs(peerId, pairs);
1193+
metrics.incrSizeOfHFileRefsQueue(pairs.size());
1194+
} else {
1195+
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
1196+
tableName, Bytes.toString(family), peerId);
1197+
}
1198+
} else {
1199+
// user has explicitly not defined any table cfs for replication, means replicate all the
1200+
// data
1201+
this.queueStorage.addHFileRefs(peerId, pairs);
1202+
metrics.incrSizeOfHFileRefsQueue(pairs.size());
11581203
}
11591204
}
11601205

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,16 @@
2121
import java.util.List;
2222
import java.util.UUID;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24+
2425
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.fs.FileSystem;
2627
import org.apache.hadoop.fs.Path;
2728
import org.apache.hadoop.hbase.Server;
2829
import org.apache.hadoop.hbase.ServerName;
29-
import org.apache.hadoop.hbase.TableName;
3030
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
3131
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
3232
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
3333
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
34-
import org.apache.hadoop.hbase.util.Pair;
3534
import org.apache.hadoop.hbase.wal.WAL.Entry;
3635

3736
/**
@@ -114,12 +113,6 @@ public String getStats() {
114113
return "";
115114
}
116115

117-
@Override
118-
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
119-
throws ReplicationException {
120-
return;
121-
}
122-
123116
@Override
124117
public boolean isPeerEnabled() {
125118
return true;

0 commit comments

Comments (0)