@@ -1064,6 +1064,10 @@ public MarkerType getMarkersType() {
return MarkerType.valueOf(markerType.toUpperCase());
}

public boolean isHiveStylePartitioningEnabled() {
return getBooleanOrDefault(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE);
}

public int getMarkersTimelineServerBasedBatchNumThreads() {
return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS);
}
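
For context, isHiveStylePartitioningEnabled() reads the existing key-generator flag (KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE, the hoodie.datasource.write.hive_style_partitioning option). A minimal sketch of what the flag changes, with an illustrative field and value, not the PR's code:

// With hive-style partitioning off, a partition folder is just the value;
// with it on, the folder is "<field>=<value>", e.g. "country=US".
static String partitionDir(boolean hiveStyle, String field, String value) {
  return hiveStyle ? field + "=" + value : value;
}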
@@ -23,6 +23,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
@@ -128,7 +129,11 @@ public void write(InternalRow record) throws IOException {
if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
partitionPath = "";
} else if (simpleKeyGen) { // SimpleKeyGen
partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
Object partitionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
partitionPath = partitionPathValue != null ? partitionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
if (writeConfig.isHiveStylePartitioningEnabled()) {
partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
}
} else {
// only BuiltIn key generators are supported if meta fields are disabled.
partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType);
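
Taken in isolation, the write() change does two things for the SimpleKeyGenerator branch: a null partition value now falls back to the default partition constant instead of throwing an NPE on toString(), and the field-name prefix is added when hive-style partitioning is on. A hedged restatement of that logic with illustrative names, not the PR's code:

// Illustrative sketch of the branch above.
static String resolvePartitionPath(Object rawValue, String partitionField,
                                   boolean hiveStyle, String defaultPartition) {
  // null values land in the default partition rather than NPE-ing
  String path = rawValue != null ? rawValue.toString() : defaultPartition;
  // e.g. "2016/03/15" -> "partition=2016/03/15" when hive-style is enabled
  return hiveStyle ? partitionField + "=" + path : path;
}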
@@ -70,13 +70,18 @@ public void tearDown() throws Exception {
}

protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) {
return getWriteConfig(populateMetaFields, DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().defaultValue());
}

protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields, String hiveStylePartitioningValue) {
Properties properties = new Properties();
if (!populateMetaFields) {
properties.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName());
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), SparkDatasetTestUtils.RECORD_KEY_FIELD_NAME);
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME);
properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
}
properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), hiveStylePartitioningValue);
return getConfigBuilder(basePath, timelineServicePort).withProperties(properties).build();
}
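
For clarity, how the two overloads are meant to be called (a sketch; the one-argument form keeps the option's default value, so existing call sites are unchanged):

HoodieWriteConfig defaults = getWriteConfig(false);
HoodieWriteConfig hiveStyle = getWriteConfig(false, "true");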

@@ -18,15 +18,18 @@

package org.apache.hudi.internal;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkDatasetTestUtils;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -109,6 +112,51 @@ public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception {
}
}

@Test
public void testDataInternalWriterHiveStylePartitioning() throws Exception {
boolean sorted = true;
boolean populateMetaFields = false;
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
for (int i = 0; i < 1; i++) {
String instantTime = "00" + i;
// init writer
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
STRUCT_TYPE, populateMetaFields, sorted);

Contributor: Is this RANDOM a fixed-seed one?
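// On the review question above: RANDOM appears to be a shared java.util.Random
// from the test base, so the seed is presumably not fixed. A deterministic
// variant (hypothetical, not part of this PR) would pin it:
//   private static final Random RANDOM = new Random(42L);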

int size = 10 + RANDOM.nextInt(1000);
// write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
int batches = 3;
Dataset<Row> totalInputRows = null;

for (int j = 0; j < batches; j++) {
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
writeRows(inputRows, writer);
if (totalInputRows == null) {
totalInputRows = inputRows;
} else {
totalInputRows = totalInputRows.union(inputRows);
}
}

BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
Option<List<String>> fileNames = Option.of(new ArrayList<>());

// verify write statuses
assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames);

// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);

result.collectAsList().forEach(entry -> Assertions.assertTrue(entry.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()
.contains(SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=")));
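// Worked example of the assertion above (assuming the test's partition field
// is named "partition" and values come from DEFAULT_PARTITION_PATHS, e.g.
// "2016/03/15"): with hive-style partitioning on, every row's
// _hoodie_partition_path should read "partition=2016/03/15" and therefore
// contain "partition=".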
}
}

/**
Issue some corrupted or wrongly schematized InternalRows after a few valid InternalRows so that a global error is thrown: write batch 1 of valid records, then write batch 2 of invalid records, which is expected
to throw a global error. Verify that the global error is set appropriately and that only the first batch of records is written to disk.