Skip to content

Commit 0c39ece

Browse files
authored
chore: follow udf error format for reduce (#199)
1 parent a79f1fd commit 0c39ece

File tree

8 files changed

+53
-23
lines changed

8 files changed

+53
-23
lines changed

examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public static class StreamSorter extends Accumulator {
3939
private final TreeSet<Datum> sortedBuffer = new TreeSet<>(Comparator
4040
.comparing(Datum::getEventTime)
4141
.thenComparing(Datum::getID)); // Assuming Datum has a getUniqueId() method
42+
4243
@Override
4344
public void processMessage(Datum datum, OutputStreamObserver outputStream) {
4445
log.info("Received datum with event time: {}", datum.toString());
@@ -57,10 +58,10 @@ public void handleEndOfStream(OutputStreamObserver outputStreamObserver) {
5758

5859
private void flushBuffer(OutputStreamObserver outputStream) {
5960
log.info("Watermark updated, flushing sortedBuffer: {}", latestWm.toEpochMilli());
60-
while (!sortedBuffer.isEmpty() && !sortedBuffer
61+
while (!sortedBuffer.isEmpty() && sortedBuffer
6162
.first()
6263
.getEventTime()
63-
.isAfter(latestWm)) {
64+
.isBefore(latestWm)) {
6465
Datum datum = sortedBuffer.pollFirst();
6566
assert datum != null;
6667
outputStream.send(new Message(datum));

src/main/java/io/numaproj/numaflow/accumulator/Service.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ static void handleFailure(
4242
// Build gRPC Status
4343
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
4444
.setCode(Code.INTERNAL.getNumber())
45-
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (
46-
e.getMessage() != null ? e.getMessage() : ""))
45+
.setMessage(
46+
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
4747
.addDetails(Any.pack(DebugInfo.newBuilder()
4848
.setDetail(ExceptionUtils.getStackTrace(e))
4949
.build()))

src/main/java/io/numaproj/numaflow/reducer/Service.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
import akka.actor.ActorRef;
44
import akka.actor.ActorSystem;
55
import akka.actor.AllDeadLetters;
6+
import com.google.protobuf.Any;
67
import com.google.protobuf.Empty;
8+
import com.google.rpc.Code;
9+
import com.google.rpc.DebugInfo;
710
import io.grpc.Status;
11+
import io.grpc.protobuf.StatusProto;
812
import io.grpc.stub.StreamObserver;
913
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
1014
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
1115
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
1216
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
17+
import io.numaproj.numaflow.shared.ExceptionUtils;
1318
import io.numaproj.numaflow.shared.GrpcServerUtils;
1419
import lombok.extern.slf4j.Slf4j;
1520

@@ -37,8 +42,14 @@ static void handleFailure(
3742
failureFuture.get();
3843
} catch (Exception e) {
3944
e.printStackTrace();
40-
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
41-
responseObserver.onError(status.asException());
45+
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
46+
.setCode(Code.INTERNAL.getNumber())
47+
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
48+
.addDetails(Any.pack(DebugInfo.newBuilder()
49+
.setDetail(ExceptionUtils.getStackTrace(e))
50+
.build()))
51+
.build();
52+
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
4253
}
4354
}).start();
4455
}

src/main/java/io/numaproj/numaflow/reducestreamer/Service.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,20 @@
33
import akka.actor.ActorRef;
44
import akka.actor.ActorSystem;
55
import akka.actor.AllDeadLetters;
6+
import com.google.protobuf.Any;
67
import com.google.protobuf.Empty;
8+
import com.google.rpc.Code;
9+
import com.google.rpc.DebugInfo;
710
import io.grpc.Status;
11+
import io.grpc.protobuf.StatusProto;
812
import io.grpc.stub.StreamObserver;
913
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
1014
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
1115
import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
1216
import io.numaproj.numaflow.reducestreamer.model.Metadata;
1317
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
1418
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
19+
import io.numaproj.numaflow.shared.ExceptionUtils;
1520
import io.numaproj.numaflow.shared.GrpcServerUtils;
1621
import lombok.extern.slf4j.Slf4j;
1722

@@ -38,8 +43,15 @@ static void handleFailure(
3843
failureFuture.get();
3944
} catch (Exception e) {
4045
e.printStackTrace();
41-
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
42-
responseObserver.onError(status.asException());
46+
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
47+
.setCode(Code.INTERNAL.getNumber())
48+
.setMessage(
49+
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
50+
.addDetails(Any.pack(DebugInfo.newBuilder()
51+
.setDetail(ExceptionUtils.getStackTrace(e))
52+
.build()))
53+
.build();
54+
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
4355
}
4456
}).start();
4557
}

src/main/java/io/numaproj/numaflow/sessionreducer/Service.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@
55
import akka.actor.AllDeadLetters;
66
import akka.pattern.Patterns;
77
import akka.util.Timeout;
8+
import com.google.protobuf.Any;
89
import com.google.protobuf.Empty;
9-
import io.grpc.Status;
10+
import com.google.rpc.Code;
11+
import com.google.rpc.DebugInfo;
12+
import io.grpc.protobuf.StatusProto;
1013
import io.grpc.stub.StreamObserver;
1114
import io.numaproj.numaflow.sessionreduce.v1.SessionReduceGrpc;
1215
import io.numaproj.numaflow.sessionreduce.v1.Sessionreduce;
1316
import io.numaproj.numaflow.sessionreducer.model.SessionReducer;
1417
import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory;
18+
import io.numaproj.numaflow.shared.ExceptionUtils;
1519
import lombok.extern.slf4j.Slf4j;
1620
import scala.concurrent.Await;
1721
import scala.concurrent.Future;
@@ -39,8 +43,15 @@ static void handleFailure(
3943
failureFuture.get();
4044
} catch (Exception e) {
4145
e.printStackTrace();
42-
var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e);
43-
responseObserver.onError(status.asException());
46+
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
47+
.setCode(Code.INTERNAL.getNumber())
48+
.setMessage(
49+
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
50+
.addDetails(Any.pack(DebugInfo.newBuilder()
51+
.setDetail(ExceptionUtils.getStackTrace(e))
52+
.build()))
53+
.build();
54+
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
4455
}
4556
}).start();
4657
}

src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
2828
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
2929
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertTrue;
3031
import static org.junit.Assert.fail;
3132

3233
public class ServerErrTest {
@@ -107,9 +108,7 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
107108
}
108109
}
109110
try {
110-
assertEquals(
111-
"UNKNOWN: java.lang.RuntimeException: unknown exception",
112-
outputStreamObserver.t.getMessage());
111+
assertTrue(outputStreamObserver.t.getMessage().contains("UDF_EXECUTION_ERROR"));
113112
} catch (Throwable e) {
114113
exceptionInThread.set(e);
115114
}

src/test/java/io/numaproj/numaflow/reducestreamer/ServerErrTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
3232
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
3333
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertTrue;
3435
import static org.junit.Assert.fail;
3536

3637
public class ServerErrTest {
@@ -107,9 +108,7 @@ public void given_reducerThrows_when_serverRuns_then_outputStreamContainsThrowab
107108
}
108109
}
109110
try {
110-
assertEquals(
111-
"UNKNOWN: java.lang.RuntimeException: unknown exception",
112-
outputStreamObserver.t.getMessage());
111+
assertTrue(outputStreamObserver.t.getMessage().contains("UDF_EXECUTION_ERROR"));
113112
} catch (Throwable e) {
114113
exceptionInThread.set(e);
115114
}

src/test/java/io/numaproj/numaflow/sessionreducer/ServerErrTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.atomic.AtomicReference;
2323

2424
import static org.junit.Assert.assertEquals;
25+
import static org.junit.Assert.assertTrue;
2526
import static org.junit.Assert.fail;
2627

2728
public class ServerErrTest {
@@ -78,9 +79,7 @@ public void given_actorThrows_when_serverRuns_then_outputStreamContainsThrowable
7879
try {
7980
// this test triggers a supervisor runtime exception by sending an OPEN request with 2 windows.
8081
// we are expecting the error message below.
81-
assertEquals(
82-
"UNKNOWN: java.lang.RuntimeException: open operation error: expected exactly one window",
83-
outputStreamObserver.t.getMessage());
82+
assertTrue(outputStreamObserver.t.getMessage().contains("expected exactly one window"));
8483
} catch (Throwable e) {
8584
exceptionInThread.set(e);
8685
}
@@ -148,9 +147,7 @@ public void given_sessionReducerThrows_when_serverRuns_then_outputStreamContains
148147
}
149148
}
150149
try {
151-
assertEquals(
152-
"UNKNOWN: java.lang.RuntimeException: unknown exception",
153-
outputStreamObserver.t.getMessage());
150+
assertTrue(outputStreamObserver.t.getMessage().contains("UDF_EXECUTION_ERROR"));
154151
} catch (Throwable e) {
155152
exceptionInThread.set(e);
156153
}

0 commit comments

Comments
 (0)