@@ -267,9 +267,7 @@ class TestHoodieSparkSqlWriter extends HoodieSparkWriterTestBase {
 
   @Test
   def testBulkInsertForDropPartitionColumn(): Unit = {
-    //create a new table
-    val tableName = "trips_table"
-    val basePath = "file:///tmp/trips_table"
+    // create a new table
     val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
     val data =
       Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
@@ -278,21 +276,21 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
         (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
         (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"));
 
-    var inserts = spark.createDataFrame(data).toDF(columns: _*)
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
     inserts.write.format("hudi").
       option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "city").
-      option(HoodieWriteConfig.TABLE_NAME, tableName).
+      option(HoodieWriteConfig.TBL_NAME.key(), hoodieFooTableName).
       option("hoodie.datasource.write.recordkey.field", "uuid").
       option("hoodie.datasource.write.precombine.field", "rider").
       option("hoodie.datasource.write.operation", "bulk_insert").
       option("hoodie.datasource.write.hive_style_partitioning", "true").
       option("hoodie.populate.meta.fields", "false").
       option("hoodie.datasource.write.drop.partition.columns", "true").
       mode(SaveMode.Overwrite).
-      save(basePath)
+      save(tempBasePath)
 
     // Ensure the partition column (i.e 'city') can be read back
-    val tripsDF = spark.read.format("hudi").load(basePath)
+    val tripsDF = spark.read.format("hudi").load(tempBasePath)
     tripsDF.show()
     tripsDF.select("city").foreach(row => {
       assertNotNull(row)
@@ -302,7 +300,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo")
     val partitionPaths = new Array[String](3)
     for (i <- partitionPaths.indices) {
-      partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i))
+      partitionPaths(i) = String.format("%s/%s/*", tempBasePath, partitions(i))
     }
     val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2))
     rawFileDf.show()
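Net effect of the Scala hunks above: the test writes to tempBasePath (presumably inherited from HoodieSparkWriterTestBase, per the hunk header) instead of the fixed file:///tmp/trips_table, and reuses the suite-wide hoodieFooTableName rather than a locally hard-coded table name, so repeated or parallel runs cannot trip over stale /tmp state. A minimal sketch of the per-class temp-path pattern this relies on, assuming JUnit 5; the class and member names (TempPathTestBase, tempDir, tempBasePath) are illustrative, not Hudi's actual wiring:

import java.nio.file.Path;
import org.junit.jupiter.api.io.TempDir;

// Hypothetical base class: JUnit 5 injects a fresh directory into a static
// @TempDir field once per test class and deletes it after all tests finish.
public abstract class TempPathTestBase {
  @TempDir
  protected static Path tempDir;

  // Tests build their table base path under the managed temp root,
  // yielding a file:/// URI unique to this run.
  protected static String tempBasePath() {
    return tempDir.resolve("trips_table").toUri().toString();
  }
}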
@@ -67,6 +67,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
 
   private static final TypedProperties PROPS = new TypedProperties();
   private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator();
+  private static final String SECRETS_PATH = String.format("%s/%s", sharedTempDir.toAbsolutePath(), "hudi/config/secret");
   private static Connection connection;
 
   @BeforeAll
@@ -384,7 +385,7 @@ public void testSourceWithPasswordOnFs() {
     // Remove secret string from props
     PROPS.remove("hoodie.streamer.jdbc.password");
     // Set property to read secret from fs file
-    PROPS.setProperty("hoodie.streamer.jdbc.password.file", "file:///tmp/hudi/config/secret");
+    PROPS.setProperty("hoodie.streamer.jdbc.password.file", SECRETS_PATH);
     PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
     // Add 10 records with commit time 000
     clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
@@ -441,7 +442,7 @@ public void testSourceWithStorageLevel() {
 
   private void writeSecretToFs() throws IOException {
     FileSystem fs = FileSystem.get(new Configuration());
-    FSDataOutputStream outputStream = fs.create(new Path("file:///tmp/hudi/config/secret"));
+    FSDataOutputStream outputStream = fs.create(new Path(SECRETS_PATH));
     outputStream.writeBytes(JDBC_PASS);
     outputStream.close();
   }
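Here the new SECRETS_PATH constant is derived from sharedTempDir, presumably a class-level temp directory exposed by UtilitiesTestBase, so the secret file written by writeSecretToFs() lands under a per-run directory instead of the fixed file:///tmp/hudi/config/secret. A hedged sketch of one way such a shared field might be initialized; the eager static block and names below are assumptions (the real base class may instead use JUnit 5's @TempDir), shown because subclass constants like SECRETS_PATH reference the field during class initialization:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for UtilitiesTestBase's temp-dir plumbing.
public abstract class SharedTempDirBase {
  // Created eagerly so static initializers in subclasses (like SECRETS_PATH
  // in TestJdbcSource) can safely reference it at class-load time.
  protected static final Path sharedTempDir;

  static {
    try {
      sharedTempDir = Files.createTempDirectory("hudi-utilities-test");
    } catch (IOException e) {
      throw new UncheckedIOException("failed to create shared temp dir", e);
    }
  }
}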