
Commit fee80b9

[HUDI-4855] Add missing table configs for bootstrap in Deltastreamer (#6694)
1 parent 76f3c6a commit fee80b9

4 files changed

Lines changed: 78 additions & 32 deletions


docker/demo/sparksql-bootstrap-prep-source.commands

Lines changed: 2 additions & 0 deletions
@@ -18,5 +18,7 @@
 import org.apache.spark.sql.functions.col

 val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
+// TODO(HUDI-4944): fix the test to use a partition column with slashes (`/`) included
+// in the value. Currently it fails the tests due to slash encoding.
 df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
 System.exit(0)

hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java

Lines changed: 26 additions & 26 deletions
@@ -231,33 +231,33 @@ private void ingestFirstBatchAndHiveSync() throws Exception {
(Whitespace-only change: the 26 removed and 26 added lines are identical in content; the string-concatenation continuation lines are re-indented. The resulting block:)
    executeSparkSQLCommand(SPARKSQL_BS_PREP_COMMANDS, true);
    List<String> bootstrapCmds = CollectionUtils.createImmutableList(
        "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
            + " --table-type COPY_ON_WRITE "
            + " --run-bootstrap "
            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
            + " --target-base-path " + COW_BOOTSTRAPPED_BASE_PATH + " --target-table " + COW_BOOTSTRAPPED_TABLE_NAME
            + " --props /var/demo/config/dfs-source.properties"
            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
            + " --initial-checkpoint-provider"
            + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
            + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
            + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
            + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
            + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
            + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_BOOTSTRAPPED_TABLE_NAME),
        "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + HUDI_UTILITIES_BUNDLE
            + " --table-type MERGE_ON_READ "
            + " --run-bootstrap "
            + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource --source-ordering-field ts "
            + " --target-base-path " + MOR_BOOTSTRAPPED_BASE_PATH + " --target-table " + MOR_BOOTSTRAPPED_TABLE_NAME
            + " --props /var/demo/config/dfs-source.properties"
            + " --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
            + " --initial-checkpoint-provider"
            + " org.apache.hudi.utilities.checkpointing.InitialCheckpointFromAnotherHoodieTimelineProvider"
            + " --hoodie-conf hoodie.bootstrap.base.path=" + BOOTSTRAPPED_SRC_PATH
            + " --hoodie-conf hoodie.deltastreamer.checkpoint.provider.path=" + COW_BASE_PATH
            + " --hoodie-conf hoodie.bootstrap.parallelism=2 "
            + " --hoodie-conf hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName()
            + String.format(HIVE_SYNC_CMD_FMT, "dt", MOR_BOOTSTRAPPED_TABLE_NAME));
    executeCommandStringsInDocker(ADHOC_1_CONTAINER, bootstrapCmds);
  }

hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java

Lines changed: 44 additions & 3 deletions
@@ -22,9 +22,11 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;

@@ -33,6 +35,9 @@
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.util.SparkKeyGenUtils;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;

@@ -48,8 +53,16 @@
 import java.util.HashMap;

 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT;
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_TIMEZONE;
+import static org.apache.hudi.config.HoodieBootstrapConfig.KEYGEN_CLASS_NAME;
+import static org.apache.hudi.config.HoodieWriteConfig.PRECOMBINE_FIELD_NAME;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
+import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;

@@ -187,16 +200,44 @@ private void initializeTable() throws IOException {
           + ". Cannot bootstrap data on top of an existing table");
     }
   }
-    HoodieTableMetaClient.withPropertyBuilder()
+
+    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(props)
         .setTableType(cfg.tableType)
         .setTableName(cfg.targetTableName)
-        .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+        .setRecordKeyFields(props.getString(RECORDKEY_FIELD_NAME.key()))
+        .setPreCombineField(props.getString(
+            PRECOMBINE_FIELD_NAME.key(), PRECOMBINE_FIELD_NAME.defaultValue()))
+        .setPopulateMetaFields(props.getBoolean(
+            POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue()))
+        .setArchiveLogFolder(props.getString(
+            ARCHIVELOG_FOLDER.key(), ARCHIVELOG_FOLDER.defaultValue()))
         .setPayloadClassName(cfg.payloadClassName)
         .setBaseFileFormat(cfg.baseFileFormat)
         .setBootstrapIndexClass(cfg.bootstrapIndexClass)
         .setBootstrapBasePath(bootstrapBasePath)
-        .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
+        .setHiveStylePartitioningEnable(props.getBoolean(
+            HIVE_STYLE_PARTITIONING_ENABLE.key(),
+            Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())
+        ))
+        .setUrlEncodePartitioning(props.getBoolean(
+            URL_ENCODE_PARTITIONING.key(),
+            Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
+        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(
+            props.getString(
+                TIMELINE_TIMEZONE.key(),
+                String.valueOf(TIMELINE_TIMEZONE.defaultValue()))))
+        .setPartitionMetafileUseBaseFormat(props.getBoolean(
+            PARTITION_METAFILE_USE_BASE_FORMAT.key(),
+            PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()));
+    String partitionColumns = SparkKeyGenUtils.getPartitionColumns(props);
+    if (!StringUtils.isNullOrEmpty(partitionColumns)) {
+      builder.setPartitionFields(partitionColumns)
+          .setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
+    } else {
+      builder.setKeyGeneratorClassProp(props.getString(KEYGEN_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()));
+    }
+
+    builder.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
   }

   public HoodieWriteConfig getBootstrapConfig() {
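
With this change, a Deltastreamer bootstrap persists the record key, precombine field, partition fields, key generator, timeline timezone, and partition-encoding flags into the table's hoodie.properties at init time instead of leaving them unset. A minimal verification sketch, assuming the HoodieTableMetaClient builder and HoodieTableConfig getters of this era of Hudi, and a hypothetical table path:

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class InspectBootstrappedTableConfig {
  public static void main(String[] args) {
    // Hypothetical base path of a table created via HoodieDeltaStreamer --run-bootstrap.
    String basePath = "/tmp/hudi/test_dataset_bootstrapped";
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(basePath)
        .build();
    HoodieTableConfig tableConfig = metaClient.getTableConfig();
    // Before this commit these entries were missing from hoodie.properties after a
    // Deltastreamer bootstrap; initializeTable() now writes them.
    System.out.println("key generator:    " + tableConfig.getKeyGeneratorClassName());
    System.out.println("precombine field: " + tableConfig.getPreCombineField());
    System.out.println("partition fields: " + tableConfig.getPartitionFieldProp());
  }
}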

hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java

Lines changed: 6 additions & 3 deletions
@@ -46,9 +46,9 @@
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;

@@ -630,11 +630,14 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
     Dataset<Row> sourceDf = sqlContext.read()
         .format("org.apache.hudi")
         .load(tableBasePath);
-    sourceDf.write().format("parquet").save(bootstrapSourcePath);
+    // TODO(HUDI-4944): fix the test to use a partition column with slashes (`/`) included
+    // in the value. Currently it fails the tests due to slash encoding.
+    sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);

     String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
     cfg.runBootstrap = true;
     cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath));
+    cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
     cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName()));
     cfg.configs.add("hoodie.bootstrap.parallelism=5");
     cfg.targetBasePath = newDatasetBasePath;
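
For reference, the bootstrap-related configs the updated test passes can be assembled as in the sketch below; the paths are hypothetical stand-ins for the test fixtures, and the Config fields are the same public fields the test mutates. Omitting hoodie.datasource.write.partitionpath.field now makes BootstrapExecutor fall back to NonpartitionedKeyGenerator:

import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

public class BootstrapConfigSketch {
  static HoodieDeltaStreamer.Config bootstrapConfig() {
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.runBootstrap = true;
    cfg.targetBasePath = "/tmp/hudi/test_dataset_bootstrapped";            // hypothetical path
    cfg.configs.add("hoodie.bootstrap.base.path=/tmp/hudi/bootstrap_src"); // hypothetical path
    // Persisted into hoodie.properties as the table's partition field; without it
    // the table would be initialized as non-partitioned.
    cfg.configs.add("hoodie.datasource.write.partitionpath.field=rider");
    cfg.configs.add("hoodie.bootstrap.keygen.class=" + SimpleKeyGenerator.class.getName());
    cfg.configs.add("hoodie.bootstrap.parallelism=5");
    return cfg;
  }
}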
