
Commit 4af60dc

Author: Alexey Kudinkin
[HUDI-4465] Optimizing file-listing sequence of Metadata Table (#6016)
Optimizes the file-listing sequence of the Metadata Table to make sure it's on par with or better than FS-based file-listing.

Change log:
- Cleaned up avoidable instantiations of Hadoop's Path
- Replaced new Path w/ createUnsafePath where possible
- Cached TimestampFormatter, DateFormatter per timezone (see the sketch below)
- Avoid loading defaults in Hadoop conf when init-ing the HFile reader
- Avoid re-instantiating BaseTableMetadata twice w/in BaseHoodieTableFileIndex
- Avoid looking up FileSystem for every partition when listing a partitioned table; instead do it just once
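To illustrate the formatter-caching item above, here is a minimal, hypothetical sketch of the idea (class and method names are illustrative, not Hudi's actual API): keep one immutable, thread-safe formatter per time zone in a concurrent map and reuse it, rather than constructing a new formatter on every timestamp/date parse.

```java
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical example: cache one DateTimeFormatter per time zone so repeated
// timestamp/date formatting does not rebuild the formatter on every call.
public class FormatterCache {
  private static final Map<ZoneId, DateTimeFormatter> CACHE = new ConcurrentHashMap<>();

  public static DateTimeFormatter timestampFormatter(ZoneId zoneId) {
    // computeIfAbsent builds the formatter once per zone; DateTimeFormatter is
    // immutable and thread-safe, so the cached instance can be shared freely.
    return CACHE.computeIfAbsent(zoneId,
        zone -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(zone));
  }

  public static void main(String[] args) {
    DateTimeFormatter first = timestampFormatter(ZoneId.of("UTC"));
    DateTimeFormatter second = timestampFormatter(ZoneId.of("UTC"));
    System.out.println(first == second); // true: the same cached instance is reused
  }
}
```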
1 parent 13eb892 commit 4af60dc

38 files changed

Lines changed: 451 additions & 233 deletions


docker/demo/config/dfs-source.properties

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,10 @@ include=base.properties
 # Key fields, for kafka example
 hoodie.datasource.write.recordkey.field=key
 hoodie.datasource.write.partitionpath.field=date
+# NOTE: We have to duplicate configuration since this is being used
+# w/ both Spark and DeltaStreamer
+hoodie.table.recordkey.fields=key
+hoodie.table.partition.fields=date
 # Schema provider props (change to absolute path based on your installation)
 hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
 hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 3 additions & 3 deletions
@@ -1006,21 +1006,21 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst
     // finish off any pending compactions if any from previous attempt.
     writeClient.runAnyPendingCompactions();

-    String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+    String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
         .get().getTimestamp();
     List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
         .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());

     if (!pendingInstants.isEmpty()) {
       LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
-          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+          pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray())));
       return;
     }

     // Trigger compaction with suffixes based on the same instant time. This ensures that any future
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
-    final String compactionInstantTime = latestDeltacommitTime + "001";
+    final String compactionInstantTime = latestDeltaCommitTime + "001";
     if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
       writeClient.compact(compactionInstantTime);
     }

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java

Lines changed: 0 additions & 1 deletion
@@ -77,7 +77,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
   protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
   protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);

-
   protected transient volatile SparkRowConverter rowConverter;
   protected transient volatile SparkRowAccessor rowAccessor;

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java

Lines changed: 7 additions & 0 deletions
@@ -20,6 +20,7 @@

 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -46,6 +47,12 @@ public SimpleKeyGenerator(TypedProperties props) {

   SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
     super(props);
+    // Make sure key-generator is configured properly
+    ValidationUtils.checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+        "Record key field has to be non-empty!");
+    ValidationUtils.checkArgument(partitionPathField == null || !partitionPathField.isEmpty(),
+        "Partition path field has to be non-empty!");
+
     this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
     this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField);
     this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala

Lines changed: 6 additions & 6 deletions
@@ -26,10 +26,10 @@ import org.apache.spark.sql.types.DataType

 trait SparkParsePartitionUtil extends Serializable {

-  def parsePartition(
-      path: Path,
-      typeInference: Boolean,
-      basePaths: Set[Path],
-      userSpecifiedDataTypes: Map[String, DataType],
-      timeZone: TimeZone): InternalRow
+  def parsePartition(path: Path,
+                     typeInference: Boolean,
+                     basePaths: Set[Path],
+                     userSpecifiedDataTypes: Map[String, DataType],
+                     timeZone: TimeZone,
+                     validatePartitionValues: Boolean = false): InternalRow
 }

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ trait SparkAdapter extends Serializable {
   /**
    * Create the SparkParsePartitionUtil.
    */
-  def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil
+  def getSparkParsePartitionUtil: SparkParsePartitionUtil

   /**
   * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@ private void setUp(boolean populateMetaFields, boolean partitioned) throws Excep
       initTestDataGenerator(new String[] {""});
     }
     initFileSystem();
-    Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    Properties props = getPropertiesForKeyGen(populateMetaFields);
     props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
     metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
     config = getConfigBuilder()

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java

Lines changed: 22 additions & 4 deletions
@@ -36,6 +36,7 @@
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.Transformations;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,7 +62,6 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.storage.StorageLevel;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -87,9 +87,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   private HoodieTableMetaClient metaClient;
   private HoodieTestDataGenerator dataGen;

-  @BeforeEach
-  void setUp() throws IOException {
-    Properties properties = new Properties();
+  void setUp(Properties props) throws IOException {
+    Properties properties = CollectionUtils.copy(props);
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
     dataGen = new HoodieTestDataGenerator();
@@ -99,6 +98,9 @@ void setUp() throws IOException {
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {

       String newCommitTime = "001";
@@ -125,6 +127,9 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig cfg = cfgBuilder.build();
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {

       /**
@@ -213,6 +218,8 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();

+    setUp(config.getProps());
+
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
@@ -302,6 +309,8 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep
     HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
         .withAutoCommit(false).build();

+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);

@@ -381,6 +390,9 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep
   @Test
   public void testRollingStatsWithSmallFileHandling() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
       Map<String, Long> fileIdToInsertsMap = new HashMap<>();
       Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -497,6 +509,9 @@ public void testRollingStatsWithSmallFileHandling() throws Exception {
   @Test
   public void testHandleUpdateWithMultiplePartitions() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
+
+    setUp(cfg.getProps());
+
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {

       /**
@@ -578,6 +593,9 @@ public void testReleaseResource() throws Exception {
     HoodieWriteConfig.Builder builder = getConfigBuilder(true);
     builder.withReleaseResourceEnabled(true);
     builder.withAutoCommit(false);
+
+    setUp(builder.build().getProps());
+
     /**
      * Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
      */

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java

Lines changed: 15 additions & 4 deletions
@@ -54,6 +54,7 @@
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -98,8 +99,13 @@ public void testWriteDuringCompaction() throws IOException {
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .build();
+
+    Properties props = getPropertiesForKeyGen(true);
+    props.putAll(config.getProps());
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
     client = getHoodieWriteClient(config);

     // write data and commit
@@ -138,8 +144,13 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean en
         .withLayoutConfig(HoodieLayoutConfig.newBuilder()
             .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+        .build();
+
+    Properties props = getPropertiesForKeyGen(true);
+    props.putAll(config.getProps());
+
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
     client = getHoodieWriteClient(config);

     final List<HoodieRecord> records = dataGen.generateInserts("001", 100);

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ void setUp() {
   public void testIncrementalReadsWithCompaction() throws Exception {
     final String partitionPath = "2020/02/20"; // use only one partition for this test
     final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
-    Properties props = new Properties();
+    Properties props = getPropertiesForKeyGen(true);
     props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
     HoodieWriteConfig cfg = getConfigBuilder(true).build();
