Skip to content

Commit 916f12b

Browse files
authored
[HUDI-2433] Refactor rollback actions in hudi-client module (#3664)
1 parent 86a7351 commit 916f12b

34 files changed

Lines changed: 512 additions & 1462 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseCopyOnWriteRollbackActionExecutor.java renamed to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,39 @@
3333
import java.util.ArrayList;
3434
import java.util.List;
3535

36-
public abstract class BaseCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
36+
public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseRollbackActionExecutor<T, I, K, O> {
3737

38-
private static final Logger LOG = LogManager.getLogger(BaseCopyOnWriteRollbackActionExecutor.class);
38+
private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class);
3939

/**
 * Instantiates a copy-on-write rollback executor; delegates all state to the base executor.
 *
 * @param context        engine context used to run rollback actions.
 * @param config         write config (parallelism, base path, etc.).
 * @param table          table whose instant is being rolled back.
 * @param instantTime    time of the rollback instant being created.
 * @param commitInstant  the instant to roll back.
 * @param deleteInstants whether the rolled-back instant files should be deleted.
 */
public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
                                         HoodieWriteConfig config,
                                         HoodieTable<T, I, K, O> table,
                                         String instantTime,
                                         HoodieInstant commitInstant,
                                         boolean deleteInstants) {
  super(context, config, table, instantTime, commitInstant, deleteInstants);
}
4848

/**
 * Instantiates a copy-on-write rollback executor with explicit control over
 * timeline publishing and rollback-strategy selection; delegates to the base executor.
 *
 * @param skipTimelinePublish    when true, the rollback result is not published to the timeline.
 * @param useMarkerBasedStrategy when true, rollback uses marker files instead of file listing.
 */
public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
                                         HoodieWriteConfig config,
                                         HoodieTable<T, I, K, O> table,
                                         String instantTime,
                                         HoodieInstant commitInstant,
                                         boolean deleteInstants,
                                         boolean skipTimelinePublish,
                                         boolean useMarkerBasedStrategy) {
  super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
}
5959

60+
@Override
61+
protected RollbackStrategy getRollbackStrategy() {
62+
if (useMarkerBasedStrategy) {
63+
return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
64+
} else {
65+
return this::executeRollbackUsingFileListing;
66+
}
67+
}
68+
6069
@Override
6170
protected List<HoodieRollbackStat> executeRollback() {
6271
HoodieTimer rollbackTimer = new HoodieTimer();
@@ -88,4 +97,11 @@ protected List<HoodieRollbackStat> executeRollback() {
8897
LOG.info("Time(in ms) taken to finish rollback " + rollbackTimer.endTimer());
8998
return stats;
9099
}
100+
101+
@Override
102+
protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
103+
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
104+
context, table.getMetaClient().getBasePath(), config);
105+
return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
106+
}
91107
}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.table.action.rollback;
21+
22+
import org.apache.hudi.common.HoodieRollbackStat;
23+
import org.apache.hudi.common.engine.HoodieEngineContext;
24+
import org.apache.hudi.common.fs.FSUtils;
25+
import org.apache.hudi.common.model.HoodieFileFormat;
26+
import org.apache.hudi.common.model.HoodieLogFile;
27+
import org.apache.hudi.common.table.HoodieTableMetaClient;
28+
import org.apache.hudi.common.table.log.HoodieLogFormat;
29+
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
30+
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
31+
import org.apache.hudi.common.table.timeline.HoodieInstant;
32+
import org.apache.hudi.common.util.collection.ImmutablePair;
33+
import org.apache.hudi.common.util.collection.Pair;
34+
import org.apache.hudi.config.HoodieWriteConfig;
35+
import org.apache.hudi.exception.HoodieIOException;
36+
import org.apache.hudi.exception.HoodieRollbackException;
37+
38+
import org.apache.hadoop.fs.FileStatus;
39+
import org.apache.hadoop.fs.FileSystem;
40+
import org.apache.hadoop.fs.PathFilter;
41+
import org.apache.log4j.LogManager;
42+
import org.apache.log4j.Logger;
43+
44+
import java.io.IOException;
45+
import java.io.Serializable;
46+
import java.util.Collections;
47+
import java.util.HashMap;
48+
import java.util.List;
49+
import java.util.Map;
50+
import java.util.Objects;
51+
import java.util.stream.Collectors;
52+
53+
/**
54+
* Performs Rollback of Hoodie Tables.
55+
*/
56+
public class ListingBasedRollbackHelper implements Serializable {
57+
private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
58+
59+
private final HoodieTableMetaClient metaClient;
60+
private final HoodieWriteConfig config;
61+
62+
public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
63+
this.metaClient = metaClient;
64+
this.config = config;
65+
}
66+
67+
/**
68+
* Performs all rollback actions that we have collected in parallel.
69+
*/
70+
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
71+
List<ListingBasedRollbackRequest> rollbackRequests) {
72+
int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
73+
context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
74+
return context.mapToPairAndReduceByKey(rollbackRequests,
75+
rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
76+
RollbackUtils::mergeRollbackStat,
77+
parallelism);
78+
}
79+
80+
/**
81+
* Collect all file info that needs to be rollbacked.
82+
*/
83+
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
84+
List<ListingBasedRollbackRequest> rollbackRequests) {
85+
int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
86+
context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
87+
return context.mapToPairAndReduceByKey(rollbackRequests,
88+
rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
89+
RollbackUtils::mergeRollbackStat,
90+
parallelism);
91+
}
92+
93+
/**
94+
* May be delete interested files and collect stats or collect stats only.
95+
*
96+
* @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
97+
* @param doDelete {@code true} if deletion has to be done.
98+
* {@code false} if only stats are to be collected w/o performing any deletes.
99+
* @return stats collected with or w/o actual deletions.
100+
*/
101+
private Pair<String, HoodieRollbackStat> maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
102+
HoodieInstant instantToRollback,
103+
boolean doDelete) throws IOException {
104+
switch (rollbackRequest.getType()) {
105+
case DELETE_DATA_FILES_ONLY: {
106+
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
107+
rollbackRequest.getPartitionPath(), doDelete);
108+
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
109+
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
110+
.withDeletedFileResults(filesToDeletedStatus).build());
111+
}
112+
case DELETE_DATA_AND_LOG_FILES: {
113+
final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
114+
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
115+
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
116+
.withDeletedFileResults(filesToDeletedStatus).build());
117+
}
118+
case APPEND_ROLLBACK_BLOCK: {
119+
String fileId = rollbackRequest.getFileId().get();
120+
String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
121+
122+
// collect all log files that is supposed to be deleted with this rollback
123+
Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
124+
FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
125+
fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
126+
.collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
127+
128+
HoodieLogFormat.Writer writer = null;
129+
try {
130+
writer = HoodieLogFormat.newWriterBuilder()
131+
.onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
132+
.withFileId(fileId)
133+
.overBaseCommit(latestBaseInstant)
134+
.withFs(metaClient.getFs())
135+
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
136+
137+
// generate metadata
138+
if (doDelete) {
139+
Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
140+
// if update belongs to an existing log file
141+
writer.appendBlock(new HoodieCommandBlock(header));
142+
}
143+
} catch (IOException | InterruptedException io) {
144+
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
145+
} finally {
146+
try {
147+
if (writer != null) {
148+
writer.close();
149+
}
150+
} catch (IOException io) {
151+
throw new HoodieIOException("Error appending rollback block..", io);
152+
}
153+
}
154+
155+
// This step is intentionally done after writer is closed. Guarantees that
156+
// getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
157+
// cloud-storage : HUDI-168
158+
Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
159+
metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
160+
1L
161+
);
162+
163+
return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
164+
HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
165+
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
166+
.withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
167+
}
168+
default:
169+
throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
170+
}
171+
}
172+
173+
/**
174+
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
175+
*/
176+
private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
177+
String commit, String partitionPath, boolean doDelete) throws IOException {
178+
LOG.info("Cleaning path " + partitionPath);
179+
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
180+
SerializablePathFilter filter = (path) -> {
181+
if (path.toString().endsWith(basefileExtension)) {
182+
String fileCommitTime = FSUtils.getCommitTime(path.getName());
183+
return commit.equals(fileCommitTime);
184+
} else if (FSUtils.isLogFile(path)) {
185+
// Since the baseCommitTime is the only commit for new log files, it's okay here
186+
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
187+
return commit.equals(fileCommitTime);
188+
}
189+
return false;
190+
};
191+
192+
final Map<FileStatus, Boolean> results = new HashMap<>();
193+
FileSystem fs = metaClient.getFs();
194+
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
195+
for (FileStatus file : toBeDeleted) {
196+
if (doDelete) {
197+
boolean success = fs.delete(file.getPath(), false);
198+
results.put(file, success);
199+
LOG.info("Delete file " + file.getPath() + "\t" + success);
200+
} else {
201+
results.put(file, true);
202+
}
203+
}
204+
return results;
205+
}
206+
207+
/**
208+
* Common method used for cleaning out base files under a partition path during rollback of a set of commits.
209+
*/
210+
private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
211+
String commit, String partitionPath, boolean doDelete) throws IOException {
212+
final Map<FileStatus, Boolean> results = new HashMap<>();
213+
LOG.info("Cleaning path " + partitionPath);
214+
FileSystem fs = metaClient.getFs();
215+
String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
216+
PathFilter filter = (path) -> {
217+
if (path.toString().contains(basefileExtension)) {
218+
String fileCommitTime = FSUtils.getCommitTime(path.getName());
219+
return commit.equals(fileCommitTime);
220+
}
221+
return false;
222+
};
223+
FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
224+
for (FileStatus file : toBeDeleted) {
225+
if (doDelete) {
226+
boolean success = fs.delete(file.getPath(), false);
227+
results.put(file, success);
228+
LOG.info("Delete file " + file.getPath() + "\t" + success);
229+
} else {
230+
results.put(file, true);
231+
}
232+
}
233+
return results;
234+
}
235+
236+
private Map<HeaderMetadataType, String> generateHeader(String commit) {
237+
// generate metadata
238+
Map<HeaderMetadataType, String> header = new HashMap<>(3);
239+
header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
240+
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
241+
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
242+
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
243+
return header;
244+
}
245+
246+
public interface SerializablePathFilter extends PathFilter, Serializable {
247+
248+
}
249+
}

0 commit comments

Comments (0)