Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ package object config {
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

// Note: This is a SQL config but needs to be in core because it's cross-session and can not put
// in SQLConf.
private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
Expand Down
4 changes: 3 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ object MimaExcludes {
// [SPARK-16967] Move Mesos to Module
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
// [SPARK-16240] ML persistence backward compatibility for LDA
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView")
)
}

Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def tables(self, dbName=None):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
Row(database=u'', tableName=u'table1', isTemporary=True)
"""
if dbName is None:
return DataFrame(self._ssql_ctx.tables(), self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ statement
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
| CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
VIEW (IF NOT EXISTS)? tableIdentifier
identifierCommentList? (COMMENT STRING)?
(PARTITIONED ON identifierList)?
(TBLPROPERTIES tablePropertyList)? AS query #createView
| CREATE (OR REPLACE)? TEMPORARY VIEW
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTempViewUsing
| ALTER VIEW tableIdentifier AS? query #alterViewQuery
Expand Down Expand Up @@ -669,7 +670,7 @@ nonReserved
| MAP | ARRAY | STRUCT
| LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
| DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
| EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
| EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
| GROUPING | CUBE | ROLLUP
| EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
| TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
Expand Down Expand Up @@ -857,6 +858,7 @@ CACHE: 'CACHE';
UNCACHE: 'UNCACHE';
LAZY: 'LAZY';
FORMATTED: 'FORMATTED';
GLOBAL: 'GLOBAL';
TEMPORARY: 'TEMPORARY' | 'TEMP';
OPTIONS: 'OPTIONS';
UNSET: 'UNSET';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,12 @@ class Analyzer(
i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
case u: UnresolvedRelation =>
val table = u.tableIdentifier
if (table.database.isDefined && conf.runSQLonFile &&
if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) &&
(!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
// If the table does not exist, and the database part is specified, and we support
// running SQL directly on files, then let's just return the original UnresolvedRelation.
// It is possible we are matching a query like "select * from parquet.`/path/to/query`".
// The plan will get resolved later.
// If the database part is specified, and we support running SQL directly on files, and
// it's not a temporary view, and the table does not exist, then let's just return the
// original UnresolvedRelation. It is possible we are matching a query like "select *
// from parquet.`/path/to/query`". The plan will get resolved later.
// Note that we are testing (!db_exists || !table_exists) because the catalog throws
// an exception from tableExists if the database does not exist.
u
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.catalog

import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.StringUtils


/**
* A thread-safe manager for global temporary views, providing atomic operations to manage them,
* e.g. create, update, remove, etc.
*
* Note that, the view name is always case-sensitive here, callers are responsible to format the
* view name w.r.t. case-sensitive config.
*
* @param database The system preserved virtual database that keeps all the global temporary views.
*/
class GlobalTempViewManager(val database: String) {

/** List of view definitions, mapping from view name to logical plan. */
@GuardedBy("this")
private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]

/**
* Returns the global view definition which matches the given name, or None if not found.
*/
def get(name: String): Option[LogicalPlan] = synchronized {
viewDefinitions.get(name)
}

/**
* Creates a global temp view, or issue an exception if the view already exists and
* `overrideIfExists` is false.
*/
def create(
name: String,
viewDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {
if (!overrideIfExists && viewDefinitions.contains(name)) {
throw new TempTableAlreadyExistsException(name)
}
viewDefinitions.put(name, viewDefinition)
}

/**
* Updates the global temp view if it exists, returns true if updated, false otherwise.
*/
def update(
name: String,
viewDefinition: LogicalPlan): Boolean = synchronized {
if (viewDefinitions.contains(name)) {
viewDefinitions.put(name, viewDefinition)
true
} else {
false
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When do we use update and when do we use create?

Copy link
Contributor Author

@cloud-fan cloud-fan Sep 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CREATE VIEW and ALTER VIEW.

We can have a single API for them, but need to introduce a write mode: errorIfExists, overrideIfExists(if not exists, create one), updateIfExists(if not exists, do nothing).


/**
* Removes the global temp view if it exists, returns true if removed, false otherwise.
*/
def remove(name: String): Boolean = synchronized {
viewDefinitions.remove(name).isDefined
}

/**
* Renames the global temp view if the source view exists and the destination view not exists, or
* issue an exception if the source view exists but the destination view already exists. Returns
* true if renamed, false otherwise.
*/
def rename(oldName: String, newName: String): Boolean = synchronized {
if (viewDefinitions.contains(oldName)) {
if (viewDefinitions.contains(newName)) {
throw new AnalysisException(
s"rename temporary view from '$oldName' to '$newName': destination view already exists")
}

val viewDefinition = viewDefinitions(oldName)
viewDefinitions.remove(oldName)
viewDefinitions.put(newName, viewDefinition)
true
} else {
false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is reason that failing to rename has two behavior (when source does not exist, we return false. But, when destination already exists, we thrown an error)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behaviour comes from the previous temp view: If the source doesn't exist, try persisted table/view, if source exists but destination already exists, throw an error.

}

/**
* Lists the names of all global temporary views.
*/
def listViewNames(pattern: String): Seq[String] = synchronized {
StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern)
}

/**
* Clears all the global temporary views.
*/
def clear(): Unit = synchronized {
viewDefinitions.clear()
}
}
Loading