diff --git a/Cargo.lock b/Cargo.lock index 961c5b8..28f84f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy 0.7.35", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -147,6 +159,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -198,6 +219,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -370,6 +397,9 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "foldhash", +] [[package]] name = "http" @@ -411,6 +441,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.6.0" @@ -424,6 +460,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -700,6 +737,52 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metrics" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df88858cd28baaaf2cfc894e37789ed4184be0e1351157aec7bf3c2266c793fd" +dependencies = [ + "base64", + "http-body-util", + "hyper", + "hyper-util", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown", + "metrics", + "quanta", + "rand", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -732,6 +815,8 @@ version = "0.1.0" dependencies = [ "chrono", "futures", + "metrics", + "metrics-exporter-prometheus", "paho-mqtt", "reqwest", "serde", @@ -894,6 +979,21 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy 0.8.25", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -903,6 +1003,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.40" @@ -918,6 +1033,53 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core", +] + +[[package]] +name = "raw-cpuid" +version = "11.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -1199,6 +1361,12 @@ dependencies = [ "libc", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" @@ -1595,6 +1763,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" @@ -1970,6 +2144,46 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" +dependencies = [ + "zerocopy-derive 0.8.25", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index 9e88fe5..ce6a18a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] chrono = { version = "0.4.40", default-features = false, features = ["std", "now", "serde"] } futures = "0.3.31" +metrics = "0.24.2" +metrics-exporter-prometheus = { version = "0.17.0", default-features = false, features = ["http-listener"] } paho-mqtt = { version = "0.13.2", default-features = false, features = ["ssl"] } reqwest = { version = "0.12.15", default-features = false, features = ["http2", "stream", "native-tls", "json"] } serde = { version = "1.0.219", features = ["derive"] } diff --git a/src/config.rs b/src/config.rs index 8b4447e..723cc0c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,10 +21,26 @@ pub struct MqttConfiguration { pub topics: Vec, } +#[derive(Debug, Deserialize)] +pub struct MetricsConfiguration { + #[serde(default = "default_metrics_listen_addr")] + pub listen_address: String, +} + +impl Default for MetricsConfiguration { + fn default() -> Self { + Self { + listen_address: default_metrics_listen_addr(), + } + } +} + #[derive(Debug, Deserialize)] pub struct Configuration { pub mqtt: MqttConfiguration, pub http: HttpConfiguration, + #[serde(default)] + pub metrics: MetricsConfiguration, } impl Default for Configuration { @@ -45,6 +61,7 @@ impl Default for Configuration { url: "http://127.0.0.1:9428/insert/jsonline?_stream_fields=topic" .into(), }, + metrics: Default::default(), } } } @@ -62,6 +79,10 @@ fn default_mqtt_client_id() -> String { env!("CARGO_PKG_NAME").into() } +fn default_metrics_listen_addr() -> String { + "0.0.0.0:9002".into() +} + #[derive(Debug, thiserror::Error)] pub enum ConfigurationError { #[error("Error reading file: {0}")] diff --git a/src/main.rs b/src/main.rs index cd20dcf..5aca772 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,17 +3,20 @@ mod config; mod mqtt; mod relay; +use std::net::SocketAddr; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, FixedOffset, Utc}; use futures::stream::StreamExt; +use metrics::{counter, gauge}; use serde::Serialize; use tokio::signal::unix::SignalKind; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::Notify; use tokio::task::JoinHandle; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, trace, warn}; use backoff::Backoff; use config::Configuration; @@ -78,6 +81,11 @@ async fn run_sender( } let (stream, handle) = relay.new_stream(); let stream = stream + .map(|v| { + trace!("{:?}", v); + gauge!("message_queue_depth").decrement(1); + v + }) .map(|v| serde_json::to_string(&v).map(|v| format!("{}\n", v))); let body = reqwest::Body::wrap_stream(stream); let req = client @@ -127,12 +135,15 @@ async fn run_subscriber( } match LogRecord::try_from(msg) { Ok(m) => { + counter!("messages_processed_total").increment(1); if let Err(e) = tx.send(m) { error!("mpsc channel error: {}", e); break; } + gauge!("message_queue_depth").increment(1) } Err(e) => { + counter!("messages_invalid_total").increment(1); warn!("Ignoring non-UTF8 payload: {}", e); } } @@ -177,6 +188,36 @@ async fn wait_signal() -> Result<(), std::io::Error> { Ok(()) } +fn setup_metrics( + config: &Configuration, +) -> Result<(), Box> { + let addr = SocketAddr::from_str(&config.metrics.listen_address)?; + metrics_exporter_prometheus::PrometheusBuilder::new() + .with_http_listener(addr) + .install()?; + info!("Serving Prometheus metrics on {}", addr); + + metrics::gauge!("message_queue_depth").set(0); + metrics::describe_gauge!( + "message_queue_depth", + "Current message queue depth" + ); + + metrics::counter!("messages_processed_total").absolute(0); + metrics::describe_counter!( + "messages_processed_total", + "Total number of messages processed" + ); + + metrics::counter!("messages_invalid_total").absolute(0); + metrics::describe_counter!( + "messages_invalid_total", + "Total number of non-UTF8 messages ignored" + ); + + Ok(()) +} + #[tokio::main] async fn main() { tracing_subscriber::fmt() @@ -196,6 +237,10 @@ async fn main() { Configuration::default() }; + if let Err(e) = setup_metrics(&config) { + warn!("Failed to set up metrics: {}", e); + } + let notify = Arc::new(Notify::new()); let (tx, rx) = mpsc::unbounded_channel(); let mqtt_task = match start_subscriber(&config, tx, notify.clone()).await {