
Commit e038901

[HUDI-6226] Support parquet native bloom filters (#8716)
* Make the bloom filter work with the OP example
* Adapt the test for both Spark 2 and Spark 3
1 parent: f39327c

2 files changed: +196 -20 lines

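Usage sketch (not part of the commit): with this change, the standard parquet-mr keys in the writer's Hadoop configuration, parquet.bloom.filter.enabled#<column> and parquet.bloom.filter.expected.ndv#<column>, are honored when Hudi writes parquet base files. The Scala snippet below mirrors the new test and reuses its spark, inputDF and basePath; the column name, table name and NDV value are illustrative only.

  // enable a native parquet bloom filter on column "bloom_col" and hint its expected cardinality
  spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")
  spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.expected.ndv#bloom_col", "1000")

  inputDF.write.format("hudi")
    .option("hoodie.table.name", "hoodie_bloom")                          // illustrative table name
    .option("hoodie.datasource.write.recordkey.field", "_row_key")
    .option("hoodie.datasource.write.partitionpath.field", "partition")
    .mode(SaveMode.Overwrite)
    .save(basePath)

On Spark 3, equality filters on bloom_col can then skip whole row groups at read time; on Spark 2, whose bundled parquet version predates native bloom filters, the setting is silently skipped.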

hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java

Lines changed: 73 additions & 20 deletions

@@ -18,53 +18,98 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
-import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
 
 /**
  * Base class of Hudi's custom {@link ParquetWriter} implementations
  *
  * @param <R> target type of the object being written into Parquet files (for ex,
- *            {@code IndexedRecord}, {@code InternalRow})
+ *           {@code IndexedRecord}, {@code InternalRow})
 */
-public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
+public abstract class HoodieBaseParquetWriter<R> implements Closeable {
 
   private final AtomicLong writtenRecordCount = new AtomicLong(0);
   private final long maxFileSize;
   private long recordCountForNextSizeCheck;
+  private final ParquetWriter parquetWriter;
+  public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+  public static final String BLOOM_FILTER_ENABLED = "parquet.bloom.filter.enabled";
 
   public HoodieBaseParquetWriter(Path file,
                                  HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
-    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
-        ParquetFileWriter.Mode.CREATE,
-        parquetConfig.getWriteSupport(),
-        parquetConfig.getCompressionCodecName(),
-        parquetConfig.getBlockSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.dictionaryEnabled(),
-        DEFAULT_IS_VALIDATING_ENABLED,
-        DEFAULT_WRITER_VERSION,
-        FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
+      @Override
+      protected ParquetWriter.Builder self() {
+        return this;
+      }
+
+      @Override
+      protected WriteSupport getWriteSupport(Configuration conf) {
+        return parquetConfig.getWriteSupport();
+      }
+    };
+
+    parquetWriterbuilder.withWriteMode(ParquetFileWriter.Mode.CREATE);
+    parquetWriterbuilder.withCompressionCodec(parquetConfig.getCompressionCodecName());
+    parquetWriterbuilder.withRowGroupSize(parquetConfig.getBlockSize());
+    parquetWriterbuilder.withPageSize(parquetConfig.getPageSize());
+    parquetWriterbuilder.withDictionaryPageSize(parquetConfig.getPageSize());
+    parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
+    parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
+    parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
+    parquetWriterbuilder.withConf(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf());
 
+    parquetWriter = parquetWriterbuilder.build();
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
     // conservative 10%
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the
     // stream and the actual file size reported by HDFS
     this.maxFileSize = parquetConfig.getMaxFileSize()
         + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
-    this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+    this.recordCountForNextSizeCheck = ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+  }
+
+  protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) {
+    // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464
+    hadoopConf.forEach(conf -> {
+      String key = conf.getKey();
+      if (key.startsWith(BLOOM_FILTER_ENABLED)) {
+        String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length());
+        try {
+          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterEnabled", String.class, boolean.class);
+          method.invoke(parquetWriterbuilder, column, Boolean.valueOf(conf.getValue()));
+        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+          // skip: the parquet version on the classpath does not support column bloom filters
+        }
+      }
+      if (key.startsWith(BLOOM_FILTER_EXPECTED_NDV)) {
+        String column = key.substring(BLOOM_FILTER_EXPECTED_NDV.length() + 1, key.length());
+        try {
+          Method method = parquetWriterbuilder.getClass().getMethod("withBloomFilterNDV", String.class, long.class);
+          method.invoke(parquetWriterbuilder, column, Long.parseLong(conf.getValue()));
+        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+          // skip: the parquet version on the classpath does not support column bloom filters
+        }
+      }
+    });
   }
 
   public boolean canWrite() {
@@ -82,15 +127,18 @@ public boolean canWrite() {
       }
       recordCountForNextSizeCheck = writtenCount + Math.min(
           // Do check it in the halfway
-          Math.max(DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
-          DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
+          Math.max(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, (maxFileSize / avgRecordSize - writtenCount) / 2),
+          ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
     }
     return true;
   }
 
-  @Override
+  public long getDataSize() {
+    return this.parquetWriter.getDataSize();
+  }
+
   public void write(R object) throws IOException {
-    super.write(object);
+    this.parquetWriter.write(object);
     writtenRecordCount.incrementAndGet();
   }
 
@@ -102,4 +150,9 @@ protected long getWrittenRecordCount() {
   protected long getRecordCountForNextSizeCheck() {
     return recordCountForNextSizeCheck;
   }
+
+  @Override
+  public void close() throws IOException {
+    this.parquetWriter.close();
+  }
 }
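For context (not part of the diff): the reflective lookup above exists because the per-column bloom filter setters only appeared in parquet-mr 1.12, while Spark 2 bundles an older parquet. When a recent parquet-mr is on the classpath, the two reflective calls resolve to the ParquetWriter.Builder methods shown in this sketch; the file path, schema and NDV value are illustrative only.

  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericRecord
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.avro.AvroParquetWriter

  // hypothetical Avro schema containing a string field named "bloom_col"
  val schema: Schema = ???
  val writer = AvroParquetWriter.builder[GenericRecord](new Path("/tmp/bloom_example.parquet"))
    .withConf(new Configuration())
    .withSchema(schema)
    // the two builder methods the reflection above looks up by name
    .withBloomFilterEnabled("bloom_col", true)
    .withBloomFilterNDV("bloom_col", 1000L)
    .build()
  // ... write GenericRecords, then writer.close()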
TestHoodieParquetBloomFilter.scala (new file)

Lines changed: 123 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi

import org.apache.spark.sql._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.SparkContext

import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

class TestHoodieParquetBloomFilter {

  var spark: SparkSession = _
  var sqlContext: SQLContext = _
  var sc: SparkContext = _

  def initSparkContext(): Unit = {
    val sparkConf = getSparkConfForTest(getClass.getSimpleName)

    spark = SparkSession.builder()
      .withExtensions(new HoodieSparkSessionExtension)
      .config(sparkConf)
      .getOrCreate()

    sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    sqlContext = spark.sqlContext
  }

  @BeforeEach
  def setUp(): Unit = {
    initSparkContext()
  }

  @ParameterizedTest
  @EnumSource(value = classOf[WriteOperationType], names = Array("BULK_INSERT", "INSERT", "UPSERT", "INSERT_OVERWRITE"))
  def testBloomFilter(operation: WriteOperationType): Unit = {
    // set up hadoop conf with the bloom filter enabled for bloom_col
    spark.sparkContext.hadoopConfiguration.set("parquet.bloom.filter.enabled#bloom_col", "true")

    val basePath = java.nio.file.Files.createTempDirectory("hoodie_bloom_source_path").toAbsolutePath.toString
    val opts = Map(
      HoodieWriteConfig.TBL_NAME.key -> "hoodie_bloom",
      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.toString,
      DataSourceWriteOptions.OPERATION.key -> operation.toString,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition"
    )
    val inputDF = spark.sql(
      """select '0' as _row_key, '1' as bloom_col, '2' as partition, '3' as ts
        |union
        |select '1', '2', '3', '4'
        |""".stripMargin)
    inputDF.write.format("hudi")
      .options(opts)
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // NumRowGroupsAcc matches the accumulator name Spark's parquet record reader looks for in
    // tests: the reader adds the number of row groups it actually scans. accu.add(0) in the
    // lambdas below only attaches the accumulator to the task.
    val accu = new NumRowGroupsAcc
    spark.sparkContext.register(accu)

    // this query shall skip the row group thanks to the bloom filter when spark >= 3
    spark.read.format("hudi").load(basePath).filter("bloom_col = '3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
    assertEquals(if (currentSparkSupportParquetBloom()) 0 else 1, accu.value)

    // this query will trigger one row group scan
    spark.read.format("hudi").load(basePath).filter("bloom_col = '2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
    assertEquals(1, accu.value)
  }

  def currentSparkSupportParquetBloom(): Boolean = {
    Integer.valueOf(spark.version.charAt(0).toString) >= 3
  }
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var _sum = 0

  override def isZero: Boolean = _sum == 0

  override def copy(): AccumulatorV2[Integer, Integer] = {
    val acc = new NumRowGroupsAcc()
    acc._sum = _sum
    acc
  }

  override def reset(): Unit = _sum = 0

  override def add(v: Integer): Unit = _sum += v

  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
    case a: NumRowGroupsAcc => _sum += a._sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Integer = _sum
}
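A rough way to confirm the bloom filter actually landed in the written base file, independent of Spark version (not part of the commit; assumes parquet-mr 1.12+'s ParquetFileReader#readBloomFilter and a hypothetical parquetFilePath pointing at one of the parquet files under basePath):

  import scala.collection.JavaConverters._
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.parquet.hadoop.util.HadoopInputFile
  import org.apache.parquet.io.api.Binary

  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(parquetFilePath), new Configuration()))
  try {
    val block = reader.getFooter.getBlocks.get(0)
    // locate the column chunk for bloom_col and read its bloom filter, if any
    val chunk = block.getColumns.asScala.find(_.getPath.toDotString.endsWith("bloom_col")).get
    val bloom = reader.readBloomFilter(chunk)   // null when no bloom filter was written
    if (bloom != null) {
      // '1' was written, '3' was not; the second check may still report true (false positive)
      println(bloom.findHash(bloom.hash(Binary.fromString("1"))))
      println(bloom.findHash(bloom.hash(Binary.fromString("3"))))
    }
  } finally {
    reader.close()
  }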
