diff --git a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java index e3c9815cb..babff8c9b 100644 --- a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java +++ b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java @@ -149,14 +149,10 @@ void test_map_with_preserved_order() { List result = new ArrayList<>(); AtomicReference sessionId = new AtomicReference<>(); //应该乱序 - ProcessFlow flow = Flows.create(repo, messenger, locks).id("start") - .map(i -> { - SleepUtil.sleep((6 - i) * 20L); - return i * 10; - }) - .map(i -> i + 10) - .close(r -> result.add(r.get().getData())) - .onComplete(id -> sessionId.set(id)); + ProcessFlow flow = Flows.create(repo, messenger, locks).id("start").map(i -> { + SleepUtil.sleep((6 - i) * 20L); + return i * 10; + }).map(i -> i + 10).close(r -> result.add(r.get().getData())).onComplete(sessionId::set); FlowSession session = new FlowSession(true); Window window = session.begin(); for (int i = 1; i < 6; i++) { @@ -166,7 +162,7 @@ void test_map_with_preserved_order() { SleepUtil.sleep(100); window.complete(); - FlowsTestUtil.waitUntil(() -> result.size() == 5, 10000); + FlowsTestUtil.waitUntil(() -> sessionId.get() != null, 10000); assertEquals(session.getId(), sessionId.get()); assertEquals(20, result.get(0)); assertEquals(30, result.get(1)); @@ -367,7 +363,7 @@ void test_reduce_with_window_and_keyBy_complete_event() { .reduce(() -> ObjectUtils.>cast(Tuple.from("", 0)), (acc, data) -> Tuple.from(data.first(), acc.second() + data.second().second())) .just(data -> { - reduced.add((Tuple)data); + reduced.add((Tuple) data); }) .close(); FlowSession session = new FlowSession(); @@ -448,34 +444,23 @@ void test_nested_reduce() { @Test void test_buffer() { AtomicInteger counter = new AtomicInteger(); - Flows.create(repo, messenger, locks) - .window(3) - .buffer() - .flatMap(i -> { - return Flows.flux(i.toArray(new Integer[0])); - }) - .just(value -> counter.set(counter.get() + 1)) - .close() - .offer(new Integer[] {1, 2, 3, 4, 5, 6}); + Flows.create(repo, messenger, locks).window(3).buffer().flatMap(i -> { + return Flows.flux(i.toArray(new Integer[0])); + }).just(value -> counter.set(counter.get() + 1)).close().offer(new Integer[] {1, 2, 3, 4, 5, 6}); FlowsTestUtil.waitUntil(() -> counter.get() == 6); } @Test void should_get_one_result_when_reduce_given_multi_flatmap_data() { AtomicInteger counter = new AtomicInteger(); - Flows.create(repo, messenger, locks) - .flatMap(i -> { - return Flows.flux(i * 10, i * 10 + 1); - }) - .reduce(() -> 0, (acc, value) -> { - System.out.println("reduce value=" + value + ", acc=" + (acc + value)); - return acc + value; - }) - .just(value -> { - System.out.println("value=" + value); - }) - .close(r -> counter.set(r.get().getData())) - .offer(new Integer[] {1, 2}); + Flows.create(repo, messenger, locks).flatMap(i -> { + return Flows.flux(i * 10, i * 10 + 1); + }).reduce(() -> 0, (acc, value) -> { + System.out.println("reduce value=" + value + ", acc=" + (acc + value)); + return acc + value; + }).just(value -> { + System.out.println("value=" + value); + }).close(r -> counter.set(r.get().getData())).offer(new Integer[] {1, 2}); FlowsTestUtil.waitUntil(() -> counter.get() != 0); Assertions.assertEquals(62, counter.get()); @@ -971,21 +956,14 @@ void test_mermaid() { .fork(p -> p.map(i -> i * 2)) .join(() -> "", (acc, i) -> acc + i.toString()) .close(); - assertEquals("start((Start))" - + System.lineSeparator() + "start-->node0(map)" - + System.lineSeparator() + "node9-->node3" - + System.lineSeparator() + "node8-->node6" - + System.lineSeparator() + "node6-->end7((End))" - + System.lineSeparator() + "node5-->node6([+])" - + System.lineSeparator() + "node4-->node8(map)" - + System.lineSeparator() + "node4-->node5(map)" - + System.lineSeparator() + "node3-->node4{{=}}" - + System.lineSeparator() + "node2-->node3([+])" - + System.lineSeparator() + "node10-->node3" - + System.lineSeparator() + "node1-->node9(map)" - + System.lineSeparator() + "node1-->node2(map)" - + System.lineSeparator() + "node1-->node10(map)" - + System.lineSeparator() + "node0-->node1{?}", new Mermaid(flow).get()); + assertEquals("start((Start))" + System.lineSeparator() + "start-->node0(map)" + System.lineSeparator() + + "node9-->node3" + System.lineSeparator() + "node8-->node6" + System.lineSeparator() + + "node6-->end7((End))" + System.lineSeparator() + "node5-->node6([+])" + System.lineSeparator() + + "node4-->node8(map)" + System.lineSeparator() + "node4-->node5(map)" + System.lineSeparator() + + "node3-->node4{{=}}" + System.lineSeparator() + "node2-->node3([+])" + System.lineSeparator() + + "node10-->node3" + System.lineSeparator() + "node1-->node9(map)" + System.lineSeparator() + + "node1-->node2(map)" + System.lineSeparator() + "node1-->node10(map)" + System.lineSeparator() + + "node0-->node1{?}", new Mermaid(flow).get()); } @Test @@ -1002,8 +980,8 @@ void test_flow_flat_map() { assertEquals(4, result.size()); } - private Supplier>> contextSupplier(FlowContextRepo repo, String traceId, - String metaId, FlowNodeStatus status) { + private Supplier>> contextSupplier(FlowContextRepo repo, String traceId, String metaId, + FlowNodeStatus status) { return () -> { List> all = repo.getContextsByTrace(traceId); return all.stream() @@ -1018,15 +996,13 @@ private Supplier>> contextSupplier(FlowContextRepo repo, void testCleanErrorContext() { long[] data = {1}; FlowContextMemoRepo testRepo = new FlowContextMemoRepo(false); - ProcessFlow flowTest = Flows.create(testRepo, messenger, locks) - .id("flow test start node") - .map(i -> { + ProcessFlow flowTest = + Flows.create(testRepo, messenger, locks).id("flow test start node").map(i -> { if (i == 1) { throw new FlowTestException(); } return i; - }) - .close(r -> data[0] = r.get().getData()); + }).close(r -> data[0] = r.get().getData()); flowTest.setId("flow test"); String traceId = flowTest.offer(1); @@ -1038,18 +1014,13 @@ void testCleanErrorContext() { @Test void test_flow_flat_map_with_reduce_under_cold_stream() { List result = new ArrayList<>(); - ProcessFlow flow = Flows.create() - .flatMap(num -> { - Integer[] maps = new Integer[num]; - for (int i = 0; i < num; i++) { - maps[i] = i * 10; - } - return Flows.flux(maps); - }) - .map(i -> i * 10) - .reduce(() -> 0, Integer::sum) - .just(i -> result.add(i)) - .close(); + ProcessFlow flow = Flows.create().flatMap(num -> { + Integer[] maps = new Integer[num]; + for (int i = 0; i < num; i++) { + maps[i] = i * 10; + } + return Flows.flux(maps); + }).map(i -> i * 10).reduce(() -> 0, Integer::sum).just(i -> result.add(i)).close(); flow.offer(new Integer[] {2, 3}); FlowsTestUtil.waitUntil(() -> result.size() == 1); assertEquals(1, result.size()); @@ -1059,18 +1030,13 @@ void test_flow_flat_map_with_reduce_under_cold_stream() { @Test void test_flow_flat_map_with_reduce_under_hot_stream() throws InterruptedException { List result = new ArrayList<>(); - ProcessFlow flow = Flows.create() - .flatMap(num -> { - Integer[] maps = new Integer[num]; - for (int i = 0; i < num; i++) { - maps[i] = i * 10; - } - return Flows.flux(maps); - }) - .map(i -> i * 10) - .reduce(() -> 0, Integer::sum) - .just(i -> result.add(i)) - .close(); + ProcessFlow flow = Flows.create().flatMap(num -> { + Integer[] maps = new Integer[num]; + for (int i = 0; i < num; i++) { + maps[i] = i * 10; + } + return Flows.flux(maps); + }).map(i -> i * 10).reduce(() -> 0, Integer::sum).just(i -> result.add(i)).close(); FlowSession session = new FlowSession(); Window window = session.begin(); flow.offer(2, session); @@ -1089,19 +1055,14 @@ void test_flatMap_under_preserved_order() throws InterruptedException { StringBuilder result = new StringBuilder(); final int count = 4; final int flatmapSize = 2; - ProcessFlow flow = Flows.create(repo, messenger, locks) - .flatMap(num -> { - SleepUtil.sleep((count - num) * 20L); - String[] maps = new String[flatmapSize]; - for (int i = 0; i < flatmapSize; i++) { - maps[i] = num + "-" + i; - } - return Flows.flux(maps); - }) - .map(s -> "|" + s) - .reduce((acc, i) -> acc + i) - .map(i -> result.append(i.substring(1))) - .close(); + ProcessFlow flow = Flows.create(repo, messenger, locks).flatMap(num -> { + SleepUtil.sleep((count - num) * 20L); + String[] maps = new String[flatmapSize]; + for (int i = 0; i < flatmapSize; i++) { + maps[i] = num + "-" + i; + } + return Flows.flux(maps); + }).map(s -> "|" + s).reduce((acc, i) -> acc + i).map(i -> result.append(i.substring(1))).close(); FlowSession session = new FlowSession(true); Window window = session.begin(); for (int i = 0; i < count; i++) {