Commit f88adf3

camelcase stream info, fix stats for historical
1 parent: 87f499e

File tree: 7 files changed, +36 -61 lines

src/handlers/http/cluster/mod.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -622,6 +622,10 @@ pub async fn send_retention_cleanup_request(
             "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
             ingestor.domain_name, body
         );
+        return Err(ObjectStorageError::Custom(format!(
+            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
+            ingestor.domain_name, body
+        )));
     }

     Ok(())
```
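
Before this hunk, a failed cleanup response from an ingestor was only logged and the function still fell through to `Ok(())`; the added `return Err(...)` propagates the failure. A minimal sketch of the effect at a call site (the argument list is hypothetical; only the function name comes from the hunk header):

```rust
// Hypothetical call site: with the early return added above, a non-success
// reply from an ingestor now aborts retention cleanup with an error instead
// of being logged and swallowed.
match send_retention_cleanup_request(&url, &ingestor, &dates).await {
    Ok(()) => info!("retention cleanup succeeded on {}", ingestor.domain_name),
    Err(err) => return Err(err), // surfaced to the API caller
}
```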

src/handlers/http/logstream.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -333,7 +333,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         return Err(StreamNotFound(stream_name.clone()).into());
     }

-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();

     // Get first and latest event timestamps from storage
     let (stream_first_event_at, stream_latest_event_at) = match storage
@@ -412,7 +412,7 @@ pub async fn put_stream_hot_tier(
     hot_tier_manager
         .put_hot_tier(&stream_name, &mut hottier)
         .await?;
-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();
     let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
     stream_metadata.hot_tier_enabled = true;
     storage
```
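
The `PARSEABLE.storage` → `PARSEABLE.storage()` change here (repeated in the three files below) swaps direct field access for an accessor method. A minimal sketch of the pattern, with all types as stand-ins rather than the real `Parseable` definitions:

```rust
use std::sync::Arc;

// Stand-in trait; the real bound is the ObjectStorageProvider trait in
// src/storage/object_storage.rs.
trait ObjectStorageProvider: Send + Sync {}

struct Parseable {
    storage: Arc<dyn ObjectStorageProvider>,
}

impl Parseable {
    // Call sites move from `PARSEABLE.storage` to `PARSEABLE.storage()`,
    // keeping the field private and the storage wiring in one place.
    fn storage(&self) -> Arc<dyn ObjectStorageProvider> {
        Arc::clone(&self.storage)
    }
}
```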

src/handlers/http/modal/ingest/ingestor_logstream.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -38,7 +38,7 @@ pub async fn retention_cleanup(
     Json(date_list): Json<Vec<String>>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();
     // if the stream not found in memory map,
     //check if it exists in the storage
     //create stream and schema from storage
@@ -62,7 +62,7 @@ pub async fn retention_cleanup(
         });
     }

-    Ok(("Cleanup complete", StatusCode::OK))
+    Ok(actix_web::HttpResponse::NoContent().finish())
 }

 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
```
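
The ingestor cleanup endpoint now answers `204 No Content` with an empty body instead of `200 OK` with the text "Cleanup complete". A hedged sketch of a client-side check (the URL shape and the use of `reqwest` are assumptions):

```rust
// Hypothetical caller check: expect 204 rather than 200 with a text body.
let resp = reqwest::Client::new()
    .post(format!("{base_url}/logstream/{stream}/retention/cleanup"))
    .json(&date_list)
    .send()
    .await?;
assert_eq!(resp.status(), reqwest::StatusCode::NO_CONTENT);
```

Any caller that only tests for a 2xx status treats 204 like 200, so success handling elsewhere is unaffected.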

src/migration/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -366,7 +366,7 @@ async fn setup_logstream_metadata(
         ..
     } = serde_json::from_value(stream_metadata_value).unwrap_or_default();

-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();

     update_data_type_time_partition(arrow_schema, time_partition.as_ref()).await?;
     storage.put_schema(stream, arrow_schema).await?;
```

src/prism/logstream/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -152,7 +152,7 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamE
         return Err(StreamNotFound(stream_name.to_owned()).into());
     }

-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();

     // Get first and latest event timestamps from storage
     let (stream_first_event_at, stream_latest_event_at) = match storage
```

src/storage/mod.rs

Lines changed: 1 addition & 3 deletions
```diff
@@ -130,13 +130,11 @@ pub struct ObjectStoreFormat {
 }

 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
 pub struct StreamInfo {
-    #[serde(rename = "created-at")]
     pub created_at: String,
-    #[serde(rename = "first-event-at")]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub first_event_at: Option<String>,
-    #[serde(rename = "latest-event-at")]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub latest_event_at: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
```
src/storage/object_storage.rs

Lines changed: 25 additions & 52 deletions
```diff
@@ -38,10 +38,8 @@ use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
-use tokio::task;
 use tokio::task::JoinSet;
 use tracing::info;
-use tracing::trace;
 use tracing::{error, warn};
 use ulid::Ulid;

@@ -96,7 +94,6 @@ impl UploadContext {

 /// Result of a single file upload operation
 pub(crate) struct UploadResult {
-    stats_calculated: bool,
     file_path: std::path::PathBuf,
     manifest_file: Option<catalog::manifest::File>,
 }
@@ -135,10 +132,9 @@ async fn upload_single_parquet_file(
     let manifest = catalog::create_from_parquet_file(absolute_path, &path)?;

     // Calculate field stats if enabled
-    let stats_calculated = calculate_stats_if_enabled(&stream_name, &path, &schema).await;
+    calculate_stats_if_enabled(&stream_name, &path, &schema).await;

     Ok(UploadResult {
-        stats_calculated,
         file_path: path,
         manifest_file: Some(manifest),
     })
@@ -172,19 +168,19 @@ async fn calculate_stats_if_enabled(
     stream_name: &str,
     path: &std::path::Path,
     schema: &Arc<Schema>,
-) -> bool {
+) {
     if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats {
         let max_field_statistics = PARSEABLE.options.max_field_statistics;
-        match calculate_field_stats(stream_name, path, schema, max_field_statistics).await {
-            Ok(stats) if stats => return true,
-            Err(err) => trace!(
+        if let Err(err) =
+            calculate_field_stats(stream_name, path, schema, max_field_statistics).await
+        {
+            tracing::trace!(
                 "Error calculating field stats for stream {}: {}",
-                stream_name, err
-            ),
-            _ => {}
+                stream_name,
+                err
+            );
         }
     }
-    false
 }

 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
@@ -978,19 +974,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 {
                     match stats {
                         TypedStatistics::Int(int_stats) => {
-                            let min_ts =
-                                DateTime::from_timestamp_millis(int_stats.min).unwrap_or_default();
-                            min_timestamp = Some(match min_timestamp {
-                                Some(existing) => existing.min(min_ts),
-                                None => min_ts,
-                            });
-
-                            let max_ts =
-                                DateTime::from_timestamp_millis(int_stats.max).unwrap_or_default();
-                            max_timestamp = Some(match max_timestamp {
-                                Some(existing) => existing.max(max_ts),
-                                None => max_ts,
-                            });
+                            if let Some(min_ts) = DateTime::from_timestamp_millis(int_stats.min) {
+                                min_timestamp = Some(match min_timestamp {
+                                    Some(existing) => existing.min(min_ts),
+                                    None => min_ts,
+                                });
+                            }
+                            if let Some(max_ts) = DateTime::from_timestamp_millis(int_stats.max) {
+                                max_timestamp = Some(match max_timestamp {
+                                    Some(existing) => existing.max(max_ts),
+                                    None => max_ts,
+                                });
+                            }
                         }
                         TypedStatistics::String(str_stats) => {
                             if let Ok(min_ts) = DateTime::parse_from_rfc3339(&str_stats.min) {
```
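
This hunk is the "fix stats for historical" half of the commit message: `DateTime::from_timestamp_millis` returns `None` for millisecond values outside chrono's representable range, and the old `unwrap_or_default()` silently mapped those to the Unix epoch, which could then win the `min` comparison and report a bogus 1970 first-event timestamp. Skipping the `None` case instead leaves the running min/max untouched. A small illustration of the failure mode:

```rust
use chrono::DateTime;

fn main() {
    // Far outside chrono's representable range, so conversion fails.
    let bogus = i64::MAX;
    assert!(DateTime::from_timestamp_millis(bogus).is_none());

    // Old behavior: the failure collapsed to the Unix epoch, poisoning a
    // running minimum timestamp with 1970-01-01.
    let old = DateTime::from_timestamp_millis(bogus).unwrap_or_default();
    assert_eq!(old.to_rfc3339(), "1970-01-01T00:00:00+00:00");
}
```
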
```diff
@@ -1070,18 +1065,14 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         let upload_context = UploadContext::new(stream);

         // Process parquet files concurrently and collect results
-        let (stats_calculated, manifest_files) =
-            process_parquet_files(&upload_context, stream_name).await?;
+        let manifest_files = process_parquet_files(&upload_context, stream_name).await?;

         // Update snapshot with collected manifest files
         update_snapshot_with_manifests(stream_name, manifest_files).await?;

         // Process schema files
         process_schema_files(&upload_context, stream_name).await?;

-        // Handle stats synchronization if needed
-        handle_stats_sync(stats_calculated).await;
-
         Ok(())
     }
 }
@@ -1090,7 +1081,7 @@
 async fn process_parquet_files(
     upload_context: &UploadContext,
     stream_name: &str,
-) -> Result<(bool, Vec<catalog::manifest::File>), ObjectStorageError> {
+) -> Result<Vec<catalog::manifest::File>, ObjectStorageError> {
     let semaphore = Arc::new(tokio::sync::Semaphore::new(100));
     let mut join_set = JoinSet::new();
     let object_store = PARSEABLE.storage().get_object_store();
@@ -1143,16 +1134,12 @@
 /// Collects results from all upload tasks
 async fn collect_upload_results(
     mut join_set: JoinSet<Result<UploadResult, ObjectStorageError>>,
-) -> Result<(bool, Vec<catalog::manifest::File>), ObjectStorageError> {
-    let mut stats_calculated = false;
+) -> Result<Vec<catalog::manifest::File>, ObjectStorageError> {
     let mut uploaded_files = Vec::new();

     while let Some(result) = join_set.join_next().await {
         match result {
             Ok(Ok(upload_result)) => {
-                if upload_result.stats_calculated {
-                    stats_calculated = true;
-                }
                 if let Some(manifest_file) = upload_result.manifest_file {
                     uploaded_files.push((upload_result.file_path, manifest_file));
                 } else {
@@ -1183,7 +1170,7 @@
         })
         .collect();

-    Ok((stats_calculated, manifest_files))
+    Ok(manifest_files)
 }

 /// Updates snapshot with collected manifest files
@@ -1215,20 +1202,6 @@
     Ok(())
 }

-/// Handles stats synchronization if needed
-async fn handle_stats_sync(stats_calculated: bool) {
-    if stats_calculated {
-        // perform local sync for the `pstats` dataset
-        task::spawn(async move {
-            if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME)
-                && let Err(err) = stats_stream.flush_and_convert(false, false)
-            {
-                error!("Failed in local sync for dataset stats stream: {err}");
-            }
-        });
-    }
-}
-
 /// Builds the stream relative path for a file
 fn stream_relative_path(
     stream_name: &str,
```
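
With the `stats_calculated` plumbing removed end to end (struct field, return values, and tuple results), nothing reports back whether field stats were written, so the dedicated post-upload `handle_stats_sync` flush of the stats dataset is dropped as well; stats calculation is now fire-and-forget inside `calculate_stats_if_enabled`, with the stats stream presumably left to the regular sync cycle.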
