diff --git a/src/drain.rs b/src/drain.rs index c6af2b6..c808538 100644 --- a/src/drain.rs +++ b/src/drain.rs @@ -1,38 +1,71 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; -use futures::stream::{BoxStream, StreamExt}; +use futures::stream::StreamExt; use k8s_openapi::api::core::v1::{Node, Pod}; use kube::Client; -use kube::api::{Api, ListParams, WatchEvent, WatchParams}; +use kube::api::{Api, WatchEvent, WatchParams}; use tokio::sync::Mutex; use tracing::{debug, error, info, trace, warn}; -async fn wait_drained( - mut stream: BoxStream<'_, Result, kube::Error>>, - waitlist: Arc>>, -) -> Result<(), kube::Error> { - while let Some(event) = stream.next().await { - trace!("Watch pod event: {event:?}"); - if let WatchEvent::Deleted(pod) = event? { - if let (Some(namespace), Some(name)) = - (pod.metadata.namespace, pod.metadata.name) - { - info!("Pod {namespace}/{name} evicted"); - let mut waitlist = waitlist.lock().await; - waitlist.remove(&(namespace, name)); - let n = waitlist.len(); - if n == 0 { - break; - } - debug!( - "Waiting for {n} more {}", - if n == 1 { "pod" } else { "pods" } +type Waitlist = Arc>>; + +async fn evict_pod(api: Api, name: &str) -> bool { + loop { + match api.evict(name, &Default::default()).await { + Err(kube::Error::Api(e)) if e.code == 429 => { + warn!( + "Failed to evict pod {name}: {e}; will retry in 5 seconds" ); - } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + }, + Err(kube::Error::Api(e)) if e.code == 404 => { + return false; + }, + Err(e) => { + error!("Failed to evict pod {name}: {e}"); + return false; + }, + Ok(_) => return true, } } - Ok(()) +} + +async fn handle_added( + pod: Pod, + waitlist: Waitlist, + task_set: &mut tokio::task::JoinSet<()>, + client: Client, +) -> Option<()> { + let namespace = pod.metadata.namespace?; + let name = pod.metadata.name?; + let owners = pod.metadata.owner_references.unwrap_or_default(); + if owners.iter().any(|o| o.kind == "DaemonSet") { + info!("Ignoring DaemonSet pod {name}"); + return None; + } + info!("Evicting pod {namespace}/{name}"); + { + let mut waitlist = waitlist.lock().await; + waitlist.insert((namespace.clone(), name.clone())); + } + let api = Api::namespaced(client.clone(), &namespace); + task_set.spawn(async move { + if !evict_pod(api, &name).await { + let mut waitlist = waitlist.lock().await; + waitlist.remove(&(namespace, name)); + } + }); + Some(()) +} + +async fn handle_deleted(pod: Pod, waitlist: Waitlist) -> Option<()> { + let namespace = pod.metadata.namespace?; + let name = pod.metadata.name?; + info!("Pod {namespace}/{name} evicted"); + let mut waitlist = waitlist.lock().await; + waitlist.remove(&(namespace, name)); + Some(()) } pub(crate) async fn drain_node( @@ -41,69 +74,37 @@ pub(crate) async fn drain_node( ) -> Result<(), kube::Error> { let all_pods: Api = Api::all(client.clone()); let filter = format!("spec.nodeName={name}"); - let mut node_pods: HashSet<_> = all_pods - .list(&ListParams::default().fields(&filter)) - .await? - .items - .into_iter() - .filter_map(|p| { - let name = p.metadata.name?; - let namespace = p.metadata.namespace?; - let owners = p.metadata.owner_references.unwrap_or_default(); - - if owners.iter().any(|o| o.kind == "DaemonSet") { - info!("Ignoring DaemonSet pod {name}"); - None - } else { - Some((namespace, name)) - } - }) - .collect(); - if node_pods.is_empty() { - debug!("No pods to evict from node {name}"); - return Ok(()); - } + let params = WatchParams::default().fields(&filter); let waitlist = Arc::new(Mutex::new(HashSet::new())); - let wl = waitlist.clone(); - let mut pods = HashMap::new(); - let wait_task = tokio::spawn(async move { - let params = WatchParams::default().fields(&filter); - let stream = all_pods.watch(¶ms, "0").await?.boxed(); - wait_drained(stream, wl).await - }); - while !node_pods.is_empty() { - let mut failed = HashSet::new(); - for (namespace, name) in node_pods.iter() { - info!("Evicting pod {namespace}/{name}"); - let api = pods.entry(namespace.clone()).or_insert_with_key(|k| { - Api::::namespaced(client.clone(), k) - }); - match api.evict(name, &Default::default()).await { - Err(kube::Error::Api(e)) if e.code == 429 => { - warn!( - "Failed to evict pod {name}: {e}; will retry in 5 seconds" - ); - failed.insert((namespace.clone(), name.clone())); - }, - Err(kube::Error::Api(e)) if e.code == 404 => (), - Err(e) => error!("Failed to evict pod {name}: {e}"), - Ok(_) => { - let mut waitlist = waitlist.lock().await; - waitlist.insert((namespace.clone(), name.clone())); - } - } + let mut task_set = tokio::task::JoinSet::new(); + let mut stream = all_pods.watch(¶ms, "0").await?.boxed(); + while let Some(event) = stream.next().await { + trace!("Watch pod event: {event:?}"); + match event? { + WatchEvent::Added(pod) => { + handle_added( + pod, + waitlist.clone(), + &mut task_set, + client.clone(), + ) + .await; + }, + WatchEvent::Deleted(pod) => { + handle_deleted(pod, waitlist.clone()).await; + }, + _ => (), } - node_pods = failed; - let n = node_pods.len(); - if n > 0 { - debug!( - "Waiting to retry {n} {}", - if n == 1 { "pod" } else { "pods" } - ); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let n = waitlist.lock().await.len(); + if n == 0 { + break; } + debug!( + "Waiting for {n} more {}", + if n == 1 { "pod" } else { "pods" } + ); } - wait_task.await.unwrap()?; + task_set.shutdown().await; info!("Finished draining pods from {name}"); Ok(()) }