Skip to content

Commit de00fb3

Browse files
committed
[HUDI-3085] review fix
1 parent a4f82cf commit de00fb3

4 files changed

Lines changed: 7 additions & 11 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ default String getFileIdPfx(int partitionId) {

   /**
    * Return write handle factory for the given partition.
-   * By default, return CreateHandleFactory which will always write to a new file group
    * @param partitionId data partition
    * @return
    */

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public List<WriteStatus> performClusteringWithRecordList(
         .withEngineType(EngineType.JAVA)
         .withProps(getWriteConfig().getProps()).build();
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
-
     return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
   }

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,11 @@ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord
           config.shouldAllowMultiWriteOnSameInstant());
     }

-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
-        ? userDefinedBulkInsertPartitioner.get()
-        : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

     // write new files
-    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
@@ -118,6 +117,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
         new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
             config, instantTime, table,
             fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
+            // Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine.
             (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);

     return writeStatuses;

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,11 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<
         executor.getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());

-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
-        ? userDefinedBulkInsertPartitioner.get()
-        : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

     // write new files
-    HoodieData<WriteStatus> writeStatuses =
-        bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+    HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;

0 commit comments

Comments (0)