Skip to content

Commit 1afacc7

Browse files
wangyumviirya
authored andcommitted
[SPARK-50235][SQL] Clean up ColumnVector resource after processing all rows in ColumnarToRowExec (apache#640)
### What changes were proposed in this pull request? This patch cleans up ColumnVector resource after processing all rows in ColumnarToRowExec. This patch only focus on codeben implementation of ColumnarToRowExec. For non-codegen, it should be relatively rare to use, and currently no good way has proposed, so leaving it to a follow up. ### Why are the changes needed? Currently we only assign null to ColumnarBatch object but it doesn't release the resources hold by the vectors in the batch. For OnHeapColumnVector, the Java arrays may be automatically collected by JVM, but for OffHeapColumnVector, the allocated off-heap memory will be leaked. For custom ColumnVector implementations like Arrow-based, it also possibly causes issues on memory safety if the underlying buffers are reused across batches. Because when ColumnarToRowExec begins to fill values for next batch, the arrays in previous batch are still hold. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#48767 from viirya/close_if_not_writable. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit 800faf0) Signed-off-by: Kent Yao <yao@apache.org> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent 2d80a6d commit 1afacc7

4 files changed

Lines changed: 32 additions & 0 deletions

File tree

sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ public abstract class ColumnVector implements AutoCloseable {
6767
@Override
6868
public abstract void close();
6969

70+
/**
71+
* Cleans up memory for this column vector if it's not writable. The column vector is not usable
72+
* after this.
73+
*
74+
* If this is a writable column vector, it is a no-op.
75+
*/
76+
public void closeIfNotWritable() {
77+
// By default, we just call close() for all column vectors. If a column vector is writable, it
78+
// should override this method and do nothing.
79+
close();
80+
}
81+
7082
/**
7183
* Returns true if this column vector contains any null values.
7284
*/

sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ public void close() {
4545
}
4646
}
4747

48+
/**
49+
* Called to close all the columns if they are not writable. This is used to clean up memory
50+
* allocated during columnar processing.
51+
*/
52+
public void closeIfNotWritable() {
53+
for (ColumnVector c: columns) {
54+
c.closeIfNotWritable();
55+
}
56+
}
57+
4858
/**
4959
* Returns an iterator over the rows in this batch.
5060
*/

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public void close() {
9696
releaseMemory();
9797
}
9898

99+
@Override
100+
public void closeIfNotWritable() {
101+
// no-op
102+
}
103+
99104
public void reserveAdditional(int additionalCapacity) {
100105
reserve(elementsAppended + additionalCapacity);
101106
}

sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,10 +195,15 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
195195
| $shouldStop
196196
| }
197197
| $idx = $numRows;
198+
| $batch.closeIfNotWritable();
198199
| $batch = null;
199200
| ${InterruptChecker.getClass.getName.stripSuffix("$")}.checkInterrupt();
200201
| $nextBatchFuncName();
201202
|}
203+
|// clean up resources
204+
|if ($batch != null) {
205+
| $batch.close();
206+
|}
202207
""".stripMargin
203208
}
204209

0 commit comments

Comments
 (0)