Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ impl Accessor for GdriveBackend {
return Ok(RpDelete::default());
};

let resp = self.core.gdrive_delete(&file_id).await?;
let resp = self.core.gdrive_trash(&file_id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
if status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

Expand Down Expand Up @@ -183,9 +183,9 @@ impl Accessor for GdriveBackend {

// copy will overwrite `to`, delete it if exist
if let Some(id) = self.core.path_cache.get(&to_path).await? {
let resp = self.core.gdrive_delete(&id).await?;
let resp = self.core.gdrive_trash(&id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
if status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

Expand Down Expand Up @@ -223,9 +223,9 @@ impl Accessor for GdriveBackend {

// rename will overwrite `to`, delete it if exist
if let Some(id) = self.core.path_cache.get(&target).await? {
let resp = self.core.gdrive_delete(&id).await?;
let resp = self.core.gdrive_trash(&id).await?;
let status = resp.status();
if status != StatusCode::NO_CONTENT && status != StatusCode::NOT_FOUND {
if status != StatusCode::OK {
return Err(parse_error(resp).await?);
}

Expand Down
74 changes: 49 additions & 25 deletions core/src/services/gdrive/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,63 @@ use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use log::debug;
use serde::Deserialize;
use tokio::sync::Mutex;

use super::backend::GdriveBackend;
use crate::raw::HttpClient;
use crate::raw::{normalize_root, PathCacher};
use crate::raw::{ConfigDeserializer, HttpClient};
use crate::services::gdrive::core::GdriveSigner;
use crate::services::gdrive::core::{GdriveCore, GdrivePathQuery};
use crate::Scheme;
use crate::*;

/// [GoogleDrive](https://drive.google.com/) configuration.
#[derive(Default, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct GdriveConfig {
/// The root for gdrive
pub root: Option<String>,
/// Access token for gdrive.
pub access_token: Option<String>,
/// Refresh token for gdrive.
pub refresh_token: Option<String>,
/// Client id for gdrive.
pub client_id: Option<String>,
/// Client secret for gdrive.
pub client_secret: Option<String>,
}

impl Debug for GdriveConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GdriveConfig")
.field("root", &self.root)
.finish_non_exhaustive()
}
}

/// [GoogleDrive](https://drive.google.com/) backend support.
#[derive(Default)]
#[doc = include_str!("docs.md")]
pub struct GdriveBuilder {
root: Option<String>,

access_token: Option<String>,

refresh_token: Option<String>,
client_id: Option<String>,
client_secret: Option<String>,
config: GdriveConfig,

http_client: Option<HttpClient>,
}

impl Debug for GdriveBuilder {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Backend").field("root", &self.root).finish()
f.debug_struct("Backend")
.field("config", &self.config)
.finish()
}
}

impl GdriveBuilder {
/// Set root path of GoogleDrive folder.
pub fn root(&mut self, root: &str) -> &mut Self {
self.root = Some(root.to_string());
self.config.root = Some(root.to_string());
self
}

Expand All @@ -72,7 +94,7 @@ impl GdriveBuilder {
/// - If you want to use the access token for a long time,
/// you can use the refresh token to get a new access token.
pub fn access_token(&mut self, access_token: &str) -> &mut Self {
self.access_token = Some(access_token.to_string());
self.config.access_token = Some(access_token.to_string());
self
}

Expand All @@ -82,23 +104,23 @@ impl GdriveBuilder {
///
/// OpenDAL will use this refresh token to get a new access token when the old one is expired.
pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self {
self.refresh_token = Some(refresh_token.to_string());
self.config.refresh_token = Some(refresh_token.to_string());
self
}

/// Set the client id for GoogleDrive.
///
/// This is required for OAuth 2.0 Flow to refresh the access token.
pub fn client_id(&mut self, client_id: &str) -> &mut Self {
self.client_id = Some(client_id.to_string());
self.config.client_id = Some(client_id.to_string());
self
}

/// Set the client secret for GoogleDrive.
///
/// This is required for OAuth 2.0 Flow with refresh the access token.
pub fn client_secret(&mut self, client_secret: &str) -> &mut Self {
self.client_secret = Some(client_secret.to_string());
self.config.client_secret = Some(client_secret.to_string());
self
}

Expand All @@ -120,19 +142,18 @@ impl Builder for GdriveBuilder {
type Accessor = GdriveBackend;

fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = Self::default();
let config = GdriveConfig::deserialize(ConfigDeserializer::new(map))
.expect("config deserialize must succeed");

map.get("root").map(|v| builder.root(v));
map.get("access_token").map(|v| builder.access_token(v));
map.get("refresh_token").map(|v| builder.refresh_token(v));
map.get("client_id").map(|v| builder.client_id(v));
map.get("client_secret").map(|v| builder.client_secret(v));
Self {
config,

builder
http_client: None,
}
}

fn build(&mut self) -> Result<Self::Accessor> {
let root = normalize_root(&self.root.take().unwrap_or_default());
let root = normalize_root(&self.config.root.take().unwrap_or_default());
debug!("backend use root {}", root);

let client = if let Some(client) = self.http_client.take() {
Expand All @@ -145,21 +166,24 @@ impl Builder for GdriveBuilder {
};

let mut signer = GdriveSigner::new(client.clone());
match (self.access_token.take(), self.refresh_token.take()) {
match (
self.config.access_token.take(),
self.config.refresh_token.take(),
) {
(Some(access_token), None) => {
signer.access_token = access_token;
// We will never expire user specified access token.
signer.expires_in = DateTime::<Utc>::MAX_UTC;
}
(None, Some(refresh_token)) => {
let client_id = self.client_id.take().ok_or_else(|| {
let client_id = self.config.client_id.take().ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"client_id must be set when refresh_token is set",
)
.with_context("service", Scheme::Gdrive)
})?;
let client_secret = self.client_secret.take().ok_or_else(|| {
let client_secret = self.config.client_secret.take().ok_or_else(|| {
Error::new(
ErrorKind::ConfigInvalid,
"client_secret must be set when refresh_token is set",
Expand Down
11 changes: 8 additions & 3 deletions core/src/services/gdrive/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,16 @@ impl GdriveCore {
self.client.send(req).await
}

pub async fn gdrive_delete(&self, file_id: &str) -> Result<Response<IncomingAsyncBody>> {
pub async fn gdrive_trash(&self, file_id: &str) -> Result<Response<IncomingAsyncBody>> {
let url = format!("https://www.googleapis.com/drive/v3/files/{}", file_id);

let mut req = Request::delete(&url)
.body(AsyncBody::Empty)
let body = serde_json::to_vec(&json!({
"trashed": true
}))
.map_err(new_json_serialize_error)?;

let mut req = Request::patch(&url)
.body(AsyncBody::Bytes(Bytes::from(body)))
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
Expand Down
4 changes: 3 additions & 1 deletion core/src/services/gdrive/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
| StatusCode::GATEWAY_TIMEOUT
// Gdrive sometimes return METHOD_NOT_ALLOWED for our requests for abuse detection.
| StatusCode::METHOD_NOT_ALLOWED => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};

Expand Down
5 changes: 5 additions & 0 deletions core/tests/behavior/async_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ pub async fn test_delete_stream(op: Operator) -> Result<()> {
if !op.info().full_capability().create_dir {
return Ok(());
}
// Gdrive think that this test is an abuse of their service and redirect us
// to an infinite loop. Let's ignore this test for gdrive.
if op.info().scheme() == Scheme::Gdrive {
return Ok(());
}

let dir = uuid::Uuid::new_v4().to_string();
op.create_dir(&format!("{dir}/"))
Expand Down
6 changes: 6 additions & 0 deletions core/tests/behavior/async_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ pub async fn test_list_prefix(op: Operator) -> Result<()> {

/// listing a directory, which contains more objects than a single page can take.
pub async fn test_list_rich_dir(op: Operator) -> Result<()> {
// Gdrive think that this test is an abuse of their service and redirect us
// to an infinite loop. Let's ignore this test for gdrive.
if op.info().scheme() == Scheme::Gdrive {
return Ok(());
}

op.create_dir("test_list_rich_dir/").await?;

let mut expected: Vec<String> = (0..=100)
Expand Down