From c64498e714704a004fca4b15063f8113bb1145e0 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 30 Sep 2025 03:08:31 +0000 Subject: [PATCH] Merge pull request #87220 from ClickHouse/pqice Fixes for parquet reader v3, _row_number, and iceberg positioned deletes --- src/Common/ProfileEvents.cpp | 2 +- src/Common/futex.h | 12 +- src/Common/threadPoolCallbackRunner.cpp | 135 +++++++++++------- src/Common/threadPoolCallbackRunner.h | 24 +++- src/Core/FormatFactorySettings.h | 2 +- src/Processors/Formats/IInputFormat.cpp | 11 ++ src/Processors/Formats/IInputFormat.h | 31 +++- .../Formats/Impl/Parquet/Prefetcher.cpp | 31 ++-- .../Formats/Impl/Parquet/ReadManager.cpp | 78 ++++++++-- .../Formats/Impl/Parquet/ReadManager.h | 19 ++- .../Formats/Impl/Parquet/Reader.cpp | 4 + src/Processors/Formats/Impl/Parquet/Reader.h | 2 +- .../Formats/Impl/ParquetBlockInputFormat.cpp | 4 +- .../Impl/ParquetV3BlockInputFormat.cpp | 11 +- .../Formats/Impl/ParquetV3BlockInputFormat.h | 4 +- src/Processors/Sources/ConstChunkGenerator.h | 2 +- .../DataLakes/DataLakeConfiguration.h | 8 +- .../DeltaLakeMetadataDeltaKernel.cpp | 2 +- .../DataLakes/DeltaLakeMetadataDeltaKernel.h | 2 +- .../DataLakes/IDataLakeMetadata.h | 2 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 8 ++ .../DataLakes/Iceberg/IcebergMetadata.h | 1 + .../DataLakes/Iceberg/Mutations.cpp | 25 ++-- .../Iceberg/PositionDeleteTransform.cpp | 114 ++++++++++----- .../Iceberg/PositionDeleteTransform.h | 3 +- .../ObjectStorage/StorageObjectStorage.cpp | 2 +- .../StorageObjectStorageConfiguration.h | 2 +- src/Storages/VirtualColumnUtils.cpp | 16 ++- tests/integration/helpers/iceberg_utils.py | 14 +- .../integration/test_storage_iceberg/test.py | 53 ++++++- .../03596_parquet_prewhere_page_skip_bug.sql | 2 +- .../03624_parquet_row_number.reference | 9 ++ .../0_stateless/03624_parquet_row_number.sql | 7 + 33 files changed, 474 insertions(+), 168 deletions(-) create mode 100644 tests/queries/0_stateless/03624_parquet_row_number.reference create mode 100644 tests/queries/0_stateless/03624_parquet_row_number.sql diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 119b25ba1d6f..41790c8adf90 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -1122,7 +1122,7 @@ The server successfully detected this situation and will download merged part fr M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \ M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \ \ - M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \ + M(ParquetFetchWaitTimeMicroseconds, "Time of waiting for parquet file reads from decoding threads (not prefetching threads)", ValueType::Microseconds) \ M(ParquetReadRowGroups, "The total number of row groups read from parquet data", ValueType::Number) \ M(ParquetPrunedRowGroups, "The total number of row groups pruned from parquet data", ValueType::Number) \ M(ParquetDecodingTasks, "Tasks issued by parquet reader", ValueType::Number) \ diff --git a/src/Common/futex.h b/src/Common/futex.h index f86cfacdc3d3..9ee0ae95d956 100644 --- a/src/Common/futex.h +++ b/src/Common/futex.h @@ -19,6 +19,16 @@ inline Int64 futexWait(void * address, UInt32 value) return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0); } +inline Int64 futexTimedWait(void * address, UInt32 value, UInt64 nanos) +{ + const UInt64 nanos_per_sec = 1'000'000'000; + UInt64 sec = nanos / nanos_per_sec; + struct timespec timeout; + timeout.tv_sec = time_t(std::min(sec, UInt64(std::numeric_limits::max()))); + timeout.tv_nsec = int64_t(nanos % nanos_per_sec); + return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, &timeout, nullptr, 0); +} + inline Int64 futexWake(void * address, int count) { return syscall(SYS_futex, address, FUTEX_WAKE_PRIVATE, count, nullptr, nullptr, 0); @@ -37,7 +47,7 @@ inline void futexWakeOne(std::atomic & address) inline void futexWakeAll(std::atomic & address) { - futexWake(&address, INT_MAX); + futexWake(&address, INT_MAX); } constexpr UInt32 lowerHalf(UInt64 value) diff --git a/src/Common/threadPoolCallbackRunner.cpp b/src/Common/threadPoolCallbackRunner.cpp index d33349cbaedb..81117d22ba3d 100644 --- a/src/Common/threadPoolCallbackRunner.cpp +++ b/src/Common/threadPoolCallbackRunner.cpp @@ -20,14 +20,6 @@ void ThreadPoolCallbackRunnerFast::initThreadPool(ThreadPool & pool_, size_t max max_threads = max_threads_; thread_name = thread_name_; thread_group = thread_group_; - - /// We could dynamically add and remove threads based on load, but it's not clear whether it's - /// worth the added complexity. - for (size_t i = 0; i < max_threads; ++i) - { - pool->scheduleOrThrowOnError([this] { threadFunction(); }); - ++threads; // only if scheduleOrThrowOnError didn't throw - } } ThreadPoolCallbackRunnerFast::ThreadPoolCallbackRunnerFast(Mode mode_) : mode(mode_) @@ -58,19 +50,30 @@ void ThreadPoolCallbackRunnerFast::shutdown() chassert(active_tasks.load() == queue.size()); } +void ThreadPoolCallbackRunnerFast::startMoreThreadsIfNeeded(size_t active_tasks_, std::unique_lock &) +{ + while (threads < max_threads && threads < active_tasks_ && !shutdown_requested) + { + pool->scheduleOrThrow([this] { threadFunction(); }); + ++threads; // only if scheduleOrThrow didn't throw + } +} + void ThreadPoolCallbackRunnerFast::operator()(std::function f) { if (mode == Mode::Disabled) throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized"); + size_t active_tasks_ = 1 + active_tasks.fetch_add(1, std::memory_order_relaxed); + { std::unique_lock lock(mutex); queue.push_back(std::move(f)); + startMoreThreadsIfNeeded(active_tasks_, lock); } if (mode == Mode::ThreadPool) { - active_tasks.fetch_add(1, std::memory_order_relaxed); #ifdef OS_LINUX UInt32 prev_size = queue_size.fetch_add(1, std::memory_order_release); if (prev_size < max_threads) @@ -89,14 +92,16 @@ void ThreadPoolCallbackRunnerFast::bulkSchedule(std::vector switcher; + switcher.emplace(thread_group, thread_name.c_str()); + + while (true) { - ThreadGroupSwitcher switcher(thread_group, thread_name.c_str()); + bool timed_out = false; +#ifdef OS_LINUX + UInt32 x = queue_size.load(std::memory_order_relaxed); while (true) { - #ifdef OS_LINUX - UInt32 x = queue_size.load(std::memory_order_relaxed); - while (true) + if (x == 0) { - if (x == 0) + Int64 waited = futexTimedWait(&queue_size, 0, THREAD_IDLE_TIMEOUT_NS); + x = queue_size.load(std::memory_order_relaxed); + + if (waited < 0 && errno == ETIMEDOUT && x == 0) { - futexWait(&queue_size, 0); - x = queue_size.load(std::memory_order_relaxed); - } - else if (queue_size.compare_exchange_weak( - x, x - 1, std::memory_order_acquire, std::memory_order_relaxed)) + timed_out = true; break; + } } - #endif + else if (queue_size.compare_exchange_weak( + x, x - 1, std::memory_order_acquire, std::memory_order_relaxed)) + break; + } +#endif - std::function f; - { - std::unique_lock lock(mutex); + std::function f; + { + std::unique_lock lock(mutex); - #ifndef OS_LINUX - queue_cv.wait(lock, [&] { return shutdown_requested || !queue.empty(); }); - #endif +#ifdef OS_LINUX + /// Important to never stop the last thread if queue is not empty (checked under the + /// same `lock` as decrementing `threads`). Otherwise we'll deadlock like this: + /// 0. `threads` == 1, queue is empty. + /// 1. The worker thread times out; it didn't lock mutex or decrement `threads` yet. + /// 2. A manager thread enqueues a task. It sees active_tasks == 1 and `threads` == 1, + /// so it doesn't start another thread. + /// 3. The worker thread exits. + /// 4. There are no threads, but the queue is not empty, oops. + if (timed_out && !queue.empty() && !shutdown_requested) + /// We can't just proceed to `queue.pop_front()` here because we haven't + /// decremented queue_size. + continue; +#else + timed_out = !queue_cv.wait_for( + lock, std::chrono::nanoseconds(THREAD_IDLE_TIMEOUT_NS), + [&] { return shutdown_requested || !queue.empty(); }); +#endif - if (shutdown_requested) - break; + if (shutdown_requested || timed_out) + { + /// Important that we destroy the `ThreadGroupSwitcher` before decrementing `threads`. + /// Otherwise ~ThreadGroupSwitcher may access global Context after the query is + /// finished, which may race with mutating Context (specifically, Settings) at the + /// start of next query. + switcher.reset(); - chassert(!queue.empty()); + threads -= 1; + if (threads == 0) + shutdown_cv.notify_all(); - f = std::move(queue.front()); - queue.pop_front(); + return; } - try - { - f(); + chassert(!queue.empty()); - CurrentThread::updatePerformanceCountersIfNeeded(); - } - catch (...) - { - tryLogCurrentException("FastThreadPool"); - chassert(false); - } + f = std::move(queue.front()); + queue.pop_front(); + } + + try + { + f(); - active_tasks.fetch_sub(1, std::memory_order_relaxed); + CurrentThread::updatePerformanceCountersIfNeeded(); + } + catch (...) + { + tryLogCurrentException("FastThreadPool"); + chassert(false); } - } - /// Important that we destroy the `ThreadGroupSwitcher` before decrementing `threads`. - /// Otherwise ~ThreadGroupSwitcher may access global Context after the query is finished, which - /// may race with mutating Context (specifically, Settings) at the start of next query. - { - std::unique_lock lock(mutex); - threads -= 1; - if (threads == 0) - shutdown_cv.notify_all(); + active_tasks.fetch_sub(1, std::memory_order_relaxed); } + + chassert(false); } bool ShutdownHelper::try_lock_shared() diff --git a/src/Common/threadPoolCallbackRunner.h b/src/Common/threadPoolCallbackRunner.h index 33a9a63ba91e..443e07c0584f 100644 --- a/src/Common/threadPoolCallbackRunner.h +++ b/src/Common/threadPoolCallbackRunner.h @@ -249,8 +249,6 @@ class ThreadPoolCallbackRunnerFast Disabled, }; - /// TODO [parquet]: Add metrics for queue size and active threads, and maybe event for tasks executed. - ThreadPoolCallbackRunnerFast(); void initManual() @@ -282,6 +280,9 @@ class ThreadPoolCallbackRunnerFast bool isIdle() const { return active_tasks.load(std::memory_order_relaxed) == 0; } private: + /// Stop thread if it had nothing to do for this long. + static constexpr UInt64 THREAD_IDLE_TIMEOUT_NS = 3'000'000; // 3 ms + Mode mode = Mode::Disabled; ThreadPool * pool = nullptr; size_t max_threads = 0; @@ -309,6 +310,25 @@ class ThreadPoolCallbackRunnerFast std::condition_variable queue_cv; #endif + /// We dynamically start more threads when queue grows and stop idle threads after a timeout. + /// + /// Interestingly, this is required for correctness, not just performance. + /// If we kept max_threads threads at all times, we may deadlock because the "threads" that we + /// schedule on ThreadPool are not necessarily running, they may be sitting in ThreadPool's + /// queue, blocking other "threads" from running. E.g. this may happen: + /// 1. Iceberg reader creates many parquet readers, and their ThreadPoolCallbackRunnerFast(s) + /// occupy all slots in the shared ThreadPool (getFormatParsingThreadPool()). + /// 2. Iceberg reader creates some more parquet readers for positional deletes, using separate + /// ThreadPoolCallbackRunnerFast-s (because the ones from above are mildly inconvenient to + /// propagate to that code site). Those ThreadPoolCallbackRunnerFast-s make + /// pool->scheduleOrThrowOnError calls, but ThreadPool just adds them to queue, no actual + /// ThreadPoolCallbackRunnerFast::threadFunction()-s are started. + /// 3. The readers from step 2 are stuck because their ThreadPoolCallbackRunnerFast-s have no + /// threads. The readers from step 1 are idle but not destroyed (keep occupying threads) + /// because the iceberg reader is waiting for positional deletes to be read (by readers + /// from step 2). We're stuck. + void startMoreThreadsIfNeeded(size_t active_tasks_, std::unique_lock &); + void threadFunction(); }; diff --git a/src/Core/FormatFactorySettings.h b/src/Core/FormatFactorySettings.h index 03caeae01bec..e3e69899cee2 100644 --- a/src/Core/FormatFactorySettings.h +++ b/src/Core/FormatFactorySettings.h @@ -168,7 +168,7 @@ Ignore case when matching ORC columns with CH columns. Ignore case when matching Parquet columns with CH columns. )", 0) \ DECLARE(Bool, input_format_parquet_preserve_order, false, R"( -Avoid reordering rows when reading from Parquet files. Usually makes it much slower. Not recommended as row ordering is generally not guaranteed, and other parts of query pipeline may break it. +Avoid reordering rows when reading from Parquet files. Not recommended as row ordering is generally not guaranteed, and other parts of query pipeline may break it. Use `ORDER BY _row_number` instead. )", 0) \ DECLARE(Bool, input_format_parquet_filter_push_down, true, R"( When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata. diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp index f7449b2c8728..4311e8bcaf97 100644 --- a/src/Processors/Formats/IInputFormat.cpp +++ b/src/Processors/Formats/IInputFormat.cpp @@ -6,6 +6,17 @@ namespace DB { +ChunkInfoRowNumbers::ChunkInfoRowNumbers(size_t row_num_offset_, std::optional applied_filter_) + : row_num_offset(row_num_offset_), applied_filter(std::move(applied_filter_)) { } + +ChunkInfoRowNumbers::Ptr ChunkInfoRowNumbers::clone() const +{ + auto res = std::make_shared(row_num_offset); + if (applied_filter.has_value()) + res->applied_filter.emplace(applied_filter->begin(), applied_filter->end()); + return res; +} + IInputFormat::IInputFormat(SharedHeader header, ReadBuffer * in_) : ISource(std::move(header)), in(in_) { column_mapping = std::make_shared(); diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h index d7a86cf52b24..cb5feea18646 100644 --- a/src/Processors/Formats/IInputFormat.h +++ b/src/Processors/Formats/IInputFormat.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -14,13 +15,35 @@ namespace DB struct SelectQueryInfo; using ColumnMappingPtr = std::shared_ptr; - -struct ChunkInfoRowNumOffset : public ChunkInfoCloneable +using IColumnFilter = PaddedPODArray; + +/// Most (all?) file formats have a natural order of rows within the file. +/// But our format readers and query pipeline may reorder or filter rows. This struct is used to +/// propagate the original row numbers, e.g. for _row_number virtual column or for iceberg +/// positional deletes. +/// +/// Warning: we currently don't correctly update this info in most transforms. E.g. things like +/// FilterTransform and SortingTransform logically should remove this ChunkInfo, but don't; we don't +/// have a mechanism to systematically find all code sites that would need to do that or to detect +/// if one was missed. +/// So this is only used in a few specific situations, and the builder of query pipeline must be +/// careful to never put a step that uses this info after a step that breaks it. +/// +/// If row numbers in a chunk are consecutive, this contains just the first row number. +/// If row numbers are not consecutive as a result of filtering, this additionally contains the mask +/// that was used for filtering, from which row numbers can be recovered. +struct ChunkInfoRowNumbers : public ChunkInfo { - ChunkInfoRowNumOffset(const ChunkInfoRowNumOffset & other) = default; - explicit ChunkInfoRowNumOffset(size_t row_num_offset_) : row_num_offset(row_num_offset_) { } + explicit ChunkInfoRowNumbers(size_t row_num_offset_, std::optional applied_filter_ = std::nullopt); + + Ptr clone() const override; const size_t row_num_offset; + /// If nullopt, row numbers are consecutive. + /// If not empty, the number of '1' elements is equal to the number of rows in the chunk; + /// row i in the chunk has row number: + /// row_num_offset + {index of the i-th '1' element in applied_filter}. + std::optional applied_filter; }; /** Input format is a source, that reads data from ReadBuffer. diff --git a/src/Processors/Formats/Impl/Parquet/Prefetcher.cpp b/src/Processors/Formats/Impl/Parquet/Prefetcher.cpp index 0ba59cbe9a02..3e05029060fa 100644 --- a/src/Processors/Formats/Impl/Parquet/Prefetcher.cpp +++ b/src/Processors/Formats/Impl/Parquet/Prefetcher.cpp @@ -15,6 +15,11 @@ namespace DB::ErrorCodes extern const int CANNOT_READ_ALL_DATA; } +namespace ProfileEvents +{ + extern const Event ParquetFetchWaitTimeMicroseconds; +} + namespace DB::Parquet { @@ -409,16 +414,24 @@ std::span Prefetcher::getRangeData(const PrefetchHandle & request) const RequestState * req = request.request; chassert(req->state == RequestState::State::HasTask); Task * task = req->task; - auto s = task->state.load(std::memory_order_acquire); - if (s == Task::State::Scheduled) + Task::State s = task->state.load(std::memory_order_acquire); + if (s == Task::State::Scheduled || s == Task::State::Running) { - s = runTask(task); - chassert(s != Task::State::Scheduled); - } - if (s == Task::State::Running) - { - task->completion.wait(); - s = task->state.load(); + Stopwatch wait_time; + + if (s == Task::State::Scheduled) + { + s = runTask(task); + chassert(s != Task::State::Scheduled); + } + + if (s == Task::State::Running) // (not `else`, the runTask above may return Running) + { + task->completion.wait(); + s = task->state.load(); + } + + ProfileEvents::increment(ProfileEvents::ParquetFetchWaitTimeMicroseconds, wait_time.elapsedMicroseconds()); } if (s == Task::State::Exception) rethrowException(task); diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp index 0c83f9a03c0e..43dc7c9a606a 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -17,6 +18,8 @@ namespace ProfileEvents { extern const Event ParquetDecodingTasks; extern const Event ParquetDecodingTaskBatches; + extern const Event ParquetReadRowGroups; + extern const Event ParquetPrunedRowGroups; } namespace DB::Parquet @@ -45,6 +48,9 @@ void ReadManager::init(FormatParserSharedResourcesPtr parser_shared_resources_) reader.prefilterAndInitRowGroups(); reader.preparePrewhere(); + ProfileEvents::increment(ProfileEvents::ParquetReadRowGroups, reader.row_groups.size()); + ProfileEvents::increment(ProfileEvents::ParquetPrunedRowGroups, reader.file_metadata.row_groups.size() - reader.row_groups.size()); + size_t num_row_groups = reader.row_groups.size(); for (size_t i = size_t(ReadStage::NotStarted) + 1; i < size_t(ReadStage::Deliver); ++i) { @@ -202,9 +208,10 @@ void ReadManager::finishRowGroupStage(size_t row_group_idx, ReadStage stage, Mem { diff.scheduleAllStages(); - if (i + 1 == reader.row_groups.size()) + /// Notify read() if everything is done or if it's relying on + /// first_incomplete_row_group to deliver chunks in order. + if (i + 1 == reader.row_groups.size() || reader.options.format.parquet.preserve_order) { - /// Notify read() that everything is done. { /// Lock and unlock to avoid race condition on condition variable. /// (Otherwise the notify_all() may happen after read() saw the old @@ -349,14 +356,19 @@ void ReadManager::finishRowSubgroupStage(size_t row_group_idx, size_t row_subgro } case ReadStage::MainData: { - size_t prev = row_group.read_ptr.exchange(row_subgroup_idx + 1); - chassert(prev == row_subgroup_idx); - advanced_read_ptr = prev + 1; - row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed); + /// Must add to delivery_queue before advancing read_ptr to deliver subgroups in order. + /// (If we advanced read_ptr first, another thread could start and finish reading the + /// next subgroup before we add this one to delivery_queue, and ReadManager::read could + /// pick up the later subgroup before we add this one.) { std::lock_guard lock(delivery_mutex); delivery_queue.push(Task {.stage = ReadStage::Deliver, .row_group_idx = row_group_idx, .row_subgroup_idx = row_subgroup_idx}); } + + size_t prev = row_group.read_ptr.exchange(row_subgroup_idx + 1); + chassert(prev == row_subgroup_idx); + advanced_read_ptr = prev + 1; + row_subgroup.stage.store(ReadStage::Deliver, std::memory_order::relaxed); delivery_cv.notify_one(); break; // proceed to advancing read_ptr } @@ -816,13 +828,13 @@ void ReadManager::clearColumnChunk(ColumnChunk & column, MemoryUsageDiff & diff) void ReadManager::clearRowSubgroup(RowSubgroup & row_subgroup, MemoryUsageDiff & diff) { - row_subgroup.filter.memory.reset(&diff); + row_subgroup.filter.clear(&diff); row_subgroup.output.clear(); for (ColumnSubchunk & col : row_subgroup.columns) col.column_and_offsets_memory.reset(&diff); } -std::tuple ReadManager::read() +ReadManager::ReadResult ReadManager::read() { Task task; { @@ -835,9 +847,15 @@ std::tuple ReadManager::read() if (exception) std::rethrow_exception(exception); - if (!delivery_queue.empty()) + /// If `preserve_order`, only deliver chunks from `first_incomplete_row_group`. + /// This ensures that row groups are delivered in order. Within a row group, row + /// subgroups are read and added to `delivery_queue` in order. + if (!delivery_queue.empty() && + (!reader.options.format.parquet.preserve_order || + delivery_queue.top().row_group_idx == + first_incomplete_row_group.load(std::memory_order_relaxed))) { - task = delivery_queue.front(); + task = delivery_queue.top(); delivery_queue.pop(); break; } @@ -846,7 +864,10 @@ std::tuple ReadManager::read() { /// All done. Check for memory accounting leaks. /// First join the threads because they might still be decrementing memory_usage. + lock.unlock(); shutdown->shutdown(); + lock.lock(); + for (const RowGroup & row_group : reader.row_groups) { chassert(row_group.stage.load(std::memory_order_relaxed) == ReadStage::Deallocated); @@ -892,7 +913,8 @@ std::tuple ReadManager::read() } } - auto & row_subgroup = reader.row_groups.at(task.row_group_idx).subgroups.at(task.row_subgroup_idx); + RowGroup & row_group = reader.row_groups.at(task.row_group_idx); + RowSubgroup & row_subgroup = row_group.subgroups.at(task.row_subgroup_idx); chassert(row_subgroup.stage == ReadStage::Deliver); size_t num_final_columns = reader.sample_block->columns(); for (size_t i = 0; i < reader.output_columns.size(); ++i) @@ -912,12 +934,44 @@ std::tuple ReadManager::read() Chunk chunk(std::move(row_subgroup.output), row_subgroup.filter.rows_pass); BlockMissingValues block_missing_values = std::move(row_subgroup.block_missing_values); + auto row_numbers_info = std::make_shared( + row_subgroup.start_row_idx + row_group.start_global_row_idx); + if (row_subgroup.filter.rows_pass != row_subgroup.filter.rows_total) + { + chassert(row_subgroup.filter.rows_pass > 0); + chassert(!row_subgroup.filter.filter.empty()); + chassert(std::accumulate(row_subgroup.filter.filter.begin(), row_subgroup.filter.filter.end(), size_t(0)) == chunk.getNumRows()); + + row_numbers_info->applied_filter = std::move(row_subgroup.filter.filter); + } + chunk.getChunkInfos().add(std::move(row_numbers_info)); + + /// This is a terrible hack to make progress indication kind of work. + /// + /// TODO: Fix progress bar in many ways: + /// 1. use number of rows instead of bytes; + /// don't lie about number of bytes read (getApproxBytesReadForChunk()), + /// 2. estimate total rows to read after filtering row groups; + /// for rows filtered out by PREWHERE, either report them as read or reduce the + /// estimate of number of rows to read (make it signed), + /// 3. report uncompressed deserialized IColumn bytes instead of file bytes, for + /// consistency with MergeTree reads, + /// 4. correctly extrapolate progress when reading many files in sequence, e.g. + /// file('part{1..1000}.parquet'), + /// 5. correctly merge progress info when a query reads both from MergeTree and files, or + /// parquet and text files. + /// Probably get rid of getApproxBytesReadForChunk() and use the existing + /// ISource::progress()/addTotalRowsApprox instead. + /// For (4) and (5), either add things to struct Progress or make progress bar use + /// ProfileEvents instead of Progress. + size_t virtual_bytes_read = size_t(row_group.meta->total_compressed_size) * row_subgroup.filter.rows_total / std::max(size_t(1), size_t(row_group.meta->num_rows)); + /// This updates `memory_usage` of previous stages, which may allow more tasks to be scheduled. MemoryUsageDiff diff(ReadStage::Deliver); finishRowSubgroupStage(task.row_group_idx, task.row_subgroup_idx, ReadStage::Deliver, diff); flushMemoryUsageDiff(std::move(diff)); - return {std::move(chunk), std::move(block_missing_values)}; + return {std::move(chunk), std::move(block_missing_values), virtual_bytes_read}; } } diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.h b/src/Processors/Formats/Impl/Parquet/ReadManager.h index 43b55873265e..291f81a2cb4a 100644 --- a/src/Processors/Formats/Impl/Parquet/ReadManager.h +++ b/src/Processors/Formats/Impl/Parquet/ReadManager.h @@ -44,8 +44,15 @@ class ReadManager ~ReadManager(); + struct ReadResult + { + Chunk chunk; + BlockMissingValues block_missing_values; + size_t virtual_bytes_read = 0; + }; + /// Not thread safe. - std::tuple read(); + ReadResult read(); void cancel() noexcept; @@ -63,6 +70,14 @@ class ReadManager size_t row_subgroup_idx = UINT64_MAX; size_t column_idx = UINT64_MAX; size_t cost_estimate_bytes = 0; + + struct Comparator + { + bool operator()(const Task & x, const Task & y) const + { + return std::make_tuple(x.row_group_idx, x.row_subgroup_idx) > std::make_tuple(y.row_group_idx, y.row_subgroup_idx); + } + }; }; struct Stage @@ -91,7 +106,7 @@ class ReadManager std::atomic first_incomplete_row_group {0}; std::mutex delivery_mutex; - std::queue delivery_queue; + std::priority_queue, Task::Comparator> delivery_queue; std::condition_variable delivery_cv; std::exception_ptr exception; diff --git a/src/Processors/Formats/Impl/Parquet/Reader.cpp b/src/Processors/Formats/Impl/Parquet/Reader.cpp index 924c337f0aa5..cc2b0662c7a8 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.cpp +++ b/src/Processors/Formats/Impl/Parquet/Reader.cpp @@ -305,6 +305,7 @@ void Reader::prefilterAndInitRowGroups() } /// Populate row_groups. Skip row groups based on column chunk min/max statistics. + size_t total_rows = 0; for (size_t row_group_idx = 0; row_group_idx < file_metadata.row_groups.size(); ++row_group_idx) { const auto * meta = &file_metadata.row_groups[row_group_idx]; @@ -313,6 +314,8 @@ void Reader::prefilterAndInitRowGroups() if (meta->columns.size() != total_primitive_columns_in_file) throw Exception(ErrorCodes::INCORRECT_DATA, "Row group {} has unexpected number of columns: {} != {}", row_group_idx, meta->columns.size(), total_primitive_columns_in_file); + total_rows += size_t(meta->num_rows); // before potentially skipping the row group + Hyperrectangle hyperrectangle(extended_sample_block.columns(), Range::createWholeUniverse()); if (options.format.parquet.filter_push_down && format_filter_info->key_condition) { @@ -325,6 +328,7 @@ void Reader::prefilterAndInitRowGroups() RowGroup & row_group = row_groups.emplace_back(); row_group.meta = meta; row_group.row_group_idx = row_group_idx; + row_group.start_global_row_idx = total_rows - size_t(meta->num_rows); row_group.columns.resize(primitive_columns.size()); row_group.hyperrectangle = std::move(hyperrectangle); diff --git a/src/Processors/Formats/Impl/Parquet/Reader.h b/src/Processors/Formats/Impl/Parquet/Reader.h index 43dd76857696..7b5875a7f93a 100644 --- a/src/Processors/Formats/Impl/Parquet/Reader.h +++ b/src/Processors/Formats/Impl/Parquet/Reader.h @@ -25,7 +25,6 @@ namespace DB::Parquet // TODO [parquet]: // * column_mapper -// * find a way to make this compatible at all with our implementation of iceberg positioned deletes: https://github.com/ClickHouse/ClickHouse/pull/83094 (prewhere causes nonconsecutive row idxs in chunk) // * allow_geoparquet_parser // * test on files from https://github.com/apache/parquet-testing // * check fields for false sharing, add cacheline padding as needed @@ -390,6 +389,7 @@ struct Reader const parq::RowGroup * meta; size_t row_group_idx; // in parquet file + size_t start_global_row_idx = 0; // total number of rows in preceding row groups in the file /// Parallel to Reader::primitive_columns. /// NOT parallel to `meta.columns` (it's a subset of parquet columns). diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index b47fe1dd5f3c..ff6fb572104d 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1232,7 +1232,7 @@ Chunk ParquetBlockInputFormat::read() [](size_t sum, const RowGroupBatchState & batch) { return sum + batch.total_rows; }); row_group_batches_completed++; - chunk.getChunkInfos().add(std::make_shared(total_rows_before)); + chunk.getChunkInfos().add(std::make_shared(total_rows_before)); return chunk; } @@ -1280,7 +1280,7 @@ Chunk ParquetBlockInputFormat::read() + std::accumulate(row_group.chunk_sizes.begin(), row_group.chunk_sizes.begin() + chunk.chunk_idx, 0ull); - chunk.chunk.getChunkInfos().add(std::make_shared(total_rows_before)); + chunk.chunk.getChunkInfos().add(std::make_shared(total_rows_before)); return std::move(chunk.chunk); } diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp index 5debd8249190..16ceae4819b0 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp @@ -65,7 +65,7 @@ void ParquetV3BlockInputFormat::initializeIfNeeded() /// as a signal to disable thread pool altogether, sacrificing the ability to /// use thread pool with 1 thread. We could subtract 1 instead, but then /// by default the thread pool would use `num_cores - 1` threads, also bad. - if (parser_shared_resources->max_parsing_threads <= 1 || format_settings.parquet.preserve_order) + if (parser_shared_resources->max_parsing_threads <= 1) parser_shared_resources->parsing_runner.initManual(); else parser_shared_resources->parsing_runner.initThreadPool( @@ -98,16 +98,17 @@ Chunk ParquetV3BlockInputFormat::read() auto file_metadata = Parquet::Reader::readFileMetaData(temp_prefetcher); auto chunk = getChunkForCount(size_t(file_metadata.num_rows)); - chunk.getChunkInfos().add(std::make_shared(0)); + chunk.getChunkInfos().add(std::make_shared(0)); reported_count = true; return chunk; } initializeIfNeeded(); - Chunk chunk; - std::tie(chunk, previous_block_missing_values) = reader->read(); - return chunk; + auto res = reader->read(); + previous_block_missing_values = res.block_missing_values; + previous_approx_bytes_read_for_chunk = res.virtual_bytes_read; + return std::move(res.chunk); } const BlockMissingValues * ParquetV3BlockInputFormat::getMissingValues() const diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h index e93f9c456882..122aaae174ac 100644 --- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h @@ -29,8 +29,7 @@ class ParquetV3BlockInputFormat : public IInputFormat size_t getApproxBytesReadForChunk() const override { - /// TODO [parquet]: - return 0; + return previous_approx_bytes_read_for_chunk; } private: @@ -47,6 +46,7 @@ class ParquetV3BlockInputFormat : public IInputFormat bool reported_count = false; // if need_only_count BlockMissingValues previous_block_missing_values; + size_t previous_approx_bytes_read_for_chunk = 0; void initializeIfNeeded(); }; diff --git a/src/Processors/Sources/ConstChunkGenerator.h b/src/Processors/Sources/ConstChunkGenerator.h index ddb30d2476f6..ca120fc23d23 100644 --- a/src/Processors/Sources/ConstChunkGenerator.h +++ b/src/Processors/Sources/ConstChunkGenerator.h @@ -29,7 +29,7 @@ class ConstChunkGenerator : public ISource size_t num_rows = std::min(max_block_size, remaining_rows); remaining_rows -= num_rows; auto chunk = cloneConstWithDefault(Chunk{getPort().getHeader().getColumns(), 0}, num_rows); - chunk.getChunkInfos().add(std::make_shared(generated_rows)); + chunk.getChunkInfos().add(std::make_shared(generated_rows)); generated_rows += num_rows; return chunk; } diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index c9dd5f1b242c..7fd9f2c20051 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -242,10 +242,10 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl } #endif - void modifyFormatSettings(FormatSettings & settings_) const override + void modifyFormatSettings(FormatSettings & settings_, const Context & local_context) const override { assertInitializedDL(); - current_metadata->modifyFormatSettings(settings_); + current_metadata->modifyFormatSettings(settings_, local_context); } ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr object_info) const override @@ -449,7 +449,7 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu public: explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} - + ObjectStorageType getType() const override { return getImpl().getType(); } std::string getTypeName() const override { return getImpl().getTypeName(); } @@ -504,7 +504,7 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu std::shared_ptr getSchemaTransformer(ContextPtr context, ObjectInfoPtr object_info) const override { return getImpl().getSchemaTransformer(context, object_info); } - void modifyFormatSettings(FormatSettings & settings_) const override { getImpl().modifyFormatSettings(settings_); } + void modifyFormatSettings(FormatSettings & settings_, const Context & local_context) const override { getImpl().modifyFormatSettings(settings_, local_context); } void addDeleteTransformers( ObjectInfoPtr object_info, diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index 68a42937e379..5155a9cd2255 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -115,7 +115,7 @@ NamesAndTypesList DeltaLakeMetadataDeltaKernel::getTableSchema() const return table_snapshot->getTableSchema(); } -void DeltaLakeMetadataDeltaKernel::modifyFormatSettings(FormatSettings & format_settings) const +void DeltaLakeMetadataDeltaKernel::modifyFormatSettings(FormatSettings & format_settings, const Context &) const { /// There can be missing columns because of ALTER ADD/DROP COLUMN. /// So to support reading from such tables it is enough to turn on this setting. diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index c43d7dcd13a2..4a161ada5930 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -47,7 +47,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata bool operator ==(const IDataLakeMetadata &) const override; - void modifyFormatSettings(FormatSettings & format_settings) const override; + void modifyFormatSettings(FormatSettings & format_settings, const Context &) const override; static DataLakeMetadataPtr create( ObjectStoragePtr object_storage, diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 4e8babe15fb7..b2a0384e981f 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -128,7 +128,7 @@ class IDataLakeMetadata : boost::noncopyable virtual bool supportsSchemaEvolution() const { return false; } virtual bool supportsWrites() const { return false; } - virtual void modifyFormatSettings(FormatSettings &) const {} + virtual void modifyFormatSettings(FormatSettings &, const Context &) const {} virtual std::optional totalRows(ContextPtr) const { return {}; } virtual std::optional totalBytes(ContextPtr) const { return {}; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index b6cdd7968ed5..ade62ff01d61 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1024,6 +1024,14 @@ std::tuple IcebergMetadata::getVersion() const return std::make_tuple(relevant_snapshot_id, relevant_snapshot_schema_id); } +void IcebergMetadata::modifyFormatSettings(FormatSettings & format_settings, const Context & local_context) const +{ + if (!local_context.getSettingsRef()[Setting::use_roaring_bitmap_iceberg_positional_deletes].value) + /// IcebergStreamingPositionDeleteTransform requires increasing row numbers from both the + /// data reader and the deletes reader. + format_settings.parquet.preserve_order = true; +} + void IcebergMetadata::addDeleteTransformers( ObjectInfoPtr object_info, QueryPipelineBuilder & builder, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 113b5be49f3b..e868814b23f8 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -120,6 +120,7 @@ class IcebergMetadata : public IDataLakeMetadata void checkMutationIsPossible(const MutationCommands & commands) override; + void modifyFormatSettings(FormatSettings & format_settings, const Context & local_context) const override; void addDeleteTransformers(ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional & format_settings, ContextPtr local_context) const override; void checkAlterIsPossible(const AlterCommands & commands) override; void alter(const AlterCommands & params, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index 3a562855019a..cfdfaf2eb511 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -114,18 +115,10 @@ std::optional writeDataFiles( Chunk chunk(block.getColumns(), block.rows()); auto partition_result = chunk_partitioner.partitionChunk(chunk); - auto col_data_filename = block.getByName(block_datafile_path); - auto col_position = block.getByName(block_row_number); - - size_t col_data_filename_index = 0; - size_t col_position_index = 0; - for (size_t i = 0; i < block.columns(); ++i) - { - if (block.getNames()[i] == block_datafile_path) - col_data_filename_index = i; - if (block.getNames()[i] == block_row_number) - col_position_index = i; - } + size_t col_data_filename_index = block.getPositionByName(block_datafile_path); + size_t col_position_index = block.getPositionByName(block_row_number); + ColumnWithTypeAndName col_data_filename = block.getByPosition(col_data_filename_index); + ColumnWithTypeAndName col_position = block.getByPosition(col_position_index); for (const auto & [partition_key, partition_chunk] : partition_result) { @@ -161,6 +154,14 @@ std::optional writeDataFiles( col_data_filename.column = partition_chunk.getColumns()[col_data_filename_index]; col_position.column = partition_chunk.getColumns()[col_position_index]; + if (const ColumnNullable * nullable = typeid_cast(col_position.column.get())) + { + const auto & null_map = nullable->getNullMapData(); + if (std::any_of(null_map.begin(), null_map.end(), [](UInt8 x) { return x != 0; })) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected null _row_number"); + col_position.column = nullable->getNestedColumnPtr(); + } + auto col_data_filename_without_namespaces = ColumnString::create(); for (size_t i = 0; i < col_data_filename.column->size(); ++i) { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp index 1599d93433bb..2f670db195f3 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.cpp @@ -138,25 +138,46 @@ void IcebergBitmapPositionDeleteTransform::transform(Chunk & chunk) IColumn::Filter delete_vector(num_rows, true); size_t num_rows_after_filtration = num_rows; - auto chunk_info = chunk.getChunkInfos().get(); + auto chunk_info = chunk.getChunkInfos().get(); if (!chunk_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "ChunkInfoRowNumOffset does not exist"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "ChunkInfoRowNumbers does not exist"); size_t row_num_offset = chunk_info->row_num_offset; - for (size_t i = 0; i < num_rows; i++) + auto & applied_filter = chunk_info->applied_filter; + size_t num_indices = applied_filter.has_value() ? applied_filter->size() : num_rows; + size_t idx_in_chunk = 0; + for (size_t i = 0; i < num_indices; i++) { - size_t row_idx = row_num_offset + i; - if (bitmap.rb_contains(row_idx)) + if (!applied_filter.has_value() || applied_filter.value()[i]) { - delete_vector[i] = false; - num_rows_after_filtration--; + size_t row_idx = row_num_offset + i; + if (bitmap.rb_contains(row_idx)) + { + delete_vector[idx_in_chunk] = false; + + /// If we already have a _row_number-indexed filter vector, update it in place. + if (applied_filter.has_value()) + applied_filter.value()[i] = false; + + num_rows_after_filtration--; + } + idx_in_chunk += 1; } } + chassert(idx_in_chunk == num_rows); + + if (num_rows_after_filtration == num_rows) + return; auto columns = chunk.detachColumns(); for (auto & column : columns) column = column->filter(delete_vector, -1); + /// If it's the first filtering we do on this Chunk (i.e. its _row_number-s were consecutive), + /// assign its applied_filter. + if (!applied_filter.has_value()) + applied_filter.emplace(std::move(delete_vector)); + chunk.setColumns(std::move(columns), num_rows_after_filtration); } @@ -223,50 +244,75 @@ void IcebergStreamingPositionDeleteTransform::transform(Chunk & chunk) size_t num_rows = chunk.getNumRows(); IColumn::Filter filter(num_rows, true); size_t num_rows_after_filtration = chunk.getNumRows(); - auto chunk_info = chunk.getChunkInfos().get(); + auto chunk_info = chunk.getChunkInfos().get(); if (!chunk_info) - throw Exception(ErrorCodes::LOGICAL_ERROR, "ChunkInfoRowNumOffset does not exist"); - - size_t total_previous_chunks_size = chunk_info->row_num_offset; - if (previous_chunk_offset && previous_chunk_offset.value() > total_previous_chunks_size) + throw Exception(ErrorCodes::LOGICAL_ERROR, "ChunkInfoRowNumbers does not exist"); + + size_t num_indices = chunk_info->applied_filter.has_value() ? chunk_info->applied_filter->size() : chunk.getNumRows(); + + /// We get chunks in order of increasing row number because: + /// * this transform should be immediately after the IInputFormat + /// (typically ParquetV3BlockInputFormat) in the pipeline, + /// * IInputFormat outputs chunks in order of row number even if it uses multiple threads + /// internally; for parquet IcebergMetadata::modifyFormatSettings sets + /// `format_settings.parquet.preserve_order = true` to ensure this, other formats return + /// chunks in order by default. + if (previous_chunk_end_offset && previous_chunk_end_offset.value() > chunk_info->row_num_offset) throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunks offsets should increase."); - previous_chunk_offset = total_previous_chunks_size; - for (size_t i = 0; i < chunk.getNumRows(); ++i) + previous_chunk_end_offset = chunk_info->row_num_offset + num_indices; + + size_t idx_in_chunk = 0; + for (size_t i = 0; i < num_indices; i++) { - while (!latest_positions.empty()) + if (!chunk_info->applied_filter.has_value() || chunk_info->applied_filter.value()[i]) { - auto it = latest_positions.begin(); - if (it->first < i + total_previous_chunks_size) + size_t row_idx = chunk_info->row_num_offset + i; + + while (!latest_positions.empty()) { - size_t delete_source_index = it->second; - latest_positions.erase(it); - if (iterator_at_latest_chunks[delete_source_index] + 1 >= latest_chunks[delete_source_index].getNumRows() && latest_chunks[delete_source_index].getNumRows() > 0) + auto it = latest_positions.begin(); + if (it->first < row_idx) { - fetchNewChunkFromSource(delete_source_index); + size_t delete_source_index = it->second; + latest_positions.erase(it); + if (iterator_at_latest_chunks[delete_source_index] + 1 >= latest_chunks[delete_source_index].getNumRows() && latest_chunks[delete_source_index].getNumRows() > 0) + { + fetchNewChunkFromSource(delete_source_index); + } + else + { + ++iterator_at_latest_chunks[delete_source_index]; + auto position_index = delete_source_column_indices[delete_source_index].position_index; + size_t next_index_value_in_positional_delete_file = latest_chunks[delete_source_index].getColumns()[position_index]->get64(iterator_at_latest_chunks[delete_source_index]); + latest_positions.insert(std::pair{next_index_value_in_positional_delete_file, delete_source_index}); + } } - else + else if (it->first == row_idx) { - ++iterator_at_latest_chunks[delete_source_index]; - auto position_index = delete_source_column_indices[delete_source_index].position_index; - size_t next_index_value_in_positional_delete_file = latest_chunks[delete_source_index].getColumns()[position_index]->get64(iterator_at_latest_chunks[delete_source_index]); - latest_positions.insert(std::pair{next_index_value_in_positional_delete_file, delete_source_index}); + filter[idx_in_chunk] = false; + + if (chunk_info->applied_filter.has_value()) + chunk_info->applied_filter.value()[i] = false; + + --num_rows_after_filtration; + break; } + else + break; } - else if (it->first == i + total_previous_chunks_size) - { - filter[i] = false; - --num_rows_after_filtration; - break; - } - else - break; + + idx_in_chunk += 1; } } + chassert(idx_in_chunk == chunk.getNumRows()); auto columns = chunk.detachColumns(); for (auto & column : columns) column = column->filter(filter, -1); + if (!chunk_info->applied_filter.has_value()) + chunk_info->applied_filter.emplace(std::move(filter)); + chunk.setColumns(std::move(columns), num_rows_after_filtration); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h index 1c2122645b06..96c5719e04fd 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PositionDeleteTransform.h @@ -82,6 +82,7 @@ class IcebergBitmapPositionDeleteTransform : public IcebergPositionDeleteTransfo }; +/// Requires both the deletes and the input Chunk-s to arrive in order of increasing row number. class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTransform { public: @@ -116,7 +117,7 @@ class IcebergStreamingPositionDeleteTransform : public IcebergPositionDeleteTran std::vector iterator_at_latest_chunks; std::set> latest_positions; - std::optional previous_chunk_offset; + std::optional previous_chunk_end_offset; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 711ef86664f0..29813130aa81 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -370,7 +370,7 @@ void StorageObjectStorage::read( if (!modified_format_settings.has_value()) modified_format_settings.emplace(getFormatSettings(local_context)); - configuration->modifyFormatSettings(modified_format_settings.value()); + configuration->modifyFormatSettings(modified_format_settings.value(), *local_context); auto read_step = std::make_unique( object_storage, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index 002da8c12b47..f45cd9725dea 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -145,7 +145,7 @@ class StorageObjectStorageConfiguration virtual std::shared_ptr getSchemaTransformer(ContextPtr, ObjectInfoPtr) const { return {}; } - virtual void modifyFormatSettings(FormatSettings &) const {} + virtual void modifyFormatSettings(FormatSettings &, const Context &) const {} virtual void addDeleteTransformers( ObjectInfoPtr object_info, diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp index d2c201b4219f..d17abaef3e2b 100644 --- a/src/Storages/VirtualColumnUtils.cpp +++ b/src/Storages/VirtualColumnUtils.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -297,18 +298,23 @@ void addRequestedFileLikeStorageVirtualsToChunk( else if (virtual_column.name == "_row_number") { #if USE_PARQUET - auto chunk_info = chunk.getChunkInfos().get(); + auto chunk_info = chunk.getChunkInfos().get(); if (chunk_info) { size_t row_num_offset = chunk_info->row_num_offset; + const auto & applied_filter = chunk_info->applied_filter; + size_t num_indices = applied_filter.has_value() ? applied_filter->size() : chunk.getNumRows(); auto column = ColumnInt64::create(); - for (size_t i = 0; i < chunk.getNumRows(); ++i) - column->insertValue(i + row_num_offset); - chunk.addColumn(std::move(column)); + for (size_t i = 0; i < num_indices; ++i) + if (!applied_filter.has_value() || applied_filter.value()[i]) + column->insertValue(i + row_num_offset); + auto null_map = ColumnUInt8::create(chunk.getNumRows(), 0); + chunk.addColumn(ColumnNullable::create(std::move(column), std::move(null_map))); return; } #endif - chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), -1)->convertToFullColumnIfConst()); + /// Row numbers not known, _row_number = NULL. + chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst()); } } } diff --git a/tests/integration/helpers/iceberg_utils.py b/tests/integration/helpers/iceberg_utils.py index 4cb0fd142dd3..070ca5921ab0 100644 --- a/tests/integration/helpers/iceberg_utils.py +++ b/tests/integration/helpers/iceberg_utils.py @@ -196,11 +196,13 @@ def get_creation_expression( explicit_metadata_path="", storage_type_as_arg=False, storage_type_in_named_collection=False, + additional_settings = [], **kwargs, ): - settings_array = [] + settings_array = list(additional_settings) settings_array.append(f"allow_dynamic_metadata_for_data_lakes = {1 if allow_dynamic_metadata_for_data_lakes else 0}") + if explicit_metadata_path: settings_array.append(f"iceberg_metadata_file_path = '{explicit_metadata_path}'") @@ -240,7 +242,7 @@ def get_creation_expression( if_not_exists_prefix = "" if if_not_exists: - if_not_exists_prefix = "IF NOT EXISTS" + if_not_exists_prefix = "IF NOT EXISTS" if storage_type == "s3": if "bucket" in kwargs: @@ -336,7 +338,7 @@ def convert_schema_and_data_to_pandas_df(schema_raw, data_raw): pandas_types = [clickhouse_to_pandas_types[t]for t in types] schema_df = pd.DataFrame([types], columns=column_names) - + # Convert data to DataFrame data_rows = list( map( @@ -344,13 +346,13 @@ def convert_schema_and_data_to_pandas_df(schema_raw, data_raw): filter(lambda x: len(x) > 0, data_raw.strip().split("\n")), ) ) - + if data_rows: data_df = pd.DataFrame(data_rows, columns=column_names, dtype='object') else: # Create empty DataFrame with correct columns data_df = pd.DataFrame(columns=column_names, dtype='object') - + data_df = data_df.astype(dict(zip(column_names, pandas_types))) return schema_df, data_df @@ -483,7 +485,7 @@ def default_download_directory( else: raise Exception(f"Unknown iceberg storage type for downloading: {storage_type}") - + def execute_spark_query_general( spark, started_cluster, storage_type: str, table_name: str, query: str ): diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 791234dece09..2775438dbef4 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -721,6 +721,12 @@ def test_delete_files(started_cluster, format_version, storage_type): assert instance.query(f"SELECT ProfileEvents['IcebergTrivialCountOptimizationApplied'] FROM system.query_log where query_id = '{query_id}' and type = 'QueryFinish'") == "1\n" +def get_array(query_result: str): + arr = sorted([int(x) for x in query_result.strip().split("\n")]) + print(arr) + return arr + + @pytest.mark.parametrize("use_roaring_bitmaps", [0, 1]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def test_position_deletes(started_cluster, use_roaring_bitmaps, storage_type): @@ -737,11 +743,6 @@ def test_position_deletes(started_cluster, use_roaring_bitmaps, storage_type): ) spark.sql(f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10, 100)") - def get_array(query_result: str): - arr = sorted([int(x) for x in query_result.strip().split("\n")]) - print(arr) - return arr - default_upload_directory( started_cluster, storage_type, @@ -822,6 +823,48 @@ def get_array(query_result: str): instance.query(f"DROP TABLE {TABLE_NAME}") +@pytest.mark.parametrize("use_roaring_bitmaps", [0, 1]) +def test_position_deletes_out_of_order(started_cluster, use_roaring_bitmaps): + storage_type = "local" + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + TABLE_NAME = "test_position_deletes_out_of_order_" + get_uuid_str() + instance.query(f"SET use_roaring_bitmap_iceberg_positional_deletes={use_roaring_bitmaps};") + instance.query(f"SET input_format_parquet_use_native_reader_v3=1;") + + # There are a few flaky hacks chained together here. + # We want the parquet reader to produce chunks corresponding to row groups out of order if + # `format_settings.parquet.preserve_order` wasn't enabled. For that we: + # * Use `PREWHERE NOT sleepEachRow(...)` to make the reader take longer for bigger row groups. + # * Set spark row group size limit to 1 byte. Rely on current spark implementation detail: + # it'll check this limit every 100 rows. So effectively we've set row group size to 100 rows. + # * Insert 105 rows. So the first row group will have 100 rows, the second 5 rows. + # If one of these steps breaks in future, this test will be less effective but won't fail. + + spark.sql( + f""" + CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read', 'write.parquet.row-group-size-bytes'='1') + """ + ) + spark.sql(f"INSERT INTO {TABLE_NAME} select /*+ COALESCE(1) */ id, char(id + ascii('a')) from range(0, 105)") + # (Fun fact: if you replace these two queries with one query with `WHERE id < 10 OR id = 103`, + # spark either quetly fails to delete row 103 or outright crashes with segfault in jre.) + spark.sql(f"DELETE FROM {TABLE_NAME} WHERE id < 10") + spark.sql(f"DELETE FROM {TABLE_NAME} WHERE id = 103") + + default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, additional_settings=["input_format_parquet_use_native_reader_v3=1", f"use_roaring_bitmap_iceberg_positional_deletes={use_roaring_bitmaps}"]) + + assert get_array(instance.query(f"SELECT id FROM {TABLE_NAME} PREWHERE NOT sleepEachRow(1/100) order by id")) == list(range(10, 103)) + [104] + + instance.query(f"DROP TABLE {TABLE_NAME}") + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def test_schema_inference(started_cluster, format_version, storage_type): diff --git a/tests/queries/0_stateless/03596_parquet_prewhere_page_skip_bug.sql b/tests/queries/0_stateless/03596_parquet_prewhere_page_skip_bug.sql index 92fa42185c67..79cac73b12b7 100644 --- a/tests/queries/0_stateless/03596_parquet_prewhere_page_skip_bug.sql +++ b/tests/queries/0_stateless/03596_parquet_prewhere_page_skip_bug.sql @@ -4,6 +4,6 @@ set output_format_parquet_use_custom_encoder = 1; set input_format_parquet_use_native_reader_v3 = 1; set engine_file_truncate_on_insert = 1; -insert into function file('03596_parquet_prewhere_page_skip_bug.parquet') select number as n, number*10 as n10 from numbers(200) settings output_format_parquet_data_page_size=100, output_format_parquet_batch_size=10, output_format_parquet_row_group_size=100, engine_file_truncate_on_insert=1, output_format_parquet_write_page_index=0; +insert into function file('03596_parquet_prewhere_page_skip_bug.parquet') select number as n, number*10 as n10 from numbers(200) settings output_format_parquet_data_page_size=100, output_format_parquet_batch_size=10, output_format_parquet_row_group_size=100, output_format_parquet_write_page_index=0; select n10 from file('03596_parquet_prewhere_page_skip_bug.parquet') prewhere n in (131, 174, 175, 176) order by all settings input_format_parquet_page_filter_push_down=0, input_format_parquet_filter_push_down=0, input_format_parquet_bloom_filter_push_down=0, input_format_parquet_max_block_size=10; diff --git a/tests/queries/0_stateless/03624_parquet_row_number.reference b/tests/queries/0_stateless/03624_parquet_row_number.reference new file mode 100644 index 000000000000..41750ad34aa3 --- /dev/null +++ b/tests/queries/0_stateless/03624_parquet_row_number.reference @@ -0,0 +1,9 @@ +7 70 +8 80 +10 100 +11 110 +13 130 +14 140 +16 160 +17 170 +19 190 diff --git a/tests/queries/0_stateless/03624_parquet_row_number.sql b/tests/queries/0_stateless/03624_parquet_row_number.sql new file mode 100644 index 000000000000..b2dcab18716c --- /dev/null +++ b/tests/queries/0_stateless/03624_parquet_row_number.sql @@ -0,0 +1,7 @@ +-- Tags: no-fasttest, no-parallel + +set engine_file_truncate_on_insert = 1; + +insert into function file('03624_parquet_row_number.parquet') select number*10 as x from numbers(20) settings max_threads=1, output_format_parquet_row_group_size=5; + +select _row_number, x from file('03624_parquet_row_number.parquet') where x % 3 != 0 and x > 60 order by _row_number;