Skip to content

Commit 85a9b92

Browse files
committed
Adjust iterativelength task as well
1 parent 31f4673 commit 85a9b92

5 files changed

Lines changed: 61 additions & 67 deletions

File tree

src/core/operator/bfs_state.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "duckpgq/core/operator/bfs_state.hpp"
22

33
#include <duckpgq/core/operator/event/shortest_path_event.hpp>
4+
#include <duckpgq/core/operator/event/iterative_length_event.hpp>
5+
46
#include <duckpgq/core/utils/compressed_sparse_row.hpp>
57
#include <duckpgq/core/utils/duckpgq_barrier.hpp>
68
#include <duckpgq/core/utils/duckpgq_utils.hpp>
@@ -159,7 +161,6 @@ void BFSState::InitializeLanes() {
159161

160162
void BFSState::ScheduleBFSBatch(Pipeline &pipeline, Event &event, const PhysicalPathFinding *op) {
161163
if (mode == "iterativelength") {
162-
throw NotImplementedException("Iterative length has not been implemented yet");
163164
event.InsertEvent(
164165
make_shared_ptr<IterativeLengthEvent>(shared_from_this(), pipeline, *op));
165166
} else if (mode == "shortestpath") {

src/core/operator/event/iterative_length_event.cpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,25 @@
55
namespace duckpgq {
66
namespace core {
77

8-
ParallelIterativeEvent::ParallelIterativeEvent(GlobalBFSState &gstate_p, Pipeline &pipeline_p, const PhysicalPathFinding &op_p)
9-
: BasePipelineEvent(pipeline_p), gstate(gstate_p), op(op_p) {}
8+
IterativeLengthEvent::IterativeLengthEvent(shared_ptr<BFSState> gbfs_state_p,
9+
Pipeline &pipeline_p, const PhysicalPathFinding &op_p)
10+
: BasePipelineEvent(pipeline_p), gbfs_state(std::move(gbfs_state_p)), op(op_p) {
1011

12+
}
1113

12-
void ParallelIterativeEvent::Schedule() {
13-
auto &context = pipeline->GetClientContext();
1414

15+
void IterativeLengthEvent::Schedule() {
16+
auto &context = pipeline->GetClientContext();
17+
// std::cout << gbfs_state->csr->ToString();
1518
vector<shared_ptr<Task>> bfs_tasks;
16-
size_t threads_to_schedule = std::min(gstate.num_threads, (idx_t)gstate.global_task_queue.size());
17-
18-
for (idx_t tnum = 0; tnum < threads_to_schedule; tnum++) {
19-
bfs_tasks.push_back(make_uniq<PhysicalIterativeTask>(
20-
shared_from_this(), context, gstate, tnum, op));
19+
for (idx_t tnum = 0; tnum < gbfs_state->scheduled_threads; tnum++) {
20+
bfs_tasks.push_back(make_uniq<IterativeLengthTask>(
21+
shared_from_this(), context, gbfs_state, tnum, op));
2122
}
2223
SetTasks(std::move(bfs_tasks));
2324
}
2425

25-
void ParallelIterativeEvent::FinishEvent() {
26-
27-
// // if remaining pairs, schedule the BFS for the next batch
28-
// if (bfs_state->started_searches < gstate.global_pairs->Count()) {
29-
// op.ScheduleBFSEvent(*pipeline, *this, gstate);
30-
// }
26+
void IterativeLengthEvent::FinishEvent() {
3127
}
3228

3329
} // namespace core

src/core/operator/task/iterative_length_task.cpp

Lines changed: 43 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
namespace duckpgq {
55
namespace core {
66

7-
PhysicalIterativeTask::PhysicalIterativeTask(shared_ptr<Event> event_p, ClientContext &context,
8-
PathFindingGlobalSinkState &state, idx_t worker_id,
9-
const PhysicalOperator &op_p)
10-
: ExecutorTask(context, std::move(event_p), op_p), context(context),
11-
state(state), worker_id(worker_id) {}
12-
13-
bool PhysicalIterativeTask::SetTaskRange() {
14-
auto task = state.global_bfs_state->FetchTask();
7+
IterativeLengthTask::IterativeLengthTask(shared_ptr<Event> event_p,
8+
ClientContext &context,
9+
shared_ptr<BFSState> &state, idx_t worker_id,
10+
const PhysicalOperator &op_p)
11+
: ExecutorTask(context, std::move(event_p), op_p), context(context),
12+
state(state), worker_id(worker_id) {
13+
left = right = UINT64_MAX; // NOLINT
14+
}
15+
16+
bool IterativeLengthTask::SetTaskRange() {
17+
auto task = state->FetchTask();
1518
if (task == nullptr) {
1619
return false;
1720
}
@@ -21,14 +24,13 @@ bool PhysicalIterativeTask::SetTaskRange() {
2124
}
2225

2326

24-
TaskExecutionResult PhysicalIterativeTask::ExecuteTask(TaskExecutionMode mode) {
25-
auto &bfs_state = state.global_bfs_state;
26-
auto &barrier = bfs_state->barrier;
27+
TaskExecutionResult IterativeLengthTask::ExecuteTask(TaskExecutionMode mode) {
28+
auto &barrier = state->barrier;
2729
do {
2830

2931
IterativeLength();
3032
barrier->Wait([&]() {
31-
bfs_state->ResetTaskIndex(); // Reset task index safely
33+
state->ResetTaskIndex(); // Reset task index safely
3234
});
3335

3436
barrier->Wait();
@@ -38,11 +40,11 @@ bool PhysicalIterativeTask::SetTaskRange() {
3840
}
3941

4042
barrier->Wait([&]() {
41-
bfs_state->ResetTaskIndex(); // Reset task index safely
43+
state->ResetTaskIndex(); // Reset task index safely
4244
});
4345

4446
barrier->Wait();
45-
} while (bfs_state->change);
47+
} while (state->change);
4648

4749
if (worker_id == 0) {
4850
UnReachableSet();
@@ -52,19 +54,17 @@ bool PhysicalIterativeTask::SetTaskRange() {
5254
return TaskExecutionResult::TASK_FINISHED;
5355
}
5456

55-
void PhysicalIterativeTask::IterativeLength() {
56-
auto &bfs_state = state.global_bfs_state;
57-
auto &seen = bfs_state->seen;
58-
auto &visit = bfs_state->iter & 1 ? bfs_state->visit1 : bfs_state->visit2;
59-
auto &next = bfs_state->iter & 1 ? bfs_state->visit2 : bfs_state->visit1;
60-
auto &barrier = bfs_state->barrier;
61-
int64_t *v = (int64_t *)state.csr->v;
62-
vector<int64_t> &e = state.csr->e;
63-
auto &change = bfs_state->change;
57+
void IterativeLengthTask::IterativeLength() {
58+
auto &seen = state->seen;
59+
auto &visit = state->iter & 1 ? state->visit1 : state->visit2;
60+
auto &next = state->iter & 1 ? state->visit2 : state->visit1;
61+
auto &barrier = state->barrier;
62+
int64_t *v = (int64_t *)state->csr->v;
63+
vector<int64_t> &e = state->csr->e;
64+
auto &change = state->change;
6465

6566
// Attempt to get a task range
6667
bool has_tasks = SetTaskRange();
67-
// std::cout << "Worker " << worker_id << ": Has tasks = " << has_tasks << std::endl;
6868

6969
// Clear `next` array regardless of task availability
7070
for (auto i = left; i < right; i++) {
@@ -81,7 +81,7 @@ bool PhysicalIterativeTask::SetTaskRange() {
8181
for (auto offset = v[i]; offset < v[i + 1]; offset++) {
8282
auto n = e[offset];
8383
{
84-
std::lock_guard<std::mutex> lock(bfs_state->element_locks[n]);
84+
std::lock_guard<std::mutex> lock(state->element_locks[n]);
8585
next[n] |= visit[i];
8686
}
8787
}
@@ -93,7 +93,7 @@ bool PhysicalIterativeTask::SetTaskRange() {
9393
// Synchronize at the end of the main processing
9494
barrier->Wait([&]() {
9595
// std::cout << "Worker " << worker_id << ": Resetting task index." << std::endl;
96-
bfs_state->ResetTaskIndex();
96+
state->ResetTaskIndex();
9797
});
9898
barrier->Wait();
9999

@@ -114,47 +114,44 @@ bool PhysicalIterativeTask::SetTaskRange() {
114114

115115
// Final synchronization after processing
116116
barrier->Wait([&]() {
117-
// std::cout << "Worker " << worker_id << ": Resetting task index at second barrier." << std::endl;
118-
bfs_state->ResetTaskIndex();
117+
state->ResetTaskIndex();
119118
});
120119
barrier->Wait();
121120
}
122121

123-
void PhysicalIterativeTask::ReachDetect() const {
124-
auto &bfs_state = state.global_bfs_state;
125-
auto result_data = FlatVector::GetData<int64_t>(bfs_state->result.data[0]);
122+
void IterativeLengthTask::ReachDetect() const {
123+
auto result_data = FlatVector::GetData<int64_t>(state->pf_results->data[0]);
126124

127125
// detect lanes that finished
128126
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
129-
int64_t search_num = bfs_state->lane_to_num[lane];
127+
int64_t search_num = state->lane_to_num[lane];
130128
if (search_num >= 0) { // active lane
131-
int64_t dst_pos = bfs_state->vdata_dst.sel->get_index(search_num);
132-
if (bfs_state->seen[bfs_state->dst[dst_pos]][lane]) {
129+
int64_t dst_pos = state->vdata_dst.sel->get_index(search_num);
130+
if (state->seen[state->dst[dst_pos]][lane]) {
133131
result_data[search_num] =
134-
bfs_state->iter; /* found at iter => iter = path length */
135-
bfs_state->lane_to_num[lane] = -1; // mark inactive
136-
bfs_state->active--;
132+
state->iter; /* found at iter => iter = path length */
133+
state->lane_to_num[lane] = -1; // mark inactive
134+
state->active--;
137135
}
138136
}
139137
}
140-
if (bfs_state->active == 0) {
141-
bfs_state->change = false;
138+
if (state->active == 0) {
139+
state->change = false;
142140
}
143141
// into the next iteration
144-
bfs_state->iter++;
142+
state->iter++;
145143
}
146144

147-
void PhysicalIterativeTask::UnReachableSet() const {
148-
auto &bfs_state = state.global_bfs_state;
149-
auto result_data = FlatVector::GetData<int64_t>(bfs_state->result.data[0]);
150-
auto &result_validity = FlatVector::Validity(bfs_state->result.data[0]);
145+
void IterativeLengthTask::UnReachableSet() const {
146+
auto result_data = FlatVector::GetData<int64_t>(state->pf_results->data[0]);
147+
auto &result_validity = FlatVector::Validity(state->pf_results->data[0]);
151148

152149
for (int64_t lane = 0; lane < LANE_LIMIT; lane++) {
153-
int64_t search_num = bfs_state->lane_to_num[lane];
150+
int64_t search_num = state->lane_to_num[lane];
154151
if (search_num >= 0) { // active lane
155152
result_validity.SetInvalid(search_num);
156153
result_data[search_num] = (int64_t)-1; /* no path */
157-
bfs_state->lane_to_num[lane] = -1; // mark inactive
154+
state->lane_to_num[lane] = -1; // mark inactive
158155
}
159156
}
160157
}

src/include/duckpgq/core/operator/task/iterative_length_task.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
namespace duckpgq {
88
namespace core {
99

10-
class PhysicalIterativeTask : public ExecutorTask {
10+
class IterativeLengthTask : public ExecutorTask {
1111
public:
12-
PhysicalIterativeTask(shared_ptr<Event> event_p, ClientContext &context,
13-
PathFindingGlobalSinkState &state, idx_t worker_id,
14-
const PhysicalOperator &op_p);
12+
IterativeLengthTask(shared_ptr<Event> event_p, ClientContext &context,
13+
shared_ptr<BFSState> &state, idx_t worker_id,
14+
const PhysicalOperator &op_p);
1515

1616
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override;
1717
private:

test/sql/path_finding/parallel_path_length.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ statement ok
99
select setseed(0.42);
1010

1111
statement ok
12-
set disabled_optimizers = 'compressed_materialization';
12+
set threads=1;
1313

1414
query I
1515
SELECT current_setting('experimental_path_finding_operator');

0 commit comments

Comments
 (0)