Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions velox/dwio/common/DirectBufferedInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
#include "velox/dwio/common/DirectBufferedInput.h"
#include "velox/common/memory/Allocation.h"
#include "velox/common/process/TraceContext.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/DirectInputStream.h"

DECLARE_int32(cache_prefetch_min_pct);

using ::facebook::velox::common::Region;
using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::dwio::common {

Expand Down Expand Up @@ -184,7 +186,7 @@ void DirectBufferedInput::readRegion(
return;
}
auto load = std::make_shared<DirectCoalescedLoad>(
input_, ioStats_, groupId_, requests, *pool_, options_.loadQuantum());
input_, ioStats_, groupId_, requests, pool_, options_.loadQuantum());
coalescedLoads_.push_back(load);
streamToCoalescedLoad_.withWLock([&](auto& loads) {
for (auto& request : requests) {
Expand All @@ -210,9 +212,12 @@ void DirectBufferedInput::readRegions(
for (auto i = 0; i < coalescedLoads_.size(); ++i) {
auto& load = coalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
executor_->add([pendingLoad = load]() {
AsyncLoadHolder loadHolder{
.load = load, .pool = pool_->shared_from_this()};
executor_->add([asyncLoad = std::move(loadHolder)]() {
process::TraceContext trace("Read Ahead");
pendingLoad->loadOrFuture(nullptr);
VELOX_CHECK_NOT_NULL(asyncLoad.load);
asyncLoad.load->loadOrFuture(nullptr);
});
}
}
Expand Down Expand Up @@ -293,7 +298,7 @@ std::vector<cache::CachePin> DirectCoalescedLoad::loadData(bool prefetch) {
}
const auto numPages =
memory::AllocationTraits::numPages(request.loadSize);
pool_.allocateNonContiguous(numPages, request.data);
pool_->allocateNonContiguous(numPages, request.data);
appendRanges(request.data, request.loadSize, buffers);
} else {
request.loadSize = region.length;
Expand All @@ -318,6 +323,8 @@ std::vector<cache::CachePin> DirectCoalescedLoad::loadData(bool prefetch) {
if (prefetch) {
ioStats_->prefetch().increment(size + overread);
}
TestValue::adjust(
"facebook::velox::cache::DirectCoalescedLoad::loadData", this);
return {};
}

Expand Down
23 changes: 21 additions & 2 deletions velox/dwio/common/DirectBufferedInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ class DirectCoalescedLoad : public cache::CoalescedLoad {
std::shared_ptr<IoStatistics> ioStats,
uint64_t /* groupId */,
const std::vector<LoadRequest*>& requests,
memory::MemoryPool& pool,
memory::MemoryPool* pool,
int32_t loadQuantum)
: CoalescedLoad({}, {}),
ioStats_(ioStats),
input_(std::move(input)),
loadQuantum_(loadQuantum),
pool_(pool) {
VELOX_DCHECK_NOT_NULL(pool_);
VELOX_DCHECK(
std::is_sorted(requests.begin(), requests.end(), [](auto* x, auto* y) {
return x->region.offset < y->region.offset;
Expand Down Expand Up @@ -96,7 +97,7 @@ class DirectCoalescedLoad : public cache::CoalescedLoad {
const std::shared_ptr<IoStatistics> ioStats_;
const std::shared_ptr<ReadFileInputStream> input_;
const int32_t loadQuantum_;
memory::MemoryPool& pool_;
memory::MemoryPool* const pool_;
std::vector<LoadRequest> requests_;
};

Expand Down Expand Up @@ -233,6 +234,24 @@ class DirectBufferedInput : public BufferedInput {
bool prefetch,
const std::vector<int32_t>& groupEnds);

// Holds the reference on the memory pool for async load in case of early task
// terminate.
struct AsyncLoadHolder {
std::shared_ptr<cache::CoalescedLoad> load;
std::shared_ptr<memory::MemoryPool> pool;

~AsyncLoadHolder() {
// Release the load reference before the memory pool reference.
// This is to make sure the memory pool is not destroyed before we free up
// the allocated buffers.
// This is to handle the case that the associated task has already
// destroyed before the async load is done. The async load holds
// the last reference to the memory pool in that case.
load.reset();
pool.reset();
}
};

const uint64_t fileNum_;
const std::shared_ptr<cache::ScanTracker> tracker_;
const uint64_t groupId_;
Expand Down
61 changes: 61 additions & 0 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "velox/common/base/Fs.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/tests/CacheTestUtil.h"
#include "velox/common/file/tests/FaultyFile.h"
#include "velox/common/file/tests/FaultyFileSystem.h"
Expand Down Expand Up @@ -343,6 +344,66 @@ TEST_F(TableScanTest, directBufferInputRawInputBytes) {
ASSERT_GT(getTableScanRuntimeStats(task)["ioWaitWallNanos"].sum, 0);
}

// Runs a table scan without a cache (so reads go through the direct buffered
// input) with prefetch forced on, delays every coalesced load by two seconds
// through a test value hook, and fails the query from the HiveDataSource::next
// hook. The query is expected to fail with the injected error message.
DEBUG_ONLY_TEST_F(TableScanTest, pendingCoalescedIoWhenTaskFailed) {
  gflags::FlagSaver flagGuard;
  // Always trigger prefetch.
  FLAGS_cache_prefetch_min_pct = 0;

  facebook::velox::VectorFuzzer::Options fuzzerOptions;
  fuzzerOptions.vectorSize = 1024;
  facebook::velox::VectorFuzzer fuzzer(fuzzerOptions, pool_.get());

  // Generate the table data and write it to a temporary file.
  const auto rowType = ROW({"a", "b"}, {BIGINT(), BIGINT()});
  const int batchCount{10};
  std::vector<RowVectorPtr> batches;
  batches.reserve(batchCount);
  for (int batch = 0; batch < batchCount; ++batch) {
    batches.push_back(fuzzer.fuzzInputRow(rowType));
  }
  auto dataFile = TempFilePath::create();
  writeToFile(dataFile->getPath(), batches);

  auto scanPlan = PlanBuilder(pool_.get())
                      .startTableScan()
                      .outputType(rowType)
                      .endTableScan()
                      .planNode();

  // Create query ctx without cache to read through direct buffer input.
  std::unordered_map<std::string, std::string> queryConfigValues;
  std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>
      connectorConfigs;
  auto queryCtx = core::QueryCtx::create(
      executor_.get(),
      core::QueryConfig(std::move(queryConfigValues)),
      connectorConfigs,
      /*cache=*/nullptr);

  // Inject error right after the coalesce io gets triggered and before the
  // on-demand load.
  const std::string errMsg{"injectedError"};
  SCOPED_TESTVALUE_SET(
      "facebook::velox::connector::hive::HiveDataSource::next",
      std::function<void(connector::hive::HiveDataSource*)>(
          [&errMsg](connector::hive::HiveDataSource* /*unused*/) {
            VELOX_FAIL(errMsg);
          }));
  // Keep the coalesced load busy so it is still pending when the query fails.
  SCOPED_TESTVALUE_SET(
      "facebook::velox::cache::DirectCoalescedLoad::loadData",
      std::function<void(cache::CoalescedLoad*)>(
          [](cache::CoalescedLoad* /*unused*/) {
            std::this_thread::sleep_for(std::chrono::seconds(2));
          }));

  // Run the query on its own thread and wait for it to fail.
  std::thread queryRunner([&]() {
    VELOX_ASSERT_THROW(
        AssertQueryBuilder(duckDbQueryRunner_)
            .plan(scanPlan)
            .splits(makeHiveConnectorSplits({dataFile}))
            .queryCtx(queryCtx)
            .copyResults(pool_.get()),
        errMsg);
  });
  queryRunner.join();
}

TEST_F(TableScanTest, connectorStats) {
auto hiveConnector =
std::dynamic_pointer_cast<connector::hive::HiveConnector>(
Expand Down
Loading