Begin lock/unlock implementation

This commit introduces two HTTP path operations:

* POST /api/v1/lock: Acquire a reboot lock
* POST /api/v1/unlock: Release a reboot lock

Both operations take a _multipart/form-data_ or
_application/x-www-form-urlencoded_ body with a required `hostname`
field.  This field indicates the name of the host acquiring/releasing
the lock.  The `lock` operation also takes an optional `wait` field.  If
this value is provided with a `false` value, and the reboot lock cannot
be acquired immediately, the request will fail with an HTTP 409
Conflict.  If a `true` value is provided, or the field is omitted, the
request will block until the lock can be acquired.

Locking is implemented with a Kubernetes Lease resource using
Server-Side Apply.  By setting the field manager of the `holderIdentity`
field to match its value, we can ensure that there are no race
conditions in acquiring the lock; Kubernetes will reject the update if
both the new value and the field manager do not match.  This is
significantly safer than a more naïve check-then-set approach.
master
Dustin 2025-09-24 07:55:35 -05:00
parent 2a10c815be
commit 4bb72900fa
6 changed files with 1539 additions and 21 deletions

1033
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,12 @@ version = "0.1.0"
edition = "2024"
[dependencies]
k8s-openapi = { version = "0.26.0", features = ["earliest"] }
kube = "2.0.1"
rocket = { version = "0.5.1", default-features = false }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
[dev-dependencies]
kube = { version = "2.0.1", features = ["runtime"] }
kube-runtime = "2.0.1"

View File

@ -1,3 +1,5 @@
mod lock;
use rocket::Request;
use rocket::http::Status;
@ -17,6 +19,9 @@ fn healthz() -> &'static str {
#[rocket::launch]
pub fn rocket() -> _ {
rocket::build()
.mount("/", rocket::routes![healthz])
.mount(
"/",
rocket::routes![healthz, lock::lock_v1, lock::unlock_v1],
)
.register("/", rocket::catchers![not_found])
}

178
src/lock.rs Normal file
View File

@ -0,0 +1,178 @@
use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, Patch, PatchParams, WatchEvent, WatchParams};
use rocket::form::Form;
use rocket::futures::{StreamExt, TryStreamExt};
use rocket::http::Status;
use rocket::request::{self, FromRequest, Request};
use tracing::{error, info, trace, warn};
/// Error responses produced by the lock/unlock request handlers.
///
/// Every variant carries the plain-text response body; the HTTP status code
/// is fixed per variant by its `#[response]` attribute.
#[derive(Debug, rocket::Responder)]
pub enum LockError {
/// HTTP 500. Built from a `kube::Error` by the `From<kube::Error>` impl.
#[response(status = 500, content_type = "plain")]
ServerError(String),
/// HTTP 400. Built from `InvalidHeader` when the lock header guard fails.
#[response(status = 400, content_type = "plain")]
InvalidHeader(String),
/// HTTP 409. Returned by `lock_v1` when the lock is held and the caller
/// asked not to wait.
#[response(status = 409, content_type = "plain")]
Conflict(String),
/// HTTP 422. Built from `rocket::form::Errors` when the form body is invalid.
#[response(status = 422, content_type = "plain")]
FormError(String),
}
/// Maps any Kubernetes client error to a `ServerError` whose body is the
/// error's display text terminated by a newline.
impl From<kube::Error> for LockError {
    fn from(error: kube::Error) -> Self {
        let mut body = error.to_string();
        body.push('\n');
        Self::ServerError(body)
    }
}
/// Maps a failed lock-header guard to an `InvalidHeader` response with a
/// fixed message.
impl From<InvalidHeader> for LockError {
    fn from(_: InvalidHeader) -> Self {
        Self::InvalidHeader(String::from("Invalid lock header\n"))
    }
}
/// Renders form-parsing errors as a `FormError` body: a fixed heading line
/// followed by one line per error, prefixed with the field name when the
/// error carries one.
impl From<rocket::form::Errors<'_>> for LockError {
    fn from(errors: rocket::form::Errors<'_>) -> Self {
        let details: String = errors
            .into_iter()
            .map(|error| match error.name {
                Some(name) => format!("{name}: {}\n", error.kind),
                None => format!("{}\n", error.kind),
            })
            .collect();
        Self::FormError(format!("Error processing request:\n{details}"))
    }
}
/// Request guard marker: its `FromRequest` impl succeeds only when the
/// request carries the header `K8s-Reboot-Lock: lock`.
pub struct LockRequestHeader;
/// Error value of the `LockRequestHeader` guard; converted into
/// `LockError::InvalidHeader` (HTTP 400) by the handlers.
#[derive(Debug)]
pub struct InvalidHeader;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for LockRequestHeader {
type Error = InvalidHeader;
async fn from_request(
req: &'r Request<'_>,
) -> request::Outcome<Self, Self::Error> {
match req.headers().get_one("K8s-Reboot-Lock") {
Some("lock") => request::Outcome::Success(Self),
_ => request::Outcome::Error((Status::BadRequest, InvalidHeader)),
}
}
}
/// Form body accepted by both the lock and unlock endpoints.
#[derive(rocket::FromForm)]
pub struct LockRequest {
/// Name of the host acquiring or releasing the lock. Required (no default);
/// the handlers use it both as the lease holder and as the apply field manager.
hostname: String,
/// Lock group; the handlers operate on the lease named `reboot-lock-<group>`.
/// Defaults to `"default"` when omitted.
#[field(default = String::from("default"))]
group: String,
/// Whether `lock_v1` should block until the lock can be acquired.
/// Defaults to `true` when omitted; `unlock_v1` does not read it.
#[field(default = true)]
wait: bool,
}
/// Applies the desired state of the lease `name` using Server-Side Apply.
///
/// * `client` - Kubernetes client; the lease lives in its default namespace.
/// * `name` - name of the Lease resource.
/// * `identity` - field manager used for the apply.
/// * `holder` - value for `spec.holderIdentity`; `None` leaves the field out
///   of the applied configuration.
///
/// Returns the lease as reported by the API server, or the `kube::Error`
/// from the patch call (the callers treat HTTP 409 as "lock held").
async fn update_lease(
    client: Client,
    name: &str,
    identity: &str,
    holder: Option<&str>,
) -> Result<Lease, kube::Error> {
    let metadata = ObjectMeta {
        name: Some(name.to_owned()),
        ..Default::default()
    };
    let spec = LeaseSpec {
        holder_identity: holder.map(str::to_owned),
        ..Default::default()
    };
    let desired = Lease {
        metadata,
        spec: Some(spec),
    };

    let api: Api<Lease> = Api::default_namespaced(client);
    let params = PatchParams::apply(identity);
    api.patch(name, &params, &Patch::Apply(&desired)).await
}
/// Watches the lease `name` and returns once it looks free.
///
/// The lease counts as free when it is deleted, or when an added/modified
/// event shows no spec or no `holderIdentity`. The function also returns
/// `Ok(())` if the watch stream simply ends, so callers must re-check by
/// attempting to acquire the lock again.
///
/// Errors from opening or reading the watch stream, and error events sent by
/// the API server, are returned as `kube::Error`.
async fn wait_lease(client: Client, name: &str) -> Result<(), kube::Error> {
    let api: Api<Lease> = Api::default_namespaced(client);
    let selector = format!("metadata.name={name}");
    let watch_params = WatchParams::default().fields(&selector);
    let mut events = api.watch(&watch_params, "0").await?.boxed();

    while let Some(event) = events.try_next().await? {
        trace!("Watch lease event: {event:?}");
        let lease = match event {
            WatchEvent::Bookmark(_) => continue,
            WatchEvent::Deleted(_) => break,
            WatchEvent::Error(e) => return Err(kube::Error::Api(e)),
            WatchEvent::Added(l) | WatchEvent::Modified(l) => l,
        };
        let held = lease
            .spec
            .is_some_and(|spec| spec.holder_identity.is_some());
        if !held {
            break;
        }
    }
    Ok(())
}
#[rocket::post("/api/v1/lock", data = "<data>")]
pub async fn lock_v1(
lockheader: Result<LockRequestHeader, InvalidHeader>,
data: rocket::form::Result<'_, Form<LockRequest>>,
) -> Result<(), LockError> {
lockheader?;
let data = data?;
let client = Client::try_default().await.inspect_err(|e| {
error!("Could not connect to Kubernetes API server: {e}")
})?;
let lock_name = format!("reboot-lock-{}", data.group);
loop {
match update_lease(
client.clone(),
&lock_name,
&data.hostname,
Some(&data.hostname),
)
.await
{
Ok(_) => break,
Err(kube::Error::Api(e)) => {
if e.code == 409 {
warn!("Lock already held: {}", e.message);
if !data.wait {
return Err(LockError::Conflict(format!(
"Another system is already rebooting: {}\n",
e.message,
)));
} else {
info!("Waiting for lease {lock_name}");
if let Err(e) =
wait_lease(client.clone(), &lock_name).await
{
error!("Error while waiting for lease: {e}");
}
}
} else {
return Err(kube::Error::Api(e).into());
}
},
Err(e) => return Err(e.into()),
}
}
Ok(())
}
#[rocket::post("/api/v1/unlock", data = "<data>")]
pub async fn unlock_v1(
lockheader: Result<LockRequestHeader, InvalidHeader>,
data: rocket::form::Result<'_, Form<LockRequest>>,
) -> Result<(), LockError> {
lockheader?;
let data = data?;
let client = Client::try_default().await.inspect_err(|e| {
error!("Could not connect to Kubernetes API server: {e}")
})?;
let lock_name = format!("reboot-lock-{}", data.group);
update_lease(client.clone(), &lock_name, &data.hostname, None).await?;
Ok(())
}

328
tests/integration/lock.rs Normal file
View File

@ -0,0 +1,328 @@
use std::sync::LazyLock;
use k8s_openapi::api::coordination::v1::Lease;
use kube::Client;
use kube::api::Api;
use rocket::async_test;
use rocket::futures::FutureExt;
use rocket::http::{ContentType, Header, Status};
use rocket::tokio;
use rocket::tokio::sync::Mutex;
// Async mutex acquired by the tests in this file that operate on the
// `reboot-lock-default` lease, so those tests do not run concurrently.
static LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
/// Deletes the lease `name` from the default namespace and waits for the
/// deletion to finish. The result is ignored, so this is a no-op from the
/// caller's point of view when the delete fails (e.g. no such lease).
async fn delete_lease(name: &str) {
    let api: Api<Lease> =
        Api::default_namespaced(Client::try_default().await.unwrap());
    let params = kube::api::DeleteParams::default();
    let _ =
        kube::runtime::wait::delete::delete_and_finalize(api, name, &params)
            .await;
}
/// Fetches the lease `name` from the default namespace.
async fn get_lease(name: &str) -> Result<Lease, kube::Error> {
    let kube_client = Client::try_default().await.unwrap();
    let api = Api::<Lease>::default_namespaced(kube_client);
    api.get(name).await
}
/// Locking a free default lease succeeds with an empty body and records the
/// requesting host as holder.
#[async_test]
async fn test_lock_v1_success() {
    super::setup();
    let _guard = LOCK.lock().await;
    delete_lease("reboot-lock-default").await;

    let client = super::async_client().await;
    let request = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org");
    let response = request.dispatch().await;
    assert_eq!(response.status(), Status::Ok);
    assert_eq!(response.into_string().await, None);

    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));
}
/// A `group` field selects a differently named lease
/// (`reboot-lock-testgroup`).
#[async_test]
async fn test_lock_v1_custom_group() {
    super::setup();
    delete_lease("reboot-lock-testgroup").await;

    let client = super::async_client().await;
    let request = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org&group=testgroup");
    let response = request.dispatch().await;
    assert_eq!(response.status(), Status::Ok);
    assert_eq!(response.into_string().await, None);

    let lease = get_lease("reboot-lock-testgroup").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));
}
/// With `wait=false`, a second host gets a 409 with the conflict message and
/// the first host keeps the lock.
#[async_test]
async fn test_lock_v1_conflict() {
    super::setup();
    let _guard = LOCK.lock().await;
    delete_lease("reboot-lock-default").await;
    let client = super::async_client().await;

    // First host takes the lock.
    let first = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org")
        .dispatch()
        .await;
    assert_eq!(first.status(), Status::Ok);
    assert_eq!(first.into_string().await, None);

    // Second host refuses to wait and must be rejected.
    let second = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test2.example.org&wait=false")
        .dispatch()
        .await;
    assert_eq!(second.status(), Status::Conflict);
    let want_msg = "Another system is already rebooting: Apply failed with 1 conflict: conflict with \"test1.example.org\": .spec.holderIdentity\n";
    assert_eq!(second.into_string().await.as_deref(), Some(want_msg));

    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));
}
/// With the default `wait`, a second lock request blocks until the first
/// lease is deleted (about one second later) and then succeeds.
#[async_test]
async fn test_lock_v1_conflict_wait() {
    super::setup();
    let _guard = LOCK.lock().await;
    tracing::info!("Deleting existing lease");
    delete_lease("reboot-lock-default").await;

    tracing::info!("Creating first lease");
    let client = super::async_client().await;
    let first = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org")
        .dispatch()
        .await;
    assert_eq!(first.status(), Status::Ok);
    assert_eq!(first.into_string().await, None);
    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));

    // Release the first lease in the background after one second while the
    // second request is blocked waiting for it.
    let started = std::time::Instant::now();
    let _task = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(1))
            .then(|_| async {
                tracing::info!("Deleting first lease");
                delete_lease("reboot-lock-default").await
            })
            .await
    });

    tracing::info!("Creating second lease");
    let second = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test2.example.org")
        .dispatch()
        .await;
    assert_eq!(second.status(), Status::Ok);
    assert_eq!(second.into_string().await, None);

    let elapsed_ms = started.elapsed().as_millis();
    assert!(elapsed_ms > 1000 && elapsed_ms < 2000);

    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test2.example.org"));
}
/// A lock request without the `K8s-Reboot-Lock` header is rejected with 400.
#[test]
fn test_lock_v1_no_header() {
    super::setup();
    let client = super::client();
    let request = client
        .post("/api/v1/lock")
        .header(ContentType::Form)
        .body("hostname=test1.example.org");
    let response = request.dispatch();
    assert_eq!(response.status(), Status::BadRequest);
    let body = response.into_string();
    assert_eq!(body.as_deref(), Some("Invalid lock header\n"));
}
/// A lock request with an empty form body is rejected with 422 and names the
/// missing `hostname` field.
#[test]
fn test_lock_v1_no_data() {
    super::setup();
    let client = super::client();
    let request = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("");
    let response = request.dispatch();
    assert_eq!(response.status(), Status::UnprocessableEntity);
    let body = response.into_string();
    assert_eq!(
        body.as_deref(),
        Some("Error processing request:\nhostname: missing\n")
    );
}
/// The holder can release its own lock; afterwards the lease has no holder.
#[async_test]
async fn test_unlock_v1_success() {
    super::setup();
    let _guard = LOCK.lock().await;
    delete_lease("reboot-lock-default").await;
    let client = super::async_client().await;

    let locked = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org")
        .dispatch()
        .await;
    assert_eq!(locked.status(), Status::Ok);
    assert_eq!(locked.into_string().await, None);
    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));

    let unlocked = client
        .post("/api/v1/unlock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org")
        .dispatch()
        .await;
    // Capture the status first: reading the body consumes the response.
    let status = unlocked.status();
    assert_eq!(unlocked.into_string().await, None);
    assert_eq!(status, Status::Ok);

    let lease = get_lease("reboot-lock-default").await.unwrap();
    assert_eq!(lease.spec.unwrap().holder_identity, None);
}
/// Unlocking when no lease exists succeeds and leaves a lease without a
/// holder.
#[async_test]
async fn test_unlock_v1_not_locked() {
    super::setup();
    let _guard = LOCK.lock().await;
    delete_lease("reboot-lock-default").await;

    let client = super::async_client().await;
    let request = client
        .post("/api/v1/unlock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org");
    let response = request.dispatch().await;
    // Capture the status first: reading the body consumes the response.
    let status = response.status();
    assert_eq!(response.into_string().await, None);
    assert_eq!(status, Status::Ok);

    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), None);
}
/// An unlock request from a host that does not hold the lock returns 200 but
/// leaves the current holder in place.
#[async_test]
async fn test_unlock_v1_not_mine() {
    super::setup();
    let _guard = LOCK.lock().await;
    delete_lease("reboot-lock-default").await;
    let client = super::async_client().await;

    let locked = client
        .post("/api/v1/lock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test1.example.org")
        .dispatch()
        .await;
    assert_eq!(locked.status(), Status::Ok);
    assert_eq!(locked.into_string().await, None);
    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));

    let unlocked = client
        .post("/api/v1/unlock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("hostname=test2.example.org")
        .dispatch()
        .await;
    // Capture the status first: reading the body consumes the response.
    let status = unlocked.status();
    assert_eq!(unlocked.into_string().await, None);
    assert_eq!(status, Status::Ok);

    let lease = get_lease("reboot-lock-default").await.unwrap();
    let holder = lease.spec.unwrap().holder_identity;
    assert_eq!(holder.as_deref(), Some("test1.example.org"));
}
/// An unlock request without the `K8s-Reboot-Lock` header is rejected with
/// 400.
#[test]
fn test_unlock_v1_no_header() {
    super::setup();
    let client = super::client();
    let request = client
        .post("/api/v1/unlock")
        .header(ContentType::Form)
        .body("hostname=test1.example.org");
    let response = request.dispatch();
    assert_eq!(response.status(), Status::BadRequest);
    let body = response.into_string();
    assert_eq!(body.as_deref(), Some("Invalid lock header\n"));
}
/// An unlock request with an empty form body is rejected with 422 and names
/// the missing `hostname` field.
#[test]
fn test_unlock_v1_no_data() {
    super::setup();
    let client = super::client();
    let request = client
        .post("/api/v1/unlock")
        .header(Header::new("K8s-Reboot-Lock", "lock"))
        .header(ContentType::Form)
        .body("");
    let response = request.dispatch();
    assert_eq!(response.status(), Status::UnprocessableEntity);
    let body = response.into_string();
    assert_eq!(
        body.as_deref(),
        Some("Error processing request:\nhostname: missing\n")
    );
}

View File

@ -1,7 +1,9 @@
mod basic;
mod lock;
use std::sync::LazyLock;
use rocket::local::asynchronous::Client as AsyncClient;
use rocket::local::blocking::Client;
static SETUP: LazyLock<()> = LazyLock::new(|| {
@ -23,6 +25,12 @@ fn setup() {
LazyLock::force(&SETUP);
}
/// Builds a tracked asynchronous local client around a fresh application
/// instance; panics if the client cannot be created.
async fn async_client() -> AsyncClient {
    let app = k8s_reboot_controller::rocket();
    AsyncClient::tracked(app).await.unwrap()
}
/// Builds a tracked blocking local client around a fresh application
/// instance; panics if the client cannot be created.
fn client() -> Client {
    let app = k8s_reboot_controller::rocket();
    Client::tracked(app).unwrap()
}