Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ endif()

if(ARROW_ORC)
arrow_add_object_library(ARROW_ORC adapters/orc/adapter.cc adapters/orc/options.cc
adapters/orc/util.cc)
adapters/orc/statistics.cc adapters/orc/util.cc)
foreach(ARROW_ORC_TARGET ${ARROW_ORC_TARGETS})
target_link_libraries(${ARROW_ORC_TARGET} PRIVATE orc::orc)
if(ARROW_ORC_VERSION VERSION_LESS "2.0.0")
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/adapters/orc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#

# Headers: top level
install(FILES adapter.h options.h
install(FILES adapter.h options.h statistics.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc")

# pkg-config support
Expand Down
127 changes: 126 additions & 1 deletion cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
#include "arrow/type.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/macros.h"
#include "orc/Exceptions.hh"
#include "orc/Statistics.hh"
#include "orc/Type.hh"

// alias to not interfere with nested orc namespace
namespace liborc = orc;
Expand Down Expand Up @@ -409,6 +410,44 @@ class ORCFileReader::Impl {
return ReadBatch(opts, schema, stripes_[static_cast<size_t>(stripe)].num_rows);
}

Result<std::shared_ptr<Table>> ReadStripes(
const std::vector<int64_t>& stripe_indices) {
if (stripe_indices.empty()) {
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema());
return Table::MakeEmpty(schema);
}

std::vector<std::shared_ptr<RecordBatch>> batches;
batches.reserve(stripe_indices.size());
for (int64_t stripe_index : stripe_indices) {
ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index));
batches.push_back(std::move(batch));
}
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema());
return Table::FromRecordBatches(schema, std::move(batches));
}

Result<std::shared_ptr<Table>> ReadStripes(const std::vector<int64_t>& stripe_indices,
const std::vector<int>& include_indices) {
if (stripe_indices.empty()) {
liborc::RowReaderOptions opts = DefaultRowReaderOptions();
RETURN_NOT_OK(SelectIndices(&opts, include_indices));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
return Table::MakeEmpty(schema);
}

std::vector<std::shared_ptr<RecordBatch>> batches;
batches.reserve(stripe_indices.size());
for (int64_t stripe_index : stripe_indices) {
ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices));
batches.push_back(std::move(batch));
}
liborc::RowReaderOptions opts = DefaultRowReaderOptions();
RETURN_NOT_OK(SelectIndices(&opts, include_indices));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
return Table::FromRecordBatches(schema, std::move(batches));
}

Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) {
ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(),
Status::Invalid("Out of bounds stripe: ", stripe));
Expand Down Expand Up @@ -548,6 +587,65 @@ class ORCFileReader::Impl {
return NextStripeReader(batch_size, empty_vec);
}

Result<Statistics> GetColumnStatistics(int column_index) {
ORC_BEGIN_CATCH_NOT_OK
auto file_stats =
std::shared_ptr<const liborc::Statistics>(reader_->getStatistics().release());
if (column_index < 0 ||
static_cast<uint32_t>(column_index) >= file_stats->getNumberOfColumns()) {
return Status::Invalid("Column index ", column_index, " out of range [0, ",
file_stats->getNumberOfColumns(), ")");
}
const liborc::ColumnStatistics* col_stats =
file_stats->getColumnStatistics(static_cast<uint32_t>(column_index));
return Statistics(std::move(file_stats), col_stats);
ORC_END_CATCH_NOT_OK
}

Result<Statistics> GetStripeColumnStatistics(int64_t stripe_index, int column_index) {
ORC_BEGIN_CATCH_NOT_OK
if (stripe_index < 0 || stripe_index >= static_cast<int64_t>(stripes_.size())) {
return Status::Invalid("Stripe index ", stripe_index, " out of range");
}
auto stripe_stats = std::shared_ptr<const liborc::Statistics>(
reader_->getStripeStatistics(static_cast<uint64_t>(stripe_index)).release());
if (column_index < 0 ||
static_cast<uint32_t>(column_index) >= stripe_stats->getNumberOfColumns()) {
return Status::Invalid("Column index ", column_index, " out of range [0, ",
stripe_stats->getNumberOfColumns(), ")");
}
const liborc::ColumnStatistics* col_stats =
stripe_stats->getColumnStatistics(static_cast<uint32_t>(column_index));
return Statistics(std::move(stripe_stats), col_stats);
ORC_END_CATCH_NOT_OK
}

Result<std::vector<Statistics>> GetStripeStatistics(
int64_t stripe_index, const std::vector<int>& column_indices) {
ORC_BEGIN_CATCH_NOT_OK
if (stripe_index < 0 || stripe_index >= static_cast<int64_t>(stripes_.size())) {
return Status::Invalid("Stripe index ", stripe_index, " out of range");
}
auto stripe_stats = std::shared_ptr<const liborc::Statistics>(
reader_->getStripeStatistics(static_cast<uint64_t>(stripe_index)).release());
std::vector<Statistics> results;
results.reserve(column_indices.size());
for (int col_idx : column_indices) {
if (col_idx < 0 ||
static_cast<uint32_t>(col_idx) >= stripe_stats->getNumberOfColumns()) {
return Status::Invalid("Column index ", col_idx, " out of range [0, ",
stripe_stats->getNumberOfColumns(), ")");
}
const liborc::ColumnStatistics* col_stats =
stripe_stats->getColumnStatistics(static_cast<uint32_t>(col_idx));
results.emplace_back(stripe_stats, col_stats);
}
return results;
ORC_END_CATCH_NOT_OK
}

const liborc::Type& GetORCType() { return reader_->getType(); }

private:
MemoryPool* pool_;
std::unique_ptr<liborc::Reader> reader_;
Expand Down Expand Up @@ -613,6 +711,17 @@ Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
return impl_->ReadStripe(stripe, include_names);
}

Result<std::shared_ptr<Table>> ORCFileReader::ReadStripes(
const std::vector<int64_t>& stripe_indices) {
return impl_->ReadStripes(stripe_indices);
}

Result<std::shared_ptr<Table>> ORCFileReader::ReadStripes(
const std::vector<int64_t>& stripe_indices,
const std::vector<int>& include_indices) {
return impl_->ReadStripes(stripe_indices, include_indices);
}

Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); }

Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
Expand Down Expand Up @@ -678,6 +787,22 @@ std::string ORCFileReader::GetSerializedFileTail() {
return impl_->GetSerializedFileTail();
}

Result<Statistics> ORCFileReader::GetColumnStatistics(int column_index) {
return impl_->GetColumnStatistics(column_index);
}

Result<Statistics> ORCFileReader::GetStripeColumnStatistics(int64_t stripe_index,
int column_index) {
return impl_->GetStripeColumnStatistics(stripe_index, column_index);
}

Result<std::vector<Statistics>> ORCFileReader::GetStripeStatistics(
int64_t stripe_index, const std::vector<int>& column_indices) {
return impl_->GetStripeStatistics(stripe_index, column_indices);
}

const ::orc::Type& ORCFileReader::GetORCType() { return impl_->GetORCType(); }

namespace {

class ArrowOutputStream : public liborc::OutputStream {
Expand Down
63 changes: 62 additions & 1 deletion cpp/src/arrow/adapters/orc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
#include <vector>

#include "arrow/adapters/orc/options.h"
#include "arrow/adapters/orc/statistics.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

namespace orc {
class Type;
} // namespace orc

namespace arrow {
namespace adapters {
namespace orc {
Expand Down Expand Up @@ -129,6 +133,29 @@ class ARROW_EXPORT ORCFileReader {
Result<std::shared_ptr<RecordBatch>> ReadStripe(
int64_t stripe, const std::vector<std::string>& include_names);

/// \brief Read multiple selected stripes as a Table
///
/// Reads only the specified stripes and concatenates them into a single table.
/// This is useful for stripe-selective reading based on predicate pushdown.
/// If stripe_indices is empty, returns an empty table with the file's schema.
///
/// \param[in] stripe_indices the indices of stripes to read
/// \return the returned Table containing data from the selected stripes
Result<std::shared_ptr<Table>> ReadStripes(
const std::vector<int64_t>& stripe_indices);

/// \brief Read multiple selected stripes with column selection as a Table
///
/// Reads only the specified stripes and selected columns, concatenating them
/// into a single table.
/// If stripe_indices is empty, returns an empty table with the selected schema.
///
/// \param[in] stripe_indices the indices of stripes to read
/// \param[in] include_indices the selected field indices to read
/// \return the returned Table containing data from the selected stripes and columns
Result<std::shared_ptr<Table>> ReadStripes(const std::vector<int64_t>& stripe_indices,
const std::vector<int>& include_indices);

/// \brief Seek to designated row. Invoke NextStripeReader() after seek
/// will return stripe reader starting from designated row.
///
Expand Down Expand Up @@ -267,6 +294,40 @@ class ARROW_EXPORT ORCFileReader {
/// \return A KeyValueMetadata object containing the ORC metadata
Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata();

/// \brief Get file-level statistics for a column.
///
/// \param[in] column_index the column index (0-based)
/// \return the column statistics
Result<Statistics> GetColumnStatistics(int column_index);

/// \brief Get stripe-level statistics for a column.
///
/// \param[in] stripe_index the stripe index (0-based)
/// \param[in] column_index the column index (0-based)
/// \return the column statistics for the specified stripe
Result<Statistics> GetStripeColumnStatistics(int64_t stripe_index, int column_index);

/// \brief Get stripe-level statistics for multiple columns at once.
///
/// More efficient than calling GetStripeColumnStatistics() in a loop
/// because it parses the stripe's statistics object only once.
///
/// \param[in] stripe_index the stripe index (0-based)
/// \param[in] column_indices the column indices to retrieve statistics for
/// \return vector of column statistics, one per requested column index
Result<std::vector<Statistics>> GetStripeStatistics(int64_t stripe_index,
const std::vector<int>& column_indices);

/// \brief Get the ORC type tree for column ID mapping.
///
/// This is needed for building schema manifests that map Arrow schema fields
/// to ORC physical column indices. The ORC type tree uses depth-first pre-order
/// numbering where column 0 is the root struct, column 1 is the first top-level
/// field, etc.
///
/// \return reference to the root ORC Type (STRUCT), owned by the reader.
const ::orc::Type& GetORCType();

private:
class Impl;
std::unique_ptr<Impl> impl_;
Expand Down
Loading
Loading