From 68dcef9376b8ec9e5f97834d53a439e7d40bed9e Mon Sep 17 00:00:00 2001 From: Yechao Chen Date: Wed, 29 Jul 2020 17:14:45 +0800 Subject: [PATCH 1/3] HBASE-24791 Improve HFileOutputFormat2 to avoid always call getTableRelativePath method --- .../hbase/mapreduce/HFileOutputFormat2.java | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index daad1e7584da..68b947ab6408 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -222,6 +222,7 @@ static RecordWriter createRecordWrit private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final Map previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final long now = EnvironmentEdgeManager.currentTime(); + private byte[] tableNameBytes = Bytes.toBytes(writeTableNames);; @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { @@ -235,7 +236,6 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { byte[] rowKey = CellUtil.cloneRow(kv); int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT; byte[] family = CellUtil.cloneFamily(kv); - byte[] tableNameBytes = null; if (writeMultipleTables) { tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get()); tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString() @@ -244,11 +244,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) + " not expected"); } - } else { - tableNameBytes = Bytes.toBytes(writeTableNames); } - String tableName = Bytes.toString(tableNameBytes); - Path tableRelPath = getTableRelativePath(tableNameBytes); byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); WriterLength wl = this.writers.get(tableAndFamily); @@ -257,9 +253,9 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { if (wl == null) { Path writerPath = null; if (writeMultipleTables) { + Path tableRelPath = getTableRelativePath(tableNameBytes); writerPath = new Path(outputDir,new Path(tableRelPath, Bytes.toString(family))); - } - else { + } else { writerPath = new Path(outputDir, Bytes.toString(family)); } fs.mkdirs(writerPath); @@ -274,39 +270,36 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { // create a new WAL writer, if necessary if (wl == null || wl.writer == null) { + InetSocketAddress[] favoredNodes = null; if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { HRegionLocation loc = null; - + String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { try (Connection connection = ConnectionFactory.createConnection(conf); - RegionLocator locator = - connection.getRegionLocator(TableName.valueOf(tableName))) { + RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { loc = locator.getRegionLocation(rowKey); } catch (Throwable e) { - LOG.warn("Something wrong locating rowkey {} in {}", - Bytes.toString(rowKey), tableName, e); + LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey), + tableName, e); loc = null; - } } - + } + } if (null == loc) { LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey)); - wl = getNewWriter(tableNameBytes, family, conf, null); } else { LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort()); if (initialIsa.isUnresolved()) { LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort()); - wl = getNewWriter(tableNameBytes, family, conf, null); } else { LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); - wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa - }); + favoredNodes = new InetSocketAddress[] { initialIsa}; } } - } else { - wl = getNewWriter(tableNameBytes, family, conf, null); } + wl = getNewWriter(tableNameBytes, family, conf, favoredNodes); + } // we now have the proper WAL writer. full steam ahead @@ -321,9 +314,9 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { private Path getTableRelativePath(byte[] tableNameBytes) { String tableName = Bytes.toString(tableNameBytes); String[] tableNameParts = tableName.split(":"); - Path tableRelPath = new Path(tableName.split(":")[0]); + Path tableRelPath = new Path(tableNameParts[0]); if (tableNameParts.length > 1) { - tableRelPath = new Path(tableRelPath, tableName.split(":")[1]); + tableRelPath = new Path(tableRelPath, tableNameParts[1]); } return tableRelPath; } @@ -377,7 +370,7 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding; encoding = encoding == null ? DataBlockEncoding.NONE : encoding; HFileContextBuilder contextBuilder = new HFileContextBuilder() - .withCompression(compression).withChecksumType(HStore.getChecksumType(conf)) + .withCompression(compression).withDataBlockEncoding(encoding).withChecksumType(HStore.getChecksumType(conf)) .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize) .withColumnFamily(family).withTableName(tableName); @@ -385,7 +378,6 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration contextBuilder.withIncludesTags(true); } - contextBuilder.withDataBlockEncoding(encoding); HFileContext hFileContext = contextBuilder.build(); if (null == favoredNodes) { wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs) From 0bd75fac9e8256e2c5135a5da7981823aa6afe9a Mon Sep 17 00:00:00 2001 From: YeChao Chen Date: Wed, 29 Jul 2020 22:05:02 +0800 Subject: [PATCH 2/3] Code format and remove extra semicolon --- .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 68b947ab6408..7227771ecbe6 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -222,7 +222,7 @@ static RecordWriter createRecordWrit private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final Map previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final long now = EnvironmentEdgeManager.currentTime(); - private byte[] tableNameBytes = Bytes.toBytes(writeTableNames);; + private byte[] tableNameBytes = Bytes.toBytes(writeTableNames); @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { @@ -254,7 +254,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { Path writerPath = null; if (writeMultipleTables) { Path tableRelPath = getTableRelativePath(tableNameBytes); - writerPath = new Path(outputDir,new Path(tableRelPath, Bytes.toString(family))); + writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family))); } else { writerPath = new Path(outputDir, Bytes.toString(family)); } @@ -369,10 +369,10 @@ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration DataBlockEncoding encoding = overriddenEncoding; encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding; encoding = encoding == null ? DataBlockEncoding.NONE : encoding; - HFileContextBuilder contextBuilder = new HFileContextBuilder() - .withCompression(compression).withDataBlockEncoding(encoding).withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize) - .withColumnFamily(family).withTableName(tableName); + HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression) + .withDataBlockEncoding(encoding).withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize) + .withColumnFamily(family).withTableName(tableName); if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { contextBuilder.withIncludesTags(true); From 29b4332b96d3d0b871c519a582abd5bf78138995 Mon Sep 17 00:00:00 2001 From: Yechao Chen Date: Mon, 3 Aug 2020 14:10:57 +0800 Subject: [PATCH 3/3] Code format --- .../apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 7227771ecbe6..08752c192b60 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -222,7 +222,7 @@ static RecordWriter createRecordWrit private final Map writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final Map previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final long now = EnvironmentEdgeManager.currentTime(); - private byte[] tableNameBytes = Bytes.toBytes(writeTableNames); + private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames); @Override public void write(ImmutableBytesWritable row, V cell) throws IOException { @@ -276,7 +276,8 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { String tableName = Bytes.toString(tableNameBytes); if (tableName != null) { try (Connection connection = ConnectionFactory.createConnection(conf); - RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) { + RegionLocator locator = + connection.getRegionLocator(TableName.valueOf(tableName))) { loc = locator.getRegionLocation(rowKey); } catch (Throwable e) { LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey), @@ -294,7 +295,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException { LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort()); } else { LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); - favoredNodes = new InetSocketAddress[] { initialIsa}; + favoredNodes = new InetSocketAddress[] { initialIsa }; } } }