1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion src/query/storages/stage/Cargo.toml
@@ -34,7 +34,6 @@ databend-common-storages-parquet = { workspace = true }
 databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 enum-as-inner = { workspace = true }
-futures = { workspace = true }
 jsonb = { workspace = true }
 lexical-core = { workspace = true }
 log = { workspace = true }
47 changes: 6 additions & 41 deletions src/query/storages/stage/src/read/row_based/processors/reader.rs
@@ -23,24 +23,21 @@ use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_sources::PrefetchAsyncSource;
 use databend_storages_common_stage::SingleFilePartition;
-use futures::AsyncRead;
-use futures::AsyncReadExt;
 use log::debug;
 use opendal::Operator;
 
 use crate::read::row_based::batch::BytesBatch;
 
 struct FileState {
     file: SingleFilePartition,
-    reader: opendal::FuturesAsyncReader,
+    reader: opendal::Reader,
     offset: usize,
 }
 
 pub struct BytesReader {
     table_ctx: Arc<dyn TableContext>,
     op: Operator,
     read_batch_size: usize,
-    io_size: usize,
     file_state: Option<FileState>,
     prefetch_num: usize,
 }
@@ -52,36 +49,26 @@ impl BytesReader {
         read_batch_size: usize,
         prefetch_num: usize,
     ) -> Result<Self> {
-        // TODO: Use 8MiB as default IO size for now, we can extract as a new config.
-        let default_io_size: usize = 8 * 1024 * 1024;
-        // Calculate the IO size, which:
-        //
-        // - is the multiple of read_batch_size.
-        // - is larger or equal to default_io_size.
-        let io_size = default_io_size.div_ceil(read_batch_size) * read_batch_size;
-
         Ok(Self {
             table_ctx,
             op,
             read_batch_size,
-            io_size,
             file_state: None,
             prefetch_num,
         })
     }
 
     pub async fn read_batch(&mut self) -> Result<DataBlock> {
         if let Some(state) = &mut self.file_state {
-            let end = state.file.size.min(self.read_batch_size + state.offset);
-            let mut buffer = vec![0u8; end - state.offset];
-            let n = read_full(&mut state.reader, &mut buffer[..]).await?;
+            let end = state.file.size.min(self.read_batch_size + state.offset) as u64;
+            let buffer = state.reader.read(state.offset as u64..end).await?.to_vec();
+            let n = buffer.len();
             if n == 0 {
                 return Err(ErrorCode::BadBytes(format!(
                     "Unexpected EOF {} expect {} bytes, read only {} bytes.",
                     state.file.path, state.file.size, state.offset
                 )));
             };
-            buffer.truncate(n);
 
             Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, n);
             self.table_ctx
@@ -129,15 +116,8 @@ impl PrefetchAsyncSource for BytesReader {
         };
         let file = SingleFilePartition::from_part(&part)?.clone();
 
-        let reader = self
-            .op
-            .reader_with(&file.path)
-            .chunk(self.io_size)
-            // TODO: Use 4 concurrent for test, let's extract as a new setting.
-            .concurrent(4)
-            .await?
-            .into_futures_async_read(0..file.size as u64)
-            .await?;
+        let reader = self.op.reader(&file.path).await?;
 
         self.file_state = Some(FileState {
             file,
             reader,
@@ -150,18 +130,3 @@
         }
     }
 }
-
-#[async_backtrace::framed]
-pub async fn read_full<R: AsyncRead + Unpin>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
-    let mut buf = &mut buf[0..];
-    let mut n = 0;
-    while !buf.is_empty() {
-        let read = reader.read(buf).await?;
-        if read == 0 {
-            break;
-        }
-        n += read;
-        buf = &mut buf[read..]
-    }
-    Ok(n)
-}
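
For reference, below is a minimal, self-contained sketch of the ranged-read pattern this diff adopts. It is not Databend code: it assumes the opendal crate with its in-memory service (the services-memory feature) plus tokio, and the file name, file size, and batch size are made up for illustration. Each batch is fetched from a stateless opendal::Reader with an explicit byte range, which is what makes the manual read_full loop, and with it the futures dependency, unnecessary.

// Sketch only: ranged reads over an in-memory OpenDAL operator,
// mirroring the pattern the PR switches reader.rs to.
use opendal::services::Memory;
use opendal::{Operator, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let op = Operator::new(Memory::default())?.finish();
    op.write("data.csv", vec![b'x'; 1024]).await?;

    // One stateless reader per file; each batch is an explicit byte
    // range, so the caller tracks the offset itself.
    let reader = op.reader("data.csv").await?;
    let (file_size, batch_size) = (1024u64, 256u64);
    let mut offset = 0u64;
    while offset < file_size {
        let end = file_size.min(offset + batch_size);
        let buffer = reader.read(offset..end).await?.to_vec();
        if buffer.is_empty() {
            break; // unexpected EOF on a short file
        }
        offset += buffer.len() as u64;
        println!("read {} bytes, offset now {}", buffer.len(), offset);
    }
    Ok(())
}

Because Reader::read takes an absolute range rather than advancing an internal cursor, the only per-file state the source needs is the reader plus an offset, which is exactly what the slimmed-down FileState above keeps.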
1 change: 1 addition & 0 deletions tests/sqllogictests/scripts/prepare_tpch_data.sh
@@ -120,6 +120,7 @@ echo "CREATE TABLE IF NOT EXISTS ${db}.lineitem
 #import data
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 python $CURDIR/prepare_duckdb_tpch_data.py 1
+ls -lh /tmp/tpch_1/*
 
 stmt "drop stage if exists s1"
 stmt "create stage s1 url='fs:///tmp/tpch_1/'"
11 changes: 11 additions & 0 deletions tests/sqllogictests/src/util.rs
@@ -234,6 +234,17 @@ fn run_script(name: &str, args: &[&str]) -> Result<()> {
             name,
             String::from_utf8(output.stderr).unwrap()
         )));
+    } else {
+        println!(
+            "script stdout:\n {}",
+            String::from_utf8(output.stdout).unwrap()
+        );
+        if !output.stderr.is_empty() {
+            println!(
+                "script stderr:\n {}",
+                String::from_utf8(output.stderr).unwrap()
+            );
+        }
     }
     Ok(())
 }