Skip to content

Commit 4e7705d

Browse files
self metadata for cluster info and metrics
1 parent 8754367 commit 4e7705d

File tree

3 files changed

+59
-26
lines changed

3 files changed

+59
-26
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ use serde_json::error::Error as SerdeError;
3737
use serde_json::{to_vec, Value as JsonValue};
3838
use tracing::{error, info, warn};
3939
use url::Url;
40-
use utils::{
41-
check_liveness, to_url_string, ClusterInfo, IngestionStats, QueriedStats, StorageStats,
42-
};
40+
use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats};
4341

4442
use crate::handlers::http::ingest::ingest_internal_stream;
4543
use crate::metrics::prom_utils::Metrics;
@@ -542,24 +540,6 @@ pub async fn send_retention_cleanup_request(
542540
}
543541

544542
pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
545-
let self_info = ClusterInfo::new(
546-
format!(
547-
"{}://{}",
548-
PARSEABLE.options.get_scheme(),
549-
PARSEABLE.options.address
550-
)
551-
.as_str(),
552-
true,
553-
PARSEABLE
554-
.options
555-
.staging_dir()
556-
.to_string_lossy()
557-
.to_string(),
558-
PARSEABLE.storage.get_endpoint(),
559-
None,
560-
Some(String::from("200 OK")),
561-
&PARSEABLE.options.mode.to_node_type(),
562-
);
563543
// Get querier, ingestor and indexer metadata concurrently
564544
let (querier_result, ingestor_result, indexer_result) = future::join3(
565545
get_node_info("querier"),
@@ -592,21 +572,27 @@ pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
592572
})
593573
.map_err(|err| StreamError::Anyhow(err.into()))?;
594574

575+
let self_metadata = if let Some(metadata) = PARSEABLE.get_metadata() {
576+
vec![metadata]
577+
} else {
578+
vec![]
579+
};
580+
595581
// Fetch info for both node types concurrently
596-
let (querier_infos, ingestor_infos, indexer_infos) = future::join3(
582+
let (querier_infos, ingestor_infos, indexer_infos, self_info) = future::join4(
597583
fetch_nodes_info(querier_metadata),
598584
fetch_nodes_info(ingestor_metadata),
599585
fetch_nodes_info(indexer_metadata),
586+
fetch_nodes_info(self_metadata),
600587
)
601588
.await;
602589

603590
// Combine results from both node types
604591
let mut infos = Vec::new();
592+
infos.extend(self_info?);
605593
infos.extend(querier_infos?);
606594
infos.extend(ingestor_infos?);
607595
infos.extend(indexer_infos?);
608-
infos.push(self_info);
609-
610596
Ok(actix_web::HttpResponse::Ok().json(infos))
611597
}
612598

@@ -885,6 +871,12 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
885871
)
886872
.await;
887873

874+
let self_metadata = if let Some(metadata) = PARSEABLE.get_metadata() {
875+
vec![metadata]
876+
} else {
877+
vec![]
878+
};
879+
888880
// Handle querier metadata result
889881
let querier_metadata: Vec<NodeMetadata> = querier_result.map_err(|err| {
890882
error!("Fatal: failed to get querier info: {:?}", err);
@@ -901,7 +893,8 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
901893
PostError::Invalid(err)
902894
})?;
903895
// Fetch metrics from ingestors and indexers concurrently
904-
let (querier_metrics, ingestor_metrics, indexer_metrics) = future::join3(
896+
let (self_metrics, querier_metrics, ingestor_metrics, indexer_metrics) = future::join4(
897+
fetch_nodes_metrics(self_metadata),
905898
fetch_nodes_metrics(querier_metadata),
906899
fetch_nodes_metrics(ingestor_metadata),
907900
fetch_nodes_metrics(indexer_metadata),
@@ -911,6 +904,12 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
911904
// Combine all metrics
912905
let mut all_metrics = Vec::new();
913906

907+
// Add self metrics
908+
match self_metrics {
909+
Ok(metrics) => all_metrics.extend(metrics),
910+
Err(err) => return Err(err),
911+
}
912+
914913
// Add querier metrics
915914
match querier_metrics {
916915
Ok(metrics) => all_metrics.extend(metrics),

src/handlers/http/modal/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ impl Metadata for NodeMetadata {
535535
pub type IngestorMetadata = NodeMetadata;
536536
pub type IndexerMetadata = NodeMetadata;
537537
pub type QuerierMetadata = NodeMetadata;
538+
pub type PrismMetadata = NodeMetadata;
538539

539540
#[cfg(test)]
540541
mod test {

src/parseable/mod.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::{
4949
logstream::error::{CreateStreamError, StreamError},
5050
modal::{
5151
utils::logstream_utils::PutStreamHeaders, IndexerMetadata, IngestorMetadata,
52-
NodeType, QuerierMetadata,
52+
Metadata, NodeMetadata, NodeType, PrismMetadata, QuerierMetadata,
5353
},
5454
},
5555
STREAM_TYPE_KEY,
@@ -130,6 +130,8 @@ pub struct Parseable {
130130
/// Metadata and staging realting to each logstreams
131131
/// A globally shared mapping of `Streams` that parseable is aware of.
132132
pub streams: Streams,
133+
///Metadata associated only with a prism
134+
pub prism_metadata: Option<Arc<PrismMetadata>>,
133135
/// Metadata associated only with a querier
134136
pub querier_metadata: Option<Arc<QuerierMetadata>>,
135137
/// Metadata associated only with an ingestor
@@ -147,6 +149,14 @@ impl Parseable {
147149
#[cfg(feature = "kafka")] kafka_config: KafkaConfig,
148150
storage: Arc<dyn ObjectStorageProvider>,
149151
) -> Self {
152+
let prism_metadata = match &options.mode {
153+
Mode::Prism => Some(PrismMetadata::load(
154+
&options,
155+
storage.as_ref(),
156+
NodeType::Prism,
157+
)),
158+
_ => None,
159+
};
150160
let ingestor_metadata = match &options.mode {
151161
Mode::Ingest => Some(IngestorMetadata::load(
152162
&options,
@@ -175,6 +185,7 @@ impl Parseable {
175185
options: Arc::new(options),
176186
storage,
177187
streams: Streams::default(),
188+
prism_metadata,
178189
ingestor_metadata,
179190
indexer_metadata,
180191
querier_metadata,
@@ -183,6 +194,28 @@ impl Parseable {
183194
}
184195
}
185196

197+
/// Get the metadata for the current node based on its mode.
198+
pub fn get_metadata(&self) -> Option<NodeMetadata> {
199+
let meta_ref = match self.options.mode {
200+
Mode::Ingest => self.ingestor_metadata.as_ref(),
201+
Mode::Index => self.indexer_metadata.as_ref(),
202+
Mode::Query => self.querier_metadata.as_ref(),
203+
_ => return None,
204+
};
205+
206+
let meta = meta_ref?;
207+
let node_metadata = NodeMetadata {
208+
version: meta.version.clone(),
209+
node_id: meta.node_id.clone(),
210+
port: meta.port.clone(),
211+
domain_name: meta.domain_name.clone(),
212+
bucket_name: meta.bucket_name.clone(),
213+
token: meta.token.clone(),
214+
flight_port: meta.flight_port.clone(),
215+
node_type: meta.node_type().clone(),
216+
};
217+
Some(node_metadata)
218+
}
186219
/// Try to get the handle of a stream in staging, if it doesn't exist return `None`.
187220
pub fn get_stream(&self, stream_name: &str) -> Result<StreamRef, StreamNotFound> {
188221
self.streams

0 commit comments

Comments
 (0)