Instead of replacing the current process with the reboot command directly via `exec`, we need to run it in a child process and keep the current process running. The former method has the interesting side-effect of getting the machine into a state where it can never reboot:

1. When the reboot sentinel file appears, the coordinator acquires the lock and drains the node, then `exec`s the reboot command.
2. The DaemonSet pod goes into the _Completed_ state once the reboot command finishes. If the reboot command starts the reboot immediately, there is no issue, but if it starts a delayed reboot, trouble ensues.
3. After a timeout, Kubernetes restarts the DaemonSet pod, starting the coordinator process again.
4. The coordinator notices that the reboot sentinel already exists and immediately `exec`s the reboot command again.
5. The reboot command restarts the delayed reboot, pushing the actual reboot time further into the future.
6. Return to step 2.

To break this loop, someone has to either remove the reboot sentinel file, letting the coordinator start up and run without doing anything, or forcibly reboot the node. We can avoid the loop by never exiting from the process managed by the pod: the reboot command runs and exits, but the parent process keeps running until it is signalled to stop, as sketched below.
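As a rough illustration of the difference (not the coordinator's actual code; the function and argument names here are made up for the example), spawning the reboot command as a child keeps the coordinator alive after the command returns:

```rust
use std::process::Command;

// Hypothetical helper, for illustration only: the real coordinator's argument
// handling and signal loop live elsewhere in this repository.
fn run_reboot_command(reboot_command: &[String]) -> std::io::Result<()> {
    // Old behaviour: `exec` replaced the coordinator with the reboot command,
    // so the pod exited as soon as a delayed reboot had merely been scheduled.
    // New behaviour: spawn a child, wait for it, and return, leaving the
    // coordinator running until it receives SIGTERM.
    let status = Command::new(&reboot_command[0])
        .args(&reboot_command[1..])
        .status()?;
    if !status.success() {
        eprintln!("reboot command exited with {status}");
    }
    Ok(())
}
```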
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Output, Stdio};
use std::sync::LazyLock;

use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, ListParams, Patch, PatchParams};
use tokio::sync::Mutex;

// Path to the coordinator binary under test, provided by Cargo for integration tests.
static EXE: &str = env!("CARGO_BIN_EXE_k8s-reboot-coordinator");
// Serializes the tests, which share cluster state (leases and node cordon status).
static LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

async fn cordon_node(name: &str) -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let nodes: Api<Node> = Api::all(client);
    nodes.cordon(name).await?;
    Ok(())
}

async fn create_lease(
    name: &str,
    identity: &str,
) -> Result<Lease, kube::Error> {
    let client = Client::try_default().await.unwrap();
    let apply = PatchParams::apply(identity);
    let leases: Api<Lease> = Api::default_namespaced(client);
    let lease = Lease {
        metadata: ObjectMeta {
            name: Some(name.into()),
            ..Default::default()
        },
        spec: Some(LeaseSpec {
            holder_identity: Some(identity.into()),
            ..Default::default()
        }),
    };
    leases.patch(name, &apply, &Patch::Apply(&lease)).await
}

fn delete_file<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
    match std::fs::remove_file(path) {
        Ok(_) => Ok(()),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}

async fn delete_lease(name: &str) {
    let client = Client::try_default().await.unwrap();
    let leases: Api<Lease> = Api::default_namespaced(client);
    let _ = kube::runtime::wait::delete::delete_and_finalize(
        leases,
        name,
        &Default::default(),
    )
    .await;
}

async fn get_a_node() -> Result<Node, kube::Error> {
    let client = Client::try_default().await?;
    let nodes: Api<Node> = Api::all(client);
    Ok(nodes.list(&Default::default()).await?.items.pop().unwrap())
}

async fn get_lease(name: &str) -> Result<Lease, kube::Error> {
    let client = Client::try_default().await.unwrap();
    let leases: Api<Lease> = Api::default_namespaced(client);
    leases.get(name).await
}

async fn get_node_by_name(name: &str) -> Result<Node, kube::Error> {
    let client = Client::try_default().await?;
    let nodes: Api<Node> = Api::all(client);
    nodes.get(name).await
}

async fn get_pods_on_node(name: &str) -> Result<Vec<Pod>, kube::Error> {
    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::all(client);
    Ok(pods
        .list(&ListParams::default().fields(&format!("spec.nodeName=={name}")))
        .await?
        .items)
}

// Builds a sentinel path in the temp directory and removes any stale file left
// over from a previous run.
fn new_sentinel(name: &str) -> PathBuf {
    let mut sentinel = std::env::temp_dir();
    sentinel.push(name);
    delete_file(&sentinel).unwrap();
    sentinel
}

fn run_it(
    node_name: &str,
    sentinel: &Path,
    reboot_group: Option<&str>,
) -> std::io::Result<Child> {
    // Pass through only the host variables the coordinator needs, plus its own
    // configuration.
    let mut env: HashMap<_, _> = std::env::vars_os()
        .filter(|(k, _)| k == "PATH" || k == "KUBECONFIG" || k == "RUST_LOG")
        .collect();
    env.insert("NODE_NAME".into(), node_name.into());
    env.insert("REBOOT_SENTINEL".into(), sentinel.into());
    if let Some(g) = reboot_group {
        env.insert("REBOOT_LOCK_GROUP".into(), g.into());
    }
    // The stand-in "reboot command" prints a marker and then signals its parent
    // (the coordinator) to exit so the test can collect the output.
    Command::new(EXE)
        .args(["sh", "-c", "echo 'test success' & kill $PPID"])
        .env_clear()
        .envs(&env)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
}

// Sends a Unix signal to the coordinator and waits for it to exit, collecting
// its output.
fn signal(mut child: Child, signal: i32) -> std::io::Result<Output> {
    let Ok(pid) = i32::try_from(child.id()) else {
        let _ = child.kill();
        let _ = child.wait();
        panic!("Cannot send SIGTERM to child: PID {} too large", child.id());
    };
    unsafe { libc::kill(pid, signal) };
    child.wait_with_output()
}

// Creates the reboot sentinel file to wake the coordinator, then waits for it
// to exit and collects its output.
fn trigger(mut child: Child, sentinel: &Path) -> std::io::Result<Output> {
    if let Err(e) = std::fs::File::create(sentinel) {
        let _ = child.kill();
        let _ = child.wait();
        panic!("Failed to create sentinel file: {e}");
    }
    child.wait_with_output()
}

async fn uncordon_node(name: &str) -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let nodes: Api<Node> = Api::all(client);
    nodes.uncordon(name).await?;
    Ok(())
}

#[tokio::test]
async fn test_success() {
    let _lock = &*LOCK.lock().await;

    delete_lease("reboot-lock-default").await;
    let sentinel = new_sentinel("a278f32c-6cf9-4890-b50e-06d3f0c5259c");
    let node_name = "7bc09505-ebd2-432a-ab05-a83d309c501a";
    let child = run_it(node_name, &sentinel, None).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(250));
    let output = trigger(child, &sentinel).unwrap();
    eprint!("{}", str::from_utf8(&output.stderr).unwrap());
    assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
    let lease = get_lease("reboot-lock-default").await.unwrap();
    assert_eq!(
        lease.spec.unwrap().holder_identity.as_deref(),
        Some(node_name)
    )
}

#[tokio::test]
async fn test_alt_group() {
    let _lock = &*LOCK.lock().await;

    delete_lease("reboot-lock-group2").await;
    let sentinel = new_sentinel("fc7cb011-f9ed-46ff-a310-76278bbd21de");
    let node_name = "71124880-f4d4-44ff-a18d-f301837e0907";
    let child = run_it(node_name, &sentinel, Some("group2")).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(250));
    let output = trigger(child, &sentinel).unwrap();
    eprint!("{}", str::from_utf8(&output.stderr).unwrap());
    assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
    let lease = get_lease("reboot-lock-group2").await.unwrap();
    assert_eq!(
        lease.spec.unwrap().holder_identity.as_deref(),
        Some(node_name)
    )
}

#[tokio::test]
async fn test_terminated() {
    let _lock = &*LOCK.lock().await;

    delete_lease("reboot-lock-default").await;
    let sentinel = new_sentinel("36cdc91b-a2d7-4845-a726-721ecc9787a7");
    let node_name = "a763c92e-5db7-4806-8dd0-e1a72d42b022";
    let child = run_it(node_name, &sentinel, None).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(250));
    let output = signal(child, libc::SIGTERM).unwrap();
    eprint!("{}", str::from_utf8(&output.stderr).unwrap());
    assert_eq!(str::from_utf8(&output.stdout), Ok(""));
    assert!(matches!(
        get_lease("reboot-lock-default").await,
        Err(kube::Error::Api(e)) if e.code == 404
    ));
}

#[tokio::test]
async fn test_node_not_locked() {
    let _lock = &*LOCK.lock().await;

    delete_lease("reboot-lock-default").await;
    let sentinel = new_sentinel("115041f9-abef-496c-abc2-df7c9fbcc046");
    let node = get_a_node().await.unwrap();
    let node_name = node.metadata.name.unwrap();
    let child = run_it(&node_name, &sentinel, None).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(250));
    let output = trigger(child, &sentinel).unwrap();
    eprint!("{}", str::from_utf8(&output.stderr).unwrap());
    assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
    let lease = get_lease("reboot-lock-default").await.unwrap();
    assert_eq!(
        lease.spec.unwrap().holder_identity.as_ref(),
        Some(&node_name)
    );
    let node = get_node_by_name(&node_name).await.unwrap();
    let pods = get_pods_on_node(&node_name).await.unwrap();
    uncordon_node(&node_name).await.unwrap();
    assert!(
        node.spec
            .unwrap()
            .taints
            .unwrap()
            .iter()
            .any(|t| t.key == "node.kubernetes.io/unschedulable"
                && t.effect == "NoSchedule")
    );
    assert_eq!(
        pods.iter()
            .filter(|p| {
                !p.metadata
                    .owner_references
                    .clone()
                    .unwrap_or_default()
                    .iter()
                    .any(|o| o.kind == "DaemonSet")
            })
            .count(),
        0
    );
}

#[tokio::test]
async fn test_node_locked() {
    let _lock = &*LOCK.lock().await;

    delete_lease("reboot-lock-default").await;
    let sentinel = new_sentinel("e5a2fc90-4418-4556-a039-32a2425b1bd9");
    let node = get_a_node().await.unwrap();
    let node_name = node.metadata.name.unwrap();
    create_lease("reboot-lock-default", &node_name)
        .await
        .unwrap();
    cordon_node(&node_name).await.unwrap();
    let child = run_it(&node_name, &sentinel, None).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(500));
    let lease = get_lease("reboot-lock-default").await.unwrap();
    assert_eq!(lease.spec.unwrap().holder_identity.as_ref(), None);
    let node = get_node_by_name(&node_name).await.unwrap();
    assert!(
        !node.spec
            .unwrap()
            .taints
            .unwrap_or_default()
            .iter()
            .any(|t| t.key == "node.kubernetes.io/unschedulable"
                && t.effect == "NoSchedule")
    );
    let output = trigger(child, &sentinel).unwrap();
    eprint!("{}", str::from_utf8(&output.stderr).unwrap());
    assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
    let lease = get_lease("reboot-lock-default").await.unwrap();
    assert_eq!(
        lease.spec.unwrap().holder_identity.as_ref(),
        Some(&node_name)
    );
    let node = get_node_by_name(&node_name).await.unwrap();
    let pods = get_pods_on_node(&node_name).await.unwrap();
    uncordon_node(&node_name).await.unwrap();
    assert!(
        node.spec
            .unwrap()
            .taints
            .unwrap()
            .iter()
            .any(|t| t.key == "node.kubernetes.io/unschedulable"
                && t.effect == "NoSchedule")
    );
    assert_eq!(
        pods.iter()
            .filter(|p| {
                !p.metadata
                    .owner_references
                    .clone()
                    .unwrap_or_default()
                    .iter()
                    .any(|o| o.kind == "DaemonSet")
            })
            .count(),
        0
    );
}