From d937bd6fb2f631b5949adda8ad7948cd2103c262 Mon Sep 17 00:00:00 2001 From: "Dustin C. Hatch" Date: Thu, 25 Sep 2025 09:10:57 -0500 Subject: [PATCH] drain: Retry failed evictions If evicting a pod fails with an HTTP 239 Too Many Requests error, it means there is a PodDisruptionBudget that prevents the pod from being deleted. This can happen, for example, when draining a node that has Longhorn volumes attached, as Longhorn creates a PDB for its instance manager pods on such nodes. Longhorn will automatically remove the PDB once there are no workloads on that node that use its Volumes, so we must continue to evict other pods and try evicting the failed pods again later. This behavior mostly mimics what `kubectl drain` does to handle this same condition. --- src/drain.rs | 113 ++++++++++++++++++++++++++++---------- src/lock.rs | 1 + tests/integration/lock.rs | 2 +- 3 files changed, 86 insertions(+), 30 deletions(-) diff --git a/src/drain.rs b/src/drain.rs index 7ec6728..17f78c5 100644 --- a/src/drain.rs +++ b/src/drain.rs @@ -3,17 +3,58 @@ use std::collections::{HashMap, HashSet}; use k8s_openapi::api::core::v1::{Node, Pod}; use kube::Client; use kube::api::{Api, ListParams, WatchEvent, WatchParams}; -use rocket::futures::{StreamExt, TryStreamExt}; -use tracing::{debug, info, trace}; +use rocket::futures::stream::{BoxStream, StreamExt}; +use rocket::tokio; +use rocket::tokio::sync::mpsc::{self, Receiver}; +use tracing::{debug, error, info, trace, warn}; + +async fn wait_drained( + mut stream: BoxStream<'_, Result, kube::Error>>, + mut channel: Receiver<(String, String)>, +) -> Result<(), kube::Error> { + let mut waitlist = HashSet::new(); + loop { + tokio::select! { + Some((namespace, name)) = channel.recv() => { + debug!("Waiting for pod {namespace}/{name}"); + waitlist.insert((namespace, name)); + } + event = stream.next() => { + if let Some(event) = event { + trace!("Watch pod event: {event:?}"); + if let WatchEvent::Deleted(pod) = event? { + if let (Some(namespace), Some(name)) = + (pod.metadata.namespace, pod.metadata.name) + { + info!("Pod {namespace}/{name} evicted"); + waitlist.remove(&(namespace, name)); + } + let n = waitlist.len(); + if n == 0 { + break; + } + debug!( + "Waiting for {n} more {}", + if n == 1 { "pod" } else { "pods" } + ); + } + } else { + break; + } + } + } + } + Ok(()) +} pub(crate) async fn drain_node( client: Client, name: &str, ) -> Result<(), kube::Error> { let all_pods: Api = Api::all(client.clone()); - let filter = &format!("spec.nodeName={name}"); + let filter = format!("spec.nodeName={name}"); let mut node_pods: HashSet<_> = all_pods - .list(&ListParams::default().fields(filter)) + .list(&ListParams::default().fields(&filter)) .await? .items .into_iter() @@ -34,38 +75,52 @@ pub(crate) async fn drain_node( debug!("No pods to evict from node {name}"); return Ok(()); } + let (tx, rx) = mpsc::channel(node_pods.len()); let mut pods = HashMap::new(); - for (namespace, name) in node_pods.iter() { - info!("Evicting pod {namespace}/{name}"); - let api = pods - .entry(namespace) - .or_insert_with_key(|k| Api::::namespaced(client.clone(), k)); - // Return early here because otherwise we would just wait forever for - // the pod to be deleted. - api.evict(name, &Default::default()).await?; - } - let mut stream = all_pods - .watch(&WatchParams::default().fields(filter), "0") - .await? - .boxed(); - while let Some(event) = stream.try_next().await? { - trace!("Watch pod event: {event:?}"); - if let WatchEvent::Deleted(pod) = event { - if let (Some(namespace), Some(name)) = - (pod.metadata.namespace, pod.metadata.name) - { - node_pods.remove(&(namespace, name)); - } - let n = node_pods.len(); - if n == 0 { - break; + let wait_task = tokio::spawn(async move { + let params = WatchParams::default().fields(&filter); + let stream = all_pods.watch(¶ms, "0").await?.boxed(); + wait_drained(stream, rx).await + }); + 'outer: while !node_pods.is_empty() { + let mut failed = HashSet::new(); + for (namespace, name) in node_pods.iter() { + info!("Evicting pod {namespace}/{name}"); + let api = pods.entry(namespace.clone()).or_insert_with_key(|k| { + Api::::namespaced(client.clone(), k) + }); + match api.evict(name, &Default::default()).await { + Err(kube::Error::Api(e)) if e.code == 429 => { + warn!( + "Failed to evict pod {name}: {e}; will retry in 5 seconds" + ); + failed.insert((namespace.clone(), name.clone())); + }, + Err(kube::Error::Api(e)) if e.code == 404 => (), + Err(e) => error!("Failed to evict pod {name}: {e}"), + Ok(_) => { + if tx + .send((namespace.clone(), name.clone())) + .await + .is_err() + { + error!("Waiter channel closed"); + break 'outer; + } + }, } + } + node_pods = failed; + let n = node_pods.len(); + if n > 0 { debug!( - "Waiting for {n} more {}", + "Waiting to retry {n} {}", if n == 1 { "pod" } else { "pods" } ); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } + wait_task.await.unwrap()?; info!("Finished draining pods from {name}"); Ok(()) } diff --git a/src/lock.rs b/src/lock.rs index 3e2ec2f..3fccc43 100644 --- a/src/lock.rs +++ b/src/lock.rs @@ -24,6 +24,7 @@ pub enum LockError { impl From for LockError { fn from(error: kube::Error) -> Self { + error!("Error processing request: {error}"); Self::ServerError(format!("{error}\n")) } } diff --git a/tests/integration/lock.rs b/tests/integration/lock.rs index 9b4cd06..aa093cc 100644 --- a/tests/integration/lock.rs +++ b/tests/integration/lock.rs @@ -446,7 +446,7 @@ async fn test_unlock_v1_uncordon() { assert_eq!(response.into_string().await, None,); assert_eq!(status, Status::Ok); let lease = get_lease("reboot-lock-default").await.unwrap(); - assert_eq!(lease.spec.unwrap().holder_identity, None); + assert_ne!(lease.spec.unwrap().holder_identity.as_ref(), Some(&hostname)); let node = get_node_by_name(&hostname).await.unwrap(); assert!( !node