Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,10 @@ void test_map_with_preserved_order() {
List<Integer> result = new ArrayList<>();
AtomicReference<String> sessionId = new AtomicReference<>();
//应该乱序
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks).id("start")
.map(i -> {
SleepUtil.sleep((6 - i) * 20L);
return i * 10;
})
.map(i -> i + 10)
.close(r -> result.add(r.get().getData()))
.onComplete(id -> sessionId.set(id));
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks).id("start").map(i -> {
SleepUtil.sleep((6 - i) * 20L);
return i * 10;
}).map(i -> i + 10).close(r -> result.add(r.get().getData())).onComplete(sessionId::set);
FlowSession session = new FlowSession(true);
Window window = session.begin();
for (int i = 1; i < 6; i++) {
Expand All @@ -166,7 +162,7 @@ void test_map_with_preserved_order() {
SleepUtil.sleep(100);
window.complete();

FlowsTestUtil.waitUntil(() -> result.size() == 5, 10000);
FlowsTestUtil.waitUntil(() -> sessionId.get() != null, 10000);
assertEquals(session.getId(), sessionId.get());
assertEquals(20, result.get(0));
assertEquals(30, result.get(1));
Expand Down Expand Up @@ -367,7 +363,7 @@ void test_reduce_with_window_and_keyBy_complete_event() {
.reduce(() -> ObjectUtils.<Tuple<String, Integer>>cast(Tuple.from("", 0)),
(acc, data) -> Tuple.from(data.first(), acc.second() + data.second().second()))
.just(data -> {
reduced.add((Tuple<String, Integer>)data);
reduced.add((Tuple<String, Integer>) data);
})
.close();
FlowSession session = new FlowSession();
Expand Down Expand Up @@ -448,34 +444,23 @@ void test_nested_reduce() {
@Test
void test_buffer() {
AtomicInteger counter = new AtomicInteger();
Flows.<Integer>create(repo, messenger, locks)
.window(3)
.buffer()
.flatMap(i -> {
return Flows.flux(i.toArray(new Integer[0]));
})
.just(value -> counter.set(counter.get() + 1))
.close()
.offer(new Integer[] {1, 2, 3, 4, 5, 6});
Flows.<Integer>create(repo, messenger, locks).window(3).buffer().flatMap(i -> {
return Flows.flux(i.toArray(new Integer[0]));
}).just(value -> counter.set(counter.get() + 1)).close().offer(new Integer[] {1, 2, 3, 4, 5, 6});
FlowsTestUtil.waitUntil(() -> counter.get() == 6);
}

@Test
void should_get_one_result_when_reduce_given_multi_flatmap_data() {
AtomicInteger counter = new AtomicInteger();
Flows.<Integer>create(repo, messenger, locks)
.flatMap(i -> {
return Flows.flux(i * 10, i * 10 + 1);
})
.reduce(() -> 0, (acc, value) -> {
System.out.println("reduce value=" + value + ", acc=" + (acc + value));
return acc + value;
})
.just(value -> {
System.out.println("value=" + value);
})
.close(r -> counter.set(r.get().getData()))
.offer(new Integer[] {1, 2});
Flows.<Integer>create(repo, messenger, locks).flatMap(i -> {
return Flows.flux(i * 10, i * 10 + 1);
}).reduce(() -> 0, (acc, value) -> {
System.out.println("reduce value=" + value + ", acc=" + (acc + value));
return acc + value;
}).just(value -> {
System.out.println("value=" + value);
}).close(r -> counter.set(r.get().getData())).offer(new Integer[] {1, 2});
FlowsTestUtil.waitUntil(() -> counter.get() != 0);

Assertions.assertEquals(62, counter.get());
Expand Down Expand Up @@ -971,21 +956,14 @@ void test_mermaid() {
.fork(p -> p.map(i -> i * 2))
.join(() -> "", (acc, i) -> acc + i.toString())
.close();
assertEquals("start((Start))"
+ System.lineSeparator() + "start-->node0(map)"
+ System.lineSeparator() + "node9-->node3"
+ System.lineSeparator() + "node8-->node6"
+ System.lineSeparator() + "node6-->end7((End))"
+ System.lineSeparator() + "node5-->node6([+])"
+ System.lineSeparator() + "node4-->node8(map)"
+ System.lineSeparator() + "node4-->node5(map)"
+ System.lineSeparator() + "node3-->node4{{=}}"
+ System.lineSeparator() + "node2-->node3([+])"
+ System.lineSeparator() + "node10-->node3"
+ System.lineSeparator() + "node1-->node9(map)"
+ System.lineSeparator() + "node1-->node2(map)"
+ System.lineSeparator() + "node1-->node10(map)"
+ System.lineSeparator() + "node0-->node1{?}", new Mermaid(flow).get());
assertEquals("start((Start))" + System.lineSeparator() + "start-->node0(map)" + System.lineSeparator()
+ "node9-->node3" + System.lineSeparator() + "node8-->node6" + System.lineSeparator()
+ "node6-->end7((End))" + System.lineSeparator() + "node5-->node6([+])" + System.lineSeparator()
+ "node4-->node8(map)" + System.lineSeparator() + "node4-->node5(map)" + System.lineSeparator()
+ "node3-->node4{{=}}" + System.lineSeparator() + "node2-->node3([+])" + System.lineSeparator()
+ "node10-->node3" + System.lineSeparator() + "node1-->node9(map)" + System.lineSeparator()
+ "node1-->node2(map)" + System.lineSeparator() + "node1-->node10(map)" + System.lineSeparator()
+ "node0-->node1{?}", new Mermaid(flow).get());
}

@Test
Expand All @@ -1002,8 +980,8 @@ void test_flow_flat_map() {
assertEquals(4, result.size());
}

private <T> Supplier<List<FlowContext<T>>> contextSupplier(FlowContextRepo repo, String traceId,
String metaId, FlowNodeStatus status) {
private <T> Supplier<List<FlowContext<T>>> contextSupplier(FlowContextRepo repo, String traceId, String metaId,
FlowNodeStatus status) {
return () -> {
List<FlowContext<T>> all = repo.getContextsByTrace(traceId);
return all.stream()
Expand All @@ -1018,15 +996,13 @@ private <T> Supplier<List<FlowContext<T>>> contextSupplier(FlowContextRepo repo,
void testCleanErrorContext() {
long[] data = {1};
FlowContextMemoRepo testRepo = new FlowContextMemoRepo(false);
ProcessFlow<Integer> flowTest = Flows.<Integer>create(testRepo, messenger, locks)
.id("flow test start node")
.map(i -> {
ProcessFlow<Integer> flowTest =
Flows.<Integer>create(testRepo, messenger, locks).id("flow test start node").map(i -> {
if (i == 1) {
throw new FlowTestException();
}
return i;
})
.close(r -> data[0] = r.get().getData());
}).close(r -> data[0] = r.get().getData());
flowTest.setId("flow test");
String traceId = flowTest.offer(1);

Expand All @@ -1038,18 +1014,13 @@ void testCleanErrorContext() {
@Test
void test_flow_flat_map_with_reduce_under_cold_stream() {
List<Integer> result = new ArrayList<>();
ProcessFlow<Integer> flow = Flows.<Integer>create()
.flatMap(num -> {
Integer[] maps = new Integer[num];
for (int i = 0; i < num; i++) {
maps[i] = i * 10;
}
return Flows.flux(maps);
})
.map(i -> i * 10)
.reduce(() -> 0, Integer::sum)
.just(i -> result.add(i))
.close();
ProcessFlow<Integer> flow = Flows.<Integer>create().flatMap(num -> {
Integer[] maps = new Integer[num];
for (int i = 0; i < num; i++) {
maps[i] = i * 10;
}
return Flows.flux(maps);
}).map(i -> i * 10).reduce(() -> 0, Integer::sum).just(i -> result.add(i)).close();
flow.offer(new Integer[] {2, 3});
FlowsTestUtil.waitUntil(() -> result.size() == 1);
assertEquals(1, result.size());
Expand All @@ -1059,18 +1030,13 @@ void test_flow_flat_map_with_reduce_under_cold_stream() {
@Test
void test_flow_flat_map_with_reduce_under_hot_stream() throws InterruptedException {
List<Integer> result = new ArrayList<>();
ProcessFlow<Integer> flow = Flows.<Integer>create()
.flatMap(num -> {
Integer[] maps = new Integer[num];
for (int i = 0; i < num; i++) {
maps[i] = i * 10;
}
return Flows.flux(maps);
})
.map(i -> i * 10)
.reduce(() -> 0, Integer::sum)
.just(i -> result.add(i))
.close();
ProcessFlow<Integer> flow = Flows.<Integer>create().flatMap(num -> {
Integer[] maps = new Integer[num];
for (int i = 0; i < num; i++) {
maps[i] = i * 10;
}
return Flows.flux(maps);
}).map(i -> i * 10).reduce(() -> 0, Integer::sum).just(i -> result.add(i)).close();
FlowSession session = new FlowSession();
Window window = session.begin();
flow.offer(2, session);
Expand All @@ -1089,19 +1055,14 @@ void test_flatMap_under_preserved_order() throws InterruptedException {
StringBuilder result = new StringBuilder();
final int count = 4;
final int flatmapSize = 2;
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks)
.flatMap(num -> {
SleepUtil.sleep((count - num) * 20L);
String[] maps = new String[flatmapSize];
for (int i = 0; i < flatmapSize; i++) {
maps[i] = num + "-" + i;
}
return Flows.flux(maps);
})
.map(s -> "|" + s)
.reduce((acc, i) -> acc + i)
.map(i -> result.append(i.substring(1)))
.close();
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks).flatMap(num -> {
SleepUtil.sleep((count - num) * 20L);
String[] maps = new String[flatmapSize];
for (int i = 0; i < flatmapSize; i++) {
maps[i] = num + "-" + i;
}
return Flows.flux(maps);
}).map(s -> "|" + s).reduce((acc, i) -> acc + i).map(i -> result.append(i.substring(1))).close();
FlowSession session = new FlowSession(true);
Window window = session.begin();
for (int i = 0; i < count; i++) {
Expand Down