Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions java/src/main/java/ai/rapids/cudf/Aggregation.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* SPDX-FileCopyrightText: Copyright (c) 2020-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2020-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*
*/
Expand Down Expand Up @@ -61,7 +61,8 @@ enum Kind {
MERGE_TDIGEST(33), // This can take a delta argument for accuracy level
HISTOGRAM(34),
MERGE_HISTOGRAM(35),
BITWISE_AGG(36);
BITWISE_AGG(36),
SUM_WITH_OVERFLOW(37);

final int nativeId;

Expand Down Expand Up @@ -533,6 +534,23 @@ static SumAggregation sum() {
return new SumAggregation();
}

/**
* Parameterless aggregation bound to {@code Kind.SUM_WITH_OVERFLOW}.
* The constructor is private, so instances are only created from within the
* enclosing class.
*/
static final class SumWithOverflowAggregation extends NoParamAggregation {
private SumWithOverflowAggregation() {
super(Kind.SUM_WITH_OVERFLOW);
}
}

/**
* Create a sum aggregation that also reports overflow.
* <p>
* The result is a struct with children {@code {sum, overflow: BOOL8}}. For column
* reductions the input must be INT64. For hash-based groupby the input may be any
* signed integer type (INT8/16/32/64) or fixed-point decimal. Sort-based groupby,
* scan, segmented reduce, and rolling are not supported by cudf.
*
* @return a new {@code SumWithOverflowAggregation}
*/
static SumWithOverflowAggregation sumWithOverflow() {
return new SumWithOverflowAggregation();
}

static final class ProductAggregation extends NoParamAggregation {
private ProductAggregation() {
super(Kind.PRODUCT);
Expand Down
12 changes: 11 additions & 1 deletion java/src/main/java/ai/rapids/cudf/GroupByAggregation.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*
*/
Expand Down Expand Up @@ -69,6 +69,16 @@ public static GroupByAggregation sum() {
return new GroupByAggregation(Aggregation.sum());
}

/**
* Sum aggregation that also reports per-group overflow.
* <p>
* The result column is a STRUCT with children
* {@code {sum: input-type, overflow: BOOL8}}. Supported input types are signed
* integers (INT8/16/32/64) and fixed-point decimal. Only hash-based groupby is
* supported; sort-based groupby will throw.
*
* @return a group-by aggregation wrapping {@code Aggregation.sumWithOverflow()}
*/
public static GroupByAggregation sumWithOverflow() {
return new GroupByAggregation(Aggregation.sumWithOverflow());
}

/**
* Product Aggregation.
*/
Expand Down
10 changes: 9 additions & 1 deletion java/src/main/java/ai/rapids/cudf/ReductionAggregation.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2021-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*
*/
Expand Down Expand Up @@ -52,6 +52,14 @@ public static ReductionAggregation sum() {
return new ReductionAggregation(Aggregation.sum());
}

/**
* Sum reduction that also reports int64 overflow.
* <p>
* The result is a struct scalar with children
* {@code {sum: INT64, overflow: BOOL8}}. The input column must be INT64.
*
* @return a reduction aggregation wrapping {@code Aggregation.sumWithOverflow()}
*/
public static ReductionAggregation sumWithOverflow() {
return new ReductionAggregation(Aggregation.sumWithOverflow());
}

/**
* Product Aggregation.
*/
Expand Down
4 changes: 3 additions & 1 deletion java/src/main/native/src/AggregationJni.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2020-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2020-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -96,6 +96,8 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createNoParamAgg(JNIEnv*
case 35: // MERGE_HISTOGRAM
return cudf::make_merge_histogram_aggregation();
// case 36: BITWISE_AGG
case 37: // SUM_WITH_OVERFLOW
return cudf::make_sum_with_overflow_aggregation();

default: throw std::logic_error("Unsupported No Parameter Aggregation Operation");
}
Expand Down
103 changes: 102 additions & 1 deletion java/src/test/java/ai/rapids/cudf/ReductionTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2019-2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*
*/
Expand All @@ -18,6 +18,9 @@
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ReductionTest extends CudfTestBase {
public static final double DELTAD = 0.00001;
Expand Down Expand Up @@ -639,4 +642,102 @@ void testWithSetOutputType() {
assertEquals(expected, result);
}
}

// Decodes the struct scalar returned by a SUM_WITH_OVERFLOW reduction, whose
// children are expected to be {sum: INT64, overflow: BOOL8}.
// Returns { sumIsValid (1/0), sumValue (0 when the sum is null), overflow (1/0) }.
// The child views obtained from the scalar are always closed before returning.
private static long[] readSumWithOverflow(Scalar result) {
  assertEquals(DType.STRUCT, result.getType());
  assertTrue(result.isValid());
  ColumnView[] fields = result.getChildrenFromStructScalar();
  try {
    assertEquals(2, fields.length);
    assertEquals(DType.INT64, fields[0].getType());
    assertEquals(DType.BOOL8, fields[1].getType());
    try (ColumnVector sumVec = fields[0].copyToColumnVector();
         ColumnVector overflowVec = fields[1].copyToColumnVector();
         HostColumnVector sumOnHost = sumVec.copyToHost();
         HostColumnVector overflowOnHost = overflowVec.copyToHost()) {
      // Slots default to 0, which is the encoding used for a null sum.
      long[] decoded = new long[3];
      if (!sumOnHost.isNull(0)) {
        decoded[0] = 1L;
        decoded[1] = sumOnHost.getLong(0);
      }
      if (overflowOnHost.getBoolean(0)) {
        decoded[2] = 1L;
      }
      return decoded;
    }
  } finally {
    for (ColumnView field : fields) {
      field.close();
    }
  }
}

// A small in-range INT64 input: the sum is valid, equals 15, and no overflow is flagged.
@Test
void testSumWithOverflowNoOverflow() {
  try (ColumnVector input = ColumnVector.fromLongs(1L, 2L, 3L, 4L, 5L);
       Scalar reduced = input.reduce(ReductionAggregation.sumWithOverflow(), DType.STRUCT)) {
    long[] decoded = readSumWithOverflow(reduced);
    long sumValid = decoded[0];
    long sumValue = decoded[1];
    long overflow = decoded[2];
    assertEquals(1L, sumValid);  // sum is valid
    assertEquals(15L, sumValue); // 1+2+3+4+5
    assertEquals(0L, overflow);  // no overflow
  }
}

// Long.MAX_VALUE + 1 exceeds the INT64 range: the sum stays valid and overflow is flagged.
@Test
void testSumWithOverflowPositiveOverflow() {
  try (ColumnVector input = ColumnVector.fromLongs(Long.MAX_VALUE, 1L);
       Scalar reduced = input.reduce(ReductionAggregation.sumWithOverflow(), DType.STRUCT)) {
    long[] decoded = readSumWithOverflow(reduced);
    long sumValid = decoded[0];
    long overflow = decoded[2];
    assertEquals(1L, sumValid);
    assertEquals(1L, overflow); // overflow detected
  }
}

// Long.MIN_VALUE + (-1) falls below the INT64 range: overflow must be flagged.
// Mirrors the positive-overflow test, including the check that the sum child
// itself is still valid (non-null) for a non-empty, non-null input.
@Test
void testSumWithOverflowNegativeOverflow() {
  try (ColumnVector cv = ColumnVector.fromLongs(Long.MIN_VALUE, -1L);
       Scalar result = cv.reduce(ReductionAggregation.sumWithOverflow(), DType.STRUCT)) {
    long[] r = readSumWithOverflow(result);
    assertEquals(1L, r[0]); // sum is valid
    assertEquals(1L, r[2]); // overflow detected
  }
}

// An empty INT64 column reduces to a valid struct whose sum child is null and
// whose overflow flag is false. Decoding goes through readSumWithOverflow so the
// struct type, validity, child count and child types are checked the same way
// as in the other reduction tests, and the child views are closed by the helper.
@Test
void testSumWithOverflowEmptyColumn() {
  try (ColumnVector cv = ColumnVector.fromLongs();
       Scalar result = cv.reduce(ReductionAggregation.sumWithOverflow(), DType.STRUCT)) {
    long[] r = readSumWithOverflow(result);
    assertEquals(0L, r[0]); // empty input -> null sum
    assertEquals(0L, r[2]); // no overflow
  }
}

// An all-null INT64 column reduces to a null sum with the overflow flag false.
// Decoding goes through readSumWithOverflow, which additionally checks that the
// result is a valid STRUCT with exactly {INT64, BOOL8} children before reading
// them, and closes the child views.
@Test
void testSumWithOverflowAllNullColumn() {
  try (ColumnVector cv = ColumnVector.fromBoxedLongs(null, null, null);
       Scalar result = cv.reduce(ReductionAggregation.sumWithOverflow(), DType.STRUCT)) {
    long[] r = readSumWithOverflow(result);
    assertEquals(0L, r[0]); // all-null input -> null sum
    assertEquals(0L, r[2]); // no overflow
  }
}

// Reducing an INT32 column with SUM_WITH_OVERFLOW is expected to raise CudfException.
// If the reduction unexpectedly succeeds, the returned scalar is closed right away.
@Test
void testSumWithOverflowRejectsNonInt64() {
  try (ColumnVector intInput = ColumnVector.fromInts(1, 2, 3)) {
    assertThrows(CudfException.class, () -> {
      intInput.reduce(ReductionAggregation.sumWithOverflow(), DType.STRUCT).close();
    });
  }
}
}
90 changes: 90 additions & 0 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7826,6 +7826,96 @@ void testGroupByM2() {
}
}

@Test
void testGroupByHashSumWithOverflow() {
  // Integer keys 1, 2, 3 (compared against fromInts below) paired with INT64
  // values whose per-key sums fit comfortably in int64.
  try (Table source = new Table.TestBuilder()
           .column(1, 2, 3, 1, 2, 2, 1, 3, 3, 2)
           .column(10L, 20L, 30L, 11L, 21L, 22L, 12L, 31L, 32L, 23L)
           .build();
       Table aggregated = source.groupBy(0).aggregate(
           GroupByAggregation.sumWithOverflow().onColumn(1));
       Table ordered = aggregated.orderBy(OrderByArg.asc(0))) {
    assertEquals(2, ordered.getNumberOfColumns());
    assertEquals(3, ordered.getRowCount());

    // Columns fetched from the table are not closed here; only the views and
    // copies created below are.
    ColumnVector actualKeys = ordered.getColumn(0);
    ColumnVector resultStruct = ordered.getColumn(1);
    assertEquals(DType.STRUCT, resultStruct.getType());

    try (ColumnView sumView = resultStruct.getChildColumnView(0);
         ColumnView overflowView = resultStruct.getChildColumnView(1);
         ColumnVector actualSum = sumView.copyToColumnVector();
         ColumnVector actualOverflow = overflowView.copyToColumnVector();
         ColumnVector wantKeys = ColumnVector.fromInts(1, 2, 3);
         ColumnVector wantSum = ColumnVector.fromLongs(33L, 86L, 93L);
         ColumnVector wantOverflow = ColumnVector.fromBooleans(false, false, false)) {
      assertColumnsAreEqual(wantKeys, actualKeys);
      assertColumnsAreEqual(wantSum, actualSum);
      assertColumnsAreEqual(wantOverflow, actualOverflow);
    }
  }
}

@Test
void testGroupByHashSumWithOverflowDetectsOverflow() {
  // Key 1 sums Long.MAX_VALUE twice and must overflow; key 2 (3 + 4) must not.
  try (Table source = new Table.TestBuilder()
           .column(1, 1, 2, 2)
           .column(Long.MAX_VALUE, Long.MAX_VALUE, 3L, 4L)
           .build();
       Table aggregated = source.groupBy(0).aggregate(
           GroupByAggregation.sumWithOverflow().onColumn(1));
       Table ordered = aggregated.orderBy(OrderByArg.asc(0))) {
    ColumnVector resultStruct = ordered.getColumn(1);
    // Only the overflow child (index 1) is inspected in this test.
    try (ColumnView overflowView = resultStruct.getChildColumnView(1);
         ColumnVector actualOverflow = overflowView.copyToColumnVector();
         ColumnVector wantOverflow = ColumnVector.fromBooleans(true, false)) {
      assertColumnsAreEqual(wantOverflow, actualOverflow);
    }
  }
}

@Test
void testGroupByHashSumWithOverflowInt32() {
  // INT32 values: accepted by hash groupby (the reduction path is INT64-only),
  // and the sum child is expected to keep the INT32 input type.
  try (Table source = new Table.TestBuilder()
           .column(1, 2, 1, 2)
           .column(1, 10, 2, 20)
           .build();
       Table aggregated = source.groupBy(0).aggregate(
           GroupByAggregation.sumWithOverflow().onColumn(1));
       Table ordered = aggregated.orderBy(OrderByArg.asc(0))) {
    ColumnVector resultStruct = ordered.getColumn(1);
    assertEquals(DType.STRUCT, resultStruct.getType());
    try (ColumnView sumView = resultStruct.getChildColumnView(0);
         ColumnView overflowView = resultStruct.getChildColumnView(1);
         ColumnVector actualSum = sumView.copyToColumnVector();
         ColumnVector actualOverflow = overflowView.copyToColumnVector();
         ColumnVector wantSum = ColumnVector.fromInts(3, 30);
         ColumnVector wantOverflow = ColumnVector.fromBooleans(false, false)) {
      assertEquals(DType.INT32, actualSum.getType());
      assertColumnsAreEqual(wantSum, actualSum);
      assertColumnsAreEqual(wantOverflow, actualOverflow);
    }
  }
}

@Test
void testGroupBySortSumWithOverflowThrows() {
  // Declaring the keys as already sorted selects cudf's sort-based groupby.
  // SUM_WITH_OVERFLOW is only available for hash groupby, so a CudfException
  // is expected; should aggregate() succeed, its result is closed immediately.
  GroupByOptions sortedKeyOptions = GroupByOptions.builder().withKeysSorted(true).build();
  try (Table source = new Table.TestBuilder()
           .column(1, 1, 2, 2)
           .column(1L, 2L, 3L, 4L)
           .build()) {
    assertThrows(CudfException.class, () -> {
      source.groupBy(sortedKeyOptions, 0)
          .aggregate(GroupByAggregation.sumWithOverflow().onColumn(1))
          .close();
    });
  }
}

@Test
void testGroupByMergeM2() {
StructType nestedType = new StructType(false,
Expand Down
Loading