Commit 3827b7b

Author: Vinaykumar Bhat (committed)
[HUDI-7040] Handle dropping of partition columns in BulkInsertDataInternalWriterHelper::write(...)
Issue: There are two configs which, when set in a certain combination, cause exceptions or assertion failures: 1. the config to disable populating metadata fields (for each row), and 2. the config to drop partition columns (to save storage space) from a row. With #1 and #2 together, partition paths cannot be deduced from the partition columns (as the partition columns are dropped higher up the stack). BulkInsertDataInternalWriterHelper::write(...) relied on metadata fields to extract the partition path in such cases, but with #1 that is not possible, resulting in asserts/exceptions. The fix pushes the dropping of partition columns down the stack, to after the partition path is computed. It manipulates the raw 'InternalRow' structure by copying only the relevant fields into a new 'InternalRow'. Each row is processed individually to drop the partition columns and copy the remaining values to a new 'InternalRow'.
1 parent 4c3a1db commit 3827b7b
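
To make the approach concrete, here is a minimal Scala sketch (the function name and arguments are illustrative, not the committed code) of the row-trimming step: once the partition path has been extracted, the partition columns are filtered out of the InternalRow by field index and a new row is built from the remaining values.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Sketch only: drop partition columns from a row *after* the partition path has
// been computed, so the writer no longer needs metadata fields to recover it.
def dropPartitionFields(row: InternalRow,
                        schema: StructType,
                        partitionCols: Seq[String]): InternalRow = {
  // Indices of the partition columns within the write schema.
  val partitionIdx = partitionCols.map(schema.fieldIndex).toSet
  // InternalRow.toSeq(schema) returns values in the schema's column order,
  // so filtering by index keeps the remaining values in order.
  val remaining = row.toSeq(schema).zipWithIndex
    .collect { case (value, idx) if !partitionIdx.contains(idx) => value }
  InternalRow.fromSeq(remaining)
}
```

The committed Java change in BulkInsertDataInternalWriterHelper follows the same shape, building the index set from HoodieDatasetBulkInsertHelper.getPartitionPathCols(...).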

6 files changed

Lines changed: 101 additions & 31 deletions

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 4 additions & 0 deletions
```diff
@@ -1390,6 +1390,10 @@ public boolean shouldAllowMultiWriteOnSameInstant() {
     return getBoolean(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE);
   }
 
+  public boolean shouldDropPartitionColumns() {
+    return getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS);
+  }
+
   public String getWriteStatusClassName() {
     return getString(WRITE_STATUS_CLASS_NAME);
   }
```

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java

Lines changed: 33 additions & 1 deletion
```diff
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
@@ -38,11 +39,16 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
 /**
  * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
  */
@@ -124,7 +130,33 @@ public void write(InternalRow row) throws IOException {
         lastKnownPartitionPath = partitionPath.clone();
       }
 
-      handle.write(row);
+      boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+      if (shouldDropPartitionColumns) {
+        // Drop the partition columns from the row.
+        // Using the deprecated JavaConversions to stay compatible with Scala versions < 2.12. Once Hudi support
+        // for Scala versions < 2.12 is dropped, this can move to JavaConverters.seqAsJavaList(...).
+        List<String> partitionCols = JavaConversions.<String>seqAsJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+        Set<Integer> partitionIdx = new HashSet<Integer>();
+        for (String col : partitionCols) {
+          partitionIdx.add(this.structType.fieldIndex(col));
+        }
+
+        // Relies on InternalRow::toSeq(...) preserving the column ordering of the supplied schema.
+        // Using the deprecated JavaConversions to stay compatible with Scala versions < 2.12.
+        List<Object> cols = JavaConversions.<Object>seqAsJavaList(row.toSeq(structType));
+        int idx = 0;
+        List<Object> newCols = new ArrayList<Object>();
+        for (Object o : cols) {
+          if (!partitionIdx.contains(idx)) {
+            newCols.add(o);
+          }
+          idx += 1;
+        }
+        InternalRow newRow = InternalRow.fromSeq(JavaConverters.<Object>asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
+        handle.write(newRow);
+      } else {
+        handle.write(row);
+      }
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
       throw t;
```
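
The new branch relies on InternalRow::toSeq(structType) returning values in the order of the supplied schema, as the inline comment notes. Below is a small, self-contained Scala check of that assumption; the schema and values are invented for illustration.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Illustrative schema and row; real rows come from the bulk-insert write path.
val schema = StructType(Seq(
  StructField("uuid", StringType),
  StructField("fare", DoubleType),
  StructField("city", StringType)))
val row = InternalRow(
  UTF8String.fromString("334e26e9"), 19.10, UTF8String.fromString("san_francisco"))

// toSeq(schema) yields the values in schema order: uuid, fare, city.
assert(row.toSeq(schema) == Seq(
  UTF8String.fromString("334e26e9"), 19.10, UTF8String.fromString("san_francisco")))
```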

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala

Lines changed: 10 additions & 21 deletions
```diff
@@ -62,7 +62,6 @@ object HoodieDatasetBulkInsertHelper
   def prepareForBulkInsert(df: DataFrame,
                            config: HoodieWriteConfig,
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
-                           shouldDropPartitionColumns: Boolean,
                            instantTime: String): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
@@ -128,16 +127,10 @@ object HoodieDatasetBulkInsertHelper
       HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
     }
 
-    val trimmedDF = if (shouldDropPartitionColumns) {
-      dropPartitionColumns(updatedDF, config)
-    } else {
-      updatedDF
-    }
-
     val targetParallelism =
-      deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+      deduceShuffleParallelism(updatedDF, config.getBulkInsertShuffleParallelism)
 
-    partitioner.repartitionRecords(trimmedDF, targetParallelism)
+    partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
   /**
@@ -243,21 +236,17 @@ object HoodieDatasetBulkInsertHelper
     }
   }
 
-  private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
-    val partitionPathFields = getPartitionPathFields(config).toSet
-    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
-    if (nestedPartitionPathFields.nonEmpty) {
-      logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
-    }
-
-    val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
-
-    df.drop(partitionPathCols: _*)
-  }
-
   private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
     val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
    keyGenerator.getPartitionPathFields.asScala
   }
+
+  def getPartitionPathCols(config: HoodieWriteConfig): Seq[String] = {
+    val partitionPathFields = getPartitionPathFields(config).toSet
+    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+
+    return (partitionPathFields -- nestedPartitionPathFields).toSeq
+  }
+
 }
```
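
The new getPartitionPathCols helper only returns top-level partition fields; nested partition path fields (names containing '.') are filtered out and are therefore never dropped from the row. A minimal sketch of that filtering, with invented field names:

```scala
// Invented example: partition fields as declared by the key generator.
val partitionPathFields = Set("city", "address.zip")

// Nested partition path fields contain '.' and cannot be dropped as top-level columns.
val nestedPartitionPathFields = partitionPathFields.filter(_.contains('.')) // Set("address.zip")

// Only the remaining top-level fields are candidates for dropping.
val droppableCols = (partitionPathFields -- nestedPartitionPathFields).toSeq // Seq("city")
```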

hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java

Lines changed: 1 addition & 2 deletions
```diff
@@ -95,8 +95,7 @@ public final HoodieWriteResult execute(Dataset<Row> records, boolean isTablePart
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
 
     BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
-    boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS());
-    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime);
+    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
 
     preExecute();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
```

hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java

Lines changed: 6 additions & 6 deletions
```diff
@@ -128,7 +128,7 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField)
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "0000000001");
+        new NonSortPartitionerWithRows(), "0000000001");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -172,7 +172,7 @@ public void testBulkInsertHelperNoMetaFields() {
         .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -209,7 +209,7 @@ public void testBulkInsertPreCombine(boolean enablePreCombine) {
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -313,7 +313,7 @@ public void testNoPropsSet() {
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -325,7 +325,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -337,7 +337,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
```

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala

Lines changed: 47 additions & 1 deletion
```diff
@@ -39,7 +39,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertNull, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
@@ -365,6 +365,52 @@ class TestHoodieSparkSqlWriter {
     testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields)
   }
 
+  @Test
+  def testBulkInsertForDropPartitionColumn(): Unit = {
+    // Create a new table
+    val tableName = "trips_table"
+    val basePath = "file:///tmp/trips_table"
+    val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
+    val data =
+      Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
+        (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
+        (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
+        (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
+        (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"));
+
+    var inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "city").
+      option(HoodieWriteConfig.TABLE_NAME, tableName).
+      option("hoodie.datasource.write.recordkey.field", "uuid").
+      option("hoodie.datasource.write.precombine.field", "rider").
+      option("hoodie.datasource.write.operation", "bulk_insert").
+      option("hoodie.datasource.write.hive_style_partitioning", "true").
+      option("hoodie.populate.meta.fields", "false").
+      option("hoodie.datasource.write.drop.partition.columns", "true").
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    // Ensure the partition column (i.e. 'city') can be read back
+    val tripsDF = spark.read.format("hudi").load(basePath)
+    tripsDF.show()
+    tripsDF.select("city").foreach(row => {
+      assertNotNull(row)
+    })
+
+    // Peek into the raw parquet files and ensure the partition column is not written to them
+    val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo")
+    val partitionPaths = new Array[String](3)
+    for (i <- partitionPaths.indices) {
+      partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i))
+    }
+    val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2))
+    rawFileDf.show()
+    rawFileDf.select("city").foreach(row => {
+      assertNull(row.get(0))
+    })
+  }
+
   /**
    * Test case for disable and enable meta fields.
    */
```
