drain: Handle yet another race condition

Found another race condition: If the first pod evicted is deleted
quickly, before any other pods are evicted, the wait list will become
empty immediately, causing the `wait_drained` function to return too
early.

I've completely rewritten the `drain_node` function (again) to hopefully
handle all of these races.  Now, it's purely reactive: instead of
getting a list of pods to evict ahead of time, it uses the `Added`
events of the watch stream to determine which pods to evict.  As soon
as a pod is determined to be a candidate for eviction, it is added to
the wait list.  If eviction of a pod fails irrecoverably, that pod is
removed from the wait list, to prevent the loop from running forever.
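
(For reference, the retry/give-up decision lives in the new `evict_pod`
helper; the sketch below is a condensed copy of the version in the diff
further down.  A 429 from the eviction API means the eviction is
currently blocked, typically by a PodDisruptionBudget, and is retried;
a 404 or any other error is treated as irrecoverable.)

use k8s_openapi::api::core::v1::Pod;
use kube::api::Api;
use tracing::{error, warn};

// Returns true once the eviction has been accepted, false if the
// caller should stop waiting for this pod.
async fn evict_pod(api: Api<Pod>, name: &str) -> bool {
    loop {
        match api.evict(name, &Default::default()).await {
            // 429: eviction is currently blocked (typically by a
            // PodDisruptionBudget); back off and retry.
            Err(kube::Error::Api(e)) if e.code == 429 => {
                warn!("Failed to evict pod {name}: {e}; will retry in 5 seconds");
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            },
            // 404: the pod is already gone, nothing to wait for.
            Err(kube::Error::Api(e)) if e.code == 404 => return false,
            // Anything else is treated as irrecoverable.
            Err(e) => {
                error!("Failed to evict pod {name}: {e}");
                return false;
            },
            Ok(_) => return true,
        }
    }
}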

This works because `Added` events for all current pods will arrive as
soon as the stream is opened.  `Deleted` events will start arriving once
all the `Added` events are processed.  The key difference between this
implementation and the previous one, though, is when pods are added to
the wait list.  Previously, we only added them to the list _after_ they
were evicted, but this made populating the list too slow.  Now, since we
add them to the list _before_ they are evicted, we can be sure the list
is never empty until every pod is deleted (or unable to be evicted at
all).
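
To make the ordering concrete, the new `drain_node` boils down to the
loop sketched below.  It is a simplification of the code in the diff:
`drain` stands in for `drain_node`, the inline bookkeeping stands in
for `handle_added`/`handle_deleted`, DaemonSet filtering and logging
are omitted, and it reuses the `evict_pod` helper sketched above.

use std::collections::HashSet;
use std::sync::Arc;
use futures::stream::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::Client;
use kube::api::{Api, WatchEvent, WatchParams};
use tokio::sync::Mutex;

async fn drain(client: Client, node: &str) -> Result<(), kube::Error> {
    let pods: Api<Pod> = Api::all(client.clone());
    let params = WatchParams::default().fields(&format!("spec.nodeName={node}"));
    let waitlist: Arc<Mutex<HashSet<(String, String)>>> = Default::default();
    let mut tasks = tokio::task::JoinSet::new();

    let mut stream = pods.watch(&params, "0").await?.boxed();
    while let Some(event) = stream.next().await {
        match event? {
            // The initial burst of Added events enumerates every pod
            // already on the node, so the wait list fills up before the
            // first Deleted event can arrive.
            WatchEvent::Added(pod) => {
                if let (Some(ns), Some(name)) =
                    (pod.metadata.namespace, pod.metadata.name)
                {
                    waitlist.lock().await.insert((ns.clone(), name.clone()));
                    let api: Api<Pod> = Api::namespaced(client.clone(), &ns);
                    let waitlist = waitlist.clone();
                    // Only drop the pod here if it can never be evicted;
                    // otherwise its Deleted event removes it below.
                    tasks.spawn(async move {
                        if !evict_pod(api, &name).await {
                            waitlist.lock().await.remove(&(ns, name));
                        }
                    });
                }
            },
            // Each Deleted event takes one pod off the wait list.
            WatchEvent::Deleted(pod) => {
                if let (Some(ns), Some(name)) =
                    (pod.metadata.namespace, pod.metadata.name)
                {
                    waitlist.lock().await.remove(&(ns, name));
                }
            },
            _ => (),
        }
        if waitlist.lock().await.is_empty() {
            break; // every candidate pod is gone (or could not be evicted)
        }
    }
    tasks.shutdown().await;
    Ok(())
}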
2025-10-13 10:06:48 -05:00
parent 07be7027f4
commit f6e8becc3a

@@ -1,38 +1,71 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use std::sync::Arc;
-use futures::stream::{BoxStream, StreamExt};
+use futures::stream::StreamExt;
 use k8s_openapi::api::core::v1::{Node, Pod};
 use kube::Client;
-use kube::api::{Api, ListParams, WatchEvent, WatchParams};
+use kube::api::{Api, WatchEvent, WatchParams};
 use tokio::sync::Mutex;
 use tracing::{debug, error, info, trace, warn};

-async fn wait_drained(
-    mut stream: BoxStream<'_, Result<WatchEvent<Pod>, kube::Error>>,
-    waitlist: Arc<Mutex<HashSet<(String, String)>>>,
-) -> Result<(), kube::Error> {
-    while let Some(event) = stream.next().await {
-        trace!("Watch pod event: {event:?}");
-        if let WatchEvent::Deleted(pod) = event? {
-            if let (Some(namespace), Some(name)) =
-                (pod.metadata.namespace, pod.metadata.name)
-            {
-                info!("Pod {namespace}/{name} evicted");
-                let mut waitlist = waitlist.lock().await;
-                waitlist.remove(&(namespace, name));
-                let n = waitlist.len();
-                if n == 0 {
-                    break;
-                }
-                debug!(
-                    "Waiting for {n} more {}",
-                    if n == 1 { "pod" } else { "pods" }
-                );
-            }
-        }
-    }
-    Ok(())
-}
+type Waitlist = Arc<Mutex<HashSet<(String, String)>>>;
+
+async fn evict_pod(api: Api<Pod>, name: &str) -> bool {
+    loop {
+        match api.evict(name, &Default::default()).await {
+            Err(kube::Error::Api(e)) if e.code == 429 => {
+                warn!(
+                    "Failed to evict pod {name}: {e}; will retry in 5 seconds"
+                );
+                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+            },
+            Err(kube::Error::Api(e)) if e.code == 404 => {
+                return false;
+            },
+            Err(e) => {
+                error!("Failed to evict pod {name}: {e}");
+                return false;
+            },
+            Ok(_) => return true,
+        }
+    }
+}
+
+async fn handle_added(
+    pod: Pod,
+    waitlist: Waitlist,
+    task_set: &mut tokio::task::JoinSet<()>,
+    client: Client,
+) -> Option<()> {
+    let namespace = pod.metadata.namespace?;
+    let name = pod.metadata.name?;
+    let owners = pod.metadata.owner_references.unwrap_or_default();
+    if owners.iter().any(|o| o.kind == "DaemonSet") {
+        info!("Ignoring DaemonSet pod {name}");
+        return None;
+    }
+    info!("Evicting pod {namespace}/{name}");
+    {
+        let mut waitlist = waitlist.lock().await;
+        waitlist.insert((namespace.clone(), name.clone()));
+    }
+    let api = Api::namespaced(client.clone(), &namespace);
+    task_set.spawn(async move {
+        if !evict_pod(api, &name).await {
+            let mut waitlist = waitlist.lock().await;
+            waitlist.remove(&(namespace, name));
+        }
+    });
+    Some(())
+}
+
+async fn handle_deleted(pod: Pod, waitlist: Waitlist) -> Option<()> {
+    let namespace = pod.metadata.namespace?;
+    let name = pod.metadata.name?;
+    info!("Pod {namespace}/{name} evicted");
+    let mut waitlist = waitlist.lock().await;
+    waitlist.remove(&(namespace, name));
+    Some(())
+}
+
 pub(crate) async fn drain_node(
@@ -41,69 +74,37 @@ pub(crate) async fn drain_node(
 ) -> Result<(), kube::Error> {
     let all_pods: Api<Pod> = Api::all(client.clone());
     let filter = format!("spec.nodeName={name}");
-    let mut node_pods: HashSet<_> = all_pods
-        .list(&ListParams::default().fields(&filter))
-        .await?
-        .items
-        .into_iter()
-        .filter_map(|p| {
-            let name = p.metadata.name?;
-            let namespace = p.metadata.namespace?;
-            let owners = p.metadata.owner_references.unwrap_or_default();
-            if owners.iter().any(|o| o.kind == "DaemonSet") {
-                info!("Ignoring DaemonSet pod {name}");
-                None
-            } else {
-                Some((namespace, name))
-            }
-        })
-        .collect();
-    if node_pods.is_empty() {
-        debug!("No pods to evict from node {name}");
-        return Ok(());
-    }
+    let params = WatchParams::default().fields(&filter);
     let waitlist = Arc::new(Mutex::new(HashSet::new()));
-    let wl = waitlist.clone();
-    let mut pods = HashMap::new();
-    let wait_task = tokio::spawn(async move {
-        let params = WatchParams::default().fields(&filter);
-        let stream = all_pods.watch(&params, "0").await?.boxed();
-        wait_drained(stream, wl).await
-    });
-    while !node_pods.is_empty() {
-        let mut failed = HashSet::new();
-        for (namespace, name) in node_pods.iter() {
-            info!("Evicting pod {namespace}/{name}");
-            let api = pods.entry(namespace.clone()).or_insert_with_key(|k| {
-                Api::<Pod>::namespaced(client.clone(), k)
-            });
-            match api.evict(name, &Default::default()).await {
-                Err(kube::Error::Api(e)) if e.code == 429 => {
-                    warn!(
-                        "Failed to evict pod {name}: {e}; will retry in 5 seconds"
-                    );
-                    failed.insert((namespace.clone(), name.clone()));
-                },
-                Err(kube::Error::Api(e)) if e.code == 404 => (),
-                Err(e) => error!("Failed to evict pod {name}: {e}"),
-                Ok(_) => {
-                    let mut waitlist = waitlist.lock().await;
-                    waitlist.insert((namespace.clone(), name.clone()));
-                }
-            }
-        }
+    let mut task_set = tokio::task::JoinSet::new();
+    let mut stream = all_pods.watch(&params, "0").await?.boxed();
+    while let Some(event) = stream.next().await {
+        trace!("Watch pod event: {event:?}");
+        match event? {
+            WatchEvent::Added(pod) => {
+                handle_added(
+                    pod,
+                    waitlist.clone(),
+                    &mut task_set,
+                    client.clone(),
+                )
+                .await;
+            },
+            WatchEvent::Deleted(pod) => {
+                handle_deleted(pod, waitlist.clone()).await;
+            },
+            _ => (),
+        }
-        node_pods = failed;
-        let n = node_pods.len();
-        if n > 0 {
-            debug!(
-                "Waiting to retry {n} {}",
-                if n == 1 { "pod" } else { "pods" }
-            );
-            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
-        }
+        let n = waitlist.lock().await.len();
+        if n == 0 {
+            break;
+        }
+        debug!(
+            "Waiting for {n} more {}",
+            if n == 1 { "pod" } else { "pods" }
+        );
     }
-    wait_task.await.unwrap()?;
+    task_set.shutdown().await;
     info!("Finished draining pods from {name}");
     Ok(())
 }