Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Bug Fixes

* (blockstm) [#25789](https://github.com/cosmos/cosmos-sdk/issues/25789) Wake up suspended executors when scheduler doesn't complete to prevent goroutine leaks.
* (x/staking) [#25649](https://github.com/cosmos/cosmos-sdk/pull/25649) Add missing `defer iterator.Close()` calls in `IterateDelegatorRedelegations` and `GetRedelegations` to prevent resource leaks.
* (mempool) [#25563](https://github.com/cosmos/cosmos-sdk/pull/25563) Cleanup sender indices in case of tx replacement.
* (x/epochs) [#25425](https://github.com/cosmos/cosmos-sdk/pull/25425) Fix `InvokeSetHooks` being called with a nil keeper and `AppModule` containing a copy instead of a pointer (hooks set post creating the `AppModule` like with depinject didn't apply because it's a different instance).
Expand Down
8 changes: 8 additions & 0 deletions blockstm/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,11 @@ func (s *Scheduler) Stats() string {
return fmt.Sprintf("executed: %d, validated: %d",
s.executedTxns.Load(), s.validatedTxns.Load())
}

// CancelAll wakes up all suspended executors.
// Called during context cancellation to prevent hanging.
func (s *Scheduler) CancelAll() {
for i := range s.txnStatus {
s.txnStatus[i].TryCancel()
}
}
11 changes: 11 additions & 0 deletions blockstm/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,14 @@ func (s *StatusEntry) Suspend(cond *Condvar) {

s.Unlock()
}

// TryCancel wakes up a suspended executor if it's suspended.
// Called during context cancellation to prevent hanging.
func (s *StatusEntry) TryCancel() {
s.Lock()
if s.status == StatusSuspended && s.cond != nil {
s.cond.Notify()
s.cond = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to set the status to StatusExecuting so it don't break any invariant, because the executor may still in the process of executing transaction.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need to clear the estimation marks, otherwise, the executor still can't break out of the loop.

}
s.Unlock()
}
6 changes: 4 additions & 2 deletions blockstm/stm.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ func ExecuteBlockWithEstimates(
}

if !scheduler.Done() {
if ctx.Err() != nil {
// canceled
err := ctx.Err()
// canceled, wake up all suspended executors to prevent hanging
scheduler.CancelAll()
if err != nil {
return ctx.Err()
}

Expand Down
15 changes: 13 additions & 2 deletions blockstm/txnrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (e STMRunner) Run(ctx context.Context, ms storetypes.MultiStore, txs [][]by
memTxs, estimates = preEstimates(txs, e.workers, authStore, bankStore, e.coinDenom(ms), e.txDecoder)
}

if err := ExecuteBlockWithEstimates(
err := ExecuteBlockWithEstimates(
ctx,
blockSize,
index,
Expand All @@ -99,7 +99,18 @@ func (e STMRunner) Run(ctx context.Context, ms storetypes.MultiStore, txs [][]by
incarnationCache[txn].Store(v)
}
},
); err != nil {
)

// fallback to sequential execution if parallel execution failed with scheduler error
if err != nil && err.Error() == "scheduler did not complete" {
results = make([]*abci.ExecTxResult, blockSize)
for i := range txs {
results[i] = deliverTx(txs[i], ms, i, nil)
}
return results, nil
}

if err != nil {
return nil, err
}

Expand Down
Loading