From b3d0495f53882bdbafe2376911a2576794e12297 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Thu, 28 Aug 2025 20:56:01 -0700 Subject: [PATCH] Add output stats for pos sink operators (#25915) Summary: Adds output row stats for sapphire-velox related sink operators Properly close write file on broadcast write Reviewed By: singcha Differential Revision: D81271224 --- .../presto_cpp/main/operators/BroadcastFactory.cpp | 7 ++++++- .../presto_cpp/main/operators/BroadcastWrite.cpp | 3 +++ .../presto_cpp/main/operators/ShuffleWrite.cpp | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp index 7dcfee2cbfad..b82ea7192e6b 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp @@ -82,7 +82,12 @@ void BroadcastFileWriter::collect(const RowVectorPtr& input) { write(input); } -void BroadcastFileWriter::noMoreData() {} +void BroadcastFileWriter::noMoreData() { + if (writeFile_ != nullptr) { + writeFile_->flush(); + writeFile_->close(); + } +} RowVectorPtr BroadcastFileWriter::fileStats() { // No rows written. diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp index cffcbd4a6432..fc835bbe5c73 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp @@ -70,6 +70,9 @@ class BroadcastWriteOperator : public Operator { } fileBroadcastWriter_->collect(reorderedInput); + auto lockedStats = stats_.wlock(); + lockedStats->addOutputVector( + reorderedInput->estimateFlatSize(), reorderedInput->size()); } void noMoreInput() override { diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp index 434353a1e5d2..dbb99c0e6271 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp @@ -104,6 +104,8 @@ class ShuffleWriteOperator : public Operator { "collect"); } } + auto lockedStats = stats_.wlock(); + lockedStats->addOutputVector(input->estimateFlatSize(), input->size()); } void noMoreInput() override {