Skip to content

Commit 5871589

Browse files
authored
Merge pull request #111 from Actyx/libp2p-0.48
libp2p 0.48
2 parents 87ac5ac + 1b385e9 commit 5871589

File tree

16 files changed

+1605
-1049
lines changed

16 files changed

+1605
-1049
lines changed

Cargo.lock

Lines changed: 430 additions & 355 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ description = "small embeddable ipfs implementation"
1111
repository = "https://github.com/ipfs-rust/ipfs-embed"
1212

1313
[features]
14-
default = ["async_global"]
15-
async_global = ["async-global-executor", "libp2p/tcp-async-io", "libp2p/dns-async-std"]
16-
tokio = ["tokio-crate", "libp2p/tcp-tokio", "libp2p/dns-tokio"]
14+
default = ["async_global", "rsa"]
15+
rsa = ["libp2p/rsa"]
16+
async_global = ["async-global-executor", "libp2p/tcp-async-io", "libp2p/dns-async-std", "libp2p/mdns-async-io"]
17+
tokio = ["tokio-crate", "libp2p/tcp-tokio", "libp2p/dns-tokio", "libp2p/mdns-tokio"]
1718
telemetry = ["tide", "async_global"]
1819
# Makes it possible to exchange data via Bitswap with a go-ipfs node
1920
compat = ["libp2p-bitswap/compat"]
@@ -28,11 +29,11 @@ chrono = "0.4.19"
2829
fnv = "1.0.7"
2930
futures = "0.3.21"
3031
futures-timer = "3.0.2"
31-
ipfs-sqlite-block-store = "0.12.0"
32+
ipfs-sqlite-block-store = "0.13.0"
3233
lazy_static = "1.4.0"
33-
libipld = { version = "0.12.0", default-features = false }
34-
libp2p-bitswap = "0.22.0"
35-
libp2p-broadcast = "0.9.1"
34+
libipld = { version = "0.14.0", default-features = false }
35+
libp2p-bitswap = "0.23.0"
36+
libp2p-broadcast = "0.10.0"
3637
names = "0.13.0"
3738
parking_lot = "0.11.2"
3839
pin-project = "1.0.10"
@@ -42,30 +43,29 @@ thiserror = "1.0.30"
4243
tide = { version = "0.16.0", optional = true }
4344
tokio-crate = { package = "tokio", version = "1.17.0", features = ["rt"], optional = true }
4445
tracing = "0.1.32"
45-
trust-dns-resolver = "0.20"
46+
trust-dns-resolver = "0.21.2"
4647
void = "1.0.2"
4748

4849
[dependencies.libp2p]
49-
version = "0.43.0"
50+
version = "0.48.0"
5051
default-features = false
5152
features = [
5253
"gossipsub",
5354
"identify",
5455
"kad",
55-
"mdns",
5656
"ping",
57-
#"relay",
5857
"mplex",
5958
"noise",
6059
"pnet",
6160
"yamux",
6261
]
6362

6463
[dev-dependencies]
64+
anyhow = { version = "1", features = ["backtrace"] }
6565
async-std = { version = "1.11.0", features = ["attributes"] }
66-
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
67-
libp2p-bitswap = { version = "0.22.0", default-features = false, features = ["compat"] }
68-
multihash = { version = "0.14.0", default-features = false, features = ["blake3"] }
66+
libipld = { version = "0.14.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
67+
libp2p-bitswap = { version = "0.23.0", default-features = false, features = ["compat"] }
68+
multihash = { version = "0.16.1", default-features = false, features = ["blake3"] }
6969
rand = "0.8.5"
7070
regex = "1.5.5"
7171
tempdir = "0.3.7"

cli/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ anyhow = "1.0.56"
1010
async-process = "1.3.0"
1111
async-std = { version = "1.11.0", features = ["attributes"] }
1212
chrono = "0.4.19"
13+
futures = "0.3.24"
1314
ipfs-embed = { path = ".." }
14-
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor"] }
15-
multihash = { version = "0.14.0", default-features = false, features = ["blake3"] }
15+
libipld = { version = "0.14.0", default-features = false, features = ["dag-cbor"] }
16+
multihash = { version = "0.16.1", default-features = false, features = ["blake3"] }
1617
parking_lot = "0.11.2"
1718
serde = { version = "1.0.136", features = ["derive"] }
1819
serde_json = "1.0.79"

cli/src/main.rs

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Result;
22
use async_std::stream::StreamExt;
3+
use futures::TryFutureExt;
34
use ipfs_embed::{DefaultParams, Ipfs, NetworkConfig, StorageConfig};
45
use ipfs_embed_cli::{keypair, Command, Config, Event};
56
use parking_lot::Mutex;
@@ -45,11 +46,11 @@ async fn run() -> Result<()> {
4546
};
4647
network.identify.as_mut().unwrap().agent_version = node_name;
4748

48-
let ipfs = Ipfs::<DefaultParams>::new(ipfs_embed::Config { storage, network }).await?;
49-
let mut events = ipfs.swarm_events();
49+
let mut ipfs = Ipfs::<DefaultParams>::new(ipfs_embed::Config { storage, network }).await?;
50+
let mut events = ipfs.swarm_events().await?;
5051

5152
for addr in config.listen_on {
52-
let _ = ipfs.listen_on(addr)?;
53+
let _ = ipfs.listen_on(addr);
5354
}
5455

5556
for addr in config.external {
@@ -111,36 +112,38 @@ async fn run() -> Result<()> {
111112
loop {
112113
line.clear();
113114
stdin.read_line(&mut line)?;
114-
match line.parse()? {
115-
Command::AddAddress(peer, addr) => {
116-
ipfs.lock().add_address(&peer, addr);
115+
#[allow(clippy::unit_arg)]
116+
let result = match line.parse() {
117+
Ok(Command::AddAddress(peer, addr)) => Ok(ipfs.lock().add_address(peer, addr)),
118+
Ok(Command::Dial(peer)) => Ok(ipfs.lock().dial(peer)),
119+
Ok(Command::PrunePeers) => Ok(ipfs.lock().prune_peers(Duration::ZERO)),
120+
Ok(Command::Get(cid)) => ipfs
121+
.lock()
122+
.get(&cid)
123+
.map(|block| writeln!(stdout, "{}", Event::Block(block)).expect("print")),
124+
Ok(Command::Insert(block)) => ipfs.lock().insert(block),
125+
Ok(Command::Alias(alias, cid)) => ipfs.lock().alias(&alias, cid.as_ref()),
126+
Ok(Command::Flush) => {
127+
let f = ipfs
128+
.lock()
129+
.flush()
130+
.inspect_ok(|_| writeln!(stdout, "{}", Event::Flushed).expect("print"));
131+
f.await
117132
}
118-
Command::Dial(peer) => {
119-
ipfs.lock().dial(&peer);
120-
}
121-
Command::PrunePeers => {
122-
ipfs.lock().prune_peers(Duration::ZERO);
123-
}
124-
Command::Get(cid) => {
125-
let block = ipfs.lock().get(&cid)?;
126-
writeln!(stdout, "{}", Event::Block(block))?;
127-
}
128-
Command::Insert(block) => {
129-
ipfs.lock().insert(block)?;
130-
}
131-
Command::Alias(alias, cid) => {
132-
ipfs.lock().alias(&alias, cid.as_ref())?;
133-
}
134-
Command::Flush => {
135-
ipfs.lock().flush().await?;
136-
writeln!(stdout, "{}", Event::Flushed)?;
137-
}
138-
Command::Sync(cid) => {
133+
Ok(Command::Sync(cid)) => {
139134
let providers = ipfs.lock().peers();
140135
tracing::debug!("sync {} from {:?}", cid, providers);
141-
ipfs.lock().sync(&cid, providers).await?;
142-
writeln!(stdout, "{}", Event::Synced)?;
136+
let f = ipfs
137+
.lock()
138+
.sync(&cid, providers)
139+
.and_then(|f| f)
140+
.inspect_ok(|_| writeln!(stdout, "{}", Event::Synced).expect("print"));
141+
f.await
143142
}
143+
Err(err) => Err(err),
144+
};
145+
if let Err(err) = result {
146+
eprintln!("main loop error (line = {}): {}", line, err);
144147
}
145148
}
146149
}

examples/compat.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ fn tracing_try_init() {
2121
async fn main() -> anyhow::Result<()> {
2222
tracing_try_init();
2323
let config = Config::default();
24-
let ipfs = Ipfs::<Sp>::new(config).await?;
24+
let mut ipfs = Ipfs::<Sp>::new(config).await?;
2525
let peer: PeerId = "QmRSGx67Kq8w7xSBDia7hQfbfuvauMQGgxcwSWw976x4BS".parse()?;
2626
let addr: Multiaddr = "/ip4/54.173.33.96/tcp/4001".parse()?;
27-
ipfs.dial_address(&peer, addr);
27+
ipfs.dial_address(peer, addr);
2828

2929
// 10 random bytes
3030
let _cid_rand10: Cid = "QmXQsqVRpp2W7fbYZHi4aB2Xkqfd3DpwWskZoLVEYigMKC".parse()?;
@@ -42,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
4242
let block = ipfs.fetch(&cid_simple_dag, vec![peer]).await?;
4343
println!("got single block. len = {}", block.data().len());
4444

45-
let mut updates = ipfs.sync(&cid_simple_dag, vec![peer]);
45+
let mut updates = ipfs.sync(&cid_simple_dag, vec![peer]).await?;
4646
println!("starting sync of large file");
4747
while let Some(update) = updates.next().await {
4848
println!("{:?}", update);

examples/sync.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ async fn main() -> Result<()> {
4545
.init();
4646
let mut config = Config::new("/tmp/local1".as_ref(), Keypair::generate());
4747
config.network.kad = None;
48-
let a = Ipfs::<DefaultParams>::new(config).await?;
49-
a.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?
48+
let mut a = Ipfs::<DefaultParams>::new(config).await?;
49+
a.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)
5050
.next()
5151
.await
5252
.unwrap();
@@ -76,6 +76,7 @@ async fn main() -> Result<()> {
7676

7777
b.alias(ROOT, builder.prev.as_ref())?;
7878
b.sync(builder.prev.as_ref().unwrap(), vec![a.local_peer_id()])
79+
.await?
7980
.await?;
8081
b.flush().await?;
8182

harness/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ async-std = "1.11.0"
1313
escargot = "0.5.7"
1414
futures = "0.3.21"
1515
ipfs-embed-cli = { path = "../cli" }
16-
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
17-
libp2p = { version = "0.43.0", default-features = false }
16+
libipld = { version = "0.14.0", default-features = false, features = ["dag-cbor", "dag-pb", "derive"] }
17+
libp2p = { version = "0.48.0", default-features = false }
1818
maplit = "1.0.2"
19-
multihash = { version = "0.14.0", default-features = false, features = ["blake3"] }
19+
multihash = { version = "0.16.1", default-features = false, features = ["blake3"] }
2020
netsim-embed = "0.7.1"
2121
rand = "0.8.5"
2222
structopt = "0.3.26"

harness/src/bin/discover_nat.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,11 @@ fn main() -> anyhow::Result<()> {
110110
// we can’t attempt to dial while the connection exists
111111
i.addresses.get(&i.connections[0]).map(|s| s.as_str()) ==
112112
Some("Candidate")
113-
// can’t check for full hashmap equality since the state where only the
114-
// Candidate is present may be lost to output race conditions
113+
// can’t check for full hashmap equality since the state where only the
114+
// Candidate is present may be lost to output race conditions
115+
|| i.addresses.is_empty()
116+
// if consumer sent identify first, then the NAT address wasn’t known
117+
// and only falsifiable listen addresses are left
115118
))
116119
.then(|| ())
117120
})
@@ -137,22 +140,30 @@ fn main() -> anyhow::Result<()> {
137140
.deadline(started, 30)
138141
.await
139142
.unwrap();
140-
m.drain_matching(|e| matches!(e, Event::DialFailure(p, ..) if p == peer));
143+
m.drain_matching(|e| matches!(e, Event::DialFailure(p, ..) | Event::Unreachable(p) if p == peer));
141144
tracing::info!("provider {} saw close from {}", id, m_id);
142145
m.send(Command::Dial(*peer));
143-
m.select(|e| matches!(e, Event::DialFailure(p, ..) if p == peer).then(|| ()))
146+
let alive = m
147+
.select(|e| match e {
148+
Event::DialFailure(p, ..) | Event::Unreachable(p) if p == peer => Some(true),
149+
Event::PeerRemoved(p) if p == peer => Some(false),
150+
_ => None,
151+
})
144152
.timeout(10)
145153
.await
154+
.unwrap()
146155
.unwrap();
147-
m.send(Command::PrunePeers);
148-
m.select(|e| {
149-
// prune_peers will remove the peer when a failure happens while not
150-
// connected
151-
matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ())
152-
})
153-
.timeout(10)
154-
.await
155-
.unwrap();
156+
if alive {
157+
m.send(Command::PrunePeers);
158+
m.select(|e| {
159+
// prune_peers will remove the peer when a failure happens while not
160+
// connected
161+
matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ())
162+
})
163+
.timeout(10)
164+
.await
165+
.unwrap();
166+
}
156167
tracing::info!("provider {} done with {}", id, m_id);
157168
}
158169
}

harness/src/bin/discover_nat_forward.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ fn main() -> anyhow::Result<()> {
141141
Event::PeerInfo(p, i) if p == peer => Some(i.connections[0].clone()),
142142
_ => None
143143
}).timeout(1).await.unwrap().unwrap();
144-
tracing::info!("first address is {}", a_1);
145144
// the NAT may give us the correct port in a_1 already, so no second entry to
146145
// check
147146
let a_nat = a_1
148147
.replace(1, |_| Some(Protocol::Tcp(30000)))
149148
.filter(|a| *m_id == m_nat && *a != a_1);
149+
tracing::info!("first address is {}, a_nat={:?}", a_1, a_nat);
150150
m.select(|e| {
151151
matches!(e, Event::PeerInfo(p, i) if p == peer && (
152152
// port_reuse unfortunately means that the NATed port is added to
@@ -156,10 +156,13 @@ fn main() -> anyhow::Result<()> {
156156
a_nat.iter().all(|a_nat| {
157157
i.addresses.get(a_nat).map(|x| x.as_str()) == Some("Dial")
158158
}))
159+
// if consumer sent identify first, then the NAT address wasn’t known
160+
// and only falsifiable listen addresses are left
161+
|| i.addresses.is_empty()
159162
)
160163
.then(|| ())
161164
})
162-
.deadline(started, 5).await.unwrap();
165+
.deadline(started, 10).await.unwrap();
163166
tracing::info!("provider {} identified {}", id, m_id);
164167
}
165168
m.drain();
@@ -190,11 +193,21 @@ fn main() -> anyhow::Result<()> {
190193
let m = sim.machine(*id);
191194
for (m_id, (peer, _addr)) in consumers.iter() {
192195
m.send(Command::Dial(*peer));
193-
m.select(|e| matches!(e, Event::DialFailure(p, ..) if p == peer).then(|| ()))
194-
.timeout(10).await.unwrap();
195-
m.send(Command::PrunePeers);
196-
m.select(|e| matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ()))
197-
.timeout(10).await.unwrap();
196+
let alive = m
197+
.select(|e| match e {
198+
Event::DialFailure(p, ..) | Event::Unreachable(p) if p == peer => Some(true),
199+
Event::PeerRemoved(p) if p == peer => Some(false),
200+
_ => None,
201+
})
202+
.timeout(10)
203+
.await
204+
.unwrap()
205+
.unwrap();
206+
if alive {
207+
m.send(Command::PrunePeers);
208+
m.select(|e| matches!(e, Event::PeerRemoved(p) if p == peer).then(|| ()))
209+
.timeout(10).await.unwrap();
210+
}
198211
tracing::info!("provider {} done with {}", id, m_id);
199212
}
200213
}

src/db.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -275,31 +275,29 @@ where
275275
self.rw("missing_blocks", |x| x.missing_blocks(cid))
276276
}
277277

278-
pub async fn evict(&self) -> Result<()> {
278+
pub fn evict(&self) -> impl Future<Output = Result<()>> {
279279
let store = self.inner.store.clone();
280280
let gc_min_blocks = self.inner.gc_min_blocks;
281281
let gc_target_duration = self.inner.gc_target_duration;
282-
self.inner
283-
.executor
284-
.spawn_blocking(move || {
285-
while !store
286-
.lock()
287-
.incremental_gc(gc_min_blocks, gc_target_duration)?
288-
{
289-
tracing::trace!("x");
290-
}
291-
Ok(())
292-
})
293-
.await?
294-
}
295-
296-
pub async fn flush(&self) -> Result<()> {
282+
let evict = self.inner.executor.spawn_blocking(move || {
283+
while !store
284+
.lock()
285+
.incremental_gc(gc_min_blocks, gc_target_duration)?
286+
{
287+
tracing::trace!("x");
288+
}
289+
Ok(())
290+
});
291+
async { evict.await? }
292+
}
293+
294+
pub fn flush(&self) -> impl Future<Output = Result<()>> {
297295
let store = self.inner.store.clone();
298296
let flush = self
299297
.inner
300298
.executor
301299
.spawn_blocking(move || store.lock().flush());
302-
Ok(observe_future("flush", flush).await??)
300+
async { Ok(observe_future("flush", flush).await??) }
303301
}
304302

305303
pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
@@ -347,7 +345,7 @@ where
347345
} else {
348346
timer.stop_and_discard();
349347
}
350-
Ok(res?)
348+
res
351349
}
352350

353351
struct SqliteStoreCollector<S: StoreParams> {

0 commit comments

Comments
 (0)