Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cpp/examples/parquet_io/benchmark.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied as is from hybrid_scan_io/benchmark.hpp for self-contained example.

* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include "timer.hpp"

#include <cuda/iterator>

#include <algorithm>
#include <chrono>
#include <concepts>
#include <cstddef>
#include <iostream>

template <std::invocable F>
void benchmark(F&& f, std::size_t iterations)
{
auto total_time = double{0.0};

std::for_each(
cuda::counting_iterator<std::size_t>{0}, cuda::counting_iterator{iterations}, [&](auto iter) {
timer timer;

f();

auto elapsed_time_ms =
std::chrono::duration_cast<std::chrono::microseconds>(timer.elapsed()).count() /
double{1000.0};

std::cout << "Iteration: " << iter << ", time: " << elapsed_time_ms << " ms\n";

if (iterations == 1 or (iter != 0)) { total_time += elapsed_time_ms; }
});

std::cout << "Average time (first iteration excluded if iterations > 1): "
<< total_time / std::max<std::size_t>(1, iterations - 1) << " ms\n\n";
}
8 changes: 5 additions & 3 deletions cpp/examples/parquet_io/common_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <cudf/join/filtered_join.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/cuda_async_memory_resource.hpp>
#include <rmm/mr/cuda_memory_resource.hpp>
#include <rmm/mr/pool_memory_resource.hpp>

Expand All @@ -25,11 +27,11 @@

cuda::mr::any_resource<cuda::mr::device_accessible> create_memory_resource(bool is_pool_used)
{
rmm::mr::cuda_memory_resource cuda_mr{};
if (is_pool_used) {
return rmm::mr::pool_memory_resource{cuda_mr, rmm::percent_of_free_device_memory(50)};
return rmm::mr::pool_memory_resource{rmm::mr::cuda_memory_resource{},
rmm::percent_of_free_device_memory(80)};
}
return cuda_mr;
return rmm::mr::cuda_async_memory_resource{};
}

cudf::io::column_encoding get_encoding_type(std::string name)
Expand Down
37 changes: 19 additions & 18 deletions cpp/examples/parquet_io/parquet_io_multithreaded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include "benchmark.hpp"
#include "common_utils.hpp"
#include "io_source.hpp"
#include "timer.hpp"

#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
Expand Down Expand Up @@ -337,7 +337,7 @@ int32_t main(int argc, char const** argv)
switch (argc) {
case 7: write_and_validate = get_boolean(argv[6]); [[fallthrough]];
case 6: thread_count = std::max(thread_count, std::stoi(std::string{argv[5]})); [[fallthrough]];
case 5: num_reads = std::max(1, std::stoi(argv[4])); [[fallthrough]];
case 5: num_reads = std::max(num_reads, std::stoi(argv[4])); [[fallthrough]];
case 4: io_source_type = get_io_source_type(argv[3]); [[fallthrough]];
case 3:
input_multiplier = std::max(input_multiplier, std::stoi(std::string{argv[2]}));
Expand Down Expand Up @@ -383,15 +383,16 @@ int32_t main(int argc, char const** argv)
"growth.\n\n";
}

timer timer;
std::for_each(cuda::counting_iterator<int32_t>{0},
cuda::counting_iterator{num_reads},
[&](auto i) { // Read parquet files and discard the tables
std::ignore = read_parquet_multithreaded<read_mode::NO_CONCATENATE>(
input_sources, thread_count, stream_pool);
});
default_stream.synchronize();
timer.print_elapsed_millis();
benchmark(
[&] {
std::ignore = read_parquet_multithreaded<read_mode::NO_CONCATENATE>(
input_sources, thread_count, stream_pool);
default_stream.synchronize();
},
num_reads);

// Print peak memory
Comment thread
mhaseeb123 marked this conversation as resolved.
Outdated
Comment thread
mhaseeb123 marked this conversation as resolved.
Outdated
std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1'048'576.0) << " MB\n\n";
}

// Write parquet files and validate if needed
Expand Down Expand Up @@ -419,10 +420,13 @@ int32_t main(int argc, char const** argv)
std::string output_path =
std::filesystem::temp_directory_path().string() + "/output_" + current_date_and_time();
std::filesystem::create_directory({output_path});
timer timer;
write_parquet_multithreaded(output_path, table_views, thread_count, stream_pool);
default_stream.synchronize();
timer.print_elapsed_millis();

benchmark(
[&] {
write_parquet_multithreaded(output_path, table_views, thread_count, stream_pool);
default_stream.synchronize();
},
1);

// Verify the output
std::cout << "Verifying output..\n";
Expand All @@ -447,8 +451,5 @@ int32_t main(int argc, char const** argv)
std::filesystem::remove_all(output_path);
}

// Print peak memory
std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n\n";

return 0;
}
Loading