diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java index 9a8c4fbb5456..2cadc05a1a0c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hbase.mapreduce; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -33,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -65,22 +68,41 @@ public class RowCounter extends AbstractHBaseTool { private final static String OPT_END_TIME = "endtime"; private final static String OPT_RANGE = "range"; private final static String OPT_EXPECTED_COUNT = "expectedCount"; + private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers"; private String tableName; private List rowRangeList; private long startTime; private long endTime; private long expectedCount; + private boolean countDeleteMarkers; private List columns = new ArrayList<>(); + private Job job; + /** * Mapper that runs the count. */ static class RowCounterMapper extends TableMapper { - /** Counter enumeration to count the actual rows. */ + /** Counter enumeration to count the actual rows, cells and delete markers. 
*/ public static enum Counters { - ROWS + ROWS, + DELETE, + DELETE_COLUMN, + DELETE_FAMILY, + DELETE_FAMILY_VERSION, + ROWS_WITH_DELETE_MARKER + } + + private boolean countDeleteMarkers; + + @Override + protected void + setup(Mapper.Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + countDeleteMarkers = conf.getBoolean(OPT_COUNT_DELETE_MARKERS, false); } /** @@ -95,6 +117,37 @@ public static enum Counters { public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException { // Count every row containing data, whether it's in qualifiers or values context.getCounter(Counters.ROWS).increment(1); + + if (countDeleteMarkers) { + boolean rowContainsDeleteMarker = false; + for (Cell cell : values.rawCells()) { + Cell.Type type = cell.getType(); + switch (type) { + case Delete: + rowContainsDeleteMarker = true; + context.getCounter(Counters.DELETE).increment(1); + break; + case DeleteColumn: + rowContainsDeleteMarker = true; + context.getCounter(Counters.DELETE_COLUMN).increment(1); + break; + case DeleteFamily: + rowContainsDeleteMarker = true; + context.getCounter(Counters.DELETE_FAMILY).increment(1); + break; + case DeleteFamilyVersion: + rowContainsDeleteMarker = true; + context.getCounter(Counters.DELETE_FAMILY_VERSION).increment(1); + break; + default: + break; + } + } + + if (rowContainsDeleteMarker) { + context.getCounter(Counters.ROWS_WITH_DELETE_MARKER).increment(1); + } + } } } @@ -105,11 +158,14 @@ public void map(ImmutableBytesWritable row, Result values, Context context) thro * @throws IOException When setting up the job fails. 
*/ public Job createSubmittableJob(Configuration conf) throws IOException { + conf.setBoolean(OPT_COUNT_DELETE_MARKERS, this.countDeleteMarkers); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); job.setJarByClass(RowCounter.class); Scan scan = new Scan(); + // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set + scan.setRaw(this.countDeleteMarkers); scan.setCacheBlocks(false); - setScanFilter(scan, rowRangeList); + setScanFilter(scan, rowRangeList, this.countDeleteMarkers); for (String columnName : this.columns) { String family = StringUtils.substringBefore(columnName, ":"); @@ -147,6 +203,7 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws List rowRangeList = null; long startTime = 0; long endTime = 0; + boolean countDeleteMarkers = false; StringBuilder sb = new StringBuilder(); @@ -154,6 +211,7 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws final String startTimeArgKey = "--starttime="; final String endTimeArgKey = "--endtime="; final String expectedCountArg = "--expected-count="; + final String countDeleteMarkersArg = "--countDeleteMarkers"; // First argument is table name, starting from second for (int i = 1; i < args.length; i++) { @@ -179,10 +237,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws Long.parseLong(args[i].substring(expectedCountArg.length()))); continue; } + if (args[i].startsWith(countDeleteMarkersArg)) { + countDeleteMarkers = true; + continue; + } // if no switch, assume column names sb.append(args[i]); sb.append(" "); } + conf.setBoolean(OPT_COUNT_DELETE_MARKERS, countDeleteMarkers); if (endTime < startTime) { printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); return null; @@ -192,7 +255,9 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws job.setJarByClass(RowCounter.class); Scan scan = 
new Scan(); scan.setCacheBlocks(false); - setScanFilter(scan, rowRangeList); + // raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set + scan.setRaw(countDeleteMarkers); + setScanFilter(scan, rowRangeList, countDeleteMarkers); if (sb.length() > 0) { for (String columnName : sb.toString().trim().split(" ")) { String family = StringUtils.substringBefore(columnName, ":"); @@ -250,9 +315,11 @@ private static List parseRowRangeParameter(String * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList * contains exactly one element, startRow and stopRow are set to the scan. */ - private static void setScanFilter(Scan scan, List rowRangeList) { + private static void setScanFilter(Scan scan, List rowRangeList, + boolean countDeleteMarkers) { final int size = rowRangeList == null ? 0 : rowRangeList.size(); - if (size <= 1) { + // all cells will be needed if --countDeleteMarkers flag is set, hence, skipping filter + if (size <= 1 && !countDeleteMarkers) { scan.setFilter(new FirstKeyOnlyFilter()); } if (size == 1) { @@ -295,10 +362,15 @@ protected void addOptions() { .desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build(); Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true) .desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build(); + Option countDeleteMarkersOption = Option.builder(null).hasArg(false) + .desc("counts the number of Delete Markers of all types, i.e. " + + "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)") + .longOpt(OPT_COUNT_DELETE_MARKERS).build(); addOption(startTimeOption); addOption(endTimeOption); addOption(rangeOption); addOption(expectedOption); + addOption(countDeleteMarkersOption); } @Override @@ -316,6 +388,7 @@ protected void processOptions(CommandLine cmd) throws IllegalArgumentException { this.startTime = cmd.getOptionValue(OPT_START_TIME) == null ? 
0 : Long.parseLong(cmd.getOptionValue(OPT_START_TIME)); + this.countDeleteMarkers = cmd.hasOption(OPT_COUNT_DELETE_MARKERS); for (int i = 1; i < cmd.getArgList().size(); i++) { String argument = cmd.getArgList().get(i); @@ -347,7 +420,7 @@ protected void processOldArgs(List args) { @Override protected int doWork() throws Exception { - Job job = createSubmittableJob(getConf()); + job = createSubmittableJob(getConf()); if (job == null) { return -1; } @@ -388,4 +461,10 @@ protected CommandLineParser newParser() { return new RowCounterCommandLineParser(); } + @RestrictedApi(explanation = "Only visible for testing", link = "", + allowedOnPath = ".*/src/test/.*") + Job getMapReduceJob() { + return job; + } + } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java index 8f15fb1c170a..1922b89bc2c8 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -26,9 +26,11 @@ import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LauncherSecurityManager; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -524,6 +527,137 @@ public void testInvalidTable() throws Exception 
{ } } + /** + * Step 1: Add 10 rows(row1, row2, row3, row4, row5, row6, row7, row8, row9, row10) to a table. + * Each row contains 1 column family and 4 columns and values for two different timestamps - 5 & + * 10. + *

+ * Step 2: Delete the latest version of column A for row1. --> 1 X Delete + *

+ * Step 3: Delete the cell for timestamp 5 of column B for row1. --> 1 X Delete + *

+ * Step 4: Delete the column family for row2 and row4. --> 2 X DeleteFamily + *

+ * Step 5: Delete all versions of a specific column for row3, row5 and row6. --> 3 X DeleteColumn + *

+ * Step 6: Delete all columns with timestamp 5 for row7. --> 1 X DeleteFamilyVersion + *

+ * Case 1: Run row counter without the countDeleteMarkers flag and validate counter values. + *

+ * Case 2: Run row counter with countDeleteMarkers flag and validate counter values. + *

+ * Case 3: Run row counter with countDeleteMarkers flag for a row range and validate counter + * values. + */ + @Test + public void testRowCounterWithCountDeleteMarkersOption() throws Exception { + // Test Setup + + final TableName tableName = + TableName.valueOf(TABLE_NAME + "_" + "withCountDeleteMarkersOption"); + // Row keys are represented in this way because of HBASE-15287 + final byte[][] rowKeys = { Bytes.toBytesBinary("\\x00row1"), Bytes.toBytesBinary("\\x00row2"), + Bytes.toBytesBinary("\\x00row3"), Bytes.toBytesBinary("\\x00row4"), + Bytes.toBytesBinary("\\x00row5"), Bytes.toBytesBinary("\\x00row6"), + Bytes.toBytesBinary("\\x00row7"), Bytes.toBytesBinary("\\x00row8"), + Bytes.toBytesBinary("\\x00row9"), Bytes.toBytesBinary("\\x00row10") }; + final byte[] columnFamily = Bytes.toBytes("cf"); + final byte[][] columns = + { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C"), Bytes.toBytes("D") }; + final byte[][] values = { Bytes.toBytes("a"), Bytes.toBytes("b") }; + + try (Table table = TEST_UTIL.createTable(tableName, columnFamily)) { + // Step 1: Insert rows with columns + for (byte[] rowKey : rowKeys) { + Put put = new Put(rowKey); + for (byte[] col : columns) { + long timestamp = 5L; + for (byte[] value : values) { + put.addColumn(columnFamily, col, timestamp, value); + timestamp += 5L; + } + } + table.put(put); + } + TEST_UTIL.getAdmin().flush(tableName); + + // Steps 2-6 + Delete deleteA = new Delete(rowKeys[0]).addColumn(columnFamily, columns[0]); + Delete deleteB = new Delete(rowKeys[0]).addColumn(columnFamily, columns[1], 5L); + Delete deleteC = new Delete(rowKeys[1]).addFamily(columnFamily); + Delete deleteD = new Delete(rowKeys[2]).addColumns(columnFamily, columns[0]); + Delete deleteE = new Delete(rowKeys[3]).addFamily(columnFamily); + Delete deleteF = new Delete(rowKeys[4]).addColumns(columnFamily, columns[0]); + Delete deleteG = new Delete(rowKeys[5]).addColumns(columnFamily, columns[0]); + Delete deleteH = new 
Delete(rowKeys[6]).addFamilyVersion(columnFamily, 5L); + + table.delete(deleteA); + table.delete(deleteB); + table.delete(deleteC); + table.delete(deleteD); + table.delete(deleteE); + table.delete(deleteF); + table.delete(deleteG); + table.delete(deleteH); + TEST_UTIL.getAdmin().flush(tableName); + } + + RowCounter rowCounterWithoutCountDeleteMarkers = new RowCounter(); + RowCounter rowCounterWithCountDeleteMarkers = new RowCounter(); + RowCounter rowCounterForRangeWithCountDeleteMarkers = new RowCounter(); + rowCounterWithoutCountDeleteMarkers.setConf(new Configuration(TEST_UTIL.getConfiguration())); + rowCounterWithCountDeleteMarkers.setConf(new Configuration(TEST_UTIL.getConfiguration())); + rowCounterForRangeWithCountDeleteMarkers + .setConf(new Configuration(TEST_UTIL.getConfiguration())); + + // Invocation + + rowCounterWithoutCountDeleteMarkers.run(new String[] { tableName.getNameAsString() }); + rowCounterWithCountDeleteMarkers + .run(new String[] { tableName.getNameAsString(), "--countDeleteMarkers" }); + rowCounterForRangeWithCountDeleteMarkers.run(new String[] { tableName.getNameAsString(), + "--countDeleteMarkers", "--range=\\x00row8,\\x00row9" }); + + // Validation + + // Case 1: + validateCounterCounts(rowCounterWithoutCountDeleteMarkers.getMapReduceJob().getCounters(), 8, 0, + 0, 0, 0, 0); + + // Case 2: + validateCounterCounts(rowCounterWithCountDeleteMarkers.getMapReduceJob().getCounters(), 10, 7, + 2, 3, 2, 1); + + // Case 3: + validateCounterCounts(rowCounterForRangeWithCountDeleteMarkers.getMapReduceJob().getCounters(), + 1, 0, 0, 0, 0, 0); + } + + private void validateCounterCounts(Counters counters, long rowCount, + long rowsWithDeleteMarkersCount, long deleteCount, long deleteColumnCount, + long deleteFamilyCount, long deleteFamilyVersionCount) { + + long actualRowCount = + counters.findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue(); + long actualRowsWithDeleteMarkersCount = + 
counters.findCounter(RowCounter.RowCounterMapper.Counters.ROWS_WITH_DELETE_MARKER).getValue(); + long actualDeleteCount = + counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE).getValue(); + long actualDeleteColumnCount = + counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_COLUMN).getValue(); + long actualDeleteFamilyCount = + counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_FAMILY).getValue(); + long actualDeleteFamilyVersionCount = + counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_FAMILY_VERSION).getValue(); + + assertEquals(rowCount, actualRowCount); + assertEquals(rowsWithDeleteMarkersCount, actualRowsWithDeleteMarkersCount); + assertEquals(deleteCount, actualDeleteCount); + assertEquals(deleteColumnCount, actualDeleteColumnCount); + assertEquals(deleteFamilyCount, actualDeleteFamilyCount); + assertEquals(deleteFamilyVersionCount, actualDeleteFamilyVersionCount); + } + private void assertUsageContent(String usage) { assertTrue(usage .contains("usage: hbase rowcounter " + " [options] [ ...]"));