Commits (25)
4bf995f  Make SQL cache serialization pluggable (revans2, Jul 9, 2020)
427fc68  Addressed review comments (revans2, Jul 10, 2020)
0361237  Few more (revans2, Jul 10, 2020)
641f28f  Merge branch 'master' into pluggable_cache_serializer (revans2, Jul 14, 2020)
f989572  Fixed one set of tests (revans2, Jul 14, 2020)
ad11a79  Fixed the other bug (revans2, Jul 14, 2020)
cb29e47  Addressed review comments (revans2, Jul 17, 2020)
c7c89bb  Merge branch 'master' into pluggable_cache_serializer (revans2, Jul 20, 2020)
f0c7cfe  Addressed review comments (revans2, Jul 20, 2020)
46f4021  Fixed some docs issues (revans2, Jul 20, 2020)
10b0ce1  Fixed bad comment quoting, and added comments for reviews (revans2, Jul 20, 2020)
38ca741  Added in abstraction for Columnar input (revans2, Jul 20, 2020)
c2d00dc  Fixed formatting (revans2, Jul 21, 2020)
082e883  Added in a test for columanr input and output to cache serializer (revans2, Jul 22, 2020)
e58783e  Addressed review comments (revans2, Jul 22, 2020)
05a430d  Merge branch 'master' into pluggable_cache_serializer (revans2, Jul 27, 2020)
e0f5398  Fixed issue with columnar type for data generation (revans2, Jul 27, 2020)
c9b1488  Addressed review comments (revans2, Jul 28, 2020)
4ec28bb  Addressed more comments (revans2, Jul 28, 2020)
7840a4c  Fixed docs (revans2, Jul 29, 2020)
5723aa9  Addressed review comments (revans2, Jul 29, 2020)
f8ca4af  Addressed review comments (revans2, Jul 30, 2020)
ea762e5  Addressed review comments (revans2, Jul 30, 2020)
ce66042  Merge branch 'master' into pluggable_cache_serializer (revans2, Jul 31, 2020)
3f2f527  Addressed review comments (revans2, Jul 31, 2020)
@@ -126,6 +126,16 @@ object StaticSQLConf {
.toSequence
.createOptional

val SPARK_CACHE_SERIALIZER = buildStaticConf("spark.sql.cache.serializer")
.doc("The name of a class that implements " +
"org.apache.spark.sql.columnar.CachedBatchSerializer. It will be used to " +
"translate SQL data into a format that can more efficiently be cached. The underlying " +
"API is subject to change so use with caution. Multiple classes cannot be specified. " +
"The class must have a no-arg constructor.")
.version("3.1.0")
.stringConf
.createWithDefault("org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer")

val QUERY_EXECUTION_LISTENERS = buildStaticConf("spark.sql.queryExecutionListeners")
.doc("List of class names implementing QueryExecutionListener that will be automatically " +
"added to newly created sessions. The classes should have either a no-arg constructor, " +
New file: sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala
@@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.columnar

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{AtomicType, BinaryType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Basic interface that all cached batches of data must support. This is primarily to allow
* for metrics to be handled outside of the encoding and decoding steps in a standard way.
*/
@DeveloperApi
@Since("3.1.0")
trait CachedBatch {
def numRows: Int
def sizeInBytes: Long
}
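For orientation, a hypothetical implementation of this trait could be as small as a case class that carries its encoded payload plus the two required metrics; the class name and byte-array layout below are made up, not part of this PR:

// Hypothetical sketch; the name and column layout are illustrative only.
case class SimpleCachedBatch(
    buffers: Array[Array[Byte]],  // one encoded buffer per column in this made-up layout
    numRows: Int,                 // implements CachedBatch.numRows
    sizeInBytes: Long)            // implements CachedBatch.sizeInBytes
  extends CachedBatch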

/**
* Provides APIs for compressing, filtering, and decompressing SQL data that will be
* persisted/cached.
*/

Review comment from a Contributor: We may need a more general word other than compressing/decompressing.

@DeveloperApi
@Since("3.1.0")
trait CachedBatchSerializer extends Serializable {
/**
* Run the given plan and convert its output to an implementation of [[CachedBatch]].
* @param cachedPlan the plan to run.
* @return the RDD containing the batches of data to cache.
*/
def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch]

/**
* Builds a function that can be used to filter which batches are loaded.
* In most cases extending [[SimpleMetricsCachedBatchSerializer]] will provide the filter logic
* necessary. You will need to provide metrics for this to work. [[SimpleMetricsCachedBatch]]
* provides the APIs to hold those metrics and explains the metrics used, really just min and max.
* Note that this is intended to skip batches that are not needed, and the actual filtering of
* individual rows is handled later.
* @param predicates the set of expressions to use for filtering.
* @param cachedAttributes the schema/attributes of the data that is cached. This can be helpful
* if you don't store it with the data.
* @return a function that takes the partition id and the iterator of batches in the partition.
* It returns an iterator of batches that should be loaded.
*/
def buildFilter(predicates: Seq[Expression],
cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch]

Review comment from a Contributor: ditto.

/**
* Decompress the cached data into a ColumnarBatch. This is currently only used for the basic types
* BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType.
* That may change in the future.
* @param input the cached batches that should be decompressed.
* @param cacheAttributes the attributes of the data in the batch.
* @param selectedAttributes the fields that should be loaded from the data, and the order they
* should appear in the output batch.
* @param conf the configuration for the job.
* @return an RDD of the input cached batches transformed into the ColumnarBatch format.
*/
def decompressColumnar(
input: RDD[CachedBatch],
cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[ColumnarBatch]

Review thread on the name decompressColumnar:
Contributor: the name decompress sounds like the cache must be compressed, which may not be true. How about readCachedData and readCachedDataColumnar?
Contributor (PR author): How about convertFromCache? We are not actually reading data from anywhere but memory, so I am a little reluctant to say it is a read.
Contributor: SGTM, it's also consistent with convertToCache.

/**
* Decompress the cached batch into [[InternalRow]]. If you want this to be performant, code
* generation is advised.
* @param input the cached batches that should be decompressed.
* @param cacheAttributes the attributes of the data in the batch.
* @param selectedAttributes the fields that should be loaded from the data, and the order they
* should appear in the output batch.
* @param conf the configuration for the job.
* @return RDD of the rows that were stored in the cached batches.
*/
def decompressToRows(
input: RDD[CachedBatch],
cacheAttributes: Seq[Attribute],
selectedAttributes: Seq[Attribute],
conf: SQLConf): RDD[InternalRow]
}

Review comment from @HyukjinKwon (Member, Jul 19, 2020): I think the error comes from here:

[warn] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/columnar/CachedBatchSerializer.scala:90: Could not find any member to link for "InternalRow".
[warn]   /**
[warn]   ^
[error] /home/runner/work/spark/spark/sql/core/target/java/org/apache/spark/sql/columnar/CachedBatchSerializer.java:40: error: reference not found
[error]    * Decompress the cached batch into {@link InternalRow}. If you want this to be performant, code
[error]                                              ^

InternalRow is not documented and the corresponding Javadoc is not generated via Unidoc:

.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst")))

It is currently not very easy to navigate the logs from GitHub Actions; that is being investigated at SPARK-32253. Maybe we should have a way to report the test results nicely.

For now, we can just wrap it with `...` in this case to work around it instead of using [[...]].

Another related issue here is that Unidoc shows all warnings as false-positive errors when there is any actual error. That's why we see errors such as:

[info] Constructing Javadoc information...
[error] /home/runner/work/spark/spark/mllib/target/java/org/apache/spark/mllib/util/MLlibTestSparkContext.java:10: error: illegal combination of modifiers: public and protected
[error]   protected  class testImplicits {
[error]              ^
[error] /home/runner/work/spark/spark/mllib/target/java/org/apache/spark/mllib/util/MLlibTestSparkContext.java:67: error: illegal combination of modifiers: public and protected
[error]   protected  class testImplicits$ extends org.apache.spark.sql.SQLImplicits {
[error]              ^

which are actually warnings, IIRC. I investigated this at SPARK-20840 and tried hard to fix it but failed. If I am remembering right, Javadoc is executed in a separate process inside Unidoc (via SBT) and there is no way to show only errors, because warnings are also redirected to stderr. So, if the process is terminated with a non-zero exit code, it just shows all warnings as errors.
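To make the required surface concrete, here is a hedged skeleton of a CachedBatchSerializer, using the method names as they appear at this commit (the thread above suggests they may still change). The class name and method bodies are placeholders, not the implementation this PR adds, and the imports at the top of this file are assumed:

// Hypothetical skeleton only; bodies are placeholders.
class MyCachedBatchSerializer extends CachedBatchSerializer {
  override def convertForCache(cachedPlan: SparkPlan): RDD[CachedBatch] =
    cachedPlan.execute().mapPartitions { rows =>
      // Placeholder: encode each partition's InternalRows into one or more CachedBatch instances.
      Iterator.empty
    }

  override def buildFilter(predicates: Seq[Expression],
      cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] =
    (_, batches) => batches  // no batch skipping in this sketch

  override def decompressColumnar(
      input: RDD[CachedBatch],
      cacheAttributes: Seq[Attribute],
      selectedAttributes: Seq[Attribute],
      conf: SQLConf): RDD[ColumnarBatch] =
    throw new IllegalStateException("columnar output not implemented in this sketch")

  override def decompressToRows(
      input: RDD[CachedBatch],
      cacheAttributes: Seq[Attribute],
      selectedAttributes: Seq[Attribute],
      conf: SQLConf): RDD[InternalRow] =
    input.mapPartitions { _ =>
      // Placeholder: decode batches back into InternalRows here.
      Iterator.empty
    }
}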

/**
* A [[CachedBatch]] that stores some simple metrics that can be used for filtering of batches with
* the [[SimpleMetricsCachedBatchSerializer]].
* The metrics are returned by the stats value. For each column in the batch 5 columns of metadata
* are needed in the row.
*/
@DeveloperApi
@Since("3.1.0")
trait SimpleMetricsCachedBatch extends CachedBatch {
/**
* Holds the same as ColumnStats.
* upperBound (optional), lowerBound (optional), nullCount: Int, rowCount: Int, sizeInBytes: Long,
* which is repeated for each column in the original data.
*/
val stats: InternalRow
override def sizeInBytes: Long =
Range.apply(4, stats.numFields, 5).map(stats.getLong).sum
}

Review comment from @dongjoon-hyun (Member, Jul 28, 2020): Sorry, but this may be technically a little misleading, because the ColumnStats trait itself doesn't have upperBound and lowerBound. Since upperBound and lowerBound need to hold a concrete value, they are designed to be declared in the derived classes like BooleanColumnStats. However, all instances having ColumnStats will hold them. Yes.
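For illustration, the five metadata fields per column follow the order defined by ColumnStatisticsSchema later in this PR (lowerBound, upperBound, nullCount, count, sizeInBytes), so a hypothetical stats row for a single Int column might look like this (values made up):

// Hypothetical sketch; values are made up. Index 4 holds sizeInBytes, which is what the
// sizeInBytes override above sums (every 5th field starting at index 4).
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

val statsForOneIntColumn: InternalRow =
  new GenericInternalRow(Array[Any](1, 99, 0, 1000, 4096L))  // min, max, nullCount, rowCount, bytes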

// Currently, only use statistics from atomic types except binary type only.

Review comment from a Member: nit. Maybe, binary type only -> binary type?

private object ExtractableLiteral {
def unapply(expr: Expression): Option[Literal] = expr match {
case lit: Literal => lit.dataType match {
case BinaryType => None
case _: AtomicType => Some(lit)
case _ => None
}
case _ => None
}
}

/**
* Provides basic filtering for [[CachedBatchSerializer]] implementations.
* The requirement to extend this is that all of the batches produced by your serializer are
* instances of [[SimpleMetricsCachedBatch]].
* This does not calculate the metrics needed to be stored in the batches. That is up to each
* implementation. The metrics required are really just min and max values, and those are optional,
* especially for complex types. Because those metrics are simple, and it is likely that compression
* will also be done on the data, we thought it best to let each implementation decide on the most
* efficient way to calculate the metrics, possibly combining them with compression passes that
* might also be done across the data.
*/
@DeveloperApi
@Since("3.1.0")
abstract class SimpleMetricsCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def buildFilter(predicates: Seq[Expression],
cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {

Review comment from a Contributor: super nit:

def func(
    para1: T,
    para2: T): R = ...

val stats = new PartitionStatistics(cachedAttributes)
val statsSchema = stats.schema

def statsFor(a: Attribute): ColumnStatisticsSchema = {
stats.forAttribute(a)
}

// Returned filter predicate should return false iff it is impossible for the input expression
// to evaluate to `true` based on statistics collected about this partition batch.
@transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
(buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)

case Or(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) =>
buildFilter(lhs) || buildFilter(rhs)

case EqualTo(a: AttributeReference, ExtractableLiteral(l)) =>
statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
case EqualTo(ExtractableLiteral(l), a: AttributeReference) =>
statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound

case EqualNullSafe(a: AttributeReference, ExtractableLiteral(l)) =>
statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound
case EqualNullSafe(ExtractableLiteral(l), a: AttributeReference) =>
statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound

case LessThan(a: AttributeReference, ExtractableLiteral(l)) => statsFor(a).lowerBound < l
case LessThan(ExtractableLiteral(l), a: AttributeReference) => l < statsFor(a).upperBound

case LessThanOrEqual(a: AttributeReference, ExtractableLiteral(l)) =>
statsFor(a).lowerBound <= l
case LessThanOrEqual(ExtractableLiteral(l), a: AttributeReference) =>
l <= statsFor(a).upperBound

case GreaterThan(a: AttributeReference, ExtractableLiteral(l)) => l < statsFor(a).upperBound
case GreaterThan(ExtractableLiteral(l), a: AttributeReference) => statsFor(a).lowerBound < l

case GreaterThanOrEqual(a: AttributeReference, ExtractableLiteral(l)) =>
l <= statsFor(a).upperBound
case GreaterThanOrEqual(ExtractableLiteral(l), a: AttributeReference) =>
statsFor(a).lowerBound <= l

case IsNull(a: Attribute) => statsFor(a).nullCount > 0
case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0

case In(a: AttributeReference, list: Seq[Expression])
if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty =>
list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
// This is an example to explain how it works, imagine that the id column stored as follows:
// __________________________________________
// | Partition ID | lowerBound | upperBound |
// |--------------|------------|------------|
// | p1 | '1' | '9' |
// | p2 | '10' | '19' |
// | p3 | '20' | '29' |
// | p4 | '30' | '39' |
// | p5 | '40' | '49' |
// |______________|____________|____________|
//
// A filter: df.filter($"id".startsWith("2")).
// In this case it takes the substr of lowerBound and upperBound:
// ________________________________________________________________________________________
// | Partition ID | lowerBound.substr(0, Length("2")) | upperBound.substr(0, Length("2")) |
// |--------------|-----------------------------------|-----------------------------------|
// | p1 | '1' | '9' |
// | p2 | '1' | '1' |
// | p3 | '2' | '2' |
// | p4 | '3' | '3' |
// | p5 | '4' | '4' |
// |______________|___________________________________|___________________________________|
//
// We can see that we only need to read p1 and p3.
case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
l <= statsFor(a).upperBound.substr(0, Length(l))
}

// When we bind the filters we need to do it against the stats schema
val partitionFilters: Seq[Expression] = {
predicates.flatMap { p =>
val filter = buildFilter.lift(p)
val boundFilter =
filter.map(
BindReferences.bindReference(
_,
statsSchema,
allowFailures = true))

boundFilter.foreach(_ =>
filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f")))

// If the filter can't be resolved then we are missing required statistics.
boundFilter.filter(_.resolved)
}
}

def ret(index: Int, cachedBatchIterator: Iterator[CachedBatch]): Iterator[CachedBatch] = {
val partitionFilter = Predicate.create(
partitionFilters.reduceOption(And).getOrElse(Literal(true)),
cachedAttributes)

partitionFilter.initialize(index)
val schemaIndex = cachedAttributes.zipWithIndex

cachedBatchIterator.filter { cb =>
val cachedBatch = cb.asInstanceOf[SimpleMetricsCachedBatch]
if (!partitionFilter.eval(cachedBatch.stats)) {
logDebug {
val statsString = schemaIndex.map { case (a, i) =>
val value = cachedBatch.stats.get(i, a.dataType)
s"${a.name}: $value"
}.mkString(", ")
s"Skipping partition based on stats $statsString"
}
false
} else {
true
}
}
}
ret
}
}
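As a usage sketch, the function returned by buildFilter is meant to be applied per partition, for example with mapPartitionsWithIndex; the helper below is hypothetical and assumes the imports at the top of this file:

// Hypothetical helper; `serializer` is any CachedBatchSerializer implementation.
def pruneBatches(
    serializer: CachedBatchSerializer,
    predicates: Seq[Expression],
    cachedAttributes: Seq[Attribute],
    cachedData: RDD[CachedBatch]): RDD[CachedBatch] = {
  val filterFunc = serializer.buildFilter(predicates, cachedAttributes)
  // Each partition gets its index plus its batches; batches whose stats rule them out are dropped.
  cachedData.mapPartitionsWithIndex { (index, batches) => filterFunc(index, batches) }
}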
@@ -27,11 +27,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.columnar.{DefaultCachedBatchSerializer, InMemoryRelation}
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -85,8 +84,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
val inMemoryRelation = sessionWithAqeOff.withActive {
val qe = sessionWithAqeOff.sessionState.executePlan(planToCache)
InMemoryRelation(
sessionWithAqeOff.sessionState.conf.useCompression,
sessionWithAqeOff.sessionState.conf.columnBatchSize, storageLevel,
storageLevel,
qe.executedPlan,
tableName,
optimizedPlan = qe.optimizedPlan)
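For context, a hedged end-to-end sketch: once spark.sql.cache.serializer is set (see the config above), an ordinary cache call flows through this CacheManager path with no user-facing API change. Here `spark` is a placeholder SparkSession created with that conf:

// Hypothetical usage sketch; `spark` is an existing SparkSession with the serializer conf set.
import org.apache.spark.storage.StorageLevel

val df = spark.range(0, 1000).toDF("id")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  // materializes the cache through the configured CachedBatchSerializer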
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Attri
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)()
val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)()
@@ -32,7 +32,7 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializabl
val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
}

private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this visibility change? For me, this is irrelevant to this PR. Can we remove this change from this PR? We had better make this change when it's inevitable. Oh, got it. Sorry, I was confused between two columnar packages.

org.apache.spark.sql.columnar
org.apache.spark.sql.execution.columnar

val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: Seq[AttributeReference]) = {
val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
(AttributeMap(allStats), allStats.flatMap(_._2.schema))
@@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
* An Iterator to walk through the InternalRows from a CachedBatch
*/
abstract class ColumnarIterator extends Iterator[InternalRow] {
def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType],
def initialize(input: Iterator[DefaultCachedBatch], columnTypes: Array[DataType],
columnIndexes: Array[Int]): Unit
}

@@ -203,7 +203,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
return false;
}

${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
${classOf[DefaultCachedBatch].getName} batch =
(${classOf[DefaultCachedBatch].getName}) input.next();
currentRow = 0;
numRowsInBatch = batch.numRows();
for (int i = 0; i < columnIndexes.length; i ++) {