@@ -30,8 +30,6 @@ import (
3030 "github.com/ethereum/go-ethereum/common/lru"
3131 "github.com/ethereum/go-ethereum/core"
3232 "github.com/ethereum/go-ethereum/core/bloombits"
33- "github.com/ethereum/go-ethereum/core/rawdb"
34- "github.com/ethereum/go-ethereum/core/state"
3533 "github.com/ethereum/go-ethereum/core/types"
3634 "github.com/ethereum/go-ethereum/ethdb"
3735 "github.com/ethereum/go-ethereum/event"
@@ -63,7 +61,6 @@ type Backend interface {
6361 GetBody (ctx context.Context , hash common.Hash , number rpc.BlockNumber ) (* types.Body , error )
6462 GetReceipts (ctx context.Context , blockHash common.Hash ) (types.Receipts , error )
6563 GetLogs (ctx context.Context , blockHash common.Hash , number uint64 ) ([][]* types.Log , error )
66- Pending () (* types.Block , types.Receipts , * state.StateDB )
6764
6865 CurrentHeader () * types.Header
6966 ChainConfig () * params.ChainConfig
@@ -152,10 +149,6 @@ const (
152149 UnknownSubscription Type = iota
153150 // LogsSubscription queries for new or removed (chain reorg) logs
154151 LogsSubscription
155- // PendingLogsSubscription queries for logs in pending blocks
156- PendingLogsSubscription
157- // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
158- MinedAndPendingLogsSubscription
159152 // PendingTransactionsSubscription queries for pending transactions entering
160153 // the pending state
161154 PendingTransactionsSubscription
@@ -192,10 +185,8 @@ type subscription struct {
192185// EventSystem creates subscriptions, processes events and broadcasts them to the
193186// subscription which match the subscription criteria.
194187type EventSystem struct {
195- backend Backend
196- sys * FilterSystem
197- lightMode bool
198- lastHead * types.Header
188+ backend Backend
189+ sys * FilterSystem
199190
200191 // Subscriptions
201192 txsSub event.Subscription // Subscription for new transaction event
@@ -218,11 +209,10 @@ type EventSystem struct {
218209//
219210// The returned manager has a loop that needs to be stopped with the Stop function
220211// or by stopping the given mux.
221- func NewEventSystem (sys * FilterSystem , lightMode bool ) * EventSystem {
212+ func NewEventSystem (sys * FilterSystem ) * EventSystem {
222213 m := & EventSystem {
223214 sys : sys ,
224215 backend : sys .backend ,
225- lightMode : lightMode ,
226216 install : make (chan * subscription ),
227217 uninstall : make (chan * subscription ),
228218 txsCh : make (chan core.NewTxsEvent , txChanSize ),
@@ -310,10 +300,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
310300 to = rpc .BlockNumber (crit .ToBlock .Int64 ())
311301 }
312302
313- // only interested in pending logs
314- if from == rpc .PendingBlockNumber && to == rpc .PendingBlockNumber {
315- return es . subscribePendingLogs ( crit , logs ), nil
303+ // Pending logs are not supported anymore.
304+ if from == rpc .PendingBlockNumber || to == rpc .PendingBlockNumber {
305+ return nil , errPendingLogsUnsupported
316306 }
307+
317308 // only interested in new mined logs
318309 if from == rpc .LatestBlockNumber && to == rpc .LatestBlockNumber {
319310 return es .subscribeLogs (crit , logs ), nil
@@ -322,34 +313,13 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
322313 if from >= 0 && to >= 0 && to >= from {
323314 return es .subscribeLogs (crit , logs ), nil
324315 }
325- // interested in mined logs from a specific block number, new logs and pending logs
326- if from >= rpc .LatestBlockNumber && to == rpc .PendingBlockNumber {
327- return es .subscribeMinedPendingLogs (crit , logs ), nil
328- }
329316 // interested in logs from a specific block number to new mined blocks
330317 if from >= 0 && to == rpc .LatestBlockNumber {
331318 return es .subscribeLogs (crit , logs ), nil
332319 }
333320 return nil , errInvalidBlockRange
334321}
335322
336- // subscribeMinedPendingLogs creates a subscription that returned mined and
337- // pending logs that match the given criteria.
338- func (es * EventSystem ) subscribeMinedPendingLogs (crit ethereum.FilterQuery , logs chan []* types.Log ) * Subscription {
339- sub := & subscription {
340- id : rpc .NewID (),
341- typ : MinedAndPendingLogsSubscription ,
342- logsCrit : crit ,
343- created : time .Now (),
344- logs : logs ,
345- txs : make (chan []* types.Transaction ),
346- headers : make (chan * types.Header ),
347- installed : make (chan struct {}),
348- err : make (chan error ),
349- }
350- return es .subscribe (sub )
351- }
352-
353323// subscribeLogs creates a subscription that will write all logs matching the
354324// given criteria to the given logs channel.
355325func (es * EventSystem ) subscribeLogs (crit ethereum.FilterQuery , logs chan []* types.Log ) * Subscription {
@@ -367,23 +337,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
367337 return es .subscribe (sub )
368338}
369339
370- // subscribePendingLogs creates a subscription that writes contract event logs for
371- // transactions that enter the transaction pool.
372- func (es * EventSystem ) subscribePendingLogs (crit ethereum.FilterQuery , logs chan []* types.Log ) * Subscription {
373- sub := & subscription {
374- id : rpc .NewID (),
375- typ : PendingLogsSubscription ,
376- logsCrit : crit ,
377- created : time .Now (),
378- logs : logs ,
379- txs : make (chan []* types.Transaction ),
380- headers : make (chan * types.Header ),
381- installed : make (chan struct {}),
382- err : make (chan error ),
383- }
384- return es .subscribe (sub )
385- }
386-
387340// SubscribeNewHeads creates a subscription that writes the header of a block that is
388341// imported in the chain.
389342func (es * EventSystem ) SubscribeNewHeads (headers chan * types.Header ) * Subscription {
@@ -430,18 +383,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
430383 }
431384}
432385
433- func (es * EventSystem ) handlePendingLogs (filters filterIndex , logs []* types.Log ) {
434- if len (logs ) == 0 {
435- return
436- }
437- for _ , f := range filters [PendingLogsSubscription ] {
438- matchedLogs := filterLogs (logs , nil , f .logsCrit .ToBlock , f .logsCrit .Addresses , f .logsCrit .Topics )
439- if len (matchedLogs ) > 0 {
440- f .logs <- matchedLogs
441- }
442- }
443- }
444-
445386func (es * EventSystem ) handleTxsEvent (filters filterIndex , ev core.NewTxsEvent ) {
446387 for _ , f := range filters [PendingTransactionsSubscription ] {
447388 f .txs <- ev .Txs
@@ -452,91 +393,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
452393 for _ , f := range filters [BlocksSubscription ] {
453394 f .headers <- ev .Block .Header ()
454395 }
455- if es .lightMode && len (filters [LogsSubscription ]) > 0 {
456- es .lightFilterNewHead (ev .Block .Header (), func (header * types.Header , remove bool ) {
457- for _ , f := range filters [LogsSubscription ] {
458- if f .logsCrit .FromBlock != nil && header .Number .Cmp (f .logsCrit .FromBlock ) < 0 {
459- continue
460- }
461- if f .logsCrit .ToBlock != nil && header .Number .Cmp (f .logsCrit .ToBlock ) > 0 {
462- continue
463- }
464- if matchedLogs := es .lightFilterLogs (header , f .logsCrit .Addresses , f .logsCrit .Topics , remove ); len (matchedLogs ) > 0 {
465- f .logs <- matchedLogs
466- }
467- }
468- })
469- }
470- }
471-
472- func (es * EventSystem ) lightFilterNewHead (newHeader * types.Header , callBack func (* types.Header , bool )) {
473- oldh := es .lastHead
474- es .lastHead = newHeader
475- if oldh == nil {
476- return
477- }
478- newh := newHeader
479- // find common ancestor, create list of rolled back and new block hashes
480- var oldHeaders , newHeaders []* types.Header
481- for oldh .Hash () != newh .Hash () {
482- if oldh .Number .Uint64 () >= newh .Number .Uint64 () {
483- oldHeaders = append (oldHeaders , oldh )
484- oldh = rawdb .ReadHeader (es .backend .ChainDb (), oldh .ParentHash , oldh .Number .Uint64 ()- 1 )
485- }
486- if oldh .Number .Uint64 () < newh .Number .Uint64 () {
487- newHeaders = append (newHeaders , newh )
488- newh = rawdb .ReadHeader (es .backend .ChainDb (), newh .ParentHash , newh .Number .Uint64 ()- 1 )
489- if newh == nil {
490- // happens when CHT syncing, nothing to do
491- newh = oldh
492- }
493- }
494- }
495- // roll back old blocks
496- for _ , h := range oldHeaders {
497- callBack (h , true )
498- }
499- // check new blocks (array is in reverse order)
500- for i := len (newHeaders ) - 1 ; i >= 0 ; i -- {
501- callBack (newHeaders [i ], false )
502- }
503- }
504-
505- // filter logs of a single header in light client mode
506- func (es * EventSystem ) lightFilterLogs (header * types.Header , addresses []common.Address , topics [][]common.Hash , remove bool ) []* types.Log {
507- if ! bloomFilter (header .Bloom , addresses , topics ) {
508- return nil
509- }
510- // Get the logs of the block
511- ctx , cancel := context .WithTimeout (context .Background (), time .Second * 5 )
512- defer cancel ()
513- cached , err := es .sys .cachedLogElem (ctx , header .Hash (), header .Number .Uint64 ())
514- if err != nil {
515- return nil
516- }
517- unfiltered := append ([]* types.Log {}, cached .logs ... )
518- for i , log := range unfiltered {
519- // Don't modify in-cache elements
520- logcopy := * log
521- logcopy .Removed = remove
522- // Swap copy in-place
523- unfiltered [i ] = & logcopy
524- }
525- logs := filterLogs (unfiltered , nil , nil , addresses , topics )
526- // Txhash is already resolved
527- if len (logs ) > 0 && logs [0 ].TxHash != (common.Hash {}) {
528- return logs
529- }
530- // Resolve txhash
531- body , err := es .sys .cachedGetBody (ctx , cached , header .Hash (), header .Number .Uint64 ())
532- if err != nil {
533- return nil
534- }
535- for _ , log := range logs {
536- // logs are already copied, safe to modify
537- log .TxHash = body .Transactions [log .TxIndex ].Hash ()
538- }
539- return logs
540396}
541397
542398// eventLoop (un)installs filters and processes mux events.
@@ -564,46 +420,13 @@ func (es *EventSystem) eventLoop() {
564420 es .handleLogs (index , ev .Logs )
565421 case ev := <- es .chainCh :
566422 es .handleChainEvent (index , ev )
567- // If we have no pending log subscription,
568- // we don't need to collect any pending logs.
569- if len (index [PendingLogsSubscription ]) == 0 {
570- continue
571- }
572-
573- // Pull the pending logs if there is a new chain head.
574- pendingBlock , pendingReceipts , _ := es .backend .Pending ()
575- if pendingBlock == nil || pendingReceipts == nil {
576- continue
577- }
578- if pendingBlock .ParentHash () != ev .Block .Hash () {
579- continue
580- }
581- var logs []* types.Log
582- for _ , receipt := range pendingReceipts {
583- if len (receipt .Logs ) > 0 {
584- logs = append (logs , receipt .Logs ... )
585- }
586- }
587- es .handlePendingLogs (index , logs )
588423
589424 case f := <- es .install :
590- if f .typ == MinedAndPendingLogsSubscription {
591- // the type are logs and pending logs subscriptions
592- index [LogsSubscription ][f.id ] = f
593- index [PendingLogsSubscription ][f.id ] = f
594- } else {
595- index [f.typ ][f.id ] = f
596- }
425+ index [f.typ ][f.id ] = f
597426 close (f .installed )
598427
599428 case f := <- es .uninstall :
600- if f .typ == MinedAndPendingLogsSubscription {
601- // the type are logs and pending logs subscriptions
602- delete (index [LogsSubscription ], f .id )
603- delete (index [PendingLogsSubscription ], f .id )
604- } else {
605- delete (index [f .typ ], f .id )
606- }
429+ delete (index [f .typ ], f .id )
607430 close (f .err )
608431
609432 // System stopped
0 commit comments