@@ -21,6 +21,7 @@ package parallel
2121import (
2222 "errors"
2323 "fmt"
24+ "iter"
2425 "sync"
2526
2627 "github.com/ava-labs/libevm/common"
@@ -48,12 +49,13 @@ import (
4849// All [libevm.StateReader] instances are opened to the state at the beginning
4950// of the block. The [StateDB] is the same one used to execute the block,
5051// before being committed, and MAY be written to.
51- type Handler [Data , Result any ] interface {
52+ type Handler [Data , Result , Aggregated any ] interface {
5253 BeforeBlock (libevm.StateReader , * types.Block )
5354 Gas (* types.Transaction ) (gas uint64 , process bool )
5455 Prefetch (sdb libevm.StateReader , index int , tx * types.Transaction ) Data
5556 Process (sdb libevm.StateReader , index int , tx * types.Transaction , data Data ) Result
56- AfterBlock (StateDB , * types.Block , types.Receipts )
57+ PostProcess (iter.Seq2 [int , Result ]) Aggregated
58+ AfterBlock (StateDB , Aggregated , * types.Block , types.Receipts )
5759}
5860
5961// StateDB is the subset of [state.StateDB] methods that MAY be called by
@@ -64,14 +66,17 @@ type StateDB interface {
6466}
6567
6668// A Processor orchestrates dispatch and collection of results from a [Handler].
67- type Processor [D , R any ] struct {
68- handler Handler [D , R ]
69- workers sync.WaitGroup
69+ type Processor [D , R , A any ] struct {
70+ handler Handler [D , R , A ]
71+ workers sync.WaitGroup
72+
73+ stateShare stateDBSharer
74+ txGas map [common.Hash ]uint64
75+
7076 prefetch , process chan * job
7177 data [](chan D )
7278 results [](chan result [R ])
73- txGas map [common.Hash ]uint64
74- stateShare stateDBSharer
79+ aggregated chan A
7580}
7681
7782type job struct {
@@ -87,20 +92,21 @@ type result[T any] struct {
8792// New constructs a new [Processor] with the specified number of concurrent
8893// workers. [Processor.Close] must be called after the final call to
8994// [Processor.FinishBlock] to avoid leaking goroutines.
90- func New [D , R any ](h Handler [D , R ], prefetchers , processors int ) * Processor [D , R ] {
95+ func New [D , R , A any ](h Handler [D , R , A ], prefetchers , processors int ) * Processor [D , R , A ] {
9196 prefetchers = max (prefetchers , 1 )
9297 processors = max (processors , 1 )
9398 workers := prefetchers + processors
9499
95- p := & Processor [D , R ]{
96- handler : h ,
97- prefetch : make (chan * job ),
98- process : make (chan * job ),
99- txGas : make (map [common.Hash ]uint64 ),
100+ p := & Processor [D , R , A ]{
101+ handler : h ,
100102 stateShare : stateDBSharer {
101103 workers : workers ,
102104 nextAvailable : make (chan struct {}),
103105 },
106+ txGas : make (map [common.Hash ]uint64 ),
107+ prefetch : make (chan * job ),
108+ process : make (chan * job ),
109+ aggregated : make (chan A ),
104110 }
105111
106112 p .workers .Add (workers ) // for shutdown via [Processor.Close]
@@ -139,7 +145,7 @@ func (s *stateDBSharer) distribute(sdb *state.StateDB) {
139145 s .wg .Wait ()
140146}
141147
142- func (p * Processor [D , R ]) worker (prefetch , process chan * job ) {
148+ func (p * Processor [D , R , A ]) worker (prefetch , process chan * job ) {
143149 defer p .workers .Done ()
144150
145151 var sdb * state.StateDB
@@ -181,7 +187,7 @@ func (p *Processor[D, R]) worker(prefetch, process chan *job) {
181187}
182188
183189// Close shuts down the [Processor], after which it can no longer be used.
184- func (p * Processor [D , R ]) Close () {
190+ func (p * Processor [D , R , A ]) Close () {
185191 close (p .prefetch )
186192 close (p .process )
187193 p .workers .Wait ()
@@ -190,7 +196,7 @@ func (p *Processor[D, R]) Close() {
190196// StartBlock dispatches transactions to the [Handler] and returns immediately.
191197// It MUST be paired with a call to [Processor.FinishBlock], without overlap of
192198// blocks.
193- func (p * Processor [D , R ]) StartBlock (sdb * state.StateDB , rules params.Rules , b * types.Block ) error {
199+ func (p * Processor [D , R , A ]) StartBlock (sdb * state.StateDB , rules params.Rules , b * types.Block ) error {
194200 // The distribution mechanism copies the StateDB so we don't need to do it
195201 // here, but the [Handler] is called directly so we do copy.
196202 p .stateShare .distribute (sdb )
@@ -232,9 +238,6 @@ func (p *Processor[D, R]) StartBlock(sdb *state.StateDB, rules params.Rules, b *
232238 }
233239 }
234240
235- // The first goroutine pipelines into the second, which has its results
236- // emptied by [Processor.FinishBlock]. The return of said function therefore
237- // guarantees that we haven't leaked either of these.
238241 go func () {
239242 for _ , j := range jobs {
240243 p .prefetch <- j
@@ -245,21 +248,40 @@ func (p *Processor[D, R]) StartBlock(sdb *state.StateDB, rules params.Rules, b *
245248 p .process <- j
246249 }
247250 }()
251+ go func () {
252+ n := len (b .Transactions ())
253+ p .aggregated <- p .handler .PostProcess (p .resultIter (n ))
254+ }()
248255 return nil
249256}
250257
258+ func (p * Processor [D , R , A ]) resultIter (n int ) iter.Seq2 [int , R ] {
259+ return func (yield func (int , R ) bool ) {
260+ for i := range n {
261+ r , ok := p .Result (i )
262+ if ! ok {
263+ continue
264+ }
265+ if ! yield (i , r ) {
266+ return
267+ }
268+ }
269+ }
270+ }
271+
251272// FinishBlock returns the [Processor] to a state ready for the next block. A
252273// return from FinishBlock guarantees that all dispatched work from the
253274// respective call to [Processor.StartBlock] has been completed.
254- func (p * Processor [D , R ]) FinishBlock (sdb vm.StateDB , b * types.Block , rs types.Receipts ) {
275+ func (p * Processor [D , R , A ]) FinishBlock (sdb vm.StateDB , b * types.Block , rs types.Receipts ) {
276+ p .handler .AfterBlock (sdb , <- p .aggregated , b , rs )
277+
255278 for i := range len (b .Transactions ()) {
256279 // Every result channel is guaranteed to have some value in its buffer
257280 // because [Processor.BeforeBlock] either sends a nil *R or it
258281 // dispatches a job, which will send a non-nil *R.
259282 tx := (<- p .results [i ]).tx
260283 delete (p .txGas , tx )
261284 }
262- p .handler .AfterBlock (sdb , b , rs )
263285}
264286
265287// Result blocks until the i'th transaction passed to [Processor.StartBlock] has
@@ -271,9 +293,12 @@ func (p *Processor[D, R]) FinishBlock(sdb vm.StateDB, b *types.Block, rs types.R
271293// Multiple calls to Result with the same argument are allowed. Callers MUST NOT
272294// charge the gas price for preprocessing as this is handled by
273295// [Processor.PreprocessingGasCharge] if registered as a [vm.Preprocessor].
296+ //
274297// The same value will be returned by each call with the same argument, such
275- // that if R is a pointer then modifications will persist between calls.
276- func (p * Processor [D , R ]) Result (i int ) (R , bool ) {
298+ // that if R is a pointer then modifications will persist between calls. The
299+ // caller does NOT have mutually exclusive access to R, which MUST carry a mutex
300+ // if thread safety is required.
301+ func (p * Processor [D , R , A ]) Result (i int ) (R , bool ) {
277302 ch := p .results [i ]
278303 r := <- ch
279304 defer func () {
@@ -289,7 +314,7 @@ func (p *Processor[D, R]) Result(i int) (R, bool) {
289314 return * r .val , true
290315}
291316
292- func (p * Processor [R , D ]) shouldProcess (tx * types.Transaction , rules params.Rules ) (process bool , retErr error ) {
317+ func (p * Processor [R , D , S ]) shouldProcess (tx * types.Transaction , rules params.Rules ) (process bool , retErr error ) {
293318 // An explicit 0 is necessary to avoid [Processor.PreprocessingGasCharge]
294319 // returning [ErrTxUnknown].
295320 p .txGas [tx .Hash ()] = 0
@@ -339,12 +364,12 @@ var ErrTxUnknown = errors.New("transaction unknown by parallel preprocessor")
339364
340365// PreprocessingGasCharge implements the [vm.Preprocessor] interface and MUST be
341366// registered via [vm.RegisterHooks] to ensure proper gas accounting.
342- func (p * Processor [R , D ]) PreprocessingGasCharge (tx common.Hash ) (uint64 , error ) {
367+ func (p * Processor [R , D , S ]) PreprocessingGasCharge (tx common.Hash ) (uint64 , error ) {
343368 g , ok := p .txGas [tx ]
344369 if ! ok {
345370 return 0 , fmt .Errorf ("%w: %v" , ErrTxUnknown , tx )
346371 }
347372 return g , nil
348373}
349374
350- var _ vm.Preprocessor = (* Processor [struct {}, struct {} ])(nil )
375+ var _ vm.Preprocessor = (* Processor [any , any , any ])(nil )
0 commit comments