Skip to content

Commit d426104

Browse files
kiszkcloud-fan
authored andcommitted
[SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple
This PR fixes possible overflow in int add or multiply. In particular, their overflows in multiply are detected by [Spotbugs](https://spotbugs.github.io/) The following assignments may cause overflow in right hand side. As a result, the result may be negative. ``` long = int * int long = int + int ``` To avoid this problem, this PR performs cast from int to long in right hand side. Existing UTs. Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com> Closes #21481 from kiszk/SPARK-24452. (cherry picked from commit 90da7dc) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent a7d378e commit d426104

10 files changed

Lines changed: 72 additions & 72 deletions

File tree

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
703703
// must be stored in the same memory page.
704704
// (8 byte key length) (key) (value) (8 byte pointer to next value)
705705
int uaoSize = UnsafeAlignedOffset.getUaoSize();
706-
final long recordLength = (2 * uaoSize) + klen + vlen + 8;
706+
final long recordLength = (2L * uaoSize) + klen + vlen + 8;
707707
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
708708
if (!acquireNewPage(recordLength + uaoSize)) {
709709
return false;

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
225225
// check if attempting another run
226226
keepTrying = supervise && exitCode != 0 && !killed
227227
if (keepTrying) {
228-
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
228+
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
229229
waitSeconds = 1
230230
}
231231
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
9595
// the left side of max is >=1 whenever partsScanned >= 2
9696
numPartsToTry = Math.max(1,
9797
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
98-
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
98+
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
9999
}
100100
}
101101

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ private[spark] class BlockManager(
291291
case e: Exception if i < MAX_ATTEMPTS =>
292292
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
293293
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
294-
Thread.sleep(SLEEP_TIME_SECS * 1000)
294+
Thread.sleep(SLEEP_TIME_SECS * 1000L)
295295
case NonFatal(e) =>
296296
throw new SparkException("Unable to register with external shuffle server due to : " +
297297
e.getMessage, e)

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static int calculateHeaderPortionInBytes(int numFields) {
7272
private long elementOffset;
7373

7474
private long getElementOffset(int ordinal, int elementSize) {
75-
return elementOffset + ordinal * elementSize;
75+
return elementOffset + ordinal * (long)elementSize;
7676
}
7777

7878
public Object getBaseObject() { return baseObject; }
@@ -402,46 +402,46 @@ public byte[] toByteArray() {
402402
public short[] toShortArray() {
403403
short[] values = new short[numElements];
404404
Platform.copyMemory(
405-
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2);
405+
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
406406
return values;
407407
}
408408

409409
@Override
410410
public int[] toIntArray() {
411411
int[] values = new int[numElements];
412412
Platform.copyMemory(
413-
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4);
413+
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4L);
414414
return values;
415415
}
416416

417417
@Override
418418
public long[] toLongArray() {
419419
long[] values = new long[numElements];
420420
Platform.copyMemory(
421-
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8);
421+
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8L);
422422
return values;
423423
}
424424

425425
@Override
426426
public float[] toFloatArray() {
427427
float[] values = new float[numElements];
428428
Platform.copyMemory(
429-
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4);
429+
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4L);
430430
return values;
431431
}
432432

433433
@Override
434434
public double[] toDoubleArray() {
435435
double[] values = new double[numElements];
436436
Platform.copyMemory(
437-
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8);
437+
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8L);
438438
return values;
439439
}
440440

441441
private static UnsafeArrayData fromPrimitiveArray(
442442
Object arr, int offset, int length, int elementSize) {
443443
final long headerInBytes = calculateHeaderPortionInBytes(length);
444-
final long valueRegionInBytes = elementSize * length;
444+
final long valueRegionInBytes = (long)elementSize * length;
445445
final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
446446
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
447447
throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
4141
@Override
4242
public UnsafeRow appendRow(Object kbase, long koff, int klen,
4343
Object vbase, long voff, int vlen) {
44-
final long recordLength = 8 + klen + vlen + 8;
44+
final long recordLength = 8L + klen + vlen + 8;
4545
// if run out of max supported rows or page size, return null
4646
if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
4747
return null;

0 commit comments

Comments
 (0)