Commit 2e71080

cloud-fan authored and jzhuge committed
[SPARK-24971][SQL] remove SupportsDeprecatedScanRow
This is a follow-up of apache#21118. In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally, data sources should produce `InternalRow` instead of `Row` for better performance. This change removes `SupportsDeprecatedScanRow` and encourages data sources to produce `InternalRow`, which is also very easy to build.

Tested with existing tests.

Author: Wenchen Fan <[email protected]>

Closes apache#21921 from cloud-fan/row.

(cherry picked from commit defc54c)

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
    sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
    sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
    sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
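For context on "very easy to build": an `InternalRow` is constructed directly from Catalyst-internal values, positionally matched to the reader's schema. A minimal sketch, assuming Spark 2.4-era APIs:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

    // Both forms build the same generic row with an int and a long column.
    val a: InternalRow = InternalRow(1, 2L)
    val b: InternalRow = new GenericInternalRow(Array[Any](1, 2L))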
1 parent 46f59bf commit 2e71080

15 files changed

Lines changed: 89 additions & 307 deletions


sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java

Lines changed: 1 addition & 5 deletions
@@ -39,11 +39,7 @@
  *    pruning), etc. Names of these interfaces start with `SupportsPushDown`.
  * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
  *    Names of these interfaces start with `SupportsReporting`.
- * 3. Special scans. E.g, columnar scan, unsafe row scan, etc.
- *    Names of these interfaces start with `SupportsScan`. Note that a reader should only
- *    implement at most one of the special scans, if more than one special scans are implemented,
- *    only one of them would be respected, according to the priority list from high to low:
- *    {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
+ * 3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.
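As a sketch of the mixin taxonomy described above, a reader declares a capability by implementing the corresponding interface. This hypothetical PruningReader (not part of this commit) shows only the column-pruning mixin, against Spark 2.4-era DataSourceV2 APIs:

    import java.util.{Collections, List => JList}

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsPushDownRequiredColumns}
    import org.apache.spark.sql.types.StructType

    // Hypothetical reader: declares the column-pruning capability via a mixin.
    class PruningReader extends DataSourceReader with SupportsPushDownRequiredColumns {
      private var schema = new StructType().add("i", "int").add("j", "int")

      // Spark calls this with the columns the query actually needs.
      override def pruneColumns(requiredSchema: StructType): Unit = {
        schema = requiredSchema
      }

      override def readSchema(): StructType = schema

      override def planInputPartitions(): JList[InputPartition[InternalRow]] =
        Collections.emptyList()
    }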

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java

Lines changed: 1 addition & 2 deletions
@@ -28,8 +28,7 @@
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
  * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
- * source readers that mix in {@link SupportsScanColumnarBatch}, or {@link org.apache.spark.sql.Row}
- * for data source readers that mix in {@link SupportsDeprecatedScanRow}.
+ * source readers that mix in {@link SupportsScanColumnarBatch}.
  */
 @InterfaceStability.Evolving
 public interface InputPartitionReader<T> extends Closeable {
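With the `Row` variant gone, `T` is `InternalRow` for all row-based readers. A minimal partition/reader pair might look like the following sketch (hypothetical RangeInputPartition serving as its own reader; assumes Spark 2.4-era APIs):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

    // Hypothetical partition emitting the values start until end as a single int column.
    class RangeInputPartition(start: Int, end: Int)
        extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] {

      private var current = start - 1

      override def createPartitionReader(): InputPartitionReader[InternalRow] = this

      override def next(): Boolean = {
        current += 1
        current < end
      }

      // InternalRow.apply builds a GenericInternalRow from Catalyst-internal values.
      override def get(): InternalRow = InternalRow(current)

      override def close(): Unit = {}
    }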

sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsDeprecatedScanRow.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala

Lines changed: 2 additions & 34 deletions
@@ -20,17 +20,14 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
-import org.apache.spark.sql.types.StructType

 /**
  * Physical plan node for scanning data from a data source.
@@ -64,13 +61,8 @@ case class DataSourceV2ScanExec(
     case _ => super.outputPartitioning
   }

-  private lazy val partitions: Seq[InputPartition[InternalRow]] = reader match {
-    case r: SupportsDeprecatedScanRow =>
-      r.planRowInputPartitions().asScala.map {
-        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[InternalRow]
-      }
-    case _ =>
-      reader.planInputPartitions().asScala
+  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
+    reader.planInputPartitions().asScala
   }

   private lazy val inputRDD: RDD[InternalRow] = reader match {
@@ -113,27 +105,3 @@
     }
   }
 }
-
-class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
-  extends InputPartition[InternalRow] {
-
-  override def preferredLocations: Array[String] = partition.preferredLocations
-
-  override def createPartitionReader: InputPartitionReader[InternalRow] = {
-    new RowToUnsafeInputPartitionReader(
-      partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
-  }
-}
-
-class RowToUnsafeInputPartitionReader(
-    val rowReader: InputPartitionReader[Row],
-    encoder: ExpressionEncoder[Row])
-  extends InputPartitionReader[InternalRow] {
-
-  override def next: Boolean = rowReader.next
-
-  override def get: UnsafeRow = encoder.toRow(rowReader.get).asInstanceOf[UnsafeRow]
-
-  override def close(): Unit = rowReader.close()
-}
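The deleted `RowToUnsafeInputPartitionReader` bridged `Row`-producing sources by running each `Row` through a resolved `RowEncoder`. A source migrating off `Row` can perform the same conversion itself; a minimal sketch with a hypothetical two-column schema, using the Spark 2.4-era `toRow` API:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    // Hypothetical schema for illustration.
    val schema = new StructType().add("i", IntegerType).add("s", StringType)

    // Resolve the encoder once per partition, as the deleted wrapper did.
    val encoder = RowEncoder(schema).resolveAndBind()

    // Convert an external Row to the Catalyst representation (an UnsafeRow).
    val row: Row = Row(1, "a")
    val internal: InternalRow = encoder.toRow(row)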

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala

Lines changed: 1 addition & 3 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeInputPartitionReader}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}
 import org.apache.spark.util.ThreadUtils
@@ -212,8 +212,6 @@ object ContinuousDataSourceRDD {
       reader: InputPartitionReader[InternalRow]): ContinuousInputPartitionReader[_] = {
     reader match {
       case r: ContinuousInputPartitionReader[InternalRow] => r
-      case wrapped: RowToUnsafeInputPartitionReader =>
-        wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]]
       case _ =>
         throw new IllegalStateException(s"Unknown continuous reader type ${reader.getClass}")
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala

Lines changed: 11 additions & 12 deletions
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization

-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
 import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
@@ -34,8 +34,7 @@ import org.apache.spark.sql.types.StructType
 case class RateStreamPartitionOffset(
     partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset

-class RateStreamContinuousReader(options: DataSourceOptions)
-  extends ContinuousReader with SupportsDeprecatedScanRow {
+class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousReader {
   implicit val defaultFormats: DefaultFormats = DefaultFormats

   val creationTime = System.currentTimeMillis()
@@ -67,7 +66,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)

   override def getStartOffset(): Offset = offset

-  override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = {
     val partitionStartMap = offset match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>
@@ -91,7 +90,7 @@
         i,
         numPartitions,
         perPartitionRate)
-        .asInstanceOf[InputPartition[Row]]
+        .asInstanceOf[InputPartition[InternalRow]]
     }.asJava
   }

@@ -106,9 +105,9 @@ case class RateStreamContinuousInputPartition(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends InputPartition[Row] {
+  extends InputPartition[InternalRow] {

-  override def createPartitionReader(): InputPartitionReader[Row] =
+  override def createPartitionReader(): InputPartitionReader[InternalRow] =
     new RateStreamContinuousInputPartitionReader(
       startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
 }
@@ -119,12 +118,12 @@ class RateStreamContinuousInputPartitionReader(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousInputPartitionReader[Row] {
+  extends ContinuousInputPartitionReader[InternalRow] {
   private var nextReadTime: Long = startTimeMs
   private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong

   private var currentValue = startValue
-  private var currentRow: Row = null
+  private var currentRow: InternalRow = null

   override def next(): Boolean = {
     currentValue += increment
@@ -140,14 +139,14 @@
       return false
     }

-    currentRow = Row(
-      DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(nextReadTime)),
+    currentRow = InternalRow(
+      DateTimeUtils.fromMillis(nextReadTime),
       currentValue)

     true
   }

-  override def get: Row = currentRow
+  override def get: InternalRow = currentRow

   override def close(): Unit = {}

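A note on the timestamp change above: `InternalRow` carries `TimestampType` values as raw microseconds since the epoch (a `Long`), so the `java.sql.Timestamp` boxing step disappears. The two representations side by side, as a sketch against Spark 2.4-era `DateTimeUtils`:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    val ms = System.currentTimeMillis()

    // External representation: a boxed java.sql.Timestamp inside a Row.
    val external: Row = Row(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(ms)), 0L)

    // Internal representation: a microseconds-since-epoch Long inside an InternalRow.
    val internal: InternalRow = InternalRow(DateTimeUtils.fromMillis(ms), 0L)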
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java

Lines changed: 10 additions & 9 deletions
@@ -20,8 +20,8 @@
 import java.io.IOException;
 import java.util.*;

-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -33,7 +33,7 @@
 public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {

   public class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
-      SupportsPushDownFilters, SupportsDeprecatedScanRow {
+      SupportsPushDownFilters {

     // Exposed for testing.
     public StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
@@ -79,8 +79,8 @@ public Filter[] pushedFilters() {
     }

     @Override
-    public List<InputPartition<Row>> planRowInputPartitions() {
-      List<InputPartition<Row>> res = new ArrayList<>();
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+      List<InputPartition<InternalRow>> res = new ArrayList<>();

       Integer lowerBound = null;
       for (Filter filter : filters) {
@@ -107,7 +107,8 @@ public List<InputPartition<Row>> planRowInputPartitions() {
     }
   }

-  static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
+  static class JavaAdvancedInputPartition implements InputPartition<InternalRow>,
+      InputPartitionReader<InternalRow> {
     private int start;
     private int end;
     private StructType requiredSchema;
@@ -119,7 +120,7 @@ static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPar
     }

     @Override
-    public InputPartitionReader<Row> createPartitionReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new JavaAdvancedInputPartition(start - 1, end, requiredSchema);
     }

@@ -130,7 +131,7 @@ public boolean next() {
     }

     @Override
-    public Row get() {
+    public InternalRow get() {
       Object[] values = new Object[requiredSchema.size()];
       for (int i = 0; i < values.length; i++) {
         if ("i".equals(requiredSchema.apply(i).name())) {
@@ -139,7 +140,7 @@ public Row get() {
           values[i] = -start;
         }
       }
-      return new GenericRow(values);
+      return new GenericInternalRow(values);
     }

     @Override
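The `GenericRow` to `GenericInternalRow` swap is mechanical here because both columns are ints. One caveat when migrating sources with string columns: an `InternalRow` must hold Catalyst-internal values, so strings become `UTF8String`, not `java.lang.String`. A sketch:

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // Catalyst's internal string type is UTF8String; a plain java.lang.String
    // here would fail at runtime when the value is read as a string column.
    val row = new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))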

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java

Lines changed: 10 additions & 8 deletions
@@ -21,8 +21,8 @@
 import java.util.Arrays;
 import java.util.List;

-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupport;
@@ -34,7 +34,7 @@

 public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {

-  class Reader implements DataSourceReader, SupportsReportPartitioning, SupportsDeprecatedScanRow {
+  class Reader implements DataSourceReader, SupportsReportPartitioning {
     private final StructType schema = new StructType().add("a", "int").add("b", "int");

     @Override
@@ -43,7 +43,7 @@ public StructType readSchema() {
     }

     @Override
-    public List<InputPartition<Row>> planRowInputPartitions() {
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       return java.util.Arrays.asList(
         new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
         new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
@@ -73,7 +73,9 @@ public boolean satisfy(Distribution distribution) {
     }
   }

-  static class SpecificInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
+  static class SpecificInputPartition implements InputPartition<InternalRow>,
+      InputPartitionReader<InternalRow> {
+
     private int[] i;
     private int[] j;
     private int current = -1;
@@ -91,8 +93,8 @@ public boolean next() throws IOException {
     }

     @Override
-    public Row get() {
-      return new GenericRow(new Object[] {i[current], j[current]});
+    public InternalRow get() {
+      return new GenericInternalRow(new Object[] {i[current], j[current]});
     }

     @Override
@@ -101,7 +103,7 @@ public void close() throws IOException {
     }

     @Override
-    public InputPartitionReader<Row> createPartitionReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return this;
     }
   }

sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java

Lines changed: 3 additions & 4 deletions
@@ -19,18 +19,17 @@

 import java.util.List;

-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow;
 import org.apache.spark.sql.types.StructType;

 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {

-  class Reader implements DataSourceReader, SupportsDeprecatedScanRow {
+  class Reader implements DataSourceReader {
     private final StructType schema;

     Reader(StructType schema) {
@@ -43,7 +42,7 @@ public StructType readSchema() {
     }

     @Override
-    public List<InputPartition<Row>> planRowInputPartitions() {
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }
