Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion components/log-ingestor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ edition = "2024"

[dependencies]
anyhow = "1.0.100"
axum = { version = "0.8.6", features = ["json"] }
async-trait = "0.1.89"
aws-sdk-s3 = "1.110.0"
aws-sdk-sqs = "1.89.0"
clap = { version = "4.5.51", features = ["derive"] }
clp-rust-utils = { path = "../clp-rust-utils" }
secrecy = { version = "0.10.3", features = ["serde"] }
serde_json = "1.0.145"
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "time"] }
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "signal", "time"] }
tokio-util = "0.7.17"
tracing = "0.1.41"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.20", features = ["json", "env-filter", "fmt", "std"] }
uuid = { version = "1.18.1", features = ["v4"] }

[dev-dependencies]
Expand Down
95 changes: 93 additions & 2 deletions components/log-ingestor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,94 @@
fn main() {
println!("Hello, world!");
use anyhow::Context;
use axum::{Router, routing::get};
use clap::Parser;
use clp_rust_utils::{clp_config::package, serde::yaml};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{self, fmt::writer::MakeWriterExt};

fn read_config_and_credentials(
args: &Args,
) -> anyhow::Result<(package::config::Config, package::credentials::Credentials)> {
let config_path = std::path::Path::new(args.config.as_str());
let config: package::config::Config = yaml::from_path(config_path).context(format!(
"Config file {} does not exist",
config_path.display()
))?;

let credentials = package::credentials::Credentials {
database: package::credentials::Database {
password: secrecy::SecretString::new(
std::env::var("CLP_DB_PASS")
.context("Expect `CLP_DB_PASS` env variable")?
.into_boxed_str(),
),
user: std::env::var("CLP_DB_USER").context("Expect `CLP_DB_USER` env variable")?,
},
};
Ok((config, credentials))
}

fn set_up_logging() -> anyhow::Result<()> {
let logs_directory =
std::env::var("CLP_LOGS_DIR").context("Expect `CLP_LOGS_DIR` environment variable.")?;
let logs_directory = std::path::Path::new(logs_directory.as_str());
let file_appender =
RollingFileAppender::new(Rotation::HOURLY, logs_directory, "log_ingestor.log");
let (non_blocking_writer, _guard) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_ansi(false)
.with_writer(std::io::stdout.and(non_blocking_writer))
.init();
Ok(())
}

#[derive(Parser)]
#[command(version)]
struct Args {
#[arg(long)]
config: String,

#[arg(long)]
host: String,

#[arg(long)]
port: u16,
}

async fn shutdown_signal() {
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to listen for SIGTERM");
tokio::select! {
_ = sigterm.recv() => {
}
_ = tokio::signal::ctrl_c()=> {
}
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

let (_config, _credentials) = read_config_and_credentials(&args)?;
set_up_logging()?;

let addr = format!("{}:{}", args.host, args.port);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.context(format!("Cannot listen to {addr}"))?;

let app = Router::new()
.route("/", get(health))
.route("/health", get(health));

tracing::info!("Server started at {addr}");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}

async fn health() -> String {
"Log ingestor is running".to_owned()
}
Loading