diff --git a/src/main.rs b/src/main.rs index 5aca772..75e0836 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,31 +55,39 @@ async fn run_sender( ) { let mut backoff = Backoff::default(); let relay = relay::Relay::from(chan); - loop { + 'outer: loop { if relay.closed().await { break; } - let req = client - .post(url) - .header(reqwest::header::ACCEPT, "application/json") - .header(reqwest::header::CONTENT_LENGTH, "0"); - debug!("Checking HTTP connection"); - tokio::select! { - _ = notify.notified() => break, - r = req.send() => { - if let Err(e) = r { - error!("Error in HTTP request: {}", e); - tokio::select! { - _ = notify.notified() => break, - _ = backoff.sleep() => (), + let Some(Some((stream, handle))) = (tokio::select! { + s = relay.new_stream(1) => Some(s), + _ = notify.notified() => None, + }) else { + break; + }; + 'inner: loop { + let req = client + .post(url) + .header(reqwest::header::ACCEPT, "application/json") + .header(reqwest::header::CONTENT_LENGTH, "0"); + debug!("Checking HTTP connection"); + tokio::select! { + _ = notify.notified() => break 'outer, + r = req.send() => { + if let Err(e) = r { + error!("Error in HTTP request: {}", e); + tokio::select! { + _ = notify.notified() => break 'outer, + _ = backoff.sleep() => (), + } + continue; } - continue; + backoff.reset(); + debug!("HTTP connection successful"); + break 'inner; } - backoff.reset(); - debug!("HTTP connection successful"); } } - let (stream, handle) = relay.new_stream(); let stream = stream .map(|v| { trace!("{:?}", v); @@ -98,11 +106,13 @@ async fn run_sender( if let Err(e) = handle.await { error!("Error in sender: {}", e); } + backoff.sleep().await; } else { - break; + debug!("Finished HTTP POST request"); + backoff.reset(); } } - info!("Channel closed, stopping HTTP sender"); + info!("Stopping HTTP sender"); } async fn run_subscriber( diff --git a/src/relay.rs b/src/relay.rs index 77ccfe6..4540cfa 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -1,10 +1,11 @@ use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc::{self, UnboundedReceiver}; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::debug; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{debug, warn}; pub struct Relay { channel: Arc>>, @@ -23,30 +24,43 @@ impl Relay { chan.is_closed() } - pub fn new_stream(&self) -> (UnboundedReceiverStream, JoinHandle<()>) { - let (tx, rx) = mpsc::unbounded_channel(); - let h = tokio::spawn({ - let chan = self.channel.clone(); - async move { - let mut chan = chan.lock().await; - loop { - tokio::select! { - it = chan.recv() => { - if let Some(it) = it { - if tx.send(it).is_err() { + pub async fn new_stream( + &self, + buffer: usize, + ) -> Option<(ReceiverStream, JoinHandle<()>)> { + let chan = self.channel.clone(); + let mut chan = chan.lock().await; + if let Some(it) = chan.recv().await { + let (tx, rx) = mpsc::channel(buffer); + let h = tokio::spawn({ + let chan = self.channel.clone(); + async move { + let mut chan = chan.lock().await; + if tx.send(it).await.is_err() { + warn!("Downstream channel closed unexpectedly"); + return; + } + let dur = Duration::from_millis(100); + loop { + tokio::select! { + it = chan.recv() => { + let Some(it) = it else { + break; + }; + if tx.send(it).await.is_err() { + debug!("Downstream channel closed"); break; } - } else { - debug!("Upstream channel closed"); - break } + _ = tokio::time::sleep(dur) => break } - _ = tx.closed() => break, - }; + } } - } - }); + }); - (UnboundedReceiverStream::new(rx), h) + Some((ReceiverStream::new(rx), h)) + } else { + None + } } }