Skip to content

Commit 62c3fb5

Browse files
xdustinfaceUdjinM6
authored andcommitted
llmq: Fix thread handling in CDKGSessionManager and CDKGSessionHandler (#3601)
* llqm: Fix thread handling in CDKGSessionManager and CDKGSessionHandler * llmq: Removed unused thread_pool from CDKGSessionManager * Tweak `CDKGSessionHandler::StartThread()` * llmq: Simplify CDKGSessionHandler's thread naming * llmq: Make sure CDKGSessionHandler uses a valid LLMQ type Co-Authored-By: UdjinM6 <UdjinM6@users.noreply.github.com> Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
1 parent aa3bec6 commit 62c3fb5

5 files changed

Lines changed: 42 additions & 27 deletions

File tree

src/llmq/quorums_dkgsessionhandler.cpp

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,8 @@ void CDKGPendingMessages::Clear()
8585

8686
//////
8787

88-
CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
88+
CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CDKGSessionManager& _dkgManager) :
8989
params(_params),
90-
messageHandlerPool(_messageHandlerPool),
9190
blsWorker(_blsWorker),
9291
dkgManager(_dkgManager),
9392
curSession(std::make_shared<CDKGSession>(_params, _blsWorker, _dkgManager)),
@@ -96,18 +95,13 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params, ctp
9695
pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION),
9796
pendingPrematureCommitments((size_t)_params.size * 2, MSG_QUORUM_PREMATURE_COMMITMENT)
9897
{
99-
phaseHandlerThread = std::thread([this] {
100-
RenameThread(strprintf("dash-q-phase-%d", (uint8_t)params.type).c_str());
101-
PhaseHandlerThread();
102-
});
98+
if (params.type == Consensus::LLMQ_NONE) {
99+
throw std::runtime_error("Can't initialize CDKGSessionHandler with LLMQ_NONE type.");
100+
}
103101
}
104102

105103
CDKGSessionHandler::~CDKGSessionHandler()
106104
{
107-
stopRequested = true;
108-
if (phaseHandlerThread.joinable()) {
109-
phaseHandlerThread.join();
110-
}
111105
}
112106

113107
void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
@@ -146,6 +140,24 @@ void CDKGSessionHandler::ProcessMessage(CNode* pfrom, const std::string& strComm
146140
}
147141
}
148142

143+
void CDKGSessionHandler::StartThread()
144+
{
145+
if (phaseHandlerThread.joinable()) {
146+
throw std::runtime_error("Tried to start an already started CDKGSessionHandler thread.");
147+
}
148+
149+
std::string threadName = strprintf("q-phase-%d", params.type);
150+
phaseHandlerThread = std::thread(&TraceThread<std::function<void()> >, threadName, std::function<void()>(std::bind(&CDKGSessionHandler::PhaseHandlerThread, this)));
151+
}
152+
153+
void CDKGSessionHandler::StopThread()
154+
{
155+
stopRequested = true;
156+
if (phaseHandlerThread.joinable()) {
157+
phaseHandlerThread.join();
158+
}
159+
}
160+
149161
bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pindexQuorum)
150162
{
151163
//AssertLockHeld(cs_main);

src/llmq/quorums_dkgsessionhandler.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ class CDKGSessionHandler
103103
std::atomic<bool> stopRequested{false};
104104

105105
const Consensus::LLMQParams& params;
106-
ctpl::thread_pool& messageHandlerPool;
107106
CBLSWorker& blsWorker;
108107
CDKGSessionManager& dkgManager;
109108

@@ -120,12 +119,15 @@ class CDKGSessionHandler
120119
CDKGPendingMessages pendingPrematureCommitments;
121120

122121
public:
123-
CDKGSessionHandler(const Consensus::LLMQParams& _params, ctpl::thread_pool& _messageHandlerPool, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
122+
CDKGSessionHandler(const Consensus::LLMQParams& _params, CBLSWorker& blsWorker, CDKGSessionManager& _dkgManager);
124123
~CDKGSessionHandler();
125124

126125
void UpdatedBlockTip(const CBlockIndex *pindexNew);
127126
void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman);
128127

128+
void StartThread();
129+
void StopThread();
130+
129131
private:
130132
bool InitNewQuorum(const CBlockIndex* pindexQuorum);
131133

src/llmq/quorums_dkgsessionmgr.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,29 @@ CDKGSessionManager::CDKGSessionManager(CDBWrapper& _llmqDb, CBLSWorker& _blsWork
2525
llmqDb(_llmqDb),
2626
blsWorker(_blsWorker)
2727
{
28+
for (const auto& qt : Params().GetConsensus().llmqs) {
29+
dkgSessionHandlers.emplace(std::piecewise_construct,
30+
std::forward_as_tuple(qt.first),
31+
std::forward_as_tuple(qt.second, blsWorker, *this));
32+
}
2833
}
2934

3035
CDKGSessionManager::~CDKGSessionManager()
3136
{
3237
}
3338

34-
void CDKGSessionManager::StartMessageHandlerPool()
39+
void CDKGSessionManager::StartThreads()
3540
{
36-
for (const auto& qt : Params().GetConsensus().llmqs) {
37-
dkgSessionHandlers.emplace(std::piecewise_construct,
38-
std::forward_as_tuple(qt.first),
39-
std::forward_as_tuple(qt.second, messageHandlerPool, blsWorker, *this));
41+
for (auto& it : dkgSessionHandlers) {
42+
it.second.StartThread();
4043
}
41-
42-
messageHandlerPool.resize(2);
43-
RenameThreadPool(messageHandlerPool, "dash-q-msg");
4444
}
4545

46-
void CDKGSessionManager::StopMessageHandlerPool()
46+
void CDKGSessionManager::StopThreads()
4747
{
48-
messageHandlerPool.stop(true);
48+
for (auto& it : dkgSessionHandlers) {
49+
it.second.StopThread();
50+
}
4951
}
5052

5153
void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload)

src/llmq/quorums_dkgsessionmgr.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ class CDKGSessionManager
2323
private:
2424
CDBWrapper& llmqDb;
2525
CBLSWorker& blsWorker;
26-
ctpl::thread_pool messageHandlerPool;
2726

2827
std::map<Consensus::LLMQType, CDKGSessionHandler> dkgSessionHandlers;
2928

@@ -50,8 +49,8 @@ class CDKGSessionManager
5049
CDKGSessionManager(CDBWrapper& _llmqDb, CBLSWorker& _blsWorker);
5150
~CDKGSessionManager();
5251

53-
void StartMessageHandlerPool();
54-
void StopMessageHandlerPool();
52+
void StartThreads();
53+
void StopThreads();
5554

5655
void UpdatedBlockTip(const CBlockIndex *pindexNew, bool fInitialDownload);
5756

src/llmq/quorums_init.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ void StartLLMQSystem()
7070
blsWorker->Start();
7171
}
7272
if (quorumDKGSessionManager) {
73-
quorumDKGSessionManager->StartMessageHandlerPool();
73+
quorumDKGSessionManager->StartThreads();
7474
}
7575
if (quorumSigSharesManager) {
7676
quorumSigSharesManager->RegisterAsRecoveredSigsListener();
@@ -97,7 +97,7 @@ void StopLLMQSystem()
9797
quorumSigSharesManager->UnregisterAsRecoveredSigsListener();
9898
}
9999
if (quorumDKGSessionManager) {
100-
quorumDKGSessionManager->StopMessageHandlerPool();
100+
quorumDKGSessionManager->StopThreads();
101101
}
102102
if (blsWorker) {
103103
blsWorker->Stop();

0 commit comments

Comments
 (0)