Skip to content

Commit c7c9984

Browse files
committed
[HUDI-4880] Fix corrupted parquet file issue left over by cancelled compaction task
1 parent 30f489e commit c7c9984

3 files changed

Lines changed: 12 additions & 5 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,13 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
181181
}
182182

183183
/**
184-
* Creates an empty marker file corresponding to storage writer path.
184+
* Creates an empty marker file corresponding to storage writer path. Do nothing if marker file already exists.
185185
*
186186
* @param partitionPath Partition path
187187
*/
188188
protected void createMarkerFile(String partitionPath, String dataFileName) {
189189
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
190-
.create(partitionPath, dataFileName, getIOType());
190+
.createIfNotExists(partitionPath, dataFileName, getIOType());
191191
}
192192

193193
public Schema getWriterSchemaWithMetaFields() {

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,14 @@ private HoodieWriteConfig reloadWriteConfig() throws Exception {
130130
public void setExecutor(NonThrownExecutor executor) {
131131
this.executor = executor;
132132
}
133+
134+
@Override
135+
public void close() throws Exception {
136+
if (this.asyncCompaction) {
137+
this.executor.close();
138+
}
139+
if (null != this.writeClient) {
140+
this.writeClient.close();
141+
}
142+
}
133143
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,6 @@ private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) th
129129
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
130130
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
131131
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
132-
WriteMarkersFactory
133-
.get(table.getConfig().getMarkersType(), table, compactionInstantTime)
134-
.deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
135132
for (CompactionOperation operation : operations) {
136133
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
137134
}

0 commit comments

Comments (0)