Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ async fn handle_count_query(
stream: table_name.to_string(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
num_bins: Some(1),
conditions: None,
};
let count_records = counts_req.get_bin_density().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/prism/logstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl PrismDatasetRequest {
stream: stream.to_owned(),
start_time: "1h".to_owned(),
end_time: "now".to_owned(),
num_bins: 10,
num_bins: Some(10),
conditions: None,
};

Expand Down
67 changes: 46 additions & 21 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,10 @@ impl Query {
/// Record of counts for a given time bin.
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct CountsRecord {
#[serde(alias = "_bin_start_time_")]
/// Start time of the bin
pub start_time: String,
#[serde(alias = "_bin_end_time_")]
/// End time of the bin
pub end_time: String,
/// Number of logs in the bin
Expand Down Expand Up @@ -330,8 +332,8 @@ pub struct CountsRequest {
pub start_time: String,
/// Excluded end time for counts query
pub end_time: String,
/// Number of bins to divide the time range into
pub num_bins: u64,
/// optional number of bins to divide the time range into
pub num_bins: Option<u64>,
/// Conditions
pub conditions: Option<CountConditions>,
}
Expand Down Expand Up @@ -400,9 +402,28 @@ impl CountsRequest {
.signed_duration_since(time_range.start)
.num_minutes() as u64;

let num_bins = if let Some(num_bins) = self.num_bins {
num_bins
} else {
// create number of bins based on total minutes
if total_minutes <= 60 * 5 {
// till 5 hours, 1 bin = 1 min
total_minutes
} else if total_minutes <= 60 * 24 {
// till 1 day, 1 bin = 5 min
total_minutes.div_ceil(5)
} else if total_minutes <= 60 * 24 * 10 {
// till 10 days, 1 bin = 1 hour
total_minutes.div_ceil(60)
} else {
// > 10 days, 1 bin = 1 day
total_minutes.div_ceil(1440)
}
};

// divide minutes by num bins to get minutes per bin
let quotient = total_minutes / self.num_bins;
let remainder = total_minutes % self.num_bins;
let quotient = total_minutes / num_bins;
let remainder = total_minutes % num_bins;
let have_remainder = remainder > 0;

// now create multiple bounds [startTime, endTime)
Expand All @@ -412,9 +433,9 @@ impl CountsRequest {
let mut start = time_range.start;

let loop_end = if have_remainder {
self.num_bins
num_bins
} else {
self.num_bins - 1
num_bins - 1
};

// Create bins for all but the last date
Expand Down Expand Up @@ -449,36 +470,40 @@ impl CountsRequest {

let dur = time_range.end.signed_duration_since(time_range.start);

let date_bin = if dur.num_minutes() <= 60 * 10 {
// date_bin 1 minute
let table_name = &self.stream;
let start_time_col_name = "_bin_start_time_";
let end_time_col_name = "_bin_end_time_";
let date_bin = if dur.num_minutes() <= 60 * 5 {
// less than 5 hour = 1 min bin
format!(
"CAST(DATE_BIN('1m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1m' as {end_time_col_name}"
)
} else if dur.num_minutes() <= 60 * 24 {
// 1 day = 5 min bin
format!(
"CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
self.stream
"CAST(DATE_BIN('5m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('5m', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '5m' as {end_time_col_name}"
)
} else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
// date_bin 1 hour
} else if dur.num_minutes() < 60 * 24 * 10 {
// 10 days = 1 hour bin
format!(
"CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
self.stream
"CAST(DATE_BIN('1h', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1h', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1h' as {end_time_col_name}"
)
} else {
// date_bin 1 day
// 1 day
format!(
"CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
self.stream
"CAST(DATE_BIN('1d', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as {start_time_col_name}, DATE_BIN('1d', \"{table_name}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1d' as {end_time_col_name}"
)
};

let query = if let Some(conditions) = &count_conditions.conditions {
let f = get_filter_string(conditions).map_err(QueryError::CustomError)?;
format!(
"SELECT {date_bin}, COUNT(*) as count FROM \"{}\" WHERE {} GROUP BY end_time,start_time ORDER BY end_time",
self.stream, f
"SELECT {date_bin}, COUNT(*) as count FROM \"{table_name}\" WHERE {} GROUP BY {end_time_col_name},{start_time_col_name} ORDER BY {end_time_col_name}",
f
)
} else {
format!(
"SELECT {date_bin}, COUNT(*) as count FROM \"{}\" GROUP BY end_time,start_time ORDER BY end_time",
self.stream
"SELECT {date_bin}, COUNT(*) as count FROM \"{table_name}\" GROUP BY {end_time_col_name},{start_time_col_name} ORDER BY {end_time_col_name}",
)
};
Ok(query)
Expand Down
Loading