Skip to content

Commit 6da4bf4

Browse files
5herhomyihua
authored and committed
[HUDI-4282] Repair IOException in CHDFS when check block corrupted in HoodieLogFileReader (apache#6031)
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com> (cherry picked from commit eaa2f8e)
1 parent fa0b60b commit 6da4bf4

3 files changed

Lines changed: 81 additions & 1 deletion

File tree

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
*/
15+
16+
package org.apache.hudi.common.fs;
17+
18+
import org.apache.hadoop.fs.FSDataInputStream;
19+
import org.apache.hadoop.fs.FileSystem;
20+
import org.apache.hadoop.fs.Path;
21+
22+
import java.io.EOFException;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
26+
public class BoundedFsDataInputStream extends FSDataInputStream {
27+
private FileSystem fs;
28+
private Path file;
29+
private long fileLen = -1L;
30+
31+
public BoundedFsDataInputStream(FileSystem fs, Path file, InputStream in) {
32+
super(in);
33+
this.fs = fs;
34+
this.file = file;
35+
}
36+
37+
@Override
38+
public boolean markSupported() {
39+
return false;
40+
}
41+
42+
/* Return the file length */
43+
private long getFileLength() throws IOException {
44+
if (fileLen == -1L) {
45+
fileLen = fs.getContentSummary(file).getLength();
46+
}
47+
return fileLen;
48+
}
49+
50+
@Override
51+
public synchronized void seek(long pos) throws IOException {
52+
if (pos < 0 || pos > getFileLength()) {
53+
throw new EOFException("Try to seek pos[" + pos + "] , but fileSize is " + getFileLength());
54+
}
55+
super.seek(pos);
56+
}
57+
58+
@Override
59+
public synchronized long skip(long n) throws IOException {
60+
long curPos = getPos();
61+
long fileLength = getFileLength();
62+
if (n + curPos > fileLength) {
63+
n = fileLength - curPos;
64+
}
65+
return super.skip(n);
66+
}
67+
68+
}

hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,14 @@ public static boolean isGCSFileSystem(FileSystem fs) {
653653
return fs.getScheme().equals(StorageSchemes.GCS.getScheme());
654654
}
655655

656+
/**
657+
* Chdfs will throw {@code IOException} instead of {@code EOFException}. It will cause error in isBlockCorrupted().
658+
* Wrapped by {@code BoundedFsDataInputStream}, to check whether the desired offset is out of the file size in advance.
659+
*/
660+
public static boolean isCHDFileSystem(FileSystem fs) {
661+
return StorageSchemes.CHDFS.getScheme().equals(fs.getScheme());
662+
}
663+
656664
public static Configuration registerFileSystem(Path file, Configuration conf) {
657665
Configuration returnConf = new Configuration(conf);
658666
String scheme = FSUtils.getFs(file.toString(), conf).getScheme();

hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.common.table.log;
2020

21+
import org.apache.hudi.common.fs.BoundedFsDataInputStream;
2122
import org.apache.hudi.common.fs.FSUtils;
2223
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
2324
import org.apache.hudi.common.fs.TimedFSDataInputStream;
@@ -45,7 +46,6 @@
4546
import org.apache.hadoop.fs.FSInputStream;
4647
import org.apache.hadoop.fs.FileSystem;
4748
import org.apache.hadoop.hbase.util.Bytes;
48-
4949
import org.apache.log4j.LogManager;
5050
import org.apache.log4j.Logger;
5151

@@ -478,6 +478,10 @@ private static FSDataInputStream getFSDataInputStream(FileSystem fs,
478478
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
479479
}
480480

481+
if (FSUtils.isCHDFileSystem(fs)) {
482+
return new BoundedFsDataInputStream(fs, logFile.getPath(), fsDataInputStream);
483+
}
484+
481485
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
482486
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
483487
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));

0 commit comments

Comments
 (0)