Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -45,21 +45,17 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
Expand Down Expand Up @@ -224,38 +220,6 @@ public void enqueueLog(Path log) {
}
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = replicationPeer.getId();
Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else if (namespaces != null) { // Only for set NAMESPACES peers
if (namespaces.contains(tableName.getNamespaceAsString())) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
// data
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}

private ReplicationEndpoint createReplicationEndpoint()
throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
RegionServerCoprocessorHost rsServerHost = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -62,17 +59,6 @@ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
*/
void enqueueLog(Path log);

/**
* Add hfile names to the queue to be replicated.
* @param tableName Name of the table these files belongs to
* @param family Name of the family these files belong to
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
*/
void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException;

/**
* Start the replication
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
Expand Down Expand Up @@ -170,6 +171,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// replication peer.
private final int maxRetriesMultiplier;

private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();

/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
Expand Down Expand Up @@ -348,6 +351,7 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

MetricsSource metrics = new MetricsSource(queueId);
sourceMetrics.put(queueId, metrics);
// init replication source
src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
walFileLengthProvider, metrics);
Expand Down Expand Up @@ -1120,7 +1124,49 @@ public String getStats() {
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws IOException {
for (ReplicationSourceInterface source : this.sources.values()) {
throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
}
}

/**
* Add hfile names to the queue to be replicated.
* @param peerId the replication peer id
* @param tableName Name of the table these files belongs to
* @param family Name of the family these files belong to
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
*/
private void addHFileRefs(String peerId, TableName tableName, byte[] family,
List<Pair<Path, Path>> pairs) throws ReplicationException {
// Only the normal replication source update here, its peerId is equals to queueId.
MetricsSource metrics = sourceMetrics.get(peerId);
ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else if (namespaces != null) { // Only for set NAMESPACES peers
if (namespaces.contains(tableName.getNamespaceAsString())) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
// data
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;

/**
Expand Down Expand Up @@ -113,12 +112,6 @@ public String getStats() {
return "";
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
throws ReplicationException {
return;
}

@Override
public boolean isPeerEnabled() {
return true;
Expand Down