diff --git a/src/lib.rs b/src/lib.rs index d28739eb0..66e032894 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,6 +80,7 @@ pub mod yamux; mod bandwidth; mod multistream_select; +mod utils; #[cfg(test)] mod mock; diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index 39080f447..e94a09f27 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -19,9 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId}, - substream::Substream, - PeerId, + protocol::libp2p::kademlia::query::QueryId, substream::Substream, + utils::futures_stream::FuturesStream, PeerId, }; use bytes::{Bytes, BytesMut}; diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 421904d1b..03b98ea23 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -71,7 +71,6 @@ const PARALLELISM_FACTOR: usize = 3; mod bucket; mod config; mod executor; -mod futures_stream; mod handle; mod message; mod query; diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 1c6292c06..efb39f0fa 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -23,10 +23,10 @@ use crate::{ protocol::libp2p::kademlia::{ config::{DEFAULT_PROVIDER_REFRESH_INTERVAL, DEFAULT_PROVIDER_TTL}, - futures_stream::FuturesStream, record::{ContentProvider, Key, ProviderRecord, Record}, types::Key as KademliaKey, }, + utils::futures_stream::FuturesStream, PeerId, }; diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index bbd6ffa5c..748e138d5 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -34,6 +34,7 @@ use crate::{ Transport, TransportBuilder, TransportEvent, }, types::ConnectionId, + utils::futures_stream::FuturesStream, }; use futures::{ @@ -111,12 +112,11 @@ pub(crate) struct TcpTransport { pending_inbound_connections: HashMap, /// Pending opening connections. - pending_connections: FuturesUnordered< - BoxFuture<'static, Result>, - >, + pending_connections: + FuturesStream>>, /// Pending raw, unnegotiated connections. - pending_raw_connections: FuturesUnordered>, + pending_raw_connections: FuturesStream>, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap, @@ -295,8 +295,8 @@ impl TransportBuilder for TcpTransport { pending_open: HashMap::new(), pending_dials: HashMap::new(), pending_inbound_connections: HashMap::new(), - pending_connections: FuturesUnordered::new(), - pending_raw_connections: FuturesUnordered::new(), + pending_connections: FuturesStream::new(), + pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), }, listen_addresses, diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 5975aad05..2435f6397 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -33,6 +33,7 @@ use crate::{ Transport, TransportBuilder, TransportEvent, }, types::ConnectionId, + utils::futures_stream::FuturesStream, DialError, PeerId, }; @@ -115,12 +116,11 @@ pub(crate) struct WebSocketTransport { pending_inbound_connections: HashMap, /// Pending connections. - pending_connections: FuturesUnordered< - BoxFuture<'static, Result>, - >, + pending_connections: + FuturesStream>>, /// Pending raw, unnegotiated connections. - pending_raw_connections: FuturesUnordered>, + pending_raw_connections: FuturesStream>, /// Opened raw connection, waiting for approval/rejection from `TransportManager`. opened_raw: HashMap>, Multiaddr)>, @@ -325,8 +325,8 @@ impl TransportBuilder for WebSocketTransport { pending_open: HashMap::new(), pending_dials: HashMap::new(), pending_inbound_connections: HashMap::new(), - pending_connections: FuturesUnordered::new(), - pending_raw_connections: FuturesUnordered::new(), + pending_connections: FuturesStream::new(), + pending_raw_connections: FuturesStream::new(), cancel_futures: HashMap::new(), }, listen_addresses, diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/utils/futures_stream.rs similarity index 98% rename from src/protocol/libp2p/kademlia/futures_stream.rs rename to src/utils/futures_stream.rs index 439772c3d..6393296d5 100644 --- a/src/protocol/libp2p/kademlia/futures_stream.rs +++ b/src/utils/futures_stream.rs @@ -44,7 +44,7 @@ impl FuturesStream { } } - /// Number of futeres in the stream. + /// Number of futures in the stream. #[cfg(test)] pub fn len(&self) -> usize { self.futures.len() diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 000000000..7c0f49e34 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub mod futures_stream;