From b8e0db3c6c5985469a8b9d6d0e2b5dc6f9cd7cc2 Mon Sep 17 00:00:00 2001
From: langdamao
Date: Wed, 28 Aug 2019 18:14:51 +0800
Subject: [PATCH 1/2] HBASE-22887 Fix HFileOutputFormat2 writer roll

Signed-off-by: langdamao
---
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index c963c6c18ae2..b49903879e71 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -235,9 +235,9 @@ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[]
       // Map of families to writers and how much has been output on the writer.
       private final Map<byte[], WriterLength> writers =
           new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final Map<byte[], byte[]> previousRows =
+          new TreeMap<>(Bytes.BYTES_COMPARATOR);
       private final long now = EnvironmentEdgeManager.currentTime();
-      private boolean rollRequested = false;

       @Override
       public void write(ImmutableBytesWritable row, V cell)
@@ -286,12 +286,9 @@ public void write(ImmutableBytesWritable row, V cell)
           configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
         }

-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }

-        // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+        if (wl != null && wl.written + length >= maxsize
+            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
           rollWriters(wl);
         }

@@ -348,7 +345,7 @@ public void write(ImmutableBytesWritable row, V cell)
         wl.written += length;

         // Copy the row so we know when a row transition.
-        this.previousRow = rowKey;
+        this.previousRows.put(family, rowKey);
       }

       private Path getTableRelativePath(byte[] tableNameBytes) {
@@ -368,7 +365,6 @@ private void rollWriters(WriterLength writerLength) throws IOException {
             closeWriter(wl);
           }
         }
-        this.rollRequested = false;
       }

       private void closeWriter(WriterLength wl) throws IOException {

From f726e2fc02ef32adbad0c52a1939017af2dc70e5 Mon Sep 17 00:00:00 2001
From: langdamao
Date: Tue, 8 Oct 2019 21:15:01 +0800
Subject: [PATCH 2/2] add ut

Signed-off-by: langdamao
---
 .../mapreduce/TestHFileOutputFormat2.java | 23 ++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 60127a665243..ae32c7aa0231 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -430,7 +430,8 @@ public void testWritingPEData() throws Exception {
     // Set down this value or we OOME in eclipse.
     conf.setInt("mapreduce.task.io.sort.mb", 20);
     // Write a few files.
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+    long hregionMaxFilesize = 10 * 1024;
+    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

     Job job = new Job(conf, "testWritingPEData");
     setupRandomGeneratorMapper(job, false);
@@ -457,6 +458,26 @@ public void testWritingPEData() throws Exception {
     assertTrue(job.waitForCompletion(false));
     FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
+
+    // Check output file num and size.
+    for (byte[] family : FAMILIES) {
+      long kvCount = 0;
+      RemoteIterator<LocatedFileStatus> iterator =
+          fs.listFiles(testDir.suffix("/" + new String(family)), true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        HFile.Reader reader =
+            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
+        HFileScanner scanner = reader.getScanner(false, false, false);
+
+        kvCount += reader.getEntries();
+        scanner.seekTo();
+        long perKVSize = scanner.getCell().getSerializedSize();
+        assertTrue("Data size of each file should not be too large.",
+            perKVSize * reader.getEntries() <= hregionMaxFilesize);
+      }
+      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
+    }
   }

   /**