Merged
3 changes: 3 additions & 0 deletions src/parseable/mod.rs
@@ -61,6 +61,9 @@ mod streams;
/// File extension for arrow files in staging
const ARROW_FILE_EXTENSION: &str = "arrows";

/// File extension for incomplete arrow files
const PART_FILE_EXTENSION: &str = "part";

/// Name of a Stream
/// NOTE: this used to be a struct, flattened out for simplicity
pub type LogStream = String;
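For orientation, a small standalone sketch of how these two extensions are meant to interact through Path::set_extension, which the DiskWriter added later in this diff relies on. The path below is made up for illustration; real staging file names carry more components.

use std::path::PathBuf;

const ARROW_FILE_EXTENSION: &str = "arrows";
const PART_FILE_EXTENSION: &str = "part";

fn main() {
    // Hypothetical staging path; only the extension handling matters here.
    let mut path = PathBuf::from("/staging/stream/date=2024-01-01.data.arrows");

    // While the file is being written, ".arrows" is swapped for ".part".
    path.set_extension(PART_FILE_EXTENSION);
    assert!(path.to_string_lossy().ends_with(".data.part"));

    // Once the stream is finished, the file is renamed back to ".arrows".
    path.set_extension(ARROW_FILE_EXTENSION);
    assert!(path.to_string_lossy().ends_with(".data.arrows"));
}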
15 changes: 7 additions & 8 deletions src/parseable/staging/reader.rs
@@ -322,7 +322,6 @@ fn find_limit_and_type(
#[cfg(test)]
mod tests {
use std::{
fs::File,
io::{self, Cursor, Read},
path::Path,
sync::Arc,
@@ -338,7 +337,10 @@ mod tests {
use arrow_schema::{DataType, Field, Schema};
use temp_dir::TempDir;

use crate::parseable::staging::reader::{MergedReverseRecordReader, OffsetReader};
use crate::parseable::staging::{
reader::{MergedReverseRecordReader, OffsetReader},
writer::DiskWriter,
};

use super::get_reverse_reader;

@@ -482,15 +484,12 @@ mod tests {
schema: &Arc<Schema>,
batches: &[RecordBatch],
) -> io::Result<()> {
let file = File::create(path)?;
let mut writer =
StreamWriter::try_new(file, schema).expect("Failed to create StreamWriter");
let mut writer = DiskWriter::try_new(path, schema).expect("Failed to create DiskWriter");

for batch in batches {
writer.write(batch).expect("Failed to write batch");
}

writer.finish().expect("Failed to finalize writer");
Ok(())
}

@@ -524,7 +523,7 @@ mod tests {
#[test]
fn test_merged_reverse_record_reader() -> io::Result<()> {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join("test.arrow");
let file_path = dir.path().join("test.data.arrows");

// Create a schema
let schema = Arc::new(Schema::new(vec![
@@ -627,7 +626,7 @@ mod tests {
#[test]
fn test_get_reverse_reader_single_message() -> io::Result<()> {
let dir = TempDir::new().unwrap();
let file_path = dir.path().join("test_single.arrow");
let file_path = dir.path().join("test_single.data.arrows");

// Create a schema
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
56 changes: 53 additions & 3 deletions src/parseable/staging/writer.rs
@@ -19,7 +19,9 @@

use std::{
collections::{HashMap, HashSet},
fs::File,
fs::{File, OpenOptions},
io::BufWriter,
path::PathBuf,
sync::Arc,
};

@@ -28,13 +30,61 @@ use arrow_ipc::writer::StreamWriter;
use arrow_schema::Schema;
use arrow_select::concat::concat_batches;
use itertools::Itertools;
use tracing::error;

use crate::utils::arrow::adapt_batch;
use crate::{
parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
utils::arrow::adapt_batch,
};

use super::StagingError;

#[derive(Default)]
pub struct Writer {
pub mem: MemWriter<16384>,
pub disk: HashMap<String, StreamWriter<File>>,
pub disk: HashMap<String, DiskWriter>,
}

pub struct DiskWriter {
inner: StreamWriter<BufWriter<File>>,
path: PathBuf,
}

impl DiskWriter {
/// Try to create a `.part` file to stream arrow data into
pub fn try_new(path: impl Into<PathBuf>, schema: &Schema) -> Result<Self, StagingError> {
let mut path = path.into();
path.set_extension(PART_FILE_EXTENSION);
let file = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&path)?;
let inner = StreamWriter::try_new_buffered(file, schema)?;

Ok(Self { inner, path })
}

/// Write a single record batch into the file
pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
self.inner.write(rb).map_err(StagingError::Arrow)
}
}

impl Drop for DiskWriter {
/// Write the continuation bytes, mark the file as done, and rename it to `.data.arrows`
fn drop(&mut self) {
if let Err(err) = self.inner.finish() {
error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
return;
}

let mut arrow_path = self.path.to_owned();
arrow_path.set_extension(ARROW_FILE_EXTENSION);
if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
error!("Couldn't rename file {:?}, error = {err}", self.path);
}
}
}

/// Structure to keep recordbatches in memory.
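To make the lifecycle of the new DiskWriter concrete, here is a hedged usage sketch; the function name, temp dir, file name, and schema are invented for illustration and are not part of the PR. Batches are streamed into a ".part" file, and finishing the stream plus the rename to ".data.arrows" happen when the writer is dropped.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use temp_dir::TempDir;

use crate::parseable::staging::{writer::DiskWriter, StagingError};

fn disk_writer_roundtrip() -> Result<(), StagingError> {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("example.data.arrows");

    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![ids]).unwrap();

    // try_new swaps the extension, so data is streamed into "example.data.part".
    let mut writer = DiskWriter::try_new(path, &schema)?;
    writer.write(&batch)?;

    // Dropping the writer finishes the arrow stream and renames the file
    // to "example.data.arrows".
    drop(writer);
    Ok(())
}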
27 changes: 7 additions & 20 deletions src/parseable/streams.rs
@@ -28,7 +28,6 @@ use std::{
};

use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike};
use derive_more::{Deref, DerefMut};
@@ -59,7 +58,7 @@ use crate::{
use super::{
staging::{
reader::{MergedRecordReader, MergedReverseRecordReader},
writer::Writer,
writer::{DiskWriter, Writer},
StagingError,
},
LogStream, ARROW_FILE_EXTENSION,
@@ -133,12 +132,7 @@ impl Stream {
);
std::fs::create_dir_all(&self.data_path)?;

let file = OpenOptions::new()
.create(true)
.append(true)
.open(&file_path)?;

let mut writer = StreamWriter::try_new(file, &record.schema())
let mut writer = DiskWriter::try_new(file_path, &record.schema())
.expect("File and RecordBatch both are checked");

writer.write(record)?;
@@ -364,18 +358,11 @@ impl Stream {
}

pub fn flush(&self) {
let mut disk_writers = {
let mut writer = self.writer.lock().unwrap();
// Flush memory
writer.mem.clear();
// Take schema -> disk writer mapping
std::mem::take(&mut writer.disk)
};

// Flush disk
for writer in disk_writers.values_mut() {
_ = writer.finish();
}
let mut writer = self.writer.lock().unwrap();
// Flush memory
writer.mem.clear();
// Drop the schema -> disk writer mapping; dropping each DiskWriter finishes and renames its file on disk
writer.disk.drain();
}

fn parquet_writer_props(
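A minimal, self-contained sketch of the mechanism the reworked flush() leans on; the Noisy type is invented purely to show when Drop runs. Draining the HashMap drops every value it held, and for DiskWriter values that drop is where each arrow file is finished and renamed.

use std::collections::HashMap;

struct Noisy(&'static str);

impl Drop for Noisy {
    fn drop(&mut self) {
        // Stand-in for DiskWriter::drop, which finishes the stream and
        // renames the ".part" file to ".data.arrows".
        println!("finalizing {}", self.0);
    }
}

fn main() {
    let mut disk: HashMap<String, Noisy> = HashMap::new();
    disk.insert("schema-a".to_owned(), Noisy("a.data.part"));
    disk.insert("schema-b".to_owned(), Noisy("b.data.part"));

    // Mirrors `writer.disk.drain()` in Stream::flush: every entry is removed
    // and its value dropped, so each writer finalizes its file here.
    disk.drain();
    assert!(disk.is_empty());
}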