WebSockets implementation.

main
Mauro D 2022-05-17 14:41:16 +00:00
parent 53abec1222
commit 93802339c2
12 changed files with 487 additions and 21 deletions

View File

@ -17,10 +17,14 @@ serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"]}
reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"]}
futures-util = "0.3"
async-stream = "0.3.3"
async-stream = "0.3"
base64 = "0.13"
tokio-tungstenite = { version = "0.17", features = ["rustls-tls-webpki-roots"], optional = true}
tokio = { version = "1.16", default-features = false, features = ["io-util"], optional = true }
[features]
default = []
websockets = ["tokio", "tokio-tungstenite"]
debug = []
[profile.bench]

View File

@ -28,6 +28,8 @@ pub struct Client {
timeout: u64,
headers: header::HeaderMap,
default_account_id: String,
#[cfg(feature = "websockets")]
ws: Option<crate::client_ws::WsStream>,
}
impl Client {
@ -77,6 +79,8 @@ impl Client {
timeout: DEFAULT_TIMEOUT_MS,
headers,
default_account_id,
#[cfg(feature = "websockets")]
ws: None,
})
}
@ -184,6 +188,16 @@ impl Client {
Err(Error::Server(format!("{}", response.status())))
}
}
#[cfg(feature = "websockets")]
pub fn set_ws_stream(&mut self, ws: crate::client_ws::WsStream) {
self.ws = Some(ws);
}
#[cfg(feature = "websockets")]
pub fn ws_stream(&mut self) -> Option<&mut crate::client_ws::WsStream> {
self.ws.as_mut()
}
}
#[cfg(test)]

285
src/client_ws.rs Normal file
View File

@ -0,0 +1,285 @@
use std::{collections::HashMap, pin::Pin};
use futures_util::{stream::SplitSink, SinkExt, Stream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio_tungstenite::{
tungstenite::{client::IntoClientRequest, Message},
MaybeTlsStream, WebSocketStream,
};
use crate::{
client::Client,
core::{
error::{ProblemDetails, ProblemType},
request::{Arguments, Request},
response::{MethodResponse, Response},
},
event_source::Changes,
Method, StateChangeType, TypeState, URI,
};
#[derive(Debug, Serialize)]
struct WebSocketRequest {
#[serde(rename = "@type")]
pub _type: WebSocketRequestType,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
using: Vec<URI>,
#[serde(rename = "methodCalls")]
method_calls: Vec<(Method, Arguments, String)>,
#[serde(rename = "createdIds")]
#[serde(skip_serializing_if = "Option::is_none")]
created_ids: Option<HashMap<String, String>>,
}
#[derive(Debug, Deserialize)]
pub struct WebSocketResponse {
#[serde(rename = "@type")]
_type: WebSocketResponseType,
#[serde(rename = "requestId")]
request_id: Option<String>,
#[serde(rename = "methodResponses")]
method_responses: Vec<MethodResponse>,
#[serde(rename = "createdIds")]
created_ids: Option<HashMap<String, String>>,
#[serde(rename = "sessionState")]
session_state: String,
}
#[derive(Debug, Serialize, Deserialize)]
enum WebSocketResponseType {
Response,
}
#[derive(Debug, Serialize)]
struct WebSocketPushEnable {
#[serde(rename = "@type")]
_type: WebSocketPushEnableType,
#[serde(rename = "dataTypes")]
data_types: Option<Vec<StateChangeType>>,
#[serde(rename = "pushState")]
#[serde(skip_serializing_if = "Option::is_none")]
push_state: Option<String>,
}
#[derive(Debug, Serialize)]
struct WebSocketPushDisable {
#[serde(rename = "@type")]
_type: WebSocketPushDisableType,
}
#[derive(Debug, Serialize)]
enum WebSocketRequestType {
Request,
}
#[derive(Debug, Serialize)]
enum WebSocketPushEnableType {
WebSocketPushEnable,
}
#[derive(Debug, Serialize)]
enum WebSocketPushDisableType {
WebSocketPushDisable,
}
#[derive(Serialize, Deserialize, Debug)]
pub enum WebSocketStateChangeType {
StateChange,
}
#[derive(Deserialize, Debug)]
pub struct WebSocketStateChange {
#[serde(rename = "@type")]
pub type_: WebSocketStateChangeType,
pub changed: HashMap<String, HashMap<TypeState, String>>,
#[serde(rename = "pushState")]
push_state: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct WebSocketProblem {
#[serde(rename = "@type")]
pub type_: WebSocketProblemType,
#[serde(rename = "requestId")]
pub request_id: Option<String>,
#[serde(rename = "type")]
p_type: ProblemType,
status: Option<u32>,
title: Option<String>,
detail: Option<String>,
limit: Option<usize>,
}
#[derive(Serialize, Deserialize, Debug)]
pub enum WebSocketProblemType {
Problem,
}
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum WebSocketMessage_ {
Response(WebSocketResponse),
StateChange(WebSocketStateChange),
Error(WebSocketProblem),
}
#[derive(Debug)]
pub enum WebSocketMessage {
Response(Response<MethodResponse>),
StateChange(Changes),
}
pub struct WsStream {
tx: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
req_id: usize,
}
impl Client {
pub async fn connect_ws(
&mut self,
) -> crate::Result<Pin<Box<impl Stream<Item = crate::Result<WebSocketMessage>>>>> {
let capabilities = self.session().websocket_capabilities().ok_or_else(|| {
crate::Error::Internal(
"JMAP server does not advertise any websocket capabilities.".to_string(),
)
})?;
let mut request = capabilities.url().into_client_request()?;
request
.headers_mut()
.insert("Authorization", "Bearer 123".parse().unwrap()); //TODO implement
let (stream, _) = tokio_tungstenite::connect_async(request).await?;
let (tx, mut rx) = stream.split();
self.set_ws_stream(WsStream { tx, req_id: 0 });
Ok(Box::pin(async_stream::stream! {
while let Some(message) = rx.next().await {
match message {
Ok(message) if message.is_text() => {
match serde_json::from_slice::<WebSocketMessage_>(&message.into_data()) {
Ok(message) => match message {
WebSocketMessage_::Response(response) => {
yield Ok(WebSocketMessage::Response(Response::new(
response.method_responses,
response.created_ids,
response.session_state,
response.request_id,
)))
}
WebSocketMessage_::StateChange(changes) => {
yield Ok(WebSocketMessage::StateChange(Changes::new(
changes.push_state,
changes.changed,
)))
}
WebSocketMessage_::Error(err) => yield Err(ProblemDetails::from(err).into()),
},
Err(err) => yield Err(err.into()),
}
}
Ok(_) => (),
Err(err) => yield Err(err.into()),
}
}
}))
}
pub async fn send_ws(&mut self, request: Request<'_>) -> crate::Result<String> {
let ws = self
.ws_stream()
.ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?;
// Assing request id
let request_id = ws.req_id.to_string();
ws.req_id += 1;
ws.tx
.send(Message::text(
serde_json::to_string(&WebSocketRequest {
_type: WebSocketRequestType::Request,
id: request_id.clone().into(),
using: request.using,
method_calls: request.method_calls,
created_ids: request.created_ids,
})
.unwrap_or_default(),
))
.await?;
Ok(request_id)
}
pub async fn enable_push_ws(
&mut self,
data_types: Option<impl IntoIterator<Item = StateChangeType>>,
push_state: Option<impl Into<String>>,
) -> crate::Result<()> {
self.ws_stream()
.ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?
.tx
.send(Message::text(
serde_json::to_string(&WebSocketPushEnable {
_type: WebSocketPushEnableType::WebSocketPushEnable,
data_types: data_types.map(|it| it.into_iter().collect()),
push_state: push_state.map(|it| it.into()),
})
.unwrap_or_default(),
))
.await
.map_err(|err| err.into())
}
pub async fn disable_push_ws(&mut self) -> crate::Result<()> {
self.ws_stream()
.ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?
.tx
.send(Message::text(
serde_json::to_string(&WebSocketPushDisable {
_type: WebSocketPushDisableType::WebSocketPushDisable,
})
.unwrap_or_default(),
))
.await
.map_err(|err| err.into())
}
pub async fn ws_ping(&mut self) -> crate::Result<()> {
self.ws_stream()
.ok_or_else(|| crate::Error::Internal("Websocket stream not set.".to_string()))?
.tx
.send(Message::Ping(vec![]))
.await
.map_err(|err| err.into())
}
}
impl From<WebSocketProblem> for ProblemDetails {
fn from(problem: WebSocketProblem) -> Self {
ProblemDetails::new(
problem.p_type,
problem.status,
problem.title,
problem.detail,
problem.limit,
problem.request_id,
)
}
}

View File

@ -10,6 +10,7 @@ pub struct ProblemDetails {
title: Option<String>,
detail: Option<String>,
limit: Option<usize>,
request_id: Option<String>,
}
#[derive(Debug, Deserialize)]
@ -75,6 +76,24 @@ pub enum MethodErrorType {
}
impl ProblemDetails {
pub fn new(
p_type: ProblemType,
status: Option<u32>,
title: Option<String>,
detail: Option<String>,
limit: Option<usize>,
request_id: Option<String>,
) -> Self {
ProblemDetails {
p_type,
status,
title,
detail,
limit,
request_id,
}
}
pub fn error(&self) -> &ProblemType {
&self.p_type
}
@ -94,6 +113,10 @@ impl ProblemDetails {
pub fn limit(&self) -> Option<usize> {
self.limit
}
pub fn request_id(&self) -> Option<&str> {
self.request_id.as_deref()
}
}
impl MethodError {

View File

@ -33,14 +33,14 @@ pub struct Request<'x> {
#[serde(skip)]
default_account_id: String,
using: Vec<URI>,
pub using: Vec<URI>,
#[serde(rename = "methodCalls")]
method_calls: Vec<(Method, Arguments, String)>,
pub method_calls: Vec<(Method, Arguments, String)>,
#[serde(rename = "createdIds")]
#[serde(skip_serializing_if = "Option::is_none")]
created_ids: Option<HashMap<String, String>>,
pub created_ids: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize)]
@ -416,6 +416,11 @@ impl<'x> Request<'x> {
Option::take(&mut self.client).unwrap().send(&self).await
}
#[cfg(feature = "websockets")]
pub async fn send_ws(mut self) -> crate::Result<String> {
Option::take(&mut self.client).unwrap().send_ws(self).await
}
pub async fn send_single<T>(mut self) -> crate::Result<T>
where
T: DeserializeOwned,

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use crate::{
blob::copy::CopyBlobResponse,
@ -19,7 +19,7 @@ use super::{
query::QueryResponse, query_changes::QueryChangesResponse, set::SetResponse,
};
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct Response<T> {
#[serde(rename = "methodResponses")]
method_responses: Vec<T>,
@ -29,9 +29,25 @@ pub struct Response<T> {
#[serde(rename = "sessionState")]
session_state: String,
request_id: Option<String>,
}
impl<T> Response<T> {
pub fn new(
method_responses: Vec<T>,
created_ids: Option<HashMap<String, String>>,
session_state: String,
request_id: Option<String>,
) -> Self {
Response {
method_responses,
created_ids,
session_state,
request_id,
}
}
pub fn method_responses(&self) -> &[T] {
self.method_responses.as_ref()
}
@ -40,6 +56,10 @@ impl<T> Response<T> {
self.method_responses
}
pub fn unwrap_method_response(mut self) -> T {
self.method_responses.pop().unwrap()
}
pub fn created_ids(&self) -> Option<impl Iterator<Item = (&String, &String)>> {
self.created_ids.as_ref().map(|map| map.iter())
}
@ -47,6 +67,10 @@ impl<T> Response<T> {
pub fn session_state(&self) -> &str {
&self.session_state
}
pub fn request_id(&self) -> Option<&str> {
self.request_id.as_deref()
}
}
impl Response<MethodResponse> {

View File

@ -2,7 +2,10 @@ use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use crate::email::{MailCapabilities, SubmissionCapabilities};
use crate::{
email::{MailCapabilities, SubmissionCapabilities},
URI,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
@ -55,6 +58,7 @@ pub enum Capabilities {
Core(CoreCapabilities),
Mail(MailCapabilities),
Submission(SubmissionCapabilities),
WebSocket(WebSocketCapabilities),
Empty(EmptyCapabilities),
Other(serde_json::Value),
}
@ -86,6 +90,14 @@ pub struct CoreCapabilities {
collation_algorithms: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketCapabilities {
#[serde(rename = "url")]
url: String,
#[serde(rename = "supportsPush")]
supports_push: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmptyCapabilities {}
@ -94,8 +106,48 @@ impl Session {
self.capabilities.keys()
}
pub fn capability(&self, capability: &str) -> Option<&Capabilities> {
self.capabilities.get(capability)
pub fn capability(&self, capability: impl AsRef<str>) -> Option<&Capabilities> {
self.capabilities.get(capability.as_ref())
}
pub fn has_capability(&self, capability: impl AsRef<str>) -> bool {
self.capabilities.contains_key(capability.as_ref())
}
pub fn websocket_capabilities(&self) -> Option<&WebSocketCapabilities> {
self.capabilities
.get(URI::WebSocket.as_ref())
.and_then(|v| match v {
Capabilities::WebSocket(capabilities) => Some(capabilities),
_ => None,
})
}
pub fn core_capabilities(&self) -> Option<&CoreCapabilities> {
self.capabilities
.get(URI::Core.as_ref())
.and_then(|v| match v {
Capabilities::Core(capabilities) => Some(capabilities),
_ => None,
})
}
pub fn mail_capabilities(&self) -> Option<&MailCapabilities> {
self.capabilities
.get(URI::Mail.as_ref())
.and_then(|v| match v {
Capabilities::Mail(capabilities) => Some(capabilities),
_ => None,
})
}
pub fn submission_capabilities(&self) -> Option<&SubmissionCapabilities> {
self.capabilities
.get(URI::Submission.as_ref())
.and_then(|v| match v {
Capabilities::Submission(capabilities) => Some(capabilities),
_ => None,
})
}
pub fn accounts(&self) -> impl Iterator<Item = &String> {
@ -191,6 +243,16 @@ impl CoreCapabilities {
}
}
impl WebSocketCapabilities {
pub fn url(&self) -> &str {
&self.url
}
pub fn supports_push(&self) -> bool {
self.supports_push
}
}
pub trait URLParser: Sized {
fn parse(value: &str) -> Option<Self>;
}

View File

@ -31,6 +31,10 @@ pub struct Changes {
}
impl Changes {
pub fn new(id: Option<String>, changes: HashMap<String, HashMap<TypeState, String>>) -> Self {
Self { id, changes }
}
pub fn id(&self) -> Option<&str> {
self.id.as_deref()
}

View File

@ -1,7 +1,6 @@
use std::time::Duration;
use std::{pin::Pin, time::Duration};
use crate::{client::Client, core::session::URLPart, event_source::parser::EventParser, TypeState};
use async_stream::stream;
use futures_util::{Stream, StreamExt};
use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE};
@ -14,7 +13,7 @@ impl Client {
close_after_state: bool,
ping: Option<u32>,
last_event_id: Option<&str>,
) -> crate::Result<impl Stream<Item = crate::Result<Changes>>> {
) -> crate::Result<Pin<Box<impl Stream<Item = crate::Result<Changes>>>>> {
let mut event_source_url = String::with_capacity(self.session().event_source_url().len());
for part in self.event_source_url() {
@ -74,8 +73,7 @@ impl Client {
.bytes_stream();
let mut parser = EventParser::default();
// TODO - use poll_next() to avoid pin_mut() call.
Ok(stream! {
Ok(Box::pin(async_stream::stream! {
loop {
if let Some(changes) = parser.filter_state() {
yield changes;
@ -96,6 +94,6 @@ impl Client {
break;
}
}
})
}))
}
}

View File

@ -17,7 +17,8 @@ pub mod push_subscription;
pub mod thread;
pub mod vacation_response;
pub use futures_util;
#[cfg(feature = "websockets")]
pub mod client_ws;
#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub enum URI {
@ -33,6 +34,22 @@ pub enum URI {
Contacts,
#[serde(rename = "urn:ietf:params:jmap:calendars")]
Calendars,
#[serde(rename = "urn:ietf:params:jmap:websocket")]
WebSocket,
}
impl AsRef<str> for URI {
fn as_ref(&self) -> &str {
match self {
URI::Core => "urn:ietf:params:jmap:core",
URI::Mail => "urn:ietf:params:jmap:mail",
URI::Submission => "urn:ietf:params:jmap:submission",
URI::VacationResponse => "urn:ietf:params:jmap:vacationresponse",
URI::Contacts => "urn:ietf:params:jmap:contacts",
URI::Calendars => "urn:ietf:params:jmap:calendars",
URI::WebSocket => "urn:ietf:params:jmap:websocket",
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
@ -111,12 +128,12 @@ pub enum TypeState {
EmailSubmission,
}
#[derive(Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub enum StateChangeType {
StateChange,
}
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
pub struct StateChange {
#[serde(rename = "@type")]
pub type_: StateChangeType,
@ -139,6 +156,8 @@ pub enum Error {
Server(String),
Method(MethodError),
Set(SetError<String>),
#[cfg(feature = "websockets")]
WebSocket(tokio_tungstenite::tungstenite::error::Error),
}
impl From<reqwest::Error> for Error {
@ -159,6 +178,12 @@ impl From<MethodError> for Error {
}
}
impl From<ProblemDetails> for Error {
fn from(e: ProblemDetails) -> Self {
Error::Problem(e)
}
}
impl From<SetError<String>> for Error {
fn from(e: SetError<String>) -> Self {
Error::Set(e)
@ -171,6 +196,13 @@ impl From<&str> for Error {
}
}
#[cfg(feature = "websockets")]
impl From<tokio_tungstenite::tungstenite::error::Error> for Error {
fn from(e: tokio_tungstenite::tungstenite::error::Error) -> Self {
Error::WebSocket(e)
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@ -181,6 +213,8 @@ impl Display for Error {
Error::Server(e) => write!(f, "Server error: {}", e),
Error::Method(e) => write!(f, "Method error: {}", e),
Error::Set(e) => write!(f, "Set error: {}", e),
#[cfg(feature = "websockets")]
Error::WebSocket(e) => write!(f, "WebSocket error: {}", e),
}
}
}

View File

@ -6,7 +6,7 @@ use crate::{
response::{PushSubscriptionGetResponse, PushSubscriptionSetResponse},
set::{Create, SetRequest},
},
Method, Set,
Method, Set, TypeState,
};
use super::{Keys, PushSubscription};
@ -52,6 +52,19 @@ impl Client {
.updated(id)
}
pub async fn push_subscription_update_types(
&mut self,
id: &str,
types: Option<impl IntoIterator<Item = TypeState>>,
) -> crate::Result<Option<PushSubscription>> {
let mut request = self.build();
request.set_push_subscription().update(id).types(types);
request
.send_single::<PushSubscriptionSetResponse>()
.await?
.updated(id)
}
pub async fn push_subscription_destroy(&mut self, id: &str) -> crate::Result<()> {
let mut request = self.build();
request.set_push_subscription().destroy([id]);

View File

@ -31,8 +31,8 @@ impl PushSubscription<Set> {
self
}
pub fn types(&mut self, types: Option<impl Iterator<Item = TypeState>>) -> &mut Self {
self.types = types.map(|s| s.collect());
pub fn types(&mut self, types: Option<impl IntoIterator<Item = TypeState>>) -> &mut Self {
self.types = types.map(|s| s.into_iter().collect());
self
}
}