@@ -38,12 +38,13 @@ private[sql] object CatalogV2Implicits {
 
   implicit class BucketSpecHelper(spec: BucketSpec) {
     def asTransform: BucketTransform = {
+      val references = spec.bucketColumnNames.map(col => reference(Seq(col)))
       if (spec.sortColumnNames.nonEmpty) {
-        throw QueryCompilationErrors.cannotConvertBucketWithSortColumnsToTransformError(spec)
+        val sortedCol = spec.sortColumnNames.map(col => reference(Seq(col)))
+        bucket(spec.numBuckets, references.toArray, sortedCol.toArray)
+      } else {
+        bucket(spec.numBuckets, references.toArray)
       }
-
-      val references = spec.bucketColumnNames.map(col => reference(Seq(col)))
-      bucket(spec.numBuckets, references.toArray)
     }
   }
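Net effect of this hunk: a BucketSpec that carries sort columns is now converted into a bucket transform instead of failing analysis. A minimal usage sketch, assuming code that lives inside Spark's sql package (CatalogV2Implicits and BucketTransform are private[sql]) and the package layout at the time of this change:

  import org.apache.spark.sql.catalyst.catalog.BucketSpec
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // CLUSTERED BY (b) SORTED BY (a) INTO 4 BUCKETS, expressed as a BucketSpec
  val spec = BucketSpec(numBuckets = 4, bucketColumnNames = Seq("b"), sortColumnNames = Seq("a"))

  // Previously this call threw cannotConvertBucketWithSortColumnsToTransformError;
  // with this change it returns a BucketTransform that also carries the sort column.
  val transform = spec.asTransform
  // transform.describe should render as "bucket(4, b, a)", per the describe() change below.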
@@ -45,6 +45,12 @@ private[sql] object LogicalExpressions {
   def bucket(numBuckets: Int, references: Array[NamedReference]): BucketTransform =
     BucketTransform(literal(numBuckets, IntegerType), references)
 
+  def bucket(
+      numBuckets: Int,
+      references: Array[NamedReference],
+      sortedCols: Array[NamedReference]): BucketTransform =
+    BucketTransform(literal(numBuckets, IntegerType), references, sortedCols)
+
   def identity(reference: NamedReference): IdentityTransform = IdentityTransform(reference)
 
   def years(reference: NamedReference): YearsTransform = YearsTransform(reference)
@@ -97,7 +103,8 @@ private[sql] abstract class SingleColumnTransform(ref: NamedReference) extends RewritableTransform
 
 private[sql] final case class BucketTransform(
     numBuckets: Literal[Int],
-    columns: Seq[NamedReference]) extends RewritableTransform {
+    columns: Seq[NamedReference],
+    sortedColumns: Seq[NamedReference] = Seq.empty[NamedReference]) extends RewritableTransform {
 
   override val name: String = "bucket"
@@ -107,7 +114,13 @@ private[sql] final case class BucketTransform(
 
   override def arguments: Array[Expression] = numBuckets +: columns.toArray
 
-  override def describe: String = s"bucket(${arguments.map(_.describe).mkString(", ")})"
+  override def describe: String =
+    if (sortedColumns.nonEmpty) {
+      s"bucket(${arguments.map(_.describe).mkString(", ")}," +
+        s" ${sortedColumns.map(_.describe).mkString(", ")})"
+    } else {
+      s"bucket(${arguments.map(_.describe).mkString(", ")})"
+    }
 
   override def toString: String = describe
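As a rough illustration of the new describe() output (only a sketch; FieldReference and LogicalExpressions are private[sql], and the column names here are made up), a transform built through the three-argument bucket() overload added above renders both column lists, while the two-argument form is unchanged:

  import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions}

  val sorted = LogicalExpressions.bucket(
    4,
    Array(FieldReference(Seq("b"))),   // bucket columns
    Array(FieldReference(Seq("a"))))   // sort columns
  sorted.describe    // "bucket(4, b, a)", matching the DESCRIBE assertion in the new test below

  val unsorted = LogicalExpressions.bucket(4, Array(FieldReference(Seq("b"))))
  unsorted.describe  // "bucket(4, b)"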

@@ -117,23 +130,30 @@ private[sql] final case class BucketTransform(
 }
 
 private[sql] object BucketTransform {
-  def unapply(expr: Expression): Option[(Int, FieldReference)] = expr match {
+  def unapply(expr: Expression): Option[(Int, FieldReference, FieldReference)] =
+    expr match {
     case transform: Transform =>
       transform match {
-        case BucketTransform(n, FieldReference(parts)) =>
-          Some((n, FieldReference(parts)))
+        case BucketTransform(n, FieldReference(parts), FieldReference(sortCols)) =>
+          Some((n, FieldReference(parts), FieldReference(sortCols)))
         case _ =>
          None
       }
     case _ =>
       None
   }
 
-  def unapply(transform: Transform): Option[(Int, NamedReference)] = transform match {
+  def unapply(transform: Transform): Option[(Int, NamedReference, NamedReference)] =
+    transform match {
+    case NamedTransform("bucket", Seq(
+      Lit(value: Int, IntegerType),
+      Ref(partCols: Seq[String]),
+      Ref(sortCols: Seq[String]))) =>
+      Some((value, FieldReference(partCols), FieldReference(sortCols)))
     case NamedTransform("bucket", Seq(
       Lit(value: Int, IntegerType),
-      Ref(seq: Seq[String]))) =>
-      Some((value, FieldReference(seq)))
+      Ref(partCols: Seq[String]))) =>
+      Some((value, FieldReference(partCols), FieldReference(Seq.empty[String])))
     case _ =>
       None
   }
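Because both unapply methods now return three elements, every caller has to bind (or wildcard) the sort columns; when the underlying transform has no sort columns, the third element is a FieldReference over an empty sequence of names. A hedged sketch of how a caller can use the widened extractor (the function and its strings are illustrative only):

  import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, Transform}

  def describeBucketing(transform: Transform): String = transform match {
    case BucketTransform(numBuckets, FieldReference(cols), FieldReference(sortCols)) =>
      if (sortCols.nonEmpty) s"$numBuckets buckets on ${cols.mkString(", ")}, sorted by ${sortCols.mkString(", ")}"
      else s"$numBuckets buckets on ${cols.mkString(", ")}"
    case _ =>
      "not bucketed"
  }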
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, Star, TableAlreadyExistsException, UnresolvedRegex}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, InvalidUDFClassException}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.JoinType
@@ -1384,11 +1384,6 @@ object QueryCompilationErrors {
     new AnalysisException("Cannot use interval type in the table schema.")
   }
 
-  def cannotConvertBucketWithSortColumnsToTransformError(spec: BucketSpec): Throwable = {
-    new AnalysisException(
-      s"Cannot convert bucketing with sort columns to a transform: $spec")
-  }
-
   def cannotConvertTransformsToPartitionColumnsError(nonIdTransforms: Seq[Transform]): Throwable = {
     new AnalysisException("Transforms cannot be converted to partition columns: " +
       nonIdTransforms.map(_.describe).mkString(", "))
@@ -160,7 +160,7 @@ class InMemoryTable(
           case (v, t) =>
             throw new IllegalArgumentException(s"Match: unsupported argument(s) type - ($v, $t)")
         }
-      case BucketTransform(numBuckets, ref) =>
+      case BucketTransform(numBuckets, ref, _) =>
         val (value, dataType) = extractor(ref.fieldNames, cleanedSchema, row)
         val valueHashCode = if (value == null) 0 else value.hashCode
         ((valueHashCode + 31 * dataType.hashCode()) & Integer.MAX_VALUE) % numBuckets
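For context, the wildcarded third element means sort columns do not influence row placement in InMemoryTable: the bucket id still comes from hashing the referenced field's value together with its data type, modulo the bucket count. A standalone restatement of that hashing logic (illustrative only, not part of the change):

  import org.apache.spark.sql.types.DataType

  def bucketId(value: Any, dataType: DataType, numBuckets: Int): Int = {
    val valueHashCode = if (value == null) 0 else value.hashCode
    ((valueHashCode + 31 * dataType.hashCode()) & Integer.MAX_VALUE) % numBuckets
  }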
@@ -139,15 +139,15 @@ class TransformExtractorSuite extends SparkFunSuite {
     }
 
     bucketTransform match {
-      case BucketTransform(numBuckets, FieldReference(seq)) =>
+      case BucketTransform(numBuckets, FieldReference(seq), _) =>
         assert(numBuckets === 16)
         assert(seq === Seq("a", "b"))
       case _ =>
         fail("Did not match BucketTransform extractor")
     }
 
     transform("unknown", ref("a", "b")) match {
-      case BucketTransform(_, _) =>
+      case BucketTransform(_, _, _) =>
         fail("Matched unknown transform")
       case _ =>
         // expected
@@ -318,8 +318,8 @@ private[sql] object V2SessionCatalog {
       case IdentityTransform(FieldReference(Seq(col))) =>
         identityCols += col
 
-      case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
-        bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
+      case BucketTransform(numBuckets, FieldReference(Seq(col)), FieldReference(Seq(sortCol))) =>
+        bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, sortCol :: Nil))
 
       case transform =>
         throw QueryExecutionErrors.unsupportedPartitionTransformError(transform)
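Going the other direction, V2SessionCatalog now maps a sorted bucket transform back onto Hive-style bucketing metadata, so a single-column bucket with a single sort column round-trips into a BucketSpec. A sketch of the mapping this case clause performs (the helper name is hypothetical):

  import org.apache.spark.sql.catalyst.catalog.BucketSpec
  import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, Transform}

  def toBucketSpec(transform: Transform): Option[BucketSpec] = transform match {
    case BucketTransform(numBuckets, FieldReference(Seq(col)), FieldReference(Seq(sortCol))) =>
      // e.g. bucket(4, b, a) becomes BucketSpec(4, Seq("b"), Seq("a"))
      Some(BucketSpec(numBuckets, col :: Nil, sortCol :: Nil))
    case _ =>
      None
  }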
@@ -1574,6 +1574,24 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("create table using - with sorted bucket") {
+    val identifier = "testcat.table_name"
+    withTable(identifier) {
+      sql(s"CREATE TABLE $identifier (a int, b string, c int) USING $v2Source PARTITIONED BY (c)" +
+        s" CLUSTERED BY (b) SORTED by (a) INTO 4 BUCKETS")
+      val table = getTableMetadata(identifier)
+      val describe = spark.sql(s"DESCRIBE $identifier")
+      val part1 = describe
+        .filter("col_name = 'Part 0'")
+        .select("data_type").head.getString(0)
+      assert(part1 === "c")
+      val part2 = describe
+        .filter("col_name = 'Part 1'")
+        .select("data_type").head.getString(0)
+      assert(part2 === "bucket(4, b, a)")
+    }
+  }
+
   test("REFRESH TABLE: v2 table") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {