Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions phoenix5-spark/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,9 @@ the deprecated `zkUrl` parameter for backwards compatibility purposes. If neither
it falls back to using connection defined by hbase-site.xml.
- `"jdbcUrl"` expects a full Phoenix JDBC URL, i.e. "jdbc:phoenix" or "jdbc:phoenix:zkHost:zkport",
while `"zkUrl"` expects the ZK quorum only, i.e. "zkHost:zkPort"
- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
instead of `"phoenix"`, however this is deprecated.
The `"org.apache.phoenix.spark"` datasource does not accept the `"jdbcUrl"` parameter,
only `"zkUrl"`
- The DataSourceV1 implementation was removed;
the `"org.apache.phoenix.spark"` source type
uses the DataSourceV2 implementation since the connector 6.0.0 release.
- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and
`saveToPhoenix` use the deprecated `"org.apache.phoenix.spark"` datasource, and allow
optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
Expand Down
44 changes: 0 additions & 44 deletions phoenix5-spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -177,39 +171,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>


<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
Expand Down Expand Up @@ -371,11 +332,6 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${jodatime.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
201 changes: 90 additions & 111 deletions phoenix5-spark/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ public Configuration getConfiguration(Configuration confToClone) {

@Test
public void basicWriteAndReadBackTest() throws SQLException {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);

SparkSession spark = SparkUtil.getSparkSession();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like spark.hadoopRDD.ignoreEmptySplits is default since 3.2.0, so removing it should be OK.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just replaced old code with a call to existing SparkUtil.getSparkSession() to avoid having duplicate code.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That one doesn't have spark.hadoopRDD.ignoreEmptySplits for Spark3 (which is fine)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since 3.2.0 / SPARK-34809 spark.hadoopRDD.ignoreEmptySplits is enabled by default.

However, this uses Spark 2. Shouldn't we keep that property for Spark 2 ?

Copy link
Copy Markdown
Contributor Author

@rejeb rejeb May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just replaced old code with a call to existing SparkUtil.getSparkSession() to avoid having duplicate code and the property is there.

String tableName = generateUniqueName();

try (Connection conn = DriverManager.getConnection(getUrl());
Expand All @@ -85,141 +83,122 @@ public void basicWriteAndReadBackTest() throws SQLException {
"CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)");
}

try (SparkSession spark = sqlContext.sparkSession()) {
StructType schema =
new StructType(new StructField[] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.StringType, false, Metadata.empty()) });

StructType schema =
new StructType(new StructField[] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.StringType, false, Metadata.empty()) });
// Use old zkUrl
Dataset<Row> df1 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(1, "x")),
schema);

// Use old zkUrl
Dataset<Row> df1 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(1, "x")),
schema);
df1.write().format("phoenix").mode(SaveMode.Overwrite)
.option("table", tableName)
.option(ZOOKEEPER_URL, getUrl())
.save();

// Use jdbcUrl
            // In Phoenix 5.2+ getUrl() returns a JDBC URL, in earlier versions it returns a ZK
// quorum
String jdbcUrl = getUrl();
if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) {
jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl;
}
Dataset<Row> df2 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(2, "x")),
schema);

df1.write().format("phoenix").mode(SaveMode.Overwrite)
df2.write().format("phoenix").mode(SaveMode.Overwrite)
.option("table", tableName)
.option(ZOOKEEPER_URL, getUrl())
.option(JDBC_URL, jdbcUrl)
.save();

// Use jdbcUrl
            // In Phoenix 5.2+ getUrl() returns a JDBC URL, in earlier versions it returns a ZK
// quorum
String jdbcUrl = getUrl();
if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) {
jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl;
}
Dataset<Row> df2 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(2, "x")),
schema);
// Use default from hbase-site.xml
Dataset<Row> df3 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(3, "x")),
schema);

df2.write().format("phoenix").mode(SaveMode.Overwrite)
.option("table", tableName)
.option(JDBC_URL, jdbcUrl)
.save();
df3.write().format("phoenix").mode(SaveMode.Overwrite)
.option("table", tableName)
.save();

// Use default from hbase-site.xml
Dataset<Row> df3 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(3, "x")),
schema);
try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertFalse(rs.next());
}

df3.write().format("phoenix").mode(SaveMode.Overwrite)
Dataset df1Read = spark.read().format("phoenix")
.option("table", tableName)
.save();

try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertFalse(rs.next());
}

Dataset df1Read = spark.read().format("phoenix")
.option("table", tableName)
.option(PhoenixDataSource.JDBC_URL, getUrl()).load();
.option(PhoenixDataSource.JDBC_URL, getUrl()).load();

assertEquals(3l, df1Read.count());
assertEquals(3l, df1Read.count());

// Use jdbcUrl
Dataset df2Read = spark.read().format("phoenix")
.option("table", tableName)
.option(PhoenixDataSource.JDBC_URL, jdbcUrl)
.load();

assertEquals(3l, df2Read.count());
// Use jdbcUrl
Dataset df2Read = spark.read().format("phoenix")
.option("table", tableName)
.option(PhoenixDataSource.JDBC_URL, jdbcUrl)
.load();

// Use default
Dataset df3Read = spark.read().format("phoenix")
.option("table", tableName)
.load();
assertEquals(3l, df2Read.count());

assertEquals(3l, df3Read.count());
// Use default
Dataset df3Read = spark.read().format("phoenix")
.option("table", tableName)
.load();

} finally {
jsc.stop();
}
assertEquals(3l, df3Read.count());
}

@Test
public void lowerCaseWriteTest() throws SQLException {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test")
.set("spark.hadoopRDD.ignoreEmptySplits", "false");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
SparkSession spark = SparkUtil.getSparkSession();
String tableName = generateUniqueName();

try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()){
stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR, \"v1\" VARCHAR)");
}
StructType schema = new StructType(new StructField[]{
new StructField("ID", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("V1", DataTypes.StringType, false, Metadata.empty()),
new StructField("\"v1\"", DataTypes.StringType, false, Metadata.empty())
});

try(SparkSession spark = sqlContext.sparkSession()) {
//Doesn't help
spark.conf().set("spark.sql.caseSensitive", true);

StructType schema = new StructType(new StructField[]{
new StructField("ID", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("V1", DataTypes.StringType, false, Metadata.empty()),
new StructField("\"v1\"", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(
Arrays.asList(
RowFactory.create(1, "x", "y")),
schema);

df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", tableName)
.option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true")
.option(JDBC_URL, getUrl())
.save();

try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertEquals("y", rs.getString(3));
assertFalse(rs.next());
}
Dataset<Row> df = spark.createDataFrame(
Arrays.asList(
RowFactory.create(1, "x", "y")),
schema);

df.write()
.format("phoenix")
.mode(SaveMode.Overwrite)
.option("table", tableName)
.option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true")
.option(JDBC_URL, getUrl())
.save();

} finally {
jsc.stop();
try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertEquals("y", rs.getString(3));
assertFalse(rs.next());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter

lazy val jdbcUrl = PhoenixSparkITHelper.getUrl

lazy val quorumAddress = ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get
lazy val quorumAddress = PhoenixSparkITHelper.getUrl

// Runs SQL commands located in the file defined in the sqlSource argument
// Optional argument tenantId used for running tenant-specific SQL
Expand Down
Loading