Skip to content

Commit 0dd2a46

Browse files
committed
[HUDI-4619] Add a remote request retry mechanism for 'RemoteHoodieTableFileSystemView'.
1 parent 11f85d1 commit 0dd2a46

File tree

7 files changed

+177
-38
lines changed

7 files changed

+177
-38
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
117117
.withRemoteServerHost(hostAddr)
118118
.withRemoteServerPort(serverPort)
119119
.withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs())
120+
.withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled())
121+
.withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers())
122+
.withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs())
123+
.withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs())
124+
.withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions())
120125
.build();
121126
}
122127

hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,7 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial
214214
LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
215215
+ viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout="
216216
+ viewConf.getRemoteTimelineClientTimeoutSecs());
217-
return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
218-
metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
217+
return new RemoteHoodieTableFileSystemView(metaClient, viewConf);
219218
}
220219

221220
public static FileSystemViewManager createViewManager(final HoodieEngineContext context,

hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,37 @@ public class FileSystemViewStorageConfig extends HoodieConfig {
110110
.defaultValue(5 * 60) // 5 min
111111
.withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server.");
112112

113+
/**
 * Switch for retrying API requests issued by the remote file system view.
 * Stored as a string; the default is {@code "false"}, i.e. retry is off unless configured.
 */
public static final ConfigProperty<String> REMOTE_RETRY_ENABLE = ConfigProperty
114+
.key("hoodie.filesystem.view.remote.retry.enable")
115+
.defaultValue("false")
116+
.sinceVersion("0.12.1")
117+
.withDocumentation("Whether to enable API request retry for remote file system view.");
118+
119+
/**
 * Upper bound on the number of retries for a remote file system view API request.
 * Defaults to 3.
 */
public static final ConfigProperty<Integer> REMOTE_MAX_RETRY_NUMBERS = ConfigProperty
120+
.key("hoodie.filesystem.view.remote.retry.max_numbers")
121+
.defaultValue(3) // 3 times
122+
.sinceVersion("0.12.1")
123+
.withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server.");
124+
125+
/**
 * Initial retry interval, in milliseconds, for remote file system view API requests.
 * Defaults to 100 ms. The user-facing description now talks about remote view requests;
 * it previously mentioned storage operations, which this key does not control.
 */
public static final ConfigProperty<Long> REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty
126+
.key("hoodie.filesystem.view.remote.retry.initial_interval_ms")
127+
.defaultValue(100L)
128+
.sinceVersion("0.12.1")
129+
.withDocumentation("Initial amount of time (in ms) to wait before retrying API requests against a remote file system view. e.g timeline server.");
130+
131+
/**
 * Cap, in milliseconds, on the wait before the next retry of a remote file system view API request.
 * Defaults to 2000 ms.
 */
public static final ConfigProperty<Long> REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty
132+
.key("hoodie.filesystem.view.remote.retry.max_interval_ms")
133+
.defaultValue(2000L)
134+
.sinceVersion("0.12.1")
135+
.withDocumentation("Maximum amount of time (in ms), to wait for next retry.");
136+
137+
/**
 * Comma-separated exception class names that qualify a failed remote request for retry.
 * Defaults to the empty string; the user-facing description states what an empty value means.
 */
public static final ConfigProperty<String> RETRY_EXCEPTIONS = ConfigProperty
138+
.key("hoodie.filesystem.view.remote.retry.exceptions")
139+
.defaultValue("")
140+
.sinceVersion("0.12.1")
141+
.withDocumentation("The class names of the Exceptions that need to be retried, separated by commas. "
142+
+ "Default is empty which means retry all the IOException and RuntimeException from Remote Request.");
143+
113144
public static final ConfigProperty<String> REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty
114145
.key("hoodie.filesystem.remote.backup.view.enable")
115146
.defaultValue("true") // Need to be disabled only for tests.
@@ -144,6 +175,26 @@ public Integer getRemoteTimelineClientTimeoutSecs() {
144175
return getInt(REMOTE_TIMEOUT_SECS);
145176
}
146177

178+
/**
 * @return the boolean value of {@code REMOTE_RETRY_ENABLE}, i.e. whether remote view API requests are retried.
 */
public boolean isRemoteTimelineClientRetryEnabled() {
179+
return getBoolean(REMOTE_RETRY_ENABLE);
180+
}
181+
182+
/**
 * @return the integer value of {@code REMOTE_MAX_RETRY_NUMBERS}, the maximum number of retries per request.
 */
public Integer getRemoteTimelineClientMaxRetryNumbers() {
183+
return getInt(REMOTE_MAX_RETRY_NUMBERS);
184+
}
185+
186+
/**
 * @return the long value of {@code REMOTE_INITIAL_RETRY_INTERVAL_MS}, the initial retry interval in milliseconds.
 */
public Long getRemoteTimelineInitialRetryIntervalMs() {
187+
return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS);
188+
}
189+
190+
/**
 * @return the long value of {@code REMOTE_MAX_RETRY_INTERVAL_MS}, the maximum retry interval in milliseconds.
 */
public Long getRemoteTimelineClientMaxRetryIntervalMs() {
191+
return getLong(REMOTE_MAX_RETRY_INTERVAL_MS);
192+
}
193+
194+
/**
 * @return the raw string value of {@code RETRY_EXCEPTIONS}; documented on the property as a
 *         comma-separated list of exception class names (empty by default).
 */
public String getRemoteTimelineClientRetryExceptions() {
195+
return getString(RETRY_EXCEPTIONS);
196+
}
197+
147198
public long getMaxMemoryForFileGroupMap() {
148199
long totalMemory = getLong(SPILLABLE_MEMORY);
149200
return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile();
@@ -245,6 +296,31 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout
245296
return this;
246297
}
247298

299+
/**
 * Stores {@code enableRetry}, converted to its string form, under {@code REMOTE_RETRY_ENABLE}.
 *
 * @param enableRetry whether remote view API requests should be retried
 * @return this builder
 */
public Builder withRemoteTimelineClientRetry(boolean enableRetry) {
300+
fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry));
301+
return this;
302+
}
303+
304+
/**
 * Stores {@code maxRetryNumbers}, converted to its string form, under {@code REMOTE_MAX_RETRY_NUMBERS}.
 *
 * @param maxRetryNumbers maximum number of retries; must be non-null because {@code toString()} is called on it
 * @return this builder
 */
public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) {
305+
fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString());
306+
return this;
307+
}
308+
309+
/**
 * Stores {@code initialRetryIntervalMs}, converted to its string form, under {@code REMOTE_INITIAL_RETRY_INTERVAL_MS}.
 *
 * @param initialRetryIntervalMs initial retry interval in milliseconds; must be non-null because {@code toString()} is called on it
 * @return this builder
 */
public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) {
310+
fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString());
311+
return this;
312+
}
313+
314+
/**
 * Stores {@code maxRetryIntervalMs}, converted to its string form, under {@code REMOTE_MAX_RETRY_INTERVAL_MS}.
 *
 * @param maxRetryIntervalMs maximum retry interval in milliseconds; must be non-null because {@code toString()} is called on it
 * @return this builder
 */
public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) {
315+
fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString());
316+
return this;
317+
}
318+
319+
/**
 * Stores {@code retryExceptions} unchanged under {@code RETRY_EXCEPTIONS}.
 *
 * @param retryExceptions exception class names, comma-separated per the property's documentation
 * @return this builder
 */
public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) {
320+
fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions);
321+
return this;
322+
}
323+
248324
public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) {
249325
fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString());
250326
return this;

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
4040
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
4141
import org.apache.hudi.common.util.Option;
42+
import org.apache.hudi.common.util.RetryHelper;
4243
import org.apache.hudi.common.util.StringUtils;
4344
import org.apache.hudi.common.util.ValidationUtils;
4445
import org.apache.hudi.common.util.collection.Pair;
@@ -128,26 +129,36 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
128129
private final HoodieTableMetaClient metaClient;
129130
private HoodieTimeline timeline;
130131
private final ObjectMapper mapper;
131-
private final int timeoutSecs;
132+
private final int timeoutMs;
132133

133134
private boolean closed = false;
134135

136+
private RetryHelper<Response> retryHelper;
137+
135138
private enum RequestMethod {
136139
GET, POST
137140
}
138141

139142
public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) {
140-
this(server, port, metaClient, 300);
143+
this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build());
141144
}
142145

143-
public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) {
146+
public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) {
144147
this.basePath = metaClient.getBasePath();
145-
this.serverHost = server;
146-
this.serverPort = port;
147148
this.mapper = new ObjectMapper();
148149
this.metaClient = metaClient;
149150
this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
150-
this.timeoutSecs = timeoutSecs;
151+
this.serverHost = viewConf.getRemoteViewServerHost();
152+
this.serverPort = viewConf.getRemoteViewServerPort();
153+
this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
154+
if (viewConf.isRemoteTimelineClientRetryEnabled()) {
155+
retryHelper = new RetryHelper(
156+
viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
157+
viewConf.getRemoteTimelineClientMaxRetryNumbers(),
158+
viewConf.getRemoteTimelineInitialRetryIntervalMs(),
159+
viewConf.getRemoteTimelineClientRetryExceptions(),
160+
"Sending request");
161+
}
151162
}
152163

153164
private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
@@ -165,17 +176,7 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
165176

166177
String url = builder.toString();
167178
LOG.info("Sending request : (" + url + ")");
168-
Response response;
169-
int timeout = this.timeoutSecs * 1000; // msec
170-
switch (method) {
171-
case GET:
172-
response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
173-
break;
174-
case POST:
175-
default:
176-
response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute();
177-
break;
178-
}
179+
Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
179180
String content = response.returnContent().asString();
180181
return (T) mapper.readValue(content, reference);
181182
}
@@ -495,4 +496,14 @@ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fil
495496
throw new HoodieRemoteException(e);
496497
}
497498
}
499+
500+
/**
 * Executes a single HTTP request against {@code url} and returns the response.
 * The same {@code timeoutMs} value is applied as both the connect timeout and the socket timeout.
 * {@code GET} issues a GET; {@code POST} and any other value issue a POST.
 *
 * NOTE(review): this method is public, but {@code RequestMethod} is declared as a private enum in this
 * class, so callers outside the class cannot name the third argument's type. Confirm the intended visibility.
 *
 * @param timeoutMs connect and socket timeout, in milliseconds
 * @param url       fully built request URL
 * @param method    HTTP method to use
 * @return the response of the executed request
 * @throws IOException if executing the request fails
 */
public static Response get(int timeoutMs, String url, RequestMethod method) throws IOException {
501+
switch (method) {
502+
case GET:
503+
return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
504+
case POST:
505+
default:
506+
return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute();
507+
}
508+
}
498509
}

hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,57 @@
1818

1919
package org.apache.hudi.common.util;
2020

21+
import org.apache.hudi.exception.HoodieException;
2122
import org.apache.log4j.LogManager;
2223
import org.apache.log4j.Logger;
2324

2425
import java.io.IOException;
26+
import java.io.Serializable;
2527
import java.util.ArrayList;
2628
import java.util.Arrays;
2729
import java.util.List;
2830
import java.util.Random;
2931
import java.util.stream.Collectors;
3032

31-
public class RetryHelper<T> {
33+
public class RetryHelper<T> implements Serializable {
3234
private static final Logger LOG = LogManager.getLogger(RetryHelper.class);
33-
private CheckedFunction<T> func;
34-
private int num;
35-
private long maxIntervalTime;
36-
private long initialIntervalTime = 100L;
35+
private transient CheckedFunction<T> func;
36+
private final int num;
37+
private final long maxIntervalTime;
38+
private final long initialIntervalTime;
3739
private String taskInfo = "N/A";
3840
private List<? extends Class<? extends Exception>> retryExceptionsClasses;
3941

40-
public RetryHelper() {
41-
}
42-
4342
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) {
4443
this.num = maxRetryNumbers;
4544
this.initialIntervalTime = initialRetryIntervalMs;
4645
this.maxIntervalTime = maxRetryIntervalMs;
4746
if (StringUtils.isNullOrEmpty(retryExceptions)) {
4847
this.retryExceptionsClasses = new ArrayList<>();
4948
} else {
50-
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
51-
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
52-
.map(Exception::getClass)
53-
.collect(Collectors.toList());
49+
try {
50+
this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(","))
51+
.map(exception -> (Exception) ReflectionUtils.loadClass(exception, ""))
52+
.map(Exception::getClass)
53+
.collect(Collectors.toList());
54+
} catch (HoodieException e) {
55+
LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e);
56+
this.retryExceptionsClasses = new ArrayList<>();
57+
}
5458
}
5559
}
5660

57-
public RetryHelper(String taskInfo) {
61+
public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) {
62+
this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions);
5863
this.taskInfo = taskInfo;
5964
}
6065

61-
public RetryHelper tryWith(CheckedFunction<T> func) {
66+
public RetryHelper<T> tryWith(CheckedFunction<T> func) {
6267
this.func = func;
6368
return this;
6469
}
6570

66-
public T start() throws IOException {
71+
public T start(CheckedFunction<T> func) throws IOException {
6772
int retries = 0;
6873
T functionResult = null;
6974

@@ -77,24 +82,33 @@ public T start() throws IOException {
7782
throw e;
7883
}
7984
if (retries++ >= num) {
80-
LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e);
85+
String message = "Still failed to " + taskInfo + " after retried " + num + " times.";
86+
LOG.error(message, e);
87+
if (e instanceof IOException) {
88+
throw new IOException(message, e);
89+
}
8190
throw e;
8291
}
83-
LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e);
92+
LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e);
8493
try {
8594
Thread.sleep(waitTime);
8695
} catch (InterruptedException ex) {
87-
// ignore InterruptedException here
96+
// ignore InterruptedException here
8897
}
8998
}
9099
}
91100

92101
if (retries > 0) {
93102
LOG.info("Success to " + taskInfo + " after retried " + retries + " times.");
94103
}
104+
95105
return functionResult;
96106
}
97107

108+
/**
 * Delegates to {@code start(CheckedFunction)} using the function stored in the {@code func} field,
 * which is assigned by {@code tryWith}.
 *
 * NOTE(review): {@code func} is declared transient, so it is presumably null after deserialization
 * until {@code tryWith} is called again. Confirm that callers of this overload set it first.
 *
 * @return the result returned by the delegated call
 * @throws IOException propagated from the delegated call
 */
public T start() throws IOException {
109+
return start(this.func);
110+
}
111+
98112
private boolean checkIfExceptionInRetryList(Exception e) {
99113
boolean inRetryList = false;
100114

@@ -123,7 +137,7 @@ private long getWaitTimeExp(int retryCount) {
123137
}
124138

125139
@FunctionalInterface
126-
public interface CheckedFunction<T> {
140+
public interface CheckedFunction<T> extends Serializable {
127141
T get() throws IOException;
128142
}
129143
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,11 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
429429
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
430430
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort())
431431
.withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs())
432+
.withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled())
433+
.withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers())
434+
.withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs())
435+
.withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs())
436+
.withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions())
432437
.build();
433438
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf);
434439
return writeClient;

hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
2929
import org.apache.hudi.common.table.view.SyncableFileSystemView;
3030
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
31+
import org.apache.hudi.exception.HoodieRemoteException;
3132
import org.apache.hudi.timeline.service.TimelineService;
3233

3334
import org.apache.hadoop.conf.Configuration;
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.log4j.LogManager;
3637
import org.apache.log4j.Logger;
38+
import org.junit.jupiter.api.Test;
3739

3840
/**
3941
* Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it.
@@ -64,4 +66,31 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
6466
view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient);
6567
return view;
6668
}
69+
70+
/**
 * Calls the remote view once while the timeline server is up, closes the server, and then calls it again:
 * first with the existing view (no retry configured), then with a new view built with retry enabled and
 * a maximum of 4 retries. The expected exception messages are checked in the catch blocks.
 *
 * NOTE(review): neither try block fails the test when the call returns normally, so the test passes
 * even if no HoodieRemoteException is thrown. The checks also use the Java {@code assert} keyword,
 * which is only evaluated when assertions are enabled.
 * NOTE(review): server.getServerPort() is read after server.close(); confirm it still reports the port
 * the server was bound to.
 */
@Test
71+
public void testRemoteHoodieTableFileSystemViewWithRetry() {
72+
// Service is available.
73+
view.getLatestBaseFiles();
74+
// Shut down the service.
75+
server.close();
76+
try {
77+
// Immediately fails and throws a connection refused exception.
78+
view.getLatestBaseFiles();
79+
} catch (HoodieRemoteException e) {
80+
assert e.getMessage().contains("Connection refused (Connection refused)");
81+
}
82+
// Enable API request retry for remote file system view.
83+
view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig
84+
.newBuilder()
85+
.withRemoteServerHost("localhost")
86+
.withRemoteServerPort(server.getServerPort())
87+
.withRemoteTimelineClientRetry(true)
88+
.withRemoteTimelineClientMaxRetryNumbers(4)
89+
.build());
90+
try {
91+
view.getLatestBaseFiles();
92+
} catch (HoodieRemoteException e) {
93+
assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times.");
94+
}
95+
}
6796
}

0 commit comments

Comments
 (0)