2 changes: 1 addition & 1 deletion src/Common/ProfileEvents.cpp
@@ -1122,7 +1122,7 @@ The server successfully detected this situation and will download merged part fr
M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \
M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \
\
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
M(ParquetFetchWaitTimeMicroseconds, "Time spent by decoding threads (not prefetching threads) waiting for parquet file reads", ValueType::Microseconds) \
M(ParquetReadRowGroups, "The total number of row groups read from parquet data", ValueType::Number) \
M(ParquetPrunedRowGroups, "The total number of row groups pruned from parquet data", ValueType::Number) \
M(ParquetDecodingTasks, "Tasks issued by parquet reader", ValueType::Number) \
12 changes: 11 additions & 1 deletion src/Common/futex.h
@@ -19,6 +19,16 @@ inline Int64 futexWait(void * address, UInt32 value)
return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0);
}

inline Int64 futexTimedWait(void * address, UInt32 value, UInt64 nanos)
{
const UInt64 nanos_per_sec = 1'000'000'000;
UInt64 sec = nanos / nanos_per_sec;
struct timespec timeout;
timeout.tv_sec = time_t(std::min(sec, UInt64(std::numeric_limits<time_t>::max())));
timeout.tv_nsec = int64_t(nanos % nanos_per_sec);
return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, &timeout, nullptr, 0);
}

inline Int64 futexWake(void * address, int count)
{
return syscall(SYS_futex, address, FUTEX_WAKE_PRIVATE, count, nullptr, nullptr, 0);
@@ -37,7 +47,7 @@ inline void futexWakeOne(std::atomic<UInt32> & address)

inline void futexWakeAll(std::atomic<UInt32> & address)
{
futexWake(&address, INT_MAX);
}

constexpr UInt32 lowerHalf(UInt64 value)
135 changes: 83 additions & 52 deletions src/Common/threadPoolCallbackRunner.cpp
@@ -20,14 +20,6 @@ void ThreadPoolCallbackRunnerFast::initThreadPool(ThreadPool & pool_, size_t max
max_threads = max_threads_;
thread_name = thread_name_;
thread_group = thread_group_;

/// We could dynamically add and remove threads based on load, but it's not clear whether it's
/// worth the added complexity.
for (size_t i = 0; i < max_threads; ++i)
{
pool->scheduleOrThrowOnError([this] { threadFunction(); });
++threads; // only if scheduleOrThrowOnError didn't throw
}
}

ThreadPoolCallbackRunnerFast::ThreadPoolCallbackRunnerFast(Mode mode_) : mode(mode_)
@@ -58,19 +50,30 @@ void ThreadPoolCallbackRunnerFast::shutdown()
chassert(active_tasks.load() == queue.size());
}

void ThreadPoolCallbackRunnerFast::startMoreThreadsIfNeeded(size_t active_tasks_, std::unique_lock<std::mutex> &)
{
while (threads < max_threads && threads < active_tasks_ && !shutdown_requested)
{
pool->scheduleOrThrow([this] { threadFunction(); });
++threads; // only if scheduleOrThrow didn't throw
}
}

void ThreadPoolCallbackRunnerFast::operator()(std::function<void()> f)
{
if (mode == Mode::Disabled)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");

size_t active_tasks_ = 1 + active_tasks.fetch_add(1, std::memory_order_relaxed);

{
std::unique_lock lock(mutex);
queue.push_back(std::move(f));
startMoreThreadsIfNeeded(active_tasks_, lock);
}

if (mode == Mode::ThreadPool)
{
active_tasks.fetch_add(1, std::memory_order_relaxed);
#ifdef OS_LINUX
UInt32 prev_size = queue_size.fetch_add(1, std::memory_order_release);
if (prev_size < max_threads)
@@ -89,14 +92,16 @@ void ThreadPoolCallbackRunnerFast::bulkSchedule(std::vector<std::function<void()
if (mode == Mode::Disabled)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");

size_t active_tasks_ = fs.size() + active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);

{
std::unique_lock lock(mutex);
queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
startMoreThreadsIfNeeded(active_tasks_, lock);
}

if (mode == Mode::ThreadPool)
{
active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);
#ifdef OS_LINUX
UInt32 prev_size = queue_size.fetch_add(fs.size(), std::memory_order_release);
if (prev_size < max_threads)
@@ -127,68 +132,94 @@ bool ThreadPoolCallbackRunnerFast::runTaskInline()

void ThreadPoolCallbackRunnerFast::threadFunction()
{
std::optional<ThreadGroupSwitcher> switcher;
switcher.emplace(thread_group, thread_name.c_str());

while (true)
{
ThreadGroupSwitcher switcher(thread_group, thread_name.c_str());
bool timed_out = false;

#ifdef OS_LINUX
UInt32 x = queue_size.load(std::memory_order_relaxed);
while (true)
{
#ifdef OS_LINUX
UInt32 x = queue_size.load(std::memory_order_relaxed);
while (true)
if (x == 0)
{
if (x == 0)
Int64 waited = futexTimedWait(&queue_size, 0, THREAD_IDLE_TIMEOUT_NS);
x = queue_size.load(std::memory_order_relaxed);

if (waited < 0 && errno == ETIMEDOUT && x == 0)
{
futexWait(&queue_size, 0);
x = queue_size.load(std::memory_order_relaxed);
}
else if (queue_size.compare_exchange_weak(
x, x - 1, std::memory_order_acquire, std::memory_order_relaxed))
timed_out = true;
break;
}
}
#endif
else if (queue_size.compare_exchange_weak(
x, x - 1, std::memory_order_acquire, std::memory_order_relaxed))
break;
}
#endif

std::function<void()> f;
{
std::unique_lock lock(mutex);
std::function<void()> f;
{
std::unique_lock lock(mutex);

#ifndef OS_LINUX
queue_cv.wait(lock, [&] { return shutdown_requested || !queue.empty(); });
#endif
#ifdef OS_LINUX
/// Important to never stop the last thread if queue is not empty (checked under the
/// same `lock` as decrementing `threads`). Otherwise we'll deadlock like this:
/// 0. `threads` == 1, queue is empty.
/// 1. The worker thread times out; it didn't lock mutex or decrement `threads` yet.
/// 2. A manager thread enqueues a task. It sees active_tasks == 1 and `threads` == 1,
/// so it doesn't start another thread.
/// 3. The worker thread exits.
/// 4. There are no threads, but the queue is not empty, oops.
if (timed_out && !queue.empty() && !shutdown_requested)
/// We can't just proceed to `queue.pop_front()` here because we haven't
/// decremented queue_size.
continue;
#else
timed_out = !queue_cv.wait_for(
lock, std::chrono::nanoseconds(THREAD_IDLE_TIMEOUT_NS),
[&] { return shutdown_requested || !queue.empty(); });
#endif

if (shutdown_requested)
break;
if (shutdown_requested || timed_out)
{
/// Important that we destroy the `ThreadGroupSwitcher` before decrementing `threads`.
/// Otherwise ~ThreadGroupSwitcher may access global Context after the query is
/// finished, which may race with mutating Context (specifically, Settings) at the
/// start of next query.
switcher.reset();

chassert(!queue.empty());
threads -= 1;
if (threads == 0)
shutdown_cv.notify_all();

f = std::move(queue.front());
queue.pop_front();
return;
}

try
{
f();
chassert(!queue.empty());

CurrentThread::updatePerformanceCountersIfNeeded();
}
catch (...)
{
tryLogCurrentException("FastThreadPool");
chassert(false);
}
f = std::move(queue.front());
queue.pop_front();
}

try
{
f();

active_tasks.fetch_sub(1, std::memory_order_relaxed);
CurrentThread::updatePerformanceCountersIfNeeded();
}
catch (...)
{
tryLogCurrentException("FastThreadPool");
chassert(false);
}
}

/// Important that we destroy the `ThreadGroupSwitcher` before decrementing `threads`.
/// Otherwise ~ThreadGroupSwitcher may access global Context after the query is finished, which
/// may race with mutating Context (specifically, Settings) at the start of next query.
{
std::unique_lock lock(mutex);
threads -= 1;
if (threads == 0)
shutdown_cv.notify_all();
active_tasks.fetch_sub(1, std::memory_order_relaxed);
}

chassert(false);
}

bool ShutdownHelper::try_lock_shared()
24 changes: 22 additions & 2 deletions src/Common/threadPoolCallbackRunner.h
@@ -249,8 +249,6 @@ class ThreadPoolCallbackRunnerFast
Disabled,
};

/// TODO [parquet]: Add metrics for queue size and active threads, and maybe event for tasks executed.

ThreadPoolCallbackRunnerFast();

void initManual()
@@ -282,6 +280,9 @@ class ThreadPoolCallbackRunnerFast
bool isIdle() const { return active_tasks.load(std::memory_order_relaxed) == 0; }

private:
/// Stop thread if it had nothing to do for this long.
static constexpr UInt64 THREAD_IDLE_TIMEOUT_NS = 3'000'000; // 3 ms

Mode mode = Mode::Disabled;
ThreadPool * pool = nullptr;
size_t max_threads = 0;
@@ -309,6 +310,25 @@ class ThreadPoolCallbackRunnerFast
std::condition_variable queue_cv;
#endif

/// We dynamically start more threads when the queue grows and stop idle threads after a timeout.
///
/// Interestingly, this is required for correctness, not just performance.
/// If we kept max_threads threads at all times, we may deadlock because the "threads" that we
/// schedule on ThreadPool are not necessarily running, they may be sitting in ThreadPool's
/// queue, blocking other "threads" from running. E.g. this may happen:
/// 1. Iceberg reader creates many parquet readers, and their ThreadPoolCallbackRunnerFast(s)
/// occupy all slots in the shared ThreadPool (getFormatParsingThreadPool()).
/// 2. Iceberg reader creates some more parquet readers for positional deletes, using separate
/// ThreadPoolCallbackRunnerFast-s (because the ones from above are mildly inconvenient to
/// propagate to that code site). Those ThreadPoolCallbackRunnerFast-s make
/// pool->scheduleOrThrowOnError calls, but ThreadPool just adds them to queue, no actual
/// ThreadPoolCallbackRunnerFast::threadFunction()-s are started.
/// 3. The readers from step 2 are stuck because their ThreadPoolCallbackRunnerFast-s have no
/// threads. The readers from step 1 are idle but not destroyed (keep occupying threads)
/// because the iceberg reader is waiting for positional deletes to be read (by readers
/// from step 2). We're stuck.
void startMoreThreadsIfNeeded(size_t active_tasks_, std::unique_lock<std::mutex> &);

void threadFunction();
};

2 changes: 1 addition & 1 deletion src/Core/FormatFactorySettings.h
@@ -168,7 +168,7 @@ Ignore case when matching ORC columns with CH columns.
Ignore case when matching Parquet columns with CH columns.
)", 0) \
DECLARE(Bool, input_format_parquet_preserve_order, false, R"(
Avoid reordering rows when reading from Parquet files. Usually makes it much slower. Not recommended as row ordering is generally not guaranteed, and other parts of query pipeline may break it.
Avoid reordering rows when reading from Parquet files. Not recommended as row ordering is generally not guaranteed, and other parts of the query pipeline may break it. Use `ORDER BY _row_number` instead.
)", 0) \
DECLARE(Bool, input_format_parquet_filter_push_down, true, R"(
When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata.
11 changes: 11 additions & 0 deletions src/Processors/Formats/IInputFormat.cpp
@@ -6,6 +6,17 @@
namespace DB
{

ChunkInfoRowNumbers::ChunkInfoRowNumbers(size_t row_num_offset_, std::optional<IColumnFilter> applied_filter_)
: row_num_offset(row_num_offset_), applied_filter(std::move(applied_filter_)) { }

ChunkInfoRowNumbers::Ptr ChunkInfoRowNumbers::clone() const
{
auto res = std::make_shared<ChunkInfoRowNumbers>(row_num_offset);
if (applied_filter.has_value())
res->applied_filter.emplace(applied_filter->begin(), applied_filter->end());
return res;
}

IInputFormat::IInputFormat(SharedHeader header, ReadBuffer * in_) : ISource(std::move(header)), in(in_)
{
column_mapping = std::make_shared<ColumnMapping>();
31 changes: 27 additions & 4 deletions src/Processors/Formats/IInputFormat.h
@@ -3,6 +3,7 @@
#include <Formats/ColumnMapping.h>
#include <IO/ReadBuffer.h>
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Common/PODArray.h>
#include <Core/BlockMissingValues.h>
#include <Processors/ISource.h>
#include <Core/Settings.h>
@@ -14,13 +15,35 @@ namespace DB
struct SelectQueryInfo;

using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;

struct ChunkInfoRowNumOffset : public ChunkInfoCloneable<ChunkInfoRowNumOffset>
using IColumnFilter = PaddedPODArray<UInt8>;

/// Most (all?) file formats have a natural order of rows within the file.
/// But our format readers and query pipeline may reorder or filter rows. This struct is used to
/// propagate the original row numbers, e.g. for _row_number virtual column or for iceberg
/// positional deletes.
///
/// Warning: we currently don't correctly update this info in most transforms. E.g. things like
/// FilterTransform and SortingTransform logically should remove this ChunkInfo, but don't; we don't
/// have a mechanism to systematically find all code sites that would need to do that or to detect
/// if one was missed.
/// So this is only used in a few specific situations, and the builder of query pipeline must be
/// careful to never put a step that uses this info after a step that breaks it.
///
/// If row numbers in a chunk are consecutive, this contains just the first row number.
/// If row numbers are not consecutive as a result of filtering, this additionally contains the mask
/// that was used for filtering, from which row numbers can be recovered.
struct ChunkInfoRowNumbers : public ChunkInfo
{
ChunkInfoRowNumOffset(const ChunkInfoRowNumOffset & other) = default;
explicit ChunkInfoRowNumOffset(size_t row_num_offset_) : row_num_offset(row_num_offset_) { }
explicit ChunkInfoRowNumbers(size_t row_num_offset_, std::optional<IColumnFilter> applied_filter_ = std::nullopt);

Ptr clone() const override;

const size_t row_num_offset;
/// If nullopt, row numbers are consecutive.
/// If not empty, the number of '1' elements is equal to the number of rows in the chunk;
/// row i in the chunk has row number:
/// row_num_offset + {index of the i-th '1' element in applied_filter}.
std::optional<IColumnFilter> applied_filter;
};

/** Input format is a source, that reads data from ReadBuffer.