-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-14124] [SQL] [FOLLOWUP] Implement Database-related DDL Commands #12081
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 106 commits
01e4cdf
6835704
9180687
b38a21e
d2b84af
fda8025
ac0dccd
6e0018b
0546772
b37a64f
c2a872c
ab6dbd7
4276356
2dab708
0458770
1debdfa
763706d
4de6ec1
9422a4f
52bdf48
1e95df3
fab24cf
8b2e33b
2ee1876
b9f0090
ade6f7e
9fd63d2
5199d49
404214c
c001dd9
59daa48
41d5f64
472a6e3
0fba10a
cbf73b3
c08f561
474df88
3d9828d
72d2361
07afea5
8bf2007
7cd839e
a9ddedc
87a165b
7e4b4dc
522a494
16ac0b1
b9359cd
65bd090
babf2da
9e09469
35f9b42
3c03dbf
8b6f86b
50a8e4a
f666fa2
b120b77
f1fb8bb
b2cb03d
d0b541b
d1f6792
2ba5a73
f3337fa
4a57a80
4890c65
1618f50
924f132
04e1d83
3ae138b
ec4f595
9cf48e1
38130a8
eeec2c1
09cc36d
592ba1c
3353a39
807abaf
4989110
83a1915
0483145
236a5f4
8a3e230
8f185be
08aaa4d
fa5f124
44fdf83
64f704e
206b7e2
006ea2d
3350529
bd1caa5
4c8af84
a2d71fd
375b692
1da261a
0c0dc8a
7c4b2f0
38f3af9
ae6285f
95d9a00
2ec4d59
59a9805
b885f7b
8089c6f
a6c7518
838eaeb
55cf518
b190b86
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,7 +40,10 @@ import org.apache.spark.sql.types._ | |
| * unless 'ifNotExists' is true. | ||
| * The syntax of using this command in SQL is: | ||
| * {{{ | ||
| * CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name | ||
| * CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name | ||
| * [COMMENT database_comment] | ||
| * [LOCATION database_directory] | ||
| * [WITH DBPROPERTIES (property_name=property_value, ...)]; | ||
| * }}} | ||
| */ | ||
| case class CreateDatabase( | ||
|
|
@@ -57,7 +60,7 @@ case class CreateDatabase( | |
| CatalogDatabase( | ||
| databaseName, | ||
| comment.getOrElse(""), | ||
| path.getOrElse(catalog.getDefaultDBPath(databaseName)), | ||
| catalog.createDatabasePath(databaseName, path), | ||
|
||
| props), | ||
| ifNotExists) | ||
| Seq.empty[Row] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,49 +95,85 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { | |
| catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false) | ||
| } | ||
|
|
||
| private def dropTable(catalog: SessionCatalog, name: TableIdentifier): Unit = { | ||
| catalog.dropTable(name, ignoreIfNotExists = false) | ||
| } | ||
|
||
|
|
||
| private def appendTrailingSlash(path: String): String = { | ||
| if (!path.endsWith(File.separator)) path + File.separator else path | ||
| } | ||
|
|
||
| test("the qualified path of a database is stored in the catalog") { | ||
| val catalog = sqlContext.sessionState.catalog | ||
|
|
||
| val path = System.getProperty("java.io.tmpdir") | ||
| // The generated temp path is not qualified. | ||
| assert(!path.startsWith("file:/")) | ||
| sql(s"CREATE DATABASE db1 LOCATION '$path'") | ||
| val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri | ||
| assert("file" === pathInCatalog.getScheme) | ||
| assert(path === pathInCatalog.getPath) | ||
|
|
||
| withSQLConf( | ||
| SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir"))) { | ||
| sql(s"CREATE DATABASE db2") | ||
| val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri | ||
| withTempDir { tmpDir => | ||
| val path = tmpDir.toString | ||
| // The generated temp path is not qualified. | ||
| assert(!path.startsWith("file:/")) | ||
| sql(s"CREATE DATABASE db1 LOCATION '$path'") | ||
| val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri | ||
| assert("file" === pathInCatalog.getScheme) | ||
| assert(s"${sqlContext.conf.warehousePath}/db2.db" === pathInCatalog.getPath) | ||
| } | ||
| val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path | ||
| assert(expectedPath === pathInCatalog.getPath) | ||
|
|
||
| withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { | ||
| sql(s"CREATE DATABASE db2") | ||
| val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri | ||
| assert("file" === pathInCatalog.getScheme) | ||
| val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db" | ||
| assert(expectedPath === pathInCatalog.getPath) | ||
| } | ||
|
|
||
| sql("DROP DATABASE db1") | ||
| sql("DROP DATABASE db2") | ||
| sql("DROP DATABASE db1") | ||
| sql("DROP DATABASE db2") | ||
| } | ||
| } | ||
|
|
||
| test("Create/Drop Database") { | ||
| withSQLConf( | ||
| SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| withTempDir { tmpDir => | ||
| val path = tmpDir.toString | ||
| withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val databaseNames = Seq("db1", "`database`") | ||
|
|
||
| databaseNames.foreach { dbName => | ||
| try { | ||
| val dbNameWithoutBackTicks = cleanIdentifier(dbName) | ||
|
|
||
| val databaseNames = Seq("db1", "`database`") | ||
| sql(s"CREATE DATABASE $dbName") | ||
| val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) | ||
| val expectedLocation = | ||
| "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db" | ||
| assert(db1 == CatalogDatabase( | ||
| dbNameWithoutBackTicks, | ||
| "", | ||
| expectedLocation, | ||
| Map.empty)) | ||
| sql(s"DROP DATABASE $dbName CASCADE") | ||
| assert(!catalog.databaseExists(dbNameWithoutBackTicks)) | ||
| } finally { | ||
| catalog.reset() | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("Create/Drop Database - location") { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val databaseNames = Seq("db1", "`database`") | ||
| withTempDir { tmpDir => | ||
| val path = tmpDir.toString | ||
| val dbPath = "file:" + path | ||
| databaseNames.foreach { dbName => | ||
| try { | ||
| val dbNameWithoutBackTicks = cleanIdentifier(dbName) | ||
|
|
||
| sql(s"CREATE DATABASE $dbName") | ||
| sql(s"CREATE DATABASE $dbName Location '$path'") | ||
| val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) | ||
| val expectedLocation = | ||
| "file:" + System.getProperty("java.io.tmpdir") + | ||
| File.separator + s"$dbNameWithoutBackTicks.db" | ||
| assert(db1 == CatalogDatabase( | ||
| dbNameWithoutBackTicks, | ||
| "", | ||
| expectedLocation, | ||
| if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath, | ||
| Map.empty)) | ||
| sql(s"DROP DATABASE $dbName CASCADE") | ||
| assert(!catalog.databaseExists(dbNameWithoutBackTicks)) | ||
|
|
@@ -149,77 +185,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { | |
| } | ||
|
|
||
| test("Create Database - database already exists") { | ||
| withSQLConf( | ||
| SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val databaseNames = Seq("db1", "`database`") | ||
|
|
||
| databaseNames.foreach { dbName => | ||
| try { | ||
| val dbNameWithoutBackTicks = cleanIdentifier(dbName) | ||
| sql(s"CREATE DATABASE $dbName") | ||
| val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) | ||
| val expectedLocation = | ||
| "file:" + System.getProperty("java.io.tmpdir") + | ||
| File.separator + s"$dbNameWithoutBackTicks.db" | ||
| assert(db1 == CatalogDatabase( | ||
| dbNameWithoutBackTicks, | ||
| "", | ||
| expectedLocation, | ||
| Map.empty)) | ||
|
|
||
| val message = intercept[AnalysisException] { | ||
| withTempDir { tmpDir => | ||
| val path = tmpDir.toString | ||
| withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val databaseNames = Seq("db1", "`database`") | ||
|
|
||
| databaseNames.foreach { dbName => | ||
| try { | ||
| val dbNameWithoutBackTicks = cleanIdentifier(dbName) | ||
| sql(s"CREATE DATABASE $dbName") | ||
| }.getMessage | ||
| assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) | ||
| } finally { | ||
| catalog.reset() | ||
| val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) | ||
| val expectedLocation = | ||
| "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db" | ||
| assert(db1 == CatalogDatabase( | ||
| dbNameWithoutBackTicks, | ||
| "", | ||
| expectedLocation, | ||
| Map.empty)) | ||
|
|
||
| val message = intercept[AnalysisException] { | ||
| sql(s"CREATE DATABASE $dbName") | ||
| }.getMessage | ||
| assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) | ||
| } finally { | ||
| catalog.reset() | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("Alter/Describe Database") { | ||
| withSQLConf( | ||
| SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val databaseNames = Seq("db1", "`database`") | ||
| withTempDir { tmpDir => | ||
| val path = tmpDir.toString | ||
| withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val databaseNames = Seq("db1", "`database`") | ||
|
|
||
| databaseNames.foreach { dbName => | ||
| try { | ||
| val dbNameWithoutBackTicks = cleanIdentifier(dbName) | ||
| val location = | ||
| "file:" + System.getProperty("java.io.tmpdir") + | ||
| File.separator + s"$dbNameWithoutBackTicks.db" | ||
|
|
||
| sql(s"CREATE DATABASE $dbName") | ||
|
|
||
| checkAnswer( | ||
| sql(s"DESCRIBE DATABASE EXTENDED $dbName"), | ||
| Row("Database Name", dbNameWithoutBackTicks) :: | ||
| Row("Description", "") :: | ||
| Row("Location", location) :: | ||
| Row("Properties", "") :: Nil) | ||
|
|
||
| sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") | ||
|
|
||
| checkAnswer( | ||
| sql(s"DESCRIBE DATABASE EXTENDED $dbName"), | ||
| Row("Database Name", dbNameWithoutBackTicks) :: | ||
| Row("Description", "") :: | ||
| Row("Location", location) :: | ||
| Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) | ||
|
|
||
| sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") | ||
|
|
||
| checkAnswer( | ||
| sql(s"DESCRIBE DATABASE EXTENDED $dbName"), | ||
| Row("Database Name", dbNameWithoutBackTicks) :: | ||
| Row("Description", "") :: | ||
| Row("Location", location) :: | ||
| Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) | ||
| } finally { | ||
| catalog.reset() | ||
| databaseNames.foreach { dbName => | ||
| try { | ||
| val dbNameWithoutBackTicks = cleanIdentifier(dbName) | ||
| val location = "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db" | ||
|
|
||
| sql(s"CREATE DATABASE $dbName") | ||
|
|
||
| checkAnswer( | ||
| sql(s"DESCRIBE DATABASE EXTENDED $dbName"), | ||
| Row("Database Name", dbNameWithoutBackTicks) :: | ||
| Row("Description", "") :: | ||
| Row("Location", location) :: | ||
| Row("Properties", "") :: Nil) | ||
|
|
||
| sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") | ||
|
|
||
| checkAnswer( | ||
| sql(s"DESCRIBE DATABASE EXTENDED $dbName"), | ||
| Row("Database Name", dbNameWithoutBackTicks) :: | ||
| Row("Description", "") :: | ||
| Row("Location", location) :: | ||
| Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) | ||
|
|
||
| sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") | ||
|
|
||
| checkAnswer( | ||
| sql(s"DESCRIBE DATABASE EXTENDED $dbName"), | ||
| Row("Database Name", dbNameWithoutBackTicks) :: | ||
| Row("Description", "") :: | ||
| Row("Location", location) :: | ||
| Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) | ||
| } finally { | ||
| catalog.reset() | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -251,7 +288,42 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { | |
| } | ||
| } | ||
|
|
||
| // TODO: test drop database in restrict mode | ||
| test("drop non-empty database in restrict mode") { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val dbName = "db1" | ||
| sql(s"CREATE DATABASE $dbName") | ||
|
|
||
| // create a table in database | ||
| val tableIdent1 = TableIdentifier("tab1", Some(dbName)) | ||
| createTable(catalog, tableIdent1) | ||
|
|
||
| // drop a non-empty database in Restrict mode | ||
| val message = intercept[AnalysisException] { | ||
| sql(s"DROP DATABASE $dbName RESTRICT") | ||
| }.getMessage | ||
| assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist")) | ||
|
|
||
| dropTable(catalog, tableIdent1) | ||
| assert(catalog.listDatabases().contains(dbName)) | ||
| sql(s"DROP DATABASE $dbName RESTRICT") | ||
| assert(!catalog.listDatabases().contains(dbName)) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We need to check that this db is removed, right?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure, will do. Thanks! |
||
|
|
||
| test("drop non-empty database in cascade mode") { | ||
| val catalog = sqlContext.sessionState.catalog | ||
| val dbName = "db1" | ||
| sql(s"CREATE DATABASE $dbName") | ||
|
|
||
| // create a table in database | ||
| val tableIdent1 = TableIdentifier("tab1", Some(dbName)) | ||
| createTable(catalog, tableIdent1) | ||
|
|
||
| // drop a non-empty database in CASCADE mode | ||
| assert(catalog.listTables(dbName).contains(tableIdent1)) | ||
| assert(catalog.listDatabases().contains(dbName)) | ||
| sql(s"DROP DATABASE $dbName CASCADE") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We need to check that both this table and the db are removed, right?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure, will do. Thanks! |
||
| assert(!catalog.listDatabases().contains(dbName)) | ||
| } | ||
|
|
||
| test("create table in default db") { | ||
| val catalog = sqlContext.sessionState.catalog | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this doesn't seem to belong to the session catalog. It's only used in one place, so I would just move it to `CreateDatabase` itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did try it, but it involves a lot of code changes.
The major issue is the type mismatch between the `path` in `case class CreateDatabase` and the `locationUri` in `CatalogDatabase`. The first one has to be `Option[String]` since users might not provide `location` in the create table command. The second one has to be `String` since we always have the `location`. See https://github.com/gatorsmile/spark/blob/mkdir/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala#L63
If we change the type of `locationUri` in `CatalogDatabase` to `Option[String]`, the related code changes do not look clean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just call this `defaultDatabasePath`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do the change.