From f6e8becc3a405dfc81619a32f6a31a163a069d83 Mon Sep 17 00:00:00 2001
From: "Dustin C. Hatch"
Date: Mon, 13 Oct 2025 10:06:48 -0500
Subject: [PATCH] drain: Handle yet another race condition

Found another race condition: If the first pod evicted is deleted
quickly, before any other pods are evicted, the wait list will become
empty immediately, causing the `wait_drained` function to return too
early.

I've completely rewritten the `drain_node` function (again) to
hopefully handle all of these races. Now, it's purely reactive: instead
of getting a list of pods to evict ahead of time, it uses the `Added`
events of the watch stream to determine the pods to evict. As soon as a
pod is determined to be a candidate for eviction, it is added to the
wait list. If eviction of a pod fails irrecoverably, that pod is
removed from the wait list, to prevent the loop from running forever.

This works because `Added` events for all current pods will arrive as
soon as the stream is opened. `Deleted` events will start arriving once
all the `Added` events are processed. The key difference between this
implementation and the previous one, though, is when pods are added to
the wait list. Previously, we only added them to the list _after_ they
were evicted, but this populated the list too slowly. Now, since we add
them to the list _before_ they are evicted, we can be sure the list is
never empty until every pod is deleted (or unable to be evicted at
all).
---
 src/drain.rs | 169 ++++++++++++++++++++++++++------------------------
 1 file changed, 85 insertions(+), 84 deletions(-)

diff --git a/src/drain.rs b/src/drain.rs
index c6af2b6..c808538 100644
--- a/src/drain.rs
+++ b/src/drain.rs
@@ -1,38 +1,71 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use std::sync::Arc;
 
-use futures::stream::{BoxStream, StreamExt};
+use futures::stream::StreamExt;
 use k8s_openapi::api::core::v1::{Node, Pod};
 use kube::Client;
-use kube::api::{Api, ListParams, WatchEvent, WatchParams};
+use kube::api::{Api, WatchEvent, WatchParams};
 use tokio::sync::Mutex;
 use tracing::{debug, error, info, trace, warn};
 
-async fn wait_drained(
-    mut stream: BoxStream<'_, Result<WatchEvent<Pod>, kube::Error>>,
-    waitlist: Arc<Mutex<HashSet<(String, String)>>>,
-) -> Result<(), kube::Error> {
-    while let Some(event) = stream.next().await {
-        trace!("Watch pod event: {event:?}");
-        if let WatchEvent::Deleted(pod) = event? {
-            if let (Some(namespace), Some(name)) =
-                (pod.metadata.namespace, pod.metadata.name)
-            {
-                info!("Pod {namespace}/{name} evicted");
-                let mut waitlist = waitlist.lock().await;
-                waitlist.remove(&(namespace, name));
-                let n = waitlist.len();
-                if n == 0 {
-                    break;
-                }
-                debug!(
-                    "Waiting for {n} more {}",
-                    if n == 1 { "pod" } else { "pods" }
+type Waitlist = Arc<Mutex<HashSet<(String, String)>>>;
+
+async fn evict_pod(api: Api<Pod>, name: &str) -> bool {
+    loop {
+        match api.evict(name, &Default::default()).await {
+            Err(kube::Error::Api(e)) if e.code == 429 => {
+                warn!(
+                    "Failed to evict pod {name}: {e}; will retry in 5 seconds"
                 );
-            }
+                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+            },
+            Err(kube::Error::Api(e)) if e.code == 404 => {
+                return false;
+            },
+            Err(e) => {
+                error!("Failed to evict pod {name}: {e}");
+                return false;
+            },
+            Ok(_) => return true,
         }
     }
-    Ok(())
+}
+
+async fn handle_added(
+    pod: Pod,
+    waitlist: Waitlist,
+    task_set: &mut tokio::task::JoinSet<()>,
+    client: Client,
+) -> Option<()> {
+    let namespace = pod.metadata.namespace?;
+    let name = pod.metadata.name?;
+    let owners = pod.metadata.owner_references.unwrap_or_default();
+    if owners.iter().any(|o| o.kind == "DaemonSet") {
+        info!("Ignoring DaemonSet pod {name}");
+        return None;
+    }
+    info!("Evicting pod {namespace}/{name}");
+    {
+        let mut waitlist = waitlist.lock().await;
+        waitlist.insert((namespace.clone(), name.clone()));
+    }
+    let api = Api::namespaced(client.clone(), &namespace);
+    task_set.spawn(async move {
+        if !evict_pod(api, &name).await {
+            let mut waitlist = waitlist.lock().await;
+            waitlist.remove(&(namespace, name));
+        }
+    });
+    Some(())
+}
+
+async fn handle_deleted(pod: Pod, waitlist: Waitlist) -> Option<()> {
+    let namespace = pod.metadata.namespace?;
+    let name = pod.metadata.name?;
+    info!("Pod {namespace}/{name} evicted");
+    let mut waitlist = waitlist.lock().await;
+    waitlist.remove(&(namespace, name));
+    Some(())
 }
 
 pub(crate) async fn drain_node(
@@ -41,69 +74,37 @@ pub(crate) async fn drain_node(
 ) -> Result<(), kube::Error> {
     let all_pods: Api<Pod> = Api::all(client.clone());
     let filter = format!("spec.nodeName={name}");
-    let mut node_pods: HashSet<_> = all_pods
-        .list(&ListParams::default().fields(&filter))
-        .await?
-        .items
-        .into_iter()
-        .filter_map(|p| {
-            let name = p.metadata.name?;
-            let namespace = p.metadata.namespace?;
-            let owners = p.metadata.owner_references.unwrap_or_default();
-
-            if owners.iter().any(|o| o.kind == "DaemonSet") {
-                info!("Ignoring DaemonSet pod {name}");
-                None
-            } else {
-                Some((namespace, name))
-            }
-        })
-        .collect();
-    if node_pods.is_empty() {
-        debug!("No pods to evict from node {name}");
-        return Ok(());
-    }
+    let params = WatchParams::default().fields(&filter);
     let waitlist = Arc::new(Mutex::new(HashSet::new()));
-    let wl = waitlist.clone();
-    let mut pods = HashMap::new();
-    let wait_task = tokio::spawn(async move {
-        let params = WatchParams::default().fields(&filter);
-        let stream = all_pods.watch(&params, "0").await?.boxed();
-        wait_drained(stream, wl).await
-    });
-    while !node_pods.is_empty() {
-        let mut failed = HashSet::new();
-        for (namespace, name) in node_pods.iter() {
-            info!("Evicting pod {namespace}/{name}");
-            let api = pods.entry(namespace.clone()).or_insert_with_key(|k| {
-                Api::<Pod>::namespaced(client.clone(), k)
-            });
-            match api.evict(name, &Default::default()).await {
-                Err(kube::Error::Api(e)) if e.code == 429 => {
-                    warn!(
-                        "Failed to evict pod {name}: {e}; will retry in 5 seconds"
-                    );
-                    failed.insert((namespace.clone(), name.clone()));
-                },
-                Err(kube::Error::Api(e)) if e.code == 404 => (),
-                Err(e) => error!("Failed to evict pod {name}: {e}"),
-                Ok(_) => {
-                    let mut waitlist = waitlist.lock().await;
-                    waitlist.insert((namespace.clone(), name.clone()));
-                }
-            }
+    let mut task_set = tokio::task::JoinSet::new();
+    let mut stream = all_pods.watch(&params, "0").await?.boxed();
+    while let Some(event) = stream.next().await {
+        trace!("Watch pod event: {event:?}");
+        match event? {
+            WatchEvent::Added(pod) => {
+                handle_added(
+                    pod,
+                    waitlist.clone(),
+                    &mut task_set,
+                    client.clone(),
+                )
+                .await;
+            },
+            WatchEvent::Deleted(pod) => {
+                handle_deleted(pod, waitlist.clone()).await;
+            },
+            _ => (),
         }
-        node_pods = failed;
-        let n = node_pods.len();
-        if n > 0 {
-            debug!(
-                "Waiting to retry {n} {}",
-                if n == 1 { "pod" } else { "pods" }
-            );
-            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        let n = waitlist.lock().await.len();
+        if n == 0 {
+            break;
         }
+        debug!(
+            "Waiting for {n} more {}",
+            if n == 1 { "pod" } else { "pods" }
+        );
     }
-    wait_task.await.unwrap()?;
+    task_set.shutdown().await;
     info!("Finished draining pods from {name}");
     Ok(())
 }
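
For context, a minimal sketch of a possible call site for the reworked
`drain_node`. This is illustrative only and not part of the patch: the
parameter order (client, then node name) is inferred from the function body,
the node name is made up, and any prior cordon step is assumed to happen
elsewhere.

    // Hypothetical caller in the same crate (drain_node is pub(crate)).
    mod drain;

    use kube::Client;

    #[tokio::main]
    async fn main() -> Result<(), kube::Error> {
        // Build a client from kubeconfig or in-cluster configuration.
        let client = Client::try_default().await?;
        // Assumes the node was already cordoned so evicted pods are not
        // rescheduled onto it; drain_node then runs until every evictable
        // pod on the node is deleted or cannot be evicted at all.
        drain::drain_node(client, "worker-1").await?;
        Ok(())
    }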