WebSocket support for self-signed certificates

main
Mauro D 2023-05-24 17:39:54 +00:00
parent 5c7fd055ec
commit 3cba63c224
2 changed files with 51 additions and 13 deletions

View File

@ -14,10 +14,11 @@ resolver = "2"
[dependencies] [dependencies]
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"]} reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"]}
tokio-tungstenite = { version = "0.18", features = ["rustls-tls-webpki-roots"], optional = true} tokio-tungstenite = { version = "0.19", features = ["rustls-tls-webpki-roots"], optional = true}
tokio = { version = "1.16", default-features = false, features = ["io-util"], optional = true } tokio = { version = "1.16", default-features = false, features = ["io-util"], optional = true }
futures-util = { version = "0.3", optional = true} futures-util = { version = "0.3", optional = true}
async-stream = { version = "0.3", optional = true} async-stream = { version = "0.3", optional = true}
rustls = { version = "0.21.0", features = ["dangerous_configuration"], optional = true }
serde = { version = "1.0", features = ["derive"]} serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0" serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"]} chrono = { version = "0.4", features = ["serde"]}
@ -29,7 +30,7 @@ maybe-async = "0.2"
[features] [features]
default = ["async"] default = ["async"]
async = ["futures-util", "async-stream", "reqwest/stream"] async = ["futures-util", "async-stream", "reqwest/stream"]
websockets = ["tokio", "tokio-tungstenite"] websockets = ["tokio", "tokio-tungstenite", "rustls"]
blocking = ["reqwest/blocking", "maybe-async/is_sync"] blocking = ["reqwest/blocking", "maybe-async/is_sync"]
debug = [] debug = []

View File

@ -9,15 +9,19 @@
* except according to those terms. * except according to those terms.
*/ */
use std::pin::Pin; use std::{pin::Pin, sync::Arc};
use ahash::AHashMap; use ahash::AHashMap;
use futures_util::{stream::SplitSink, SinkExt, Stream, StreamExt}; use futures_util::{stream::SplitSink, SinkExt, Stream, StreamExt};
use rustls::{
client::{ServerCertVerified, ServerCertVerifier},
Certificate, ClientConfig, ServerName,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_tungstenite::{ use tokio_tungstenite::{
tungstenite::{client::IntoClientRequest, Message}, tungstenite::{client::IntoClientRequest, Message},
MaybeTlsStream, WebSocketStream, Connector, MaybeTlsStream, WebSocketStream,
}; };
use crate::{ use crate::{
@ -123,9 +127,9 @@ pub struct WebSocketStateChange {
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct WebSocketProblem { pub struct WebSocketError {
#[serde(rename = "@type")] #[serde(rename = "@type")]
pub type_: WebSocketProblemType, pub type_: WebSocketErrorType,
#[serde(rename = "requestId")] #[serde(rename = "requestId")]
pub request_id: Option<String>, pub request_id: Option<String>,
@ -139,8 +143,8 @@ pub struct WebSocketProblem {
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub enum WebSocketProblemType { pub enum WebSocketErrorType {
Problem, RequestError,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -148,7 +152,7 @@ pub enum WebSocketProblemType {
enum WebSocketMessage_ { enum WebSocketMessage_ {
Response(WebSocketResponse), Response(WebSocketResponse),
StateChange(WebSocketStateChange), StateChange(WebSocketStateChange),
Error(WebSocketProblem), Error(WebSocketError),
} }
#[derive(Debug)] #[derive(Debug)]
@ -162,6 +166,23 @@ pub struct WsStream {
req_id: usize, req_id: usize,
} }
#[doc(hidden)]
struct DummyVerifier;
impl ServerCertVerifier for DummyVerifier {
fn verify_server_cert(
&self,
_e: &Certificate,
_i: &[Certificate],
_sn: &ServerName,
_sc: &mut dyn Iterator<Item = &[u8]>,
_o: &[u8],
_n: std::time::SystemTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}
}
impl Client { impl Client {
pub async fn connect_ws( pub async fn connect_ws(
&self, &self,
@ -178,7 +199,23 @@ impl Client {
.headers_mut() .headers_mut()
.insert("Authorization", self.authorization.parse().unwrap()); .insert("Authorization", self.authorization.parse().unwrap());
let (stream, _) = tokio_tungstenite::connect_async(request).await?; let (stream, _) = if self.accept_invalid_certs & capabilities.url().starts_with("wss") {
tokio_tungstenite::connect_async_tls_with_config(
request,
None,
false,
Connector::Rustls(Arc::new(
ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(DummyVerifier {}))
.with_no_client_auth(),
))
.into(),
)
.await?
} else {
tokio_tungstenite::connect_async(request).await?
};
let (tx, mut rx) = stream.split(); let (tx, mut rx) = stream.split();
*self.ws.lock().await = WsStream { tx, req_id: 0 }.into(); *self.ws.lock().await = WsStream { tx, req_id: 0 }.into();
@ -221,7 +258,7 @@ impl Client {
.as_mut() .as_mut()
.ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?; .ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?;
// Assing request id // Assign request id
let request_id = ws.req_id.to_string(); let request_id = ws.req_id.to_string();
ws.req_id += 1; ws.req_id += 1;
@ -294,8 +331,8 @@ impl Client {
} }
} }
impl From<WebSocketProblem> for ProblemDetails { impl From<WebSocketError> for ProblemDetails {
fn from(problem: WebSocketProblem) -> Self { fn from(problem: WebSocketError) -> Self {
ProblemDetails::new( ProblemDetails::new(
problem.p_type, problem.p_type,
problem.status, problem.status,