drain: Fix a race condition while waiting for pods

Using a channel to transfer the list of pods from the task that is
evicting the pods to the task that is waiting for them to be deleted
creates a race condition.  It is possible for the watch event stream to
handle the pod delete event _before_ the channel delivers the pod
identifier, so the pod gets added to the wait list _after_ it's already
been deleted.  This results in the `wait_drained` task waiting forever
for the pod to be deleted, even though it is already gone.

To address this, we need to construct the wait list in the `drain_node`
task, as we are evicting pods.  This way, we can be sure that every pod
that was evicted is in the wait list immediately.
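
The fix boils down to replacing the mpsc hand-off with a shared
`Arc<Mutex<HashSet<_>>>` that the evicting task fills in itself. As a
rough, self-contained sketch of that synchronization pattern (this is
not the coordinator's actual code; the pod names and the `delete_rx`
channel standing in for the Kubernetes watch stream are made up for
illustration):

    use std::collections::HashSet;
    use std::sync::Arc;

    use tokio::sync::{mpsc, Mutex};

    #[tokio::main]
    async fn main() {
        // Shared wait list: the evicting side inserts, the waiting side removes.
        let waitlist: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));

        // Stand-in for the pod watch stream: it just delivers "deleted" pod names.
        let (delete_tx, mut delete_rx) = mpsc::channel::<String>(8);

        // Waiter: remove entries as deletion events arrive, until the stream ends.
        let wl = waitlist.clone();
        let waiter = tokio::spawn(async move {
            while let Some(name) = delete_rx.recv().await {
                let mut list = wl.lock().await;
                list.remove(&name);
                println!("{name} gone, {} left", list.len());
            }
            assert!(wl.lock().await.is_empty(), "a pod was deleted before it was tracked");
        });

        // Evicter: each pod is in the wait list *before* its deletion event
        // can be observed, so the waiter can never miss one.
        for name in ["pod-a", "pod-b", "pod-c"] {
            waitlist.lock().await.insert(name.to_string());
            delete_tx.send(name.to_string()).await.unwrap();
        }
        drop(delete_tx); // no more evictions; let the waiter drain and exit

        waiter.await.unwrap();
    }

In the sketch, each name is inserted before its deletion event is sent,
which mirrors the ordering this commit establishes: a pod is already in
the wait list by the time its deletion can be observed.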
2025-10-10 08:32:59 -05:00
parent 48b19604fd
commit 46b26199b0


@@ -1,50 +1,34 @@
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 use futures::stream::{BoxStream, StreamExt};
 use k8s_openapi::api::core::v1::{Node, Pod};
 use kube::Client;
 use kube::api::{Api, ListParams, WatchEvent, WatchParams};
-use tokio::sync::mpsc::{self, Receiver};
+use tokio::sync::Mutex;
 use tracing::{debug, error, info, trace, warn};
 async fn wait_drained(
     mut stream: BoxStream<'_, Result<WatchEvent<Pod>, kube::Error>>,
-    mut channel: Receiver<(String, String)>,
+    waitlist: Arc<Mutex<HashSet<(String, String)>>>,
 ) -> Result<(), kube::Error> {
-    let mut waitlist = HashSet::new();
-    let mut done = false;
-    loop {
-        tokio::select! {
-            item = channel.recv() => {
-                if let Some((namespace, name)) = item {
-                    debug!("Waiting for pod {namespace}/{name}");
-                    waitlist.insert((namespace, name));
-                } else {
-                    done = true;
-                }
-            }
-            event = stream.next() => {
-                if let Some(event) = event {
-                    trace!("Watch pod event: {event:?}");
-                    if let WatchEvent::Deleted(pod) = event? {
-                        if let (Some(namespace), Some(name)) =
-                            (pod.metadata.namespace, pod.metadata.name)
-                        {
-                            info!("Pod {namespace}/{name} evicted");
-                            waitlist.remove(&(namespace, name));
-                        }
-                        let n = waitlist.len();
-                        if n == 0 && done {
-                            break;
-                        }
-                        debug!(
-                            "Waiting for {n} more {}",
-                            if n == 1 { "pod" } else { "pods" }
-                        );
-                    }
-                } else {
+    while let Some(event) = stream.next().await {
+        trace!("Watch pod event: {event:?}");
+        if let WatchEvent::Deleted(pod) = event? {
+            if let (Some(namespace), Some(name)) =
+                (pod.metadata.namespace, pod.metadata.name)
+            {
+                info!("Pod {namespace}/{name} evicted");
+                let mut waitlist = waitlist.lock().await;
+                waitlist.remove(&(namespace, name));
+                let n = waitlist.len();
+                if n == 0 {
+                    break;
+                }
+                debug!(
+                    "Waiting for {n} more {}",
+                    if n == 1 { "pod" } else { "pods" }
+                );
+            }
+        }
+    }
@@ -79,14 +63,15 @@ pub(crate) async fn drain_node(
debug!("No pods to evict from node {name}");
return Ok(());
}
let (tx, rx) = mpsc::channel(node_pods.len());
let waitlist = Arc::new(Mutex::new(HashSet::new()));
let wl = waitlist.clone();
let mut pods = HashMap::new();
let wait_task = tokio::spawn(async move {
let params = WatchParams::default().fields(&filter);
let stream = all_pods.watch(&params, "0").await?.boxed();
wait_drained(stream, rx).await
wait_drained(stream, wl).await
});
'outer: while !node_pods.is_empty() {
while !node_pods.is_empty() {
let mut failed = HashSet::new();
for (namespace, name) in node_pods.iter() {
info!("Evicting pod {namespace}/{name}");
@@ -103,15 +88,9 @@ pub(crate) async fn drain_node(
                 Err(kube::Error::Api(e)) if e.code == 404 => (),
                 Err(e) => error!("Failed to evict pod {name}: {e}"),
                 Ok(_) => {
-                    if tx
-                        .send((namespace.clone(), name.clone()))
-                        .await
-                        .is_err()
-                    {
-                        error!("Waiter channel closed");
-                        break 'outer;
-                    }
-                },
+                    let mut waitlist = waitlist.lock().await;
+                    waitlist.insert((namespace.clone(), name.clone()));
+                }
             }
         }
         node_pods = failed;
@@ -124,7 +103,6 @@ pub(crate) async fn drain_node(
             tokio::time::sleep(std::time::Duration::from_secs(5)).await;
         }
     }
-    drop(tx);
     wait_task.await.unwrap()?;
     info!("Finished draining pods from {name}");
     Ok(())