Compare commits

..

No commits in common. "4156a106854ddf4776cadc8b9ac663d6464ffb1d" and "38e826b454e1ad1cea1e12286b0f75f3315a18bb" have entirely different histories.

12 changed files with 34 additions and 495 deletions

30
Cargo.lock generated
View File

@ -2,15 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
dependencies = [
"memchr",
]
[[package]]
name = "async-channel"
version = "1.8.0"
@ -253,17 +244,6 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
dependencies = [
"libc",
"match_cfg",
"winapi",
]
[[package]]
name = "indexmap"
version = "1.9.2"
@ -341,12 +321,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matchers"
version = "0.1.0"
@ -401,12 +375,10 @@ name = "mqttmarionette"
version = "0.1.0"
dependencies = [
"async-trait",
"hostname",
"inotify",
"mozprofile",
"mozrunner",
"paho-mqtt",
"regex",
"serde",
"serde_json",
"tokio",
@ -563,8 +535,6 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]

View File

@ -5,12 +5,10 @@ edition = "2021"
[dependencies]
async-trait = "0.1.60"
hostname = "0.3.1"
inotify = "0.10.0"
mozprofile = "0.9.0"
mozrunner = "0.15.0"
paho-mqtt = "0.11.1"
regex = "1.7.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.23.0", features = ["io-util", "macros", "net", "rt", "signal", "sync", "time"] }
@ -18,4 +16,4 @@ tokio-stream = "0.1.11"
toml = "0.5.10"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] }
x11 = { version = "2.20.1", features = ["dpms", "xlib", "xrandr"] }
x11 = { version = "2.20.1", features = ["xlib", "xrandr"] }

View File

@ -3,8 +3,6 @@ use std::path::{Path, PathBuf};
use serde::Deserialize;
use crate::util;
#[derive(Debug)]
pub enum ConfigError {
Io(std::io::Error),
@ -75,8 +73,6 @@ impl Default for MqttConfig {
#[derive(Debug, Default, Deserialize)]
pub struct Configuration {
#[serde(default = "util::hostname")]
pub unique_id: String,
pub mqtt: MqttConfig,
}

View File

@ -1,47 +0,0 @@
use regex::Regex;
use serde::Serialize;
use crate::util;
#[derive(Serialize)]
pub struct HassDevice {
pub identifiers: Vec<String>,
pub manufacturer: String,
pub model: String,
pub name: String,
pub sw_version: String,
}
impl Default for HassDevice {
fn default() -> Self {
Self {
identifiers: vec![util::hostname()],
manufacturer: "Dustin C. Hatch".into(),
model: env!("CARGO_PKG_VERSION").into(),
name: "Browser HUD".into(),
sw_version: env!("CARGO_PKG_VERSION").into(),
}
}
}
#[derive(Serialize)]
pub struct HassConfig {
pub availability_topic: String,
pub command_topic: Option<String>,
pub device: HassDevice,
pub name: String,
pub state_topic: String,
pub unique_id: String,
pub object_id: String,
pub icon: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub retain: Option<bool>,
}
pub fn slugify(s: impl AsRef<str>) -> String {
let re = Regex::new("[^a-z0-9_]").unwrap();
let s = s.as_ref().to_ascii_lowercase();
let s = re.replace_all(&s, "_");
let re = Regex::new("_+").unwrap();
re.replace_all(s.trim_matches('_'), "_").into()
}

View File

@ -1,18 +1,13 @@
mod browser;
mod config;
mod hass;
mod marionette;
mod monitor;
mod mqtt;
mod session;
mod util;
#[cfg(unix)]
mod x11;
use std::sync::Arc;
use tokio::signal::unix::{self, SignalKind};
use tokio::sync::Notify;
use tracing::info;
use tracing_subscriber::filter::EnvFilter;
@ -29,18 +24,14 @@ async fn main() {
config::load_config(std::env::var("MQTTMARIONETTE_CONFIG").ok())
.unwrap();
let stop = Arc::new(Notify::new());
let stopwait = stop.clone();
let task = tokio::spawn(async move {
let session = Session::begin(config).await.unwrap();
session.run(stopwait).await;
session.run().await;
});
wait_signal().await;
stop.notify_waiters();
task.await;
task.abort();
}
#[cfg(unix)]

View File

@ -8,7 +8,6 @@ pub enum MessageError {
Io(std::io::Error),
Parse(ParseIntError),
Utf8(Utf8Error),
Disconnected,
}
impl From<std::io::Error> for MessageError {
@ -35,7 +34,6 @@ impl std::fmt::Display for MessageError {
Self::Io(e) => write!(f, "I/O error: {}", e),
Self::Parse(e) => write!(f, "Error parsing message: {}", e),
Self::Utf8(e) => write!(f, "Error parsing message: {}", e),
Self::Disconnected => write!(f, "Disconnected"),
}
}
}
@ -46,7 +44,6 @@ impl std::error::Error for MessageError {
Self::Io(e) => Some(e),
Self::Parse(e) => Some(e),
Self::Utf8(e) => Some(e),
Self::Disconnected => None,
}
}
}

View File

@ -150,9 +150,6 @@ impl MarionetteConnection {
{
let mut buf = vec![];
stream.read_until(b':', &mut buf).await?;
if buf.is_empty() {
return Err(MessageError::Disconnected);
}
let length: usize =
std::str::from_utf8(&buf[..buf.len() - 1])?.parse()?;
trace!("Message length: {:?}", length);

View File

@ -1,4 +1,3 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
@ -8,28 +7,28 @@ use paho_mqtt::{
AsyncReceiver, ConnectOptions, ConnectOptionsBuilder,
CreateOptionsBuilder, ServerResponse, SslOptionsBuilder,
};
use tokio::sync::{Mutex, Notify};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, trace, warn};
use tracing::{info, trace};
use crate::config::Configuration;
use crate::hass::{self, HassConfig};
#[async_trait]
pub trait MessageHandler {
async fn navigate(&mut self, publisher: &MqttPublisher, msg: &Message);
async fn power(&mut self, publisher: &MqttPublisher, msg: &Message);
async fn navigate(
&mut self,
publisher: &MqttPublisher,
msg: &Message,
);
}
#[derive(Debug)]
pub enum MessageType {
Navigate,
PowerState,
}
pub struct MqttClient<'a> {
config: &'a Configuration,
client: Arc<Mutex<AsyncClient>>,
client: AsyncClient,
stream: AsyncReceiver<Option<Message>>,
topics: TopicMatcher<MessageType>,
}
@ -43,13 +42,10 @@ impl<'a> MqttClient<'a> {
config.mqtt.port
);
info!("Connecting to MQTT server {}", uri);
let client_opts = CreateOptionsBuilder::new()
.client_id(&config.unique_id)
.server_uri(uri)
.finalize();
let client_opts =
CreateOptionsBuilder::new().server_uri(uri).finalize();
let mut client = AsyncClient::new(client_opts)?;
let stream = client.get_stream(10);
let client = Arc::new(Mutex::new(client));
let topics = TopicMatcher::new();
Ok(Self {
config,
@ -60,71 +56,35 @@ impl<'a> MqttClient<'a> {
}
pub async fn connect(&mut self) -> Result<ServerResponse, Error> {
let opts = self.conn_opts()?;
trace!("Connect options: {:?}", opts);
let res = self.client.lock().await.connect(opts).await?;
let res = self.client.connect(self.conn_opts()?).await?;
info!("Successfully connected to MQTT broker");
self.on_connect().await;
Ok(res)
}
pub async fn subscribe(&mut self) -> Result<(), Error> {
let client = self.client.lock().await;
pub async fn subscribe(&mut self) -> Result<ServerResponse, Error> {
let prefix = &self.config.mqtt.topic_prefix;
let t_nav = format!("{}/+/navigate", prefix);
let t_power = format!("{}/power", prefix);
client.subscribe(&t_nav, 0).await?;
client.subscribe(&t_power, 0).await?;
let res = self.client.subscribe(&t_nav, 0).await?;
self.topics.insert(t_nav, MessageType::Navigate);
self.topics.insert(t_power, MessageType::PowerState);
Ok(())
Ok(res)
}
pub fn publisher(&mut self) -> MqttPublisher {
MqttPublisher {
config: self.config,
client: self.client.clone(),
}
}
pub async fn run<H>(mut self, mut handler: H, stop: Arc<Notify>)
pub async fn run<H>(mut self, mut handler: H)
where
H: MessageHandler,
{
let publisher = MqttPublisher {
config: self.config,
client: self.client.clone(),
client: &mut self.client,
};
let msg = self.offline_message();
let client = self.client.clone();
tokio::spawn(async move {
stop.notified().await;
let client = client.lock().await;
if let Err(e) = client.publish(msg).await {
error!("Failed to publish offline message: {}", e);
}
client.stop_stream();
client.disconnect(None);
});
while let Some(msg) = self.stream.next().await {
let Some(msg) = msg else {
warn!("Lost connection to MQTT broker, reconnecting");
while let Err(e) = self.client.lock().await.reconnect().await {
error!("Reconnect failed: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
self.on_connect().await;
continue;
};
let Some(msg) = msg else {continue};
trace!("Received message: {:?}", msg);
for m in self.topics.matches(msg.topic()) {
match m.1 {
MessageType::Navigate => {
handler.navigate(&publisher, &msg).await;
}
MessageType::PowerState => {
handler.power(&publisher, &msg).await;
}
}
}
}
@ -132,7 +92,10 @@ impl<'a> MqttClient<'a> {
fn conn_opts(&self) -> Result<ConnectOptions, Error> {
let mut conn_opts = ConnectOptionsBuilder::new();
conn_opts.will_message(self.offline_message());
conn_opts.automatic_reconnect(
Duration::from_millis(500),
Duration::from_secs(30),
);
if self.config.mqtt.tls {
let ssl_opts = SslOptionsBuilder::new()
.trust_store(&self.config.mqtt.ca_file)?
@ -146,38 +109,11 @@ impl<'a> MqttClient<'a> {
}
Ok(conn_opts.finalize())
}
async fn on_connect(&mut self) {
if let Err(e) = self.subscribe().await {
warn!("Error subscribing to MQTT topics: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
let client = self.client.lock().await;
if let Err(e) = client.publish(self.online_message()).await {
error!("Failed to publish availability message: {}", e);
}
}
fn online_message(&self) -> Message {
Message::new_retained(
format!("{}/available", self.config.mqtt.topic_prefix),
"online",
0,
)
}
fn offline_message(&self) -> Message {
Message::new_retained(
format!("{}/available", self.config.mqtt.topic_prefix),
"offline",
0,
)
}
}
pub struct MqttPublisher<'a> {
config: &'a Configuration,
client: Arc<tokio::sync::Mutex<AsyncClient>>,
client: &'a mut AsyncClient,
}
impl<'a> MqttPublisher<'a> {
@ -189,7 +125,7 @@ impl<'a> MqttPublisher<'a> {
let topic =
format!("{}/{}/title", self.config.mqtt.topic_prefix, screen);
let msg = Message::new_retained(topic, title, 0);
(*self.client.lock().await).publish(msg).await?;
self.client.publish(msg).await?;
Ok(())
}
@ -201,96 +137,7 @@ impl<'a> MqttPublisher<'a> {
let topic =
format!("{}/{}/url", self.config.mqtt.topic_prefix, screen);
let msg = Message::new_retained(topic, url, 0);
self.client.lock().await.publish(msg).await?;
Ok(())
}
pub async fn publish_power_state(&self, state: bool) -> Result<(), Error> {
let topic = format!("{}/power_state", self.config.mqtt.topic_prefix);
let msg =
Message::new_retained(topic, if state { "ON" } else { "OFF" }, 0);
self.client.lock().await.publish(msg).await?;
Ok(())
}
pub async fn publish_config(&self, screen: &str) -> Result<(), Error> {
debug!("Publishing Home Assistant configuration");
let prefix = &self.config.mqtt.topic_prefix;
let availability_topic = format!("{}/available", prefix);
let command_topic = Some(format!("{}/{}/navigate", prefix, screen));
let name = format!("{} URL", screen);
let state_topic = format!("{}/{}/url", prefix, screen);
let key = format!(
"browserhud_{}_{}",
self.config.unique_id,
hass::slugify(screen)
);
let unique_id = format!("text.{}_url", key);
let object_id = unique_id.clone();
let msg = Message::new_retained(&availability_topic, "online", 0);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
let config = HassConfig {
availability_topic,
command_topic,
name,
state_topic,
unique_id,
object_id,
device: Default::default(),
icon: "mdi:monitor".into(),
retain: Some(true),
};
let msg = Message::new_retained(
format!("homeassistant/text/{}_url/config", key),
serde_json::to_string(&config).unwrap(),
0,
);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
let unique_id = format!("sensor.{}_title", key);
let object_id = unique_id.clone();
let config = HassConfig {
command_topic: None,
state_topic: format!("{}/{}/title", prefix, screen),
name: format!("{} Title", screen),
unique_id,
object_id,
retain: None,
..config
};
let msg = Message::new_retained(
format!("homeassistant/sensor/{}_title/config", key),
serde_json::to_string(&config).unwrap(),
0,
);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
let unique_id = format!("light.{}", key);
let object_id = unique_id.clone();
let command_topic = Some(format!("{}/power", prefix));
let state_topic = format!("{}/power_state", prefix);
let name = "Display Power".into();
let config = HassConfig {
command_topic,
state_topic,
name,
unique_id,
object_id,
..config
};
let msg = Message::new_retained(
format!("homeassistant/light/{}/config", key),
serde_json::to_string(&config).unwrap(),
0,
);
trace!("Publishing message: {:?}", msg);
self.client.lock().await.publish(msg).await?;
info!("Succesfully published Home Assistant config");
self.client.publish(msg).await?;
Ok(())
}
}

View File

@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tracing::{debug, error, info, trace, warn};
use crate::browser::{Browser, BrowserError};
@ -13,7 +11,7 @@ use crate::marionette::{Marionette, MarionetteConnection};
use crate::monitor::Monitor;
use crate::mqtt::{Message, MqttClient, MqttPublisher};
#[cfg(unix)]
use crate::x11::{dpms, xrandr, Display};
use crate::x11::{xrandr, Display};
#[derive(Debug)]
pub enum SessionError {
@ -100,7 +98,7 @@ impl Session {
})
}
pub async fn run(mut self, stop: Arc<Notify>) {
pub async fn run(mut self) {
let windows = match self.init_windows().await {
Ok(w) => w,
Err(e) => {
@ -116,24 +114,19 @@ impl Session {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
let publisher = client.publisher();
for w in windows.keys() {
if let Err(e) = publisher.publish_config(w).await {
error!(
"Failed to publish Home Assistant config for screen {}: {}",
w, e
);
if let Err(e) = client.subscribe().await {
warn!("Error subscribing to MQTT topics: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
break;
}
let handler = MessageHandler {
marionette: &mut self.marionette,
windows,
};
client.run(handler, stop).await;
client.run(handler).await;
}
async fn init_windows(
@ -230,19 +223,6 @@ impl<'a> crate::mqtt::MessageHandler for MessageHandler<'a> {
Err(e) => error!("Error getting title: {}", e),
}
}
async fn power(&mut self, publisher: &MqttPublisher, msg: &Message) {
match msg.payload_str().as_ref() {
"ON" => turn_screen_on(),
"OFF" => turn_screen_off(),
x => {
warn!("Received unexpected power state command: {}", x);
}
}
if let Err(e) = publisher.publish_power_state(is_screen_on()).await {
error!("Failed to publish power state: {}", e);
}
}
}
#[cfg(unix)]
@ -259,60 +239,3 @@ fn get_monitors() -> Vec<Monitor> {
})
.collect()
}
fn turn_screen_on() {
let display = match Display::open() {
Ok(d) => d,
Err(_) => {
error!("unable to open display \"{}\"", Display::name());
return;
}
};
if !dpms::force_level(&display, dpms::DpmsPowerLevel::On) {
error!("Failed to turn on display \"{}\"", Display::name());
}
if !dpms::disable(&display) {
error!(
"Failed to disable DPMS on display \"{}\"",
Display::name()
);
}
}
#[cfg(unix)]
fn turn_screen_off() {
let display = match Display::open() {
Ok(d) => d,
Err(_) => {
error!("unable to open display \"{}\"", Display::name());
return;
}
};
if !dpms::enable(&display) {
error!(
"Failed to enable DPMS on display \"{}\"",
Display::name()
);
}
if !dpms::force_level(&display, dpms::DpmsPowerLevel::Off) {
error!("Failed to turn off display \"{}\"", Display::name());
}
}
#[cfg(unix)]
fn is_screen_on() -> bool {
let display = match Display::open() {
Ok(d) => d,
Err(_) => {
error!("unable to open display \"{}\"", Display::name());
return false;
}
};
if dpms::query_extension(&display) && dpms::dpms_capable(&display) {
let info = dpms::get_info(&display);
if info.state && info.power_level != dpms::DpmsPowerLevel::On {
return false;
}
}
true
}

View File

@ -1,10 +0,0 @@
pub fn hostname() -> String {
if let Ok(h) = hostname::get() {
if let Some(h) = h.to_str() {
if let Some((h, _)) = h.split_once('.') {
return h.into();
};
};
};
"localhost".into()
}

View File

@ -1,120 +0,0 @@
use x11::dpms::{
DPMSCapable, DPMSDisable, DPMSEnable, DPMSForceLevel, DPMSGetTimeouts,
DPMSInfo, DPMSQueryExtension,
};
use x11::dpms::{DPMSModeOff, DPMSModeOn, DPMSModeStandby, DPMSModeSuspend};
use x11::xmd::{BOOL, CARD16};
use super::Display;
/// DPMS Power Level
///
/// There are four power levels specified by the Video Electronics Standards
/// Association (VESA) Display Power Management Signaling (DPMS) standard.
/// These are mapped onto the X DPMS Extension
#[derive(Eq, PartialEq)]
pub enum DpmsPowerLevel {
/// In use
On = DPMSModeOn as isize,
/// Blanked, low power
Standby = DPMSModeStandby as isize,
/// Blanked, lower power
Suspend = DPMSModeSuspend as isize,
/// Shut off, awaiting activity
Off = DPMSModeOff as isize,
Unknown = -1,
}
impl From<u16> for DpmsPowerLevel {
fn from(v: u16) -> Self {
#[allow(non_snake_case)]
match v {
x if x == DpmsPowerLevel::On as u16 => Self::On,
x if x == DpmsPowerLevel::Standby as u16 => Self::Standby,
x if x == DpmsPowerLevel::Suspend as u16 => Self::Suspend,
x if x == DpmsPowerLevel::Off as u16 => Self::Off,
_ => Self::Unknown,
}
}
}
/// Result from [`get_info`] function (`DPMSInfo`)
pub struct DpmsInfo {
/// Current power level
pub power_level: DpmsPowerLevel,
/// DPMS enabled/disabled state
pub state: bool,
}
/// Result from [`get_timeouts`] function (`DPMSGetTimeouts`)
pub struct DpmsTimeouts {
/// Amount of time of inactivity in seconds before standby mode is invoked
pub standby: u16,
/// Amount of time of inactivity in seconds before the second level of power
/// savings is invoked
pub suspend: u16,
/// Amount of time of inactivity in seconds before the third and final level
/// of power savings is invoked
pub off: u16,
}
/// Queries the X server to determine the availability of the DPMS Extension
pub fn query_extension(display: &Display) -> bool {
let mut event_base = 0;
let mut error_base = 0;
let r = unsafe {
DPMSQueryExtension(display.display, &mut event_base, &mut error_base)
};
r != 0
}
/// Returns the DPMS capability of the X server, either TRUE (capable of DPMS)
/// or FALSE (incapable of DPMS)
pub fn dpms_capable(display: &Display) -> bool {
let r = unsafe { DPMSCapable(display.display) };
r != 0
}
/// Returns information about the current DPMS state
pub fn get_info(display: &Display) -> DpmsInfo {
let mut power_level: CARD16 = 0;
let mut state: BOOL = 0;
unsafe { DPMSInfo(display.display, &mut power_level, &mut state) };
DpmsInfo {
power_level: power_level.into(),
state: state != 0,
}
}
/// Retrieves the timeout values used by the X server for DPMS timings
pub fn get_timeouts(display: &Display) -> DpmsTimeouts {
let mut standby: CARD16 = 0;
let mut suspend: CARD16 = 0;
let mut off: CARD16 = 0;
unsafe {
DPMSGetTimeouts(display.display, &mut standby, &mut suspend, &mut off)
};
DpmsTimeouts {
standby,
suspend,
off,
}
}
/// Forces a DPMS capable display into the specified power level
pub fn force_level(display: &Display, level: DpmsPowerLevel) -> bool {
let r = unsafe { DPMSForceLevel(display.display, level as u16) };
r != 0
}
/// Enables DPMS on the specified display
pub fn enable(display: &Display) -> bool {
let r = unsafe { DPMSEnable(display.display) };
r != 0
}
/// Disables DPMS on the specified display
pub fn disable(display: &Display) -> bool {
let r = unsafe { DPMSDisable(display.display) };
r != 0
}

View File

@ -1,5 +1,3 @@
#[allow(dead_code)]
pub mod dpms;
pub mod xrandr;
use std::ffi::CStr;
@ -20,7 +18,6 @@ pub struct Display {
display: *mut _XDisplay,
}
#[allow(dead_code)]
impl Display {
/// Open a connection to the X server
///