1 change: 1 addition & 0 deletions src/connectors/kafka/processor.rs
@@ -68,6 +68,7 @@ impl ParseableSinkProcessor {
 
         let (rb, is_first) = batch_json_event.into_recordbatch(
             &schema,
+            Utc::now(),
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
5 changes: 3 additions & 2 deletions src/event/format/mod.rs
@@ -26,7 +26,7 @@ use std::{
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::DateTime;
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
@@ -108,6 +108,7 @@ pub trait EventFormat: Sized {
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
+        p_timestamp: DateTime<Utc>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
@@ -145,7 +146,7 @@ pub trait EventFormat: Sized {
             rb.schema(),
             &rb,
             &[0],
-            &[Arc::new(get_timestamp_array(rb.num_rows()))],
+            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
         );
 
         Ok((rb, is_first))
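In effect, callers of EventFormat::into_recordbatch now sample the clock once at the ingestion boundary and pass that instant down, rather than the helper reading Utc::now() internally. A minimal standalone sketch of the pattern, with simplified types (the real trait signature is in the hunk above):

    use chrono::{DateTime, Utc};

    // Stand-in for the real conversion: every row in a batch stamps the
    // same caller-supplied instant, instead of sampling the clock per call.
    fn stamp_rows(p_timestamp: DateTime<Utc>, rows: usize) -> Vec<i64> {
        vec![p_timestamp.timestamp_millis(); rows]
    }

    fn main() {
        let now = Utc::now(); // captured once at the ingestion boundary
        let col = stamp_rows(now, 3);
        assert!(col.iter().all(|&ms| ms == now.timestamp_millis()));
    }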
70 changes: 54 additions & 16 deletions src/handlers/http/ingest.rs
@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
 
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
-    let parsed_timestamp = Utc::now().naive_utc();
+    let now = Utc::now();
     let (rb, is_first) = {
         let body_val: Value = serde_json::from_slice(&body)?;
         let hash_map = PARSEABLE.streams.read().unwrap();
@@ -93,15 +93,15 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             .clone();
         let event = format::json::Event { data: body_val };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
     };
     event::Event {
         rb,
         stream_name,
         origin_format: "json",
         origin_size: size as u64,
         is_first_event: is_first,
-        parsed_timestamp,
+        parsed_timestamp: now.naive_utc(),
         time_partition: None,
         custom_partition_values: HashMap::new(),
         stream_type: StreamType::Internal,
@@ -351,6 +351,7 @@ mod tests {
     use arrow::datatypes::Int64Type;
     use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
+    use chrono::Utc;
     use serde_json::json;
     use std::{collections::HashMap, sync::Arc};
 
@@ -392,8 +393,15 @@
             "b": "hello",
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -419,8 +427,15 @@
             "c": null
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -450,7 +465,8 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +496,9 @@
             .into_iter(),
         );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }
 
     #[test]
@@ -496,7 +514,8 @@
             .into_iter(),
        );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -535,8 +554,15 @@
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -582,8 +608,15 @@
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(
+            json,
+            HashMap::default(),
+            Utc::now(),
+            false,
+            None,
+            SchemaVersion::V0,
+        )
+        .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -630,7 +663,8 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -677,7 +711,9 @@
             .into_iter(),
        );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(
+            into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
+        );
     }
 
     #[test]
@@ -718,6 +754,7 @@
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V0,
@@ -806,6 +843,7 @@
         let (rb, _) = into_event_batch(
             flattened_json,
             HashMap::default(),
+            Utc::now(),
             false,
             None,
             SchemaVersion::V1,
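A side benefit of the explicit parameter is testability: a test can pin the timestamp instead of racing the wall clock. A hypothetical sketch (the pinned value and assertion are illustrative, not part of this PR):

    use chrono::{TimeZone, Utc};

    fn main() {
        // A fixed instant that could be passed to into_event_batch in place
        // of Utc::now(), making the p_timestamp column reproducible.
        let fixed = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        assert_eq!(fixed.timestamp_millis(), 1_704_067_200_000);
    }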
6 changes: 5 additions & 1 deletion src/handlers/http/modal/utils/ingest_utils.rs
@@ -96,6 +96,7 @@ async fn push_logs(
     let static_schema_flag = stream.get_static_schema_flag();
     let custom_partition = stream.get_custom_partition();
     let schema_version = stream.get_schema_version();
+    let p_timestamp = Utc::now();
 
     let data = if time_partition.is_some() || custom_partition.is_some() {
         convert_array_to_object(
@@ -121,7 +122,7 @@
         let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
         let parsed_timestamp = match time_partition.as_ref() {
             Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
+            _ => p_timestamp.naive_utc(),
         };
         let custom_partition_values = match custom_partition.as_ref() {
             Some(custom_partition) => {
@@ -144,6 +145,7 @@
         let (rb, is_first_event) = into_event_batch(
             value,
             schema,
+            p_timestamp,
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
@@ -168,12 +170,14 @@
 pub fn into_event_batch(
     data: Value,
     schema: HashMap<String, Arc<Field>>,
+    p_timestamp: DateTime<Utc>,
     static_schema_flag: bool,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let (rb, is_first) = json::Event { data }.into_recordbatch(
         &schema,
+        p_timestamp,
         static_schema_flag,
         time_partition,
         schema_version,
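With p_timestamp captured once at the top of push_logs, the event's parsed_timestamp metadata and the arrow timestamp column derive from the same instant and can no longer drift apart within a batch. A small self-contained sketch of that invariant (chrono only; names are illustrative):

    use chrono::{TimeZone, Utc};

    fn main() {
        let p_timestamp = Utc::now(); // captured once per push_logs call

        // NaiveDateTime stored on the event vs. millis written to the column:
        let parsed_timestamp = p_timestamp.naive_utc();
        let column_millis = p_timestamp.timestamp_millis();

        // Both views agree because they come from the same capture.
        assert_eq!(
            Utc.from_utc_datetime(&parsed_timestamp).timestamp_millis(),
            column_millis
        );
    }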
14 changes: 7 additions & 7 deletions src/utils/arrow/mod.rs
@@ -45,7 +45,7 @@ use std::sync::Arc;
 use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array};
 use arrow_schema::Schema;
 use arrow_select::take::take;
-use chrono::Utc;
+use chrono::{DateTime, Utc};
 use itertools::Itertools;
 
 pub mod batch_adapter;
@@ -133,8 +133,8 @@ pub fn get_field<'a>(
 /// # Returns
 ///
 /// A column in arrow, containing the current timestamp in millis.
-pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
-    TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
+pub fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> TimestampMillisecondArray {
+    TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
 }
 
 pub fn reverse(rb: &RecordBatch) -> RecordBatch {
@@ -196,19 +196,19 @@ mod tests {
     #[test]
     fn test_timestamp_array_has_correct_size_and_value() {
         let size = 5;
-        let now = Utc::now().timestamp_millis();
+        let now = Utc::now();
 
-        let array = get_timestamp_array(size);
+        let array = get_timestamp_array(now, size);
 
         assert_eq!(array.len(), size);
         for i in 0..size {
-            assert!(array.value(i) >= now);
+            assert!(array.value(i) >= now.timestamp_millis());
         }
     }
 
     #[test]
     fn test_timestamp_array_with_zero_size() {
-        let array = get_timestamp_array(0);
+        let array = get_timestamp_array(Utc::now(), 0);
 
         assert_eq!(array.len(), 0);
         assert!(array.is_empty());
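Taken together, get_timestamp_array is now a pure function of its inputs. A runnable demo of the updated helper (assuming the arrow-array and chrono crates; the function body is copied from the hunk above):

    use arrow_array::{Array, TimestampMillisecondArray};
    use chrono::{DateTime, Utc};

    // Same body as in the diff: the caller supplies the timestamp.
    fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> TimestampMillisecondArray {
        TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
    }

    fn main() {
        let now = Utc::now();
        let array = get_timestamp_array(now, 4);
        assert_eq!(array.len(), 4);
        // Every slot holds the same caller-supplied instant.
        assert!((0..4).all(|i| array.value(i) == now.timestamp_millis()));
    }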