diff --git a/README.md b/README.md
index 5f96627..4b94551 100644
--- a/README.md
+++ b/README.md
@@ -226,14 +226,14 @@ Make sure the feature `kafka-reporter` is enabled.
 #[cfg(feature = "kafka-reporter")]
 mod example {
     use skywalking::reporter::Report;
-    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig};
+    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, ClientConfig};
 
     async fn do_something(reporter: &impl Report) {
         // ....
     }
 
     async fn foo() {
-        let mut client_config = RDKafkaClientConfig::new();
+        let mut client_config = ClientConfig::new();
         client_config
             .set("bootstrap.servers", "broker:9092")
             .set("message.timeout.ms", "6000");
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index a7dcc18..e65e2db 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -35,7 +35,7 @@ use skywalking::{
     reporter::{
         CollectItem, Report,
         grpc::GrpcReporter,
-        kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig},
+        kafka::{ClientConfig, KafkaReportBuilder, KafkaReporter},
     },
     trace::{
         propagation::{
@@ -252,7 +252,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     let reporter1 = GrpcReporter::connect("http://127.0.0.1:19876").await?;
     let handle1 = reporter1.reporting().await.spawn();
 
-    let mut client_config = RDKafkaClientConfig::new();
+    let mut client_config = ClientConfig::new();
     client_config
         .set("bootstrap.servers", "127.0.0.1:9092")
         .set("message.timeout.ms", "6000")
diff --git a/src/reporter/kafka.rs b/src/reporter/kafka.rs
index 38ed968..d606d0d 100644
--- a/src/reporter/kafka.rs
+++ b/src/reporter/kafka.rs
@@ -18,9 +18,12 @@
 
 use super::{CollectItemConsume, CollectItemProduce};
 use crate::reporter::{CollectItem, Report};
-pub use rdkafka::config::{ClientConfig as RDKafkaClientConfig, RDKafkaLogLevel};
-use rdkafka::producer::{FutureProducer, FutureRecord};
+use rdkafka::{
+    config::ClientConfig as RDKafkaClientConfig,
+    producer::{FutureProducer, FutureRecord},
+};
 use std::{
+    collections::HashMap,
     error,
     future::{Future, pending},
     pin::Pin,
@@ -48,6 +51,89 @@ pub enum Error {
     },
 }
 
+/// Log level for Kafka client.
+#[derive(Debug, Clone, Copy)]
+pub enum LogLevel {
+    /// Critical level.
+    Critical,
+    /// Error level.
+    Error,
+    /// Warning level.
+    Warning,
+    /// Notice level.
+    Notice,
+    /// Info level.
+    Info,
+    /// Debug level.
+    Debug,
+}
+
+impl From<LogLevel> for rdkafka::config::RDKafkaLogLevel {
+    fn from(level: LogLevel) -> Self {
+        match level {
+            LogLevel::Critical => rdkafka::config::RDKafkaLogLevel::Critical,
+            LogLevel::Error => rdkafka::config::RDKafkaLogLevel::Error,
+            LogLevel::Warning => rdkafka::config::RDKafkaLogLevel::Warning,
+            LogLevel::Notice => rdkafka::config::RDKafkaLogLevel::Notice,
+            LogLevel::Info => rdkafka::config::RDKafkaLogLevel::Info,
+            LogLevel::Debug => rdkafka::config::RDKafkaLogLevel::Debug,
+        }
+    }
+}
+
+/// Configuration for Kafka client.
+#[derive(Debug, Clone)]
+pub struct ClientConfig {
+    /// Configuration parameters as key-value pairs.
+    params: HashMap<String, String>,
+    /// Log level for the client.
+    log_level: Option<LogLevel>,
+}
+
+impl ClientConfig {
+    /// Create a new empty configuration.
+    pub fn new() -> Self {
+        Self {
+            params: HashMap::new(),
+            log_level: None,
+        }
+    }
+
+    /// Set a configuration parameter.
+    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
+    where
+        K: Into<String>,
+        V: Into<String>,
+    {
+        self.params.insert(key.into(), value.into());
+        self
+    }
+
+    /// Set log level.
+    pub fn set_log_level(&mut self, level: LogLevel) -> &mut Self {
+        self.log_level = Some(level);
+        self
+    }
+
+    /// Convert to rdkafka ClientConfig.
+    fn to_rdkafka_config(&self) -> RDKafkaClientConfig {
+        let mut config = RDKafkaClientConfig::new();
+        for (key, value) in &self.params {
+            config.set(key, value);
+        }
+        if let Some(log_level) = self.log_level {
+            config.set_log_level(log_level.into());
+        }
+        config
+    }
+}
+
+impl Default for ClientConfig {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 type DynErrHandler = dyn Fn(&str, &dyn error::Error) + Send + Sync + 'static;
 
 fn default_err_handle(message: &str, err: &dyn error::Error) {
@@ -71,14 +157,14 @@ pub struct KafkaReportBuilder<P, C> {
     state: Arc<State>,
     producer: Arc<P>,
     consumer: C,
-    client_config: RDKafkaClientConfig,
+    client_config: ClientConfig,
     namespace: Option<String>,
     err_handle: Arc<DynErrHandler>,
 }
 
 impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
-    /// Create builder, with rdkafka client configuration.
-    pub fn new(client_config: RDKafkaClientConfig) -> Self {
+    /// Create builder, with client configuration.
+    pub fn new(client_config: ClientConfig) -> Self {
         let (producer, consumer) = mpsc::unbounded_channel();
         Self::new_with_pc(client_config, producer, consumer)
     }
@@ -87,7 +173,7 @@
 impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
     /// Special purpose, used for user-defined produce and consume operations,
     /// usually you can use [KafkaReportBuilder::new].
-    pub fn new_with_pc(client_config: RDKafkaClientConfig, producer: P, consumer: C) -> Self {
+    pub fn new_with_pc(client_config: ClientConfig, producer: P, consumer: C) -> Self {
         Self {
             state: Default::default(),
             producer: Arc::new(producer),
@@ -118,7 +204,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
     /// handle to push data to kafka in the background.
    pub async fn build(self) -> Result<(KafkaReporter<P>, KafkaReporting<C>), Error> {
         let kafka_producer = KafkaProducer::new(
-            self.client_config.create()?,
+            self.client_config.to_rdkafka_config().create()?,
             self.err_handle.clone(),
             self.namespace,
         )
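
Net effect of the kafka.rs hunks: callers now build the crate-local ClientConfig, and the builder only converts it into an rdkafka ClientConfig inside build() via to_rdkafka_config(). A minimal usage sketch of the new surface follows; the function name is hypothetical, the broker address is copied from the e2e hunk, the Debug level is an arbitrary choice, and bubbling errors as Box<dyn Error> assumes the reporter's Error type implements std::error::Error.

use skywalking::reporter::kafka::{ClientConfig, KafkaReportBuilder, LogLevel};

async fn make_kafka_reporter() -> Result<(), Box<dyn std::error::Error>> {
    // Keys and values are buffered in the wrapper's HashMap<String, String>;
    // rdkafka is not touched until build() calls to_rdkafka_config().
    let mut client_config = ClientConfig::new();
    client_config
        .set("bootstrap.servers", "127.0.0.1:9092")
        .set("message.timeout.ms", "6000")
        // New in this change: the log level is stored as the crate-local
        // LogLevel and mapped to rdkafka::config::RDKafkaLogLevel by the
        // From impl only during conversion.
        .set_log_level(LogLevel::Debug);

    // build() converts the config, creates the underlying FutureProducer, and
    // returns the reporter together with the background reporting half.
    let (_reporter, _reporting) = KafkaReportBuilder::new(client_config).build().await?;
    Ok(())
}

One design consequence worth noting: because ClientConfig is a plain HashMap plus an Option, it derives Debug and Clone, which rdkafka's ClientConfig setup did not expose to callers of this crate, and it keeps rdkafka types out of the public builder API.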