Skip to content

Commit f677944

Browse files
committed
PARQUET-674: Add DataSource abstraction for openable files.
1 parent e54ca61 commit f677944

3 files changed

Lines changed: 151 additions & 11 deletions

File tree

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.parquet.io;
21+
22+
import java.io.IOException;
23+
24+
/**
 * {@code ParquetDataSource} is an interface with the methods needed by Parquet
 * to read data files using {@link SeekableInputStream} instances.
 */
public interface ParquetDataSource {

  /**
   * Returns the file location.
   *
   * @return a String identifying the underlying file (e.g. a path or URI),
   *         suitable for error messages and logging
   */
  String getLocation();

  /**
   * Returns the total length of the file, in bytes.
   *
   * @return the length of the underlying file in bytes
   * @throws IOException if the length cannot be determined
   */
  long getLength() throws IOException;

  /**
   * Opens a new {@link SeekableInputStream} for the underlying
   * data file.
   *
   * <p>Each call opens a fresh stream; the caller is responsible for
   * closing it.
   *
   * @return a new, open {@link SeekableInputStream} for this file
   * @throws IOException if the stream cannot be opened.
   */
  SeekableInputStream newStream() throws IOException;

}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.parquet.hadoop;
2020

21+
import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType.file;  // FIXME(review): accidental IDE auto-import (YARN webapp hamlet is unrelated to Parquet) — remove before merge
2122
import static org.apache.parquet.Log.DEBUG;
2223
import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
2324
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
@@ -53,6 +54,8 @@
5354
import java.util.concurrent.Executors;
5455
import java.util.concurrent.Future;
5556

57+
import org.apache.commons.math3.analysis.function.Add;  // FIXME(review): accidental IDE auto-import (commons-math is unused here) — remove before merge
58+
import org.apache.hadoop.conf.Configurable;
5659
import org.apache.hadoop.conf.Configuration;
5760
import org.apache.hadoop.fs.FileStatus;
5861
import org.apache.hadoop.fs.FileSystem;
@@ -88,11 +91,13 @@
8891
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
8992
import org.apache.parquet.hadoop.metadata.FileMetaData;
9093
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
94+
import org.apache.parquet.hadoop.util.HadoopDataSource;
9195
import org.apache.parquet.hadoop.util.HiddenFileFilter;
9296
import org.apache.parquet.hadoop.util.HadoopStreams;
9397
import org.apache.parquet.io.SeekableInputStream;
9498
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
9599
import org.apache.parquet.io.ParquetDecodingException;
100+
import org.apache.parquet.io.ParquetDataSource;
96101

97102
/**
98103
* Internal implementation of the Parquet file reader as a block container
@@ -410,8 +415,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, Path
410415
* @throws IOException if an error occurs while reading the file
411416
*/
412417
public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
413-
FileSystem fileSystem = file.getFileSystem(configuration);
414-
return readFooter(configuration, fileSystem.getFileStatus(file), filter);
418+
return readFooter(HadoopDataSource.fromPath(file, configuration), filter);
415419
}
416420

417421
/**
@@ -431,12 +435,21 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
431435
* @throws IOException if an error occurs while reading the file
432436
*/
433437
public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
434-
FileSystem fileSystem = file.getPath().getFileSystem(configuration);
435-
SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
436-
try {
437-
return readFooter(file.getLen(), file.getPath().toString(), in, filter);
438-
} finally {
439-
in.close();
438+
return readFooter(HadoopDataSource.fromStatus(file, configuration), filter);
439+
}
440+
441+
  /**
   * Reads the meta data block in the footer of the given data source
   * @param file a {@link ParquetDataSource} to read
   * @param filter the filter to apply to row groups
   * @return the metadata blocks in the footer
   * @throws IOException if an error occurs while reading the file
   */
  public static final ParquetMetadata readFooter(
      ParquetDataSource file, MetadataFilter filter) throws IOException {
    // Open a fresh stream from the data source; try-with-resources guarantees
    // the stream is closed even when footer parsing throws.
    try (SeekableInputStream in = file.newStream()) {
      // NOTE(review): `converter` is referenced from a static method but its
      // declaration is not visible in this hunk — confirm a static
      // ParquetMetadataConverter field exists in this class.
      return readFooter(converter, file.getLength(), file.getLocation(),
          in, filter);
    }
  }
442455

@@ -449,7 +462,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
449462
* @return the metadata blocks in the footer
450463
* @throws IOException if an error occurs while reading the file
451464
*/
452-
public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
465+
private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
453466
if (Log.DEBUG) {
454467
LOG.debug("File length " + fileLen);
455468
}
@@ -563,7 +576,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
563576
FileSystem fs = file.getFileSystem(conf);
564577
this.fileStatus = fs.getFileStatus(file);
565578
this.f = HadoopStreams.wrap(fs.open(file));
566-
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
579+
this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
567580
this.fileMetaData = footer.getFileMetaData();
568581
this.blocks = footer.getBlocks();
569582
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
@@ -602,7 +615,7 @@ public ParquetMetadata getFooter() {
602615
if (footer == null) {
603616
try {
604617
// don't read the row groups because this.blocks is always set
605-
this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
618+
this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
606619
} catch (IOException e) {
607620
throw new ParquetDecodingException("Unable to read file footer", e);
608621
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.parquet.hadoop.util;
21+
22+
import org.apache.hadoop.conf.Configurable;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.FileStatus;
25+
import org.apache.hadoop.fs.FileSystem;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.parquet.io.SeekableInputStream;
28+
import org.apache.parquet.io.ParquetDataSource;
29+
import java.io.IOException;
30+
31+
public class HadoopDataSource implements ParquetDataSource, Configurable {
32+
33+
private final FileSystem fs;
34+
private final FileStatus stat;
35+
private Configuration conf;
36+
37+
public static HadoopDataSource fromPath(Path path, Configuration conf)
38+
throws IOException {
39+
FileSystem fs = path.getFileSystem(conf);
40+
return new HadoopDataSource(fs, fs.getFileStatus(path), conf);
41+
}
42+
43+
public static HadoopDataSource fromStatus(FileStatus stat, Configuration conf)
44+
throws IOException {
45+
FileSystem fs = stat.getPath().getFileSystem(conf);
46+
return new HadoopDataSource(fs, stat, conf);
47+
}
48+
49+
private HadoopDataSource(FileSystem fs, FileStatus stat, Configuration conf) {
50+
this.conf = conf;
51+
this.fs = fs;
52+
this.stat = stat;
53+
}
54+
55+
@Override
56+
public String getLocation() {
57+
return stat.getPath().toString();
58+
}
59+
60+
@Override
61+
public long getLength() {
62+
return stat.getLen();
63+
}
64+
65+
@Override
66+
public SeekableInputStream newStream() throws IOException {
67+
return HadoopStreams.wrap(fs.open(stat.getPath()));
68+
}
69+
70+
@Override
71+
public void setConf(Configuration conf) {
72+
this.conf = conf;
73+
}
74+
75+
@Override
76+
public Configuration getConf() {
77+
return conf;
78+
}
79+
}

0 commit comments

Comments
 (0)