diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp index 7dcfee2cbfad3..b82ea7192e6b1 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp @@ -82,7 +82,12 @@ void BroadcastFileWriter::collect(const RowVectorPtr& input) { write(input); } -void BroadcastFileWriter::noMoreData() {} +void BroadcastFileWriter::noMoreData() { + if (writeFile_ != nullptr) { + writeFile_->flush(); + writeFile_->close(); + } +} RowVectorPtr BroadcastFileWriter::fileStats() { // No rows written. diff --git a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp index cffcbd4a64320..fc835bbe5c73b 100644 --- a/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp @@ -70,6 +70,9 @@ class BroadcastWriteOperator : public Operator { } fileBroadcastWriter_->collect(reorderedInput); + auto lockedStats = stats_.wlock(); + lockedStats->addOutputVector( + reorderedInput->estimateFlatSize(), reorderedInput->size()); } void noMoreInput() override { diff --git a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp index 434353a1e5d2e..dbb99c0e62711 100644 --- a/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp +++ b/presto-native-execution/presto_cpp/main/operators/ShuffleWrite.cpp @@ -104,6 +104,8 @@ class ShuffleWriteOperator : public Operator { "collect"); } } + auto lockedStats = stats_.wlock(); + lockedStats->addOutputVector(input->estimateFlatSize(), input->size()); } void noMoreInput() override {