Skip to content

Commit de3d60b

Browse files
nsivabalan and fengjian
authored and committed
[HUDI-3848] Fixing minor bug in listing based rollback request generation (apache#6244)
1 parent 79cbd98 commit de3d60b

2 files changed

Lines changed: 70 additions & 3 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,11 +242,11 @@ private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant instantToRollbac
242242
return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
243243
try {
244244
return fs.exists(entry);
245-
} catch (IOException e) {
245+
} catch (Exception e) {
246246
LOG.error("Exists check failed for " + entry.toString(), e);
247247
}
248-
// if IOException is thrown, do not ignore. lets try to add the file of interest to be deleted. we can't miss any files to be rolled back.
249-
return false;
248+
// if any Exception is thrown, do not ignore. let's try to add the file of interest to be deleted. we can't miss any files to be rolled back.
249+
return true;
250250
}).toArray(Path[]::new), pathFilter);
251251
}
252252

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,50 @@
2020

2121
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
2222
import org.apache.hudi.avro.model.HoodieRollbackPlan;
23+
import org.apache.hudi.avro.model.HoodieRollbackRequest;
24+
import org.apache.hudi.client.SparkRDDWriteClient;
25+
import org.apache.hudi.client.WriteStatus;
26+
import org.apache.hudi.client.common.HoodieSparkEngineContext;
2327
import org.apache.hudi.common.HoodieRollbackStat;
2428
import org.apache.hudi.common.model.FileSlice;
2529
import org.apache.hudi.common.model.HoodieFileGroup;
30+
import org.apache.hudi.common.model.HoodieRecord;
31+
import org.apache.hudi.common.table.HoodieTableMetaClient;
2632
import org.apache.hudi.common.table.timeline.HoodieInstant;
2733
import org.apache.hudi.common.table.timeline.HoodieTimeline;
34+
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
2835
import org.apache.hudi.common.testutils.HoodieTestTable;
2936
import org.apache.hudi.config.HoodieWriteConfig;
3037
import org.apache.hudi.table.HoodieTable;
3138
import org.apache.hudi.table.marker.WriteMarkersFactory;
39+
import org.apache.hudi.testutils.Assertions;
3240

41+
import org.apache.hadoop.fs.FileSystem;
42+
import org.apache.spark.api.java.JavaRDD;
3343
import org.junit.jupiter.api.AfterEach;
3444
import org.junit.jupiter.api.BeforeEach;
3545
import org.junit.jupiter.api.Test;
3646
import org.junit.jupiter.params.ParameterizedTest;
3747
import org.junit.jupiter.params.provider.ValueSource;
48+
import org.mockito.Mockito;
49+
import org.mockito.MockitoAnnotations;
3850

3951
import java.io.IOException;
4052
import java.util.ArrayList;
53+
import java.util.Arrays;
4154
import java.util.Collections;
4255
import java.util.List;
4356
import java.util.Map;
4457
import java.util.stream.Collectors;
4558

4659
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
4760
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
61+
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
4862
import static org.junit.jupiter.api.Assertions.assertEquals;
4963
import static org.junit.jupiter.api.Assertions.assertFalse;
5064
import static org.junit.jupiter.api.Assertions.assertTrue;
5165
import static org.junit.jupiter.api.Assertions.fail;
66+
import static org.mockito.ArgumentMatchers.any;
5267

5368
public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackTestBase {
5469
@BeforeEach
@@ -133,6 +148,58 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile()
133148
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
134149
}
135150

151+
@Test
152+
public void testListBasedRollbackStrategy() throws Exception {
153+
//just generate two partitions
154+
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
155+
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(false).build();
156+
// 1. prepare data
157+
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
158+
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
159+
160+
String newCommitTime = "001";
161+
client.startCommitWithTime(newCommitTime);
162+
List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 3);
163+
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
164+
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
165+
Assertions.assertNoWriteErrors(statuses.collect());
166+
167+
newCommitTime = "002";
168+
client.startCommitWithTime(newCommitTime);
169+
records = dataGen.generateUpdates(newCommitTime, records);
170+
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime);
171+
Assertions.assertNoWriteErrors(statuses.collect());
172+
173+
context = new HoodieSparkEngineContext(jsc);
174+
metaClient = HoodieTableMetaClient.reload(metaClient);
175+
HoodieTable table = this.getHoodieTable(metaClient, cfg);
176+
HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
177+
String rollbackInstant = "003";
178+
179+
ListingBasedRollbackStrategy rollbackStrategy = new ListingBasedRollbackStrategy(table, context, table.getConfig(), rollbackInstant);
180+
List<HoodieRollbackRequest> rollBackRequests = rollbackStrategy.getRollbackRequests(needRollBackInstant);
181+
rollBackRequests.forEach(entry -> System.out.println(" " + entry.getPartitionPath() + ", " + entry.getFileId() + " "
182+
+ Arrays.toString(entry.getFilesToBeDeleted().toArray())));
183+
184+
HoodieRollbackRequest rollbackRequest = rollBackRequests.stream().filter(entry -> entry.getPartitionPath().equals(DEFAULT_FIRST_PARTITION_PATH)).findFirst().get();
185+
186+
FileSystem fs = Mockito.mock(FileSystem.class);
187+
MockitoAnnotations.initMocks(this);
188+
189+
// mock to throw exception when fs.exists() is invoked
190+
System.out.println("Fs.exists() call for " + rollbackRequest.getFilesToBeDeleted().get(0).toString());
191+
Mockito.when(fs.exists(any()))
192+
.thenThrow(new IOException("Failing exists call for " + rollbackRequest.getFilesToBeDeleted().get(0)));
193+
194+
rollbackStrategy = new ListingBasedRollbackStrategy(table, context, cfg, rollbackInstant);
195+
List<HoodieRollbackRequest> rollBackRequestsUpdated = rollbackStrategy.getRollbackRequests(needRollBackInstant);
196+
rollBackRequestsUpdated.forEach(entry -> System.out.println(" " + entry.getPartitionPath() + ", " + entry.getFileId() + " "
197+
+ Arrays.toString(entry.getFilesToBeDeleted().toArray())));
198+
199+
assertEquals(rollBackRequests, rollBackRequestsUpdated);
200+
}
201+
202+
136203
// Verify that rollback works with replacecommit
137204
@ParameterizedTest
138205
@ValueSource(booleans = {true, false})

0 commit comments

Comments (0)