
Commit 712e795

BryanCutler authored and curtishoward committed
[SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10.0
Upgrade Apache Arrow to 0.10.0. Version 0.10.0 has a number of bug fixes and improvements, with the following pertaining directly to usage in Spark:

* Allows adding BinaryType support ARROW-2141
* Bug fix related to array serialization ARROW-1973
* Python 2 str will be made into an Arrow string instead of bytes ARROW-2101
* Python bytearrays are supported as input to pyarrow ARROW-2141
* Java has a common interface for reset() to clean up complex vectors in Spark's ArrowWriter ARROW-1962
* Cleanup of pyarrow type equality checks ARROW-2423
* ArrowStreamWriter should not hold references to ArrowBlocks ARROW-2632, ARROW-2645
* Improved low-level handling of messages for RecordBatch ARROW-2704

Tested with existing tests.

Author: Bryan Cutler <[email protected]>

Closes apache#21939 from BryanCutler/arrow-upgrade-010.

(cherry picked from commit ed075e1)
1 parent 831ea82 commit 712e795
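To make the bytearray item above concrete, here is a minimal pyarrow sketch (not part of the commit, assuming pyarrow 0.10.0 or later is installed) of the input handling that ARROW-2141 adds:

    import pyarrow as pa

    # With pyarrow 0.10.0, bytearray values are accepted alongside bytes and None
    # when building a binary array (ARROW-2141).
    arr = pa.array([bytearray(b"\x00\x01"), b"\x02", None], type=pa.binary())
    print(arr.null_count)  # 1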

7 files changed

Lines changed: 20 additions & 32 deletions


dev/deps/spark-deps-hadoop-2.6

Lines changed: 3 additions & 3 deletions
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar

dev/deps/spark-deps-hadoop-2.7

Lines changed: 3 additions & 3 deletions
@@ -14,9 +14,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
-arrow-format-0.8.0.jar
-arrow-memory-0.8.0.jar
-arrow-vector-0.8.0.jar
+arrow-format-0.10.0.jar
+arrow-memory-0.10.0.jar
+arrow-vector-0.10.0.jar
 automaton-1.11-8.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -193,7 +193,7 @@
     If you are changing Arrow version specification, please check ./python/pyspark/sql/utils.py,
     ./python/run-tests.py and ./python/setup.py too.
     -->
-    <arrow.version>0.8.0</arrow.version>
+    <arrow.version>0.10.0</arrow.version>
 
     <test.java.home>${java.home}</test.java.home>
     <test.exclude.tags></test.exclude.tags>
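The comment in this hunk points at the Python-side files that pin the pyarrow requirement. As a hedged sketch only (the helper name and minimum version here are illustrative, and this commit does not change those files), the guard they implement looks roughly like:

    from distutils.version import LooseVersion

    def require_minimum_pyarrow_version(minimum="0.8.0"):
        # Raise early if the installed pyarrow is older than what Spark was built against.
        import pyarrow
        if LooseVersion(pyarrow.__version__) < LooseVersion(minimum):
            raise ImportError(
                "pyarrow >= %s must be installed; found %s" % (minimum, pyarrow.__version__))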

python/pyspark/serializers.py

Lines changed: 2 additions & 0 deletions
@@ -228,12 +228,14 @@ def _create_batch(series, timezone):
     def create_array(s, t):
         mask = s.isnull()
         # Ensure timestamp series are in expected form for Spark internal representation
+        # TODO: maybe don't need None check anymore as of Arrow 0.9.1
         if t is not None and pa.types.is_timestamp(t):
             s = _check_series_convert_timestamps_internal(s.fillna(0), timezone)
             # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
             return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
         elif t is not None and pa.types.is_string(t) and sys.version < '3':
             # TODO: need decode before converting to Arrow in Python 2
+            # TODO: don't need as of Arrow 0.9.1
             return pa.Array.from_pandas(s.apply(
                 lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
         elif t is not None and pa.types.is_decimal(t) and \
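For context on the call pattern above, a small standalone sketch (not from the commit) of pa.Array.from_pandas with an explicit null mask and target type:

    import pandas as pd
    import pyarrow as pa

    s = pd.Series(["a", None, "c"])
    # The mask marks nulls explicitly; the type pins the Arrow type rather than
    # letting pyarrow infer it from the pandas dtype.
    arr = pa.Array.from_pandas(s, mask=s.isnull(), type=pa.string())
    print(arr)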

sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java

Lines changed: 6 additions & 6 deletions
@@ -161,13 +161,13 @@ public ArrowColumnVector(ValueVector vector) {
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
-    } else if (vector instanceof NullableMapVector) {
-      NullableMapVector mapVector = (NullableMapVector) vector;
-      accessor = new StructAccessor(mapVector);
+    } else if (vector instanceof StructVector) {
+      StructVector structVector = (StructVector) vector;
+      accessor = new StructAccessor(structVector);
 
-      childColumns = new ArrowColumnVector[mapVector.size()];
+      childColumns = new ArrowColumnVector[structVector.size()];
       for (int i = 0; i < childColumns.length; ++i) {
-        childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
+        childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
       }
     } else {
       throw new UnsupportedOperationException();

@@ -470,7 +470,7 @@ final ColumnarArray getArray(int rowId) {
    */
   private static class StructAccessor extends ArrowVectorAccessor {
 
-    StructAccessor(NullableMapVector vector) {
+    StructAccessor(StructVector vector) {
       super(vector);
     }
   }
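The rename here tracks Arrow's own renaming of NullableMapVector to StructVector: struct data is stored as one child vector per field, which is exactly what the childColumns wiring above exposes. As a hedged illustration (not part of the diff, assuming a recent pyarrow), the same layout on the Python side:

    import pyarrow as pa

    # A struct array is a thin wrapper over one child array per field.
    ints = pa.array([1, 2], type=pa.int32())
    longs = pa.array([10, 20], type=pa.int64())
    struct_arr = pa.StructArray.from_arrays([ints, longs], ["int", "long"])
    print(struct_arr.type)  # struct<int: int32, long: int64>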

sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

Lines changed: 3 additions & 17 deletions
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.complex._
-import org.apache.arrow.vector.types.pojo.ArrowType
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters

@@ -62,7 +61,7 @@ object ArrowWriter {
       case (ArrayType(_, _), vector: ListVector) =>
         val elementVector = createFieldWriter(vector.getDataVector())
         new ArrayWriter(vector, elementVector)
-      case (StructType(_), vector: NullableMapVector) =>
+      case (StructType(_), vector: StructVector) =>
         val children = (0 until vector.size()).map { ordinal =>
           createFieldWriter(vector.getChildByOrdinal(ordinal))
         }

@@ -129,20 +128,7 @@ private[arrow] abstract class ArrowFieldWriter {
   }
 
   def reset(): Unit = {
-    // TODO: reset() should be in a common interface
-    valueVector match {
-      case fixedWidthVector: BaseFixedWidthVector => fixedWidthVector.reset()
-      case variableWidthVector: BaseVariableWidthVector => variableWidthVector.reset()
-      case listVector: ListVector =>
-        // Manual "reset" the underlying buffer.
-        // TODO: When we upgrade to Arrow 0.10.0, we can simply remove this and call
-        // `listVector.reset()`.
-        val buffers = listVector.getBuffers(false)
-        buffers.foreach(buf => buf.setZero(0, buf.capacity()))
-        listVector.setValueCount(0)
-        listVector.setLastSet(0)
-      case _ =>
-    }
+    valueVector.reset()
     count = 0
   }
 }

@@ -323,7 +309,7 @@ private[arrow] class ArrayWriter(
 }
 
 private[arrow] class StructWriter(
-    val valueVector: NullableMapVector,
+    val valueVector: StructVector,
     children: Array[ArrowFieldWriter]) extends ArrowFieldWriter {
 
   override def setNull(): Unit = {

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ArrowColumnVectorSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -336,7 +336,7 @@ class ArrowColumnVectorSuite extends SparkFunSuite {
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = false, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
 
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]

@@ -373,7 +373,7 @@
     val allocator = ArrowUtils.rootAllocator.newChildAllocator("struct", 0, Long.MaxValue)
     val schema = new StructType().add("int", IntegerType).add("long", LongType)
     val vector = ArrowUtils.toArrowField("struct", schema, nullable = true, null)
-      .createVector(allocator).asInstanceOf[NullableMapVector]
+      .createVector(allocator).asInstanceOf[StructVector]
     vector.allocateNew()
     val intVector = vector.getChildByOrdinal(0).asInstanceOf[IntVector]
     val longVector = vector.getChildByOrdinal(1).asInstanceOf[BigIntVector]
