From 41c87d87afa3a1a9165fd530739aa3e9cd3992f4 Mon Sep 17 00:00:00 2001 From: "Dustin C. Hatch" Date: Fri, 30 Dec 2022 14:39:10 -0600 Subject: [PATCH] mqtt: Add MqttClient::connect method Separating the `connect` call out of the `MqttClient::new` function makes is such that we do not have to create a new object for each iteration of the initial connection loop. Instead, we just create one object and repeatedly call its `connect` method until it succeeds --- src/mqtt.rs | 26 +++++++++++++++----------- src/session.rs | 11 ++++------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/mqtt.rs b/src/mqtt.rs index a22fa3c..36061c6 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -3,20 +3,21 @@ use std::time::Duration; pub use paho_mqtt::Error; use paho_mqtt::{ AsyncClient, AsyncReceiver, ConnectOptions, ConnectOptionsBuilder, - CreateOptionsBuilder, Message, SslOptionsBuilder, + CreateOptionsBuilder, Message, ServerResponse, SslOptionsBuilder, }; use tokio_stream::StreamExt; use tracing::{info, trace}; use crate::config::Configuration; -pub struct MqttClient { +pub struct MqttClient<'a> { + config: &'a Configuration, client: AsyncClient, stream: AsyncReceiver>, } -impl MqttClient { - pub async fn new(config: &Configuration) -> Result { +impl<'a> MqttClient<'a> { + pub fn new(config: &'a Configuration) -> Result { let uri = format!( "{}://{}:{}", if config.mqtt.tls { "ssl" } else { "tcp" }, @@ -28,10 +29,13 @@ impl MqttClient { CreateOptionsBuilder::new().server_uri(uri).finalize(); let mut client = AsyncClient::new(client_opts)?; let stream = client.get_stream(10); - client.connect(Self::conn_opts(config)?).await?; - info!("Successfully connected to MQTT broker"); + Ok(Self { config, client, stream }) + } - Ok(Self { client, stream }) + pub async fn connect(&mut self) -> Result { + let res = self.client.connect(self.conn_opts()?).await?; + info!("Successfully connected to MQTT broker"); + Ok(res) } pub async fn run(mut self) { @@ -41,20 +45,20 @@ impl MqttClient { } } - fn conn_opts(config: &Configuration) -> Result { + fn conn_opts(&self) -> Result { let mut conn_opts = ConnectOptionsBuilder::new(); conn_opts.automatic_reconnect( Duration::from_millis(500), Duration::from_secs(30), ); - if config.mqtt.tls { + if self.config.mqtt.tls { let ssl_opts = SslOptionsBuilder::new() - .trust_store(&config.mqtt.ca_file)? + .trust_store(&self.config.mqtt.ca_file)? .finalize(); conn_opts.ssl_options(ssl_opts); } if let [Some(username), Some(password)] = - [&config.mqtt.username, &config.mqtt.password] + [&self.config.mqtt.username, &self.config.mqtt.password] { conn_opts.user_name(username).password(password); } diff --git a/src/session.rs b/src/session.rs index 2883126..d37ba5f 100644 --- a/src/session.rs +++ b/src/session.rs @@ -2,11 +2,11 @@ use std::time::Duration; use tracing::{debug, info, warn}; -use crate::mqtt::MqttClient; use crate::browser::{Browser, BrowserError}; use crate::config::Configuration; use crate::marionette::error::ConnectionError; use crate::marionette::Marionette; +use crate::mqtt::MqttClient; #[derive(Debug)] pub enum SessionError { @@ -84,13 +84,10 @@ impl Session { } pub async fn run(&self) { - let client; + let mut client = MqttClient::new(&self.config).unwrap(); loop { - match MqttClient::new(&self.config).await { - Ok(c) => { - client = c; - break; - } + match client.connect().await { + Ok(_) => break, Err(e) => warn!("Failed to connect to MQTT server: {}", e), } tokio::time::sleep(Duration::from_secs(1)).await;