Skip to content

Commit 8e9e250

Browse files
committed
[HUDI-4078][HUDI-FLINK]BootstrapOperator contains the pending compaction files
1 parent cd51f10 commit 8e9e250

8 files changed

Lines changed: 263 additions & 22 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hudi.common.util.Functions.Function1;
3030
import org.apache.hudi.common.util.Functions.Function2;
3131
import org.apache.hudi.common.util.Functions.Function3;
32+
import org.apache.hudi.common.util.Functions.Function4;
3233
import org.apache.hudi.common.util.Option;
3334
import org.apache.hudi.common.util.collection.Pair;
3435

@@ -106,7 +107,7 @@ private <T1, T2, R> R execute(T1 val, T2 val2, Function2<T1, T2, R> preferredFun
106107
}
107108

108109
private <T1, T2, T3, R> R execute(T1 val, T2 val2, T3 val3, Function3<T1, T2, T3, R> preferredFunction,
109-
Function3<T1, T2, T3, R> secondaryFunction) {
110+
Function3<T1, T2, T3, R> secondaryFunction) {
110111
if (errorOnPreferredView) {
111112
LOG.warn("Routing request to secondary file-system view");
112113
return secondaryFunction.apply(val, val2, val3);
@@ -121,6 +122,22 @@ private <T1, T2, T3, R> R execute(T1 val, T2 val2, T3 val3, Function3<T1, T2, T3
121122
}
122123
}
123124

125+
private <T1, T2, T3, T4, R> R execute(T1 val, T2 val2, T3 val3, T4 val4, Function4<T1, T2, T3, T4, R> preferredFunction,
126+
Function4<T1, T2, T3, T4, R> secondaryFunction) {
127+
if (errorOnPreferredView) {
128+
LOG.warn("Routing request to secondary file-system view");
129+
return secondaryFunction.apply(val, val2, val3, val4);
130+
} else {
131+
try {
132+
return preferredFunction.apply(val, val2, val3, val4);
133+
} catch (RuntimeException re) {
134+
handleRuntimeException(re);
135+
errorOnPreferredView = true;
136+
return secondaryFunction.apply(val, val2, val3, val4);
137+
}
138+
}
139+
}
140+
124141
private void handleRuntimeException(RuntimeException re) {
125142
if (re.getCause() instanceof HttpResponseException && ((HttpResponseException)re.getCause()).getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
126143
LOG.warn("Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary");
@@ -179,7 +196,7 @@ public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
179196
@Override
180197
public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
181198
boolean includeFileSlicesInPendingCompaction, boolean includeFilesInPendingCompaction) {
182-
return execute(partitionPath, maxCommitTime, includeFileSlicesInPendingCompaction,
199+
return execute(partitionPath, maxCommitTime, includeFileSlicesInPendingCompaction, includeFilesInPendingCompaction,
183200
preferredView::getLatestFileSlicesBeforeOrOn, secondaryView::getLatestFileSlicesBeforeOrOn);
184201
}
185202

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
117117
public static final String LAST_INSTANT_TS = "lastinstantts";
118118
public static final String TIMELINE_HASH = "timelinehash";
119119
public static final String REFRESH_OFF = "refreshoff";
120-
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
121-
120+
public static final String INCLUDE_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
121+
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includefilespendingcompaction";
122122

123123
private static final Logger LOG = LogManager.getLogger(RemoteHoodieTableFileSystemView.class);
124124

@@ -314,8 +314,8 @@ public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
314314
public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
315315
boolean includeFileSlicesInPendingCompaction, boolean includeFilesInPendingCompaction) {
316316
Map<String, String> paramsMap = getParamsWithAdditionalParams(partitionPath,
317-
new String[] {MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
318-
new String[] {maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)});
317+
new String[] {MAX_INSTANT_PARAM, INCLUDE_IN_PENDING_COMPACTION_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
318+
new String[] {maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction), String.valueOf(includeFilesInPendingCompaction)});
319319
try {
320320
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
321321
new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);

hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,35 @@ static Runnable noop() {
3333
/**
3434
* A function which has not any parameter.
3535
*/
36-
public interface Function0<R> extends Serializable {
36+
interface Function0<R> extends Serializable {
3737
R apply();
3838
}
3939

4040
/**
4141
* A function which contains only one parameter.
4242
*/
43-
public interface Function1<T1, R> extends Serializable {
43+
interface Function1<T1, R> extends Serializable {
4444
R apply(T1 val1);
4545
}
4646

4747
/**
4848
* A function which contains two parameters.
4949
*/
50-
public interface Function2<T1, T2, R> extends Serializable {
50+
interface Function2<T1, T2, R> extends Serializable {
5151
R apply(T1 val1, T2 val2);
5252
}
5353

5454
/**
5555
* A function which contains three parameters.
5656
*/
57-
public interface Function3<T1, T2, T3, R> extends Serializable {
57+
interface Function3<T1, T2, T3, R> extends Serializable {
5858
R apply(T1 val1, T2 val2, T3 val3);
5959
}
60+
61+
/**
62+
* A function which contains 4 parameters.
63+
*/
64+
interface Function4<T1, T2, T3, T4, R> extends Serializable {
65+
R apply(T1 val1, T2 val2, T3 val3, T4 val4);
66+
}
6067
}

hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -375,30 +375,30 @@ public void testGetLatestFileSlicesBeforeOrOn() {
375375
String partitionPath = "/table2";
376376
String maxCommitTime = "2020-01-01";
377377

378-
when(primary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false))
378+
when(primary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false))
379379
.thenReturn(testFileSliceStream);
380-
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false);
380+
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false);
381381
assertEquals(expected, actual);
382382

383383
resetMocks();
384-
when(primary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false))
384+
when(primary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false))
385385
.thenThrow(new RuntimeException());
386-
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false))
386+
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false))
387387
.thenReturn(testFileSliceStream);
388-
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false);
388+
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false);
389389
assertEquals(expected, actual);
390390

391391
resetMocks();
392-
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false))
392+
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false))
393393
.thenReturn(testFileSliceStream);
394-
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false);
394+
actual = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false);
395395
assertEquals(expected, actual);
396396

397397
resetMocks();
398-
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false))
398+
when(secondary.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false))
399399
.thenThrow(new RuntimeException());
400400
assertThrows(RuntimeException.class, () -> {
401-
fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false);
401+
fsView.getLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime, false, false);
402402
});
403403
}
404404

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.sink;
20+
21+
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
22+
import org.apache.hudi.common.model.FileSlice;
23+
import org.apache.hudi.common.model.HoodieKey;
24+
import org.apache.hudi.common.model.HoodieRecord;
25+
import org.apache.hudi.common.model.HoodieTableType;
26+
import org.apache.hudi.common.table.TableSchemaResolver;
27+
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
28+
import org.apache.hudi.common.table.timeline.HoodieInstant;
29+
import org.apache.hudi.common.table.timeline.HoodieTimeline;
30+
import org.apache.hudi.common.util.BaseFileUtils;
31+
import org.apache.hudi.common.util.ClosableIterator;
32+
import org.apache.hudi.common.util.Option;
33+
import org.apache.hudi.config.HoodieWriteConfig;
34+
import org.apache.hudi.configuration.FlinkOptions;
35+
import org.apache.hudi.exception.HoodieException;
36+
import org.apache.hudi.sink.transform.Transformer;
37+
import org.apache.hudi.sink.utils.Pipelines;
38+
import org.apache.hudi.table.HoodieFlinkTable;
39+
import org.apache.hudi.table.format.FormatUtils;
40+
import org.apache.hudi.util.AvroSchemaConverter;
41+
import org.apache.hudi.util.FlinkClientUtil;
42+
import org.apache.hudi.util.StreamerUtil;
43+
import org.apache.hudi.utils.TestConfigurations;
44+
import org.apache.hudi.utils.source.ContinuousFileSource;
45+
46+
import org.apache.avro.Schema;
47+
import org.apache.flink.configuration.Configuration;
48+
import org.apache.flink.core.execution.JobClient;
49+
import org.apache.flink.core.fs.Path;
50+
import org.apache.flink.formats.common.TimestampFormat;
51+
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
52+
import org.apache.flink.streaming.api.CheckpointingMode;
53+
import org.apache.flink.streaming.api.datastream.DataStream;
54+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
55+
import org.apache.flink.table.data.RowData;
56+
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
57+
import org.apache.flink.table.types.logical.RowType;
58+
import org.apache.flink.util.TestLogger;
59+
import org.junit.jupiter.api.Test;
60+
import org.junit.jupiter.api.io.TempDir;
61+
62+
import java.io.File;
63+
import java.io.FilenameFilter;
64+
import java.nio.charset.StandardCharsets;
65+
import java.util.List;
66+
import java.util.Objects;
67+
import java.util.concurrent.atomic.AtomicInteger;
68+
69+
import static java.util.stream.Collectors.toList;
70+
import static org.apache.hudi.util.StreamerUtil.isValidFile;
71+
import static org.junit.jupiter.api.Assertions.assertEquals;
72+
73+
/**
74+
* Integration test for BoostrapOperator.
75+
*/
76+
public class TestBoostrapOperator extends TestLogger {
77+
@TempDir
78+
File tempFile;
79+
80+
@Test
81+
public void testLoadRecords() throws Exception {
82+
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
83+
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
84+
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
85+
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
86+
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
87+
88+
testWriteToHoodie(conf, Option.empty(), "mor_write_with_compact", 5);
89+
90+
deleteLastCompactionCommit();
91+
92+
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true);
93+
HoodieFlinkTable hoodieTable = HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
94+
HoodieTimeline commitsTimeline = hoodieTable.getMetaClient().getCommitsTimeline();
95+
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();
96+
AtomicInteger count = new AtomicInteger();
97+
BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat());
98+
Schema schema = new TableSchemaResolver(hoodieTable.getMetaClient()).getTableAvroSchema();
99+
if (latestCommitTime.isPresent()) {
100+
List<FileSlice> fileSlices = hoodieTable.getSliceView()
101+
.getLatestFileSlicesBeforeOrOn("par1", latestCommitTime.get().getTimestamp(), true, true)
102+
.collect(toList());
103+
for (FileSlice fileSlice : fileSlices) {
104+
fileSlice.getBaseFile().ifPresent(baseFile -> {
105+
// filter out crushed files
106+
if (!isValidFile(baseFile.getFileStatus())) {
107+
return;
108+
}
109+
try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(FlinkClientUtil.getHadoopConf(), new org.apache.hadoop.fs.Path(baseFile.getPath()))) {
110+
iterator.forEachRemaining(hoodieKey -> {
111+
count.getAndIncrement();
112+
});
113+
}
114+
});
115+
116+
// load avro log records
117+
List<String> logPaths = fileSlice.getLogFiles()
118+
// filter out crushed files
119+
.filter(logFile -> isValidFile(logFile.getFileStatus()))
120+
.map(logFile -> logFile.getPath().toString())
121+
.collect(toList());
122+
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
123+
writeConfig, FlinkClientUtil.getHadoopConf());
124+
125+
try {
126+
for (String recordKey : scanner.getRecords().keySet()) {
127+
count.getAndIncrement();
128+
}
129+
} catch (Exception e) {
130+
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
131+
} finally {
132+
scanner.close();
133+
}
134+
}
135+
}
136+
assertEquals(8, count.get());
137+
}
138+
139+
private void deleteLastCompactionCommit() {
140+
File allCommits = new File(tempFile.getPath(), ".hoodie");
141+
final File[] files = allCommits.listFiles(new FilenameFilter() {
142+
@Override
143+
public boolean accept(File dir, String name) {
144+
return name.endsWith(".commit");
145+
}
146+
});
147+
if (files.length > 0) {
148+
files[files.length - 1].delete();
149+
}
150+
}
151+
152+
private void testWriteToHoodie(
153+
Configuration conf,
154+
Option<Transformer> transformer,
155+
String jobName,
156+
int checkpoints) throws Exception {
157+
158+
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
159+
execEnv.getConfig().disableObjectReuse();
160+
execEnv.setParallelism(4);
161+
// set up checkpoint interval
162+
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
163+
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
164+
165+
// Read from file source
166+
RowType rowType =
167+
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
168+
.getLogicalType();
169+
170+
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
171+
rowType,
172+
InternalTypeInfo.of(rowType),
173+
false,
174+
true,
175+
TimestampFormat.ISO_8601
176+
);
177+
String sourcePath = Objects.requireNonNull(Thread.currentThread()
178+
.getContextClassLoader().getResource("test_source6.data")).toString();
179+
180+
DataStream<RowData> dataStream;
181+
182+
dataStream = execEnv
183+
// use continuous file source to trigger checkpoint
184+
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
185+
.name("continuous_file_source")
186+
.setParallelism(1)
187+
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
188+
.setParallelism(1);
189+
190+
if (transformer.isPresent()) {
191+
dataStream = transformer.get().apply(dataStream);
192+
}
193+
194+
int parallelism = execEnv.getParallelism();
195+
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
196+
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
197+
execEnv.addOperator(pipeline.getTransformation());
198+
199+
Pipelines.clean(conf, pipeline);
200+
Pipelines.compact(conf, pipeline);
201+
202+
JobClient client = execEnv.executeAsync(jobName);
203+
// wait for the streaming job to finish
204+
client.getJobExecutionResult().get();
205+
}
206+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition": "par1"}
2+
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition": "par1"}
3+
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition": "par1"}
4+
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition": "par1"}
5+
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition": "par1"}
6+
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition": "par1"}
7+
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition": "par1"}
8+
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition": "par1"}

hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,10 @@ private void registerFileSlicesAPI() {
364364
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
365365
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM).getOrThrow(),
366366
Boolean.parseBoolean(
367-
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM)
367+
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INCLUDE_IN_PENDING_COMPACTION_PARAM)
368+
.getOrThrow()),
369+
Boolean.parseBoolean(
370+
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM, "false")
368371
.getOrThrow()));
369372
writeValueAsString(ctx, dtos);
370373
}, true));

hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ public List<FileSliceDTO> getLatestMergedFileSlicesBeforeOrOn(String basePath, S
6161
}
6262

6363
public List<FileSliceDTO> getLatestFileSlicesBeforeOrOn(String basePath, String partitionPath, String maxInstantTime,
64-
boolean includeFileSlicesInPendingCompaction) {
64+
boolean includeFileSlicesInPendingCompaction, boolean includeFilesInPendingCompaction) {
6565
return viewManager.getFileSystemView(basePath)
66-
.getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime, includeFileSlicesInPendingCompaction)
66+
.getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime, includeFileSlicesInPendingCompaction, includeFilesInPendingCompaction)
6767
.map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
6868
}
6969

0 commit comments

Comments
 (0)