Initial commit

master
Dustin 2025-04-29 22:11:10 -05:00
commit 753a0be931
9 changed files with 2559 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target/
/config.toml
/mqtt.password

2020
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

18
Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "mqtt2vl"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = { version = "0.4.40", default-features = false, features = ["std", "now", "serde"] }
futures = "0.3.31"
paho-mqtt = { version = "0.13.2", default-features = false, features = ["ssl"] }
reqwest = { version = "0.12.15", default-features = false, features = ["http2", "stream", "native-tls", "json"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["rt", "macros", "rt-multi-thread", "signal"] }
tokio-stream = { version = "0.1.17", default-features = false, features = ["sync"] }
toml = "0.8.22"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

1
rustfmt.toml Normal file
View File

@ -0,0 +1 @@
max_width = 79

47
src/backoff.rs Normal file
View File

@ -0,0 +1,47 @@
use std::iter::Iterator;
use std::time::Duration;
use tracing::warn;
pub struct Backoff {
duration: Duration,
max: Duration,
min: Duration,
}
impl Default for Backoff {
fn default() -> Self {
let min = Duration::from_millis(250);
let max = Duration::from_secs(300);
let duration = min;
Self { duration, max, min }
}
}
impl Iterator for Backoff {
type Item = Duration;
fn next(&mut self) -> Option<Self::Item> {
let duration = self.duration;
self.duration = std::cmp::min(self.duration * 2, self.max);
Some(duration)
}
}
impl Backoff {
pub fn reset(&mut self) {
self.duration = self.min;
}
pub async fn sleep(&mut self) {
warn!(
"Retrying in {}",
if self.duration.as_secs() < 1 {
format!("{} ms", self.duration.as_millis())
} else {
format!("{} seconds", self.duration.as_secs())
}
);
tokio::time::sleep(self.next().unwrap()).await;
}
}

72
src/config.rs Normal file
View File

@ -0,0 +1,72 @@
use std::path::{Path, PathBuf};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct HttpConfiguration {
pub url: String,
}
#[derive(Debug, Deserialize)]
pub struct MqttConfiguration {
pub url: String,
#[serde(default = "default_mqtt_client_id")]
pub client_id: String,
pub ca_file: Option<PathBuf>,
pub client_cert: Option<PathBuf>,
pub client_key: Option<PathBuf>,
pub username: Option<String>,
pub password: Option<String>,
pub password_file: Option<PathBuf>,
pub topics: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct Configuration {
pub mqtt: MqttConfiguration,
pub http: HttpConfiguration,
}
impl Default for Configuration {
fn default() -> Self {
Configuration {
mqtt: MqttConfiguration {
url: "mqtt://127.0.0.1:1883".into(),
topics: vec!["/debug".into()],
client_id: default_mqtt_client_id(),
ca_file: None,
client_cert: None,
client_key: None,
username: None,
password: None,
password_file: None,
},
http: HttpConfiguration {
url: "http://127.0.0.1:9428/insert/jsonline?_stream_fields=topic"
.into(),
},
}
}
}
impl Configuration {
pub fn from_file<P: AsRef<Path>>(
path: P,
) -> Result<Self, ConfigurationError> {
let content = std::fs::read_to_string(path)?;
Ok(toml::from_str(&content)?)
}
}
fn default_mqtt_client_id() -> String {
env!("CARGO_PKG_NAME").into()
}
#[derive(Debug, thiserror::Error)]
pub enum ConfigurationError {
#[error("Error reading file: {0}")]
Io(#[from] std::io::Error),
#[error("Error parsing TOML: {0}")]
Toml(#[from] toml::de::Error),
}

228
src/main.rs Normal file
View File

@ -0,0 +1,228 @@
mod backoff;
mod config;
mod mqtt;
mod relay;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, FixedOffset, Utc};
use futures::stream::StreamExt;
use serde::Serialize;
use tokio::signal::unix::SignalKind;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use backoff::Backoff;
use config::Configuration;
use mqtt::MqttClient;
static USER_AGENT: &str =
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
#[derive(Debug, Serialize)]
struct LogRecord {
#[serde(rename = "_time")]
time: DateTime<FixedOffset>,
#[serde(rename = "_msg")]
message: String,
topic: String,
}
impl TryFrom<paho_mqtt::Message> for LogRecord {
type Error = std::str::Utf8Error;
fn try_from(m: paho_mqtt::Message) -> Result<Self, Self::Error> {
let message = std::str::from_utf8(m.payload())?.to_string();
Ok(Self {
time: Utc::now().into(),
message,
topic: m.topic().to_string(),
})
}
}
async fn run_sender(
client: reqwest::Client,
url: &str,
chan: UnboundedReceiver<LogRecord>,
notify: &Notify,
) {
let mut backoff = Backoff::default();
let relay = relay::Relay::from(chan);
loop {
if relay.closed().await {
break;
}
let req = client
.post(url)
.header(reqwest::header::ACCEPT, "application/json")
.header(reqwest::header::CONTENT_LENGTH, "0");
debug!("Checking HTTP connection");
tokio::select! {
_ = notify.notified() => break,
r = req.send() => {
if let Err(e) = r {
error!("Error in HTTP request: {}", e);
tokio::select! {
_ = notify.notified() => break,
_ = backoff.sleep() => (),
}
continue;
}
backoff.reset();
debug!("HTTP connection successful");
}
}
let (stream, handle) = relay.new_stream();
let stream = stream
.map(|v| serde_json::to_string(&v).map(|v| format!("{}\n", v)));
let body = reqwest::Body::wrap_stream(stream);
let req = client
.post(url)
.header(reqwest::header::CONTENT_TYPE, "application/stream+json")
.body(body);
info!("Starting HTTP sender stream");
if let Err(e) = req.send().await {
error!("HTTP request error: {}", e);
if let Err(e) = handle.await {
error!("Error in sender: {}", e);
}
} else {
break;
}
}
info!("Channel closed, stopping HTTP sender");
}
async fn run_subscriber(
mut client: MqttClient,
tx: UnboundedSender<LogRecord>,
notify: &Notify,
) {
let mut stream = client.stream().await;
tokio::select! {
_ = notify.notified() => return,
_ = client.reconnect() => (),
};
loop {
tokio::select! {
_ = notify.notified() => {
info!("Stopping MQTT subscriber");
break;
}
msg = stream.next() => {
let Some(Some(msg)) = msg else {
error!("Lost connection to MQTT broker, reconnecting");
client.reconnect().await;
continue;
};
if msg.retained() {
debug!("Ignoring retained message on topic {}", msg.topic());
continue;
}
match LogRecord::try_from(msg) {
Ok(m) => {
if let Err(e) = tx.send(m) {
error!("mpsc channel error: {}", e);
break;
}
}
Err(e) => {
warn!("Ignoring non-UTF8 payload: {}", e);
}
}
}
}
}
}
async fn start_sender(
config: &Configuration,
chan: UnboundedReceiver<LogRecord>,
notify: Arc<Notify>,
) -> Result<JoinHandle<()>, reqwest::Error> {
let client = reqwest::Client::builder()
.tcp_keepalive(Duration::from_secs(10))
.user_agent(USER_AGENT)
.build()?;
let url = config.http.url.clone();
Ok(tokio::spawn(async move {
run_sender(client, &url, chan, &notify).await
}))
}
async fn start_subscriber(
config: &Configuration,
tx: UnboundedSender<LogRecord>,
notify: Arc<Notify>,
) -> Result<JoinHandle<()>, mqtt::MqttClientError> {
let client = MqttClient::new(config)?;
Ok(tokio::spawn(async move {
run_subscriber(client, tx, &notify).await
}))
}
async fn wait_signal() -> Result<(), std::io::Error> {
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
tokio::select! {
_ = sigterm.recv() => debug!("Got signal SIGTERM"),
_ = sigint.recv() => debug!("Got signal SIGINT"),
};
Ok(())
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_writer(std::io::stderr)
.init();
let config = if let Some(arg) = std::env::args().nth(1) {
match Configuration::from_file(arg) {
Ok(c) => c,
Err(e) => {
error!("Failed to load configuration: {}", e);
std::process::exit(1);
}
}
} else {
Configuration::default()
};
let notify = Arc::new(Notify::new());
let (tx, rx) = mpsc::unbounded_channel();
let mqtt_task = match start_subscriber(&config, tx, notify.clone()).await {
Ok(h) => h,
Err(e) => {
error!("Failed to start MQTT subscriber: {}", e);
std::process::exit(1);
}
};
let http_task = match start_sender(&config, rx, notify.clone()).await {
Ok(h) => h,
Err(e) => {
error!("Failed to start HTTP sender: {}", e);
std::process::exit(1);
}
};
let mut rc = 0;
if let Err(e) = wait_signal().await {
error!("Error setting up signal handler: {}", e);
rc = 1;
}
notify.notify_waiters();
if let Err(e) = mqtt_task.await {
debug!("Error in MQTT subscriber task: {}", e);
}
if let Err(e) = http_task.await {
debug!("Error in HTTP sender task: {}", e);
}
std::process::exit(rc);
}

118
src/mqtt.rs Normal file
View File

@ -0,0 +1,118 @@
use std::path::PathBuf;
use std::sync::Arc;
use paho_mqtt as mqtt;
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use crate::backoff::Backoff;
use crate::config::Configuration;
pub struct MqttClient {
url: String,
client: Arc<Mutex<mqtt::AsyncClient>>,
topics: Vec<String>,
backoff: Backoff,
ca_file: Option<PathBuf>,
client_cert: Option<PathBuf>,
client_key: Option<PathBuf>,
username: Option<String>,
password: Option<String>,
}
impl MqttClient {
pub fn new(config: &Configuration) -> Result<Self, MqttClientError> {
let url = config.mqtt.url.clone();
let ca_file = config.mqtt.ca_file.clone();
let client_cert = config.mqtt.client_cert.clone();
let client_key = config.mqtt.client_key.clone();
let username = config.mqtt.username.clone();
let password = if let Some(f) = &config.mqtt.password_file {
Some(
std::fs::read_to_string(f)
.map_err(MqttClientError::PasswordFile)?
.trim()
.to_owned(),
)
} else {
config.mqtt.password.clone()
};
let opts = mqtt::CreateOptionsBuilder::new()
.server_uri(&config.mqtt.url)
.client_id(&config.mqtt.client_id)
.finalize();
let client = Arc::new(Mutex::new(mqtt::AsyncClient::new(opts)?));
let topics = config.mqtt.topics.clone();
let backoff = Backoff::default();
Ok(Self {
url,
client,
topics,
backoff,
ca_file,
client_cert,
client_key,
username,
password,
})
}
pub async fn connect(&mut self) -> Result<(), mqtt::Error> {
let client = self.client.lock().await;
client.connect(self.conn_opts()?).await?;
info!("Successfully connected to MQTT broker");
for topic in &self.topics {
debug!("Subscribing to topic: {}", topic);
client.subscribe(topic, 0).await?;
}
Ok(())
}
pub async fn reconnect(&mut self) {
while let Err(e) = self.connect().await {
error!("Reconnect failed: {}", e);
self.backoff.sleep().await;
}
self.backoff.reset();
}
pub async fn stream(
&mut self,
) -> mqtt::AsyncReceiver<Option<mqtt::Message>> {
self.client.lock().await.get_stream(100)
}
fn conn_opts(&self) -> Result<mqtt::ConnectOptions, mqtt::Error> {
let mut conn_opts = mqtt::ConnectOptionsBuilder::new();
if self.url.starts_with("mqtts://") || self.url.starts_with("ssl://") {
let mut ssl_opts = mqtt::SslOptionsBuilder::new();
ssl_opts.verify(true);
if let Some(ca_file) = &self.ca_file {
ssl_opts.trust_store(ca_file)?;
}
if let Some(client_cert) = &self.client_cert {
ssl_opts.key_store(client_cert)?;
}
if let Some(client_key) = &self.client_key {
ssl_opts.key_store(client_key)?;
}
let ssl_opts = ssl_opts.finalize();
conn_opts.ssl_options(ssl_opts);
}
if let [Some(username), Some(password)] =
[&self.username, &self.password]
{
conn_opts.user_name(username).password(password);
}
Ok(conn_opts.finalize())
}
}
#[derive(Debug, thiserror::Error)]
pub enum MqttClientError {
#[error("MQTT client error: {0}")]
Mqtt(#[from] mqtt::Error),
#[error("Could not read password from file: {0}")]
PasswordFile(std::io::Error),
}

52
src/relay.rs Normal file
View File

@ -0,0 +1,52 @@
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
pub struct Relay<T> {
channel: Arc<Mutex<UnboundedReceiver<T>>>,
}
impl<T: Send + 'static> From<UnboundedReceiver<T>> for Relay<T> {
fn from(channel: UnboundedReceiver<T>) -> Self {
let channel = Arc::new(Mutex::new(channel));
Self { channel }
}
}
impl<T: Send + 'static> Relay<T> {
pub async fn closed(&self) -> bool {
let chan = self.channel.lock().await;
chan.is_closed()
}
pub fn new_stream(&self) -> (UnboundedReceiverStream<T>, JoinHandle<()>) {
let (tx, rx) = mpsc::unbounded_channel();
let h = tokio::spawn({
let chan = self.channel.clone();
async move {
let mut chan = chan.lock().await;
loop {
tokio::select! {
it = chan.recv() => {
if let Some(it) = it {
if tx.send(it).is_err() {
break;
}
} else {
debug!("Upstream channel closed");
break
}
}
_ = tx.closed() => break,
};
}
}
});
(UnboundedReceiverStream::new(rx), h)
}
}