Commit 550000f

fix: copy from CSV OOM when file is large.
1 parent 6cbbbf7 commit 550000f

File tree

3 files changed: +7 -43 lines


src/query/storages/stage/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -34,7 +34,6 @@ databend-common-storages-parquet = { workspace = true }
 databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 enum-as-inner = { workspace = true }
-futures = { workspace = true }
 jsonb = { workspace = true }
 lexical-core = { workspace = true }
 log = { workspace = true }

src/query/storages/stage/src/read/row_based/processors/reader.rs

Lines changed: 6 additions & 41 deletions
@@ -23,24 +23,21 @@ use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_sources::PrefetchAsyncSource;
 use databend_storages_common_stage::SingleFilePartition;
-use futures::AsyncRead;
-use futures::AsyncReadExt;
 use log::debug;
 use opendal::Operator;
 
 use crate::read::row_based::batch::BytesBatch;
 
 struct FileState {
     file: SingleFilePartition,
-    reader: opendal::FuturesAsyncReader,
+    reader: opendal::Reader,
     offset: usize,
 }
 
 pub struct BytesReader {
     table_ctx: Arc<dyn TableContext>,
     op: Operator,
     read_batch_size: usize,
-    io_size: usize,
     file_state: Option<FileState>,
     prefetch_num: usize,
 }
@@ -52,36 +49,26 @@ impl BytesReader {
         read_batch_size: usize,
         prefetch_num: usize,
     ) -> Result<Self> {
-        // TODO: Use 8MiB as default IO size for now, we can extract as a new config.
-        let default_io_size: usize = 8 * 1024 * 1024;
-        // Calculate the IO size, which:
-        //
-        // - is the multiple of read_batch_size.
-        // - is larger or equal to default_io_size.
-        let io_size = default_io_size.div_ceil(read_batch_size) * read_batch_size;
-
         Ok(Self {
             table_ctx,
             op,
             read_batch_size,
-            io_size,
             file_state: None,
             prefetch_num,
         })
     }
 
     pub async fn read_batch(&mut self) -> Result<DataBlock> {
         if let Some(state) = &mut self.file_state {
-            let end = state.file.size.min(self.read_batch_size + state.offset);
-            let mut buffer = vec![0u8; end - state.offset];
-            let n = read_full(&mut state.reader, &mut buffer[..]).await?;
+            let end = state.file.size.min(self.read_batch_size + state.offset) as u64;
+            let buffer = state.reader.read(state.offset as u64..end).await?.to_vec();
+            let n = buffer.len();
             if n == 0 {
                 return Err(ErrorCode::BadBytes(format!(
                     "Unexpected EOF {} expect {} bytes, read only {} bytes.",
                     state.file.path, state.file.size, state.offset
                 )));
             };
-            buffer.truncate(n);
 
             Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, n);
             self.table_ctx
@@ -129,15 +116,8 @@ impl PrefetchAsyncSource for BytesReader {
         };
         let file = SingleFilePartition::from_part(&part)?.clone();
 
-        let reader = self
-            .op
-            .reader_with(&file.path)
-            .chunk(self.io_size)
-            // TODO: Use 4 concurrent for test, let's extract as a new setting.
-            .concurrent(4)
-            .await?
-            .into_futures_async_read(0..file.size as u64)
-            .await?;
+        let reader = self.op.reader(&file.path).await?;
+
         self.file_state = Some(FileState {
             file,
             reader,
@@ -150,18 +130,3 @@ impl PrefetchAsyncSource for BytesReader {
         }
     }
 }
-
-#[async_backtrace::framed]
-pub async fn read_full<R: AsyncRead + Unpin>(reader: &mut R, buf: &mut [u8]) -> Result<usize> {
-    let mut buf = &mut buf[0..];
-    let mut n = 0;
-    while !buf.is_empty() {
-        let read = reader.read(buf).await?;
-        if read == 0 {
-            break;
-        }
-        n += read;
-        buf = &mut buf[read..]
-    }
-    Ok(n)
-}
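
The change above swaps the whole-range FuturesAsyncReader (chunked, concurrency 4) and the read_full helper for per-batch ranged reads on an opendal::Reader, so read_batch materializes at most one read_batch_size buffer at a time. Below is a minimal standalone sketch of that ranged-read pattern; it is not databend code and assumes the opendal crate with its Memory service plus tokio, chosen purely for illustration.

use opendal::{services, Operator, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Hypothetical in-memory backend standing in for the stage's storage.
    let op = Operator::new(services::Memory::default())?.finish();
    op.write("data.csv", b"a,b,c\n1,2,3\n4,5,6\n".to_vec()).await?;

    let file_size = op.stat("data.csv").await?.content_length();
    let reader = op.reader("data.csv").await?;

    // Read over absolute byte ranges in fixed-size batches; only one batch
    // is held in memory at a time, mirroring the new BytesReader::read_batch.
    let batch_size: u64 = 8;
    let mut offset: u64 = 0;
    while offset < file_size {
        let end = file_size.min(offset + batch_size);
        let buf = reader.read(offset..end).await?.to_vec();
        if buf.is_empty() {
            break; // unexpected EOF; the real reader returns ErrorCode::BadBytes here
        }
        println!("read {} bytes at offset {}", buf.len(), offset);
        offset += buf.len() as u64;
    }
    Ok(())
}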

src/query/storages/stage/src/read/row_based/read_pipeline.rs

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ impl RowBasedReadPipelineBuilder<'_> {
         let batch_size = settings.get_input_read_buffer_size()? as usize;
         pipeline.add_source(
             |output| {
-                let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size, 1)?;
+                let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size, 2)?;
                 PrefetchAsyncSourcer::create(ctx.clone(), output, reader)
             },
             num_threads,
