Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 26 additions & 23 deletions provider/src/json_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use fendermint_vm_message::{
chain::ChainMessage,
query::{FvmQuery, FvmQueryHeight},
};
use futures_util::StreamExt;
use fvm_shared::address::Address;
use reqwest::multipart::{Form, Part};
use tendermint::abci::response::DeliverTx;
Expand All @@ -20,7 +19,6 @@ use tendermint_rpc::{
endpoint::abci_query::AbciQuery, Client, HttpClient, Scheme, Url, WebSocketClient,
WebSocketClientDriver, WebSocketClientUrl,
};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use crate::object::ObjectProvider;
use crate::query::QueryProvider;
Expand Down Expand Up @@ -184,26 +182,19 @@ where
Ok(cid)
}

async fn download<W>(
async fn download(
&self,
address: Address,
key: &str,
range: Option<String>,
height: u64,
mut writer: W,
) -> anyhow::Result<()>
where
W: AsyncWrite + Unpin + Send + 'static,
{
) -> anyhow::Result<reqwest::Response> {
let client = self
.objects
.clone()
.ok_or_else(|| anyhow!("object provider is required"))?;

let url = format!(
"{}v1/objectstores/{}/{}?height={}",
client.url, address, key, height
);
let url = format!("{}v1/objects/{}/{}?height={}", client.url, address, key, height);
let response = if let Some(range) = range {
client
.inner
Expand All @@ -221,19 +212,31 @@ where
)));
}

let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
match item {
Ok(chunk) => {
writer.write_all(&chunk).await?;
}
Err(e) => {
return Err(anyhow!(e));
}
}
Ok(response)
}

async fn size(&self, address: Address, key: &str, height: u64) -> anyhow::Result<usize> {
let client = self
.objects
.clone()
.ok_or_else(|| anyhow!("object provider is required"))?;

let url = format!("{}v1/objects/{}/{}?height={}", client.url, address, key, height);
let response = client.inner.head(url).send().await?;
if !response.status().is_success() {
return Err(anyhow!(format!(
"failed to get object size: {}",
response.text().await?
)));
}

Ok(())
let size: usize = response
.headers()
.get("content-length")
.ok_or_else(|| anyhow!("missing content-length header in response for object size"))?
.to_str()?
.parse()?;
Ok(size)
}
}

Expand Down
11 changes: 5 additions & 6 deletions provider/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use async_trait::async_trait;
use fvm_shared::address::Address;
use tokio::io::AsyncWrite;

use crate::response::Cid;

Expand All @@ -20,14 +19,14 @@ pub trait ObjectProvider: Send + Sync {
) -> anyhow::Result<Cid>;

/// Download an object.
async fn download<W>(
async fn download(
&self,
address: Address,
key: &str,
range: Option<String>,
height: u64,
writer: W,
) -> anyhow::Result<()>
where
W: AsyncWrite + Unpin + Send + 'static;
) -> anyhow::Result<reqwest::Response>;

/// Gets the object size.
async fn size(&self, address: Address, key: &str, height: u64) -> anyhow::Result<usize>;
}
38 changes: 23 additions & 15 deletions sdk/src/machine/objectstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,23 +487,31 @@ impl ObjectStore {
if !resolved {
return Err(anyhow!("object is not resolved"));
}
// The `download` method is currently using /objectstore API
// since we have decided to keep the GET APIs intact for a while.
// If we decide to remove these APIs, we can move to Object API
// for downloading the file with CID.
// TODO: If detached objects had size on-chain, we could show download progress.
msg_bar.set_prefix("[2/2]");
msg_bar.set_message(format!("Downloading {}...", cid));
provider
.download(
self.address,
key,
options.range,
options.height.into(),
writer,
)
.await?;
msg_bar.set_message(format!("Downloading {}... ", cid));

let object_size = provider
.size(self.address, key, options.height.into())
.await?;
let pro_bar = bars.add(new_progress_bar(object_size));
let response = provider
.download(self.address, key, options.range, options.height.into())
.await?;
let mut stream = response.bytes_stream();
let mut progress = 0;
while let Some(item) = stream.next().await {
match item {
Ok(chunk) => {
writer.write_all(&chunk).await?;
progress = min(progress + chunk.len(), object_size);
pro_bar.set_position(progress as u64);
}
Err(e) => {
return Err(anyhow!(e));
}
}
}
pro_bar.finish_and_clear();
msg_bar.println(format!(
"{} Downloaded detached object in {} (cid={})",
SPARKLE,
Expand Down