diff --git a/src/drain.rs b/src/drain.rs
index 7ec6728..17f78c5 100644
--- a/src/drain.rs
+++ b/src/drain.rs
@@ -3,17 +3,58 @@
 use std::collections::{HashMap, HashSet};
 use k8s_openapi::api::core::v1::{Node, Pod};
 use kube::Client;
 use kube::api::{Api, ListParams, WatchEvent, WatchParams};
-use rocket::futures::{StreamExt, TryStreamExt};
-use tracing::{debug, info, trace};
+use rocket::futures::stream::{BoxStream, StreamExt};
+use rocket::tokio;
+use rocket::tokio::sync::mpsc::{self, Receiver};
+use tracing::{debug, error, info, trace, warn};
+
+async fn wait_drained(
+    mut stream: BoxStream<'_, Result<WatchEvent<Pod>, kube::Error>>,
+    mut channel: Receiver<(String, String)>,
+) -> Result<(), kube::Error> {
+    let mut waitlist = HashSet::new();
+    loop {
+        tokio::select! {
+            Some((namespace, name)) = channel.recv() => {
+                debug!("Waiting for pod {namespace}/{name}");
+                waitlist.insert((namespace, name));
+            }
+            event = stream.next() => {
+                if let Some(event) = event {
+                    trace!("Watch pod event: {event:?}");
+                    if let WatchEvent::Deleted(pod) = event? {
+                        if let (Some(namespace), Some(name)) =
+                            (pod.metadata.namespace, pod.metadata.name)
+                        {
+                            info!("Pod {namespace}/{name} evicted");
+                            waitlist.remove(&(namespace, name));
+                        }
+                        let n = waitlist.len();
+                        if n == 0 {
+                            break;
+                        }
+                        debug!(
+                            "Waiting for {n} more {}",
+                            if n == 1 { "pod" } else { "pods" }
+                        );
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+    Ok(())
+}
 pub(crate) async fn drain_node(
     client: Client,
     name: &str,
 ) -> Result<(), kube::Error> {
     let all_pods: Api<Pod> = Api::all(client.clone());
-    let filter = &format!("spec.nodeName={name}");
+    let filter = format!("spec.nodeName={name}");
     let mut node_pods: HashSet<_> = all_pods
-        .list(&ListParams::default().fields(filter))
+        .list(&ListParams::default().fields(&filter))
         .await?
         .items
         .into_iter()
@@ -34,38 +75,52 @@ pub(crate) async fn drain_node(
         debug!("No pods to evict from node {name}");
         return Ok(());
     }
+    let (tx, rx) = mpsc::channel(node_pods.len());
     let mut pods = HashMap::new();
-    for (namespace, name) in node_pods.iter() {
-        info!("Evicting pod {namespace}/{name}");
-        let api = pods
-            .entry(namespace)
-            .or_insert_with_key(|k| Api::<Pod>::namespaced(client.clone(), k));
-        // Return early here because otherwise we would just wait forever for
-        // the pod to be deleted.
-        api.evict(name, &Default::default()).await?;
-    }
-    let mut stream = all_pods
-        .watch(&WatchParams::default().fields(filter), "0")
-        .await?
-        .boxed();
-    while let Some(event) = stream.try_next().await? {
-        trace!("Watch pod event: {event:?}");
-        if let WatchEvent::Deleted(pod) = event {
-            if let (Some(namespace), Some(name)) =
-                (pod.metadata.namespace, pod.metadata.name)
-            {
-                node_pods.remove(&(namespace, name));
-            }
-            let n = node_pods.len();
-            if n == 0 {
-                break;
+    let wait_task = tokio::spawn(async move {
+        let params = WatchParams::default().fields(&filter);
+        let stream = all_pods.watch(&params, "0").await?.boxed();
+        wait_drained(stream, rx).await
+    });
+    'outer: while !node_pods.is_empty() {
+        let mut failed = HashSet::new();
+        for (namespace, name) in node_pods.iter() {
+            info!("Evicting pod {namespace}/{name}");
+            let api = pods.entry(namespace.clone()).or_insert_with_key(|k| {
+                Api::<Pod>::namespaced(client.clone(), k)
+            });
+            match api.evict(name, &Default::default()).await {
+                Err(kube::Error::Api(e)) if e.code == 429 => {
+                    warn!(
+                        "Failed to evict pod {name}: {e}; will retry in 5 seconds"
+                    );
+                    failed.insert((namespace.clone(), name.clone()));
+                },
+                Err(kube::Error::Api(e)) if e.code == 404 => (),
+                Err(e) => error!("Failed to evict pod {name}: {e}"),
+                Ok(_) => {
+                    if tx
+                        .send((namespace.clone(), name.clone()))
+                        .await
+                        .is_err()
+                    {
+                        error!("Waiter channel closed");
+                        break 'outer;
+                    }
+                },
             }
+        }
+        node_pods = failed;
+        let n = node_pods.len();
+        if n > 0 {
             debug!(
-                "Waiting for {n} more {}",
+                "Waiting to retry {n} {}",
                 if n == 1 { "pod" } else { "pods" }
             );
+            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
         }
     }
+    wait_task.await.unwrap()?;
     info!("Finished draining pods from {name}");
     Ok(())
 }
diff --git a/src/lock.rs b/src/lock.rs
index 3e2ec2f..3fccc43 100644
--- a/src/lock.rs
+++ b/src/lock.rs
@@ -24,6 +24,7 @@ pub enum LockError {
 
 impl From<kube::Error> for LockError {
     fn from(error: kube::Error) -> Self {
+        error!("Error processing request: {error}");
         Self::ServerError(format!("{error}\n"))
     }
 }
diff --git a/tests/integration/lock.rs b/tests/integration/lock.rs
index 9b4cd06..aa093cc 100644
--- a/tests/integration/lock.rs
+++ b/tests/integration/lock.rs
@@ -446,7 +446,7 @@ async fn test_unlock_v1_uncordon() {
     assert_eq!(response.into_string().await, None,);
    assert_eq!(status, Status::Ok);
     let lease = get_lease("reboot-lock-default").await.unwrap();
-    assert_eq!(lease.spec.unwrap().holder_identity, None);
+    assert_ne!(lease.spec.unwrap().holder_identity.as_ref(), Some(&hostname));
     let node = get_node_by_name(&hostname).await.unwrap();
     assert!(
         !node
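
For context, a minimal sketch of how `drain_node` might be driven by a caller inside the crate (it is `pub(crate)`). This is not part of the patch: the `main` wrapper and the node name are hypothetical, and it assumes a `kube::Client` built from the ambient kubeconfig or in-cluster config via `Client::try_default`:

    use kube::Client;

    #[rocket::main]
    async fn main() -> Result<(), kube::Error> {
        // Hypothetical driver: connect with the default credentials and
        // drain a single node by name.
        let client = Client::try_default().await?;
        drain_node(client, "worker-1").await
    }

The retry loop in the patch exists because the eviction subresource returns HTTP 429 Too Many Requests while a PodDisruptionBudget temporarily blocks an eviction; running the watcher in a separate task means deletions that complete between retries are still counted.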