-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/orc adapter 2 review #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main-original
Are you sure you want to change the base?
Changes from all commits
bc57e6e
8870048
d53718c
df82c29
4e19200
521d8a4
ce71a4e
003fe45
dcd6ee9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,10 +39,11 @@ | |
| #include "arrow/type.h" | ||
| #include "arrow/util/bit_util.h" | ||
| #include "arrow/util/checked_cast.h" | ||
| #include "arrow/util/decimal.h" | ||
| #include "arrow/util/key_value_metadata.h" | ||
| #include "arrow/util/macros.h" | ||
| #include "orc/Exceptions.hh" | ||
| #include "orc/Statistics.hh" | ||
| #include "orc/Type.hh" | ||
|
|
||
| // alias to not interfere with nested orc namespace | ||
| namespace liborc = orc; | ||
|
|
@@ -217,7 +218,7 @@ class ORCFileReader::Impl { | |
| std::unique_ptr<liborc::Reader> liborc_reader; | ||
| ORC_CATCH_NOT_OK(liborc_reader = createReader(std::move(io_wrapper), options)); | ||
| pool_ = pool; | ||
| reader_ = std::move(liborc_reader); | ||
| reader_ = std::shared_ptr<liborc::Reader>(liborc_reader.release()); | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this a backwards incompatible change? what is the impliciation of this change? i also see unique_ptr was used |
||
| current_row_ = 0; | ||
|
|
||
| return Init(); | ||
|
|
@@ -409,6 +410,44 @@ class ORCFileReader::Impl { | |
| return ReadBatch(opts, schema, stripes_[static_cast<size_t>(stripe)].num_rows); | ||
| } | ||
|
|
||
| Result<std::shared_ptr<Table>> ReadStripes( | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this same signature as parquet's equivalent api? |
||
| const std::vector<int64_t>& stripe_indices) { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just curious about c++, is there a more convient/syntactic sugar carrying type we should be using in modern c++? look at broader arrow repo to be sure. |
||
| if (stripe_indices.empty()) { | ||
| ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); | ||
| return Table::MakeEmpty(schema); | ||
| } | ||
|
|
||
| std::vector<std::shared_ptr<RecordBatch>> batches; | ||
| batches.reserve(stripe_indices.size()); | ||
| for (int64_t stripe_index : stripe_indices) { | ||
| ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); | ||
| batches.push_back(std::move(batch)); | ||
| } | ||
| ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema()); | ||
| return Table::FromRecordBatches(schema, std::move(batches)); | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this efficient? build batches, then move. hoping that we minimize mem allocatons |
||
| } | ||
|
|
||
| Result<std::shared_ptr<Table>> ReadStripes(const std::vector<int64_t>& stripe_indices, | ||
| const std::vector<int>& include_indices) { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why name "include_indices" ? is it from parquet signature? if yes- -- ignore. if its not from parquet, lets have a more intuitive name. |
||
| if (stripe_indices.empty()) { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is logic in this case different than the other ReadStripes function) |
||
| liborc::RowReaderOptions opts = DefaultRowReaderOptions(); | ||
| RETURN_NOT_OK(SelectIndices(&opts, include_indices)); | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does return_not_ok do? |
||
| ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); | ||
| return Table::MakeEmpty(schema); | ||
| } | ||
|
|
||
| std::vector<std::shared_ptr<RecordBatch>> batches; | ||
| batches.reserve(stripe_indices.size()); | ||
| for (int64_t stripe_index : stripe_indices) { | ||
| ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); | ||
| batches.push_back(std::move(batch)); | ||
| } | ||
| liborc::RowReaderOptions opts = DefaultRowReaderOptions(); | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this needed? |
||
| RETURN_NOT_OK(SelectIndices(&opts, include_indices)); | ||
| ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); | ||
| return Table::FromRecordBatches(schema, std::move(batches)); | ||
| } | ||
|
|
||
| Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { | ||
| ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(), | ||
| Status::Invalid("Out of bounds stripe: ", stripe)); | ||
|
|
@@ -526,7 +565,7 @@ class ORCFileReader::Impl { | |
| pool_); | ||
| } | ||
|
|
||
| Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( | ||
| Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader( | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we changing the function signature and what are the implications for the existing users |
||
| int64_t batch_size, const std::vector<std::string>& include_names) { | ||
| liborc::RowReaderOptions opts = DefaultRowReaderOptions(); | ||
| if (!include_names.empty()) { | ||
|
|
@@ -539,7 +578,7 @@ class ORCFileReader::Impl { | |
| row_reader = reader_->createRowReader(opts); | ||
| ORC_END_CATCH_NOT_OK | ||
|
|
||
| return std::make_shared<OrcStripeReader>(std::move(row_reader), schema, batch_size, | ||
| return std::make_unique<OrcStripeReader>(std::move(row_reader), schema, batch_size, | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why? |
||
| pool_); | ||
| } | ||
|
|
||
|
|
@@ -548,9 +587,21 @@ class ORCFileReader::Impl { | |
| return NextStripeReader(batch_size, empty_vec); | ||
| } | ||
|
|
||
| std::shared_ptr<FileMetaData> GetFileMetaData() { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this the same public API as parquet's GetFileMetadata? |
||
| try { | ||
| auto file_stats = | ||
| std::shared_ptr<const liborc::Statistics>(reader_->getStatistics().release()); | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there an arrow specific ptr type that we should be using instead
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is ".release()"? |
||
| return std::make_shared<FileMetaData>(reader_, std::move(file_stats)); | ||
| } catch (...) { | ||
| return nullptr; | ||
| } | ||
| } | ||
|
|
||
| const liborc::Type& GetORCType() { return reader_->getType(); } | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we exposing a liborc type to caller? compare with what GetParquetType does |
||
|
|
||
| private: | ||
| MemoryPool* pool_; | ||
| std::unique_ptr<liborc::Reader> reader_; | ||
| std::shared_ptr<liborc::Reader> reader_; | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changing behavior. whats impliciation on dependencies? |
||
| std::vector<StripeInformation> stripes_; | ||
| int64_t current_row_; | ||
| }; | ||
|
|
@@ -613,14 +664,25 @@ Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe( | |
| return impl_->ReadStripe(stripe, include_names); | ||
| } | ||
|
|
||
| Result<std::shared_ptr<Table>> ORCFileReader::ReadStripes( | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is parquet's equivlaent function signature for ParquetFileReader::ReadRowGroups |
||
| const std::vector<int64_t>& stripe_indices) { | ||
| return impl_->ReadStripes(stripe_indices); | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. have we properly scoped this PR such that the public api is only exposed by pimpl? |
||
| } | ||
|
|
||
| Result<std::shared_ptr<Table>> ORCFileReader::ReadStripes( | ||
| const std::vector<int64_t>& stripe_indices, | ||
| const std::vector<int>& include_indices) { | ||
| return impl_->ReadStripes(stripe_indices, include_indices); | ||
| } | ||
|
|
||
| Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } | ||
|
|
||
| Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader( | ||
| int64_t batch_size) { | ||
| return impl_->NextStripeReader(batch_size); | ||
| } | ||
|
|
||
| Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::GetRecordBatchReader( | ||
| Result<std::unique_ptr<RecordBatchReader>> ORCFileReader::GetRecordBatchReader( | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed interface. confirm that this does not impact downstreams |
||
| int64_t batch_size, const std::vector<std::string>& include_names) { | ||
| return impl_->GetRecordBatchReader(batch_size, include_names); | ||
| } | ||
|
|
@@ -678,6 +740,12 @@ std::string ORCFileReader::GetSerializedFileTail() { | |
| return impl_->GetSerializedFileTail(); | ||
| } | ||
|
|
||
| std::shared_ptr<FileMetaData> ORCFileReader::GetFileMetaData() { | ||
| return impl_->GetFileMetaData(); | ||
| } | ||
|
|
||
| const ::orc::Type& ORCFileReader::GetORCType() { return impl_->GetORCType(); } | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another instance of exposing liborc to callers, i don't think we should do that. compare with parquet's getparquettype to be inspired. |
||
|
|
||
| namespace { | ||
|
|
||
| class ArrowOutputStream : public liborc::OutputStream { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,15 +22,19 @@ | |
| #include <vector> | ||
|
|
||
| #include "arrow/adapters/orc/options.h" | ||
| #include "arrow/adapters/orc/statistics.h" | ||
| #include "arrow/io/interfaces.h" | ||
| #include "arrow/memory_pool.h" | ||
| #include "arrow/record_batch.h" | ||
| #include "arrow/status.h" | ||
| #include "arrow/type.h" | ||
| #include "arrow/type_fwd.h" | ||
| #include "arrow/util/macros.h" | ||
| #include "arrow/util/visibility.h" | ||
|
|
||
| namespace orc { | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why did we do this and what is it |
||
| class Type; | ||
| } // namespace orc | ||
|
|
||
| namespace arrow { | ||
| namespace adapters { | ||
| namespace orc { | ||
|
|
@@ -129,6 +133,29 @@ class ARROW_EXPORT ORCFileReader { | |
| Result<std::shared_ptr<RecordBatch>> ReadStripe( | ||
| int64_t stripe, const std::vector<std::string>& include_names); | ||
|
|
||
| /// \brief Read multiple selected stripes as a Table | ||
| /// | ||
| /// Reads only the specified stripes and concatenates them into a single table. | ||
| /// This is useful for stripe-selective reading based on predicate pushdown. | ||
| /// If stripe_indices is empty, returns an empty table with the file's schema. | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in parquetm, when rowgroup_indices is empty, does it also return an empty table with the file's schema ? |
||
| /// | ||
| /// \param[in] stripe_indices the indices of stripes to read | ||
| /// \return the returned Table containing data from the selected stripes | ||
| Result<std::shared_ptr<Table>> ReadStripes( | ||
| const std::vector<int64_t>& stripe_indices); | ||
|
|
||
| /// \brief Read multiple selected stripes with column selection as a Table | ||
| /// | ||
| /// Reads only the specified stripes and selected columns, concatenating them | ||
| /// into a single table. | ||
| /// If stripe_indices is empty, returns an empty table with the selected schema. | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in parquetm, when rowgroup_indices is empty, does it also return an empty table with the file's schema ? |
||
| /// | ||
| /// \param[in] stripe_indices the indices of stripes to read | ||
| /// \param[in] include_indices the selected field indices to read | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. explain what happens if include_indices is empty |
||
| /// \return the returned Table containing data from the selected stripes and columns | ||
| Result<std::shared_ptr<Table>> ReadStripes(const std::vector<int64_t>& stripe_indices, | ||
| const std::vector<int>& include_indices); | ||
|
|
||
| /// \brief Seek to designated row. Invoke NextStripeReader() after seek | ||
| /// will return stripe reader starting from designated row. | ||
| /// | ||
|
|
@@ -171,7 +198,7 @@ class ARROW_EXPORT ORCFileReader { | |
| /// \param[in] include_names the selected field names to read, if not empty | ||
| /// (otherwise all fields are read) | ||
| /// \return the record batch iterator | ||
| Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader( | ||
| Result<std::unique_ptr<RecordBatchReader>> GetRecordBatchReader( | ||
|
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change in interface. determine the downstream implications on dependencies. compare with parquet's GetRecordBatchReader -- is it a unque_ptr or shared_ptr? |
||
| int64_t batch_size, const std::vector<std::string>& include_names); | ||
|
|
||
| /// \brief The number of stripes in the file | ||
|
|
@@ -267,6 +294,14 @@ class ARROW_EXPORT ORCFileReader { | |
| /// \return A KeyValueMetadata object containing the ORC metadata | ||
| Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata(); | ||
|
|
||
| /// \brief Get file-level metadata view. | ||
| std::shared_ptr<FileMetaData> GetFileMetaData(); | ||
|
|
||
| /// \brief Get the ORC type tree for column ID mapping. | ||
| /// | ||
| /// \return reference to the root ORC Type (STRUCT), owned by the reader. | ||
| const ::orc::Type& GetORCType(); | ||
|
|
||
| private: | ||
| class Impl; | ||
| std::unique_ptr<Impl> impl_; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is decimal.h excluded?