@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier
import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
@@ -150,8 +151,8 @@ object HoodieDatasetBulkInsertHelper extends Logging {
}

writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator
-      }).collect()
-      table.getContext.parallelize(writeStatuses.toList.asJava)
Contributor
I don't think this could be a reason for performance problems. Can you please elaborate on what you're trying to achieve here?

cc @boneanxs

Contributor Author
@Zouxxyy Dec 3, 2022

@alexeykudinkin

Currently, collect is used internally in bulk insert for Dataset when executing clustering, which causes two problems:

  1. A single Spark job is generated within it, so if there are many clustering groups, too many Spark jobs are generated, which clutters the Spark application.
  2. Because no Executor is explicitly specified when submitting Spark jobs through CompletableFuture.supplyAsync, the number of Spark jobs that can execute simultaneously is limited to the number of CPU cores on the driver, which may cause a performance bottleneck (sketched below).

In addition, performClusteringWithRecordsRDD does not have these problems, because it does not use collect internally, so this change simply keeps the two code paths consistent.

See https://issues.apache.org/jira/browse/HUDI-5327, where I describe the case I encountered.
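
A minimal sketch of point 2, with a hypothetical runClusteringGroup stand-in rather than Hudi code: without an explicit Executor, supplyAsync runs tasks on the JVM-wide ForkJoinPool.commonPool(), whose parallelism defaults to roughly one less than the driver's CPU core count.

```scala
import java.util.concurrent.{CompletableFuture, Executors, ForkJoinPool}

object SupplyAsyncSketch {
  // Hypothetical stand-in for submitting one clustering group's Spark job.
  def runClusteringGroup(): String = "submitted"

  def main(args: Array[String]): Unit = {
    // The common pool's default parallelism is roughly (CPU cores - 1).
    println(s"common pool parallelism: ${ForkJoinPool.commonPool().getParallelism}")

    // No executor argument: the task runs on ForkJoinPool.commonPool(), so at
    // most ~(driver cores) clustering groups can be in flight at once.
    val capped = CompletableFuture.supplyAsync(() => runClusteringGroup())

    // Explicitly sized executor: concurrency is governed by the pool size.
    val pool = Executors.newFixedThreadPool(64)
    val uncapped = CompletableFuture.supplyAsync(() => runClusteringGroup(), pool)

    println(capped.join() + " / " + uncapped.join())
    pool.shutdown()
  }
}
```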

cc @boneanxs

Contributor
@boneanxs Dec 5, 2022

Hey @Zouxxyy, thanks for raising this issue! It's great to see you trying this feature!

The reason for collecting the data here is that HoodieData<WriteStatus> will be used multiple times after performClustering. I recall there is an isEmpty check that could take a lot of time (validateWriteResult), so here we directly convert to a list of WriteStatus, which reduces that time.
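
A minimal sketch of this trade-off on a toy pipeline (not Hudi code): every action on an uncached RDD re-runs its full lineage, while a one-time collect materializes the results on the driver so later checks cost nothing.

```scala
import org.apache.spark.sql.SparkSession

object CollectVsRecompute {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // Hypothetical stand-in for the pipeline that produces WriteStatus results.
    val statuses = spark.sparkContext.parallelize(1 to 1000000).map(_ * 2)

    statuses.isEmpty() // job 1: runs the lineage
    statuses.count()   // job 2: runs the whole lineage again

    // Collecting once pays the lineage cost a single time; later checks are
    // plain in-memory operations, at the cost of driver memory.
    val collected = statuses.collect()
    println(s"empty=${collected.isEmpty} count=${collected.length}")

    spark.stop()
  }
}
```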

For the second issue, I noticed this and raised a PR to fix it: #7343. Will that address your problem? Feel free to review it!

I think performClusteringWithRecordsRDD has the same issue: for example, when using RDDSpatialCurveSortPartitioner to optimize the data layout, it calls RDD.isEmpty, which launches a new job.

Contributor Author
@Zouxxyy Dec 5, 2022

@boneanxs
Regarding the isEmpty check taking a lot of time: I opened a PR to fix it, #7373, so maybe we don't need #7343.

Contributor

Yes, we can fix this by directly using getStat, but what if updateIndex computes writeStatusList multiple times? If we directly dereference RDD<WriteStatus> to a list of WriteStatus at one feasible point (as performClusteringWithRecordsAsRow already does), we no longer need to worry about such issues at all.

As for the thread-pool parallelism causing a performance issue, I think performClusteringWithRecordsRDD has the same problem. Since we may call partitioner.repartitionRecords, a new job can also be launched inside the Future thread, for example at https://github.com/apache/hudi/blob/ea48a85efcf8e331d0cc105d426e830b8bfe5b37/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java#L66 (checking whether the RDD is empty), or by the sortBy call in RDDCustomColumnsSortPartitioner (sortBy uses RangePartitioner, which needs to sample the RDD first to decide the ranges, and that also launches a job from the Future).
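
A minimal sketch of the sortBy behavior described above (not Hudi code): RDD.sortBy goes through a RangePartitioner, which must sample the data to compute partition bounds, so a job runs at sortBy time, before any action is called on the result.

```scala
import org.apache.spark.sql.SparkSession

object SortBySamplingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(Seq(3, 1, 2), numSlices = 3)

    // sortBy builds a RangePartitioner internally; its constructor samples the
    // RDD to choose range bounds, which launches a Spark job right here, even
    // though no action has been called yet. Run inside a CompletableFuture
    // task, that job ties up one of the pool's limited worker threads.
    val sorted = rdd.sortBy(identity)

    println(sorted.collect().mkString(",")) // the "real" action
    spark.stop()
  }
}
```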

Contributor Author
@Zouxxyy Dec 6, 2022

@boneanxs
Regarding #7343: you're probably right, I overlooked that other operations may also generate a job. However, I'm wondering whether it's necessary to add a dedicated parameter for this.

Contributor Author

@boneanxs
For the RDD reuse problem, I think we should use persist (fixed in #7373) instead of using collect and creating a new RDD.
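
A minimal sketch of the persist alternative on a toy pipeline (not Hudi code; the storage level is chosen for illustration): caching keeps the data distributed, so repeated actions reuse cached blocks instead of recomputing, without pulling everything to the driver.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    val statuses = spark.sparkContext
      .parallelize(1 to 1000000)
      .map(_ * 2) // hypothetical expensive write step
      .persist(StorageLevel.MEMORY_AND_DISK)

    statuses.isEmpty() // job 1: computes the lineage and caches the blocks
    statuses.count()   // job 2: served from cached blocks, no recompute
                       // (unless blocks were evicted or an executor was lost,
                       // the caveat raised in the reply below)

    statuses.unpersist()
    spark.stop()
  }
}
```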

Contributor

> Regarding #7343: you're probably right, I overlooked that other operations may also generate a job. However, I'm wondering whether it's necessary to add a dedicated parameter for this.

I'd very much appreciate it if you could review the PR and share your thoughts. Could you please explain more in that PR? :)

Contributor

@Zouxxyy in this case we should actually not rely on persist as a way to avoid double execution: persisting is essentially just a caching mechanism (re-using cached blocks on executors) and cannot be relied upon; it could fail at any point (for example, if one of the executors fails), making you recompute the whole RDD.

Contributor Author

@alexeykudinkin, OK. Note that WriteStatus has a large attribute, writtenRecords, so this is fine as long as collect does not cause an OOM on the driver.

+      })
+      HoodieJavaRDD.of(writeStatuses)
}

private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {