phoenix5-spark/README.md (145 changes: 105 additions & 40 deletions)
phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as DataFrames,
and enables persisting DataFrames back to Phoenix.

## Configuration properties

| Name                      | Default | Usage | Description |
|---------------------------|---------|-------|-------------|
| table                     | empty   | R/W   | Table name, as `namespace.table_name` |
| zkUrl                     | empty   | R/W   | (Optional, deprecated; use `jdbcUrl` instead) List of ZooKeeper hosts. Best left unset, so the value is taken from hbase-site.xml |
| jdbcUrl                   | empty   | R/W   | (Optional) JDBC URL of the database, as `jdbc:phoenix:zkHost:zkPort`. Best left unset, so the value is taken from hbase-site.xml |
| dateAsTimestamp           | false   | R     | Cast Date columns to Timestamp |
| doNotMapColumnFamily      | false   | R     | For non-default column families: do not prefix column names with the column family name |
| TenantId                  | empty   | R/W   | Tenant ID to use when reading from a multi-tenant table |
| phoenixConfigs            | empty   | R/W   | Comma-separated list of HBase/Phoenix configuration properties to override (`property=value,property=value`) |
| skipNormalizingIdentifier | false   | W     | Skip normalizing (upper-casing) unquoted identifiers when writing |
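
A read might combine several of these options. The sketch below is illustrative only and needs a running Phoenix/HBase cluster; the table name and override values are assumptions, not taken from this document:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("phoenix-options-demo").master("local").getOrCreate()

// All option values below are placeholders for illustration
val df = spark.read
  .format("phoenix")
  .option("table", "MY_SCHEMA.MY_TABLE")   // hypothetical table name
  .option("dateAsTimestamp", "true")       // read DATE columns back as Timestamp
  .option("doNotMapColumnFamily", "true")  // do not prefix columns with their column family
  .option("phoenixConfigs", "hbase.client.retries.number=10,hbase.client.pause=10000")
  .load()
```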

## Reading Phoenix Tables

Given a Phoenix table with the following DDL and DML:
Scala example:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
  .getOrCreate()
val df = spark.sqlContext
.read
.format("phoenix")
.option("table", "TABLE1")
.load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
.select(df("ID"))
.show

spark.stop()
```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PhoenixSparkRead {
public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Load data from TABLE1
Dataset<Row> df = spark
.read()
.format("phoenix")
.option("table", "TABLE1")
.load();
df.createOrReplaceTempView("TABLE1");

df = spark.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
df.show();

spark.stop();
}
}
```

### Load as a DataFrame using Spark SQL and the DataSourceV2 API
Scala example:
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("phoenix-test")
.master("local")
.config("spark.hadoopRDD.ignoreEmptySplits", "false")
.getOrCreate()

// Load data from TABLE1
spark.sql("CREATE TABLE TABLE1_SQL USING phoenix OPTIONS ('table' 'TABLE1')")

val df = spark.sql("SELECT ID FROM TABLE1_SQL WHERE COL1='test_row_1'")

df.show

spark.stop()
```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PhoenixSparkRead {

public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Load data from TABLE1
Dataset<Row> df = spark.sql("CREATE TABLE TABLE1_SQL USING phoenix OPTIONS ('table' 'TABLE1')");

df = spark.sql("SELECT * FROM TABLE1_SQL WHERE COL1='test_row_1' AND ID=1L");
df.show();

spark.stop();
}
}
```
### Save DataFrames to Phoenix using DataSourceV2

The `save` method on DataFrame allows passing in a data source type. You can use
`phoenix` for DataSourceV2, and must also pass in a `table` parameter to
specify which table to persist the DataFrame to. The column names are derived from
the DataFrame's schema field names, and must match the Phoenix column names.

The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
Scala example:
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .config("spark.hadoopRDD.ignoreEmptySplits", "false")
  .getOrCreate()
val df = spark.sqlContext
.read
.format("phoenix")
.option("table", "INPUT_TABLE")
.load

// Save to OUTPUT_TABLE
df.write
.format("phoenix")
.mode(SaveMode.Overwrite)
.options(Map("table" -> "OUTPUT_TABLE"))
.option("table", "OUTPUT_TABLE")
.save()
spark.stop()
```
Java example:
```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PhoenixSparkWriteFromInputTable {

public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Load INPUT_TABLE
Dataset<Row> df = spark
.read()
.format("phoenix")
.option("table", "INPUT_TABLE")
.load();

// Save to OUTPUT_TABLE
df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "OUTPUT_TABLE")
.save();

spark.stop();
}
}
```

### Save from an external RDD with a schema to a Phoenix table

Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` parameter
to indicate which table to persist the DataFrame to.

Note that the schema of the RDD must match its column data, and both must match the schema of the Phoenix table
that you save to.
val schema = StructType(
  Seq(StructField("ID", LongType, nullable = false),
    StructField("COL1", StringType),
    StructField("COL2", IntegerType)))
val rowRDD = spark.sparkContext.parallelize(dataSet)

// Apply the schema to the RDD.
val df = spark.createDataFrame(rowRDD, schema)

df.write
.format("phoenix")
.option("table", "OUTPUT_TABLE")
.mode(SaveMode.Overwrite)
.save()

spark.stop()
```
Java example:
```java
public class PhoenixSparkWriteFromRDDWithSchema {
public static void main() throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();

// Generate the schema based on the fields
List<StructField> fields = new ArrayList<>();
}

// Create a DataFrame from the rows and the specified schema
Dataset<Row> df = spark.createDataFrame(rows, schema);
df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", "OUTPUT_TABLE")
.save();

spark.stop();
}
}
```
- If neither `jdbcUrl` nor `zkUrl` is specified, it falls back to using the connection defined by hbase-site.xml.
- `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkPort",
while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort"
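
The two options therefore take different shapes for the same cluster. A side-by-side sketch (the host and port are placeholders, and a running cluster is assumed):

```scala
// Same quorum, two option spellings; "zkHost:2181" is a placeholder
val viaJdbcUrl = spark.read.format("phoenix")
  .option("table", "TABLE1")
  .option("jdbcUrl", "jdbc:phoenix:zkHost:2181")  // full JDBC URL
  .load()

val viaZkUrl = spark.read.format("phoenix")
  .option("table", "TABLE1")
  .option("zkUrl", "zkHost:2181")                 // quorum only (deprecated option)
  .load()
```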
- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
instead of `"phoenix"`, however this is deprecated.
The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
only `"zkUrl"`
- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and
`saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow
optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
as well as an optional `zkUrl` parameter.

- As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
to executors as a comma-separated list against the key `phoenixConfigs` (i.e. `PhoenixDataSource.PHOENIX_CONFIGS`), for example:
```scala
val df = spark
.sqlContext
.read
.format("phoenix")
.options(Map("table" -> "Table1", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
.load;
```
This list of properties is parsed and populated into a properties map which is passed to
the underlying Phoenix connection.

- Setting `doNotMapColumnFamily` to `true` stops column names from being prefixed with their column family for tables with a non-default column family, for example:

```scala
val df = spark
.sqlContext
.read
.format("phoenix")
.options(Map("table" -> "Table1", "doNotMapColumnFamily" -> "true"))
.load;
```
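
Since the `phoenixConfigs` option is accepted on both the read and write paths, the same override style can be sketched for a write. This is an illustration only: the table name and retry value are assumed, and `df` and `SaveMode` are taken to be in scope as in the earlier examples:

```scala
// Write-side sketch with a phoenixConfigs override (illustrative values)
df.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .options(Map(
    "table" -> "Table1",
    "phoenixConfigs" -> "hbase.client.retries.number=10"))
  .save()
```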

You can also create the DataFrame or RDD directly if you need fine-grained configuration.
### Load as a DataFrame directly using a Configuration object
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'

val sparkConf = new SparkConf()
val sc = new SparkContext("local", "phoenix-test", sparkConf)
val sqlContext = new SQLContext(sc)

// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show
```

### Load as an RDD, using a Zookeeper URL
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD

val sc = new SparkContext("local", "phoenix-test")

// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)
```

### Saving RDDs to Phoenix

Given a Phoenix table with the following DDL:

```sql
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER)
```

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sparkConf = new SparkConf()
val sc = new SparkContext("local", "phoenix-test", sparkConf)
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
Seq("ID","COL1","COL2"),
zkUrl = Some("phoenix-server:2181")
)
```
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public void basicWriteAndReadBackTest() throws SQLException {

Dataset df1Read = spark.read().format("phoenix")
.option("table", tableName)
.option(PhoenixDataSource.JDBC_URL, getUrl()).load();

assertEquals(3L, df1Read.count());

}

@Test
public void lowerCaseWriteTest() throws SQLException {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", tableName)
.option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER, "true")
.option(JDBC_URL, getUrl())
.save();

try (Connection conn = DriverManager.getConnection(getUrl());