HoodieOrcWriter.java
@@ -23,6 +23,7 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -44,9 +45,6 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;

public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R>, Closeable {
@@ -155,11 +153,11 @@ public void close() throws IOException {
final BloomFilter bloomFilter = orcConfig.getBloomFilter();
writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
if (minRecordKey != null && maxRecordKey != null) {
writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
}
}
writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
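Note: the extracted base class, org.apache.hudi.avro.HoodieBloomFilterWriteSupport, is not itself shown in this diff. Judging from how it is used — the footer-key constants referenced above, the addKey/finalizeMetadata calls, and the getUTF8Bytes/dereference hooks overridden by the subclasses below — a minimal sketch would look roughly like this (field names and exact shape are assumptions):

import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;

import java.util.HashMap;
import java.util.Map;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;

public abstract class HoodieBloomFilterWriteSupport<T extends Comparable<T>> {

  public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
  public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
  public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";

  private final BloomFilter bloomFilter;
  private T minRecordKey;
  private T maxRecordKey;

  public HoodieBloomFilterWriteSupport(BloomFilter bloomFilter) {
    this.bloomFilter = bloomFilter;
  }

  public void addKey(T recordKey) {
    bloomFilter.add(getUTF8Bytes(recordKey));
    // Track min/max record keys; dereference lets subclasses detach a key from a shared buffer
    if (minRecordKey == null || minRecordKey.compareTo(recordKey) > 0) {
      minRecordKey = dereference(recordKey);
    }
    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) < 0) {
      maxRecordKey = dereference(recordKey);
    }
  }

  public Map<String, String> finalizeMetadata() {
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
    if (minRecordKey != null && maxRecordKey != null) {
      extraMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
      extraMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
    }
    if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
      extraMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
    }
    return extraMetadata;
  }

  // Subclasses supply the UTF-8 byte representation of their key type
  protected abstract byte[] getUTF8Bytes(T key);

  // Identity by default; overridden where the key may point into a reused buffer (see the Spark file below)
  protected T dereference(T key) {
    return key;
  }
}

This centralizes the bloom-filter serialization and min/max record-key tracking that was previously copy-pasted across the Avro, Spark-row, and Flink-row write supports.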

TestHoodieOrcReaderWriter.java
@@ -18,6 +18,7 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -37,8 +38,6 @@
import java.util.function.Supplier;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -78,8 +77,8 @@ protected HoodieFileReader<GenericRecord> createReader(
protected void verifyMetadata(Configuration conf) throws IOException {
Reader orcReader = OrcFile.createReader(getFilePath(), OrcFile.readerOptions(conf));
assertEquals(4, orcReader.getMetadataKeys().size());
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString());
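For reference, these footer entries are what gets read back on the query side through the stock ORC metadata API; a reader-side sketch (not part of this PR, names illustrative):

Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
if (orcReader.hasMetadataValue(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER)) {
  ByteBuffer minKeyBuffer = orcReader.getMetadataValue(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER);
  String minRecordKey = StandardCharsets.UTF_8.decode(minKeyBuffer).toString();  // footer values are raw UTF-8 bytes
}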

HoodieRowDataParquetWriteSupport.java
@@ -18,35 +18,32 @@

package org.apache.hudi.io.storage.row;

import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.util.HashMap;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/**
* Hoodie Write Support for directly writing {@link RowData} to Parquet.
*/
public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {

private final Configuration hadoopConf;
private final BloomFilter bloomFilter;
private String minRecordKey;
private String maxRecordKey;
private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;

public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
super(rowType);
this.hadoopConf = new Configuration(conf);
this.bloomFilter = bloomFilter;
this.bloomFilterWriteSupportOpt = Option.ofNullable(bloomFilter)
.map(HoodieBloomFilterRowDataWriteSupport::new);
}

public Configuration getHadoopConf() {
@@ -55,32 +52,26 @@ public Configuration getHadoopConf() {

@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
HashMap<String, String> extraMetaData = new HashMap<>();
if (bloomFilter != null) {
extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
if (minRecordKey != null && maxRecordKey != null) {
extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
}
return new WriteSupport.FinalizedWriteContext(extraMetaData);
Map<String, String> extraMetadata =
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
.orElse(Collections.emptyMap());

return new WriteSupport.FinalizedWriteContext(extraMetadata);
}

public void add(String recordKey) {
this.bloomFilter.add(recordKey);
if (minRecordKey != null) {
minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
} else {
minRecordKey = recordKey;
this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
bloomFilterWriteSupport.addKey(recordKey));
}

private static class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport<String> {
public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
super(bloomFilter);
}

if (maxRecordKey != null) {
maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
} else {
maxRecordKey = recordKey;
@Override
protected byte[] getUTF8Bytes(String key) {
return key.getBytes(StandardCharsets.UTF_8);
}
}
}
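One asymmetry worth noting: the Flink constructor takes a plain, possibly-null BloomFilter (hence the Option.ofNullable above), while the Spark variant in the next file already receives an Option<BloomFilter>. Side by side (variable names are made up):

// Flink: the bloom filter may legitimately be null, e.g. when the bloom index is not in play
HoodieRowDataParquetWriteSupport flinkWriteSupport =
    new HoodieRowDataParquetWriteSupport(hadoopConf, rowType, maybeNullBloomFilter);

// Spark: absence is modeled explicitly by the caller
HoodieRowParquetWriteSupport sparkWriteSupport =
    new HoodieRowParquetWriteSupport(hadoopConf, structType, Option.of(bloomFilter), writeConfig);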

HoodieRowParquetWriteSupport.java
@@ -19,41 +19,35 @@
package org.apache.hudi.io.storage.row;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.HashMap;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import java.util.Collections;
import java.util.Map;

/**
* Hoodie Write Support for directly writing Row to Parquet.
*/
public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {

private final Configuration hadoopConf;
private final BloomFilter bloomFilter;

private UTF8String minRecordKey;
private UTF8String maxRecordKey;
private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;

public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieWriteConfig writeConfig) {
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilterOpt.orElse(null);

this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}

public Configuration getHadoopConf() {
@@ -62,32 +56,35 @@ public Configuration getHadoopConf() {

@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
HashMap<String, String> extraMetaData = new HashMap<>();
if (bloomFilter != null) {
extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
if (minRecordKey != null && maxRecordKey != null) {
extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
}
return new WriteSupport.FinalizedWriteContext(extraMetaData);
Map<String, String> extraMetadata =
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
.orElse(Collections.emptyMap());

return new WriteSupport.FinalizedWriteContext(extraMetadata);
}

public void add(UTF8String recordKey) {
this.bloomFilter.add(recordKey.getBytes());
this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
bloomFilterWriteSupport.addKey(recordKey));
}

if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
super(bloomFilter);
}

@Override
protected byte[] getUTF8Bytes(UTF8String key) {
return key.getBytes();
}

@Override
protected UTF8String dereference(UTF8String key) {
// NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
// cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
// and simply do a pass over when it holds a (immutable) buffer holding just the string
minRecordKey = recordKey.clone();
}

if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
maxRecordKey = recordKey.clone();
return key.clone();
}
}

}
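The dereference override (and the NOTE it carries) addresses a real hazard: Spark can pass add() a UTF8String that is only a view into a buffer backing the whole row, and that buffer is reused across records, so remembering such a key as min/max without cloning lets later rows mutate it. A contrived illustration, assuming UTF8String.fromBytes wraps the given array without copying:

byte[] rowBuffer = "aaa".getBytes(StandardCharsets.UTF_8);
UTF8String key = UTF8String.fromBytes(rowBuffer);   // wraps rowBuffer, no copy
UTF8String unsafeMin = key;                         // what storing the key without clone() amounts to
UTF8String safeMin = key.clone();                   // clone() copies just the string bytes
rowBuffer[0] = (byte) 'z';                          // engine reuses the buffer for the next row
// unsafeMin now reads "zaa", while safeMin still reads "aaa"

Incidentally, the comparisons in the removed add() above (compareTo(recordKey) < 0 guarding the min update, > 0 guarding the max) look inverted; folding the tracking into HoodieBloomFilterWriteSupport gives all three write paths one shared, correct implementation.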

HoodieAvroWriteSupport.java
@@ -20,12 +20,14 @@

import org.apache.avro.Schema;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@@ -34,55 +36,45 @@
*/
public class HoodieAvroWriteSupport extends AvroWriteSupport {

private Option<BloomFilter> bloomFilterOpt;
private String minRecordKey;
private String maxRecordKey;
private Map<String, String> footerMetadata = new HashMap<>();
private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
private final Map<String, String> footerMetadata = new HashMap<>();

public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter";
public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";

public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, Option<BloomFilter> bloomFilterOpt) {
super(schema, avroSchema, ConvertingGenericData.INSTANCE);
this.bloomFilterOpt = bloomFilterOpt;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
}

@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
if (bloomFilterOpt.isPresent()) {
footerMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString());
if (minRecordKey != null && maxRecordKey != null) {
footerMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
footerMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
}
if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
footerMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name());
}
}
return new WriteSupport.FinalizedWriteContext(footerMetadata);
Map<String, String> extraMetadata =
CollectionUtils.combine(footerMetadata,
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
.orElse(Collections.emptyMap())
);

return new WriteSupport.FinalizedWriteContext(extraMetadata);
}

public void add(String recordKey) {
if (bloomFilterOpt.isPresent()) {
this.bloomFilterOpt.get().add(recordKey);
if (minRecordKey != null) {
minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
} else {
minRecordKey = recordKey;
}

if (maxRecordKey != null) {
maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
} else {
maxRecordKey = recordKey;
}
}
this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
bloomFilterWriteSupport.addKey(recordKey));
}

public void addFooterMetadata(String key, String value) {
footerMetadata.put(key, value);
}

private static class HoodieBloomFilterAvroWriteSupport extends HoodieBloomFilterWriteSupport<String> {
public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) {
super(bloomFilter);
}

@Override
protected byte[] getUTF8Bytes(String key) {
return key.getBytes(StandardCharsets.UTF_8);
}
}
}
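Call sites are untouched by this refactoring; a minimal usage sketch (parameter values are illustrative):

Option<BloomFilter> bloomFilterOpt = Option.of(
    BloomFilterFactory.createBloomFilter(
        60000,                                   // expected number of entries
        0.000000001,                             // target false-positive rate
        100000,                                  // max entries, relevant for dynamic filters
        BloomFilterTypeCode.DYNAMIC_V0.name()));

HoodieAvroWriteSupport writeSupport =
    new HoodieAvroWriteSupport(parquetSchema, avroSchema, bloomFilterOpt);

for (String recordKey : recordKeys) {
  writeSupport.add(recordKey);  // feeds the bloom filter and the min/max key tracking
}
// The enclosing ParquetWriter calls finalizeWrite() on close; entries registered via
// addFooterMetadata() are merged with the bloom-filter footers by CollectionUtils.combine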