Rewrite to run directly on nodes

After some initial testing, I decided that the HTTP API approach to
managing the reboot lock is not going to work.  I originally implemented
it this way so that the reboot process on the nodes could stay the same
as it had always been, only adding a systemd unit to interact with the
server to obtain the lock and drain the node.  Unfortunately, this does
not actually work in practice because there is no way to ensure that the
new unit runs _first_ during the shutdown process.  In fact, systemd
practically _insists_ on stopping all running containers before any
other units.  The only solution, therefore, is to obtain the reboot lock
and drain the node before initiating the actual shutdown procedure.

I briefly considered installing a script on each node to handle all of
this, and configuring _dnf-automatic_ to run that.  I decided against
that, though, as I would prefer to have as much of the node
configuration managed by Kubernetes as possible; I don't want to have
to maintain that script with Ansible.

I decided that the best way to resolve these issues was to rewrite the
coordinator as a daemon that runs on every node.  It waits for a
sentinel file to appear (`/run/reboot-needed` by default), and then
tries to obtain the reboot lock, drain the node, and reboot the machine.
All of the logic is contained in the daemon and deployed by Kubernetes;
the only change that has to be deployed by Ansible is configuring
_dnf-automatic_ to run `touch /run/reboot-needed` instead of `shutdown
-r +5`.
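
For reference, a sketch of what that _dnf-automatic_ change could look
like, assuming its `reboot_command` setting in `/etc/dnf/automatic.conf`
(the actual Ansible-managed config is not part of this commit):

```ini
# /etc/dnf/automatic.conf -- hypothetical sketch, still managed by Ansible
[commands]
apply_updates = yes
reboot = when-needed
# Instead of rebooting directly (the stock command is roughly
# "shutdown -r +5 ..."), drop the sentinel file and let the coordinator
# daemon obtain the lock, drain the node, and reboot.
reboot_command = "touch /run/reboot-needed"
```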

This implementation is heavily inspired by [kured](https://kured.dev).
Both rely on a sentinel file to trigger the reboot, but Kured detects it
by naive polling, which means either wasting a lot of CPU by checking
frequently or introducing long delays by checking infrequently; this
daemon instead watches for the file with inotify, so it reacts as soon
as the file is created.  Kured also implements the reboot lock without
using a Lease, which may or may not be problematic if multiple nodes
try to reboot simultaneously.
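
The Lease-based lock itself boils down to a non-forced server-side
apply that sets `holderIdentity`: if another field manager already owns
that field, the API server answers with a 409 conflict instead of
silently handing over the lock.  A minimal sketch of that idea, based
on the test fixtures in this commit rather than on the crate's actual
`update_lease` (whose body the diff below elides):

```rust
// Sketch only; the name try_acquire is illustrative, not the crate's API.
use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::{Api, Patch, PatchParams};
use kube::Client;

async fn try_acquire(
    client: Client,
    name: &str,
    holder: &str,
) -> Result<Lease, kube::Error> {
    let leases: Api<Lease> = Api::default_namespaced(client);
    let lease = Lease {
        metadata: ObjectMeta { name: Some(name.into()), ..Default::default() },
        spec: Some(LeaseSpec {
            holder_identity: Some(holder.into()),
            ..Default::default()
        }),
    };
    // Server-side apply with the node name as the field manager.  Without
    // force, a Lease whose holderIdentity belongs to a different manager
    // comes back as kube::Error::Api(e) with e.code == 409, which the
    // caller can treat as "another node is rebooting right now".
    leases.patch(name, &PatchParams::apply(holder), &Patch::Apply(&lease)).await
}
```
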
2025-10-08 07:33:56 -05:00
parent d4638239b3
commit 40e55a984b
16 changed files with 1334 additions and 1404 deletions

Cargo.lock (generated; diff suppressed because it is too large)


@@ -4,12 +4,18 @@ version = "0.1.0"
edition = "2024"
[dependencies]
futures = { version = "0.3.31", default-features = false, features = ["std"] }
hostname = "0.4.1"
inotify = "0.11.0"
k8s-openapi = { version = "0.26.0", features = ["earliest"] }
kube = "2.0.1"
rocket = { version = "0.5.1", default-features = false }
tokio = { version = "1.47.1", default-features = false, features = ["macros", "rt", "signal"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
[dev-dependencies]
kube = { version = "2.0.1", features = ["runtime"] }
kube-runtime = "2.0.1"
libc = "0.2.176"
mockito = "1.7.0"
serde_json = "1.0.145"


@@ -11,10 +11,12 @@ RUN cargo build --release --locked
RUN strip target/release/k8s-reboot-coordinator
FROM scratch
FROM docker.io/library/busybox
COPY --from=build /src/target/release/k8s-reboot-coordinator /
ENV ROCKET_CLI_COLORS=false
ENV REBOOT_SENTINEL=/host/run/reboot-needed
ENTRYPOINT ["/k8s-reboot-coordinator"]
CMD ["nsenter", "-t", "1", "-m", "-n", "/bin/systemctl", "reboot", "--when", "+5 min"]


@@ -1,5 +1,5 @@
apiVersion: apps/v1
kind: Deployment
kind: DaemonSet
metadata:
name: k8s-reboot-coordinator
labels:
@@ -27,24 +27,32 @@ spec:
env:
- name: RUST_LOG
value: info
- name: ROCKET_ADDRESS
value: 0.0.0.0
startupProbe:
httpGet:
path: /healthz
port: http
periodSeconds: 1
failureThreshold: 30
readinessProbe:
httpGet:
path: /healthz
port: http
periodSeconds: 600
failureThreshold: 3
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
capabilities:
add:
- CAP_DAC_READ_SEARCH
- CAP_SYS_CHROOT
- CAP_SYS_ADMIN
drop:
- ALL
privileged: true
readOnlyRootFilesystem: true
volumeMounts:
- mountPath: /host
name: host
readOnly: true
hostPID: true
securityContext:
runAsUser: 15473
runAsGroup: 15473
runAsNonRoot: true
runAsUser: 0
runAsGroup: 0
runAsNonRoot: false
serviceAccountName: k8s-reboot-coordinator
volumes:
- name: host
hostPath:
path: /
type: Directory


@@ -3,4 +3,4 @@ kind: Kustomization
resources:
- rbac.yaml
- deployment.yaml
- daemonset.yaml

src/backoff.rs (new file)

@@ -0,0 +1,52 @@
use std::time::Duration;
pub struct Backoff {
duration: Duration,
max: Duration,
}
impl Backoff {
pub fn new(duration: Duration, max: Duration) -> Self {
Self { duration, max }
}
pub fn next(&mut self) -> Duration {
let duration = self.duration;
self.duration = std::cmp::min(self.duration * 2, self.max);
duration
}
}
impl Default for Backoff {
fn default() -> Self {
Self::new(Duration::from_millis(250), Duration::from_secs(300))
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_first() {
let mut backoff = Backoff::default();
assert_eq!(backoff.next(), Duration::from_millis(250));
}
#[test]
fn test_second() {
let mut backoff = Backoff::default();
backoff.next();
assert_eq!(backoff.next(), Duration::from_millis(500));
}
#[test]
fn test_max() {
let mut backoff =
Backoff::new(Duration::from_millis(250), Duration::from_secs(1));
backoff.next();
backoff.next();
backoff.next();
assert_eq!(backoff.next(), Duration::from_secs(1));
}
}

src/context.rs (new file)

@@ -0,0 +1,84 @@
use std::ffi::OsString;
use std::path::{PathBuf, Path};
use tracing::error;
#[derive(Debug)]
pub struct Context {
reboot_cmd: Vec<OsString>,
lock_group: String,
node_name: String,
sentinel_path: PathBuf,
}
impl Context {
fn default_reboot_command() -> Vec<OsString> {
let cmd = ["systemctl", "reboot"];
cmd.iter().map(OsString::from).collect()
}
fn default_lock_group() -> String {
"default".into()
}
}
impl Context {
pub fn from_env() -> Self {
let reboot_cmd: Vec<_> = std::env::args_os().skip(1).collect();
Self {
reboot_cmd: if reboot_cmd.is_empty() {
Self::default_reboot_command()
} else {
reboot_cmd
},
lock_group: get_env("REBOOT_LOCK_GROUP", Self::default_lock_group),
node_name: get_env("NODE_NAME", || {
hostname::get()
.inspect_err(|e| error!("Error getting hostname: {e}"))
.ok()
.and_then(|s| {
s.into_string()
.inspect_err(|_| error!("Invalid hostname: not a valid Unicode value"))
.ok()
})
.unwrap_or_else(|| "localhost".into())
}),
sentinel_path: PathBuf::from(get_env("REBOOT_SENTINEL", || {
"/run/reboot-needed".into()
})),
}
}
pub fn lock_group(&self) -> &str {
self.lock_group.as_ref()
}
pub fn node_name(&self) -> &str {
self.node_name.as_ref()
}
pub fn reboot_cmd(&self) -> &[OsString] {
self.reboot_cmd.as_ref()
}
pub fn sentinel_path(&self) -> &Path {
self.sentinel_path.as_ref()
}
}
fn get_env<D>(name: &str, default: D) -> String
where
D: FnOnce() -> String,
{
use std::env::VarError;
match std::env::var(name) {
Ok(v) => v,
Err(VarError::NotPresent) => default(),
Err(VarError::NotUnicode(_)) => {
error!(
"Invalid Unicode value for environment variable {name}, using default"
);
default()
},
}
}


@@ -1,11 +1,10 @@
use std::collections::{HashMap, HashSet};
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::api::core::v1::{Node, Pod};
use kube::Client;
use kube::api::{Api, ListParams, WatchEvent, WatchParams};
use rocket::futures::stream::{BoxStream, StreamExt};
use rocket::tokio;
use rocket::tokio::sync::mpsc::{self, Receiver};
use tokio::sync::mpsc::{self, Receiver};
use tracing::{debug, error, info, trace, warn};
async fn wait_drained(


@@ -1,28 +0,0 @@
mod drain;
mod lock;
use rocket::Request;
use rocket::http::Status;
#[rocket::catch(default)]
fn not_found(status: Status, _req: &Request) -> String {
match status.reason() {
Some(s) => format!("{s}\n"),
None => format!("{}\n", status.code),
}
}
#[rocket::get("/healthz")]
fn healthz() -> &'static str {
"UP"
}
#[rocket::launch]
pub fn rocket() -> _ {
rocket::build()
.mount(
"/",
rocket::routes![healthz, lock::lock_v1, lock::unlock_v1],
)
.register("/", rocket::catchers![not_found])
}


@@ -1,25 +1,16 @@
use futures::stream::{StreamExt, TryStreamExt};
use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, Patch, PatchParams, WatchEvent, WatchParams};
use rocket::form::Form;
use rocket::futures::{StreamExt, TryStreamExt};
use rocket::http::Status;
use rocket::request::{self, FromRequest, Request};
use tracing::{error, info, trace, warn};
use crate::drain;
use crate::backoff::Backoff;
#[derive(Debug, rocket::Responder)]
#[derive(Debug)]
pub enum LockError {
#[response(status = 500, content_type = "plain")]
ServerError(String),
#[response(status = 400, content_type = "plain")]
InvalidHeader(String),
#[response(status = 409, content_type = "plain")]
Conflict(String),
#[response(status = 422, content_type = "plain")]
FormError(String),
}
impl From<kube::Error> for LockError {
@@ -29,52 +20,20 @@ impl From<kube::Error> for LockError {
}
}
impl From<InvalidHeader> for LockError {
fn from(_h: InvalidHeader) -> Self {
Self::InvalidHeader("Invalid lock header\n".into())
}
}
impl From<rocket::form::Errors<'_>> for LockError {
fn from(errors: rocket::form::Errors<'_>) -> Self {
let mut message = String::from("Error processing request:\n");
for error in errors {
if let Some(name) = error.name {
message.push_str(&format!("{name}: "));
}
message.push_str(&error.kind.to_string());
message.push('\n');
}
Self::FormError(message)
}
}
pub struct LockRequestHeader;
#[derive(Debug)]
pub struct InvalidHeader;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for LockRequestHeader {
type Error = InvalidHeader;
async fn from_request(
req: &'r Request<'_>,
) -> request::Outcome<Self, Self::Error> {
match req.headers().get_one("K8s-Reboot-Lock") {
Some("lock") => request::Outcome::Success(Self),
_ => request::Outcome::Error((Status::BadRequest, InvalidHeader)),
impl std::fmt::Display for LockError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::ServerError(e) => write!(f, "{e}"),
Self::Conflict(e) => write!(f, "{e}"),
}
}
}
#[derive(rocket::FromForm)]
pub struct LockRequest {
hostname: String,
#[field(default = String::from("default"))]
group: String,
#[field(default = true)]
wait: bool,
impl std::error::Error for LockError {}
async fn get_lease(client: Client, name: &str) -> Result<Lease, kube::Error> {
let leases: Api<Lease> = Api::default_namespaced(client);
leases.get(name).await
}
async fn update_lease(
@@ -102,94 +61,554 @@ async fn wait_lease(client: Client, name: &str) -> Result<(), kube::Error> {
let leases: Api<Lease> = Api::default_namespaced(client);
let params =
WatchParams::default().fields(&format!("metadata.name={name}"));
let mut stream = leases.watch(&params, "0").await?.boxed();
while let Some(event) = stream.try_next().await? {
trace!("Watch lease event: {event:?}");
match event {
WatchEvent::Added(l) | WatchEvent::Modified(l) => match l.spec {
Some(spec) if spec.holder_identity.is_some() => (),
_ => break,
},
WatchEvent::Bookmark(_) => (),
WatchEvent::Deleted(_) => break,
WatchEvent::Error(e) => return Err(kube::Error::Api(e)),
let mut backoff = Backoff::default();
loop {
let mut stream = leases.watch(&params, "0").await?.boxed();
while let Some(event) = stream.try_next().await? {
trace!("Watch lease event: {event:?}");
match event {
WatchEvent::Added(l) | WatchEvent::Modified(l) => {
match l.spec {
Some(spec) if spec.holder_identity.is_some() => (),
_ => return Ok(()),
}
},
WatchEvent::Bookmark(_) => (),
WatchEvent::Deleted(_) => return Ok(()),
WatchEvent::Error(e) => return Err(kube::Error::Api(e)),
}
}
warn!("Lost watch stream");
tokio::time::sleep(backoff.next()).await;
}
Ok(())
}
#[rocket::post("/api/v1/lock", data = "<data>")]
pub async fn lock_v1(
lockheader: Result<LockRequestHeader, InvalidHeader>,
data: rocket::form::Result<'_, Form<LockRequest>>,
) -> Result<String, LockError> {
lockheader?;
let data = data?;
let client = Client::try_default().await.inspect_err(|e| {
error!("Could not connect to Kubernetes API server: {e}")
})?;
let lock_name = format!("reboot-lock-{}", data.group);
loop {
match update_lease(
client.clone(),
&lock_name,
&data.hostname,
Some(&data.hostname),
)
.await
{
Ok(_) => break,
Err(kube::Error::Api(e)) => {
if e.code == 409 {
warn!("Lock already held: {}", e.message);
if !data.wait {
return Err(LockError::Conflict(format!(
"Another system is already rebooting: {}\n",
e.message,
)));
} else {
info!("Waiting for lease {lock_name}");
if let Err(e) =
wait_lease(client.clone(), &lock_name).await
{
error!("Error while waiting for lease: {e}");
pub struct Lock {
client: Client,
pub name: String,
pub holder: String,
}
impl Lock {
pub fn new<G: AsRef<str>, H: Into<String>>(
client: Client,
group: G,
hostname: H,
) -> Self {
Self {
client,
name: format!("reboot-lock-{}", group.as_ref()),
holder: hostname.into(),
}
}
pub async fn acquire(&self, wait: bool) -> Result<(), LockError> {
loop {
match update_lease(
self.client.clone(),
&self.name,
&self.holder,
Some(&self.holder),
)
.await
{
Ok(_) => break,
Err(kube::Error::Api(e)) => {
if e.code == 409 {
warn!("Lock already held: {}", e.message);
if !wait {
return Err(LockError::Conflict(format!(
"Another system is already rebooting: {}\n",
e.message,
)));
} else {
info!("Waiting for lease {}", self.name);
if let Err(e) =
wait_lease(self.client.clone(), &self.name)
.await
{
error!("Error while waiting for lease: {e}");
}
}
} else {
return Err(kube::Error::Api(e).into());
}
},
Err(e) => return Err(e.into()),
}
}
Ok(())
}
pub async fn is_held(&self) -> Result<bool, LockError> {
match get_lease(self.client.clone(), &self.name).await {
Ok(l) => {
if let Some(spec) = l.spec {
if let Some(holder) = spec.holder_identity {
Ok(holder == self.holder)
} else {
Ok(false)
}
} else {
return Err(kube::Error::Api(e).into());
Ok(false)
}
},
Err(e) => return Err(e.into()),
Err(kube::Error::Api(e)) if e.code == 404 => Ok(false),
Err(e) => Err(e)?,
}
}
if let Err(e) = drain::cordon_node(client.clone(), &data.hostname).await {
error!("Failed to cordon node {}: {e}", data.hostname);
} else if let Err(e) =
drain::drain_node(client.clone(), &data.hostname).await
{
error!("Failed to drain node {}: {e}", data.hostname);
pub async fn release(&self) -> Result<(), LockError> {
update_lease(self.client.clone(), &self.name, &self.holder, None)
.await?;
Ok(())
}
Ok(format!(
"Acquired reboot lock for group {}, host {}\n",
data.group, data.hostname
))
}
#[rocket::post("/api/v1/unlock", data = "<data>")]
pub async fn unlock_v1(
lockheader: Result<LockRequestHeader, InvalidHeader>,
data: rocket::form::Result<'_, Form<LockRequest>>,
) -> Result<(), LockError> {
lockheader?;
let data = data?;
let client = Client::try_default().await.inspect_err(|e| {
error!("Could not connect to Kubernetes API server: {e}")
})?;
let lock_name = format!("reboot-lock-{}", data.group);
update_lease(client.clone(), &lock_name, &data.hostname, None).await?;
if let Err(e) = drain::uncordon_node(client.clone(), &data.hostname).await
{
error!("Failed to uncordon node {}: {e}", data.hostname);
#[cfg(test)]
mod test {
use super::*;
use mockito::Matcher;
use serde_json::json;
async fn mock_client() -> (mockito::ServerGuard, Client) {
let server = mockito::Server::new_async().await;
let config = kube::Config::new(server.url().parse().unwrap());
let client = Client::try_from(config).unwrap();
(server, client)
}
fn make_lease(holder: Option<&str>) -> serde_json::Value {
let mut lease = json!({
"kind": "Lease",
"apiVersion": "coordination.k8s.io/v1",
"metadata": {
"name": "reboot-lock-default",
"namespace": "default",
"uid": "a0eb6ab0-94d3-4973-8091-0b670876a750",
"resourceVersion": "24015",
"creationTimestamp": "2025-09-20T13:18:22Z",
"managedFields": [
]
},
"spec": {}
});
if let Some(holder) = holder {
lease["spec"]["holderIdentity"] = holder.into();
let mf =
lease["metadata"]["managedFields"].as_array_mut().unwrap();
mf.push(json!({
"manager": holder,
"operation": "Apply",
"apiVersion": "coordination.k8s.io/v1",
"time": "2025-09-20T13:18:22Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:spec": {
"f:holderIdentity": {}
}
}
}
))
}
lease
}
fn leases_url_path(namespace: &str) -> String {
format!("/apis/coordination.k8s.io/v1/namespaces/{namespace}/leases")
}
fn lease_url_path(namespace: &str, name: &str) -> String {
format!("{}/{name}", leases_url_path(namespace))
}
#[tokio::test]
async fn test_get_lease_notfound() {
let (mut server, client) = mock_client().await;
let mock_lease = server
.mock("GET", &*lease_url_path("default", "reboot-lock-default"))
.with_status(404)
.create();
let lease = get_lease(client, "reboot-lock-default").await;
mock_lease.assert();
assert!(matches!(lease, Err(kube::Error::Api(e)) if e.code == 404));
}
#[tokio::test]
async fn test_get_lease_success() {
let (mut server, client) = mock_client().await;
let lease_json = make_lease(None);
let mock_lease = server
.mock("GET", &*lease_url_path("default", "reboot-lock-default"))
.with_status(200)
.with_header("Content-Type", "application/json")
.with_body(lease_json.to_string())
.create();
let lease = get_lease(client, "reboot-lock-default").await.unwrap();
mock_lease.assert();
assert_eq!(
lease.metadata.name.as_deref(),
Some("reboot-lock-default")
);
}
#[tokio::test]
async fn test_lock_is_held_notfound() {
let (mut server, client) = mock_client().await;
let mock_lease = server
.mock("GET", &*lease_url_path("default", "reboot-lock-default"))
.with_status(404)
.create();
let lock = Lock::new(client, "default", "test1.example.org");
let is_held = lock.is_held().await.unwrap();
mock_lease.assert();
assert!(!is_held);
}
#[tokio::test]
async fn test_lock_is_held_no_holder() {
let (mut server, client) = mock_client().await;
let lease_json = make_lease(None);
let mock_lease = server
.mock("GET", &*lease_url_path("default", "reboot-lock-default"))
.with_status(200)
.with_header("Content-Type", "application/json")
.with_body(lease_json.to_string())
.create();
let lock = Lock::new(client, "default", "test1.example.org");
let is_held = lock.is_held().await.unwrap();
mock_lease.assert();
assert!(!is_held);
}
#[tokio::test]
async fn test_lock_is_held_other_holder() {
let (mut server, client) = mock_client().await;
let lease_json = make_lease(Some("test1.example.org"));
let mock_lease = server
.mock("GET", &*lease_url_path("default", "reboot-lock-default"))
.with_status(200)
.with_header("Content-Type", "application/json")
.with_body(lease_json.to_string())
.create();
let lock = Lock::new(client, "default", "test2.example.org");
let is_held = lock.is_held().await.unwrap();
mock_lease.assert();
assert!(!is_held);
}
#[tokio::test]
async fn test_lock_is_held_true() {
let (mut server, client) = mock_client().await;
let lease_json = make_lease(Some("test1.example.org"));
let mock_lease = server
.mock("GET", &*lease_url_path("default", "reboot-lock-default"))
.with_status(200)
.with_body(lease_json.to_string())
.create();
let lock = Lock::new(client, "default", "test1.example.org");
let is_held = lock.is_held().await.unwrap();
mock_lease.assert();
assert!(is_held);
}
#[tokio::test]
async fn test_lock_acquire_success() {
let (mut server, client) = mock_client().await;
let lease_json = make_lease(Some("test1.example.org"));
let mock_lease = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test1.example.org".into(),
))
.match_body(
&*json!({
"apiVersion": "coordination.k8s.io/v1",
"kind": "Lease",
"metadata": {
"name": "reboot-lock-default"
},
"spec": {
"holderIdentity": "test1.example.org"
}
})
.to_string(),
)
.with_status(200)
.with_body(lease_json.to_string())
.create();
let lock = Lock::new(client, "default", "test1.example.org");
let result = lock.acquire(false).await;
mock_lease.assert();
assert!(matches!(result, Ok(())));
}
#[tokio::test]
async fn test_lock_acquire_conflict() {
let (mut server, client) = mock_client().await;
let mock_lease = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test1.example.org".into(),
))
.match_body(
&*json!({
"apiVersion": "coordination.k8s.io/v1",
"kind": "Lease",
"metadata": {
"name": "reboot-lock-default"
},
"spec": {
"holderIdentity": "test1.example.org"
}
})
.to_string(),
)
.with_status(409)
.create();
let lock = Lock::new(client, "default", "test1.example.org");
let result = lock.acquire(false).await;
mock_lease.assert();
assert!(matches!(result, Err(LockError::Conflict(_))));
}
#[tokio::test]
async fn test_lock_acquire_wait_deleted() {
use std::sync::{Arc, Mutex};
let _tsdg = crate::test::setup();
let orig_lease = make_lease(Some("test1.example.org"));
let new_lease = make_lease(Some("test8.example.org"));
let (mut server, client) = mock_client().await;
let x1 = Arc::new(Mutex::new(true));
let x2 = x1.clone();
let x3 = x1.clone();
let patch1 = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test8.example.org".into(),
))
.match_request(move |_| *x1.lock().unwrap())
.with_status(409)
.create();
let wait = server
.mock("GET", &*leases_url_path("default"))
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("watch".into(), "true".into()),
Matcher::UrlEncoded("timeoutSeconds".into(), "290".into()),
Matcher::UrlEncoded(
"fieldSelector".into(),
"metadata.name=reboot-lock-default".into(),
),
Matcher::UrlEncoded(
"allowWatchBookmarks".into(),
"true".into(),
),
Matcher::UrlEncoded("resourceVersion".into(), "0".into()),
]))
.with_status(200)
.with_chunked_body(move |w| {
w.write_all(
json!({
"type": "ADDED",
"object": orig_lease
})
.to_string()
.as_bytes(),
)?;
w.write_all("\n".as_bytes())?;
std::thread::sleep(std::time::Duration::from_millis(250));
w.write_all(
json!({
"type": "DELETED",
"object": orig_lease
})
.to_string()
.as_bytes(),
)?;
w.write_all("\n".as_bytes())?;
*x2.lock().unwrap() = false;
Ok(())
})
.create();
let patch2 = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test8.example.org".into(),
))
.match_request(move |_| !*x3.lock().unwrap())
.with_status(200)
.with_body(new_lease.to_string())
.create();
let lock = Lock::new(client, "default", "test8.example.org");
let result = tokio::time::timeout(
std::time::Duration::from_secs(1),
lock.acquire(true),
)
.await;
patch1.assert();
wait.assert();
patch2.assert();
match result {
Err(e) => panic!("{e}"),
Ok(Err(e)) => panic!("{e}"),
Ok(_) => (),
};
}
#[tokio::test]
async fn test_lock_acquire_wait_released() {
use std::sync::{Arc, Mutex};
let _tsdg = crate::test::setup();
let orig_lease = make_lease(Some("test1.example.org"));
let free_lease = make_lease(None);
let new_lease = make_lease(Some("test9.example.org"));
let (mut server, client) = mock_client().await;
let x1 = Arc::new(Mutex::new(true));
let x2 = x1.clone();
let x3 = x1.clone();
let patch1 = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test9.example.org".into(),
))
.match_request(move |_| *x1.lock().unwrap())
.with_status(409)
.create();
let wait = server
.mock("GET", &*leases_url_path("default"))
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("watch".into(), "true".into()),
Matcher::UrlEncoded("timeoutSeconds".into(), "290".into()),
Matcher::UrlEncoded(
"fieldSelector".into(),
"metadata.name=reboot-lock-default".into(),
),
Matcher::UrlEncoded(
"allowWatchBookmarks".into(),
"true".into(),
),
Matcher::UrlEncoded("resourceVersion".into(), "0".into()),
]))
.with_status(200)
.with_chunked_body(move |w| {
w.write_all(
json!({
"type": "ADDED",
"object": orig_lease
})
.to_string()
.as_bytes(),
)?;
w.write_all("\n".as_bytes())?;
std::thread::sleep(std::time::Duration::from_millis(250));
w.write_all(
json!({
"type": "MODIFIED",
"object": free_lease
})
.to_string()
.as_bytes(),
)?;
w.write_all("\n".as_bytes())?;
*x2.lock().unwrap() = false;
Ok(())
})
.create();
let patch2 = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test9.example.org".into(),
))
.match_request(move |_| !*x3.lock().unwrap())
.with_status(200)
.with_body(new_lease.to_string())
.create();
let lock = Lock::new(client, "default", "test9.example.org");
let result = tokio::time::timeout(
std::time::Duration::from_secs(1),
lock.acquire(true),
)
.await;
patch1.assert();
wait.assert();
patch2.assert();
match result {
Err(e) => panic!("{e}"),
Ok(Err(e)) => panic!("{e}"),
Ok(_) => (),
};
}
#[tokio::test]
async fn test_lock_release_created() {
let _tsdg = crate::test::setup();
let (mut server, client) = mock_client().await;
let lease = make_lease(None);
let mock = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test4.example.org".into(),
))
.with_status(201)
.with_body(lease.to_string())
.create();
let lock = Lock::new(client, "default", "test4.example.org");
let result = lock.release().await;
mock.assert();
if result.is_err() {
panic!("{result:?}");
}
}
#[tokio::test]
async fn test_lock_release_owned() {
let _tsdg = crate::test::setup();
let (mut server, client) = mock_client().await;
let lease = make_lease(Some("test4.example.org"));
let mock = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test4.example.org".into(),
))
.with_status(200)
.with_body(lease.to_string())
.create();
let lock = Lock::new(client, "default", "test4.example.org");
let result = lock.release().await;
mock.assert();
if result.is_err() {
panic!("{result:?}");
}
}
#[tokio::test]
async fn test_lock_release_not_owned() {
let _tsdg = crate::test::setup();
let (mut server, client) = mock_client().await;
let lease = make_lease(Some("test1.example.org"));
let mock = server
.mock("PATCH", &*lease_url_path("default", "reboot-lock-default"))
.match_query(Matcher::UrlEncoded(
"fieldManager".into(),
"test4.example.org".into(),
))
.with_status(200)
.with_body(lease.to_string())
.create();
let lock = Lock::new(client, "default", "test4.example.org");
let result = lock.release().await;
mock.assert();
if result.is_err() {
panic!("{result:?}");
}
}
Ok(())
}


@@ -1,9 +1,168 @@
#[rocket::launch]
fn rocket() -> _ {
mod backoff;
mod context;
mod drain;
mod lock;
#[cfg(test)]
mod test;
use std::ffi::OsString;
use std::io::ErrorKind;
use std::os::unix::process::CommandExt;
use std::path::Path;
use std::process::Command;
use std::time::Duration;
use inotify::{EventMask, Inotify, WatchMask};
use tokio::signal::unix::{SignalKind, signal};
use tracing::{debug, error, info};
use crate::backoff::Backoff;
use crate::context::Context;
#[tokio::main(flavor = "current_thread")]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_writer(std::io::stderr)
.init();
k8s_reboot_coordinator::rocket()
let client = match kube::Client::try_default().await {
Ok(c) => c,
Err(e) => {
error!("Failed to initialize Kubernetes client: {e}");
std::process::exit(1);
},
};
let task = tokio::spawn(async move {
if let Err(e) = inner_main(client).await {
error!("Unexpected error: {e}");
}
});
if let Err(e) = wait_signalled().await {
error!("Error setting up signal handlers: {e}");
std::process::exit(1);
}
task.abort();
if let Err(e) = task.await {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
}
}
}
async fn inner_main(
client: kube::Client,
) -> Result<(), Box<dyn std::error::Error>> {
let ctx = Context::from_env();
let lock =
lock::Lock::new(client.clone(), ctx.lock_group(), ctx.node_name());
release_lock(client.clone(), &lock).await;
wait_sentinel(&ctx).await?;
acquire_lock(client.clone(), &lock, &ctx).await;
info!("Initiating node reboot");
exec_cmd(ctx.reboot_cmd());
unreachable!();
}
async fn acquire_lock(client: kube::Client, lock: &lock::Lock, ctx: &Context) {
let mut backoff = Backoff::default();
info!("Waiting to acquire reboot lock");
while let Err(e) = lock.acquire(true).await {
error!("Failed to acquire reboot lock: {e}");
tokio::time::sleep(backoff.next()).await;
}
if let Err(e) = drain::cordon_node(client.clone(), ctx.node_name()).await {
error!("Error cordoning node: {e}");
} else if let Err(e) =
drain::drain_node(client.clone(), ctx.node_name()).await
{
error!("Error draining node: {e}");
}
}
fn exec_cmd(cmd: &[OsString]) {
let program = &cmd[0];
let args = &cmd[1..];
let error = Command::new(program).args(args).exec();
let rc = match error.kind() {
ErrorKind::NotFound => 127,
ErrorKind::PermissionDenied => 126,
_ => 1,
};
eprintln!("{}: {error}", program.to_string_lossy());
std::process::exit(rc);
}
async fn release_lock(client: kube::Client, lock: &lock::Lock) {
let mut backoff =
Backoff::new(Duration::from_millis(50), Duration::from_secs(10));
loop {
match lock.is_held().await {
Ok(false) => {
debug!("Did not hold reboot lock");
return;
},
Ok(true) => break,
Err(e) => {
error!("Failed to check lock status: {e}");
},
}
tokio::time::sleep(backoff.next()).await;
}
info!("Releasing reboot lock");
if let Err(e) = lock.release().await {
error!("Failed to release reboot lock: {e}");
} else if let Err(e) = drain::uncordon_node(client, &lock.holder).await {
error!("Failed to uncordon node: {e}");
}
}
async fn wait_signalled() -> std::io::Result<()> {
let mut sigterm = signal(SignalKind::terminate())?;
let mut sigint = signal(SignalKind::interrupt())?;
tokio::select! {
_ = sigterm.recv() => info!("Received SIGTERM"),
_ = sigint.recv() => info!("Received SIGINT"),
}
Ok(())
}
async fn wait_sentinel(context: &Context) -> std::io::Result<()> {
use futures::TryStreamExt;
if context.sentinel_path().metadata().is_ok() {
info!("Sentinel file already exists");
return Ok(());
}
let inotify = Inotify::init()?;
let watch_dir = context
.sentinel_path()
.parent()
.unwrap_or_else(|| Path::new("/"));
debug!(
"Watching for sentinel file: {}",
context.sentinel_path().display()
);
inotify.watches().add(watch_dir, WatchMask::CREATE)?;
let mut buf = [0; 1024];
let mut stream = inotify.into_event_stream(&mut buf)?;
while let Some(evt) = stream.try_next().await? {
match evt {
inotify::Event {
mask: EventMask::CREATE,
wd: _,
cookie: _,
name,
} if name.as_deref() == context.sentinel_path().file_name() => {
info!("Reboot sentinel appeared");
break;
},
_ => (),
}
}
Ok(())
}

src/test.rs (new file)

@@ -0,0 +1,9 @@
pub(crate) fn setup() -> tracing::subscriber::DefaultGuard {
use tracing::Level;
tracing::subscriber::set_default(
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.with_test_writer()
.finish(),
)
}


@@ -1,21 +0,0 @@
use rocket::http::Status;
#[test]
fn test_healthz() {
super::setup();
let client = super::client();
let response = client.get("/healthz").dispatch();
assert_eq!(response.status(), Status::Ok);
assert_eq!(response.into_string(), Some("UP".into()));
}
#[test]
fn test_not_found() {
super::setup();
let client = super::client();
let response = client.get("/bogus").dispatch();
assert_eq!(response.status(), Status::NotFound);
assert_eq!(response.into_string(), Some("Not Found\n".into()));
}


@@ -1,461 +0,0 @@
use std::sync::LazyLock;
use k8s_openapi::api::coordination::v1::Lease;
use k8s_openapi::api::core::v1::{Node, Pod};
use kube::Client;
use kube::api::{Api, ListParams};
use rocket::async_test;
use rocket::futures::FutureExt;
use rocket::http::{ContentType, Header, Status};
use rocket::tokio;
use rocket::tokio::sync::Mutex;
static LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
async fn delete_lease(name: &str) {
let client = Client::try_default().await.unwrap();
let leases: Api<Lease> = Api::default_namespaced(client);
let _ = kube::runtime::wait::delete::delete_and_finalize(
leases,
name,
&Default::default(),
)
.await;
}
async fn get_lease(name: &str) -> Result<Lease, kube::Error> {
let client = Client::try_default().await.unwrap();
let leases: Api<Lease> = Api::default_namespaced(client);
leases.get(name).await
}
async fn get_a_node() -> Result<Node, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
Ok(nodes.list(&Default::default()).await?.items.pop().unwrap())
}
async fn get_node_by_name(name: &str) -> Result<Node, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.get(name).await
}
async fn get_pods_on_node(name: &str) -> Result<Vec<Pod>, kube::Error> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::all(client);
Ok(pods
.list(&ListParams::default().fields(&format!("spec.nodeName=={name}")))
.await?
.items)
}
#[async_test]
async fn test_lock_v1_success() {
super::setup();
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
assert_eq!(
response.into_string().await.as_deref(),
Some(
"Acquired reboot lock for group default, host test1.example.org\n"
)
);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
}
#[async_test]
async fn test_lock_v1_custom_group() {
super::setup();
delete_lease("reboot-lock-testgroup").await;
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org&group=testgroup")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
assert_eq!(
response.into_string().await.as_deref(),
Some(
"Acquired reboot lock for group testgroup, host test1.example.org\n"
)
);
let lease = get_lease("reboot-lock-testgroup").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
}
#[async_test]
async fn test_lock_v1_conflict() {
super::setup();
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
assert_eq!(
response.into_string().await.as_deref(),
Some(
"Acquired reboot lock for group default, host test1.example.org\n"
)
);
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test2.example.org&wait=false")
.dispatch()
.await;
assert_eq!(response.status(), Status::Conflict);
let want_msg = concat!(
"Another system is already rebooting:",
" Apply failed with 1 conflict:",
" conflict with \"test1.example.org\":",
" .spec.holderIdentity",
"\n",
);
assert_eq!(response.into_string().await.as_deref(), Some(want_msg));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
}
#[async_test]
async fn test_lock_v1_conflict_wait() {
super::setup();
let _lock = &*LOCK.lock().await;
tracing::info!("Deleting existing lease");
delete_lease("reboot-lock-default").await;
tracing::info!("Creating first lease");
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
assert_eq!(
response.into_string().await.as_deref(),
Some(
"Acquired reboot lock for group default, host test1.example.org\n"
)
);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
let timer = std::time::Instant::now();
let _task = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(1))
.then(|_| async {
tracing::info!("Deleting first lease");
delete_lease("reboot-lock-default").await
})
.await
});
tracing::info!("Creating second lease");
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test2.example.org")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
assert_eq!(
response.into_string().await.as_deref(),
Some(
"Acquired reboot lock for group default, host test2.example.org\n"
)
);
let duration = timer.elapsed().as_millis();
assert!(duration > 1000 && duration < 2000);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test2.example.org")
);
}
#[test]
fn test_lock_v1_no_header() {
super::setup();
let client = super::client();
let response = client
.post("/api/v1/lock")
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch();
assert_eq!(response.status(), Status::BadRequest);
assert_eq!(
response.into_string().as_deref(),
Some("Invalid lock header\n")
);
}
#[test]
fn test_lock_v1_no_data() {
super::setup();
let client = super::client();
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("")
.dispatch();
assert_eq!(response.status(), Status::UnprocessableEntity);
assert_eq!(
response.into_string().as_deref(),
Some("Error processing request:\nhostname: missing\n")
);
}
#[async_test]
async fn test_unlock_v1_success() {
super::setup();
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
let response = client
.post("/api/v1/unlock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
let status = response.status();
assert_eq!(response.into_string().await, None);
assert_eq!(status, Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(lease.spec.unwrap().holder_identity, None);
}
#[async_test]
async fn test_unlock_v1_not_locked() {
super::setup();
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let client = super::async_client().await;
let response = client
.post("/api/v1/unlock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
let status = response.status();
assert_eq!(response.into_string().await, None);
assert_eq!(status, Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(lease.spec.unwrap().holder_identity.as_deref(), None);
}
#[async_test]
async fn test_unlock_v1_not_mine() {
super::setup();
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch()
.await;
assert_eq!(response.status(), Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
let response = client
.post("/api/v1/unlock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("hostname=test2.example.org")
.dispatch()
.await;
let status = response.status();
assert_eq!(response.into_string().await, None);
assert_eq!(status, Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some("test1.example.org")
);
}
#[test]
fn test_unlock_v1_no_header() {
super::setup();
let client = super::client();
let response = client
.post("/api/v1/unlock")
.header(ContentType::Form)
.body("hostname=test1.example.org")
.dispatch();
assert_eq!(response.status(), Status::BadRequest);
assert_eq!(
response.into_string().as_deref(),
Some("Invalid lock header\n")
);
}
#[test]
fn test_unlock_v1_no_data() {
super::setup();
let client = super::client();
let response = client
.post("/api/v1/unlock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body("")
.dispatch();
assert_eq!(response.status(), Status::UnprocessableEntity);
assert_eq!(
response.into_string().as_deref(),
Some("Error processing request:\nhostname: missing\n")
);
}
#[async_test]
async fn test_lock_v1_drain() {
super::setup();
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let node = get_a_node().await.unwrap();
let hostname = node.metadata.name.clone().unwrap();
let client = super::async_client().await;
let response = client
.post("/api/v1/lock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body(format!("hostname={hostname}"))
.dispatch()
.await;
let status = response.status();
assert_eq!(
response.into_string().await,
Some(format!(
"Acquired reboot lock for group default, host {hostname}\n"
))
);
assert_eq!(status, Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_ref(),
Some(&hostname)
);
let node = get_node_by_name(&hostname).await.unwrap();
assert!(
node.spec
.unwrap()
.taints
.unwrap()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
let pods = get_pods_on_node(&hostname).await.unwrap();
assert_eq!(
pods.iter()
.filter(|p| {
!p.metadata
.owner_references
.clone()
.unwrap_or_default()
.iter()
.any(|o| o.kind == "DaemonSet")
})
.count(),
0
);
}
#[async_test]
async fn test_unlock_v1_uncordon() {
super::setup();
let _lock = &*LOCK.lock().await;
let node = get_a_node().await.unwrap();
let hostname = node.metadata.name.clone().unwrap();
let client = super::async_client().await;
let response = client
.post("/api/v1/unlock")
.header(Header::new("K8s-Reboot-Lock", "lock"))
.header(ContentType::Form)
.body(format!("hostname={hostname}"))
.dispatch()
.await;
let status = response.status();
assert_eq!(response.into_string().await, None,);
assert_eq!(status, Status::Ok);
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_ne!(lease.spec.unwrap().holder_identity.as_ref(), Some(&hostname));
let node = get_node_by_name(&hostname).await.unwrap();
assert!(
!node
.spec
.unwrap()
.taints
.unwrap_or_default()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
}


@@ -1,36 +0,0 @@
mod basic;
mod lock;
use std::sync::LazyLock;
use rocket::local::asynchronous::Client as AsyncClient;
use rocket::local::blocking::Client;
static SETUP: LazyLock<()> = LazyLock::new(|| {
unsafe {
std::env::set_var("ROCKET_CLI_COLORS", "false");
}
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::new(concat!(
env!("CARGO_PKG_NAME"),
"=trace,",
"k8s_reboot_coordinator=trace,",
"info",
)))
.with_test_writer()
.init();
});
fn setup() {
LazyLock::force(&SETUP);
}
async fn async_client() -> AsyncClient {
AsyncClient::tracked(k8s_reboot_coordinator::rocket())
.await
.unwrap()
}
fn client() -> Client {
Client::tracked(k8s_reboot_coordinator::rocket()).unwrap()
}

tests/krc-it-test/main.rs (new file)

@@ -0,0 +1,307 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Output, Stdio};
use std::sync::LazyLock;
use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::api::core::v1::{Node, Pod};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, ListParams, Patch, PatchParams};
use tokio::sync::Mutex;
static EXE: &str = env!("CARGO_BIN_EXE_k8s-reboot-coordinator");
static LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
async fn cordon_node(name: &str) -> Result<(), kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.cordon(name).await?;
Ok(())
}
async fn create_lease(
name: &str,
identity: &str,
) -> Result<Lease, kube::Error> {
let client = Client::try_default().await.unwrap();
let apply = PatchParams::apply(identity);
let leases: Api<Lease> = Api::default_namespaced(client);
let lease = Lease {
metadata: ObjectMeta {
name: Some(name.into()),
..Default::default()
},
spec: Some(LeaseSpec {
holder_identity: Some(identity.into()),
..Default::default()
}),
};
leases.patch(name, &apply, &Patch::Apply(&lease)).await
}
fn delete_file<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
match std::fs::remove_file(path) {
Ok(_) => Ok(()),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
}
}
async fn delete_lease(name: &str) {
let client = Client::try_default().await.unwrap();
let leases: Api<Lease> = Api::default_namespaced(client);
let _ = kube::runtime::wait::delete::delete_and_finalize(
leases,
name,
&Default::default(),
)
.await;
}
async fn get_a_node() -> Result<Node, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
Ok(nodes.list(&Default::default()).await?.items.pop().unwrap())
}
async fn get_lease(name: &str) -> Result<Lease, kube::Error> {
let client = Client::try_default().await.unwrap();
let leases: Api<Lease> = Api::default_namespaced(client);
leases.get(name).await
}
async fn get_node_by_name(name: &str) -> Result<Node, kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.get(name).await
}
async fn get_pods_on_node(name: &str) -> Result<Vec<Pod>, kube::Error> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::all(client);
Ok(pods
.list(&ListParams::default().fields(&format!("spec.nodeName=={name}")))
.await?
.items)
}
fn new_sentinel(name: &str) -> PathBuf {
let mut sentinel = std::env::temp_dir();
sentinel.push(name);
delete_file(&sentinel).unwrap();
sentinel
}
fn run_it(
node_name: &str,
sentinel: &Path,
reboot_group: Option<&str>,
) -> std::io::Result<Child> {
let mut env: HashMap<_, _> = std::env::vars_os()
.filter(|(k, _)| k == "PATH" || k == "KUBECONFIG" || k == "RUST_LOG")
.collect();
env.insert("NODE_NAME".into(), node_name.into());
env.insert("REBOOT_SENTINEL".into(), sentinel.into());
if let Some(g) = reboot_group {
env.insert("REBOOT_LOCK_GROUP".into(), g.into());
}
Command::new(EXE)
.args(["echo", "test success"])
.env_clear()
.envs(&env)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
}
fn signal(mut child: Child, signal: i32) -> std::io::Result<Output> {
let Ok(pid) = i32::try_from(child.id()) else {
let _ = child.kill();
let _ = child.wait();
panic!("Cannot send SIGTERM to child: PID {} too large", child.id());
};
unsafe { libc::kill(pid, signal) };
child.wait_with_output()
}
fn trigger(mut child: Child, sentinel: &Path) -> std::io::Result<Output> {
if let Err(e) = std::fs::File::create(sentinel) {
let _ = child.kill();
let _ = child.wait();
panic!("Failed to create sentinel file: {e}");
}
child.wait_with_output()
}
async fn uncordon_node(name: &str) -> Result<(), kube::Error> {
let client = Client::try_default().await?;
let nodes: Api<Node> = Api::all(client);
nodes.uncordon(name).await?;
Ok(())
}
#[tokio::test]
async fn test_success() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("a278f32c-6cf9-4890-b50e-06d3f0c5259c");
let node_name = "7bc09505-ebd2-432a-ab05-a83d309c501a";
let child = run_it(node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some(node_name)
)
}
#[tokio::test]
async fn test_alt_group() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-group2").await;
let sentinel = new_sentinel("fc7cb011-f9ed-46ff-a310-76278bbd21de");
let node_name = "71124880-f4d4-44ff-a18d-f301837e0907";
let child = run_it(node_name, &sentinel, Some("group2")).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-group2").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_deref(),
Some(node_name)
)
}
#[tokio::test]
async fn test_terminated() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("36cdc91b-a2d7-4845-a726-721ecc9787a7");
let node_name = "a763c92e-5db7-4806-8dd0-e1a72d42b022";
let child = run_it(node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = signal(child, libc::SIGTERM).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok(""));
assert!(matches!(
get_lease("reboot-lock-default").await,
Err(kube::Error::Api(e)) if e.code == 404
));
}
#[tokio::test]
async fn test_node_not_locked() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("115041f9-abef-496c-abc2-df7c9fbcc046");
let node = get_a_node().await.unwrap();
let node_name = node.metadata.name.unwrap();
let child = run_it(&node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(250));
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_ref(),
Some(&node_name)
);
let node = get_node_by_name(&node_name).await.unwrap();
let pods = get_pods_on_node(&node_name).await.unwrap();
uncordon_node(&node_name).await.unwrap();
assert!(
node.spec
.unwrap()
.taints
.unwrap()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
assert_eq!(
pods.iter()
.filter(|p| {
!p.metadata
.owner_references
.clone()
.unwrap_or_default()
.iter()
.any(|o| o.kind == "DaemonSet")
})
.count(),
0
);
}
#[tokio::test]
async fn test_node_locked() {
let _lock = &*LOCK.lock().await;
delete_lease("reboot-lock-default").await;
let sentinel = new_sentinel("e5a2fc90-4418-4556-a039-32a2425b1bd9");
let node = get_a_node().await.unwrap();
let node_name = node.metadata.name.unwrap();
create_lease("reboot-lock-default", &node_name)
.await
.unwrap();
cordon_node(&node_name).await.unwrap();
let child = run_it(&node_name, &sentinel, None).unwrap();
std::thread::sleep(std::time::Duration::from_millis(500));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(lease.spec.unwrap().holder_identity.as_ref(), None,);
let node = get_node_by_name(&node_name).await.unwrap();
assert!(
!node.spec
.unwrap()
.taints
.unwrap_or_default()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
let output = trigger(child, &sentinel).unwrap();
eprint!("{}", str::from_utf8(&output.stderr).unwrap());
assert_eq!(str::from_utf8(&output.stdout), Ok("test success\n"));
let lease = get_lease("reboot-lock-default").await.unwrap();
assert_eq!(
lease.spec.unwrap().holder_identity.as_ref(),
Some(&node_name)
);
let node = get_node_by_name(&node_name).await.unwrap();
let pods = get_pods_on_node(&node_name).await.unwrap();
uncordon_node(&node_name).await.unwrap();
assert!(
node.spec
.unwrap()
.taints
.unwrap()
.iter()
.any(|t| t.key == "node.kubernetes.io/unschedulable"
&& t.effect == "NoSchedule")
);
assert_eq!(
pods.iter()
.filter(|p| {
!p.metadata
.owner_references
.clone()
.unwrap_or_default()
.iter()
.any(|o| o.kind == "DaemonSet")
})
.count(),
0
);
}