Commit eaa4c4f

yihua and codope authored
[HUDI-1180] Upgrade HBase to 2.4.9 (#5004)
Co-authored-by: Sagar Sumit <[email protected]>
1 parent 5e86cdd commit eaa4c4f

File tree

41 files changed (+3866, -746 lines)


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java

Lines changed: 2 additions & 1 deletion
@@ -548,7 +548,8 @@ private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(recordList, header, keyField);
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(recordList, header, writeConfig.getHFileCompressionAlgorithm());
+        return new HoodieHFileDataBlock(
+            recordList, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(recordList, header, keyField, writeConfig.getParquetCompressionCodec());
       default:
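
A hedged sketch of what the new call shape gives a caller: the HFile data block now carries the table base path alongside the compression algorithm. The class and helper names below are hypothetical, and the Hudi package paths are assumed from a 0.11-era tree.

import java.util.List;
import java.util.Map;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.config.HoodieWriteConfig;

public class HFileBlockSketch {
  // Hypothetical helper mirroring the HFILE_DATA_BLOCK case above;
  // recordList and header are prepared by the caller, as in getBlock.
  static HoodieHFileDataBlock buildBlock(List<IndexedRecord> recordList,
                                         Map<HeaderMetadataType, String> header,
                                         HoodieWriteConfig writeConfig) {
    // New in this commit: the constructor also receives the table base path.
    return new HoodieHFileDataBlock(
        recordList, header, writeConfig.getHFileCompressionAlgorithm(),
        new Path(writeConfig.getBasePath()));
  }
}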

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java

Lines changed: 15 additions & 11 deletions
@@ -30,6 +30,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroSchemaConverter;
 
@@ -53,39 +54,42 @@ public static <T extends HoodieRecordPayload, R extends IndexedRecord, I, K, O>
       return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields());
     }
     if (HFILE.getFileExtension().equals(extension)) {
-      return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+      return newHFileFileWriter(
+          instantTime, path, config, schema, hoodieTable.getHadoopConf(), taskContextSupplier);
     }
     if (ORC.getFileExtension().equals(extension)) {
-      return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+      return newOrcFileWriter(
+          instantTime, path, config, schema, hoodieTable.getHadoopConf(), taskContextSupplier);
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
   private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
       String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
       TaskContextSupplier taskContextSupplier, boolean populateMetaFields) throws IOException {
-    return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, populateMetaFields, populateMetaFields);
+    return newParquetFileWriter(instantTime, path, config, schema, hoodieTable.getHadoopConf(),
+        taskContextSupplier, populateMetaFields, populateMetaFields);
   }
 
   private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
-      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+      String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
       TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter) throws IOException {
     Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
-    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);
 
     HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
         config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
-        hoodieTable.getHadoopConf(), config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
+        conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());
 
     return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
   }
 
-  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
-      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+  static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
+      String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
       TaskContextSupplier taskContextSupplier) throws IOException {
 
     BloomFilter filter = createBloomFilter(config);
-    HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(),
+    HoodieHFileConfig hfileConfig = new HoodieHFileConfig(conf,
         config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
         HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
         filter, HFILE_COMPARATOR);
@@ -94,10 +98,10 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
   }
 
   private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
-      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+      String instantTime, Path path, HoodieWriteConfig config, Schema schema, Configuration conf,
       TaskContextSupplier taskContextSupplier) throws IOException {
     BloomFilter filter = createBloomFilter(config);
-    HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(),
+    HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf, config.getOrcCompressionCodec(),
         config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
     return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
   }
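
These changes thread a plain Hadoop Configuration through the factory helpers instead of a whole HoodieTable, and newHFileFileWriter loses its private modifier so it is reachable package-locally (e.g. from tests). Below is a minimal sketch of a call under the new signature; the caller class is hypothetical and assumes it sits in org.apache.hudi.io.storage, with the TaskContextSupplier import path assumed from the same era of the codebase.

package org.apache.hudi.io.storage;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical caller; it lives in the same package because
// newHFileFileWriter is now package-private rather than private.
class HFileWriterSketch {
  static HoodieFileWriter<IndexedRecord> open(String instantTime, Path path,
                                              HoodieWriteConfig config, Schema schema,
                                              Configuration conf, TaskContextSupplier ctx)
      throws IOException {
    // No HoodieTable needed any more: the Hadoop Configuration is passed directly.
    return HoodieFileWriterFactory.newHFileFileWriter(instantTime, path, config, schema, conf, ctx);
  }
}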

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java

Lines changed: 5 additions & 5 deletions
@@ -21,14 +21,14 @@
 import org.apache.hudi.common.bloom.BloomFilter;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 
 public class HoodieHFileConfig {
 
-  public static final KeyValue.KVComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
+  public static final CellComparator HFILE_COMPARATOR = new HoodieHBaseKVComparator();
   public static final boolean PREFETCH_ON_OPEN = CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
   public static final boolean CACHE_DATA_IN_L1 = HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
   // This is private in CacheConfig so have been copied here.
@@ -42,12 +42,12 @@ public class HoodieHFileConfig {
   private final boolean dropBehindCacheCompaction;
   private final Configuration hadoopConf;
   private final BloomFilter bloomFilter;
-  private final KeyValue.KVComparator hfileComparator;
+  private final CellComparator hfileComparator;
   private final String keyFieldName;
 
   public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
                           long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
-                          boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) {
+                          boolean dropBehindCacheCompaction, BloomFilter bloomFilter, CellComparator hfileComparator) {
     this.hadoopConf = hadoopConf;
     this.compressionAlgorithm = compressionAlgorithm;
     this.blockSize = blockSize;
@@ -96,7 +96,7 @@ public BloomFilter getBloomFilter() {
     return bloomFilter;
   }
 
-  public KeyValue.KVComparator getHfileComparator() {
+  public CellComparator getHFileComparator() {
     return hfileComparator;
   }
 
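
The type swap here is the HBase 1.x to 2.x comparator migration: KeyValue.KVComparator no longer exists in HBase 2.4, and comparators implement CellComparator instead, typically by extending CellComparatorImpl. An illustrative sketch of that shape follows; the class name and body are placeholders, not Hudi's actual HoodieHBaseKVComparator logic.

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;

// Illustrative only: the CellComparator-based shape that replaces
// KeyValue.KVComparator; it delegates to the default HBase 2.x cell ordering.
public class ExampleHBaseKVComparator extends CellComparatorImpl {
  @Override
  public int compare(Cell left, Cell right) {
    // A real comparator could customize row/key ordering here.
    return super.compare(left, right);
  }
}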

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java

Lines changed: 3 additions & 3 deletions
@@ -25,6 +25,8 @@
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -38,8 +40,6 @@
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.io.Writable;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -95,6 +95,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
 
     HFileContext context = new HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
         .withCompression(hfileConfig.getCompressionAlgorithm())
+        .withCellComparator(hfileConfig.getHFileComparator())
         .build();
 
     conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
@@ -104,7 +105,6 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
     this.writer = HFile.getWriterFactory(conf, cacheConfig)
         .withPath(this.fs, this.file)
         .withFileContext(context)
-        .withComparator(hfileConfig.getHfileComparator())
         .create();
 
     writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
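
In HBase 2.4 the HFile writer factory no longer exposes withComparator; the comparator is supplied through the HFileContext instead, which is exactly the move made above. A self-contained sketch of the same pattern against plain HBase APIs follows; the block size and compression values are illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

class HFileContextSketch {
  static HFile.Writer openWriter(Configuration conf, FileSystem fs, Path path) throws IOException {
    // HBase 2.4: the comparator is carried by the HFileContext ...
    HFileContext context = new HFileContextBuilder()
        .withBlockSize(64 * 1024)                        // illustrative 64 KB block size
        .withCompression(Compression.Algorithm.GZ)       // illustrative codec
        .withCellComparator(CellComparatorImpl.COMPARATOR)
        .build();
    // ... because WriterFactory.withComparator(...) no longer exists.
    return HFile.getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
  }
}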
