-
Notifications
You must be signed in to change notification settings - Fork 60
PHOENIX-7407 Remove deprecated datasource V1 code from spark2 and spark3 connector #145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,10 +73,8 @@ public Configuration getConfiguration(Configuration confToClone) { | |
|
|
||
| @Test | ||
| public void basicWriteAndReadBackTest() throws SQLException { | ||
| SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test") | ||
| .set("spark.hadoopRDD.ignoreEmptySplits", "false"); | ||
| JavaSparkContext jsc = new JavaSparkContext(sparkConf); | ||
| SQLContext sqlContext = new SQLContext(jsc); | ||
|
|
||
| SparkSession spark = SparkUtil.getSparkSession(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. since 3.2.0 / SPARK-34809 spark.hadoopRDD.ignoreEmptySplits is enabled by default. However, this uses Spark 2. Shouldn't we keep that property for Spark 2 ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just replaced old code with a call to existing |
||
| String tableName = generateUniqueName(); | ||
|
|
||
| try (Connection conn = DriverManager.getConnection(getUrl()); | ||
|
|
@@ -85,141 +83,122 @@ public void basicWriteAndReadBackTest() throws SQLException { | |
| "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)"); | ||
| } | ||
|
|
||
| try (SparkSession spark = sqlContext.sparkSession()) { | ||
| StructType schema = | ||
| new StructType(new StructField[] { | ||
| new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), | ||
| new StructField("v1", DataTypes.StringType, false, Metadata.empty()) }); | ||
|
|
||
| StructType schema = | ||
| new StructType(new StructField[] { | ||
| new StructField("id", DataTypes.IntegerType, false, Metadata.empty()), | ||
| new StructField("v1", DataTypes.StringType, false, Metadata.empty()) }); | ||
| // Use old zkUrl | ||
| Dataset<Row> df1 = | ||
| spark.createDataFrame( | ||
| Arrays.asList(RowFactory.create(1, "x")), | ||
| schema); | ||
|
|
||
| // Use old zkUrl | ||
| Dataset<Row> df1 = | ||
| spark.createDataFrame( | ||
| Arrays.asList(RowFactory.create(1, "x")), | ||
| schema); | ||
| df1.write().format("phoenix").mode(SaveMode.Overwrite) | ||
| .option("table", tableName) | ||
| .option(ZOOKEEPER_URL, getUrl()) | ||
| .save(); | ||
|
|
||
| // Use jdbcUrl | ||
| // In Phoenix 5.2+ getUrl() return a JDBC URL, in earlier versions it returns a ZK | ||
| // quorum | ||
| String jdbcUrl = getUrl(); | ||
| if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) { | ||
| jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl; | ||
| } | ||
| Dataset<Row> df2 = | ||
| spark.createDataFrame( | ||
| Arrays.asList(RowFactory.create(2, "x")), | ||
| schema); | ||
|
|
||
| df1.write().format("phoenix").mode(SaveMode.Overwrite) | ||
| df2.write().format("phoenix").mode(SaveMode.Overwrite) | ||
| .option("table", tableName) | ||
| .option(ZOOKEEPER_URL, getUrl()) | ||
| .option(JDBC_URL, jdbcUrl) | ||
| .save(); | ||
|
|
||
| // Use jdbcUrl | ||
| // In Phoenix 5.2+ getUrl() return a JDBC URL, in earlier versions it returns a ZK | ||
| // quorum | ||
| String jdbcUrl = getUrl(); | ||
| if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) { | ||
| jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl; | ||
| } | ||
| Dataset<Row> df2 = | ||
| spark.createDataFrame( | ||
| Arrays.asList(RowFactory.create(2, "x")), | ||
| schema); | ||
| // Use default from hbase-site.xml | ||
| Dataset<Row> df3 = | ||
| spark.createDataFrame( | ||
| Arrays.asList(RowFactory.create(3, "x")), | ||
| schema); | ||
|
|
||
| df2.write().format("phoenix").mode(SaveMode.Overwrite) | ||
| .option("table", tableName) | ||
| .option(JDBC_URL, jdbcUrl) | ||
| .save(); | ||
| df3.write().format("phoenix").mode(SaveMode.Overwrite) | ||
| .option("table", tableName) | ||
| .save(); | ||
|
|
||
| // Use default from hbase-site.xml | ||
| Dataset<Row> df3 = | ||
| spark.createDataFrame( | ||
| Arrays.asList(RowFactory.create(3, "x")), | ||
| schema); | ||
| try (Connection conn = DriverManager.getConnection(getUrl()); | ||
| Statement stmt = conn.createStatement()) { | ||
| ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); | ||
| assertTrue(rs.next()); | ||
| assertEquals(1, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertTrue(rs.next()); | ||
| assertEquals(2, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertTrue(rs.next()); | ||
| assertEquals(3, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertFalse(rs.next()); | ||
| } | ||
|
|
||
| df3.write().format("phoenix").mode(SaveMode.Overwrite) | ||
| Dataset df1Read = spark.read().format("phoenix") | ||
| .option("table", tableName) | ||
| .save(); | ||
|
|
||
| try (Connection conn = DriverManager.getConnection(getUrl()); | ||
| Statement stmt = conn.createStatement()) { | ||
| ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); | ||
| assertTrue(rs.next()); | ||
| assertEquals(1, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertTrue(rs.next()); | ||
| assertEquals(2, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertTrue(rs.next()); | ||
| assertEquals(3, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertFalse(rs.next()); | ||
| } | ||
|
|
||
| Dataset df1Read = spark.read().format("phoenix") | ||
| .option("table", tableName) | ||
| .option(PhoenixDataSource.JDBC_URL, getUrl()).load(); | ||
| .option(PhoenixDataSource.JDBC_URL, getUrl()).load(); | ||
|
|
||
| assertEquals(3l, df1Read.count()); | ||
| assertEquals(3l, df1Read.count()); | ||
|
|
||
| // Use jdbcUrl | ||
| Dataset df2Read = spark.read().format("phoenix") | ||
| .option("table", tableName) | ||
| .option(PhoenixDataSource.JDBC_URL, jdbcUrl) | ||
| .load(); | ||
|
|
||
| assertEquals(3l, df2Read.count()); | ||
| // Use jdbcUrl | ||
| Dataset df2Read = spark.read().format("phoenix") | ||
| .option("table", tableName) | ||
| .option(PhoenixDataSource.JDBC_URL, jdbcUrl) | ||
| .load(); | ||
|
|
||
| // Use default | ||
| Dataset df3Read = spark.read().format("phoenix") | ||
| .option("table", tableName) | ||
| .load(); | ||
| assertEquals(3l, df2Read.count()); | ||
|
|
||
| assertEquals(3l, df3Read.count()); | ||
| // Use default | ||
| Dataset df3Read = spark.read().format("phoenix") | ||
| .option("table", tableName) | ||
| .load(); | ||
|
|
||
| } finally { | ||
| jsc.stop(); | ||
| } | ||
| assertEquals(3l, df3Read.count()); | ||
| } | ||
|
|
||
| @Test | ||
| public void lowerCaseWriteTest() throws SQLException { | ||
| SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test") | ||
| .set("spark.hadoopRDD.ignoreEmptySplits", "false"); | ||
| JavaSparkContext jsc = new JavaSparkContext(sparkConf); | ||
| SQLContext sqlContext = new SQLContext(jsc); | ||
| SparkSession spark = SparkUtil.getSparkSession(); | ||
| String tableName = generateUniqueName(); | ||
|
|
||
| try (Connection conn = DriverManager.getConnection(getUrl()); | ||
| Statement stmt = conn.createStatement()){ | ||
| stmt.executeUpdate("CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR, \"v1\" VARCHAR)"); | ||
| } | ||
| StructType schema = new StructType(new StructField[]{ | ||
| new StructField("ID", DataTypes.IntegerType, false, Metadata.empty()), | ||
| new StructField("V1", DataTypes.StringType, false, Metadata.empty()), | ||
| new StructField("\"v1\"", DataTypes.StringType, false, Metadata.empty()) | ||
| }); | ||
|
|
||
| try(SparkSession spark = sqlContext.sparkSession()) { | ||
| //Doesn't help | ||
| spark.conf().set("spark.sql.caseSensitive", true); | ||
|
|
||
| StructType schema = new StructType(new StructField[]{ | ||
| new StructField("ID", DataTypes.IntegerType, false, Metadata.empty()), | ||
| new StructField("V1", DataTypes.StringType, false, Metadata.empty()), | ||
| new StructField("\"v1\"", DataTypes.StringType, false, Metadata.empty()) | ||
| }); | ||
|
|
||
| Dataset<Row> df = spark.createDataFrame( | ||
| Arrays.asList( | ||
| RowFactory.create(1, "x", "y")), | ||
| schema); | ||
|
|
||
| df.write() | ||
| .format("phoenix") | ||
| .mode(SaveMode.Overwrite) | ||
| .option("table", tableName) | ||
| .option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true") | ||
| .option(JDBC_URL, getUrl()) | ||
| .save(); | ||
|
|
||
| try (Connection conn = DriverManager.getConnection(getUrl()); | ||
| Statement stmt = conn.createStatement()) { | ||
| ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); | ||
| assertTrue(rs.next()); | ||
| assertEquals(1, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertEquals("y", rs.getString(3)); | ||
| assertFalse(rs.next()); | ||
| } | ||
| Dataset<Row> df = spark.createDataFrame( | ||
| Arrays.asList( | ||
| RowFactory.create(1, "x", "y")), | ||
| schema); | ||
|
|
||
| df.write() | ||
| .format("phoenix") | ||
| .mode(SaveMode.Overwrite) | ||
| .option("table", tableName) | ||
| .option(PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER,"true") | ||
| .option(JDBC_URL, getUrl()) | ||
| .save(); | ||
|
|
||
| } finally { | ||
| jsc.stop(); | ||
| try (Connection conn = DriverManager.getConnection(getUrl()); | ||
| Statement stmt = conn.createStatement()) { | ||
| ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); | ||
| assertTrue(rs.next()); | ||
| assertEquals(1, rs.getInt(1)); | ||
| assertEquals("x", rs.getString(2)); | ||
| assertEquals("y", rs.getString(3)); | ||
| assertFalse(rs.next()); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like spark.hadoopRDD.ignoreEmptySplits is default since 3.2.0, so removing it should be OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just replaced old code with a call to existing
SparkUtil.getSparkSession() to avoid having duplicate code. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That one doesn't have spark.hadoopRDD.ignoreEmptySplits for Spark3 (which is fine)