Skip to content

Commit fc39c04

Browse files
authored
perf(flow): optimize performance for flow instances query (#1450)
* pick from 4.2.x * optimize query * fix
1 parent 30f2f2c commit fc39c04

7 files changed

Lines changed: 119 additions & 33 deletions

File tree

server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceServiceTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import com.oceanbase.odc.core.shared.exception.OverLimitException;
6262
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
6363
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
64+
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository.ParentInstanceIdCount;
6465
import com.oceanbase.odc.metadb.flow.GateWayInstanceRepository;
6566
import com.oceanbase.odc.metadb.flow.NodeInstanceEntityRepository;
6667
import com.oceanbase.odc.metadb.flow.SequenceInstanceRepository;
@@ -443,6 +444,14 @@ public void listStatus() {
443444
Assert.assertEquals(0, status.size());
444445
}
445446

447+
@Test
448+
public void testFindByParentInstanceIdIn() {
449+
createChildFlowInstance("test", 1L);
450+
List<ParentInstanceIdCount> byParentInstanceIdIn = flowInstanceRepository.findByParentInstanceIdIn(
451+
Arrays.asList(1L, 2L));
452+
Assert.assertEquals(byParentInstanceIdIn.size(), 1);
453+
}
454+
446455
private void buildFlowInstance(FlowInstance flowInstance) {
447456
buildFlowInstanceWithTaskType(flowInstance, TaskType.ASYNC);
448457
}
@@ -486,6 +495,10 @@ private FlowInstance createFlowInstance(String name) {
486495
return flowFactory.generateFlowInstance(name, null, 1L, null);
487496
}
488497

498+
private FlowInstance createChildFlowInstance(String name, Long parentFloweInstanceId) {
499+
return flowFactory.generateFlowInstance(name, parentFloweInstanceId, 1L, null);
500+
}
501+
489502
private FlowTaskInstance createTaskInstance(Long flowInstanceId, TaskEntity taskEntity,
490503
ExecutionStrategyConfig config) {
491504
FlowTaskInstance taskInstance = new FlowTaskInstance(taskEntity.getTaskType(),
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2024 OceanBase.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
18+
alter table `flow_instance` add index `flow_instance_parent_ins_id`(`parent_instance_id`);
19+
20+
alter table `flow_instance_node_task` add index `flow_instance_node_task_flow_instance_id`(`flow_instance_id`);
21+
22+
alter table `flow_instance` add index `flow_instance_parent_oid_ct_id`(`organization_id`,`create_time`,`id`);
23+
24+
alter table `flow_instance_node` add index `flow_instance_node_intsance_id`(`instance_id`);
25+
26+
alter table `flow_instance` add index `flow_instance_process_definition_id`(`process_definition_id`);
27+
28+
CREATE or replace VIEW `list_flow_instance_view` AS
29+
select
30+
`flow_instance`.`id` AS `id`,
31+
`flow_instance`.`create_time` AS `create_time`,
32+
`flow_instance`.`update_time` AS `update_time`,
33+
`flow_instance`.`name` AS `name`,
34+
`flow_instance`.`flow_config_id` AS `flow_config_id`,
35+
`flow_instance`.`creator_id` AS `creator_id`,
36+
`flow_instance`.`organization_id` AS `organization_id`,
37+
`flow_instance`.`process_definition_id` AS `process_definition_id`,
38+
`flow_instance`.`process_instance_id` AS `process_instance_id`,
39+
`flow_instance`.`status` AS `status`,
40+
`flow_instance`.`flow_config_snapshot_xml` AS `flow_config_snapshot_xml`,
41+
`flow_instance`.`description` AS `description`,
42+
`flow_instance`.`parent_instance_id` AS `parent_instance_id`,
43+
`flow_instance`.`project_id` AS `project_id`,
44+
`flow_instance_node_task`.`task_type` AS `task_type`
45+
from
46+
(
47+
`flow_instance` join `flow_instance_node_task` on ( `flow_instance`.`id` = `flow_instance_node_task`.`flow_instance_id`)
48+
)
49+
group by `flow_instance`.`id`,`flow_instance_node_task`.`task_type`

server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/FlowInstanceRepository.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121

2222
import javax.transaction.Transactional;
2323

24-
import org.springframework.data.jpa.repository.JpaRepository;
2524
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
2625
import org.springframework.data.jpa.repository.Modifying;
2726
import org.springframework.data.jpa.repository.Query;
2827
import org.springframework.data.repository.query.Param;
2928

29+
import com.oceanbase.odc.config.jpa.OdcJpaRepository;
3030
import com.oceanbase.odc.core.shared.constant.FlowStatus;
3131
import com.oceanbase.odc.core.shared.constant.TaskType;
3232

@@ -39,7 +39,7 @@
3939
* @see org.springframework.data.jpa.repository.JpaRepository
4040
*/
4141
public interface FlowInstanceRepository
42-
extends JpaRepository<FlowInstanceEntity, Long>, JpaSpecificationExecutor<FlowInstanceEntity> {
42+
extends OdcJpaRepository<FlowInstanceEntity, Long>, JpaSpecificationExecutor<FlowInstanceEntity> {
4343

4444
List<FlowInstanceEntity> findByIdIn(Collection<Long> ids);
4545

@@ -82,6 +82,7 @@ int updateProcessDefinitionIdById(@Param("flowInstanceId") Long flowInstanceId,
8282

8383
List<FlowInstanceEntity> findByParentInstanceId(Long parentInstanceId);
8484

85+
8586
@Query(value = "select a.parent_instance_id from flow_instance a left join flow_instance_node_task b on a.id = b.flow_instance_id"
8687
+ " where a.id=:id and task_task_id is not null and b.task_type='ALTER_SCHEDULE' LIMIT 1",
8788
nativeQuery = true)
@@ -99,4 +100,14 @@ List<FlowInstanceEntity> findByFlowInstanceIdsAndTaskType(@Param("ids") Collecti
99100
nativeQuery = true)
100101
Set<FlowInstanceEntity> findByScheduleIdAndStatus(@Param("scheduleIds") Set<Long> scheduleIds,
101102
@Param("status") FlowStatus status);
103+
104+
@Query("select e.parentInstanceId as parentInstanceId, count(1) as count from FlowInstanceEntity e where e.parentInstanceId in (?1) group by parentInstanceId")
105+
List<ParentInstanceIdCount> findByParentInstanceIdIn(Collection<Long> parentInstanceId);
106+
107+
interface ParentInstanceIdCount {
108+
109+
Long getParentInstanceId();
110+
111+
Integer getCount();
112+
}
102113
}

server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/ServiceTaskInstanceRepository.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.oceanbase.odc.metadb.flow;
1717

18+
import java.util.Collection;
1819
import java.util.List;
1920
import java.util.Optional;
2021
import java.util.Set;
@@ -78,9 +79,10 @@ Optional<ServiceTaskInstanceEntity> findByInstanceTypeAndName(@Param("instanceTy
7879

7980
@Query(value = "select b.* from flow_instance a left join flow_instance_node_task b "
8081
+ "on a.id = b.flow_instance_id "
81-
+ "where a.parent_instance_id=:id and b.task_type=:#{#type.name()}", nativeQuery = true)
82-
List<ServiceTaskInstanceEntity> findByScheduleIdAndTaskType(@Param("id") Long scheduleId,
83-
@Param("type") TaskType type);
82+
+ "where a.parent_instance_id=:id and b.task_type=:#{#type.name()} and b.status in (:statuses)",
83+
nativeQuery = true)
84+
List<ServiceTaskInstanceEntity> findByScheduleIdAndTaskTypeAndStatusIn(@Param("id") Long scheduleId,
85+
@Param("type") TaskType type, @Param("statuses") Collection<?> statuses);
8486

8587
default List<ServiceTaskInstanceEntity> batchCreate(List<ServiceTaskInstanceEntity> entities) {
8688
String sql = InsertSqlTemplateBuilder.from("flow_instance_node_task")

server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import org.springframework.data.jpa.domain.Specification;
3636
import org.springframework.stereotype.Component;
3737

38+
import com.google.common.base.MoreObjects;
3839
import com.oceanbase.odc.common.util.StringUtils;
3940
import com.oceanbase.odc.metadb.connection.ConnectionConfigRepository;
4041
import com.oceanbase.odc.metadb.connection.ConnectionEntity;
4142
import com.oceanbase.odc.metadb.connection.ConnectionSpecs;
4243
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
4344
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
45+
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository.ParentInstanceIdCount;
4446
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceEntity;
4547
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository;
4648
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceSpecs;
@@ -271,8 +273,14 @@ private FlowInstanceMapper generateMapper(@NonNull Collection<Long> flowInstance
271273
.collect(Collectors.groupingBy(ServiceTaskInstanceEntity::getFlowInstanceId,
272274
Collectors.mapping(ServiceTaskInstanceEntity::getStrategy, Collectors.toList())));
273275

276+
Map<Long, Integer> parentInstanceIdMap = flowInstanceRepository
277+
.findByParentInstanceIdIn(flowInstanceIds)
278+
.stream().collect(
279+
Collectors.toMap(ParentInstanceIdCount::getParentInstanceId, ParentInstanceIdCount::getCount));
280+
274281
Map<Long, Boolean> flowInstanceId2Rollbackable = flowInstanceIds.stream().collect(Collectors
275-
.toMap(Function.identity(), id -> flowInstanceRepository.findByParentInstanceId(id).size() == 0));
282+
.toMap(Function.identity(),
283+
id -> MoreObjects.firstNonNull(parentInstanceIdMap.get(id), 0) == 0));
276284

277285
/**
278286
* In order to improve the interface efficiency, it is necessary to find out the task entity

server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeStatus.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
package com.oceanbase.odc.service.flow.model;
1717

1818
import java.util.Arrays;
19-
import java.util.HashSet;
2019
import java.util.Set;
20+
import java.util.stream.Collectors;
2121

2222
import com.oceanbase.odc.service.flow.instance.FlowTaskInstance;
2323

24-
import lombok.NonNull;
24+
import lombok.Getter;
2525

2626
/**
2727
* Process state, used to mark the execution state of process instance nodes, including process
@@ -31,58 +31,62 @@
3131
* @date 2022-02-07 11:15
3232
* @since ODC_release_3.3.0
3333
*/
34+
@Getter
3435
public enum FlowNodeStatus {
3536
/**
3637
* Process node has been created but has not started execution
3738
*/
38-
CREATED,
39+
CREATED(false, false),
3940
/**
4041
* Process node is executing
4142
*/
42-
EXECUTING,
43+
EXECUTING(false, true),
4344
/**
4445
* Only for {@link FlowTaskInstance}, instance is waitting for approval
4546
*/
46-
PENDING,
47-
WAIT_FOR_CONFIRM,
47+
PENDING(false, false),
48+
49+
WAIT_FOR_CONFIRM(false, true),
4850
/**
4951
* Process node is terminated
5052
*/
51-
CANCELLED,
53+
CANCELLED(true, false),
5254
/**
5355
* Process node execution completes
5456
*/
55-
COMPLETED,
57+
COMPLETED(true, false),
5658
/**
5759
* Process node execution expired
5860
*/
59-
EXPIRED,
61+
EXPIRED(true, false),
6062
/**
6163
* Process node execution failed
6264
*/
63-
FAILED;
65+
FAILED(true, false);
66+
67+
private final boolean finalStatus;
68+
private final boolean executing;
69+
70+
FlowNodeStatus(boolean finalStatus, boolean executing) {
71+
this.finalStatus = finalStatus;
72+
this.executing = executing;
73+
}
6474

6575
public static Set<FlowNodeStatus> getExecutingStatuses() {
66-
return new HashSet<>(Arrays.asList(FlowNodeStatus.EXECUTING, FlowNodeStatus.WAIT_FOR_CONFIRM));
76+
return Arrays.stream(FlowNodeStatus.values()).filter(FlowNodeStatus::isExecuting).collect(Collectors.toSet());
6777
}
6878

6979
public static Set<FlowNodeStatus> getExecutingAndFinalStatuses() {
70-
Set<FlowNodeStatus> executingAndFinalStatuses = getExecutingStatuses();
71-
executingAndFinalStatuses.addAll(getFinalStatuses());
72-
return executingAndFinalStatuses;
80+
return Arrays.stream(FlowNodeStatus.values()).filter(t -> t.isExecuting() || t.isFinalStatus()).collect(
81+
Collectors.toSet());
7382
}
7483

7584
public static Set<FlowNodeStatus> getFinalStatuses() {
76-
return new HashSet<>(Arrays.asList(FlowNodeStatus.CANCELLED, FlowNodeStatus.COMPLETED, FlowNodeStatus.EXPIRED,
77-
FlowNodeStatus.FAILED));
78-
}
79-
80-
public static boolean isFinalStatus(@NonNull FlowNodeStatus status) {
81-
return getFinalStatuses().contains(status);
85+
return Arrays.stream(FlowNodeStatus.values()).filter(t -> t.isFinalStatus()).collect(Collectors.toSet());
8286
}
8387

84-
public boolean isFinalStatus() {
85-
return getFinalStatuses().contains(this);
88+
public static Set<FlowNodeStatus> getNotFinalStatuses() {
89+
return Arrays.stream(FlowNodeStatus.values()).filter(t -> !t.isFinalStatus()).collect(Collectors.toSet());
8690
}
8791

8892
}

server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,11 +441,10 @@ public String getLog(Long scheduleId, Long taskId, OdcTaskLogLevel logLevel) {
441441
}
442442

443443
public boolean hasExecutingAsyncTask(ScheduleEntity schedule) {
444-
Set<Long> executingTaskIds = serviceTaskRepository.findByScheduleIdAndTaskType(
445-
schedule.getId(), TaskType.ASYNC).stream()
446-
.filter(entity -> !FlowNodeStatus.isFinalStatus(entity.getStatus()))
447-
.map(ServiceTaskInstanceEntity::getTargetTaskId).collect(
448-
Collectors.toSet());
444+
Set<Long> executingTaskIds = serviceTaskRepository.findByScheduleIdAndTaskTypeAndStatusIn(schedule.getId(),
445+
TaskType.ASYNC, FlowNodeStatus.getNotFinalStatuses()).stream().map(
446+
ServiceTaskInstanceEntity::getTargetTaskId)
447+
.collect(Collectors.toSet());
449448
List<TaskEntity> taskEntities = taskRepository.findByIdIn(executingTaskIds);
450449
for (TaskEntity taskEntity : taskEntities) {
451450
Long timeoutMillis = JsonUtils.fromJson(taskEntity.getParametersJson(), DatabaseChangeParameters.class)

0 commit comments

Comments
 (0)