Closed

Commits (44)
39f3c42
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in DSv2
panbingkun Aug 20, 2022
cbe7433
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Aug 20, 2022
023414b
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v
panbingkun Aug 20, 2022
0697773
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Aug 28, 2022
676455e
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 5, 2023
9876120
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 5, 2023
7d71eb0
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 20, 2023
5c40c7c
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Feb 20, 2023
3c3651a
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 20, 2023
b1cc56c
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 21, 2023
ab50fb0
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 21, 2023
9c4a9ac
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Feb 21, 2023
56ceeac
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Jun 29, 2023
be4199c
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 18, 2023
3a296c0
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 18, 2023
f54a832
Apply suggestions from code review
panbingkun Oct 19, 2023
892de53
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 19, 2023
f0b2db4
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 20, 2023
d898932
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 22, 2023
ffd3148
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 22, 2023
54531d8
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 23, 2023
59369a5
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 23, 2023
482dea1
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 23, 2023
3b12d76
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 27, 2023
7684a94
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 27, 2023
474b963
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 30, 2023
ac7586f
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Oct 30, 2023
bcb5db7
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 30, 2023
74e52ba
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Oct 30, 2023
723e664
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Nov 1, 2023
0f5c3c7
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Nov 1, 2023
e06f867
Trigger build
panbingkun Nov 1, 2023
4af1c5e
Trigger build
panbingkun Nov 1, 2023
ee254f4
Trigger build
panbingkun Nov 1, 2023
02065eb
debug appveyor
panbingkun Nov 2, 2023
9c3940c
Temporarily set R-win version to 4.3.2
panbingkun Nov 2, 2023
b47e619
Apply suggestions from code review
panbingkun Nov 3, 2023
be5b337
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Nov 3, 2023
73ff15b
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Nov 3, 2023
e6978e3
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Nov 6, 2023
1bab140
Merge branch 'master' into v2_SHOW_TABLE_EXTENDED
panbingkun Nov 6, 2023
26ae679
fix minor
panbingkun Nov 7, 2023
ef4213a
[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
panbingkun Nov 12, 2023
d318f6b
Trigger build
panbingkun Nov 12, 2023
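
For context, a minimal sketch of what this change enables: SHOW TABLE EXTENDED against a v2 catalog, at both table and partition granularity. The catalog, namespace, and table names below are hypothetical.

    // Assumes a v2 catalog registered under the (hypothetical) name "testcat".
    spark.sql("CREATE TABLE testcat.ns.tbl (id BIGINT, p STRING) PARTITIONED BY (p)")
    // Table-level extended output: namespace, tableName, isTemporary, information.
    spark.sql("SHOW TABLE EXTENDED IN testcat.ns LIKE 'tbl'").show(truncate = false)
    // Partition-level extended output for a single, fully specified partition.
    spark.sql("SHOW TABLE EXTENDED IN testcat.ns LIKE 'tbl' PARTITION (p = 'a')").show(truncate = false)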
@@ -176,10 +176,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case _ =>
}

// `ShowTableExtended` should have been converted to the v1 command if the table is v1.
case _: ShowTableExtended =>
throw QueryCompilationErrors.commandUnsupportedInV2TableError("SHOW TABLE EXTENDED")

case operator: LogicalPlan =>
operator transformExpressionsDown {
// Check argument data types of higher-order functions downwards first.
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2PartitionCommand}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTableExtended, V2PartitionCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -49,6 +49,10 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
}
case _ => command
}
case s @ ShowTableExtended(_, _, partitionSpec @ Some(UnresolvedPartitionSpec(_, _)), _) =>
val extractPartitionSpec = new ExtractPartitionSpec(
partitionSpec.get.asInstanceOf[UnresolvedPartitionSpec])
s.copy(partitionSpec = Some(extractPartitionSpec))
}

private def resolvePartitionSpec(
@@ -91,6 +91,15 @@ case class UnresolvedPartitionSpec(
override lazy val resolved = false
}

case class ExtractPartitionSpec(
spec: Map[String, String],
location: Option[String] = None) extends PartitionSpec {

def this(u: UnresolvedPartitionSpec) = this(u.spec, u.location)

override lazy val resolved = true
}
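
For illustration, the auxiliary constructor above simply carries the spec and location over while flipping resolved to true (hypothetical values):

    val unresolved = UnresolvedPartitionSpec(Map("p" -> "a"), None)
    val extracted = new ExtractPartitionSpec(unresolved)
    // extracted.spec == Map("p" -> "a"); extracted.location == None; extracted.resolved == true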

sealed trait FieldName extends LeafExpression with Unevaluable {
def name: Seq[String]
override def dataType: DataType = throw new IllegalStateException(
@@ -1559,10 +1559,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
"definitions will take precedence. See more details in SPARK-28228.")
}

def commandUnsupportedInV2TableError(name: String): Throwable = {
new AnalysisException(s"$name is not supported for v2 tables.")
}
Contributor: We can remove the error class too.

Contributor Author: To keep this PR small, the renaming or removal of the error-class-related operations will be done in a separate PR.

Contributor: In fact, just removing the error class will not increase the complexity.

Contributor Author: Yes, I misunderstood your meaning. That's right, we can remove it from error-classes.json.


def cannotResolveColumnNameAmongAttributesError(
colName: String, fieldNames: String): Throwable = {
new AnalysisException(s"""Cannot resolve column name "$colName" among ($fieldNames)""")
@@ -36,9 +36,9 @@ import org.apache.spark._
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.memory.SparkOutOfMemoryError
import org.apache.spark.sql.catalyst.{TableIdentifier, WalkedTypePath}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier, WalkedTypePath}
import org.apache.spark.sql.catalyst.ScalaReflection.Schema
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedGenerator}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -2089,4 +2089,17 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
toSQLId(funcName),
pattern))
}

def showTableExtendedMultiPartitionUnsupportedError(tableName: String): Throwable = {
new UnsupportedOperationException(
s"The table $tableName does not support show table extended of multiple partition.")
}

def notExistPartitionError(
Contributor: Shall we put it in QueryCompilationErrors?

Contributor: And it's hard to believe we don't have a function to throw NoSuchPartitionException already.

Contributor Author: Okay

tableName: String,
partitionIdent: InternalRow,
partitionSchema: StructType): Throwable = {
new NoSuchPartitionException(tableName, partitionIdent, partitionSchema)
}

}
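
Following the thread above, the call site could also throw the existing exception directly rather than routing through a new helper (a sketch; it constructs the same exception the helper does):

    throw new NoSuchPartitionException(identifier.toString, ident, partitionSchema)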
@@ -65,6 +65,11 @@ object DataSourceV2Implicits {
}
}

def isPartitionable: Boolean = table match {
Contributor: def supportsPartitions?

Contributor Author: Done

case _: SupportsPartitionManagement => true
case _ => false
}
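
A minimal sketch of the rename settled in the thread above (same body, new name):

    def supportsPartitions: Boolean = table match {
      case _: SupportsPartitionManagement => true
      case _ => false
    }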

def asPartitionable: SupportsPartitionManagement = {
table match {
case support: SupportsPartitionManagement =>
@@ -234,15 +234,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case ShowTableExtended(
DatabaseInSessionCatalog(db),
pattern,
partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _)) |
Some(ExtractPartitionSpec(_, _))),
output) =>
val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
assert(output.length == 4)
output.head.withName("database") +: output.tail
} else {
output
}
val tablePartitionSpec = partitionSpec.map(_.asInstanceOf[UnresolvedPartitionSpec].spec)
val tablePartitionSpec = partitionSpec match {
case Some(UnresolvedPartitionSpec(spec, _)) => Some(spec)
case Some(ExtractPartitionSpec(spec, _)) => Some(spec)
case _ => None
}
ShowTablesCommand(Some(db), Some(pattern), newOutput, true, tablePartitionSpec)

// ANALYZE TABLE works on permanent views if the views are cached.
@@ -22,7 +22,7 @@ import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.analysis.{ExtractPartitionSpec, ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
@@ -368,6 +368,20 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil

case ShowTableExtended(
ResolvedNamespace(catalog, ns),
pattern,
partitionSpec @ (None | Some(ExtractPartitionSpec(_, _))),
output) =>
val tablePartitionSpec = partitionSpec.map(_.asInstanceOf[ExtractPartitionSpec].spec)
ShowTablesExec(
output,
catalog.asTableCatalog,
ns,
Some(pattern),
true,
tablePartitionSpec) :: Nil

case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns)) =>
val catalogManager = session.sessionState.catalogManager
val namespace = if (ns.nonEmpty) Some(ns) else None
@@ -17,14 +17,25 @@

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`map AsScala`
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec.convertToPartIdent
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, StringUtils}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, SupportsPartitionManagement, Table, TableCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.util.PartitioningUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec}

/**
* Physical plan node for showing tables.
@@ -33,14 +44,37 @@ case class ShowTablesExec(
output: Seq[Attribute],
catalog: TableCatalog,
namespace: Seq[String],
pattern: Option[String]) extends V2CommandExec with LeafExecNode {
pattern: Option[String],
isExtended: Boolean = false,
partitionSpec: Option[TablePartitionSpec] = None) extends V2CommandExec with LeafExecNode {
Member: TablePartitionSpec is the legacy one; can't you use ResolvedPartitionSpec? For example, see:

    partitionSpec: Option[ResolvedPartitionSpec]) extends V2CommandExec with LeafExecNode {

Contributor Author: Done

override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()

val tables = catalog.listTables(namespace.toArray)
tables.map { table =>
if (pattern.map(StringUtils.filterPattern(Seq(table.name()), _).nonEmpty).getOrElse(true)) {
rows += toCatalystRow(table.namespace().quoted, table.name(), isTempView(table))
val identifiers = catalog.listTables(namespace.toArray)
identifiers.map { identifier =>
if (pattern.map(StringUtils.filterPattern(
Seq(identifier.name()), _).nonEmpty).getOrElse(true)) {
if (!isExtended) {
rows += toCatalystRow(identifier.namespace().quoted, identifier.name(),
isTempView(identifier))
} else {
val table = catalog.loadTable(identifier)
if (!partitionSpec.isEmpty && table.isPartitionable) {
// Show the information of partitions.
rows += toCatalystRow(
identifier.namespace().quoted,
identifier.name(),
isTempView(identifier),
s"${extendedPartition(identifier, table.asPartitionable, partitionSpec)}")
} else {
// Show the information of tables.
rows += toCatalystRow(
identifier.namespace().quoted,
identifier.name(),
isTempView(identifier),
s"${extendedTable(identifier, table)}")
}
}
}
}

@@ -53,4 +87,113 @@
case _ => false
}
}

private def extendedTable(identifier: Identifier, table: Table): String = {
val results = new mutable.LinkedHashMap[String, String]()

if (!identifier.namespace().isEmpty) {
results.put("Namespace", identifier.namespace().quoted)
}
results.put("Table", identifier.name())
Contributor: Is it to follow the v1 behavior?

Contributor Author: Yes

val tableType = if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}
results.put("Type", tableType.name)

CatalogV2Util.TABLE_RESERVED_PROPERTIES
.filterNot(_ == TableCatalog.PROP_EXTERNAL)
.foreach(propKey => {
if (table.properties.containsKey(propKey)) {
results.put(propKey.capitalize, table.properties.get(propKey))
}
})

val properties =
conf.redactOptions(table.properties.asScala.toMap).toList
.filter(kv => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(kv._1))
.sortBy(_._1).map {
case (key, value) => key + "=" + value
}.mkString("[", ",", "]")
if (table.properties().isEmpty) {
results.put("Table Properties", properties.mkString("[", ", ", "]"))
}

// Partition Provider & Partition Columns
// TODO check
if (table.isPartitionable && !table.asPartitionable.partitionSchema().isEmpty) {
results.put("Partition Provider", "Catalog")
results.put("Partition Columns", table.asPartitionable.partitionSchema().map(
field => quoteIdentifier(field.name)).mkString(", "))
}

if (table.schema().nonEmpty) results.put("Schema", table.schema().treeString)

results.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("", "\n", "")
}
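
Illustratively, the rendered block for a managed, non-partitioned table might look like this (hypothetical values; the exact keys depend on the table's properties):

    Namespace: ns
    Table: tbl
    Type: MANAGED
    Schema: root
     |-- id: long (nullable = true)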

private def extendedPartition(
identifier: Identifier,
partitionTable: SupportsPartitionManagement,
partitionSpec: Option[TablePartitionSpec]): String = {
val results = new mutable.LinkedHashMap[String, String]()

// "Partition Values"
val partitionSchema = partitionTable.partitionSchema()
val normalizedSpec = normalizePartitionSpec(
Member: This one is not needed. The job should be done by ResolvePartitionSpec.resolvePartitionSpec, or is there some reason to bypass it?

Contributor Author: Yes, the latest version has eliminated the above logic.

partitionSpec.get,
partitionSchema,
partitionTable.name(),
conf.resolver)
requireExactMatchedPartitionSpec(identifier.toString,
normalizedSpec, partitionSchema.fieldNames)

val partitionNames = normalizedSpec.keySet
Member: Where is it used?

Contributor Author: Done

val (names, ident) = (partitionSchema.map(_.name),
convertToPartIdent(normalizedSpec, partitionSchema))
val partitionIdentifiers = partitionTable.listPartitionIdentifiers(names.toArray, ident)
partitionIdentifiers.length match {
case 0 =>
Member: The function extendedPartition() is invoked only for a non-empty partition spec as far as I can see, or not? Is there any test for this case?

throw QueryExecutionErrors.notExistPartitionError(
identifier.toString, ident, partitionSchema)
case len if (len > 1) =>
Member: nit, suggested change:
- case len if (len > 1) =>
+ case len if len > 1 =>

throw QueryExecutionErrors.showTableExtendedMultiPartitionUnsupportedError(
identifier.toString)
case _ => // do nothing
}
val partitionIdentifier = partitionIdentifiers.head
val len = partitionSchema.length
val partitions = new Array[String](len)
val timeZoneId = conf.sessionLocalTimeZone
var i = 0
while (i < len) {
Member: This loop:

    var i = 0
    while (i < len) {

      i += 1
    }

can be simplified by:

    for (i <- 0 until len) {

    }
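
Spelled out against the surrounding loop, the suggestion reads (a sketch; behavior unchanged):

    for (i <- 0 until len) {
      val dataType = partitionSchema(i).dataType
      val partValueUTF8String =
        Cast(Literal(partitionIdentifier.get(i, dataType), dataType),
          StringType, Some(timeZoneId)).eval()
      val partValueStr = if (partValueUTF8String == null) "null" else partValueUTF8String.toString
      partitions(i) = escapePathName(partitionSchema(i).name) + "=" + escapePathName(partValueStr)
    }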

val dataType = partitionSchema(i).dataType
val partValueUTF8String =
Cast(Literal(partitionIdentifier.get(i, dataType), dataType),
StringType, Some(timeZoneId)).eval()
val partValueStr = if (partValueUTF8String == null) "null" else partValueUTF8String.toString
partitions(i) = escapePathName(partitionSchema(i).name) + "=" + escapePathName(partValueStr)
i += 1
}
val partitionValues = partitions.mkString("[", ", ", "]")
results.put("Partition Values", s"${partitionValues}")

// "Partition Parameters"
val metadata = partitionTable.loadPartitionMetadata(ident)
if (!metadata.isEmpty) {
val metadataValues = metadata.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("{", ", ", "}")
results.put("Partition Parameters", metadataValues)
}

// TODO "Created Time", "Last Access", "Partition Statistics"

results.map { case ((key, value)) =>
if (value.isEmpty) key else s"$key: $value"
}.mkString("", "\n", "\n")
}
}
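
On the reviewer's earlier question about test coverage for the no-matching-partition branch, a minimal sketch of such a check (hypothetical names; assumes a suite with a v2 catalog registered as "testcat" and ScalaTest's intercept in scope):

    val e = intercept[NoSuchPartitionException] {
      sql("SHOW TABLE EXTENDED IN testcat.ns LIKE 'tbl' PARTITION (p = 'nonexistent')")
    }
    assert(e.getMessage.contains("tbl"))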