
Commit a9982d1

Author: Alexey Kudinkin

[HUDI-4584] Cleaning up Spark utilities (#6351)

Cleans up Spark utilities and removes duplication.

1 parent: c677333

26 files changed: 407 additions & 423 deletions


hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java

Lines changed: 1 addition & 1 deletion

@@ -452,7 +452,7 @@ protected static class SparkRowConverter {
     private final SparkRowSerDe rowSerDe;
 
     SparkRowConverter(StructType schema) {
-      this.rowSerDe = HoodieSparkUtils.getDeserializer(schema);
+      this.rowSerDe = HoodieSparkUtils.getCatalystRowSerDe(schema);
       this.avroConverter = AvroConversionUtils.createConverterToAvro(schema, STRUCT_NAME, NAMESPACE);
     }
 
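
For context, a minimal Scala sketch of the two pieces SparkRowConverter wires together, expressed against the renamed helper. The schema, struct name, and namespace below are illustrative placeholders, not the STRUCT_NAME/NAMESPACE constants the Java class actually uses:

// Illustrative only: builds the same two pieces the constructor above holds.
import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("uuid", StringType, nullable = false)))

// Renamed helper (previously HoodieSparkUtils.getDeserializer)
val rowSerDe = HoodieSparkUtils.getCatalystRowSerDe(schema)

// Row -> Avro converter, constructed exactly as in the constructor above;
// "example_rec" / "example.namespace" are made-up values
val toAvro = AvroConversionUtils.createConverterToAvro(schema, "example_rec", "example.namespace")
val avroRecord = toAvro(Row("key-001")) // GenericRecord with uuid = "key-001"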

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java

Lines changed: 2 additions & 2 deletions

@@ -19,11 +19,11 @@
 
 package org.apache.hudi.table.upgrade;
 
-import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.SparkKeyGenUtils;
 
 /**
  * Spark upgrade and downgrade helper.
@@ -47,6 +47,6 @@ public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext contex
 
   @Override
   public String getPartitionColumns(HoodieWriteConfig config) {
-    return HoodieSparkUtils.getPartitionColumns(config.getProps());
+    return SparkKeyGenUtils.getPartitionColumns(config.getProps());
  }
 }
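
The resolution logic itself is unchanged; only its home moved from HoodieSparkUtils to SparkKeyGenUtils. A hedged sketch of calling the relocated utility directly — passing TypedProperties is inferred from the config.getProps() call site above, and the property values are made-up examples:

import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.util.SparkKeyGenUtils

// Example config only; a real table would carry many more properties.
val props = new TypedProperties()
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "region,day")

// Resolves partition columns via the configured key generator, as before.
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(props) // "region,day"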

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala

Lines changed: 3 additions & 6 deletions

@@ -19,13 +19,12 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema.Type
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
+import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
@@ -90,8 +89,7 @@ object AvroConversionUtils {
   @Deprecated
   def createConverterToRow(sourceAvroSchema: Schema,
                            targetSqlType: StructType): GenericRecord => Row = {
-    val encoder = RowEncoder.apply(targetSqlType).resolveAndBind()
-    val serde = sparkAdapter.createSparkRowSerDe(encoder)
+    val serde = sparkAdapter.createSparkRowSerDe(targetSqlType)
     val converter = AvroConversionUtils.createAvroToInternalRowConverter(sourceAvroSchema, targetSqlType)
 
     avro => converter.apply(avro).map(serde.deserializeRow).get
@@ -104,8 +102,7 @@ object AvroConversionUtils {
   def createConverterToAvro(sourceSqlType: StructType,
                             structName: String,
                             recordNamespace: String): Row => GenericRecord = {
-    val encoder = RowEncoder.apply(sourceSqlType).resolveAndBind()
-    val serde = sparkAdapter.createSparkRowSerDe(encoder)
+    val serde = sparkAdapter.createSparkRowSerDe(sourceSqlType)
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
     val (nullable, _) = resolveAvroTypeNullability(avroSchema)
 
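Both converters behave as before; the change only moves the RowEncoder plumbing behind sparkAdapter.createSparkRowSerDe. A minimal sketch of the (deprecated) Avro-to-Row direction, with placeholder record name and namespace:

import org.apache.avro.generic.GenericData
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sqlType = StructType(Seq(StructField("name", StringType, nullable = true)))
// Derive a matching Avro schema; "ExampleRecord" / "org.example" are arbitrary.
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sqlType, "ExampleRecord", "org.example")

val toRow = AvroConversionUtils.createConverterToRow(avroSchema, sqlType)

val record = new GenericData.Record(avroSchema)
record.put("name", "alice")
val row = toRow(record) // Row("alice")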

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala

Lines changed: 3 additions & 209 deletions

@@ -20,27 +20,13 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
-import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import java.util.Properties
-
-import org.apache.hudi.avro.HoodieAvroUtils
 
 import scala.collection.JavaConverters._
 
@@ -72,63 +58,6 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     }))
   }
 
-  /**
-   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
-   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
-   */
-  def isGlobPath(pattern: Path): Boolean = {
-    pattern.toString.exists("{}[]*?\\".toSet.contains)
-  }
-
-  /**
-   * This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like
-   * skipping meta paths.
-   */
-  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
-    // find base path to assist in skipping meta paths
-    var basePath = pattern.getParent
-    while (basePath.getName.equals("*")) {
-      basePath = basePath.getParent
-    }
-
-    Option(fs.globStatus(pattern)).map { statuses => {
-      val nonMetaStatuses = statuses.filterNot(entry => {
-        // skip all entries in meta path
-        var leafPath = entry.getPath
-        // walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped
-        while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
-          leafPath = leafPath.getParent
-        }
-        leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
-      })
-      nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
-    }
-    }.getOrElse(Seq.empty[Path])
-  }
-
-  /**
-   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
-   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
-   */
-  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
-    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
-  }
-
-  /**
-   * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths
-   * which match the glob pattern. Otherwise, returns original path
-   *
-   * @param paths List of absolute or globbed paths
-   * @param fs    File system
-   * @return list of absolute file paths
-   */
-  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
-    paths.flatMap(path => {
-      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
-      globPathIfNecessary(fs, qualified)
-    })
-  }
-
   /**
    * @deprecated please use other overload [[createRdd]]
    */
@@ -182,142 +111,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     }
   }
 
-  def getDeserializer(structType: StructType) : SparkRowSerDe = {
-    val encoder = RowEncoder.apply(structType).resolveAndBind()
-    sparkAdapter.createSparkRowSerDe(encoder)
-  }
-
-  /**
-   * Convert Filters to Catalyst Expressions and joined by And. If convert success return an
-   * Non-Empty Option[Expression],or else return None.
-   */
-  def convertToCatalystExpressions(filters: Seq[Filter],
-                                   tableSchema: StructType): Seq[Option[Expression]] = {
-    filters.map(convertToCatalystExpression(_, tableSchema))
-  }
-
-
-  /**
-   * Convert Filters to Catalyst Expressions and joined by And. If convert success return an
-   * Non-Empty Option[Expression],or else return None.
-   */
-  def convertToCatalystExpression(filters: Array[Filter],
-                                  tableSchema: StructType): Option[Expression] = {
-    val expressions = convertToCatalystExpressions(filters, tableSchema)
-    if (expressions.forall(p => p.isDefined)) {
-      if (expressions.isEmpty) {
-        None
-      } else if (expressions.length == 1) {
-        expressions.head
-      } else {
-        Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And))
-      }
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Convert Filter to Catalyst Expression. If convert success return an Non-Empty
-   * Option[Expression],or else return None.
-   */
-  def convertToCatalystExpression(filter: Filter, tableSchema: StructType): Option[Expression] = {
-    Option(
-      filter match {
-        case EqualTo(attribute, value) =>
-          org.apache.spark.sql.catalyst.expressions.EqualTo(toAttribute(attribute, tableSchema), Literal.create(value))
-        case EqualNullSafe(attribute, value) =>
-          org.apache.spark.sql.catalyst.expressions.EqualNullSafe(toAttribute(attribute, tableSchema), Literal.create(value))
-        case GreaterThan(attribute, value) =>
-          org.apache.spark.sql.catalyst.expressions.GreaterThan(toAttribute(attribute, tableSchema), Literal.create(value))
-        case GreaterThanOrEqual(attribute, value) =>
-          org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual(toAttribute(attribute, tableSchema), Literal.create(value))
-        case LessThan(attribute, value) =>
-          org.apache.spark.sql.catalyst.expressions.LessThan(toAttribute(attribute, tableSchema), Literal.create(value))
-        case LessThanOrEqual(attribute, value) =>
-          org.apache.spark.sql.catalyst.expressions.LessThanOrEqual(toAttribute(attribute, tableSchema), Literal.create(value))
-        case In(attribute, values) =>
-          val attrExp = toAttribute(attribute, tableSchema)
-          val valuesExp = values.map(v => Literal.create(v))
-          org.apache.spark.sql.catalyst.expressions.In(attrExp, valuesExp)
-        case IsNull(attribute) =>
-          org.apache.spark.sql.catalyst.expressions.IsNull(toAttribute(attribute, tableSchema))
-        case IsNotNull(attribute) =>
-          org.apache.spark.sql.catalyst.expressions.IsNotNull(toAttribute(attribute, tableSchema))
-        case And(left, right) =>
-          val leftExp = convertToCatalystExpression(left, tableSchema)
-          val rightExp = convertToCatalystExpression(right, tableSchema)
-          if (leftExp.isEmpty || rightExp.isEmpty) {
-            null
-          } else {
-            org.apache.spark.sql.catalyst.expressions.And(leftExp.get, rightExp.get)
-          }
-        case Or(left, right) =>
-          val leftExp = convertToCatalystExpression(left, tableSchema)
-          val rightExp = convertToCatalystExpression(right, tableSchema)
-          if (leftExp.isEmpty || rightExp.isEmpty) {
-            null
-          } else {
-            org.apache.spark.sql.catalyst.expressions.Or(leftExp.get, rightExp.get)
-          }
-        case Not(child) =>
-          val childExp = convertToCatalystExpression(child, tableSchema)
-          if (childExp.isEmpty) {
-            null
-          } else {
-            org.apache.spark.sql.catalyst.expressions.Not(childExp.get)
-          }
-        case StringStartsWith(attribute, value) =>
-          val leftExp = toAttribute(attribute, tableSchema)
-          val rightExp = Literal.create(s"$value%")
-          sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
-        case StringEndsWith(attribute, value) =>
-          val leftExp = toAttribute(attribute, tableSchema)
-          val rightExp = Literal.create(s"%$value")
-          sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
-        case StringContains(attribute, value) =>
-          val leftExp = toAttribute(attribute, tableSchema)
-          val rightExp = Literal.create(s"%$value%")
-          sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
-        case _ => null
-      }
-    )
-  }
-
-  /**
-   * @param properties config properties
-   * @return partition columns
-   */
-  def getPartitionColumns(properties: Properties): String = {
-    val props = new TypedProperties(properties)
-    val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
-    getPartitionColumns(keyGenerator, props)
-  }
-
-  /**
-   * @param keyGen key generator
-   * @return partition columns
-   */
-  def getPartitionColumns(keyGen: KeyGenerator, typedProperties: TypedProperties): String = {
-    keyGen match {
-      // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
-      // is: "field_name: field_type", we extract the field_name from the partition path field.
-      case c: BaseKeyGenerator
-        if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
-        c.getPartitionPathFields.asScala.map(pathField =>
-          pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
-            .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
-          .mkString(",")
-
-      case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
-      case _ => typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
-    }
-  }
-
-  private def toAttribute(columnName: String, tableSchema: StructType): AttributeReference = {
-    val field = tableSchema.find(p => p.name == columnName)
-    assert(field.isDefined, s"Cannot find column: $columnName, Table Columns are: " +
-      s"${tableSchema.fieldNames.mkString(",")}")
-    AttributeReference(columnName, field.get.dataType, field.get.nullable)()
+  def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
+    sparkAdapter.createSparkRowSerDe(structType)
   }
 }
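
The net effect: HoodieSparkUtils keeps only the serde factory, now taking a StructType instead of a pre-resolved RowEncoder. A short round-trip sketch — deserializeRow appears in this commit, while serializeRow is assumed from the SparkRowSerDe interface:

import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("city", StringType, nullable = true)))
val serDe = HoodieSparkUtils.getCatalystRowSerDe(schema)

// serializeRow is assumed to be the inverse of deserializeRow (not shown in this diff)
val internalRow = serDe.serializeRow(Row("melbourne")) // Row -> Catalyst InternalRow
val roundTripped = serDe.deserializeRow(internalRow)   // InternalRow -> Row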

PathUtils.scala (new file, package org.apache.hudi.util)

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.util

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient

/**
 * TODO convert to Java, move to hudi-common
 */
object PathUtils {

  /**
   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
   */
  def isGlobPath(pattern: Path): Boolean = {
    pattern.toString.exists("{}[]*?\\".toSet.contains)
  }

  /**
   * This method is inspired from [[org.apache.spark.deploy.SparkHadoopUtil]] with some modifications like
   * skipping meta paths.
   */
  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
    // find base path to assist in skipping meta paths
    var basePath = pattern.getParent
    while (basePath.getName.equals("*")) {
      basePath = basePath.getParent
    }

    Option(fs.globStatus(pattern)).map { statuses => {
      val nonMetaStatuses = statuses.filterNot(entry => {
        // skip all entries in meta path
        var leafPath = entry.getPath
        // walk through every parent until we reach base path. if .hoodie is found anywhere, path needs to be skipped
        while (!leafPath.equals(basePath) && !leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
          leafPath = leafPath.getParent
        }
        leafPath.getName.equals(HoodieTableMetaClient.METAFOLDER_NAME)
      })
      nonMetaStatuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
    }
    }.getOrElse(Seq.empty[Path])
  }

  /**
   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
   */
  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
  }

  /**
   * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths
   * which match the glob pattern. Otherwise, returns original path
   *
   * @param paths List of absolute or globbed paths
   * @param fs    File system
   * @return list of absolute file paths
   */
  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
    paths.flatMap(path => {
      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
      globPathIfNecessary(fs, qualified)
    })
  }
}
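
These are the glob helpers removed from HoodieSparkUtils above, relocated verbatim. A small usage sketch; the input paths are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.util.PathUtils

val fs = FileSystem.get(new Configuration())

// Globs only the patterned entry, qualifies both, and skips .hoodie meta paths.
val inputs = Seq("/data/trips/2022/*/*.parquet", "/data/trips/2022/01")
val resolved = PathUtils.checkAndGlobPathIfNecessary(inputs, fs)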
