Closed

Changes from 22 of 25 commits:
4bf995f
Make SQL cache serialization pluggable
revans2 Jul 9, 2020
427fc68
Addressed review comments
revans2 Jul 10, 2020
0361237
Few more
revans2 Jul 10, 2020
641f28f
Merge branch 'master' into pluggable_cache_serializer
revans2 Jul 14, 2020
f989572
Fixed one set of tests
revans2 Jul 14, 2020
ad11a79
Fixed the other bug
revans2 Jul 14, 2020
cb29e47
Addressed review comments
revans2 Jul 17, 2020
c7c89bb
Merge branch 'master' into pluggable_cache_serializer
revans2 Jul 20, 2020
f0c7cfe
Addressed review comments
revans2 Jul 20, 2020
46f4021
Fixed some docs issues
revans2 Jul 20, 2020
10b0ce1
Fixed bad comment quoting, and added comments for reviews
revans2 Jul 20, 2020
38ca741
Added in abstraction for Columnar input
revans2 Jul 20, 2020
c2d00dc
Fixed formatting
revans2 Jul 21, 2020
082e883
Added in a test for columnar input and output to cache serializer
revans2 Jul 22, 2020
e58783e
Addressed review comments
revans2 Jul 22, 2020
05a430d
Merge branch 'master' into pluggable_cache_serializer
revans2 Jul 27, 2020
e0f5398
Fixed issue with columnar type for data generation
revans2 Jul 27, 2020
c9b1488
Addressed review comments
revans2 Jul 28, 2020
4ec28bb
Addressed more comments
revans2 Jul 28, 2020
7840a4c
Fixed docs
revans2 Jul 29, 2020
5723aa9
Addressed review comments
revans2 Jul 29, 2020
f8ca4af
Addressed review comments
revans2 Jul 30, 2020
ea762e5
Addressed review comments
revans2 Jul 30, 2020
ce66042
Merge branch 'master' into pluggable_cache_serializer
revans2 Jul 31, 2020
3f2f527
Addressed review comments
revans2 Jul 31, 2020
StaticSQLConf.scala:

@@ -127,6 +127,16 @@ object StaticSQLConf {
     .toSequence
     .createOptional
 
+  val SPARK_CACHE_SERIALIZER = buildStaticConf("spark.sql.cache.serializer")
+    .doc("The name of a class that implements " +
+      "org.apache.spark.sql.columnar.CachedBatchSerializer. It will be used to " +
+      "translate SQL data into a format that can more efficiently be cached. The underlying " +
+      "API is subject to change so use with caution. Multiple classes cannot be specified. " +
+      "The class must have a no-arg constructor.")
+    .version("3.1.0")
+    .stringConf
+    .createWithDefault("org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer")
+
   val QUERY_EXECUTION_LISTENERS = buildStaticConf("spark.sql.queryExecutionListeners")
     .doc("List of class names implementing QueryExecutionListener that will be automatically " +
       "added to newly created sessions. The classes should have either a no-arg constructor, " +
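The new entry is a static conf, so it must be set before the SparkSession is created; setting it on an existing session via spark.conf.set throws, because static confs are fixed at session creation. A minimal usage sketch (the serializer class name is a hypothetical placeholder; any implementation of org.apache.spark.sql.columnar.CachedBatchSerializer with a no-arg constructor works):

    import org.apache.spark.sql.SparkSession

    // com.example.MyCachedBatchSerializer is a placeholder, not a class this
    // PR provides. Leaving the conf unset falls back to the
    // DefaultCachedBatchSerializer named above.
    val spark = SparkSession.builder()
      .appName("pluggable-cache-demo")
      .config("spark.sql.cache.serializer", "com.example.MyCachedBatchSerializer")
      .getOrCreate()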

[Large diff not rendered by default; one file's changes are collapsed here.]

CacheManager.scala:

@@ -27,11 +27,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.columnar.{DefaultCachedBatchSerializer, InMemoryRelation}
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -85,11 +84,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     val inMemoryRelation = sessionWithAqeOff.withActive {
       val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
       InMemoryRelation(
-        sessionWithAqeOff.sessionState.conf.useCompression,
-        sessionWithAqeOff.sessionState.conf.columnBatchSize, storageLevel,
-        qe.executedPlan,
-        tableName,
-        optimizedPlan = qe.optimizedPlan)
+        storageLevel,
+        qe,
+        tableName)
     }
 
     this.synchronized {

@@ -195,9 +192,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     val sessionWithAqeOff = getOrCloneSessionWithAqeOff(spark)
     val newCache = sessionWithAqeOff.withActive {
       val qe = sessionWithAqeOff.sessionState.executePlan(cd.plan)
-      InMemoryRelation(
-        cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = qe.executedPlan),
-        optimizedPlan = qe.optimizedPlan)
+      InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
     }
     val recomputedPlan = cd.copy(cachedRepresentation = newCache)
     this.synchronized {
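Net effect: cacheQuery and the recache path now hand the whole QueryExecution to InMemoryRelation instead of pre-extracting the compression and batch-size settings and the executed plan, which leaves it to the configured serializer to decide how to consume the plan. Nothing changes at the user-facing API; a sketch of the path that now flows through this code (illustrative only, reusing the spark session from the earlier sketch):

    import org.apache.spark.storage.StorageLevel

    val df = spark.range(0, 1000).selectExpr("id", "id * 2 AS doubled")
    // CacheManager.cacheQuery runs here and builds
    // InMemoryRelation(storageLevel, qe, tableName).
    df.persist(StorageLevel.MEMORY_AND_DISK)
    // The first action materializes the cache through whichever serializer
    // spark.sql.cache.serializer names.
    df.count()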
Columnar.scala:

@@ -49,6 +49,13 @@ class ColumnarRule {
   def postColumnarTransitions: Rule[SparkPlan] = plan => plan
 }
 
+/**
+ * A trait that is used as a tag to indicate a transition from columns to rows. This allows plugins
+ * to replace the current [[ColumnarToRowExec]] with an optimized version and still have operations
+ * that walk a spark plan looking for this type of transition properly match it.
+ */
+trait ColumnarToRowTransition extends UnaryExecNode
+
 /**
  * Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of
  * [[InternalRow]]. This is inserted whenever such a transition is determined to be needed.
@@ -57,7 +64,7 @@
  * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
-case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
   assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
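The point of the marker trait: anything that walks a physical plan can match on the tag instead of on a concrete class, so a plugin's optimized replacement is still recognized as a transition. A small sketch (the helper itself is hypothetical, not part of this PR):

    import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}

    // Counts columns-to-rows transitions anywhere in a physical plan. Matching
    // on ColumnarToRowTransition rather than on ColumnarToRowExec means a
    // plugin-supplied optimized node is counted too.
    def countColumnarToRowTransitions(plan: SparkPlan): Int =
      plan.collect { case t: ColumnarToRowTransition => t }.size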
@@ -479,7 +486,9 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
  * Apply any user defined [[ColumnarRule]]s and find the correct place to insert transitions
  * to/from columnar formatted data.
  */
-case class ApplyColumnarRulesAndInsertTransitions(conf: SQLConf, columnarRules: Seq[ColumnarRule])
+case class ApplyColumnarRulesAndInsertTransitions(
+    conf: SQLConf,
+    columnarRules: Seq[ColumnarRule])
   extends Rule[SparkPlan] {
 
   /**
ColumnStats.scala:

@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
-private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
+private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
   val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
   val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)()
   val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)()
@@ -32,7 +32,7 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializabl
   val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
 }
 
-private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
+private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
   val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: Seq[AttributeReference]) = {
     val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
     (AttributeMap(allStats), allStats.flatMap(_._2.schema))

Review comment on the PartitionStatistics line, from @dongjoon-hyun (Member), Jul 28, 2020:

> Do we need this visibility change? For me, this is irrelevant to this PR. Can
> we remove this change from this PR? We had better make this change when it's
> inevitable.
>
> Oh, got it. Sorry, I was confused between the two columnar packages:
>
>   org.apache.spark.sql.columnar
>   org.apache.spark.sql.execution.columnar
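As the resolved comment notes, the widening from private[columnar] to private[sql] is what lets code in the new org.apache.spark.sql.columnar package, which sits outside org.apache.spark.sql.execution.columnar, reuse these statistics schemas. A sketch of the kind of access that becomes legal (the helper object is hypothetical, not part of this PR):

    package org.apache.spark.sql.columnar

    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
    import org.apache.spark.sql.execution.columnar.PartitionStatistics

    // Hypothetical serializer-side helper that exposes the stats schema
    // (lowerBound, upperBound, nullCount, count, sizeInBytes per column).
    // Compiles only because PartitionStatistics is now private[sql].
    private[sql] object StatsSchemaHelper {
      def statsSchema(output: Seq[Attribute]): Seq[AttributeReference] =
        new PartitionStatistics(output).schema
    }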
GenerateColumnAccessor.scala:

@@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
  * An Iterator to walk through the InternalRows from a CachedBatch
  */
 abstract class ColumnarIterator extends Iterator[InternalRow] {
-  def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType],
+  def initialize(input: Iterator[DefaultCachedBatch], columnTypes: Array[DataType],
     columnIndexes: Array[Int]): Unit
 }
 
@@ -203,7 +203,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           return false;
         }
 
-        ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
+        ${classOf[DefaultCachedBatch].getName} batch =
+          (${classOf[DefaultCachedBatch].getName}) input.next();
         currentRow = 0;
         numRowsInBatch = batch.numRows();
         for (int i = 0; i < columnIndexes.length; i ++) {
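For context, the generated accessor above only needs each cached batch to expose numRows(); the rename from CachedBatch to DefaultCachedBatch reflects that CachedBatch is now the generic serializer-facing type, while this file stays tied to the default implementation. A rough sketch of the shape DefaultCachedBatch takes (field names are assumptions; its real definition lives in the unrendered file above, and only numRows is confirmed by this diff):

    import org.apache.spark.sql.catalyst.InternalRow

    // Sketch only, standalone for illustration.
    case class DefaultCachedBatch(
        numRows: Int,                 // read back as batch.numRows() above
        buffers: Array[Array[Byte]],  // assumed: one serialized buffer per column
        stats: InternalRow)           // assumed: per-column statistics row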