Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cpp/examples/parquet_io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
cmake_minimum_required(VERSION 3.30.4 FATAL_ERROR)

include(../set_cuda_architecture.cmake)
include_directories(../../src) # due to #include "io/parquet/reader_impl_helpers.hpp"

# initialize cuda architecture
rapids_cuda_init_architectures(parquet_io)
Expand Down Expand Up @@ -31,6 +32,15 @@ target_link_libraries(
target_compile_features(parquet_io PRIVATE cxx_std_17)
install(TARGETS parquet_io DESTINATION bin/examples/libcudf)

# Build and install the parquet_io_metadata_caching example, which times reading the same
# parquet file with and without reusing cached footer metadata. nvtx3 is linked only for
# local builds (BUILD_LOCAL_INTERFACE); parquet_io_utils supplies the shared example helpers.
add_executable(parquet_io_metadata_caching parquet_io_metadata_caching.cpp)
target_link_libraries(
parquet_io_metadata_caching PRIVATE cudf::cudf $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp>
$<TARGET_OBJECTS:parquet_io_utils>
)
target_compile_features(parquet_io_metadata_caching PRIVATE cxx_std_17)
install(TARGETS parquet_io_metadata_caching DESTINATION bin/examples/libcudf)

# Build and install parquet_io_multithreaded
add_executable(parquet_io_multithreaded parquet_io_multithreaded.cpp)
target_link_libraries(
Expand Down
183 changes: 183 additions & 0 deletions cpp/examples/parquet_io/parquet_io_metadata_caching.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "../utilities/timer.hpp"
#include "common_utils.hpp"
#include "io_source.hpp"

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

/**
 * @file parquet_io_metadata_caching.cpp
 * @brief Demonstrates usage of the libcudf APIs to read a parquet file with and
 * without metadata caching, timing both to show the benefit of caching.
 */

/**
 * @brief Reads an entire parquet file into a table
 *
 * @param filepath path to input parquet file
 * @return cudf::io::table_with_metadata holding the file contents and its metadata
 */
cudf::io::table_with_metadata read_parquet(std::string filepath)
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info(filepath)).build();
  return cudf::io::read_parquet(options);
}

/**
 * @brief Read parquet input from file, iterating over all row groups one at a time
 *
 * Fresh reader options (and thus a fresh footer parse) are constructed on every
 * iteration on purpose: this is the "no metadata caching" baseline being timed.
 *
 * @param filepath path to input parquet file
 * @param num_rowgroups the number of row groups in the file
 */
void read_parquet_RGs(std::string const& filepath, cudf::size_type num_rowgroups)
{
  for (cudf::size_type rg_id = 0; rg_id < num_rowgroups; rg_id++) {
    auto source_info = cudf::io::source_info(filepath);
    auto builder     = cudf::io::parquet_reader_options::builder(source_info);
    auto options     = builder.build();
    // Restrict this read to the single row group `rg_id`
    options.set_row_groups({{rg_id}});
    cudf::io::read_parquet(options);
  }
}

/**
 * @brief Read parquet input from file, reusing previously parsed footer metadata
 *
 * @param filepath path to input parquet file
 * @param aggregate_reader_metadata_ptr opaque address of the cached reader metadata, as
 * returned by cudf::io::parquet_metadata::get_aggregate_reader_metadata_ptr(); the
 * parquet_metadata object that owns it must outlive this call
 * @return cudf::io::table_with_metadata holding the file contents and its metadata
 */
cudf::io::table_with_metadata read_parquet_with_metadata_caching(
  std::string const& filepath, std::uintptr_t aggregate_reader_metadata_ptr)
{
  auto source_info = cudf::io::source_info(filepath);
  auto builder     = cudf::io::parquet_reader_options::builder(source_info);
  auto options     = builder.build();
  // Supplying the cached metadata lets the reader skip re-parsing the file footer
  options.set_aggregate_reader_metadata(aggregate_reader_metadata_ptr);
  return cudf::io::read_parquet(options);
}

/**
 * @brief Read parquet input from file, iterating over all row groups one at a time
 * while reusing previously parsed footer metadata
 *
 * @param filepath path to input parquet file
 * @param aggregate_reader_metadata_ptr opaque address of the cached reader metadata, as
 * returned by cudf::io::parquet_metadata::get_aggregate_reader_metadata_ptr(); the
 * parquet_metadata object that owns it must outlive this call
 * @param num_rowgroups the number of row groups in the file
 */
void read_parquet_RGs_with_metadata_caching(std::string const& filepath,
                                            std::uintptr_t aggregate_reader_metadata_ptr,
                                            cudf::size_type num_rowgroups)
{
  for (cudf::size_type rg_id = 0; rg_id < num_rowgroups; rg_id++) {
    auto source_info = cudf::io::source_info(filepath);
    auto builder     = cudf::io::parquet_reader_options::builder(source_info);
    auto options     = builder.build();
    // Restrict this read to the single row group `rg_id`
    options.set_row_groups({{rg_id}});
    // Supplying the cached metadata lets the reader skip re-parsing the file footer
    options.set_aggregate_reader_metadata(aggregate_reader_metadata_ptr);
    cudf::io::read_parquet(options);
  }
}

/**
 * @brief Prints the example's command-line usage to stdout.
 */
void print_usage()
{
  std::cout << "\nUsage: parquet_io_metadata_caching <input parquet file>\n";
}

/**
 * @brief Main for the parquet_io_metadata_caching example
 *
 * Command line parameters:
 * 1. parquet input file name/path (default: "example.parquet")
 *
 * Times a full-file read and a per-row-group read, each with and without reusing the
 * parquet footer metadata that is parsed once up front.
 */
int main(int argc, char const** argv)
{
  std::string input_filepath = "example.parquet";

  switch (argc) {
    case 1: break;  // no argument: fall back to the default input file path
    case 2:         // Check if instead of input_paths, the first argument is `-h` or `--help`
      if (auto arg = std::string{argv[1]}; arg != "-h" and arg != "--help") {
        input_filepath = std::move(arg);
        break;
      }
      [[fallthrough]];
    default: print_usage(); throw std::runtime_error("");
  }

  // Create and use a memory pool
  bool is_pool_used = true;
  auto resource     = create_memory_resource(is_pool_used);
  cudf::set_current_device_resource(resource.get());

  // Parse the footer metadata once, outside the timer scope. `metadata` owns the cached
  // aggregate reader metadata, so it must outlive every cached read below.
  auto source_info                   = cudf::io::source_info(input_filepath);
  auto metadata                      = cudf::io::read_parquet_metadata(source_info);
  auto aggregate_reader_metadata_ptr = metadata.get_aggregate_reader_metadata_ptr();

  // Bulk read: read the file in one read call
  {
    // Read input parquet file
    // We do not want to time the initial read time as it may include
    // time for nvcomp, cufile loading and RMM growth
    std::cout << "\nReading " << input_filepath << " without metadata-caching...\n";
    std::cout << "Note: Not timing the initial parquet read as it may include\n"
                 "times for nvcomp, cufile loading and RMM growth.\n\n";

    cudf::examples::timer timer;
    auto [first_read_input, first_read_metadata] = read_parquet(input_filepath);
    timer.print_elapsed_millis();

    // Read the same parquet file again, this time reusing the cached footer metadata
    std::cout << "Reading " << input_filepath << " with metadata-caching...\n";

    // Reset the timer
    timer.reset();
    auto [second_read_input, second_read_metadata] =
      read_parquet_with_metadata_caching(input_filepath, aggregate_reader_metadata_ptr);
    timer.print_elapsed_millis();

    // Check for validity: both reads must produce identical tables
    check_tables_equal(first_read_input->view(), second_read_input->view());
  }

  // Iterating all row-groups one-by-one
  {
    auto const num_rowgroups = metadata.num_rowgroups();
    std::cout << "Number of Parquet row-groups of the input file: " << num_rowgroups << "\n";

    std::cout << "Iterating all rowgroups " << input_filepath << " without metadata-caching...\n";
    cudf::examples::timer timer;
    read_parquet_RGs(input_filepath, num_rowgroups);
    timer.print_elapsed_millis();

    std::cout << "Iterating all rowgroups " << input_filepath << " with metadata-caching...\n";
    timer.reset();
    read_parquet_RGs_with_metadata_caching(
      input_filepath, aggregate_reader_metadata_ptr, num_rowgroups);
    timer.print_elapsed_millis();
  }
  return 0;
}
26 changes: 26 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include "io/parquet/reader_impl_helpers.hpp"

#include <cudf/ast/expressions.hpp>
#include <cudf/io/detail/parquet.hpp>
#include <cudf/io/types.hpp>
Expand Down Expand Up @@ -72,6 +74,8 @@ constexpr size_type default_max_page_fragment_size = 5000; ///< 5000 rows p

class parquet_reader_options_builder;

using aggregate_reader_metadata = parquet::detail::aggregate_reader_metadata;

/**
* @brief Settings for `read_parquet()`.
*/
Expand Down Expand Up @@ -113,6 +117,8 @@ class parquet_reader_options {

friend parquet_reader_options_builder;

aggregate_reader_metadata* _aggregate_reader_metadata = nullptr;

public:
/**
* @brief Default constructor.
Expand All @@ -131,6 +137,16 @@ class parquet_reader_options {
*/
static parquet_reader_options_builder builder(source_info src = source_info{});

/**
 * @brief Returns the cached aggregate reader metadata pointer set on these options.
 *
 * The pointer is non-owning; it is whatever was last passed to
 * set_aggregate_reader_metadata().
 *
 * @return Pointer to the cached reader metadata, or nullptr if no metadata was cached.
 */
[[nodiscard]] aggregate_reader_metadata* get_aggregate_reader_metadata_ptr() const
{
return _aggregate_reader_metadata;
}

/**
* @brief Returns source info.
*
Expand Down Expand Up @@ -360,6 +376,16 @@ class parquet_reader_options {
* @param type The timestamp data_type to which all timestamp columns need to be cast
*/
void set_timestamp_type(data_type type) { _timestamp_type = type; }

/**
 * @brief Sets cached reader metadata so a subsequent read can skip footer parsing.
 *
 * NOTE(review): `meta_ptr` is an integer address obtained from
 * parquet_metadata::get_aggregate_reader_metadata_ptr(). The pointed-to metadata is NOT
 * owned by these options, so the originating parquet_metadata object must outlive any
 * read that uses them — confirm callers guarantee this. The reinterpret_cast also drops
 * the const-ness the accessor had; verify the reader never mutates the cached metadata.
 *
 * @param meta_ptr the cached metadata pointer, encoded as a std::uintptr_t
 */
void set_aggregate_reader_metadata(std::uintptr_t meta_ptr)
{
_aggregate_reader_metadata = reinterpret_cast<aggregate_reader_metadata*>(meta_ptr);
}
};

/**
Expand Down
47 changes: 46 additions & 1 deletion cpp/include/cudf/io/parquet_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#pragma once

#include "io/parquet/reader_impl_helpers.hpp"

#include <cudf/io/parquet_schema.hpp>
#include <cudf/io/types.hpp>
#include <cudf/utilities/export.hpp>
Expand Down Expand Up @@ -168,7 +170,8 @@ class parquet_metadata {
using row_group_metadata = std::unordered_map<std::string, int64_t>;
/// Column chunk metadata from each ColumnChunkMetaData element
using column_chunk_metadata = std::unordered_map<std::string, std::vector<int64_t>>;

/// For metadata caching
using aggregate_reader_metadata = parquet::detail::aggregate_reader_metadata;
/**
* @brief Default constructor
*
Expand Down Expand Up @@ -205,6 +208,37 @@ class parquet_metadata {
{
}

/**
 * @brief Constructor for the case with metadata caching
 *
 * Stores a copy of the aggregate reader metadata so its address can later be handed to
 * parquet_reader_options::set_aggregate_reader_metadata(); this object must outlive any
 * read that uses that address.
 *
 * @param schema parquet schema
 * @param num_rows number of rows
 * @param num_rowgroups total number of row groups
 * @param num_rowgroups_per_file number of row groups per file
 * @param file_metadata key-value metadata in the file footer
 * @param rg_metadata vector of maps containing metadata for each row group
 * @param column_chunk_metadata map of column names to vectors of `total_uncompressed_size`
 * metadata from all their column chunks
 * @param reader_metadata the aggregate reader metadata to cache
 */
parquet_metadata(parquet_schema schema,
                 int64_t num_rows,
                 size_type num_rowgroups,
                 std::vector<size_type> num_rowgroups_per_file,
                 key_value_metadata file_metadata,
                 std::vector<row_group_metadata> rg_metadata,
                 column_chunk_metadata column_chunk_metadata,
                 aggregate_reader_metadata reader_metadata)
  : _schema{std::move(schema)},
    _num_rows{num_rows},
    _num_rowgroups{num_rowgroups},
    _num_rowgroups_per_file{std::move(num_rowgroups_per_file)},
    _file_metadata{std::move(file_metadata)},
    _rowgroup_metadata{std::move(rg_metadata)},
    // Fix: the column chunk metadata argument was previously accepted but never stored,
    // leaving _column_chunk_metadata default-constructed in this overload
    _column_chunk_metadata{std::move(column_chunk_metadata)},
    // Renamed from `_aggregate_reader_metadata`, which shadowed the member; also move
    // instead of copying the (potentially large) metadata a second time
    _aggregate_reader_metadata{std::move(reader_metadata)}
{
}

/**
* @brief Returns the parquet schema
*
Expand Down Expand Up @@ -249,6 +283,16 @@ class parquet_metadata {
*/
[[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; }

/**
 * @brief Returns the address of the cached aggregate reader metadata as an integer.
 *
 * NOTE(review): this exposes the address of a private member. The returned value is only
 * valid while this parquet_metadata object is alive, and casting it back to a non-const
 * pointer (as parquet_reader_options::set_aggregate_reader_metadata does) discards the
 * const-ness of this accessor — verify the reader never mutates the cached metadata.
 *
 * @return Address of the cached metadata, suitable for passing to
 * parquet_reader_options::set_aggregate_reader_metadata()
 */
[[nodiscard]] std::uintptr_t get_aggregate_reader_metadata_ptr() const
{
return reinterpret_cast<std::uintptr_t>(&_aggregate_reader_metadata);
}

/**
* @brief Returns a map of column names to vectors of `total_uncompressed_size` metadata from
* all their column chunks
Expand All @@ -266,6 +310,7 @@ class parquet_metadata {
key_value_metadata _file_metadata;
std::vector<row_group_metadata> _rowgroup_metadata;
column_chunk_metadata _column_chunk_metadata;
aggregate_reader_metadata _aggregate_reader_metadata;
};

/**
Expand Down
18 changes: 12 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,16 @@ reader::impl::impl(std::size_t chunk_read_limit,
_output_chunk_read_limit{chunk_read_limit},
_input_pass_read_limit{pass_read_limit}
{
// Open and parse the source dataset metadata
_metadata = std::make_unique<aggregate_reader_metadata>(
_sources,
options.is_enabled_use_arrow_schema(),
options.get_columns().has_value() and options.is_enabled_allow_mismatched_pq_schemas());
if (options.get_aggregate_reader_metadata_ptr() != nullptr) {
// Metadata caching
_metadata.make_raw_ptr(options.get_aggregate_reader_metadata_ptr());
} else {
// Open and parse the source dataset metadata
_metadata.make_unique_ptr(
_sources,
options.is_enabled_use_arrow_schema(),
options.get_columns().has_value() and options.is_enabled_allow_mismatched_pq_schemas());
}

// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();
Expand Down Expand Up @@ -871,7 +876,8 @@ parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> con
metadata.get_num_row_groups_per_file(),
metadata.get_key_value_metadata()[0],
metadata.get_rowgroup_metadata(),
metadata.get_column_chunk_metadata()};
metadata.get_column_chunk_metadata(),
metadata};
}

} // namespace cudf::io::parquet::detail
Loading