Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 79 additions & 7 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
layout: global
displayTitle: Spark SQL and DataFrame Guide
displayTitle: Spark SQL, DataFrames and Datasets Guide
title: Spark SQL and DataFrames
---

Expand All @@ -9,18 +9,51 @@ title: Spark SQL and DataFrames

# Overview

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided
by Spark SQL provide Spark with more about the structure of both the data and the computation being performed. Internally,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a word missing between "more" and "about" like information?

Spark SQL uses this extra information to perform extra optimizations. There are several ways to
interact with Spark SQL including SQL, the DataFrames API and the Datasets API. When computing a result
the same execution engine is used, independent of which API/language you are using to express the
computation. This unification means that developers can easily switch back and forth between the
various APIs based on which provides the most natural way to express a given transformation.

Spark SQL can also be used to read data from an existing Hive installation. For more on how to configure this feature, please refer to the [Hive Tables](#hive-tables) section.
All of the examples on this page use sample data included in the Spark distribution and can be run in
the `spark-shell`, `pyspark` shell, or `sparkR` shell.

# DataFrames
## SQL

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
One use of Spark SQL is to execute SQL queries written using either a basic SQL syntax or HiveQL.
Spark SQL can also be used to read data from an existing Hive installation. For more on how to
configure this feature, please refer to the [Hive Tables](#hive-tables) section. When running
SQL from within another programming language the results will be returned as a [DataFrame](#DataFrames).
You can also interact with the SQL interface using the [command-line](#running-the-spark-sql-cli)
or over [JDBC/ODBC](#running-the-thrift-jdbcodbc-server).

The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), [Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame), and [R](api/R/index.html).
## DataFrames

All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`, `pyspark` shell, or `sparkR` shell.
A DataFrame is a distributed collection of data organized into named columns. It is conceptually
equivalent to a table in a relational database or a data frame in R/Python, but with richer
optimizations under the hood. DataFrames can be constructed from a wide array of [sources](#data-sources) such
as: structured data files, tables in Hive, external databases, or existing RDDs.

The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame),
[Java](api/java/index.html?org/apache/spark/sql/DataFrame.html),
[Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame), and [R](api/R/index.html).

## Datasets

A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of
RDDs (strong typing, ability to use powerful lambda functions) with the benifits of Spark SQL's
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

benifits -> benefits

optimized execution engine. A Dataset can be [constructed](#creating-datasets) from JVM objects and then manipulated
using functional transformations (map, flatMap, filter, etc.).

The unified Dataset API can be used both in [Scala](api/scala/index.html#org.apache.spark.sql.Dataset) and
[Java](api/java/index.html?org/apache/spark/sql/Dataset.html). Python does not yet have support for
the Dataset API, but due to its dynamic nature many of the benifits are already available (i.e. you can
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

benifits -> benefits

access the field of a row by name naturally `row.columnName`). Full python support will be added
in a future release.

# Getting Started

## Starting Point: SQLContext

Expand Down Expand Up @@ -428,6 +461,45 @@ df <- sql(sqlContext, "SELECT * FROM table")
</div>


## Creating Datasets

Datasets are similar to RDDs, however, instead of using Java Serialization or Kryo they use
a specialized [Encoder](api/scala/index.html#org.apache.spark.sql.Encoder) to serialize the objects
for processing or transmitting over the network. While both encoders and standard serialization are
responsible for during an object into bytes, encoders are code generated dynamically and use a format
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

during -> turning?

that allows Spark to perform many operations like filtering, sorting and hashing without deserialzing
the back into an object.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the bytes back into an object?


<div class="codetabs">
<div data-lang="scala" markdown="1">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2 whitespaces


{% highlight scala %}
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 whitespaces here too

val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
{% endhighlight %}

</div>
</div>

## Interoperating with RDDs

Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first
Expand Down
49 changes: 46 additions & 3 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,61 @@ package org.apache.spark.sql

import java.lang.reflect.Modifier

import org.apache.spark.annotation.Experimental
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import order


import scala.annotation.implicitNotFound
import scala.reflect.{ClassTag, classTag}

import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
import org.apache.spark.sql.catalyst.expressions.{DecodeUsingSerializer, BoundReference, EncodeUsingSerializer}
import org.apache.spark.sql.types._

/**
* :: Experimental ::
* Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
*
* Encoders are not intended to be thread-safe and thus they are allow to avoid internal locking
* and reuse internal buffers to improve performance.
* == Scala ==
* Encoders are generally created automatically through implicits from a `SQLContext`.
*
* {{{
* import sqlContext.implicits._
*
* val ds = Seq(1, 2, 3).toDS() // implicitly provided (sqlContext.implicits.newIntEncoder)
* }}}
*
* == Java ==
* Encoders are specified by calling static methods on [[Encoders]].
*
* {{{
* List<String> data = Arrays.asList("abc", "abc", "xyz");
* Dataset<String> ds = context.createDataset(data, Encoders.STRING());
* }}}
*
* Encoders can be composed into tuples:
*
* {{{
* Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
* List<Tuple2<Integer, String>> data2 = Arrays.asList(new scala.Tuple2(1, "a");
* Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);
* }}}
*
* Or constructed from Java Beans:
*
* {{{
* Encoders.bean(MyClass.class);
* }}}
*
* == Implementation ==
* - Encoders are not intended to be thread-safe and thus they are allowed to avoid internal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this sentence: "allowed to avoid" is troubling me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated, is that better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's way clearer, yup.

* locking and reuse internal buffers to improve performance.
*
* @since 1.6.0
*/
@Experimental
@implicitNotFound("Unable to find encoder for type stored in a Dataset. Primitive types " +
"(Int, String, etc) and Product types (case classes) are supported by importing " +
"sqlContext.implicits._ Support for serializing other types will be added in future " +
"releases.")
trait Encoder[T] extends Serializable {

/** Returns the schema of encoding this type of object as a Row. */
Expand All @@ -43,10 +84,12 @@ trait Encoder[T] extends Serializable {
}

/**
* Methods for creating encoders.
* :: Experimental ::
* Methods for creating an [[Encoder]].
*
* @since 1.6.0
*/
@Experimental
object Encoders {

/**
Expand Down
20 changes: 19 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,25 @@ class TypedColumn[-T, U](

/**
* :: Experimental ::
* A column in a [[DataFrame]].
* A column that will be computed based on the data in a [[DataFrame]].
*
* A new column is constructed based on the input columns present in a dataframe:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also mention literal here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good idea.

*
* {{{
* df("columnName") // On a specific DataFrame.
* col("columnName") // A generic column no yet associcated with a DataFrame.
* col("columnName.field") // Extracting a struct field
* col("`a.column.with.dots`") // Escape `.` in column names.
* $"columnName" // Scala short hand for a named column.
* expr("a + 1") // A column that is constructed from a parsed SQL Expression.
* }}}
*
* [[Column]] objects can be composed to form complex expressions:
*
* {{{
* $"a" + 1
* $"a" === $"b"
* }}}
*
* @groupname java_expr_ops Java-specific expression operators
* @groupname expr_ops Expression operators
Expand Down