Dustin C. Hatch 48b19604fd
Do not replace current process with reboot command
Instead of replacing the current process with the reboot command
directly via `exec`, we need to run it in a child process and keep
the current process running.  The former method has the interesting
side-effect of getting the machine into a state where it can never
reboot:

1. When the reboot sentinel file appears, the coordinator acquires the
   lock and drains the node, then `exec`s the reboot command.
2. The DaemonSet pod goes into _Completed_ state once the reboot command
   finishes.  If the reboot command starts the reboot process
   immediately, there is no issue, but if it starts a delayed reboot,
   trouble ensues.
3. After a timeout, Kubernetes restarts the DaemonSet pod, starting the
   coordinator process again.
4. The coordinator notices that the reboot sentinel already exists and
   immediately `exec`s the reboot command again.
5. The reboot command restarts the delayed reboot process, pushing the
   actual reboot time further into the future.
6. Return to step 2.

To break this loop, someone needs to either remove the reboot sentinel
file, letting the coordinator start up and run without doing anything,
or forcibly reboot the node.

We can avoid this loop by never exiting from the process managed by the
pod.  The reboot command runs and exits, but the parent process
continues until it's signalled to stop.
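
In code terms, the fix looks roughly like the following (a minimal sketch,
assuming a `std::process::Command`-based launcher; the function name and
error handling are illustrative, not the coordinator's actual implementation):

```rust
use std::process::Command;

/// Run the configured reboot command as a child process and wait for it.
/// Unlike `CommandExt::exec`, this returns, so the coordinator keeps
/// running and the DaemonSet pod never reaches the Completed state.
fn run_reboot_command(program: &str, args: &[&str]) -> std::io::Result<()> {
    let status = Command::new(program).args(args).status()?; // spawn + wait
    if !status.success() {
        eprintln!("reboot command exited with {status}");
    }
    // The parent keeps running here until it is signalled to stop.
    Ok(())
}
```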
2025-10-08 20:19:48 -05:00

use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Output, Stdio};
use std::sync::LazyLock;
use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, ListParams, Patch, PatchParams};
use tokio::sync::Mutex;
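/// Path to the coordinator binary under test, provided by Cargo for integration tests.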
static EXE: &str = env!("CARGO_BIN_EXE_k8s-reboot-coordinator");
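/// Serializes the tests, since they all share cluster state (leases, node cordons).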
static LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
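/// Mark the named node unschedulable.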
async fn cordon_node(name: &str) -> Result<(), kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.cordon(name).await?;
Ok(())
}
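/// Create or update the named Lease in the default namespace with the given
/// holder identity, using server-side apply.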
async fn create_lease(
name: &str,
identity: &str,
) -> Result<Lease, kube::Error> {
let client = Client::try_default().await.unwrap();
let apply = PatchParams::apply(identity);
let leases: Api<Lease> = Api::default_namespaced(client);
let lease = Lease {
metadata: ObjectMeta {
name: Some(name.into()),
..Default::default()
},
spec: Some(LeaseSpec {
holder_identity: Some(identity.into()),
..Default::default()
}),
};
leases.patch(name, &apply, &Patch::Apply(&lease)).await
}
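/// Remove a file, treating "not found" as success.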
fn delete_file<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
match std::fs::remove_file(path) {
Ok(_) => Ok(()),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
}
}
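/// Delete the named Lease and wait for it to be fully gone, ignoring errors
/// (e.g. if it does not exist).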
async fn delete_lease(name: &str) {
let client = Client::try_default().await.unwrap();
let leases: Api<Lease> = Api::default_namespaced(client);
let _ = kube::runtime::wait::delete::delete_and_finalize(
leases,
name,
&Default::default(),
)
.await;
}
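/// Pick an arbitrary node from the cluster for tests that need a real node.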
async fn get_a_node() -> Result<Node, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
Ok(nodes.list(&Default::default()).await?.items.pop().unwrap())
}
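/// Fetch the named Lease from the default namespace.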
async fn get_lease(name: &str) -> Result<Lease, kube::Error> {
let client = Client::try_default().await.unwrap();
let leases: Api<Lease> = Api::default_namespaced(client);
leases.get(name).await
}
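/// Fetch the named node.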
async fn get_node_by_name(name: &str) -> Result<Node, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.get(name).await
}
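/// List all pods currently scheduled on the named node.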
async fn get_pods_on_node(name: &str) -> Result<Vec<Pod>, kube::Error> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::all(client);
Ok(pods
.list(&ListParams::default().fields(&format!("spec.nodeName=={name}")))
.await?
.items)
}
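/// Build a sentinel path under the system temp directory and remove any
/// leftover file from a previous run.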
fn new_sentinel(name: &str) -> PathBuf {
let mut sentinel = std::env::temp_dir();
sentinel.push(name);
delete_file(&sentinel).unwrap();
sentinel
}
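/// Launch the coordinator under test with a minimal environment (only PATH,
/// KUBECONFIG, and RUST_LOG are passed through), pointing it at the given
/// node name and sentinel path.  The "reboot command" given on the command
/// line just prints a marker and then signals the coordinator to exit.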
fn run_it(
node_name: &str,
sentinel: &Path,
reboot_group: Option<&str>,
) -> std::io::Result<Child> {
let mut env: HashMap<_, _> = std::env::vars_os()
.filter(|(k, _)| k == "PATH" || k == "KUBECONFIG" || k == "RUST_LOG")
.collect();
env.insert("NODE_NAME".into(), node_name.into());
env.insert("REBOOT_SENTINEL".into(), sentinel.into());
if let Some(g) = reboot_group {
env.insert("REBOOT_LOCK_GROUP".into(), g.into());
}
Command::new(EXE)
.args(["sh", "-c", "echo 'test success' & kill $PPID"])
.env_clear()
.envs(&env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
}
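/// Send the given signal to the child process, then wait for it and collect
/// its output.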
fn signal(mut child: Child, signal: i32) -> std::io::Result<Output> {
let Ok(pid) = i32::try_from(child.id()) else {
let _ = child.kill();
let _ = child.wait();
panic!("Cannot send SIGTERM to child: PID {} too large", child.id());
};
unsafe { libc::kill(pid, signal) };
child.wait_with_output()
}
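/// Create the reboot sentinel file to trigger the coordinator, then wait for
/// the child and collect its output.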
fn trigger(mut child: Child, sentinel: &Path) -> std::io::Result<Output> {
if let Err(e) = std::fs::File::create(sentinel) {
let _ = child.kill();
let _ = child.wait();
panic!("Failed to create sentinel file: {e}");
}
child.wait_with_output()
}
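/// Mark the named node schedulable again (test cleanup).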
async fn uncordon_node(name: &str) -> Result<(), kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.uncordon(name).await?;
Ok(())
}
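/// Creating the sentinel file should make the coordinator acquire the default
/// reboot lock and run the reboot command.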
#[tokio::test]
async fn test_success() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("a278f32c-6cf9-4890-b50e-06d3f0c5259c");
let node_name = "7bc09505-ebd2-432a-ab05-a83d309c501a";
let child = run_it(node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some(node_name)
)
}
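/// With REBOOT_LOCK_GROUP set, the coordinator should use the group-specific
/// lock lease instead of the default one.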
#[tokio::test]
async fn test_alt_group() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-group2").await;
let sentinel = new_sentinel("fc7cb011-f9ed-46ff-a310-76278bbd21de");
let node_name = "71124880-f4d4-44ff-a18d-f301837e0907";
let child = run_it(node_name, &sentinel, Some("group2")).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-group2").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some(node_name)
)
}
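/// A SIGTERM before the sentinel appears should shut the coordinator down
/// cleanly, without running the reboot command or creating the lease.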
#[tokio::test]
async fn test_terminated() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("36cdc91b-a2d7-4845-a726-721ecc9787a7");
let node_name = "a763c92e-5db7-4806-8dd0-e1a72d42b022";
let child = run_it(node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = signal(child, libc::SIGTERM).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok(""));
assert!(matches!(
get_lease("reboot-lock-default").await,
Err(kube::Error::Api(e)) if e.code == 404
));
}
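/// Against a real node with no existing lock, the coordinator should cordon
/// and drain the node (leaving only DaemonSet pods) before running the reboot
/// command.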
#[tokio::test]
async fn test_node_not_locked() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("115041f9-abef-496c-abc2-df7c9fbcc046");
let node = get_a_node().await.unwrap();
let node_name = node.metadata.name.unwrap();
let child = run_it(&node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_ref(),
Some(&node_name)
);
let node = get_node_by_name(&node_name).await.unwrap();
let pods = get_pods_on_node(&node_name).await.unwrap();
uncordon_node(&node_name).await.unwrap();
assert!(
node.spec
.unwrap()
.taints
.unwrap()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
assert_eq!(
pods.iter()
.filter(|p| {
!p.metadata
.owner_references
.clone()
.unwrap_or_default()
.iter()
.any(|o| o.kind == "DaemonSet")
})
.count(),
0
);
}
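/// If this node already holds the lock and is cordoned at startup, the
/// coordinator should release the lease and uncordon the node; a later
/// sentinel should then trigger a normal cordon/drain/reboot cycle.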
#[tokio::test]
async fn test_node_locked() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("e5a2fc90-4418-4556-a039-32a2425b1bd9");
let node = get_a_node().await.unwrap();
let node_name = node.metadata.name.unwrap();
create_lease("reboot-lock-default", &node_name)
.await
.unwrap();
cordon_node(&node_name).await.unwrap();
let child = run_it(&node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(500));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(lease.spec.unwrap().holder_identity.as_ref(), None);
let node = get_node_by_name(&node_name).await.unwrap();
assert!(
!node.spec
.unwrap()
.taints
.unwrap_or_default()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_ref(),
Some(&node_name)
);
let node = get_node_by_name(&node_name).await.unwrap();
let pods = get_pods_on_node(&node_name).await.unwrap();
uncordon_node(&node_name).await.unwrap();
assert!(
node.spec
.unwrap()
.taints
.unwrap()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
assert_eq!(
pods.iter()
.filter(|p| {
!p.metadata
.owner_references
.clone()
.unwrap_or_default()
.iter()
.any(|o| o.kind == "DaemonSet")
})
.count(),
0
);
}