@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog.functions;

import org.apache.spark.annotation.Evolving;

/**
* A 'reducer' for output of user-defined functions.
*
* A user-defined function f_source(x) is 'reducible' on another user-defined function f_target(x),
* if there exists a 'reducer' r(x) such that r(f_source(x)) = f_target(x) for all input x.
* @param <I> reducer input type
* @param <O> reducer output type
* @since 4.0.0
*/
@Evolving
public interface Reducer<I, O> {
O reduce(I arg1);
}
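
As a concrete illustration of this contract (not part of the diff; the class name and the hash-modulo bucket semantics are assumptions), a minimal Scala sketch of a Reducer for the bucket case discussed in ReducibleFunction below:

    import org.apache.spark.sql.connector.catalog.functions.Reducer

    // Hypothetical sketch: maps bucket ids produced by bucket(sourceNumBuckets, x)
    // to the ids bucket(targetNumBuckets, x) would assign, valid when
    // sourceNumBuckets is a positive multiple of targetNumBuckets and buckets are
    // assigned by hash modulo.
    case class ModuloBucketReducer(targetNumBuckets: Int) extends Reducer[Int, Int] {
      override def reduce(bucketId: Int): Int = bucketId % targetNumBuckets
    }
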
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.connector.catalog.functions;

import org.apache.spark.annotation.Evolving;
import scala.Option;

/**
* Base class for user-defined functions that can be 'reduced' on another function.
*
* A function f_source(x) is 'reducible' on another function f_target(x) if
* there exists a reducer function r(x) such that r(f_source(x)) = f_target(x) for all input x.
*
* <p>
* Examples:
* <ul>
* <li>Bucket functions
* <ul>
* <li>f_source(x) = bucket(4, x)</li>
* <li>f_target(x) = bucket(2, x)</li>
* <li>r(x) = x % 2</li>
* </ul>
* <li>Date functions
* <ul>
* <li>f_source(x) = hours(x)</li>
* <li>f_target(x) = days(x)</li>
* <li>r(x) = x / 24</li>
* </ul>
* </ul>
* @param <I> reducer function input type
* @param <O> reducer function output type
* @since 4.0.0
*/
@Evolving
public interface ReducibleFunction<I, O> {

/**
* If this function is 'reducible' on another function, return the {@link Reducer} function.
* <p>
* Example:
* <ul>
* <li>this function = bucket(4, x)</li>
* <li>other function = bucket(2, x)</li>
* </ul>
* Invoke with arguments
* <ul>
* <li>other = bucket</li>
* <li>this param = Int(4)</li>
* <li>other param = Int(2)</li>
* </ul>
* @param other the other function
* @param thisParam param for this function
A reviewer (Member) commented:

It is unclear to me what this should be beyond the bucketing case. Should we add a separate method just for the special case of bucketing?

    Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other);

    Option<Reducer<I, O>> bucketReducer(ReducibleFunction<I, O> other, int numBuckets,
                                  int otherNumBuckets);

szehon-ho (Member, author) commented on Mar 19, 2024:

OK, made two methods and a default for both, so that implementations can pick one to override

szehon-ho (Member, author) commented on Mar 19, 2024:

Actually, on second thought, we do plan to add geo partition transforms, for example xz2 ordering (https://github.com/spatialx-project/geolake/blob/main/api/src/main/java/org/apache/iceberg/transforms/ExtendedZCurve.java), which needs a 'resolution' parameter,

as well as multi-arg bucket transforms: https://docs.google.com/document/d/1aDoZqRgvDOOUVAGhvKZbp5vFstjsAMY4EFCyjlxpaaw/

So I changed the type back to Object in the latest PR.

This is one approach; we can probably re-use int in some of these, but it seemed cleaner to have a generic type. Let me know what you think.

Note: I tried for a while to make the method type-parameterized, like

<T> Option<Reducer<I, O>> reducer(ReducibleFunction<I, O> other, T numBuckets,
                                  T otherNumBuckets);

but I was not able to override it successfully in Bucket in Scala.

* @param otherParam param for the other function
* @return a reduction function if it is reducible, none if not
*/
Option<Reducer<I, O>> reducer(ReducibleFunction<?, ?> other, Option<?> thisParam,
    Option<?> otherParam);
}
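
For illustration only (not part of the diff; ExampleBucketFunction, ModReducer, and the hash-modulo bucket semantics are assumptions), a Scala sketch of how a bucket-style function might implement this interface, returning a reducer only when its own bucket count is a positive multiple of the other's:

    import org.apache.spark.sql.connector.catalog.functions.{Reducer, ReducibleFunction}

    // Hypothetical bucket transform: reducible onto itself when this bucket count
    // is a positive multiple of the other bucket count (hash-modulo bucketing).
    object ExampleBucketFunction extends ReducibleFunction[Int, Int] {
      private case class ModReducer(divisor: Int) extends Reducer[Int, Int] {
        override def reduce(bucketId: Int): Int = bucketId % divisor
      }

      override def reducer(
          other: ReducibleFunction[_, _],
          thisParam: Option[_],
          otherParam: Option[_]): Option[Reducer[Int, Int]] = {
        (thisParam, otherParam) match {
          case (Some(thisBuckets: Int), Some(otherBuckets: Int))
              if other == ExampleBucketFunction &&
                otherBuckets > 0 && thisBuckets % otherBuckets == 0 =>
            Some(ModReducer(otherBuckets))
          case _ => None
        }
      }
    }

Note the check is deliberately one-directional: reducing 4 buckets onto 2 works, but not the reverse, which is why the compatibility check in TransformExpression below tries both directions.
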
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.connector.catalog.functions.BoundFunction
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, Reducer, ReducibleFunction}
import org.apache.spark.sql.types.DataType

/**
@@ -54,6 +54,47 @@ case class TransformExpression(
false
}

/**
* Whether this [[TransformExpression]]'s function is compatible with the `other`
* [[TransformExpression]]'s function.
*
* This is true if both are instances of [[ReducibleFunction]] and there exists a [[Reducer]] r(x)
* such that r(t1(x)) = t2(x) or r(t2(x)) = t1(x) for all input x, where t1 and t2 are this
* function and the other function, respectively.
*
* @param other the transform expression to compare to
* @return true if compatible, false if not
*/
def isCompatible(other: TransformExpression): Boolean = {
if (isSameFunction(other)) {
true
} else {
(function, other.function) match {
case (f: ReducibleFunction[_, _], o: ReducibleFunction[_, _]) =>
val reducer = f.reducer(o, numBucketsOpt, other.numBucketsOpt)
val otherReducer = o.reducer(f, other.numBucketsOpt, numBucketsOpt)
reducer.isDefined || otherReducer.isDefined
case _ => false
}
}
}

/**
* Return a [[Reducer]] for this transform expression on another transform expression.
* <p>
* A [[Reducer]] exists for a transform expression function if it is
* 'reducible' on the other expression function.
* <p>
* @return reducer function or None if not reducible on the other transform expression
*/
def reducers(other: TransformExpression): Option[Reducer[_, _]] = {
(function, other.function) match {
case (e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) =>
e1.reducer(e2, numBucketsOpt, other.numBucketsOpt)
case _ => None
}
}
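
To make the two-direction check above concrete without constructing full TransformExpressions, here is a sketch using the hypothetical ExampleBucketFunction illustrated after the ReducibleFunction interface (an assumption, not code in this PR):

    // bucket(8, x) and bucket(2, x): a reducer exists from 8 buckets down to 2,
    // but not from 2 up to 8; one defined direction is enough for compatibility.
    val f = ExampleBucketFunction
    val reducer8to2 = f.reducer(f, Some(8), Some(2))                 // Some(ModReducer(2))
    val reducer2to8 = f.reducer(f, Some(2), Some(8))                 // None
    val compatible = reducer8to2.isDefined || reducer2to8.isDefined  // true
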

override def dataType: DataType = function.resultType()

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, IntegerType}

@@ -833,10 +834,44 @@ case class KeyGroupedShuffleSpec(
(left, right) match {
case (_: LeafExpression, _: LeafExpression) => true
case (left: TransformExpression, right: TransformExpression) =>
left.isSameFunction(right)
if (SQLConf.get.v2BucketingPushPartValuesEnabled &&
!SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled &&
SQLConf.get.v2BucketingAllowCompatibleTransforms) {
left.isCompatible(right)
} else {
left.isSameFunction(right)
}
case _ => false
}

/**
* Return a sequence of [[Reducer]] for the partition expressions of this shuffle spec,
* one per partition expression, against the partition expressions of another shuffle spec.
* <p>
* A [[Reducer]] exists for a partition expression function of this shuffle spec if it is
* 'reducible' on the corresponding partition expression function of the other shuffle spec.
* <p>
* If a value is returned, it contains one Option[[Reducer]] per partition expression.
* A None element indicates that the corresponding partition expression is not reducible
* on the corresponding expression of the other shuffle spec.
* <p>
* Returning None for the whole result indicates that none of the partition expressions can be
* reduced on the corresponding expressions of the other shuffle spec.
*/
def reducers(other: ShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {
other match {
case otherSpec: KeyGroupedShuffleSpec =>
val results = partitioning.expressions.zip(otherSpec.partitioning.expressions).map {
case (e1: TransformExpression, e2: TransformExpression) => e1.reducers(e2)
case (_, _) => None
}

// Optimization: return None if none of the partition expressions is reducible
if (results.forall(p => p.isEmpty)) None else Some(results)
case _ => None
}
}

override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
// Only support partition expressions are AttributeReference for now
partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
@@ -846,6 +881,20 @@
}
}

object KeyGroupedShuffleSpec {
  def reducePartitionValue(
      row: InternalRow,
      expressions: Seq[Expression],
      reducers: Seq[Option[Reducer[_, _]]]): InternalRowComparableWrapper = {
    val partitionVals = row.toSeq(expressions.map(_.dataType))
    val reducedRow = partitionVals.zip(reducers).map {
      case (v, Some(reducer: Reducer[Any, Any])) => reducer.reduce(v)
      case (v, _) => v
    }.toArray
    InternalRowComparableWrapper(new GenericInternalRow(reducedRow), expressions)
  }
}
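
As a plain-Scala mirror of the zip-and-apply step above (no Spark classes; the "% 2" reducer standing in for a bucket(4, a) to bucket(2, a) reduction is an assumed example):

    // Column 0: bucket(4, a) reduced via an assumed "% 2" reducer.
    // Column 1: no reducer, so the value passes through unchanged.
    val partitionVals: Seq[Any] = Seq(3, 19803)
    val reducers: Seq[Option[Any => Any]] =
      Seq(Some((v: Any) => v.asInstanceOf[Int] % 2), None)

    val reduced = partitionVals.zip(reducers).map {
      case (v, Some(r)) => r(v)
      case (v, None) => v
    }
    // reduced == Seq(1, 19803): bucket ids 1 and 3 under bucket(4, a) collapse to
    // the same reduced partition value, so their splits end up grouped together.
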

case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec {
override def isCompatibleWith(other: ShuffleSpec): Boolean = {
specs.exists(_.isCompatibleWith(other))
@@ -1541,6 +1541,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled")
.doc("Whether to allow storage-partition join in the case where the partition transforms" +
"are compatible but not identical. This config requires both " +
s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " +
s"enabled and ${V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
"to be disabled."
)
.version("4.0.0")
.booleanConf
.createWithDefault(false)
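
For reviewers, a sketch of the configuration combination this new code path targets, using the conf constants referenced in the doc string above (the SparkSession setup is assumed; not part of this diff):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Compatible-transform SPJ requires bucketing and pushed-down partition values,
    // with partially clustered distribution disabled (per the doc string above).
    spark.conf.set(SQLConf.V2_BUCKETING_ENABLED.key, "true")
    spark.conf.set(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key, "true")
    spark.conf.set(SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key, "false")
    spark.conf.set(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key, "true")
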

val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
.doc("The maximum number of buckets allowed.")
.version("2.4.0")
@@ -5233,6 +5245,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def v2BucketingAllowJoinKeysSubsetOfPartitionKeys: Boolean =
getConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)

def v2BucketingAllowCompatibleTransforms: Boolean =
getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS)

def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

@@ -24,9 +24,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, SinglePartition}
import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.connector.read._
import org.apache.spark.util.ArrayImplicits._

@@ -164,6 +165,18 @@ case class BatchScanExec(
(groupedParts, expressions)
}

// Also re-group the partitions if we are reducing compatible partition expressions
val finalGroupedPartitions = spjParams.reducers match {
rekbun commented on Feb 23, 2025:

I believe this could produce incorrect results when joining presorted bucketed tables with compatible bucket counts.

Specifically, if we have two tables:

  1. Bucketed and sorted on the same join keys
  2. With different bucket counts, where one table's bucket count is a multiple of the other

When performing a bucketed join in Spark, it's expected that the sort order should be preserved. However, the merging or grouping process involved in the join might break these sorting guarantees, leading to incorrect results.

case Some(reducers) =>
val result = groupedPartitions.groupBy { case (row, _) =>
KeyGroupedShuffleSpec.reducePartitionValue(row, partExpressions, reducers)
}.map { case (wrapper, splits) => (wrapper.row, splits.flatMap(_._2)) }.toSeq
val rowOrdering = RowOrdering.createNaturalAscendingOrdering(
partExpressions.map(_.dataType))
result.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
case _ => groupedPartitions
}

// When partially clustered, the input partitions are not grouped by partition
// values. Here we'll need to check `commonPartitionValues` and decide how to group
// and replicate splits within a partition.
@@ -174,7 +187,7 @@ case class BatchScanExec(
.get
.map(t => (InternalRowComparableWrapper(t._1, partExpressions), t._2))
.toMap
val nestGroupedPartitions = groupedPartitions.map { case (partValue, splits) =>
val nestGroupedPartitions = finalGroupedPartitions.map { case (partValue, splits) =>
// `commonPartValuesMap` should contain the part value since it's the super set.
val numSplits = commonPartValuesMap
.get(InternalRowComparableWrapper(partValue, partExpressions))
@@ -207,7 +220,7 @@ case class BatchScanExec(
} else {
// either `commonPartitionValues` is not defined, or it is defined but
// `applyPartialClustering` is false.
val partitionMapping = groupedPartitions.map { case (partValue, splits) =>
val partitionMapping = finalGroupedPartitions.map { case (partValue, splits) =>
InternalRowComparableWrapper(partValue, partExpressions) -> splits
}.toMap

@@ -259,6 +272,7 @@ case class StoragePartitionJoinParams(
keyGroupedPartitioning: Option[Seq[Expression]] = None,
joinKeyPositions: Option[Seq[Int]] = None,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
reducers: Option[Seq[Option[Reducer[_, _]]]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false) {
override def equals(other: Any): Boolean = other match {