mqtt2vl/src/mqtt.rs

119 lines
3.6 KiB
Rust

use std::path::PathBuf;
use std::sync::Arc;
use paho_mqtt as mqtt;
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use crate::backoff::Backoff;
use crate::config::Configuration;
/// Wrapper around a `paho_mqtt` asynchronous client together with the
/// settings needed to (re)establish a connection to the broker.
pub struct MqttClient {
    /// Broker URI; its scheme decides whether TLS options are built.
    url: String,
    /// The underlying asynchronous client, guarded by an async mutex.
    client: Arc<Mutex<mqtt::AsyncClient>>,
    /// Topics subscribed to after every successful connect.
    topics: Vec<String>,
    /// Delay helper used between failed reconnect attempts.
    backoff: Backoff,

    // --- TLS material (only consulted for TLS broker URIs) ---
    /// Optional path handed to the TLS trust store.
    ca_file: Option<PathBuf>,
    /// Optional path to the client certificate file.
    client_cert: Option<PathBuf>,
    /// Optional path to the client key file.
    client_key: Option<PathBuf>,

    // --- Credentials (only sent when both are present) ---
    /// Optional user name for broker authentication.
    username: Option<String>,
    /// Optional password for broker authentication.
    password: Option<String>,
}
impl MqttClient {
    /// URI schemes for which TLS options are attached to the connection.
    const TLS_SCHEMES: [&'static str; 3] = ["mqtts://", "ssl://", "wss://"];

    /// Builds a client from the `mqtt` section of `config`.
    ///
    /// No connection is attempted here; call [`MqttClient::connect`] or
    /// [`MqttClient::reconnect`] afterwards.
    ///
    /// The password is taken from `password_file` when that is set (with
    /// surrounding whitespace trimmed); otherwise the inline `password` value
    /// is used.
    ///
    /// # Errors
    ///
    /// Returns [`MqttClientError::PasswordFile`] if the password file cannot
    /// be read, and [`MqttClientError::Mqtt`] if the underlying client cannot
    /// be created.
    pub fn new(config: &Configuration) -> Result<Self, MqttClientError> {
        let mqtt_cfg = &config.mqtt;

        // A password file takes precedence over an inline password.
        let password = match &mqtt_cfg.password_file {
            Some(path) => Some(
                std::fs::read_to_string(path)
                    .map_err(MqttClientError::PasswordFile)?
                    .trim()
                    .to_owned(),
            ),
            None => mqtt_cfg.password.clone(),
        };

        let opts = mqtt::CreateOptionsBuilder::new()
            .server_uri(&mqtt_cfg.url)
            .client_id(&mqtt_cfg.client_id)
            .finalize();
        let client = Arc::new(Mutex::new(mqtt::AsyncClient::new(opts)?));

        Ok(Self {
            url: mqtt_cfg.url.clone(),
            client,
            topics: mqtt_cfg.topics.clone(),
            backoff: Backoff::default(),
            ca_file: mqtt_cfg.ca_file.clone(),
            client_cert: mqtt_cfg.client_cert.clone(),
            client_key: mqtt_cfg.client_key.clone(),
            username: mqtt_cfg.username.clone(),
            password,
        })
    }

    /// Connects to the broker and subscribes to every configured topic with
    /// QoS 0.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while building the connect options,
    /// connecting, or subscribing. Topics after a failed subscription are not
    /// attempted.
    pub async fn connect(&mut self) -> Result<(), mqtt::Error> {
        // Build the options first so a bad TLS file path fails before the
        // client lock is taken.
        let conn_opts = self.conn_opts()?;

        let client = self.client.lock().await;
        client.connect(conn_opts).await?;
        info!("Successfully connected to MQTT broker");

        for topic in &self.topics {
            debug!("Subscribing to topic: {}", topic);
            client.subscribe(topic, 0).await?;
        }
        Ok(())
    }

    /// Calls [`MqttClient::connect`] until it succeeds, logging each failure
    /// and sleeping on the backoff between attempts. The backoff is reset
    /// once a connection has been established.
    pub async fn reconnect(&mut self) {
        while let Err(e) = self.connect().await {
            error!("Reconnect failed: {}", e);
            self.backoff.sleep().await;
        }
        self.backoff.reset();
    }

    /// Returns the client's message stream, requested with a buffer limit of
    /// 100.
    pub async fn stream(
        &mut self,
    ) -> mqtt::AsyncReceiver<Option<mqtt::Message>> {
        self.client.lock().await.get_stream(100)
    }

    /// Reports whether `url` uses one of the TLS schemes in
    /// [`Self::TLS_SCHEMES`].
    fn uses_tls(url: &str) -> bool {
        Self::TLS_SCHEMES
            .iter()
            .any(|scheme| url.starts_with(*scheme))
    }

    /// Assembles the connect options from the stored settings.
    ///
    /// TLS options (with server verification enabled) are attached only for
    /// TLS broker URIs. Credentials are attached only when both a user name
    /// and a password are configured.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the TLS file paths is rejected by the
    /// SSL options builder.
    fn conn_opts(&self) -> Result<mqtt::ConnectOptions, mqtt::Error> {
        let mut conn_opts = mqtt::ConnectOptionsBuilder::new();

        if Self::uses_tls(&self.url) {
            let mut ssl_opts = mqtt::SslOptionsBuilder::new();
            ssl_opts.verify(true);

            if let Some(ca_file) = &self.ca_file {
                ssl_opts.trust_store(ca_file)?;
            }

            match (&self.client_cert, &self.client_key) {
                // The key store holds the certificate chain; a separate key
                // file must go to `private_key`, otherwise it would replace
                // the certificate that was just configured.
                (Some(cert), Some(key)) => {
                    ssl_opts.key_store(cert)?;
                    ssl_opts.private_key(key)?;
                }
                // A single file is passed as the key store, exactly as
                // before. NOTE(review): for a lone `client_key` this assumes
                // a combined certificate + key PEM -- confirm with users.
                (Some(store), None) | (None, Some(store)) => {
                    ssl_opts.key_store(store)?;
                }
                (None, None) => {}
            }

            conn_opts.ssl_options(ssl_opts.finalize());
        }

        // A user name without a password (or vice versa) is not sent.
        if let (Some(username), Some(password)) =
            (&self.username, &self.password)
        {
            conn_opts.user_name(username).password(password);
        }

        Ok(conn_opts.finalize())
    }
}
/// Errors that can occur while constructing an [`MqttClient`].
#[derive(Debug, thiserror::Error)]
pub enum MqttClientError {
    /// An error reported by the `paho_mqtt` library; converted automatically
    /// from `mqtt::Error` by the `?` operator.
    #[error("MQTT client error: {0}")]
    Mqtt(#[from] mqtt::Error),

    /// The configured password file could not be read.
    #[error("Could not read password from file: {0}")]
    PasswordFile(std::io::Error),
}