|
29 | 29 | import java.util.Arrays; |
30 | 30 | import java.util.List; |
31 | 31 | import java.util.Objects; |
| 32 | +import org.apache.commons.lang3.mutable.MutableObject; |
32 | 33 | import org.apache.hadoop.conf.Configuration; |
33 | 34 | import org.apache.hadoop.hbase.Cell; |
34 | 35 | import org.apache.hadoop.hbase.CellScanner; |
|
53 | 54 | import org.apache.hadoop.hbase.regionserver.HRegionServer; |
54 | 55 | import org.apache.hadoop.hbase.regionserver.Region; |
55 | 56 | import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| 57 | +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; |
56 | 58 | import org.apache.hadoop.hbase.testclassification.LargeTests; |
57 | 59 | import org.apache.hadoop.hbase.util.Bytes; |
58 | 60 | import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; |
@@ -225,6 +227,50 @@ public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Excepti |
225 | 227 | } |
226 | 228 | } |
227 | 229 |
|
| 230 | + @Test |
| 231 | + public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception { |
| 232 | + TableName tableName = TableName.valueOf("hbase:meta"); |
| 233 | + try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); |
| 234 | + Table table = connection.getTable(tableName)) { |
| 235 | + MiniHBaseCluster cluster = HTU.getHBaseCluster(); |
| 236 | + cluster.getMaster().balanceSwitch(false); |
| 237 | + HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta()); |
| 238 | + ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService() |
| 239 | + .getReplicationManager().catalogReplicationSource.get(); |
| 240 | + ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false); |
| 241 | + // there's small chance source reader has passed the peer state check but not yet read the |
| 242 | + // wal, which could allow it to read some added entries before the wal gets deleted, |
| 243 | + // so we are making sure here we only proceed once the reader loop has managed to |
| 244 | + // detect the peer is disabled. |
| 245 | + HTU.waitFor(2000, 100, true, () -> { |
| 246 | + MutableObject<Boolean> readerWaiting = new MutableObject<>(true); |
| 247 | + source.logQueue.getQueues().keySet() |
| 248 | + .forEach(w -> readerWaiting.setValue(readerWaiting.getValue() |
| 249 | + && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get())); |
| 250 | + return readerWaiting.getValue(); |
| 251 | + }); |
| 252 | + // load the data to the table |
| 253 | + for (int i = 0; i < 5; i++) { |
| 254 | + LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000)); |
| 255 | + HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000); |
| 256 | + LOG.info("flushing table"); |
| 257 | + HTU.flush(tableName); |
| 258 | + LOG.info("compacting table"); |
| 259 | + if (i < 4) { |
| 260 | + HTU.compact(tableName, false); |
| 261 | + } |
| 262 | + } |
| 263 | + HTU.getHBaseCluster().getMaster().getLogCleaner().runCleaner(); |
| 264 | + ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true); |
| 265 | + // now loads more data without flushing nor compacting |
| 266 | + for (int i = 5; i < 10; i++) { |
| 267 | + LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000)); |
| 268 | + HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000); |
| 269 | + } |
| 270 | + verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY); |
| 271 | + } |
| 272 | + } |
| 273 | + |
228 | 274 | @Test |
229 | 275 | public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception { |
230 | 276 | MiniHBaseCluster cluster = HTU.getMiniHBaseCluster(); |
|
0 commit comments