2323import java .io .DataOutput ;
2424import java .io .IOException ;
2525import java .io .InputStreamReader ;
26+ import java .net .URI ;
27+ import java .net .URISyntaxException ;
2628import java .nio .charset .StandardCharsets ;
2729import java .util .HashSet ;
2830import java .util .LinkedList ;
3638import org .apache .hadoop .fs .FileSystem ;
3739import org .apache .hadoop .fs .Path ;
3840import org .apache .hadoop .hbase .Cell ;
39- import org .apache .hadoop .hbase .HBaseCommonTestingUtility ;
4041import org .apache .hadoop .hbase .HBaseConfiguration ;
4142import org .apache .hadoop .hbase .HBaseTestingUtility ;
4243import org .apache .hadoop .hbase .IntegrationTestBase ;
5253import org .apache .hadoop .hbase .client .Table ;
5354import org .apache .hadoop .hbase .client .TableDescriptor ;
5455import org .apache .hadoop .hbase .client .TableDescriptorBuilder ;
55- import org .apache .hadoop .hbase .io .compress .Compression ;
5656import org .apache .hadoop .hbase .io .encoding .DataBlockEncoding ;
5757import org .apache .hadoop .hbase .mapreduce .TableMapReduceUtil ;
5858import org .apache .hadoop .hbase .regionserver .BloomType ;
6464import org .apache .hadoop .hbase .test .util .warc .WARCRecord ;
6565import org .apache .hadoop .hbase .test .util .warc .WARCWritable ;
6666import org .apache .hadoop .hbase .util .Bytes ;
67- import org .apache .hadoop .hbase .util .CompressionTest ;
6867import org .apache .hadoop .hbase .util .RegionSplitter ;
6968import org .apache .hadoop .io .BytesWritable ;
70- import org .apache .hadoop .io .IOUtils ;
7169import org .apache .hadoop .io .LongWritable ;
7270import org .apache .hadoop .io .NullWritable ;
7371import org .apache .hadoop .io .SequenceFile .CompressionType ;
7472import org .apache .hadoop .io .Writable ;
7573import org .apache .hadoop .mapreduce .Counters ;
76- import org .apache .hadoop .mapreduce .InputFormat ;
77- import org .apache .hadoop .mapreduce .InputSplit ;
7874import org .apache .hadoop .mapreduce .Job ;
7975import org .apache .hadoop .mapreduce .JobContext ;
8076import org .apache .hadoop .mapreduce .Mapper ;
81- import org .apache .hadoop .mapreduce .RecordReader ;
82- import org .apache .hadoop .mapreduce .Reducer ;
83- import org .apache .hadoop .mapreduce .TaskAttemptContext ;
8477import org .apache .hadoop .mapreduce .lib .input .FileInputFormat ;
8578import org .apache .hadoop .mapreduce .lib .input .SequenceFileInputFormat ;
86- import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
8779import org .apache .hadoop .mapreduce .lib .output .NullOutputFormat ;
8880import org .apache .hadoop .mapreduce .lib .output .SequenceFileOutputFormat ;
8981import org .apache .hadoop .util .Tool ;
@@ -560,12 +552,27 @@ protected void map(LongWritable key, WARCWritable value, Context output)
560552 if (warcHeader .getRecordType ().equals ("response" ) && warcHeader .getTargetURI () != null ) {
561553 String contentType = warcHeader .getField ("WARC-Identified-Payload-Type" );
562554 if (contentType != null ) {
563- byte [] rowKey = Bytes .toBytes (warcHeader .getTargetURI ());
555+
556+ // Make row key
557+
558+ byte [] rowKey ;
559+ try {
560+ rowKey = rowKeyFromTargetURI (warcHeader .getTargetURI ());
561+ } catch (URISyntaxException e ) {
562+ LOG .warn ("Could not parse URI \" " + warcHeader .getTargetURI () + "\" for record " +
563+ warcHeader .getRecordID ());
564+ return ;
565+ }
566+
567+ // Get the content and calculate the CRC64
568+
564569 byte [] content = value .getRecord ().getContent ();
565570 CRC64 crc = new CRC64 ();
566571 crc .update (content );
567572 long crc64 = crc .getValue ();
568573
574+ // Store to HBase
575+
569576 Put put = new Put (rowKey );
570577
571578 put .addColumn (CONTENT_FAMILY_NAME , CONTENT_QUALIFIER , content );
@@ -585,6 +592,8 @@ protected void map(LongWritable key, WARCWritable value, Context output)
585592
586593 table .put (put );
587594
595+ // If we succeeded in storing to HBase, write records for later verification
596+
588597 output .write (new HBaseKeyWritable (rowKey , INFO_FAMILY_NAME , CRC_QUALIFIER ),
589598 new BytesWritable (Bytes .toBytes (crc64 )));
590599 output .write (new HBaseKeyWritable (rowKey , INFO_FAMILY_NAME , CONTENT_LENGTH_QUALIFIER ),
@@ -602,6 +611,42 @@ protected void map(LongWritable key, WARCWritable value, Context output)
602611 }
603612 }
604613 }
614+
615+ private byte [] rowKeyFromTargetURI (String targetURI ) throws URISyntaxException {
616+ URI uri = new URI (targetURI );
617+ StringBuffer sb = new StringBuffer ();
618+ // Ignore the scheme
619+ // Reverse the components of the hostname
620+ String [] hostComponents = uri .getHost ().split ("\\ ." );
621+ for (int i = hostComponents .length - 1 ; i >= 0 ; i --) {
622+ sb .append (hostComponents [i ]);
623+ if (i != 0 ) {
624+ sb .append ('.' );
625+ }
626+ }
627+ // Port
628+ if (uri .getPort () != -1 ) {
629+ sb .append (":" );
630+ sb .append (uri .getPort ());
631+ }
632+ // Ignore the rest of the authority
633+ // Path, if present
634+ if (uri .getRawPath () != null ) {
635+ sb .append (uri .getRawPath ());
636+ }
637+ // Query, if present
638+ if (uri .getRawQuery () != null ) {
639+ sb .append ('?' );
640+ sb .append (uri .getRawQuery ());
641+ }
642+ // Fragment, if present
643+ if (uri .getRawFragment () != null ) {
644+ sb .append ('#' );
645+ sb .append (uri .getRawFragment ());
646+ }
647+ return Bytes .toBytes (sb .toString ());
648+ }
649+
605650 }
606651 }
607652
0 commit comments