Close HTTP stream after each batch of messages

When Victoria Logs shuts down, it waits several seconds for clients to
disconnect themselves, then forcibly closes the connections to those
that do not.  With the long-running streaming connection, there is no
way for the server to indicate that the client should disconnect itself.
This can cause data loss as messages that are sent during this window
are not consumed by Victoria Logs.

To improve the resilience of the sender, it now uses multiple short
streaming HTTP requests, instead of one long-running request.  The relay
forwards messages from the MQTT subscriber to the sender's stream, then
closes the stream as soon as there is a 100ms delay between messages.
When the stream closes, the sender completes the HTTP request and
proceeds to the next iteration of the loop.  The relay will not create a
new stream until a new message arrives from the MQTT subscriber.

With this approach, when Victoria Logs starts shutting down, there is a
significantly reduced opportunity for data loss.  It is still possible
that messages sent in the request could be lost, if Victoria Logs
accepts the preflight request but not the actual stream.  Addressing
this possibility would be quite a bit more complex, so hopefully it does
not become too much of a problem.
master
Dustin 2025-05-01 07:05:59 -05:00
parent ec364b8fd3
commit 3de1b53acd
2 changed files with 65 additions and 41 deletions

View File

@ -55,31 +55,39 @@ async fn run_sender(
) {
let mut backoff = Backoff::default();
let relay = relay::Relay::from(chan);
loop {
'outer: loop {
if relay.closed().await {
break;
}
let Some(Some((stream, handle))) = (tokio::select! {
s = relay.new_stream(1) => Some(s),
_ = notify.notified() => None,
}) else {
break;
};
'inner: loop {
let req = client
.post(url)
.header(reqwest::header::ACCEPT, "application/json")
.header(reqwest::header::CONTENT_LENGTH, "0");
debug!("Checking HTTP connection");
tokio::select! {
_ = notify.notified() => break,
_ = notify.notified() => break 'outer,
r = req.send() => {
if let Err(e) = r {
error!("Error in HTTP request: {}", e);
tokio::select! {
_ = notify.notified() => break,
_ = notify.notified() => break 'outer,
_ = backoff.sleep() => (),
}
continue;
}
backoff.reset();
debug!("HTTP connection successful");
break 'inner;
}
}
}
let (stream, handle) = relay.new_stream();
let stream = stream
.map(|v| {
trace!("{:?}", v);
@ -98,11 +106,13 @@ async fn run_sender(
if let Err(e) = handle.await {
error!("Error in sender: {}", e);
}
backoff.sleep().await;
} else {
break;
debug!("Finished HTTP POST request");
backoff.reset();
}
}
info!("Channel closed, stopping HTTP sender");
info!("Stopping HTTP sender");
}
async fn run_subscriber(

View File

@ -1,10 +1,11 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, warn};
pub struct Relay<T> {
channel: Arc<Mutex<UnboundedReceiver<T>>>,
@ -23,30 +24,43 @@ impl<T: Send + 'static> Relay<T> {
chan.is_closed()
}
pub fn new_stream(&self) -> (UnboundedReceiverStream<T>, JoinHandle<()>) {
let (tx, rx) = mpsc::unbounded_channel();
pub async fn new_stream(
&self,
buffer: usize,
) -> Option<(ReceiverStream<T>, JoinHandle<()>)> {
let chan = self.channel.clone();
let mut chan = chan.lock().await;
if let Some(it) = chan.recv().await {
let (tx, rx) = mpsc::channel(buffer);
let h = tokio::spawn({
let chan = self.channel.clone();
async move {
let mut chan = chan.lock().await;
if tx.send(it).await.is_err() {
warn!("Downstream channel closed unexpectedly");
return;
}
let dur = Duration::from_millis(100);
loop {
tokio::select! {
it = chan.recv() => {
if let Some(it) = it {
if tx.send(it).is_err() {
let Some(it) = it else {
break;
};
if tx.send(it).await.is_err() {
debug!("Downstream channel closed");
break;
}
} else {
debug!("Upstream channel closed");
break
}
_ = tokio::time::sleep(dur) => break
}
_ = tx.closed() => break,
};
}
}
});
(UnboundedReceiverStream::new(rx), h)
Some((ReceiverStream::new(rx), h))
} else {
None
}
}
}