diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java index 40bad99dd146..d75014512df0 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java @@ -26,12 +26,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPInputStream; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; @@ -84,7 +84,6 @@ import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; /** @@ -163,17 +162,17 @@ public static enum Counts { REFERENCED, UNREFERENCED, CORRUPT } - Path warcFileInputDir = null; - Path outputDir = null; - String[] args; + protected Path warcFileInputDir = null; + protected Path outputDir = null; + protected String[] args; - protected int runLoader(Path warcFileInputDir, Path outputDir) throws Exception { + protected int runLoader(final Path warcFileInputDir, final Path outputDir) throws Exception { Loader loader = new Loader(); loader.setConf(conf); return loader.run(warcFileInputDir, outputDir); } - protected int runVerify(Path inputDir) throws Exception { + protected int runVerify(final Path inputDir) throws Exception { Verify verify = new Verify(); verify.setConf(conf); return verify.run(inputDir); @@ -208,7 +207,7 @@ public int run(String[] args) { } @Override - protected void processOptions(CommandLine cmd) { + protected void processOptions(final CommandLine cmd) { processBaseOptions(cmd); args = cmd.getArgs(); } @@ -232,7 +231,7 @@ public void cleanUpCluster() throws Exception { } } - static TableName getTablename(Configuration c) { + static TableName getTablename(final Configuration c) { return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME)); } @@ -421,7 +420,7 @@ public static class Loader extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(Loader.class); private static final String USAGE = "Loader "; - void createSchema(TableName tableName) throws IOException { + void createSchema(final TableName tableName) throws IOException { try (Connection conn = ConnectionFactory.createConnection(getConf()); Admin admin = conn.getAdmin()) { @@ -477,24 +476,24 @@ void createSchema(TableName tableName) throws IOException { } } - int run(Path warcFileInput, Path outputDir) + int run(final Path warcFileInput, final Path outputDir) throws IOException, ClassNotFoundException, InterruptedException { createSchema(getTablename(getConf())); - Job job = Job.getInstance(getConf()); + final Job job = Job.getInstance(getConf()); job.setJobName(Loader.class.getName()); job.setNumReduceTasks(0); job.setJarByClass(getClass()); job.setMapperClass(LoaderMapper.class); job.setInputFormatClass(WARCInputFormat.class); - FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf()); + final FileSystem fs = FileSystem.get(warcFileInput.toUri(), getConf()); if (fs.getFileStatus(warcFileInput).isDirectory()) { LOG.info("Using directory as WARC input path: " + warcFileInput); FileInputFormat.setInputPaths(job, warcFileInput); - } else { + } else if (warcFileInput.toUri().getScheme().equals("file")) { LOG.info("Getting WARC input paths from file: " + warcFileInput); - List paths = new LinkedList(); + final List paths = new ArrayList(); try (FSDataInputStream is = fs.open(warcFileInput)) { InputStreamReader reader; if (warcFileInput.getName().toLowerCase().endsWith(".gz")) { @@ -511,6 +510,8 @@ int run(Path warcFileInput, Path outputDir) } LOG.info("Read " + paths.size() + " WARC input paths from " + warcFileInput); FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()])); + } else { + FileInputFormat.setInputPaths(job, warcFileInput); } job.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setOutputPath(job, outputDir); @@ -550,20 +551,19 @@ public static void main(String[] args) throws Exception { public static class LoaderMapper extends Mapper { - Configuration conf; - Connection conn; - BufferedMutator mutator; + protected Configuration conf; + protected Connection conn; + protected BufferedMutator mutator; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(final Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); conn = ConnectionFactory.createConnection(conf); mutator = conn.getBufferedMutator(getTablename(conf)); - mutator.setWriteBufferPeriodicFlush(10 * 1000); // default is 1 sec, increase to 10 } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { + protected void cleanup(final Context context) throws IOException, InterruptedException { try { mutator.close(); } catch (Exception e) { @@ -577,16 +577,15 @@ protected void cleanup(Context context) throws IOException, InterruptedException } @Override - protected void map(LongWritable key, WARCWritable value, Context output) + protected void map(final LongWritable key, final WARCWritable value, final Context output) throws IOException, InterruptedException { - WARCRecord.Header warcHeader = value.getRecord().getHeader(); - String recordID = warcHeader.getRecordID(); - String targetURI = warcHeader.getTargetURI(); + final WARCRecord.Header warcHeader = value.getRecord().getHeader(); + final String recordID = warcHeader.getRecordID(); + final String targetURI = warcHeader.getTargetURI(); if (warcHeader.getRecordType().equals("response") && targetURI != null) { - String contentType = warcHeader.getField("WARC-Identified-Payload-Type"); + final String contentType = warcHeader.getField("WARC-Identified-Payload-Type"); if (contentType != null) { LOG.info("Processing uri=\"" + targetURI + "\", id=" + recordID); - long now = EnvironmentEdgeManager.currentTime(); // Make row key @@ -604,62 +603,63 @@ protected void map(LongWritable key, WARCWritable value, Context output) // Get the content and calculate the CRC64 - byte[] content = value.getRecord().getContent(); - CRC64 crc = new CRC64(); + final byte[] content = value.getRecord().getContent(); + final CRC64 crc = new CRC64(); crc.update(content); - long crc64 = crc.getValue(); + final long crc64 = crc.getValue(); // Store to HBase - Put put = new Put(rowKey); - put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, now, content); - put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, now, + final long ts = getCurrentTime(); + final Put put = new Put(rowKey); + put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, ts, content); + put.addColumn(INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, ts, Bytes.toBytes(content.length)); - put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, now, + put.addColumn(INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, ts, Bytes.toBytes(contentType)); - put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, now, Bytes.toBytes(crc64)); - put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, now, Bytes.toBytes(recordID)); - put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, now, Bytes.toBytes(targetURI)); - put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, now, + put.addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER, ts, Bytes.toBytes(crc64)); + put.addColumn(INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, ts, Bytes.toBytes(recordID)); + put.addColumn(INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, ts, Bytes.toBytes(targetURI)); + put.addColumn(INFO_FAMILY_NAME, DATE_QUALIFIER, ts, Bytes.toBytes(warcHeader.getDateString())); - String ipAddr = warcHeader.getField("WARC-IP-Address"); + final String ipAddr = warcHeader.getField("WARC-IP-Address"); if (ipAddr != null) { - put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, now, Bytes.toBytes(ipAddr)); + put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr)); } mutator.mutate(put); // Write records out for later verification, one per HBase field except for the // content record, which will be verified by CRC64. - output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, now), + output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(crc64))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER, - now), new BytesWritable(Bytes.toBytes(content.length))); + ts), new BytesWritable(Bytes.toBytes(content.length))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_TYPE_QUALIFIER, - now), new BytesWritable(Bytes.toBytes(contentType))); + ts), new BytesWritable(Bytes.toBytes(contentType))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, RECORD_ID_QUALIFIER, - now), new BytesWritable(Bytes.toBytes(recordID))); + ts), new BytesWritable(Bytes.toBytes(recordID))); output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, TARGET_URI_QUALIFIER, - now), new BytesWritable(Bytes.toBytes(targetURI))); - output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, now), + ts), new BytesWritable(Bytes.toBytes(targetURI))); + output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, DATE_QUALIFIER, ts), new BytesWritable(Bytes.toBytes(warcHeader.getDateString()))); if (ipAddr != null) { output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, - now), new BytesWritable(Bytes.toBytes(ipAddr))); + ts), new BytesWritable(Bytes.toBytes(ipAddr))); } } } } - private byte[] rowKeyFromTargetURI(String targetUri) + private byte[] rowKeyFromTargetURI(final String targetUri) throws URISyntaxException, IllegalArgumentException { - URI uri = new URI(targetUri); + final URI uri = new URI(targetUri); // Ignore the scheme // Reverse the components of the hostname String reversedHost; if (uri.getHost() != null) { - StringBuffer sb = new StringBuffer(); - String[] hostComponents = uri.getHost().split("\\."); + final StringBuilder sb = new StringBuilder(); + final String[] hostComponents = uri.getHost().split("\\."); for (int i = hostComponents.length - 1; i >= 0; i--) { sb.append(hostComponents[i]); if (i != 0) { @@ -670,7 +670,7 @@ private byte[] rowKeyFromTargetURI(String targetUri) } else { throw new IllegalArgumentException("URI is missing host component"); } - StringBuffer sb = new StringBuffer(); + final StringBuilder sb = new StringBuilder(); sb.append(reversedHost); if (uri.getPort() >= 0) { sb.append(':'); @@ -700,7 +700,7 @@ private byte[] rowKeyFromTargetURI(String targetUri) public static class OneFilePerMapperSFIF extends SequenceFileInputFormat { @Override - protected boolean isSplitable(JobContext context, Path filename) { + protected boolean isSplitable(final JobContext context, final Path filename) { return false; } } @@ -710,7 +710,8 @@ public static class Verify extends Configured implements Tool { public static final Logger LOG = LoggerFactory.getLogger(Verify.class); public static final String USAGE = "Verify "; - int run(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException { + int run(final Path inputDir) + throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(getConf()); job.setJobName(Verify.class.getName()); job.setJarByClass(getClass()); @@ -725,10 +726,18 @@ int run(Path inputDir) throws IOException, ClassNotFoundException, InterruptedEx if (!success) { LOG.error("Failure during job " + job.getJobID()); } - Counters counters = job.getCounters(); + final Counters counters = job.getCounters(); for (Counts c: Counts.values()) { LOG.info(c + ": " + counters.findCounter(c).getValue()); } + if (counters.findCounter(Counts.UNREFERENCED).getValue() > 0) { + LOG.error("Nonzero UNREFERENCED count from job " + job.getJobID()); + success = false; + } + if (counters.findCounter(Counts.CORRUPT).getValue() > 0) { + LOG.error("Nonzero CORRUPT count from job " + job.getJobID()); + success = false; + } return success ? 0 : 1; } @@ -749,44 +758,51 @@ public static void main(String[] args) throws Exception { public static class VerifyMapper extends Mapper { - Connection conn; - Table table; + private Connection conn; + private Table table; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(final Context context) throws IOException, InterruptedException { conn = ConnectionFactory.createConnection(context.getConfiguration()); table = conn.getTable(getTablename(conn.getConfiguration())); } @Override - protected void cleanup(Context context) throws IOException ,InterruptedException { - table.close(); - conn.close(); + protected void cleanup(final Context context) throws IOException, InterruptedException { + try { + table.close(); + } catch (Exception e) { + LOG.warn("Exception closing Table", e); + } + try { + conn.close(); + } catch (Exception e) { + LOG.warn("Exception closing Connection", e); + } } @Override - protected void map(HBaseKeyWritable key, BytesWritable value, Context output) - throws IOException, InterruptedException { - - byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength()); - byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(), + protected void map(final HBaseKeyWritable key, final BytesWritable value, + final Context output) throws IOException, InterruptedException { + final byte[] row = Bytes.copy(key.getRowArray(), key.getRowOffset(), key.getRowLength()); + final byte[] family = Bytes.copy(key.getFamilyArray(), key.getFamilyOffset(), key.getFamilyLength()); - byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), + final byte[] qualifier = Bytes.copy(key.getQualifierArray(), key.getQualifierOffset(), key.getQualifierLength()); - long ts = key.getTimestamp(); - int retries = VERIFICATION_READ_RETRIES; + final long ts = key.getTimestamp(); + int retries = VERIFICATION_READ_RETRIES; while (true) { if (Bytes.equals(INFO_FAMILY_NAME, family) && Bytes.equals(CRC_QUALIFIER, qualifier)) { - long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); - Result result = table.get(new Get(row) + final long expectedCRC64 = Bytes.toLong(value.getBytes(), 0, value.getLength()); + final Result result = table.get(new Get(row) .addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER) .addColumn(INFO_FAMILY_NAME, CRC_QUALIFIER) .setTimestamp(ts)); - byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); + final byte[] content = result.getValue(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER); if (content == null) { if (retries-- > 0) { continue; @@ -795,18 +811,15 @@ protected void map(HBaseKeyWritable key, BytesWritable value, Context output) output.getCounter(Counts.UNREFERENCED).increment(1); return; } else { - CRC64 crc = new CRC64(); + final CRC64 crc = new CRC64(); crc.update(content); if (crc.getValue() != expectedCRC64) { - if (retries-- > 0) { - continue; - } LOG.error("Row " + Bytes.toStringBinary(row) + ": corrupt content"); output.getCounter(Counts.CORRUPT).increment(1); return; } } - byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); + final byte[] crc = result.getValue(INFO_FAMILY_NAME, CRC_QUALIFIER); if (crc == null) { if (retries-- > 0) { continue; @@ -826,10 +839,10 @@ protected void map(HBaseKeyWritable key, BytesWritable value, Context output) } else { - Result result = table.get(new Get(row) + final Result result = table.get(new Get(row) .addColumn(family, qualifier) .setTimestamp(ts)); - byte[] bytes = result.getValue(family, qualifier); + final byte[] bytes = result.getValue(family, qualifier); if (bytes == null) { if (retries-- > 0) { continue; @@ -862,4 +875,15 @@ protected void map(HBaseKeyWritable key, BytesWritable value, Context output) } } + private static final AtomicLong counter = new AtomicLong(); + + private static long getCurrentTime() { + // Typical hybrid logical clock scheme. + // Take the current time, shift by 16 bits and zero those bits, and replace those bits + // with the low 16 bits of the atomic counter. Mask off the high bit too because timestamps + // cannot be negative. + return ((EnvironmentEdgeManager.currentTime() << 16) & 0x7fff_ffff_ffff_0000L) | + (counter.getAndIncrement() & 0xffffL); + } + }