drain: Retry failed evictions

If evicting a pod fails with an HTTP 429 Too Many Requests error, it
means a PodDisruptionBudget currently prevents the pod from being
deleted.  This can happen, for example, when draining a node that has
Longhorn volumes attached, as Longhorn creates a PDB for its instance
manager pods on such nodes.  Longhorn removes the PDB automatically
once no workloads on that node use its volumes, so instead of giving
up, we continue evicting the other pods and retry the failed evictions
later.  This behavior mostly mimics how `kubectl drain` handles the
same condition.
master
Dustin 2025-09-25 09:10:57 -05:00
parent 7d8ee51016
commit d937bd6fb2
3 changed files with 86 additions and 30 deletions
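
In essence, the new retry rule treats HTTP 429 from the Eviction API as
"blocked by a PodDisruptionBudget" and tries again after a delay.  A
minimal, self-contained sketch of that rule (the helper name
`evict_with_retry` is hypothetical, and it retries a single pod with a
fixed 5-second delay; the actual change below evicts in batches while a
separate task watches for deletions):

use std::time::Duration;

use k8s_openapi::api::core::v1::Pod;
use kube::api::Api;

// Hypothetical helper, not part of this commit: evict one pod,
// retrying for as long as the API server answers 429.
async fn evict_with_retry(
    api: &Api<Pod>,
    name: &str,
) -> Result<(), kube::Error> {
    loop {
        match api.evict(name, &Default::default()).await {
            // 429: a PodDisruptionBudget currently forbids this
            // eviction.  Wait and retry; the PDB may go away (e.g.
            // Longhorn removes its instance manager PDB once the
            // node's volumes are unused).
            Err(kube::Error::Api(e)) if e.code == 429 => {
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
            // 404: the pod is already gone, which is what we wanted.
            Err(kube::Error::Api(e)) if e.code == 404 => return Ok(()),
            // Any other error is fatal for this pod.
            Err(e) => return Err(e),
            Ok(_) => return Ok(()),
        }
    }
}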

View File

@@ -3,17 +3,58 @@ use std::collections::{HashMap, HashSet};
 use k8s_openapi::api::core::v1::{Node, Pod};
 use kube::Client;
 use kube::api::{Api, ListParams, WatchEvent, WatchParams};
-use rocket::futures::{StreamExt, TryStreamExt};
-use tracing::{debug, info, trace};
+use rocket::futures::stream::{BoxStream, StreamExt};
+use rocket::tokio;
+use rocket::tokio::sync::mpsc::{self, Receiver};
+use tracing::{debug, error, info, trace, warn};
 
+async fn wait_drained(
+    mut stream: BoxStream<'_, Result<WatchEvent<Pod>, kube::Error>>,
+    mut channel: Receiver<(String, String)>,
+) -> Result<(), kube::Error> {
+    let mut waitlist = HashSet::new();
+    loop {
+        tokio::select! {
+            Some((namespace, name)) = channel.recv() => {
+                debug!("Waiting for pod {namespace}/{name}");
+                waitlist.insert((namespace, name));
+            }
+            event = stream.next() => {
+                if let Some(event) = event {
+                    trace!("Watch pod event: {event:?}");
+                    if let WatchEvent::Deleted(pod) = event? {
+                        if let (Some(namespace), Some(name)) =
+                            (pod.metadata.namespace, pod.metadata.name)
+                        {
+                            info!("Pod {namespace}/{name} evicted");
+                            waitlist.remove(&(namespace, name));
+                        }
+                        let n = waitlist.len();
+                        if n == 0 {
+                            break;
+                        }
+                        debug!(
+                            "Waiting for {n} more {}",
+                            if n == 1 { "pod" } else { "pods" }
+                        );
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
 pub(crate) async fn drain_node(
     client: Client,
     name: &str,
 ) -> Result<(), kube::Error> {
     let all_pods: Api<Pod> = Api::all(client.clone());
-    let filter = &format!("spec.nodeName={name}");
+    let filter = format!("spec.nodeName={name}");
     let mut node_pods: HashSet<_> = all_pods
-        .list(&ListParams::default().fields(filter))
+        .list(&ListParams::default().fields(&filter))
         .await?
         .items
         .into_iter()
@@ -34,38 +75,52 @@ pub(crate) async fn drain_node(
         debug!("No pods to evict from node {name}");
         return Ok(());
     }
 
+    let (tx, rx) = mpsc::channel(node_pods.len());
     let mut pods = HashMap::new();
-    for (namespace, name) in node_pods.iter() {
-        info!("Evicting pod {namespace}/{name}");
-        let api = pods
-            .entry(namespace)
-            .or_insert_with_key(|k| Api::<Pod>::namespaced(client.clone(), k));
-        // Return early here because otherwise we would just wait forever for
-        // the pod to be deleted.
-        api.evict(name, &Default::default()).await?;
-    }
-
-    let mut stream = all_pods
-        .watch(&WatchParams::default().fields(filter), "0")
-        .await?
-        .boxed();
-    while let Some(event) = stream.try_next().await? {
-        trace!("Watch pod event: {event:?}");
-        if let WatchEvent::Deleted(pod) = event {
-            if let (Some(namespace), Some(name)) =
-                (pod.metadata.namespace, pod.metadata.name)
-            {
-                node_pods.remove(&(namespace, name));
-            }
-            let n = node_pods.len();
-            if n == 0 {
-                break;
-            }
-            debug!(
-                "Waiting for {n} more {}",
-                if n == 1 { "pod" } else { "pods" }
-            );
+    let wait_task = tokio::spawn(async move {
+        let params = WatchParams::default().fields(&filter);
+        let stream = all_pods.watch(&params, "0").await?.boxed();
+        wait_drained(stream, rx).await
+    });
+    'outer: while !node_pods.is_empty() {
+        let mut failed = HashSet::new();
+        for (namespace, name) in node_pods.iter() {
+            info!("Evicting pod {namespace}/{name}");
+            let api = pods.entry(namespace.clone()).or_insert_with_key(|k| {
+                Api::<Pod>::namespaced(client.clone(), k)
+            });
+            match api.evict(name, &Default::default()).await {
+                Err(kube::Error::Api(e)) if e.code == 429 => {
+                    warn!(
+                        "Failed to evict pod {name}: {e}; will retry in 5 seconds"
+                    );
+                    failed.insert((namespace.clone(), name.clone()));
+                },
+                Err(kube::Error::Api(e)) if e.code == 404 => (),
+                Err(e) => error!("Failed to evict pod {name}: {e}"),
+                Ok(_) => {
+                    if tx
+                        .send((namespace.clone(), name.clone()))
+                        .await
+                        .is_err()
+                    {
+                        error!("Waiter channel closed");
+                        break 'outer;
+                    }
+                },
+            }
+        }
+        node_pods = failed;
+        let n = node_pods.len();
+        if n > 0 {
+            debug!(
+                "Waiting to retry {n} {}",
+                if n == 1 { "pod" } else { "pods" }
+            );
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
         }
     }
+    wait_task.await.unwrap()?;
     info!("Finished draining pods from {name}");
     Ok(())
 }

View File

@@ -24,6 +24,7 @@ pub enum LockError {
 impl From<kube::Error> for LockError {
     fn from(error: kube::Error) -> Self {
+        error!("Error processing request: {error}");
         Self::ServerError(format!("{error}\n"))
     }
 }

View File

@@ -446,7 +446,7 @@ async fn test_unlock_v1_uncordon() {
     assert_eq!(response.into_string().await, None,);
     assert_eq!(status, Status::Ok);
     let lease = get_lease("reboot-lock-default").await.unwrap();
-    assert_eq!(lease.spec.unwrap().holder_identity, None);
+    assert_ne!(lease.spec.unwrap().holder_identity.as_ref(), Some(&hostname));
     let node = get_node_by_name(&hostname).await.unwrap();
     assert!(
         !node