Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,8 @@ openssh-sftp-client = { version = "0.14.0", optional = true, features = [
"tracing",
] }
opentelemetry = { version = "0.21.0", optional = true }
parking_lot = "0.12"
percent-encoding = "2"
persy = { version = "1.4.6", optional = true }
pin-project = "1"
prometheus = { version = "0.13", features = ["process"], optional = true }
prometheus-client = { version = "0.22.0", optional = true }
prost = { version = "0.11", optional = true }
Expand Down
9 changes: 2 additions & 7 deletions core/src/raw/oio/list/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use pin_project::pin_project;

use crate::raw::oio::Entry;
use crate::*;

Expand Down Expand Up @@ -105,8 +103,6 @@ pub trait ListExt: List {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, L: List + Unpin + ?Sized> {
lister: &'a mut L,
}
Expand All @@ -117,9 +113,8 @@ where
{
type Output = Result<Option<Entry>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> {
let this = self.project();
Pin::new(this.lister).poll_next(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> {
self.lister.poll_next(cx)
}
}

Expand Down
24 changes: 7 additions & 17 deletions core/src/raw/oio/read/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use pin_project::pin_project;
use tokio::io::ReadBuf;

use crate::*;
Expand Down Expand Up @@ -207,8 +206,6 @@ pub trait ReadExt: Read {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ReadFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut [u8],
Expand All @@ -221,13 +218,11 @@ where
type Output = Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
Pin::new(this.reader).poll_read(cx, this.buf)
let this = self.get_mut();
this.reader.poll_read(cx, this.buf)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct SeekFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
pos: io::SeekFrom,
Expand All @@ -240,13 +235,11 @@ where
type Output = Result<u64>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
let this = self.project();
Pin::new(this.reader).poll_seek(cx, *this.pos)
let this = self.get_mut();
this.reader.poll_seek(cx, this.pos)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
}
Expand All @@ -257,14 +250,11 @@ where
{
type Output = Option<Result<Bytes>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
let this = self.project();
Pin::new(this.reader).poll_next(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.reader.poll_next(cx)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
Expand All @@ -277,7 +267,7 @@ where
type Output = Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
let this = self.get_mut();
let start_len = this.buf.len();

loop {
Expand Down
8 changes: 2 additions & 6 deletions core/src/raw/oio/stream/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::task::Poll;

use bytes::Bytes;
use bytes::BytesMut;
use pin_project::pin_project;

use crate::*;

Expand Down Expand Up @@ -105,8 +104,6 @@ pub trait StreamExt: Stream {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}
Expand All @@ -117,9 +114,8 @@ where
{
type Output = Option<Result<Bytes>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
let this = self.project();
Pin::new(this.inner).poll_next(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.poll_next(cx)
}
}

Expand Down
22 changes: 6 additions & 16 deletions core/src/raw/oio/write/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use pin_project::pin_project;

use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -153,8 +151,6 @@ pub trait WriteExt: Write {
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct WriteFuture<'a, W: Write + Unpin + ?Sized> {
writer: &'a mut W,
buf: &'a dyn oio::WriteBuf,
Expand All @@ -167,13 +163,11 @@ where
type Output = Result<usize>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
let this = self.project();
Pin::new(this.writer).poll_write(cx, *this.buf)
let this = self.get_mut();
this.writer.poll_write(cx, this.buf)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct AbortFuture<'a, W: Write + Unpin + ?Sized> {
writer: &'a mut W,
}
Expand All @@ -184,14 +178,11 @@ where
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.writer).poll_abort(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.writer.poll_abort(cx)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct CloseFuture<'a, W: Write + Unpin + ?Sized> {
writer: &'a mut W,
}
Expand All @@ -202,9 +193,8 @@ where
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.writer).poll_close(cx)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.writer.poll_close(cx)
}
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/services/memory/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::Mutex;

use async_trait::async_trait;
use parking_lot::Mutex;

use crate::raw::adapters::typed_kv;
use crate::*;
Expand Down Expand Up @@ -96,7 +96,7 @@ impl typed_kv::Adapter for Adapter {
}

fn blocking_get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
match self.inner.lock().get(path) {
match self.inner.lock().unwrap().get(path) {
None => Ok(None),
Some(bs) => Ok(Some(bs.to_owned())),
}
Expand All @@ -107,7 +107,7 @@ impl typed_kv::Adapter for Adapter {
}

fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.inner.lock().insert(path.to_string(), value);
self.inner.lock().unwrap().insert(path.to_string(), value);

Ok(())
}
Expand All @@ -117,7 +117,7 @@ impl typed_kv::Adapter for Adapter {
}

fn blocking_delete(&self, path: &str) -> Result<()> {
self.inner.lock().remove(path);
self.inner.lock().unwrap().remove(path);

Ok(())
}
Expand All @@ -127,7 +127,7 @@ impl typed_kv::Adapter for Adapter {
}

fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
let inner = self.inner.lock();
let inner = self.inner.lock().unwrap();
let keys: Vec<_> = if path.is_empty() {
inner.keys().cloned().collect()
} else {
Expand Down