2 changes: 1 addition & 1 deletion velox/common/caching/AsyncDataCache.h
@@ -446,7 +446,7 @@ class CoalescedLoad {
return state_;
}

void cancel() {
virtual void cancel() {
setEndState(State::kCancelled);
}

3 changes: 1 addition & 2 deletions velox/dwio/common/DirectBufferedInput.cpp
@@ -218,8 +218,7 @@ void DirectBufferedInput::readRegions(
for (auto i = 0; i < coalescedLoads_.size(); ++i) {
auto& load = coalescedLoads_[i];
if (load->state() == CoalescedLoad::State::kPlanned) {
AsyncLoadHolder loadHolder{
.load = load, .pool = pool_->shared_from_this()};
AsyncLoadHolder loadHolder{.load = load};
executor_->add([asyncLoad = std::move(loadHolder)]() {
process::TraceContext trace("Read Ahead");
VELOX_CHECK_NOT_NULL(asyncLoad.load);
21 changes: 11 additions & 10 deletions velox/dwio/common/DirectBufferedInput.h
@@ -101,6 +101,17 @@ class DirectCoalescedLoad : public cache::CoalescedLoad {
return size;
}

void cancel() override {
Collaborator

The cancel() function is invoked by ~DirectBufferedInput(), which ultimately depends on the destructor of TableScan, as TableScan owns the std::unique_ptr<connector::DataSource> dataSource_. This dependency relationship is illustrated in this discussion. As a result, TableScan must be destroyed before the memory manager, but the current approach in this PR does not explicitly guarantee that order.

To ensure that the memory held by the load is always properly released, PR #8205 triggers close() through the TableScan::close() call chain to address a similar issue.

Would you like to share your opinions on the fixes? cc: @xiaoxmeng @FelixYBW Thanks.


The solution is either to free the resources manually from top to bottom or to free them during object destruction. Either way works.
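For illustration only, a minimal sketch of those two options using hypothetical stand-in types (Pool, Load, LoadHolder), not the Velox classes: the async load frees its buffers back into a pool it does not own, so that free must run while the pool is still alive, either via an explicit top-down close() call chain or via a holder that releases the load before dropping its pool reference.

#include <memory>
#include <vector>

// Hypothetical stand-ins for illustration only; not the Velox types.
struct Pool {
  void free(void* p) {
    ::operator delete(p);
  }
};

struct Load {
  Pool* pool{nullptr};           // Non-owning: someone else must keep the pool alive.
  std::vector<void*> buffers;
  ~Load() {
    for (void* b : buffers) {
      pool->free(b);             // The pool must still exist when this runs.
    }
  }
};

// A holder that pins the pool for the lifetime of the async load and releases
// the load (freeing its buffers) before dropping the last pool reference.
// This mirrors the AsyncLoadHolder destructor removed by this PR; the
// alternative raised above is an explicit close() call chain as in PR #8205.
struct LoadHolder {
  std::shared_ptr<Load> load;
  std::shared_ptr<Pool> pool;
  ~LoadHolder() {
    load.reset();  // Frees the buffers first...
    pool.reset();  // ...then lets the pool go away.
  }
};

int main() {
  auto pool = std::make_shared<Pool>();
  auto load = std::make_shared<Load>();
  load->pool = pool.get();
  load->buffers.push_back(::operator new(64));
  LoadHolder holder{std::move(load), std::move(pool)};
  // When holder goes out of scope, the buffers are freed while the pool is
  // still alive, even if this holder owned the last pool reference.
  return 0;
}

The question raised above is whether, once the holder no longer pins the pool, the destruction order that cancel() relies on is actually guaranteed.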

folly::SemiFuture<bool> waitFuture(false);
if (!loadOrFuture(&waitFuture)) {
waitFuture.wait();
Contributor

Such a wait can easily lead to deadlock in the Prestissimo use case. @kewang1024 @Yuhta


Can you share more details? Let me check if Gluten has the issue or not.

Contributor

@FelixYBW I explained this offline to @rui-mo

}
for (auto& request : requests_) {
pool_->freeNonContiguous(request.data);
}
CoalescedLoad::cancel();
Contributor

Could you help confirm whether it would be better to call cancel() first?

Author

It is incorrect to call cancel() first, because doing so immediately sets state_ to kCancelled, and as a result loadOrFuture() will not perform any actual waiting.
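For context, a hypothetical simplification of that interaction (a made-up LoadSketch class, not the actual Velox CoalescedLoad::loadOrFuture() implementation): under the assumption that loadOrFuture() reports "nothing to wait for" once the state is terminal, setting kCancelled before waiting would skip the wait on an in-flight load.

#include <folly/futures/SharedPromise.h>
#include <mutex>

// Hypothetical simplification for illustration; not the Velox CoalescedLoad.
class LoadSketch {
 public:
  enum class State { kPlanned, kLoading, kLoaded, kCancelled };

  // Returns true when there is nothing in flight to wait for; otherwise hands
  // back a future that completes when the in-flight load finishes.
  bool loadOrFuture(folly::SemiFuture<bool>* wait) {
    std::lock_guard<std::mutex> l(mutex_);
    if (state_ == State::kLoading) {
      *wait = promise_.getSemiFuture();
      return false;  // Caller must wait for the in-flight load.
    }
    // kPlanned, kLoaded, or kCancelled: nothing is in flight.
    return true;
  }

  void cancel() {
    // Wait for any in-flight load, free its buffers, then mark cancelled.
    // Marking cancelled first would make loadOrFuture() return true right
    // away and the wait below would never happen.
    folly::SemiFuture<bool> waitFuture(false);
    if (!loadOrFuture(&waitFuture)) {
      waitFuture.wait();
    }
    // ... free the buffers owned by the load here ...
    std::lock_guard<std::mutex> l(mutex_);
    state_ = State::kCancelled;
  }

 private:
  std::mutex mutex_;
  State state_{State::kPlanned};
  folly::SharedPromise<bool> promise_;
};

Under that assumption, the order in this PR (wait first, free the buffers, then call CoalescedLoad::cancel()) is the one that actually blocks on an in-flight load.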

}

private:
const std::shared_ptr<IoStatistics> ioStats_;
const std::shared_ptr<filesystems::File::IoStats> fsStats_;
@@ -243,21 +254,11 @@ class DirectBufferedInput : public BufferedInput {
bool prefetch,
const std::vector<int32_t>& groupEnds);

// Holds a reference on the memory pool for the async load in case of early
// task termination.
struct AsyncLoadHolder {
std::shared_ptr<cache::CoalescedLoad> load;
std::shared_ptr<memory::MemoryPool> pool;

~AsyncLoadHolder() {
// Release the load reference before the memory pool reference.
// This is to make sure the memory pool is not destroyed before we free up
// the allocated buffers.
// This is to handle the case where the associated task has already been
// destroyed before the async load is done. The async load holds
// the last reference to the memory pool in that case.
load.reset();
pool.reset();
}
};
