77
88 "github.com/ipfs/go-cid"
99 "github.com/ipfs/go-datastore"
10- "github.com/ipfs/go-datastore/namespace"
1110 "github.com/ipld/go-ipld-prime"
1211 peer "github.com/libp2p/go-libp2p-core/peer"
1312 cbg "github.com/whyrusleeping/cbor-gen"
@@ -21,18 +20,11 @@ import (
2120 datatransfer "github.com/filecoin-project/go-data-transfer"
2221 "github.com/filecoin-project/go-data-transfer/channels/internal"
2322 "github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
24- "github.com/filecoin-project/go-data-transfer/cidlists"
25- "github.com/filecoin-project/go-data-transfer/cidsets"
2623 "github.com/filecoin-project/go-data-transfer/encoding"
2724)
2825
2926type DecoderByTypeFunc func (identifier datatransfer.TypeIdentifier ) (encoding.Decoder , bool )
3027
31- type ReceivedCidsReader interface {
32- ToArray (chid datatransfer.ChannelID ) ([]cid.Cid , error )
33- Len (chid datatransfer.ChannelID ) (int , error )
34- }
35-
3628type Notifier func (datatransfer.Event , datatransfer.ChannelState )
3729
3830// ErrNotFound is returned when a channel cannot be found with a given channel ID
@@ -59,7 +51,6 @@ type Channels struct {
5951 blockIndexCache * blockIndexCache
6052 stateMachines fsm.Group
6153 migrateStateMachines func (context.Context ) error
62- seenCIDs * cidsets.CIDSetManager
6354}
6455
6556// ChannelEnvironment -- just a proxy for DTNetwork for now
@@ -72,22 +63,19 @@ type ChannelEnvironment interface {
7263
7364// New returns a new thread safe list of channels
7465func New (ds datastore.Batching ,
75- cidLists cidlists.CIDLists ,
7666 notifier Notifier ,
7767 voucherDecoder DecoderByTypeFunc ,
7868 voucherResultDecoder DecoderByTypeFunc ,
7969 env ChannelEnvironment ,
8070 selfPeer peer.ID ) (* Channels , error ) {
8171
82- seenCIDsDS := namespace .Wrap (ds , datastore .NewKey ("seencids" ))
8372 c := & Channels {
84- seenCIDs : cidsets .NewCIDSetManager (seenCIDsDS ),
8573 notifier : notifier ,
8674 voucherDecoder : voucherDecoder ,
8775 voucherResultDecoder : voucherResultDecoder ,
8876 }
8977 c .blockIndexCache = newBlockIndexCache ()
90- channelMigrations , err := migrations .GetChannelStateMigrations (selfPeer , cidLists )
78+ channelMigrations , err := migrations .GetChannelStateMigrations (selfPeer )
9179 if err != nil {
9280 return nil , err
9381 }
@@ -127,19 +115,6 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
127115 }
128116 log .Debugw ("process data transfer listeners" , "name" , datatransfer .Events [evtCode ], "transfer ID" , realChannel .TransferID )
129117 c .notifier (evt , c .fromInternalChannelState (realChannel ))
130-
131- // When the channel has been cleaned up, remove the caches of seen cids
132- if evt .Code == datatransfer .CleanupComplete {
133- chid := datatransfer.ChannelID {
134- Initiator : realChannel .Initiator ,
135- Responder : realChannel .Responder ,
136- ID : realChannel .TransferID ,
137- }
138- err := c .removeSeenCIDCaches (chid )
139- if err != nil {
140- log .Errorf ("failed to clean up channel %s: %s" , err )
141- }
142- }
143118}
144119
145120// CreateNew creates a new channel id and channel state and saves to channels.
@@ -271,12 +246,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint
271246// Returns true if this is the first time the block has been received
272247func (c * Channels ) DataReceived (chid datatransfer.ChannelID , k cid.Cid , delta uint64 , index int64 , unique bool ) (bool , error ) {
273248 new , err := c .fireProgressEvent (chid , datatransfer .DataReceived , datatransfer .DataReceivedProgress , k , delta , index , unique , c .getReceivedIndex )
274- // TODO: remove when ReceivedCids and legacy protocol is removed
275- // write the seen received cids, but write async in order to avoid blocking processing
276- if err == nil {
277- sid := seenCidsSetID (chid , datatransfer .DataReceived )
278- _ , _ = c .seenCIDs .InsertSetCID (sid , k )
279- }
280249 return new , err
281250}
282251
@@ -395,25 +364,6 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
395364 return c .stateMachines .Has (chid )
396365}
397366
398- // removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
399- // blocks that have already been queued / sent / received
400- func (c * Channels ) removeSeenCIDCaches (chid datatransfer.ChannelID ) error {
401- // Clean up seen block caches
402- progressStates := []datatransfer.EventCode {
403- datatransfer .DataQueued ,
404- datatransfer .DataSent ,
405- datatransfer .DataReceived ,
406- }
407- for _ , evt := range progressStates {
408- sid := seenCidsSetID (chid , evt )
409- err := c .seenCIDs .DeleteSet (sid )
410- if err != nil {
411- return err
412- }
413- }
414- return nil
415- }
416-
417367// fireProgressEvent fires
418368// - an event for queuing / sending / receiving blocks
419369// - a corresponding "progress" event if the block has not been seen before
@@ -463,37 +413,7 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
463413 return nil
464414}
465415
466- // Get the ID of the CID set for the given channel ID and event code.
467- // The CID set stores a unique list of queued / sent / received CIDs.
468- func seenCidsSetID (chid datatransfer.ChannelID , evt datatransfer.EventCode ) cidsets.SetID {
469- return cidsets .SetID (chid .String () + "/" + datatransfer .Events [evt ])
470- }
471-
472416// Convert from the internally used channel state format to the externally exposed ChannelState
473417func (c * Channels ) fromInternalChannelState (ch internal.ChannelState ) datatransfer.ChannelState {
474- rcr := & receivedCidsReader {seenCIDs : c .seenCIDs }
475- return fromInternalChannelState (ch , c .voucherDecoder , c .voucherResultDecoder , rcr )
476- }
477-
478- // Implements the ReceivedCidsReader interface so that the internal channel
479- // state has access to the received CIDs.
480- // The interface is used (instead of passing these values directly)
481- // so the values can be loaded lazily. Reading all CIDs from the datastore
482- // is an expensive operation so we want to avoid doing it unless necessary.
483- // Note that the received CIDs get cleaned up when the channel completes, so
484- // these methods will return an empty array after that point.
485- type receivedCidsReader struct {
486- seenCIDs * cidsets.CIDSetManager
418+ return fromInternalChannelState (ch , c .voucherDecoder , c .voucherResultDecoder )
487419}
488-
489- func (r * receivedCidsReader ) ToArray (chid datatransfer.ChannelID ) ([]cid.Cid , error ) {
490- sid := seenCidsSetID (chid , datatransfer .DataReceived )
491- return r .seenCIDs .SetToArray (sid )
492- }
493-
494- func (r * receivedCidsReader ) Len (chid datatransfer.ChannelID ) (int , error ) {
495- sid := seenCidsSetID (chid , datatransfer .DataReceived )
496- return r .seenCIDs .SetLen (sid )
497- }
498-
499- var _ ReceivedCidsReader = (* receivedCidsReader )(nil )
0 commit comments