EventSource implementation.
This commit is contained in:
@@ -1,4 +1,11 @@
|
||||
use crate::core::session::URLParser;
|
||||
pub mod parser;
|
||||
pub mod stream;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{core::session::URLParser, TypeState};
|
||||
|
||||
pub enum URLParameter {
|
||||
Types,
|
||||
@@ -16,3 +23,22 @@ impl URLParser for URLParameter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct Changes {
|
||||
changes: HashMap<String, HashMap<TypeState, String>>,
|
||||
}
|
||||
|
||||
impl Changes {
|
||||
pub fn account_changes(&mut self, account_id: &str) -> Option<HashMap<TypeState, String>> {
|
||||
self.changes.remove(account_id)
|
||||
}
|
||||
|
||||
pub fn changed_accounts(&self) -> impl Iterator<Item = &String> {
|
||||
self.changes.keys()
|
||||
}
|
||||
|
||||
pub fn into_innter(self) -> HashMap<String, HashMap<TypeState, String>> {
|
||||
self.changes
|
||||
}
|
||||
}
|
||||
|
||||
292
src/event_source/parser.rs
Normal file
292
src/event_source/parser.rs
Normal file
@@ -0,0 +1,292 @@
|
||||
use crate::StateChange;
|
||||
|
||||
use super::Changes;
|
||||
|
||||
const MAX_EVENT_SIZE: usize = 1024 * 1024;
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum EventType {
|
||||
Ping,
|
||||
State,
|
||||
}
|
||||
|
||||
impl Default for EventType {
|
||||
fn default() -> Self {
|
||||
Self::State
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Event {
|
||||
pub event: EventType,
|
||||
pub id: Vec<u8>,
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
enum EventParserState {
|
||||
Init,
|
||||
Comment,
|
||||
Field,
|
||||
Value,
|
||||
}
|
||||
|
||||
impl Default for EventParserState {
|
||||
fn default() -> Self {
|
||||
Self::Init
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct EventParser {
|
||||
state: EventParserState,
|
||||
field: Vec<u8>,
|
||||
value: Vec<u8>,
|
||||
bytes: Option<Vec<u8>>,
|
||||
pos: usize,
|
||||
result: Event,
|
||||
}
|
||||
|
||||
impl EventParser {
|
||||
pub fn push_bytes(&mut self, bytes: Vec<u8>) {
|
||||
self.bytes = Some(bytes);
|
||||
}
|
||||
|
||||
pub fn needs_bytes(&self) -> bool {
|
||||
self.bytes.is_none()
|
||||
}
|
||||
|
||||
pub fn filter_state(&mut self) -> Option<crate::Result<Changes>> {
|
||||
#[allow(clippy::while_let_on_iterator)]
|
||||
while let Some(event) = self.next() {
|
||||
match event {
|
||||
Ok(Event {
|
||||
event: EventType::State,
|
||||
data,
|
||||
..
|
||||
}) => {
|
||||
return match serde_json::from_slice::<StateChange>(&data) {
|
||||
Ok(state_change) => Some(Ok(Changes {
|
||||
changes: state_change.changed,
|
||||
})),
|
||||
Err(err) => Some(Err(err.into())),
|
||||
};
|
||||
}
|
||||
Ok(Event {
|
||||
event: EventType::Ping,
|
||||
..
|
||||
}) => {
|
||||
#[cfg(feature = "debug")]
|
||||
use std::iter::FromIterator;
|
||||
#[cfg(feature = "debug")]
|
||||
return Some(Ok(Changes {
|
||||
changes: std::collections::HashMap::from_iter([(
|
||||
"ping".to_string(),
|
||||
std::collections::HashMap::new(),
|
||||
)]),
|
||||
}));
|
||||
}
|
||||
Err(err) => return Some(Err(err)),
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for EventParser {
|
||||
type Item = crate::Result<Event>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let bytes = self.bytes.as_ref()?;
|
||||
|
||||
for byte in bytes.get(self.pos..)? {
|
||||
self.pos += 1;
|
||||
|
||||
match self.state {
|
||||
EventParserState::Init => match byte {
|
||||
b':' => {
|
||||
self.state = EventParserState::Comment;
|
||||
}
|
||||
b'\r' | b' ' => (),
|
||||
b'\n' => {
|
||||
return Some(Ok(std::mem::take(&mut self.result)));
|
||||
}
|
||||
_ => {
|
||||
self.state = EventParserState::Field;
|
||||
self.field.push(*byte);
|
||||
}
|
||||
},
|
||||
EventParserState::Comment => {
|
||||
if *byte == b'\n' {
|
||||
self.state = EventParserState::Init;
|
||||
}
|
||||
}
|
||||
EventParserState::Field => match byte {
|
||||
b'\r' => (),
|
||||
b'\n' => {
|
||||
self.state = EventParserState::Init;
|
||||
self.field.clear();
|
||||
}
|
||||
b':' => {
|
||||
self.state = EventParserState::Value;
|
||||
}
|
||||
_ => {
|
||||
if self.field.len() >= MAX_EVENT_SIZE {
|
||||
return Some(Err(crate::Error::Internal(
|
||||
"EventSource response is too long.".to_string(),
|
||||
)));
|
||||
}
|
||||
|
||||
self.field.push(*byte);
|
||||
}
|
||||
},
|
||||
EventParserState::Value => match byte {
|
||||
b'\r' => (),
|
||||
b' ' if self.value.is_empty() => (),
|
||||
b'\n' => {
|
||||
self.state = EventParserState::Init;
|
||||
match &self.field[..] {
|
||||
b"id" => {
|
||||
self.result.id.extend_from_slice(&self.value);
|
||||
}
|
||||
b"data" => {
|
||||
self.result.data.extend_from_slice(&self.value);
|
||||
}
|
||||
b"event" => {
|
||||
if self.value == b"ping" {
|
||||
self.result.event = EventType::Ping;
|
||||
} else {
|
||||
self.result.event = EventType::State;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
//ignore
|
||||
}
|
||||
}
|
||||
self.field.clear();
|
||||
self.value.clear();
|
||||
}
|
||||
_ => {
|
||||
if (self.field.len() + self.value.len()) >= MAX_EVENT_SIZE {
|
||||
return Some(Err(crate::Error::Internal(
|
||||
"EventSource response is too long.".to_string(),
|
||||
)));
|
||||
}
|
||||
|
||||
self.value.push(*byte);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
self.bytes = None;
|
||||
self.pos = 0;
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::{Event, EventType};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct EventString {
|
||||
event: EventType,
|
||||
id: String,
|
||||
data: String,
|
||||
}
|
||||
|
||||
impl From<Event> for EventString {
|
||||
fn from(event: Event) -> Self {
|
||||
Self {
|
||||
event: event.event,
|
||||
id: String::from_utf8(event.id).unwrap(),
|
||||
data: String::from_utf8(event.data).unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse() {
|
||||
let mut parser = super::EventParser::default();
|
||||
let mut results = Vec::new();
|
||||
|
||||
for frame in [
|
||||
Vec::from("event: state\nid: 0\ndata: test\n\n"),
|
||||
Vec::from("event: ping\nid:123\ndata: ping pa"),
|
||||
Vec::from("yload"),
|
||||
Vec::from("\n\n"),
|
||||
Vec::from(":comment\n\n"),
|
||||
Vec::from("data: YHOO\n"),
|
||||
Vec::from("data: +2\n"),
|
||||
Vec::from("data: 10\n\n"),
|
||||
Vec::from(": test stream\n"),
|
||||
Vec::from("data: first event\n"),
|
||||
Vec::from("id: 1\n\n"),
|
||||
Vec::from("data:second event\n"),
|
||||
Vec::from("id\n\n"),
|
||||
Vec::from("data: third event\n\n"),
|
||||
Vec::from("data:hello\n\ndata: world\n\n"),
|
||||
] {
|
||||
parser.push_bytes(frame);
|
||||
|
||||
#[allow(clippy::while_let_on_iterator)]
|
||||
while let Some(event) = parser.next() {
|
||||
results.push(EventString::from(event.unwrap()));
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
results,
|
||||
vec![
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "0".to_string(),
|
||||
data: "test".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::Ping,
|
||||
id: "123".to_string(),
|
||||
data: "ping payload".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "".to_string(),
|
||||
data: "".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "".to_string(),
|
||||
data: "YHOO+210".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "1".to_string(),
|
||||
data: "first event".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "".to_string(),
|
||||
data: "second event".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "".to_string(),
|
||||
data: "third event".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "".to_string(),
|
||||
data: "hello".to_string()
|
||||
},
|
||||
EventString {
|
||||
event: EventType::State,
|
||||
id: "".to_string(),
|
||||
data: "world".to_string()
|
||||
}
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
101
src/event_source/stream.rs
Normal file
101
src/event_source/stream.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{client::Client, core::session::URLPart, event_source::parser::EventParser, TypeState};
|
||||
use async_stream::stream;
|
||||
use futures_util::{Stream, StreamExt};
|
||||
use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE};
|
||||
|
||||
use super::Changes;
|
||||
|
||||
impl Client {
|
||||
pub async fn event_source(
|
||||
&mut self,
|
||||
mut types: Option<impl IntoIterator<Item = TypeState>>,
|
||||
close_after_state: bool,
|
||||
ping: Option<u32>,
|
||||
last_event_id: Option<&str>,
|
||||
) -> crate::Result<impl Stream<Item = crate::Result<Changes>>> {
|
||||
let mut event_source_url = String::with_capacity(self.session().event_source_url().len());
|
||||
|
||||
for part in self.event_source_url() {
|
||||
match part {
|
||||
URLPart::Value(value) => {
|
||||
event_source_url.push_str(value);
|
||||
}
|
||||
URLPart::Parameter(param) => match param {
|
||||
super::URLParameter::Types => {
|
||||
if let Some(types) = Option::take(&mut types) {
|
||||
event_source_url.push_str(
|
||||
&types
|
||||
.into_iter()
|
||||
.map(|state| state.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
);
|
||||
} else {
|
||||
event_source_url.push('*');
|
||||
}
|
||||
}
|
||||
super::URLParameter::CloseAfter => {
|
||||
event_source_url.push_str(if close_after_state { "state" } else { "no" });
|
||||
}
|
||||
super::URLParameter::Ping => {
|
||||
if let Some(ping) = ping {
|
||||
event_source_url.push_str(&ping.to_string());
|
||||
} else {
|
||||
event_source_url.push('0');
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Add headers
|
||||
let mut headers = self.headers().clone();
|
||||
headers.remove(CONTENT_TYPE);
|
||||
headers.insert(ACCEPT, HeaderValue::from_static("text/event-stream"));
|
||||
if let Some(last_event_id) = last_event_id {
|
||||
headers.insert(
|
||||
"Last-Event-ID",
|
||||
HeaderValue::from_str(last_event_id).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
let mut stream = Client::handle_error(
|
||||
reqwest::Client::builder()
|
||||
.timeout(Duration::from_millis(self.timeout()))
|
||||
.default_headers(headers)
|
||||
.build()?
|
||||
.get(event_source_url)
|
||||
.send()
|
||||
.await?,
|
||||
)
|
||||
.await?
|
||||
.bytes_stream();
|
||||
let mut parser = EventParser::default();
|
||||
|
||||
// TODO - use poll_next() to avoid pin_mut() call.
|
||||
Ok(stream! {
|
||||
loop {
|
||||
if let Some(changes) = parser.filter_state() {
|
||||
yield changes;
|
||||
continue;
|
||||
}
|
||||
if let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(bytes) => {
|
||||
parser.push_bytes(bytes.to_vec());
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
yield Err(err.into());
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user