
Commit e99a6b0

zhangyue19921010 and yuezhang authored
[HUDI-2073] Fix the bug of hoodieClusteringJob never quit (#3157)
Co-authored-by: yuezhang <[email protected]>
1 parent f73bedd commit e99a6b0

File tree: 1 file changed, +11 −11 lines changed


hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java

Lines changed: 11 additions & 11 deletions
@@ -149,11 +149,11 @@ private String getSchemaFromLatestInstant() throws Exception {
 
   private int doCluster(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient<HoodieRecordPayload> client =
-        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    JavaRDD<WriteStatus> writeResponse =
-        client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
-    return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      JavaRDD<WriteStatus> writeResponse =
+          client.cluster(cfg.clusteringInstantTime, true).getWriteStatuses();
+      return UtilHelpers.handleErrors(jsc, cfg.clusteringInstantTime, writeResponse);
+    }
   }
 
   @TestOnly
@@ -163,12 +163,12 @@ public Option<String> doSchedule() throws Exception {
 
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
-    SparkRDDWriteClient<HoodieRecordPayload> client =
-        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
-    if (cfg.clusteringInstantTime != null) {
-      client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
-      return Option.of(cfg.clusteringInstantTime);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (cfg.clusteringInstantTime != null) {
+        client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
+        return Option.of(cfg.clusteringInstantTime);
+      }
+      return client.scheduleClustering(Option.empty());
     }
-    return client.scheduleClustering(Option.empty());
   }
 }
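
The change wraps the write client in a try-with-resources block, so the client is closed (and its background resources released) whether the method returns normally or throws, which is what lets the standalone clustering job exit. The sketch below is a minimal, self-contained illustration of that Java pattern; FakeWriteClient is a hypothetical stand-in for an AutoCloseable client and is not part of the Hudi API.

// Minimal sketch of the try-with-resources pattern the fix relies on.
public class TryWithResourcesSketch {

  // Hypothetical client that would otherwise keep background resources alive.
  static class FakeWriteClient implements AutoCloseable {
    String cluster(String instantTime) {
      return "clustered@" + instantTime;
    }

    @Override
    public void close() {
      // Release background threads/services here so the JVM can exit.
      System.out.println("client closed");
    }
  }

  static String doCluster(String instantTime) {
    // The client is closed automatically when the block exits,
    // whether via the return statement or an exception.
    try (FakeWriteClient client = new FakeWriteClient()) {
      return client.cluster(instantTime);
    }
  }

  public static void main(String[] args) {
    System.out.println(doCluster("20210624120000"));
  }
}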
