diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java index 8452fea47010..fa0cd9d1619a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java @@ -19,12 +19,17 @@ public class ReplicationLoadSink { private final long ageOfLastAppliedOp; private final long timestampsOfLastAppliedOp; + private final long timestampStarted; + private final long totalOpsProcessed; // TODO: add the builder for this class @InterfaceAudience.Private - public ReplicationLoadSink(long age, long timestamp) { + public ReplicationLoadSink(long age, long timestamp, long timestampStarted, + long totalOpsProcessed) { this.ageOfLastAppliedOp = age; this.timestampsOfLastAppliedOp = timestamp; + this.timestampStarted = timestampStarted; + this.totalOpsProcessed = totalOpsProcessed; } public long getAgeOfLastAppliedOp() { @@ -34,4 +39,12 @@ public long getAgeOfLastAppliedOp() { public long getTimestampsOfLastAppliedOp() { return this.timestampsOfLastAppliedOp; } + + public long getTimestampStarted() { + return timestampStarted; + } + + public long getTotalOpsProcessed() { + return totalOpsProcessed; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 66e4c4105c44..e7dcbd14e421 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2843,7 +2843,10 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink rls) { - return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp()); + return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), + rls.getTimeStampsOfLastAppliedOp(), + rls.getTimestampStarted(), + rls.getTotalOpsProcessed()); } public static ReplicationLoadSource toReplicationLoadSource( @@ -3438,6 +3441,8 @@ public static ClusterStatusProtos.ReplicationLoadSink toReplicationLoadSink( return ClusterStatusProtos.ReplicationLoadSink.newBuilder() .setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp()) .setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp()) + .setTimestampStarted(rls.getTimestampStarted()) + .setTotalOpsProcessed(rls.getTotalOpsProcessed()) .build(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java index 1d6251b993dc..2498e3426a5d 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java @@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource { void incrAppliedOps(long batchsize); long getLastAppliedOpAge(); void incrAppliedHFiles(long hfileSize); + long getSinkAppliedOps(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java index 485764e7c8a7..18addb01e327 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java @@ -58,4 +58,8 @@ public long getLastAppliedOpAge() { public void incrAppliedHFiles(long hfiles) { hfilesCounter.incr(hfiles); } + + @Override public long getSinkAppliedOps() { + return opsCounter.value(); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto index c25d60c173b9..874bc88cf7f8 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto @@ -185,6 +185,8 @@ message ClientMetrics { message ReplicationLoadSink { required uint64 ageOfLastAppliedOp = 1; required uint64 timeStampsOfLastAppliedOp = 2; + required uint64 timestampStarted = 3; + required uint64 totalOpsProcessed = 4; } message ReplicationLoadSource { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index ae57e7842862..5eb3f8766927 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -29,6 +29,7 @@ public class MetricsSink { private long lastTimestampForAge = System.currentTimeMillis(); + private long startTimestamp = System.currentTimeMillis(); private final MetricsReplicationSinkSource mss; public MetricsSink() { @@ -98,4 +99,21 @@ public long getAgeOfLastAppliedOp() { public long getTimestampOfLastAppliedOp() { return this.lastTimestampForAge; } + + /** + * Gets the time stamp from when the Sink was initialized. + * @return startTimestamp + */ + public long getStartTimestamp() { + return this.startTimestamp; + } + + /** + * Gets the total number of OPs delivered to this sink. + * @return totalAplliedOps + */ + public long getAppliedOps() { + return this.mss.getSinkAppliedOps(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index fe4086b632a1..e011e0af737c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -61,6 +61,8 @@ public void buildReplicationLoad(final List sources, ClusterStatusProtos.ReplicationLoadSink.newBuilder(); rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp()); rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp()); + rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp()); + rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps()); this.replicationLoadSink = rLoadSinkBuild.build(); this.replicationLoadSourceEntries = new ArrayList<>(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java index ca6680ed56d5..9f704efd77d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationStatus.class); + private void insertRowsOnSource() throws IOException { + final byte[] qualName = Bytes.toBytes("q"); + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + } + /** * Test for HBASE-9531. *

@@ -70,12 +79,7 @@ public void testReplicationStatus() throws Exception { Admin hbaseAdmin = UTIL1.getAdmin(); // disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT. hbaseAdmin.disableReplicationPeer(PEER_ID2); - final byte[] qualName = Bytes.toBytes("q"); - for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - Put p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); - htable1.put(p); - } + insertRowsOnSource(); LOG.info("AFTER PUTS"); // TODO: Change this wait to a barrier. I tried waiting on replication stats to // change but sleeping in main thread seems to mess up background replication. @@ -120,6 +124,35 @@ public void testReplicationStatus() throws Exception { assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID()); } + @Test + public void testReplicationStatusSink() throws Exception { + try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) { + ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName(); + ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server); + //First checks if status of timestamp of last applied op is same as RS start, since no edits + //were replicated yet + assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp()); + //now insert some rows on source, so that it gets delivered to target + insertRowsOnSource(); + long wait = Waiter.waitFor(UTIL2.getConfiguration(), + 10000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server); + return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted(); + } + }); + //If wait is -1, we know predicate condition was never true + assertTrue(wait>=0); + } + } + + private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server) + throws IOException { + ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); + ServerMetrics sm = metrics.getLiveServerMetrics().get(server); + return sm.getReplicationLoadSink(); + } /** * Wait until Master shows metrics counts for ReplicationLoadSourceList that are * greater than greaterThan for serverName before diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 226a480ee84b..a379ff1249f4 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -835,12 +835,18 @@ def status(format, type) r_source_string = ' SOURCE:' r_load_sink = sl.getReplicationLoadSink next if r_load_sink.nil? + if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted() + # If we have applied no operations since we've started replication, + # assume that we're not acting as a sink and don't print the normal information + r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s + r_sink_string << ", Waiting for OPs... " + else + r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s + r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s + r_sink_string << ", TimeStampsOfLastAppliedOp=" + + (java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString() + end - r_sink_string << ' AgeOfLastAppliedOp=' + - r_load_sink.getAgeOfLastAppliedOp.to_s - r_sink_string << ', TimeStampsOfLastAppliedOp=' + - java.util.Date.new(r_load_sink - .getTimestampsOfLastAppliedOp).toString r_load_source_map = sl.getReplicationLoadSourceMap build_source_string(r_load_source_map, r_source_string) puts(format(' %s:', host: server_name.getHostname))