11 changes: 7 additions & 4 deletions R/pkg/R/SQLContext.R
@@ -544,12 +544,15 @@ sql <- function(x, ...) {
   dispatchFunc("sql(sqlQuery)", x, ...)
 }
 
-#' Create a SparkDataFrame from a SparkSQL Table
+#' Create a SparkDataFrame from a SparkSQL table or view
 #'
-#' Returns the specified Table as a SparkDataFrame. The Table must have already been registered
-#' in the SparkSession.
+#' Returns the specified table or view as a SparkDataFrame. The table or view must already exist or
+#' have already been registered in the SparkSession.
 #'
-#' @param tableName The SparkSQL Table to convert to a SparkDataFrame.
+#' @param tableName the qualified or unqualified name that designates a table or view. If a database
+#'                  is specified, it identifies the table/view from the database.
+#'                  Otherwise, it first attempts to find a temporary view with the given name
+#'                  and then match the table/view from the current database.
 #' @return SparkDataFrame
 #' @rdname tableToDF
 #' @name tableToDF
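The resolution order described in the updated `tableToDF` docs is shared across the language bindings: an unqualified name is matched against temporary views first, then against the current database, while a qualified name is resolved only in the named database. A minimal PySpark sketch of that behavior follows; the view and database names are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Register a temporary view named "people".
spark.range(5).createOrReplaceTempView("people")

df1 = spark.table("people")          # unqualified: the temp view wins over any table "people"
df2 = spark.table("salesdb.people")  # qualified: resolved only against database "salesdb"
```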
52 changes: 29 additions & 23 deletions R/pkg/R/catalog.R
@@ -65,7 +65,8 @@ createExternalTable <- function(x, ...) {
 #'
 #' Caches the specified table in-memory.
 #'
-#' @param tableName The name of the table being cached
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
 #' @return SparkDataFrame
 #' @rdname cacheTable
 #' @export
@@ -94,7 +95,8 @@ cacheTable <- function(x, ...) {
 #'
 #' Removes the specified table from the in-memory cache.
 #'
-#' @param tableName The name of the table being uncached
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
 #' @return SparkDataFrame
 #' @rdname uncacheTable
 #' @export
@@ -162,14 +164,14 @@ clearCache <- function() {
 #' @method dropTempTable default
 #' @note dropTempTable since 1.4.0
 dropTempTable.default <- function(tableName) {
+  .Deprecated("dropTempView", old = "dropTempTable")
   if (class(tableName) != "character") {
     stop("tableName must be a string.")
   }
   dropTempView(tableName)
 }
 
 dropTempTable <- function(x, ...) {
-  .Deprecated("dropTempView")
   dispatchFunc("dropTempView(viewName)", x, ...)
 }
 
@@ -178,7 +180,7 @@ dropTempTable <- function(x, ...) {
 #' Drops the temporary view with the given view name in the catalog.
 #' If the view has been cached before, then it will also be uncached.
 #'
-#' @param viewName the name of the view to be dropped.
+#' @param viewName the name of the temporary view to be dropped.
 #' @return TRUE if the view is dropped successfully, FALSE otherwise.
 #' @rdname dropTempView
 #' @name dropTempView
@@ -317,10 +319,10 @@ listDatabases <- function() {
   dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
 }
 
-#' Returns a list of tables in the specified database
+#' Returns a list of tables or views in the specified database
 #'
-#' Returns a list of tables in the specified database.
-#' This includes all temporary tables.
+#' Returns a list of tables or views in the specified database.
+#' This includes all temporary views.
 #'
 #' @param databaseName (optional) name of the database
 #' @return a SparkDataFrame of the list of tables.
@@ -349,11 +351,13 @@ listTables <- function(databaseName = NULL) {
   dataFrame(callJMethod(jdst, "toDF"))
 }
 
-#' Returns a list of columns for the given table in the specified database
+#' Returns a list of columns for the given table/view in the specified database
 #'
-#' Returns a list of columns for the given table in the specified database.
+#' Returns a list of columns for the given table/view in the specified database.
 #'
-#' @param tableName a name of the table.
+#' @param tableName the qualified or unqualified name that designates a table/view. If no database
+#'                  identifier is provided, it refers to a table/view in the current database.
+#'                  If \code{databaseName} parameter is specified, this must be an unqualified name.
 #' @param databaseName (optional) name of the database
 #' @return a SparkDataFrame of the list of column descriptions.
 #' @rdname listColumns
@@ -409,12 +413,13 @@ listFunctions <- function(databaseName = NULL) {
   dataFrame(callJMethod(jdst, "toDF"))
 }
 
-#' Recover all the partitions in the directory of a table and update the catalog
+#' Recovers all the partitions in the directory of a table and update the catalog
 #'
-#' Recover all the partitions in the directory of a table and update the catalog. The name should
-#' reference a partitioned table, and not a temporary view.
+#' Recovers all the partitions in the directory of a table and update the catalog. The name should
+#' reference a partitioned table, and not a view.
 #'
-#' @param tableName a name of the table.
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
 #' @rdname recoverPartitions
 #' @name recoverPartitions
 #' @export
@@ -430,17 +435,18 @@ recoverPartitions <- function(tableName) {
   invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
 }
 
-#' Invalidate and refresh all the cached metadata of the given table
+#' Invalidates and refreshes all the cached data and metadata of the given table
 #'
-#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
-#' Spark SQL or the external data source library it uses might cache certain metadata about a
-#' table, such as the location of blocks. When those change outside of Spark SQL, users should
+#' Invalidates and refreshes all the cached data and metadata of the given table. For performance
+#' reasons, Spark SQL or the external data source library it uses might cache certain metadata about
+#' a table, such as the location of blocks. When those change outside of Spark SQL, users should
 #' call this function to invalidate the cache.
 #'
 #' If this table is cached as an InMemoryRelation, drop the original cached version and make the
 #' new version cached lazily.
 #'
-#' @param tableName a name of the table.
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
 #' @rdname refreshTable
 #' @name refreshTable
 #' @export
@@ -456,11 +462,11 @@ refreshTable <- function(tableName) {
   invisible(handledCallJMethod(catalog, "refreshTable", tableName))
 }
 
-#' Invalidate and refresh all the cached data and metadata for SparkDataFrame containing path
+#' Invalidates and refreshes all the cached data and metadata for SparkDataFrame containing path
 #'
-#' Invalidate and refresh all the cached data (and the associated metadata) for any SparkDataFrame
-#' that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
-#' everything that is cached.
+#' Invalidates and refreshes all the cached data (and the associated metadata) for any
+#' SparkDataFrame that contains the given data source path. Path matching is by prefix, i.e. "/"
+#' would invalidate everything that is cached.
 #'
 #' @param path the path of the data source.
 #' @rdname refreshByPath
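A hedged sketch of the catalog calls whose parameter docs change above, continuing with the `spark` session from the first sketch. The qualified/unqualified name semantics mirror the R docs; `salesdb` and `orders` are hypothetical names.

```python
spark.catalog.cacheTable("salesdb.orders")    # qualified: table "orders" in database "salesdb"
spark.catalog.uncacheTable("salesdb.orders")
spark.catalog.cacheTable("orders")            # unqualified: table in the current database

# listTables includes temporary views; listColumns takes an unqualified
# table name when dbName is passed separately.
for tbl in spark.catalog.listTables("salesdb"):
    print(tbl.name, tbl.tableType, tbl.isTemporary)
for col in spark.catalog.listColumns("orders", dbName="salesdb"):
    print(col.name, col.dataType)
```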
27 changes: 19 additions & 8 deletions python/pyspark/sql/catalog.py
@@ -72,10 +72,10 @@ def listDatabases(self):
     @ignore_unicode_prefix
     @since(2.0)
     def listTables(self, dbName=None):
-        """Returns a list of tables in the specified database.
+        """Returns a list of tables/views in the specified database.
 
         If no database is specified, the current database is used.
-        This includes all temporary tables.
+        This includes all temporary views.
         """
         if dbName is None:
             dbName = self.currentDatabase()
@@ -115,7 +115,7 @@ def listFunctions(self, dbName=None):
     @ignore_unicode_prefix
     @since(2.0)
     def listColumns(self, tableName, dbName=None):
-        """Returns a list of columns for the given table in the specified database.
+        """Returns a list of columns for the given table/view in the specified database.
 
         If no database is specified, the current database is used.
 
@@ -161,14 +161,15 @@ def createExternalTable(self, tableName, path=None, source=None, schema=None, **
     def createTable(self, tableName, path=None, source=None, schema=None, **options):
         """Creates a table based on the dataset in a data source.
 
-        It returns the DataFrame associated with the external table.
+        It returns the DataFrame associated with the table.
 
         The data source is specified by the ``source`` and a set of ``options``.
         If ``source`` is not specified, the default data source configured by
-        ``spark.sql.sources.default`` will be used.
+        ``spark.sql.sources.default`` will be used. When ``path`` is specified, an external table is
+        created from the data at the given path. Otherwise a managed table is created.
 
         Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
-        created external table.
+        created table.
 
         :return: :class:`DataFrame`
         """
@@ -276,14 +277,24 @@ def clearCache(self):

     @since(2.0)
     def refreshTable(self, tableName):
-        """Invalidate and refresh all the cached metadata of the given table."""
+        """Invalidates and refreshes all the cached data and metadata of the given table."""
         self._jcatalog.refreshTable(tableName)
 
     @since('2.1.1')
     def recoverPartitions(self, tableName):
-        """Recover all the partitions of the given table and update the catalog."""
+        """Recovers all the partitions of the given table and update the catalog.
+
+        Only works with a partitioned table, and not a view.
+        """
         self._jcatalog.recoverPartitions(tableName)
 
+    @since('2.2.0')
+    def refreshByPath(self, path):
+        """Invalidates and refreshes all the cached data (and the associated metadata) for any
+        DataFrame that contains the given data source path.
+        """
+        self._jcatalog.refreshByPath(path)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".
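A short usage sketch of the three maintenance methods documented above (same `spark` session; the partitioned table `salesdb.events` and its path are hypothetical):

```python
# Valid only for a partitioned table, not a view: picks up partitions that
# were added to the table's directory outside of Spark.
spark.catalog.recoverPartitions("salesdb.events")

# Drops the cached data/metadata for the table; if it was cached, the new
# version is re-cached lazily on next access.
spark.catalog.refreshTable("salesdb.events")

# Refreshes any cached DataFrame built on a data source path with this prefix.
spark.catalog.refreshByPath("/data/events")
```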
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
@@ -385,7 +385,7 @@ def sql(self, sqlQuery):

     @since(1.0)
     def table(self, tableName):
-        """Returns the specified table as a :class:`DataFrame`.
+        """Returns the specified table or view as a :class:`DataFrame`.
 
         :return: :class:`DataFrame`
 
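For completeness, the older `SQLContext` entry point resolves names the same way as `SparkSession.table`; a small sketch reusing the hypothetical temp view from the first example:

```python
from pyspark.sql import SQLContext

sqlContext = SQLContext(spark.sparkContext)
people_df = sqlContext.table("people")  # tables and (temp) views both work
```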
sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -283,7 +283,7 @@ abstract class Catalog {

   /**
    * :: Experimental ::
-   * Creates a table from the given path based on a data source and a set of options.
+   * Creates a table based on the dataset in a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @param tableName is either a qualified or unqualified name that designates a table.
@@ -321,7 +321,7 @@ abstract class Catalog {
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates a table from the given path based on a data source and a set of options.
+   * Creates a table based on the dataset in a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @param tableName is either a qualified or unqualified name that designates a table.
@@ -357,7 +357,7 @@ abstract class Catalog {

   /**
    * :: Experimental ::
-   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Create a table based on the dataset in a data source, a schema and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @param tableName is either a qualified or unqualified name that designates a table.
@@ -397,7 +397,7 @@ abstract class Catalog {
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Create a table from the given path based on a data source, a schema and a set of options.
+   * Create a table based on the dataset in a data source, a schema and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @param tableName is either a qualified or unqualified name that designates a table.
@@ -447,6 +447,7 @@ abstract class Catalog {

   /**
    * Recovers all the partitions in the directory of a table and update the catalog.
+   * Only works with a partitioned table, and not a view.
    *
    * @param tableName is either a qualified or unqualified name that designates a table.
    *                  If no database identifier is provided, it refers to a table in the
@@ -493,10 +494,10 @@ abstract class Catalog {
   def clearCache(): Unit
 
   /**
-   * Invalidates and refreshes all the cached metadata of the given table. For performance reasons,
-   * Spark SQL or the external data source library it uses might cache certain metadata about a
-   * table, such as the location of blocks. When those change outside of Spark SQL, users should
-   * call this function to invalidate the cache.
+   * Invalidates and refreshes all the cached data and metadata of the given table. For performance
+   * reasons, Spark SQL or the external data source library it uses might cache certain metadata
+   * about a table, such as the location of blocks. When those change outside of Spark SQL, users
+   * should call this function to invalidate the cache.
    *
    * If this table is cached as an InMemoryRelation, drop the original cached version and make the
    * new version cached lazily.
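The managed-versus-external distinction that the `createTable` docs now spell out can be sketched with the Python wrapper (the Scala overloads behave the same way); the table names, path, and schema are hypothetical:

```python
from pyspark.sql.types import LongType, StructField, StructType

# External table: ``path`` is given, so the data stays at /data/events.
ext = spark.catalog.createTable("ext_events", path="/data/events", source="parquet")

# Managed table: no ``path``, so Spark stores the data under its warehouse directory.
schema = StructType([StructField("id", LongType())])
managed = spark.catalog.createTable("managed_events", source="parquet", schema=schema)
```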
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -141,7 +141,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
-   * Returns a list of columns for the given table temporary view.
+   * Returns a list of columns for the given table/view or temporary view.
    */
   @throws[AnalysisException]("table does not exist")
   override def listColumns(tableName: String): Dataset[Column] = {
@@ -150,7 +150,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
-   * Returns a list of columns for the given table in the specified database.
+   * Returns a list of columns for the given table/view or temporary view in the specified database.
    */
   @throws[AnalysisException]("database or table does not exist")
   override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
@@ -273,7 +273,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

   /**
    * :: Experimental ::
-   * Creates a table from the given path based on a data source and returns the corresponding
+   * Creates a table from the given path and returns the corresponding
    * DataFrame.
    *
    * @group ddl_ops
@@ -287,7 +287,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates a table from the given path based on a data source and a set of options.
+   * Creates a table based on the dataset in a data source and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
@@ -304,7 +304,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   /**
    * :: Experimental ::
    * (Scala-specific)
-   * Creates a table from the given path based on a data source, a schema and a set of options.
+   * Creates a table based on the dataset in a data source, a schema and a set of options.
    * Then, returns the corresponding DataFrame.
    *
    * @group ddl_ops
@@ -367,6 +367,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

   /**
    * Recovers all the partitions in the directory of a table and update the catalog.
+   * Only works with a partitioned table, and not a temporary view.
[Review comment: Member] The same here.

[Review comment: Member] not a temporary view. -> not a view.
    *
    * @param tableName is either a qualified or unqualified name that designates a table.
    *                  If no database identifier is provided, it refers to a table in the
@@ -431,8 +432,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
-   * Refreshes the cache entry for a table or view, if any. For Hive metastore table, the metadata
-   * is refreshed. For data source tables, the schema will not be inferred and refreshed.
+   * Invalidates and refreshes all the cached data and metadata of the given table or view.
+   * For Hive metastore table, the metadata is refreshed. For data source tables, the schema will
+   * not be inferred and refreshed.
+   *
+   * If this table is cached as an InMemoryRelation, drop the original cached version and make the
+   * new version cached lazily.
    *
    * @group cachemgmt
    * @since 2.0.0
@@ -456,7 +461,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {

   /**
    * Refreshes the cache entry and the associated metadata for all Dataset (if any), that contain
-   * the given data source path.
+   * the given data source path. Path matching is by prefix, i.e. "/" would invalidate
[Review comment: gatorsmile (Member), Apr 4, 2017] invalidate -> invalidate and refresh
We also do the re-cache, but the new version cached lazily.

[Reply: Member Author] For some reason in here, CatalogImpl.scala is very different from Catalog.scala - let me know if you want me to change them - for now I've updated the first sentence.

[Reply: Member] Yes. I found this sentence is copied from Catalog.scala. Maybe, we can update them to:
"Path matching is by prefix, i.e. "/" would invalidate all the cached entries and make the new versions cached lazily."
+   * everything that is cached.
    *
    * @group cachemgmt
    * @since 2.0.0
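To make the prefix matching discussed in the review thread concrete, a hedged sketch with hypothetical paths (same `spark` session as the earlier examples):

```python
# A narrower prefix only refreshes caches built on that subtree...
spark.catalog.refreshByPath("/data/events/2017")

# ...while "/" is a prefix of every path, so this refreshes all cached
# entries (re-caching the new versions lazily, per the thread above).
spark.catalog.refreshByPath("/")
```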