
Commit 3fbc7dd

[SPARK-39552][SQL] Unify v1 and v2 DESCRIBE TABLE
### What changes were proposed in this pull request?

In this PR, I propose to change the output of v2 `DESCRIBE TABLE` and make it the same as v1. In particular:

1. Return NULL instead of empty strings when a comment doesn't exist in the schema info.
2. When a v2 table has identity transformations (partition by) only, output the partitioning info in v1 style. For instance:

```sql
> CREATE TABLE tbl (id bigint, data string) USING _;
> DESCRIBE TABLE tbl;
+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|id                     |bigint   |null   |
|data                   |string   |null   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|id                     |bigint   |null   |
+-----------------------+---------+-------+
```

The PR also moves/adds some tests to the base traits:
- "DESCRIBE TABLE of a non-partitioned table"
- "DESCRIBE TABLE of a partitioned table"

and addresses review comments in apache/spark#35265.

### Why are the changes needed?

The changes unify the outputs of the v1 and v2 implementations, and make migrating from v1 to v2 easier for Spark SQL users.

### Does this PR introduce _any_ user-facing change?

Yes, it changes the output of v2 `DESCRIBE TABLE`.

### How was this patch tested?

By running the `DESCRIBE TABLE` test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *DescribeTableSuite"
```
and related test suites:
```
$ build/sbt "test:testOnly *DataSourceV2SQLSuite"
```

Closes #36946 from MaxGekk/unify-v1-v2-describe-table.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
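
For readers who want to reproduce the unified output locally, here is a minimal sketch using the Scala API; the table name, the `parquet` source, and the explicit `PARTITIONED BY (id)` clause are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object DescribeTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("describe-table-demo")
      .getOrCreate()

    // Identity partitioning on `id` so the "# Partition Information" block shows up.
    spark.sql(
      "CREATE TABLE tbl (id BIGINT, data STRING) USING parquet PARTITIONED BY (id)")

    // With the unified behavior, missing column comments are reported as NULL
    // (not empty strings), and identity partitioning is printed in the v1 style.
    spark.sql("DESCRIBE TABLE tbl").show(truncate = false)

    spark.stop()
  }
}
```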
1 parent a34dc4d commit 3fbc7dd

28 files changed

Lines changed: 273 additions & 122 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 13 additions & 0 deletions

@@ -407,6 +407,7 @@ class SparkContext(config: SparkConf) extends Logging {
     SparkContext.fillMissingMagicCommitterConfsIfNeeded(_conf)

     SparkContext.supplementJavaModuleOptions(_conf)
+    SparkContext.supplementJavaIPv6Options(_conf)

     _driverLogger = DriverLogger(_conf)

@@ -3049,6 +3050,18 @@ object SparkContext extends Logging {
     supplement(DRIVER_JAVA_OPTIONS)
     supplement(EXECUTOR_JAVA_OPTIONS)
   }
+
+  private def supplementJavaIPv6Options(conf: SparkConf): Unit = {
+    def supplement(key: OptionalConfigEntry[String]): Unit = {
+      val v = conf.get(key) match {
+        case Some(opts) => s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6} $opts"
+        case None => s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}"
+      }
+      conf.set(key.key, v)
+    }
+    supplement(DRIVER_JAVA_OPTIONS)
+    supplement(EXECUTOR_JAVA_OPTIONS)
+  }
 }

 /**
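
As a standalone illustration of the prepend logic above, here is a sketch that uses a plain `Map` and an explicit `preferIPv6` flag instead of `SparkConf` and `Utils.preferIPv6`; the key strings are the standard Spark configuration keys, everything else is an assumption for demonstration:

```scala
object JavaIPv6OptionsSketch {
  val DriverJavaOptions = "spark.driver.extraJavaOptions"
  val ExecutorJavaOptions = "spark.executor.extraJavaOptions"

  /** Prepend the IPv6 preference flag to any user-provided extra Java options. */
  def supplementIPv6(conf: Map[String, String], preferIPv6: Boolean): Map[String, String] = {
    val flag = s"-Djava.net.preferIPv6Addresses=$preferIPv6"
    Seq(DriverJavaOptions, ExecutorJavaOptions).foldLeft(conf) { (acc, key) =>
      val value = acc.get(key) match {
        case Some(opts) => s"$flag $opts" // keep existing options after the flag
        case None => flag
      }
      acc.updated(key, value)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = Map(DriverJavaOptions -> "-Xmx4g")
    // Driver options become "-Djava.net.preferIPv6Addresses=true -Xmx4g";
    // executor options are created with just the flag.
    supplementIPv6(conf, preferIPv6 = true).foreach(println)
  }
}
```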

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 1 addition & 0 deletions

@@ -246,6 +246,7 @@ private[spark] object JettyUtils extends Logging {
       serverName: String = "",
       poolSize: Int = 200): ServerInfo = {

+    logInfo(s"Start Jetty $hostName:$port for $serverName")
     // Start the server first, with no connectors.
     val pool = new QueuedThreadPool(poolSize)
     if (serverName.nonEmpty) {

core/src/main/scala/org/apache/spark/ui/WebUI.scala

Lines changed: 3 additions & 4 deletions

@@ -140,20 +140,19 @@ private[spark] abstract class WebUI(
   def initialize(): Unit

   def initServer(): ServerInfo = {
-    val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
-    val server = startJettyServer(host, port, sslOptions, conf, name, poolSize)
+    val hostName = Utils.localHostNameForURI()
+    val server = startJettyServer(hostName, port, sslOptions, conf, name, poolSize)
     server
   }

   /** Binds to the HTTP server behind this web interface. */
   def bind(): Unit = {
     assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
     try {
-      val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
       val server = initServer()
       handlers.foreach(server.addHandler(_, securityManager))
       serverInfo = Some(server)
-      logInfo(s"Bound $className to $host, and started at $webUrl")
+      logInfo(s"Bound $className to ${Utils.localHostNameForURI()}, and started at $webUrl")
     } catch {
       case e: Exception =>
         logError(s"Failed to bind $className", e)

core/src/test/scala/org/apache/spark/ui/UISuite.scala

Lines changed: 25 additions & 23 deletions

@@ -38,6 +38,8 @@ import org.apache.spark.util.Utils

 class UISuite extends SparkFunSuite {

+  val localhost = Utils.localHostNameForURI()
+
   /**
    * Create a test SparkContext with the SparkUI enabled.
    * It is safe to `get` the SparkUI directly from the SparkContext returned here.

@@ -91,7 +93,7 @@ class UISuite extends SparkFunSuite {
     withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10.seconds), interval(50.milliseconds)) {
-        val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040"))(_.mkString)
+        val html = Utils.tryWithResource(Source.fromURL(s"http://$localhost:4040"))(_.mkString)
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
       }
     }

@@ -201,41 +203,41 @@ class UISuite extends SparkFunSuite {

   test("verify proxy rewrittenURI") {
     val prefix = "/worker-id"
-    val target = "http://localhost:8081"
+    val target = s"http://$localhost:8081"
     val path = "/worker-id/json"
     var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null)
-    assert(rewrittenURI.toString() === "http://localhost:8081/json")
+    assert(rewrittenURI.toString() === s"http://$localhost:8081/json")
     rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done")
-    assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done")
+    assert(rewrittenURI.toString() === s"http://$localhost:8081/json?test=done")
     rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id", null)
-    assert(rewrittenURI.toString() === "http://localhost:8081")
+    assert(rewrittenURI.toString() === s"http://$localhost:8081")
     rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/test%2F", null)
-    assert(rewrittenURI.toString() === "http://localhost:8081/test%2F")
+    assert(rewrittenURI.toString() === s"http://$localhost:8081/test%2F")
     rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-id/%F0%9F%98%84", null)
-    assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84")
+    assert(rewrittenURI.toString() === s"http://$localhost:8081/%F0%9F%98%84")
     rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/worker-noid/json", null)
     assert(rewrittenURI === null)
   }

   test("SPARK-33611: Avoid encoding twice on the query parameter of proxy rewrittenURI") {
     val prefix = "/worker-id"
-    val target = "http://localhost:8081"
+    val target = s"http://$localhost:8081"
     val path = "/worker-id/json"
     val rewrittenURI =
       JettyUtils.createProxyURI(prefix, target, path, "order%5B0%5D%5Bcolumn%5D=0")
-    assert(rewrittenURI.toString === "http://localhost:8081/json?order%5B0%5D%5Bcolumn%5D=0")
+    assert(rewrittenURI.toString === s"http://$localhost:8081/json?order%5B0%5D%5Bcolumn%5D=0")
   }

   test("verify rewriting location header for reverse proxy") {
     val clientRequest = mock(classOf[HttpServletRequest])
-    var headerValue = "http://localhost:4040/jobs"
-    val targetUri = URI.create("http://localhost:4040")
+    var headerValue = s"http://$localhost:4040/jobs"
+    val targetUri = URI.create(s"http://$localhost:4040")
     when(clientRequest.getScheme()).thenReturn("http")
-    when(clientRequest.getHeader("host")).thenReturn("localhost:8080")
+    when(clientRequest.getHeader("host")).thenReturn(s"$localhost:8080")
     when(clientRequest.getPathInfo()).thenReturn("/proxy/worker-id/jobs")
     var newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
-    assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs")
-    headerValue = "http://localhost:4041/jobs"
+    assert(newHeader.toString() === s"http://$localhost:8080/proxy/worker-id/jobs")
+    headerValue = s"http://$localhost:4041/jobs"
     newHeader = JettyUtils.createProxyLocationHeader(headerValue, clientRequest, targetUri)
     assert(newHeader === null)
   }

@@ -249,7 +251,7 @@ class UISuite extends SparkFunSuite {
     val serverInfo = JettyUtils.startJettyServer("0.0.0.0", 0, sslOptions, conf)
     try {
       val path = "/test"
-      val url = new URL(s"http://localhost:${serverInfo.boundPort}$path/root")
+      val url = new URL(s"http://$localhost:${serverInfo.boundPort}$path/root")

       assert(TestUtils.httpResponseCode(url) === HttpServletResponse.SC_NOT_FOUND)

@@ -260,7 +262,7 @@ class UISuite extends SparkFunSuite {
       // Try a request with bad content in a parameter to make sure the security filter
       // is being added to new handlers.
       val badRequest = new URL(
-        s"http://localhost:${serverInfo.boundPort}$path/root?bypass&invalid<=foo")
+        s"http://$localhost:${serverInfo.boundPort}$path/root?bypass&invalid<=foo")
       assert(TestUtils.httpResponseCode(badRequest) === HttpServletResponse.SC_OK)
       assert(servlet.lastRequest.getParameter("invalid<") === null)
       assert(servlet.lastRequest.getParameter("invalid&lt;") !== null)

@@ -276,7 +278,7 @@ class UISuite extends SparkFunSuite {
     val (conf, securityMgr, sslOptions) = sslEnabledConf()
     val serverInfo = JettyUtils.startJettyServer("0.0.0.0", 0, sslOptions, conf)
     try {
-      val serverAddr = s"http://localhost:${serverInfo.boundPort}"
+      val serverAddr = s"http://$localhost:${serverInfo.boundPort}"

       val (_, ctx) = newContext("/ctx1")
       serverInfo.addHandler(ctx, securityMgr)

@@ -285,7 +287,7 @@ class UISuite extends SparkFunSuite {
         assert(conn.getResponseCode() === HttpServletResponse.SC_FOUND)
         val location = Option(conn.getHeaderFields().get("Location"))
           .map(_.get(0)).orNull
-        val expectedLocation = s"https://localhost:${serverInfo.securePort.get}/ctx(1)?a[0]=b"
+        val expectedLocation = s"https://$localhost:${serverInfo.securePort.get}/ctx(1)?a[0]=b"
         assert(location == expectedLocation)
       }
     } finally {

@@ -313,9 +315,9 @@ class UISuite extends SparkFunSuite {

     tests.foreach { case (scheme, port, expected) =>
       val urls = Seq(
-        s"$scheme://localhost:$port/root",
-        s"$scheme://localhost:$port/test1/root",
-        s"$scheme://localhost:$port/test2/root")
+        s"$scheme://$localhost:$port/root",
+        s"$scheme://$localhost:$port/test1/root",
+        s"$scheme://$localhost:$port/test2/root")
       urls.foreach { url =>
         val rc = TestUtils.httpResponseCode(new URL(url))
         assert(rc === expected, s"Unexpected status $rc for $url")

@@ -355,7 +357,7 @@ class UISuite extends SparkFunSuite {

     val serverInfo = JettyUtils.startJettyServer("0.0.0.0", 0, sslOptions, conf)
     try {
-      val serverAddr = s"http://localhost:${serverInfo.boundPort}"
+      val serverAddr = s"http://$localhost:${serverInfo.boundPort}"

       val redirect = JettyUtils.createRedirectHandler("/src", "/dst")
       serverInfo.addHandler(redirect, securityMgr)

@@ -395,7 +397,7 @@ class UISuite extends SparkFunSuite {
     try {
       val (_, ctx) = newContext("/ctx")
       serverInfo.addHandler(ctx, securityMgr)
-      val urlStr = s"http://localhost:${serverInfo.boundPort}/ctx"
+      val urlStr = s"http://$localhost:${serverInfo.boundPort}/ctx"

       assert(TestUtils.httpResponseCode(new URL(urlStr + "/")) === HttpServletResponse.SC_OK)

sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java

Lines changed: 1 addition & 1 deletion

@@ -41,7 +41,7 @@
  */
 @Experimental
 public class CaseInsensitiveStringMap implements Map<String, String> {
-  private final Logger logger = LoggerFactory.getLogger(CaseInsensitiveStringMap.class);
+  private static final Logger logger = LoggerFactory.getLogger(CaseInsensitiveStringMap.class);

   private String unsupportedOperationMsg = "CaseInsensitiveStringMap is read-only.";

sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala

Lines changed: 6 additions & 0 deletions

@@ -132,6 +132,12 @@ class ExpressionSet protected(
     newSet
   }

+  override def concat(that: IterableOnce[Expression]): ExpressionSet = {
+    val newSet = clone()
+    that.iterator.foreach(newSet.add)
+    newSet
+  }
+
   override def --(that: IterableOnce[Expression]): ExpressionSet = {
     val newSet = clone()
     that.iterator.foreach(newSet.remove)
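
The `concat` override matters because, on Scala 2.13, `++` is defined in terms of `concat`; without the override the result would come back as a generic `Set[Expression]` instead of an `ExpressionSet` with its canonicalization-based deduplication. A simplified sketch of the same pattern with an ordinary deduplicating wrapper (the names and the `canonicalize` function are illustrative, not Catalyst's):

```scala
// A toy "set" that deduplicates elements by a canonical key, mirroring how
// ExpressionSet deduplicates semantically-equal expressions.
final class CanonicalSet[A, K] private (
    private val byKey: Map[K, A],
    canonicalize: A => K) {

  def add(elem: A): CanonicalSet[A, K] = {
    val key = canonicalize(elem)
    if (byKey.contains(key)) this
    else new CanonicalSet(byKey + (key -> elem), canonicalize)
  }

  // The equivalent of ExpressionSet.concat: fold the other collection in,
  // keeping both the deduplicating behavior and the wrapper type.
  def concat(that: IterableOnce[A]): CanonicalSet[A, K] =
    that.iterator.foldLeft(this)(_.add(_))

  def ++(that: IterableOnce[A]): CanonicalSet[A, K] = concat(that)

  def elems: Iterable[A] = byKey.values
}

object CanonicalSet {
  def apply[A, K](canonicalize: A => K)(elems: A*): CanonicalSet[A, K] =
    elems.foldLeft(new CanonicalSet(Map.empty[K, A], canonicalize))(_.add(_))
}

object CanonicalSetDemo {
  def main(args: Array[String]): Unit = {
    // Treat strings that differ only by case as the same element.
    val s = CanonicalSet[String, String](_.toLowerCase)("a", "B")
    println((s ++ Seq("A", "c")).elems) // "A" duplicates "a"; only "c" is added
  }
}
```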

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 9 deletions

@@ -451,7 +451,7 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Substitute child plan with WindowSpecDefinitions.
    */
   object WindowsSubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
       // Lookup WindowSpecDefinitions. This rule works with unresolved children.
       case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {

@@ -460,14 +460,6 @@ class Analyzer(override val catalogManager: CatalogManager)
             throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
           WindowExpression(c, windowSpecDefinition)
       }
-
-      case p @ Project(projectList, _) =>
-        projectList.foreach(_.transformDownWithPruning(
-          _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
-          case UnresolvedWindowExpression(_, windowSpec) =>
-            throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
-        })
-        p
     }
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 9 additions & 1 deletion

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, Decorrela
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
 import org.apache.spark.sql.errors.QueryCompilationErrors

@@ -231,7 +232,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             failAnalysis("grouping_id() can only be used with GroupingSets/Cube/Rollup")

           case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
-              !e.isInstanceOf[WindowExpression] =>
+              !e.isInstanceOf[WindowExpression] && e.resolved =>
             val w = e.children.find(_.isInstanceOf[WindowFunction]).get
             failAnalysis(s"Window function $w requires an OVER clause.")

@@ -542,6 +543,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               s"""Only a single table generating function is allowed in a SELECT clause, found:
                  | ${exprs.map(_.sql).mkString(",")}""".stripMargin)

+          case p @ Project(projectList, _) =>
+            projectList.foreach(_.transformDownWithPruning(
+              _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) {
+              case UnresolvedWindowExpression(_, windowSpec) =>
+                throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
+            })
+
           case j: Join if !j.duplicateResolved =>
             val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
             failAnalysis(
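
Together with the `Analyzer` change above, this moves the check for a window reference with no matching `WINDOW` clause out of the substitution rule and into `CheckAnalysis`. A hedged example of a query that should trip the check; the session setup and the exact error text are assumptions:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object UndefinedWindowSpecDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-check-demo")
      .getOrCreate()

    // `OVER w` refers to a named window, but no `WINDOW w AS (...)` clause defines it,
    // so analysis is expected to fail with a "window specification not defined" error.
    try {
      spark.sql("SELECT id, count(*) OVER w FROM VALUES (1), (2) AS t(id)").collect()
    } catch {
      case e: AnalysisException => println(s"Analysis failed as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}
```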

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownThroughWindow.scala

Lines changed: 3 additions & 2 deletions

@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.optimizer

-import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, DenseRank, IntegerLiteral, NamedExpression, NTile, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Project, Sort, Window}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, WINDOW}

@@ -33,7 +33,8 @@ object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
   // The window frame of RankLike and RowNumberLike can only be UNBOUNDED PRECEDING to CURRENT ROW.
   private def supportsPushdownThroughWindow(
       windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
-    case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _,
+    case Alias(WindowExpression(_: Rank | _: DenseRank | _: NTile | _: RowNumber,
+      WindowSpecDefinition(Nil, _,
       SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
     case _ => false
   }
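
Narrowing the match from `RankLike`/`RowNumberLike` to the explicit `Rank | DenseRank | NTile | RowNumber` list excludes `percent_rank`, whose value depends on the total number of rows in the partition, so truncating the input with a pushed-down limit would change the result. A small arithmetic sketch of that dependence (pure Scala, not optimizer code):

```scala
object PercentRankSketch {
  // percent_rank for a row with rank r in a partition of n rows: (r - 1) / (n - 1).
  def percentRank(rank: Int, partitionSize: Int): Double =
    if (partitionSize <= 1) 0.0 else (rank - 1).toDouble / (partitionSize - 1)

  def main(args: Array[String]): Unit = {
    // Over the full partition of 10 rows, the 2nd-ranked row scores ~0.111.
    println(percentRank(rank = 2, partitionSize = 10))
    // If a LIMIT 2 were pushed below the window, only 2 rows would remain and the
    // same row would score 1.0 -- a different answer, hence no pushdown.
    println(percentRank(rank = 2, partitionSize = 2))
  }
}
```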

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala

Lines changed: 12 additions & 1 deletion

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{CurrentRow, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
+import org.apache.spark.sql.catalyst.expressions.{CurrentRow, PercentRank, Rank, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._

@@ -187,4 +187,15 @@ class LimitPushdownThroughWindowSuite extends PlanTest {
       Optimize.execute(originalQuery.analyze),
       WithoutOptimize.execute(originalQuery.analyze))
   }
+
+  test("SPARK-38614: Should not push through percent_rank window function") {
+    val originalQuery = testRelation
+      .select(a, b, c,
+        windowExpr(new PercentRank(), windowSpec(Nil, c.desc :: Nil, windowFrame)).as("rn"))
+      .limit(2)
+
+    comparePlans(
+      Optimize.execute(originalQuery.analyze),
+      WithoutOptimize.execute(originalQuery.analyze))
+  }
 }
