drain: Wait for outer loop to complete
All checks were successful
dustin/k8s-reboot-coordinator/pipeline/head This commit looks good
There was a race condition while waiting for a node to be drained, especially if there are pods that cannot be evicted immediately when the wait starts. It was possible for the `wait_drained` function to return before all of the pods had been deleted, if the wait list temporarily became empty at some point.

This could happen, for example, if multiple `WatchEvent` messages were processed from the stream before any messages were processed from the channel; even though there were pod identifiers waiting in the channel to be added to the wait list, if the wait list became empty after processing the watch events, the loop would complete. This is made much more likely if a PodDisruptionBudget temporarily prevents a pod from being evicted; it could take 5 or more seconds for that pod's identifier to be pushed to the channel, and in that time, the rest of the pods could be deleted.

To resolve this, we need to ensure that the `wait_drained` function never returns until the sender side of the channel is dropped. This way, we are sure that no more pods will be added to the wait list, so when it gets emptied, we are sure we are actually done.
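The difference between the old and new exit conditions can be illustrated with a small synchronous model. This is only a sketch under simplifying assumptions: `Event`, `DrainState`, `first_exit`, `pod`, and `racy_sequence` are hypothetical names that do not appear in `src/drain.rs`, the two asynchronous sources polled by `tokio::select!` are flattened into a single ordered list of events, and the exit condition is checked after every event rather than only after watch events.

```rust
use std::collections::HashSet;

/// (namespace, name) pair identifying a pod, mirroring the channel's item type.
type PodId = (String, String);

/// Hypothetical events standing in for the two sources polled by the real loop.
enum Event {
    /// The channel yielded a pod identifier to wait for.
    Queued(PodId),
    /// The sender side of the channel was dropped (`recv()` returned `None`).
    SenderDropped,
    /// The watch stream reported that a pod was deleted.
    Deleted(PodId),
}

/// The wait list plus a `done` flag like the one this commit introduces.
#[derive(Default)]
struct DrainState {
    waitlist: HashSet<PodId>,
    done: bool,
}

impl DrainState {
    fn apply(&mut self, event: Event) {
        match event {
            Event::Queued(id) => {
                self.waitlist.insert(id);
            }
            Event::SenderDropped => self.done = true,
            Event::Deleted(id) => {
                self.waitlist.remove(&id);
            }
        }
    }

    /// New condition: the wait list is empty AND no more pods can arrive.
    fn finished(&self) -> bool {
        self.waitlist.is_empty() && self.done
    }

    /// Old condition, kept only for comparison: an empty wait list is enough.
    fn finished_old(&self) -> bool {
        self.waitlist.is_empty()
    }
}

/// Replays `events` and returns the index of the first event after which the
/// chosen exit condition holds, or `None` if it never does.
fn first_exit(events: Vec<Event>, use_done_flag: bool) -> Option<usize> {
    let mut state = DrainState::default();
    for (i, event) in events.into_iter().enumerate() {
        state.apply(event);
        let exit = if use_done_flag {
            state.finished()
        } else {
            state.finished_old()
        };
        if exit {
            return Some(i);
        }
    }
    None
}

/// Builds a pod identifier in an assumed "default" namespace.
fn pod(name: &str) -> PodId {
    ("default".to_string(), name.to_string())
}

/// Pod "a" is queued and deleted before pod "b" (for example, one delayed by a
/// PodDisruptionBudget) has even reached the channel.
fn racy_sequence() -> Vec<Event> {
    vec![
        Event::Queued(pod("a")),
        Event::Deleted(pod("a")), // wait list is empty here, but "b" is still pending
        Event::Queued(pod("b")),
        Event::SenderDropped,
        Event::Deleted(pod("b")),
    ]
}
```

In this model, `first_exit(racy_sequence(), false)` stops as soon as pod "a" is deleted, while `first_exit(racy_sequence(), true)` only stops after the sender has been dropped and pod "b" is gone.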
14
src/drain.rs
@@ -13,11 +13,16 @@ async fn wait_drained(
     mut channel: Receiver<(String, String)>,
 ) -> Result<(), kube::Error> {
     let mut waitlist = HashSet::new();
+    let mut done = false;
     loop {
         tokio::select! {
-            Some((namespace, name)) = channel.recv() => {
-                debug!("Waiting for pod {namespace}/{name}");
-                waitlist.insert((namespace, name));
+            item = channel.recv() => {
+                if let Some((namespace, name)) = item {
+                    debug!("Waiting for pod {namespace}/{name}");
+                    waitlist.insert((namespace, name));
+                } else {
+                    done = true;
+                }
             }
             event = stream.next() => {
                 if let Some(event) = event {
@@ -30,7 +35,7 @@ async fn wait_drained(
                         waitlist.remove(&(namespace, name));
                     }
                     let n = waitlist.len();
-                    if n == 0 {
+                    if n == 0 && done {
                         break;
                     }
                     debug!(
@@ -120,6 +125,7 @@ pub(crate) async fn drain_node(
             tokio::time::sleep(std::time::Duration::from_secs(5)).await;
         }
     }
+    drop(tx);
     wait_task.await.unwrap()?;
     info!("Finished draining pods from {name}");
     Ok(())