Implement Prometheus metrics

The `metrics` crate, along with `metrics-exporter-prometheus`, makes it
extremely easy to instrument code with metrics and export them to
Prometheus.  Here, we're adding a few metrics to track how many messages
are being processed and monitor the queue depth.
master
Dustin 2025-04-30 21:20:04 -05:00
parent 753a0be931
commit ec364b8fd3
4 changed files with 283 additions and 1 deletions

214
Cargo.lock generated
View File

@ -17,6 +17,18 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy 0.7.35",
]
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.1.3" version = "1.1.3"
@ -147,6 +159,15 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.21" version = "0.8.21"
@ -198,6 +219,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]] [[package]]
name = "foreign-types" name = "foreign-types"
version = "0.3.2" version = "0.3.2"
@ -370,6 +397,9 @@ name = "hashbrown"
version = "0.15.2" version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
dependencies = [
"foldhash",
]
[[package]] [[package]]
name = "http" name = "http"
@ -411,6 +441,12 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.6.0" version = "1.6.0"
@ -424,6 +460,7 @@ dependencies = [
"http", "http",
"http-body", "http-body",
"httparse", "httparse",
"httpdate",
"itoa", "itoa",
"pin-project-lite", "pin-project-lite",
"smallvec", "smallvec",
@ -700,6 +737,52 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "metrics"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5"
dependencies = [
"ahash",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df88858cd28baaaf2cfc894e37789ed4184be0e1351157aec7bf3c2266c793fd"
dependencies = [
"base64",
"http-body-util",
"hyper",
"hyper-util",
"indexmap",
"ipnet",
"metrics",
"metrics-util",
"quanta",
"thiserror 2.0.12",
"tokio",
"tracing",
]
[[package]]
name = "metrics-util"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown",
"metrics",
"quanta",
"rand",
"rand_xoshiro",
"sketches-ddsketch",
]
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.17" version = "0.3.17"
@ -732,6 +815,8 @@ version = "0.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"futures", "futures",
"metrics",
"metrics-exporter-prometheus",
"paho-mqtt", "paho-mqtt",
"reqwest", "reqwest",
"serde", "serde",
@ -894,6 +979,21 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "portable-atomic"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy 0.8.25",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.95" version = "1.0.95"
@ -903,6 +1003,21 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "quanta"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.40" version = "1.0.40"
@ -918,6 +1033,53 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "rand"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.2",
]
[[package]]
name = "rand_xoshiro"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41"
dependencies = [
"rand_core",
]
[[package]]
name = "raw-cpuid"
version = "11.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.11.1" version = "1.11.1"
@ -1199,6 +1361,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "sketches-ddsketch"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.9" version = "0.4.9"
@ -1595,6 +1763,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]] [[package]]
name = "want" name = "want"
version = "0.3.1" version = "0.3.1"
@ -1970,6 +2144,46 @@ dependencies = [
"synstructure", "synstructure",
] ]
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"zerocopy-derive 0.7.35",
]
[[package]]
name = "zerocopy"
version = "0.8.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb"
dependencies = [
"zerocopy-derive 0.8.25",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "zerofrom" name = "zerofrom"
version = "0.1.6" version = "0.1.6"

View File

@ -6,6 +6,8 @@ edition = "2021"
[dependencies] [dependencies]
chrono = { version = "0.4.40", default-features = false, features = ["std", "now", "serde"] } chrono = { version = "0.4.40", default-features = false, features = ["std", "now", "serde"] }
futures = "0.3.31" futures = "0.3.31"
metrics = "0.24.2"
metrics-exporter-prometheus = { version = "0.17.0", default-features = false, features = ["http-listener"] }
paho-mqtt = { version = "0.13.2", default-features = false, features = ["ssl"] } paho-mqtt = { version = "0.13.2", default-features = false, features = ["ssl"] }
reqwest = { version = "0.12.15", default-features = false, features = ["http2", "stream", "native-tls", "json"] } reqwest = { version = "0.12.15", default-features = false, features = ["http2", "stream", "native-tls", "json"] }
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }

View File

@ -21,10 +21,26 @@ pub struct MqttConfiguration {
pub topics: Vec<String>, pub topics: Vec<String>,
} }
#[derive(Debug, Deserialize)]
pub struct MetricsConfiguration {
#[serde(default = "default_metrics_listen_addr")]
pub listen_address: String,
}
impl Default for MetricsConfiguration {
fn default() -> Self {
Self {
listen_address: default_metrics_listen_addr(),
}
}
}
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct Configuration { pub struct Configuration {
pub mqtt: MqttConfiguration, pub mqtt: MqttConfiguration,
pub http: HttpConfiguration, pub http: HttpConfiguration,
#[serde(default)]
pub metrics: MetricsConfiguration,
} }
impl Default for Configuration { impl Default for Configuration {
@ -45,6 +61,7 @@ impl Default for Configuration {
url: "http://127.0.0.1:9428/insert/jsonline?_stream_fields=topic" url: "http://127.0.0.1:9428/insert/jsonline?_stream_fields=topic"
.into(), .into(),
}, },
metrics: Default::default(),
} }
} }
} }
@ -62,6 +79,10 @@ fn default_mqtt_client_id() -> String {
env!("CARGO_PKG_NAME").into() env!("CARGO_PKG_NAME").into()
} }
fn default_metrics_listen_addr() -> String {
"0.0.0.0:9002".into()
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ConfigurationError { pub enum ConfigurationError {
#[error("Error reading file: {0}")] #[error("Error reading file: {0}")]

View File

@ -3,17 +3,20 @@ mod config;
mod mqtt; mod mqtt;
mod relay; mod relay;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use chrono::{DateTime, FixedOffset, Utc}; use chrono::{DateTime, FixedOffset, Utc};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use metrics::{counter, gauge};
use serde::Serialize; use serde::Serialize;
use tokio::signal::unix::SignalKind; use tokio::signal::unix::SignalKind;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify; use tokio::sync::Notify;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, trace, warn};
use backoff::Backoff; use backoff::Backoff;
use config::Configuration; use config::Configuration;
@ -78,6 +81,11 @@ async fn run_sender(
} }
let (stream, handle) = relay.new_stream(); let (stream, handle) = relay.new_stream();
let stream = stream let stream = stream
.map(|v| {
trace!("{:?}", v);
gauge!("message_queue_depth").decrement(1);
v
})
.map(|v| serde_json::to_string(&v).map(|v| format!("{}\n", v))); .map(|v| serde_json::to_string(&v).map(|v| format!("{}\n", v)));
let body = reqwest::Body::wrap_stream(stream); let body = reqwest::Body::wrap_stream(stream);
let req = client let req = client
@ -127,12 +135,15 @@ async fn run_subscriber(
} }
match LogRecord::try_from(msg) { match LogRecord::try_from(msg) {
Ok(m) => { Ok(m) => {
counter!("messages_processed_total").increment(1);
if let Err(e) = tx.send(m) { if let Err(e) = tx.send(m) {
error!("mpsc channel error: {}", e); error!("mpsc channel error: {}", e);
break; break;
} }
gauge!("message_queue_depth").increment(1)
} }
Err(e) => { Err(e) => {
counter!("messages_invalid_total").increment(1);
warn!("Ignoring non-UTF8 payload: {}", e); warn!("Ignoring non-UTF8 payload: {}", e);
} }
} }
@ -177,6 +188,36 @@ async fn wait_signal() -> Result<(), std::io::Error> {
Ok(()) Ok(())
} }
fn setup_metrics(
config: &Configuration,
) -> Result<(), Box<dyn std::error::Error>> {
let addr = SocketAddr::from_str(&config.metrics.listen_address)?;
metrics_exporter_prometheus::PrometheusBuilder::new()
.with_http_listener(addr)
.install()?;
info!("Serving Prometheus metrics on {}", addr);
metrics::gauge!("message_queue_depth").set(0);
metrics::describe_gauge!(
"message_queue_depth",
"Current message queue depth"
);
metrics::counter!("messages_processed_total").absolute(0);
metrics::describe_counter!(
"messages_processed_total",
"Total number of messages processed"
);
metrics::counter!("messages_invalid_total").absolute(0);
metrics::describe_counter!(
"messages_invalid_total",
"Total number of non-UTF8 messages ignored"
);
Ok(())
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt() tracing_subscriber::fmt()
@ -196,6 +237,10 @@ async fn main() {
Configuration::default() Configuration::default()
}; };
if let Err(e) = setup_metrics(&config) {
warn!("Failed to set up metrics: {}", e);
}
let notify = Arc::new(Notify::new()); let notify = Arc::new(Notify::new());
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let mqtt_task = match start_subscriber(&config, tx, notify.clone()).await { let mqtt_task = match start_subscriber(&config, tx, notify.clone()).await {