Skip to content

Commit 5db58f9

Browse files
committed
[SPARK-44993][CORE] Add ShuffleChecksumUtils.compareChecksums by reusing ShuffleChecksumTestHelper.compareChecksums
### What changes were proposed in this pull request?

This PR aims to add `ShuffleChecksumUtils.compareChecksums` by reusing the existing test code `ShuffleChecksumTestHelper.compareChecksums` in order to reuse the functionality in the main code.

### Why are the changes needed?

This is very useful in the test code. We can take advantage of this verification logic in the `core` module.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the existing test codes because this is a kind of refactoring.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42707 from dongjoon-hyun/SPARK-44993.

Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 8e779d1 commit 5db58f9

File tree

2 files changed

+83
-46
lines changed

2 files changed

+83
-46
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle
19+
20+
import java.io.{DataInputStream, File, FileInputStream}
21+
import java.util.zip.CheckedInputStream
22+
23+
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
24+
import org.apache.spark.network.util.LimitedInputStream
25+
26+
object ShuffleChecksumUtils {

  /**
   * Ensure that the checksum values are consistent with index file and data file.
   *
   * For each of the `numPartition` partitions, the byte range of the data file delimited
   * by consecutive offsets in the index file is re-checksummed with `algorithm` and
   * compared against the value stored in the checksum file.
   *
   * @param numPartition number of shuffle partitions recorded in the files
   * @param algorithm checksum algorithm name understood by
   *                  `ShuffleChecksumHelper.getChecksumByAlgorithm`
   * @param checksum file holding one stored checksum (a `Long`) per partition
   * @param data shuffle data file containing all partition byte ranges back to back
   * @param index shuffle index file holding `numPartition + 1` offsets into `data`
   * @return true if every partition's recomputed checksum matches the stored one
   */
  def compareChecksums(
      numPartition: Int,
      algorithm: String,
      checksum: File,
      data: File,
      index: File): Boolean = {
    val expectChecksums = readExpectedChecksums(numPartition, checksum)

    var dataIn: FileInputStream = null
    var indexIn: DataInputStream = null
    var checkedIn: CheckedInputStream = null
    try {
      dataIn = new FileInputStream(data)
      indexIn = new DataInputStream(new FileInputStream(index))
      var prevOffset = indexIn.readLong
      var consistent = true
      var i = 0
      while (consistent && i < numPartition) {
        val curOffset = indexIn.readLong
        val limit = (curOffset - prevOffset).toInt
        val bytes = new Array[Byte](limit)
        val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
        checkedIn = new CheckedInputStream(
          new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
        // InputStream.read may return fewer bytes than requested in a single call, so
        // loop until the whole partition range is consumed; otherwise the checksum
        // would be computed over truncated data.
        var readBytes = 0
        var lastRead = 0
        while (readBytes < limit && lastRead != -1) {
          lastRead = checkedIn.read(bytes, readBytes, limit - readBytes)
          if (lastRead > 0) {
            readBytes += lastRead
          }
        }
        prevOffset = curOffset
        // checksum must be consistent at both write and read sides; a short read
        // (data file truncated) also counts as inconsistent.
        consistent = readBytes == limit && checkedIn.getChecksum.getValue == expectChecksums(i)
        i += 1
      }
      consistent
    } finally {
      if (dataIn != null) {
        dataIn.close()
      }
      if (indexIn != null) {
        indexIn.close()
      }
      if (checkedIn != null) {
        checkedIn.close()
      }
    }
  }

  /** Read the `numPartition` stored checksum values (one `Long` each) from `checksum`. */
  private def readExpectedChecksums(numPartition: Int, checksum: File): Array[Long] = {
    var checksumIn: DataInputStream = null
    val expectChecksums = Array.ofDim[Long](numPartition)
    try {
      checksumIn = new DataInputStream(new FileInputStream(checksum))
      (0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
    } finally {
      if (checksumIn != null) {
        checksumIn.close()
      }
    }
    expectChecksums
  }
}

core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
package org.apache.spark.shuffle
1919

20-
import java.io.{DataInputStream, File, FileInputStream}
21-
import java.util.zip.CheckedInputStream
22-
23-
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
24-
import org.apache.spark.network.util.LimitedInputStream
20+
import java.io.File
2521

2622
trait ShuffleChecksumTestHelper {
2723

@@ -38,46 +34,7 @@ trait ShuffleChecksumTestHelper {
3834
assert(data.exists(), "Data file doesn't exist")
3935
assert(index.exists(), "Index file doesn't exist")
4036

41-
var checksumIn: DataInputStream = null
42-
val expectChecksums = Array.ofDim[Long](numPartition)
43-
try {
44-
checksumIn = new DataInputStream(new FileInputStream(checksum))
45-
(0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
46-
} finally {
47-
if (checksumIn != null) {
48-
checksumIn.close()
49-
}
50-
}
51-
52-
var dataIn: FileInputStream = null
53-
var indexIn: DataInputStream = null
54-
var checkedIn: CheckedInputStream = null
55-
try {
56-
dataIn = new FileInputStream(data)
57-
indexIn = new DataInputStream(new FileInputStream(index))
58-
var prevOffset = indexIn.readLong
59-
(0 until numPartition).foreach { i =>
60-
val curOffset = indexIn.readLong
61-
val limit = (curOffset - prevOffset).toInt
62-
val bytes = new Array[Byte](limit)
63-
val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
64-
checkedIn = new CheckedInputStream(
65-
new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
66-
checkedIn.read(bytes, 0, limit)
67-
prevOffset = curOffset
68-
// checksum must be consistent at both write and read sides
69-
assert(checkedIn.getChecksum.getValue == expectChecksums(i))
70-
}
71-
} finally {
72-
if (dataIn != null) {
73-
dataIn.close()
74-
}
75-
if (indexIn != null) {
76-
indexIn.close()
77-
}
78-
if (checkedIn != null) {
79-
checkedIn.close()
80-
}
81-
}
37+
assert(ShuffleChecksumUtils.compareChecksums(numPartition, algorithm, checksum, data, index),
38+
"checksum must be consistent at both write and read sides")
8239
}
8340
}

0 commit comments

Comments
 (0)