-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-54663][CORE] Computes RowBasedChecksum in ShuffleWriters #50230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
4cd559f
53d11af
c7675b1
7b89c44
64dd36b
422e370
89901ca
a1c50fa
db59634
04e08eb
c9c28e6
d82bad2
2575d52
3b99edb
74266a5
22c79c8
df48158
cf28940
4cfaac8
786fdd3
602729c
137f254
dde16d4
f7d9dfa
5aabe70
2fd0a94
bbe26bf
1a8e9f7
97af717
5e01c52
ce29311
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.shuffle.checksum | ||
|
|
||
| import java.io.ObjectOutputStream | ||
| import java.util.zip.Checksum | ||
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper | ||
| import org.apache.spark.util.ExposedBufferByteArrayOutputStream | ||
|
|
||
| /** | ||
| * A class for computing checksum for input (key, value) pairs. The checksum is independent of | ||
| * the order of the input (key, value) pairs. It is done by computing a checksum for each row | ||
| * first, and then computing the XOR for all the row checksums. | ||
| */ | ||
/**
 * Computes an order-independent checksum over input (key, value) pairs. A checksum is
 * first calculated for each individual row, and the per-row checksums are then combined
 * with XOR, so the final value does not depend on the order in which rows arrive.
 */
abstract class RowBasedChecksum() extends Serializable with Logging {
  // Set once a row checksum computation throws; from then on the checksum is invalid.
  private var hasError: Boolean = false
  // XOR-accumulation of all per-row checksums seen so far.
  private var checksumValue: Long = 0

  /**
   * Returns the computed checksum value. It returns the default checksum value (0) if
   * any error was encountered during the checksum computation.
   */
  def getValue: Long = if (hasError) 0 else checksumValue

  /** Updates the row-based checksum with the given (key, value) pair. */
  def update(key: Any, value: Any): Unit = {
    if (!hasError) {
      try {
        checksumValue ^= calculateRowChecksum(key, value)
      } catch {
        // Checksum failures must never fail the write path: log, mark invalid, move on.
        case NonFatal(e) =>
          logError("Checksum computation encountered error: ", e)
          hasError = true
      }
    }
  }

  /** Computes and returns the checksum value for the given (key, value) pair. */
  protected def calculateRowChecksum(key: Any, value: Any): Long
}
|
|
||
| /** | ||
| * A Concrete implementation of RowBasedChecksum. The checksum for each row is | ||
| * computed by first converting the (key, value) pair to byte array using OutputStreams, | ||
| * and then computing the checksum for the byte array. | ||
| * Note that this checksum computation is very expensive, and it is used only in tests | ||
| * in the core component. A much cheaper implementation of RowBasedChecksum is in | ||
| * UnsafeRowChecksum. | ||
| * | ||
| * @param checksumAlgorithm the algorithm used for computing checksum. | ||
| */ | ||
/**
 * A concrete implementation of RowBasedChecksum. The per-row checksum is obtained by
 * serializing the (key, value) pair to a byte array through an ObjectOutputStream and
 * running the configured checksum algorithm over those bytes.
 * Note that this computation is very expensive and is used only in tests in the core
 * component; a much cheaper implementation of RowBasedChecksum is in UnsafeRowChecksum.
 *
 * @param checksumAlgorithm the algorithm used for computing checksum.
 */
class OutputStreamRowBasedChecksum(checksumAlgorithm: String)
  extends RowBasedChecksum() {

  // Initial capacity of the reusable serialization buffer (32 KiB).
  private val DEFAULT_INITIAL_SER_BUFFER_SIZE = 32 * 1024

  // Lazy and transient: streams are rebuilt on each executor after deserialization.
  @transient private lazy val serBuffer =
    new ExposedBufferByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE)
  @transient private lazy val objOut = new ObjectOutputStream(serBuffer)

  @transient
  protected lazy val checksum: Checksum =
    ShuffleChecksumHelper.getChecksumByAlgorithm(checksumAlgorithm)

  override protected def calculateRowChecksum(key: Any, value: Any): Long = {
    assert(checksum != null, "Checksum is null")

    // Serialize the (key, value) pair into the reusable byte buffer. The stream is
    // reset first so per-row output is independent of previously written objects.
    objOut.reset()
    serBuffer.reset()
    objOut.writeObject((key, value))
    objOut.flush()
    serBuffer.flush()

    // Run the checksum over exactly the bytes produced for this row.
    checksum.reset()
    checksum.update(serBuffer.getBuf, 0, serBuffer.size())
    checksum.getValue
  }
}
|
|
||
object RowBasedChecksum {
  /**
   * Folds the per-partition checksum values into a single aggregate using a base-31
   * polynomial combination (null-safe: a null array yields 0).
   */
  def getAggregatedChecksumValue(rowBasedChecksums: Array[RowBasedChecksum]): Long =
    Option(rowBasedChecksums).fold(0L) { checksums =>
      checksums.foldLeft(0L)((acc, checksum) => 31L * acc + checksum.getValue)
    }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.util; | ||
|
|
||
| import java.io.ByteArrayOutputStream; | ||
|
|
||
/**
 * A {@link ByteArrayOutputStream} variant that exposes its internal {@code buf} array
 * directly, letting callers read the written bytes without the defensive copy made by
 * {@code toByteArray()}. Only the first {@code size()} bytes of the buffer are valid.
 */
public final class ExposedBufferByteArrayOutputStream extends ByteArrayOutputStream {

  public ExposedBufferByteArrayOutputStream(int size) {
    super(size);
  }

  /** Returns the internal buffer without copying; valid range is {@code [0, size())}. */
  public byte[] getBuf() {
    return buf;
  }
}
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -29,6 +29,7 @@ import org.apache.spark.internal.LogKeys._ | |||||
| import org.apache.spark.rdd.RDD | ||||||
| import org.apache.spark.serializer.Serializer | ||||||
| import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor} | ||||||
| import org.apache.spark.shuffle.checksum.RowBasedChecksum | ||||||
| import org.apache.spark.storage.BlockManagerId | ||||||
| import org.apache.spark.util.Utils | ||||||
|
|
||||||
|
|
@@ -59,6 +60,9 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { | |||||
| override def rdd: RDD[T] = _rdd | ||||||
| } | ||||||
|
|
||||||
object ShuffleDependency {
  // Shared empty array used as the default `rowBasedChecksums` value, so callers that
  // do not track checksums avoid allocating a fresh empty array per dependency.
  private val EMPTY_ROW_BASED_CHECKSUMS: Array[RowBasedChecksum] =
    Array.empty[RowBasedChecksum]
}
|
|
||||||
| /** | ||||||
| * :: DeveloperApi :: | ||||||
|
|
@@ -74,6 +78,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { | |||||
| * @param aggregator map/reduce-side aggregator for RDD's shuffle | ||||||
| * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) | ||||||
| * @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask | ||||||
| * @param rowBasedChecksums the row-based checksums for each shuffle partition | ||||||
| */ | ||||||
| @DeveloperApi | ||||||
| class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( | ||||||
|
|
@@ -83,9 +88,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( | |||||
| val keyOrdering: Option[Ordering[K]] = None, | ||||||
| val aggregator: Option[Aggregator[K, V, C]] = None, | ||||||
| val mapSideCombine: Boolean = false, | ||||||
| val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) | ||||||
| val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor, | ||||||
| val rowBasedChecksums: Array[RowBasedChecksum] = ShuffleDependency.EMPTY_ROW_BASED_CHECKSUMS) | ||||||
mridulm marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| extends Dependency[Product2[K, V]] with Logging { | ||||||
|
|
||||||
| def this( | ||||||
| rdd: RDD[_ <: Product2[K, V]], | ||||||
| partitioner: Partitioner, | ||||||
| serializer: Serializer, | ||||||
| keyOrdering: Option[Ordering[K]], | ||||||
| aggregator: Option[Aggregator[K, V, C]], | ||||||
| mapSideCombine: Boolean, | ||||||
| shuffleWriterProcessor: ShuffleWriteProcessor) = { | ||||||
| this( | ||||||
| rdd, | ||||||
| partitioner, | ||||||
| serializer, | ||||||
| keyOrdering, | ||||||
| aggregator, | ||||||
| mapSideCombine, | ||||||
| shuffleWriterProcessor, | ||||||
| Array.empty | ||||||
|
||||||
| Array.empty | |
| EMPTY_ROW_BASED_CHECKSUMS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's update the classdoc. We now also leverage the sum to handle duplicated values better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated