Moved Session inside Arc<>

main
Mauro D 2022-07-05 18:10:32 +00:00
parent 931523bd20
commit adf6176a8f
8 changed files with 38 additions and 26 deletions

View File

@ -21,6 +21,7 @@ async-stream = "0.3"
base64 = "0.13" base64 = "0.13"
tokio-tungstenite = { version = "0.17", features = ["rustls-tls-webpki-roots"], optional = true} tokio-tungstenite = { version = "0.17", features = ["rustls-tls-webpki-roots"], optional = true}
tokio = { version = "1.16", default-features = false, features = ["io-util"], optional = true } tokio = { version = "1.16", default-features = false, features = ["io-util"], optional = true }
parking_lot = "0.12.0"
[features] [features]
default = [] default = []
@ -28,4 +29,4 @@ websockets = ["tokio", "tokio-tungstenite"]
debug = [] debug = []
[profile.bench] [profile.bench]
debug = true debug = true

View File

@ -1,5 +1,8 @@
use std::{ use std::{
sync::atomic::{AtomicBool, Ordering}, sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration, time::Duration,
}; };
@ -20,7 +23,7 @@ use crate::{
}; };
const DEFAULT_TIMEOUT_MS: u64 = 10 * 1000; const DEFAULT_TIMEOUT_MS: u64 = 10 * 1000;
static USER_AGENT: &str = concat!("stalwart-jmap/", env!("CARGO_PKG_VERSION")); static USER_AGENT: &str = concat!("jmap-client/", env!("CARGO_PKG_VERSION"));
pub enum Credentials { pub enum Credentials {
Basic(String), Basic(String),
@ -28,17 +31,21 @@ pub enum Credentials {
} }
pub struct Client { pub struct Client {
session: Session, session: parking_lot::Mutex<Arc<Session>>,
session_url: String, session_url: String,
api_url: String,
session_updated: AtomicBool, session_updated: AtomicBool,
#[cfg(feature = "websockets")]
pub(crate) authorization: String,
upload_url: Vec<URLPart<blob::URLParameter>>, upload_url: Vec<URLPart<blob::URLParameter>>,
download_url: Vec<URLPart<blob::URLParameter>>, download_url: Vec<URLPart<blob::URLParameter>>,
event_source_url: Vec<URLPart<event_source::URLParameter>>, event_source_url: Vec<URLPart<event_source::URLParameter>>,
timeout: u64,
headers: header::HeaderMap, headers: header::HeaderMap,
default_account_id: String, default_account_id: String,
timeout: u64,
#[cfg(feature = "websockets")]
pub(crate) authorization: String,
#[cfg(feature = "websockets")] #[cfg(feature = "websockets")]
pub(crate) ws: tokio::sync::Mutex<Option<crate::client_ws::WsStream>>, pub(crate) ws: tokio::sync::Mutex<Option<crate::client_ws::WsStream>>,
} }
@ -89,7 +96,8 @@ impl Client {
download_url: URLPart::parse(session.download_url())?, download_url: URLPart::parse(session.download_url())?,
upload_url: URLPart::parse(session.upload_url())?, upload_url: URLPart::parse(session.upload_url())?,
event_source_url: URLPart::parse(session.event_source_url())?, event_source_url: URLPart::parse(session.event_source_url())?,
session, api_url: session.api_url().to_string(),
session: parking_lot::Mutex::new(Arc::new(session)),
session_url: url.to_string(), session_url: url.to_string(),
session_updated: true.into(), session_updated: true.into(),
#[cfg(feature = "websockets")] #[cfg(feature = "websockets")]
@ -111,8 +119,8 @@ impl Client {
self.timeout self.timeout
} }
pub fn session(&self) -> &Session { pub fn session(&self) -> Arc<Session> {
&self.session self.session.lock().clone()
} }
pub fn session_url(&self) -> &str { pub fn session_url(&self) -> &str {
@ -136,7 +144,7 @@ impl Client {
.timeout(Duration::from_millis(self.timeout)) .timeout(Duration::from_millis(self.timeout))
.default_headers(self.headers.clone()) .default_headers(self.headers.clone())
.build()? .build()?
.post(self.session.api_url()) .post(&self.api_url)
.body(serde_json::to_string(&request)?) .body(serde_json::to_string(&request)?)
.send() .send()
.await?, .await?,
@ -146,14 +154,14 @@ impl Client {
.await?, .await?,
)?; )?;
if response.session_state() != self.session.state() { if response.session_state() != self.session.lock().state() {
self.session_updated.store(false, Ordering::Relaxed); self.session_updated.store(false, Ordering::Relaxed);
} }
Ok(response) Ok(response)
} }
pub async fn refresh_session(&mut self) -> crate::Result<()> { pub async fn refresh_session(&self) -> crate::Result<()> {
let session: Session = serde_json::from_slice( let session: Session = serde_json::from_slice(
&Client::handle_error( &Client::handle_error(
reqwest::Client::builder() reqwest::Client::builder()
@ -168,10 +176,7 @@ impl Client {
.bytes() .bytes()
.await?, .await?,
)?; )?;
self.download_url = URLPart::parse(session.download_url())?; *self.session.lock() = Arc::new(session);
self.upload_url = URLPart::parse(session.upload_url())?;
self.event_source_url = URLPart::parse(session.event_source_url())?;
self.session = session;
self.session_updated.store(true, Ordering::Relaxed); self.session_updated.store(true, Ordering::Relaxed);
Ok(()) Ok(())
} }

View File

@ -154,7 +154,8 @@ impl Client {
pub async fn connect_ws( pub async fn connect_ws(
&self, &self,
) -> crate::Result<Pin<Box<impl Stream<Item = crate::Result<WebSocketMessage>>>>> { ) -> crate::Result<Pin<Box<impl Stream<Item = crate::Result<WebSocketMessage>>>>> {
let capabilities = self.session().websocket_capabilities().ok_or_else(|| { let session = self.session();
let capabilities = session.websocket_capabilities().ok_or_else(|| {
crate::Error::Internal( crate::Error::Internal(
"JMAP server does not advertise any websocket capabilities.".to_string(), "JMAP server does not advertise any websocket capabilities.".to_string(),
) )

View File

@ -63,7 +63,7 @@ pub struct FilterOperator<T> {
conditions: Vec<Filter<T>>, conditions: Vec<Filter<T>>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub enum Operator { pub enum Operator {
#[serde(rename = "AND")] #[serde(rename = "AND")]
And, And,

View File

@ -35,7 +35,7 @@ pub struct Request<'x> {
#[serde(skip)] #[serde(skip)]
client: &'x Client, client: &'x Client,
#[serde(skip)] #[serde(skip)]
default_account_id: String, account_id: String,
pub using: Vec<URI>, pub using: Vec<URI>,
@ -421,11 +421,16 @@ impl<'x> Request<'x> {
using: vec![URI::Core, URI::Mail], using: vec![URI::Core, URI::Mail],
method_calls: vec![], method_calls: vec![],
created_ids: None, created_ids: None,
default_account_id: client.default_account_id().to_string(), account_id: client.default_account_id().to_string(),
client, client,
} }
} }
pub fn account_id(mut self, account_id: impl Into<String>) -> Self {
self.account_id = account_id.into();
self
}
pub async fn send(self) -> crate::Result<Response<TaggedMethodResponse>> { pub async fn send(self) -> crate::Result<Response<TaggedMethodResponse>> {
self.client.send(&self).await self.client.send(&self).await
} }
@ -452,7 +457,7 @@ impl<'x> Request<'x> {
pub fn params(&self, method: Method) -> RequestParams { pub fn params(&self, method: Method) -> RequestParams {
RequestParams { RequestParams {
account_id: self.default_account_id.clone(), account_id: self.account_id.clone(),
method, method,
call_id: self.method_calls.len(), call_id: self.method_calls.len(),
} }

View File

@ -8,7 +8,7 @@ use super::Changes;
impl Client { impl Client {
pub async fn event_source( pub async fn event_source(
&mut self, &self,
mut types: Option<impl IntoIterator<Item = TypeState>>, mut types: Option<impl IntoIterator<Item = TypeState>>,
close_after_state: bool, close_after_state: bool,
ping: Option<u32>, ping: Option<u32>,

View File

@ -13,8 +13,8 @@ impl Mailbox<Get> {
self.id.unwrap() self.id.unwrap()
} }
pub fn name(&self) -> &str { pub fn name(&self) -> Option<&str> {
self.name.as_ref().unwrap() self.name.as_deref()
} }
pub fn parent_id(&self) -> Option<&str> { pub fn parent_id(&self) -> Option<&str> {

View File

@ -96,7 +96,7 @@ pub struct Mailbox<State = Get> {
acl_patch: Option<HashMap<String, Vec<ACL>>>, acl_patch: Option<HashMap<String, Vec<ACL>>>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Role { pub enum Role {
#[serde(rename = "archive", alias = "ARCHIVE")] #[serde(rename = "archive", alias = "ARCHIVE")]