
[SUPPORT] NotSerializableException on Glue 2.0 - Spark 2.4.3, hudi-spark-bundle_2.11-0.11.0.jar #6819


Description

@mikeashley87

Describe the problem you faced

I am getting a NotSerializableException when performing an initial upsert into a Hudi table.

Running locally with Spark (2.4.7/2.4.6) works fine, and running on EMR with Spark 2.4.6 works fine; the failure only occurs on Glue 2.0 with Spark 2.4.3.

To Reproduce

Steps to reproduce the behavior:

  1. Read from an existing Glue table (non-Hudi)
  2. Read from the destination Glue table (Hudi)
  3. Join the existing table against the destination to apply some transformations
  4. Upsert into the Hudi table (a minimal sketch of this flow follows)
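
A minimal PySpark sketch of the flow above, using plain Spark APIs rather than the GlueContext wrappers; the table names, key fields, and join logic are placeholders, not from the original report (only `s3://someprefix/sometable` appears in the trace):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-upsert-repro").getOrCreate()

# 1. Read the existing (non-Hudi) Glue table
src_df = spark.table("source_db.source_table")

# 2. Read the destination Hudi table from S3
dest_df = spark.read.format("hudi").load("s3://someprefix/sometable")

# 3. Join and transform (placeholder logic)
joined_df = src_df.join(dest_df, ["record_key"], "left").select(src_df["*"])

# 4. Upsert into the Hudi table
hudi_options = {
    "hoodie.table.name": "sometable",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "record_key",
    "hoodie.datasource.write.precombine.field": "ts",
}
joined_df.write.format("hudi").options(**hudi_options) \
    .mode("append").save("s3://someprefix/sometable")
```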

Expected behavior

The upsert into the Hudi table completes without error.

Environment Description

  • Hudi version : 0.11.0 (hudi-spark-bundle_2.11-0.11.0.jar)

  • Spark version : 2.4.3

  • Hive version : AWS Glue Data Catalog

  • Hadoop version : Glue 2.0

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) :
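
One environment setting worth checking: Hudi's Spark quickstart recommends Kryo serialization (`spark.serializer=org.apache.spark.serializer.KryoSerializer`), and a `NotSerializableException` on `org.apache.hadoop.fs.Path` during task serialization is consistent with the default Java serializer being in effect. A hedged sketch of setting it when building the session — whether Glue 2.0 honors a session-level setting (as opposed to passing `--conf` via job parameters) is an assumption to verify:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Kryo serializer, as recommended by the Hudi docs; on Glue this may need
# to be passed as a job --conf instead of set here (assumption to verify).
conf = SparkConf().set("spark.serializer",
                       "org.apache.spark.serializer.KryoSerializer")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
```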

Additional context

Stacktrace

```
: org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 3752, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.apache.hadoop.fs.Path
Serialization stack:
	- object not serializable (class: org.apache.hadoop.fs.Path, value: s3://someprefix/sometable)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: scala.collection.mutable.WrappedArray$ofRef, name: array, type: class [Ljava.lang.Object;)
	- object (class scala.collection.mutable.WrappedArray$ofRef, WrappedArray(s3://someprefix/sometable))
	- writeObject data (class: org.apache.spark.rdd.ParallelCollectionPartition)
	- object (class org.apache.spark.rdd.ParallelCollectionPartition, org.apache.spark.rdd.ParallelCollectionPartition@33e4)
	- field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
	- object (class org.apache.spark.scheduler.ResultTask, ResultTask(114, 0))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.apache.hudi.client.common.HoodieSparkEngineContext.map(HoodieSparkEngineContext.java:103)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.listAllPartitions(HoodieBackedTableMetadataWriter.java:621)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initialCommit(HoodieBackedTableMetadataWriter.java:1054)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeFromFilesystem(HoodieBackedTableMetadataWriter.java:547)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.initializeIfNeeded(HoodieBackedTableMetadataWriter.java:380)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initialize(SparkHoodieBackedTableMetadataWriter.java:120)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.<init>(HoodieBackedTableMetadataWriter.java:170)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.<init>(SparkHoodieBackedTableMetadataWriter.java:89)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.create(SparkHoodieBackedTableMetadataWriter.java:75)
	at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:448)
	at org.apache.hudi.client.SparkRDDWriteClient.doInitTable(SparkRDDWriteClient.java:433)
	at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1462)
	at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1496)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:154)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:213)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:304)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
```
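
For reference, the trace shows the failure originating in the metadata table bootstrap: `HoodieBackedTableMetadataWriter.listAllPartitions` parallelizes `org.apache.hadoop.fs.Path` objects via `HoodieSparkEngineContext.map`, and those tasks fail to serialize. As a diagnostic isolation step (not a confirmed fix), the metadata table can be disabled on the write path; a sketch, reusing the placeholder options from the repro above:

```python
hudi_options = {
    "hoodie.table.name": "sometable",
    "hoodie.datasource.write.operation": "upsert",
    # Diagnostic only: skip metadata table initialization, which is where
    # the Path objects are parallelized in the trace above.
    "hoodie.metadata.enable": "false",
}
```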

Metadata

Labels: area:writer (Write client and core write operations), priority:high (Significant impact; potential bugs)

Status: ✅ Done
