11package channels_test
22
33import (
4- "bytes"
54 "context"
65 "errors"
7- "math/rand"
8- "os"
96 "testing"
107 "time"
118
129 "github.com/ipfs/go-cid"
1310 "github.com/ipfs/go-datastore"
1411 dss "github.com/ipfs/go-datastore/sync"
15- "github.com/ipld/go-ipld-prime/codec/dagcbor"
1612 basicnode "github.com/ipld/go-ipld-prime/node/basic"
1713 "github.com/ipld/go-ipld-prime/traversal/selector/builder"
1814 peer "github.com/libp2p/go-libp2p-core/peer"
1915 "github.com/stretchr/testify/require"
20- cbg "github.com/whyrusleeping/cbor-gen"
2116 "golang.org/x/xerrors"
2217
23- versioning "github.com/filecoin-project/go-ds-versioning/pkg"
24- versionedds "github.com/filecoin-project/go-ds-versioning/pkg/datastore"
25-
2618 datatransfer "github.com/filecoin-project/go-data-transfer"
2719 "github.com/filecoin-project/go-data-transfer/channels"
28- "github.com/filecoin-project/go-data-transfer/channels/internal"
29- "github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
30- v0 "github.com/filecoin-project/go-data-transfer/channels/internal/migrations/v0"
31- v1 "github.com/filecoin-project/go-data-transfer/channels/internal/migrations/v1"
32- "github.com/filecoin-project/go-data-transfer/cidlists"
3320 "github.com/filecoin-project/go-data-transfer/encoding"
3421 "github.com/filecoin-project/go-data-transfer/testutil"
3522)
@@ -52,10 +39,7 @@ func TestChannels(t *testing.T) {
5239 selector := builder .NewSelectorSpecBuilder (basicnode .Prototype .Any ).Matcher ().Node ()
5340 peers := testutil .GeneratePeers (4 )
5441
55- dir := os .TempDir ()
56- cidLists , err := cidlists .NewCIDLists (dir )
57- require .NoError (t , err )
58- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
42+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
5943 require .NoError (t , err )
6044
6145 err = channelList .Start (ctx )
@@ -140,10 +124,8 @@ func TestChannels(t *testing.T) {
140124
141125 t .Run ("datasent/queued when transfer is already finished" , func (t * testing.T ) {
142126 ds := dss .MutexWrap (datastore .NewMapDatastore ())
143- dir := os .TempDir ()
144- cidLists , err := cidlists .NewCIDLists (dir )
145- require .NoError (t , err )
146- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
127+
128+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
147129 require .NoError (t , err )
148130 err = channelList .Start (ctx )
149131 require .NoError (t , err )
@@ -172,10 +154,8 @@ func TestChannels(t *testing.T) {
172154
173155 t .Run ("updating send/receive values" , func (t * testing.T ) {
174156 ds := dss .MutexWrap (datastore .NewMapDatastore ())
175- dir := os .TempDir ()
176- cidLists , err := cidlists .NewCIDLists (dir )
177- require .NoError (t , err )
178- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
157+
158+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
179159 require .NoError (t , err )
180160 err = channelList .Start (ctx )
181161 require .NoError (t , err )
@@ -244,10 +224,8 @@ func TestChannels(t *testing.T) {
244224
245225 t .Run ("missing cids" , func (t * testing.T ) {
246226 ds := dss .MutexWrap (datastore .NewMapDatastore ())
247- dir := os .TempDir ()
248- cidLists , err := cidlists .NewCIDLists (dir )
249- require .NoError (t , err )
250- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
227+
228+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
251229 require .NoError (t , err )
252230 err = channelList .Start (ctx )
253231 require .NoError (t , err )
@@ -390,10 +368,7 @@ func TestChannels(t *testing.T) {
390368 notifier := func (evt datatransfer.Event , chst datatransfer.ChannelState ) {
391369 received <- event {evt , chst }
392370 }
393- dir := os .TempDir ()
394- cidLists , err := cidlists .NewCIDLists (dir )
395- require .NoError (t , err )
396- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
371+ channelList , err := channels .New (ds , notifier , decoderByType , decoderByType , & fakeEnv {}, peers [0 ])
397372 require .NoError (t , err )
398373 err = channelList .Start (ctx )
399374 require .NoError (t , err )
@@ -443,257 +418,6 @@ func TestIsChannelCleaningUp(t *testing.T) {
443418 require .False (t , channels .IsChannelCleaningUp (datatransfer .Cancelled ))
444419}
445420
446- func TestMigrationsV0 (t * testing.T ) {
447- ctx := context .Background ()
448- ctx , cancel := context .WithTimeout (ctx , 2 * time .Second )
449- defer cancel ()
450-
451- ds := dss .MutexWrap (datastore .NewMapDatastore ())
452- received := make (chan event )
453- notifier := func (evt datatransfer.Event , chst datatransfer.ChannelState ) {
454- received <- event {evt , chst }
455- }
456- numChannels := 5
457- transferIDs := make ([]datatransfer.TransferID , numChannels )
458- initiators := make ([]peer.ID , numChannels )
459- responders := make ([]peer.ID , numChannels )
460- baseCids := make ([]cid.Cid , numChannels )
461-
462- totalSizes := make ([]uint64 , numChannels )
463- sents := make ([]uint64 , numChannels )
464- receiveds := make ([]uint64 , numChannels )
465- messages := make ([]string , numChannels )
466- vouchers := make ([]datatransfer.Voucher , numChannels )
467- voucherResults := make ([]datatransfer.VoucherResult , numChannels )
468-
469- allSelector := builder .NewSelectorSpecBuilder (basicnode .Prototype .Any ).Matcher ().Node ()
470- allSelectorBuf := new (bytes.Buffer )
471- err := dagcbor .Encode (allSelector , allSelectorBuf )
472- require .NoError (t , err )
473- allSelectorBytes := allSelectorBuf .Bytes ()
474-
475- for i := 0 ; i < numChannels ; i ++ {
476- transferIDs [i ] = datatransfer .TransferID (rand .Uint64 ())
477- initiators [i ] = testutil .GeneratePeers (1 )[0 ]
478- responders [i ] = testutil .GeneratePeers (1 )[0 ]
479- baseCids [i ] = testutil .GenerateCids (1 )[0 ]
480- totalSizes [i ] = rand .Uint64 ()
481- sents [i ] = rand .Uint64 ()
482- receiveds [i ] = rand .Uint64 ()
483- messages [i ] = string (testutil .RandomBytes (20 ))
484- vouchers [i ] = testutil .NewFakeDTType ()
485- vBytes , err := encoding .Encode (vouchers [i ])
486- require .NoError (t , err )
487- voucherResults [i ] = testutil .NewFakeDTType ()
488- vrBytes , err := encoding .Encode (voucherResults [i ])
489- require .NoError (t , err )
490- channel := v0.ChannelState {
491- TransferID : transferIDs [i ],
492- Initiator : initiators [i ],
493- Responder : responders [i ],
494- BaseCid : baseCids [i ],
495- Selector : & cbg.Deferred {
496- Raw : allSelectorBytes ,
497- },
498- Sender : initiators [i ],
499- Recipient : responders [i ],
500- TotalSize : totalSizes [i ],
501- Status : datatransfer .Ongoing ,
502- Sent : sents [i ],
503- Received : receiveds [i ],
504- Message : messages [i ],
505- Vouchers : []v0.EncodedVoucher {
506- {
507- Type : vouchers [i ].Type (),
508- Voucher : & cbg.Deferred {
509- Raw : vBytes ,
510- },
511- },
512- },
513- VoucherResults : []v0.EncodedVoucherResult {
514- {
515- Type : voucherResults [i ].Type (),
516- VoucherResult : & cbg.Deferred {
517- Raw : vrBytes ,
518- },
519- },
520- },
521- }
522- buf := new (bytes.Buffer )
523- err = channel .MarshalCBOR (buf )
524- require .NoError (t , err )
525- err = ds .Put (ctx , datastore .NewKey (datatransfer.ChannelID {
526- Initiator : initiators [i ],
527- Responder : responders [i ],
528- ID : transferIDs [i ],
529- }.String ()), buf .Bytes ())
530- require .NoError (t , err )
531- }
532-
533- selfPeer := testutil .GeneratePeers (1 )[0 ]
534- dir := os .TempDir ()
535- cidLists , err := cidlists .NewCIDLists (dir )
536- require .NoError (t , err )
537- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, selfPeer )
538- require .NoError (t , err )
539- err = channelList .Start (ctx )
540- require .NoError (t , err )
541-
542- for i := 0 ; i < numChannels ; i ++ {
543-
544- channel , err := channelList .GetByID (ctx , datatransfer.ChannelID {
545- Initiator : initiators [i ],
546- Responder : responders [i ],
547- ID : transferIDs [i ],
548- })
549- require .NoError (t , err )
550- require .Equal (t , selfPeer , channel .SelfPeer ())
551- require .Equal (t , transferIDs [i ], channel .TransferID ())
552- require .Equal (t , baseCids [i ], channel .BaseCID ())
553- require .Equal (t , allSelector , channel .Selector ())
554- require .Equal (t , initiators [i ], channel .Sender ())
555- require .Equal (t , responders [i ], channel .Recipient ())
556- require .Equal (t , totalSizes [i ], channel .TotalSize ())
557- require .Equal (t , datatransfer .Ongoing , channel .Status ())
558- require .Equal (t , sents [i ], channel .Sent ())
559- require .Equal (t , receiveds [i ], channel .Received ())
560- require .Equal (t , messages [i ], channel .Message ())
561- require .Equal (t , vouchers [i ], channel .LastVoucher ())
562- require .Equal (t , voucherResults [i ], channel .LastVoucherResult ())
563- }
564- }
565- func TestMigrationsV1 (t * testing.T ) {
566- ctx := context .Background ()
567- ctx , cancel := context .WithTimeout (ctx , 2 * time .Second )
568- defer cancel ()
569-
570- ds := dss .MutexWrap (datastore .NewMapDatastore ())
571- received := make (chan event )
572- notifier := func (evt datatransfer.Event , chst datatransfer.ChannelState ) {
573- received <- event {evt , chst }
574- }
575- numChannels := 5
576- transferIDs := make ([]datatransfer.TransferID , numChannels )
577- initiators := make ([]peer.ID , numChannels )
578- responders := make ([]peer.ID , numChannels )
579- baseCids := make ([]cid.Cid , numChannels )
580-
581- totalSizes := make ([]uint64 , numChannels )
582- sents := make ([]uint64 , numChannels )
583- receiveds := make ([]uint64 , numChannels )
584- messages := make ([]string , numChannels )
585- vouchers := make ([]datatransfer.Voucher , numChannels )
586- voucherResults := make ([]datatransfer.VoucherResult , numChannels )
587- receivedCids := make ([][]cid.Cid , numChannels )
588- allSelector := builder .NewSelectorSpecBuilder (basicnode .Prototype .Any ).Matcher ().Node ()
589- allSelectorBuf := new (bytes.Buffer )
590- err := dagcbor .Encode (allSelector , allSelectorBuf )
591- require .NoError (t , err )
592- allSelectorBytes := allSelectorBuf .Bytes ()
593- selfPeer := testutil .GeneratePeers (1 )[0 ]
594- dir := os .TempDir ()
595- cidLists , err := cidlists .NewCIDLists (dir )
596- require .NoError (t , err )
597-
598- list , err := migrations .GetChannelStateMigrations (selfPeer , cidLists )
599- require .NoError (t , err )
600- vds , up := versionedds .NewVersionedDatastore (ds , list , versioning .VersionKey ("1" ))
601- require .NoError (t , up (ctx ))
602-
603- for i := 0 ; i < numChannels ; i ++ {
604- transferIDs [i ] = datatransfer .TransferID (rand .Uint64 ())
605- initiators [i ] = testutil .GeneratePeers (1 )[0 ]
606- responders [i ] = testutil .GeneratePeers (1 )[0 ]
607- baseCids [i ] = testutil .GenerateCids (1 )[0 ]
608- totalSizes [i ] = rand .Uint64 ()
609- sents [i ] = rand .Uint64 ()
610- receiveds [i ] = rand .Uint64 ()
611- messages [i ] = string (testutil .RandomBytes (20 ))
612- vouchers [i ] = testutil .NewFakeDTType ()
613- vBytes , err := encoding .Encode (vouchers [i ])
614- require .NoError (t , err )
615- voucherResults [i ] = testutil .NewFakeDTType ()
616- vrBytes , err := encoding .Encode (voucherResults [i ])
617- require .NoError (t , err )
618- receivedCids [i ] = testutil .GenerateCids (100 )
619- channel := v1.ChannelState {
620- TransferID : transferIDs [i ],
621- Initiator : initiators [i ],
622- Responder : responders [i ],
623- BaseCid : baseCids [i ],
624- Selector : & cbg.Deferred {
625- Raw : allSelectorBytes ,
626- },
627- Sender : initiators [i ],
628- Recipient : responders [i ],
629- TotalSize : totalSizes [i ],
630- Status : datatransfer .Ongoing ,
631- Sent : sents [i ],
632- Received : receiveds [i ],
633- Message : messages [i ],
634- Vouchers : []internal.EncodedVoucher {
635- {
636- Type : vouchers [i ].Type (),
637- Voucher : & cbg.Deferred {
638- Raw : vBytes ,
639- },
640- },
641- },
642- VoucherResults : []internal.EncodedVoucherResult {
643- {
644- Type : voucherResults [i ].Type (),
645- VoucherResult : & cbg.Deferred {
646- Raw : vrBytes ,
647- },
648- },
649- },
650- SelfPeer : selfPeer ,
651- ReceivedCids : receivedCids [i ],
652- }
653- buf := new (bytes.Buffer )
654- err = channel .MarshalCBOR (buf )
655- require .NoError (t , err )
656- err = vds .Put (ctx , datastore .NewKey (datatransfer.ChannelID {
657- Initiator : initiators [i ],
658- Responder : responders [i ],
659- ID : transferIDs [i ],
660- }.String ()), buf .Bytes ())
661- require .NoError (t , err )
662- }
663-
664- channelList , err := channels .New (ds , cidLists , notifier , decoderByType , decoderByType , & fakeEnv {}, selfPeer )
665- require .NoError (t , err )
666- err = channelList .Start (ctx )
667- require .NoError (t , err )
668-
669- for i := 0 ; i < numChannels ; i ++ {
670-
671- channel , err := channelList .GetByID (ctx , datatransfer.ChannelID {
672- Initiator : initiators [i ],
673- Responder : responders [i ],
674- ID : transferIDs [i ],
675- })
676- require .NoError (t , err )
677- require .Equal (t , selfPeer , channel .SelfPeer ())
678- require .Equal (t , transferIDs [i ], channel .TransferID ())
679- require .Equal (t , baseCids [i ], channel .BaseCID ())
680- require .Equal (t , allSelector , channel .Selector ())
681- require .Equal (t , initiators [i ], channel .Sender ())
682- require .Equal (t , responders [i ], channel .Recipient ())
683- require .Equal (t , totalSizes [i ], channel .TotalSize ())
684- require .Equal (t , datatransfer .Ongoing , channel .Status ())
685- require .Equal (t , sents [i ], channel .Sent ())
686- require .Equal (t , receiveds [i ], channel .Received ())
687- require .Equal (t , messages [i ], channel .Message ())
688- require .Equal (t , vouchers [i ], channel .LastVoucher ())
689- require .Equal (t , voucherResults [i ], channel .LastVoucherResult ())
690- // No longer relying on this migration to migrate CID lists as they
691- // have been deprecated since we moved to CID sets:
692- // https://github.com/filecoin-project/go-data-transfer/pull/217
693- //require.Equal(t, receivedCids[i], channel.ReceivedCids())
694- }
695- }
696-
697421type event struct {
698422 event datatransfer.Event
699423 state datatransfer.ChannelState
0 commit comments