k8s-reboot-coordinator/src/lock.rs

182 lines
5.7 KiB
Rust

use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use kube::api::{Api, Patch, PatchParams, WatchEvent, WatchParams};
use rocket::form::Form;
use rocket::futures::{StreamExt, TryStreamExt};
use rocket::http::Status;
use rocket::request::{self, FromRequest, Request};
use tracing::{error, info, trace, warn};
/// Errors returned by the `/api/v1/lock` and `/api/v1/unlock` handlers.
///
/// Rocket's `Responder` derive turns each variant into a `text/plain`
/// response whose body is the contained string and whose HTTP status is the
/// one given in that variant's `#[response]` attribute.
#[derive(Debug, rocket::Responder)]
pub enum LockError {
/// A Kubernetes client/API failure (HTTP 500).
#[response(status = 500, content_type = "plain")]
ServerError(String),
/// The `K8s-Reboot-Lock` request header was missing or not `lock` (HTTP 400).
#[response(status = 400, content_type = "plain")]
InvalidHeader(String),
/// The lock is held by another host and the caller asked not to wait (HTTP 409).
#[response(status = 409, content_type = "plain")]
Conflict(String),
/// The submitted form could not be parsed or validated (HTTP 422).
#[response(status = 422, content_type = "plain")]
FormError(String),
}
impl From<kube::Error> for LockError {
fn from(error: kube::Error) -> Self {
Self::ServerError(format!("{error}\n"))
}
}
impl From<InvalidHeader> for LockError {
fn from(_h: InvalidHeader) -> Self {
Self::InvalidHeader("Invalid lock header\n".into())
}
}
impl From<rocket::form::Errors<'_>> for LockError {
fn from(errors: rocket::form::Errors<'_>) -> Self {
let mut message = String::from("Error processing request:\n");
for error in errors {
if let Some(name) = error.name {
message.push_str(&format!("{name}: "));
}
message.push_str(&error.kind.to_string());
message.push('\n');
}
Self::FormError(message)
}
}
pub struct LockRequestHeader;
#[derive(Debug)]
pub struct InvalidHeader;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for LockRequestHeader {
type Error = InvalidHeader;
async fn from_request(
req: &'r Request<'_>,
) -> request::Outcome<Self, Self::Error> {
match req.headers().get_one("K8s-Reboot-Lock") {
Some("lock") => request::Outcome::Success(Self),
_ => request::Outcome::Error((Status::BadRequest, InvalidHeader)),
}
}
}
/// Form body accepted by both `/api/v1/lock` and `/api/v1/unlock`.
#[derive(rocket::FromForm)]
pub struct LockRequest {
/// Host acquiring or releasing the lock; used both as the lease holder
/// identity and as the apply field manager.
hostname: String,
/// Lock group; the lease is named `reboot-lock-<group>`. Defaults to "default".
#[field(default = String::from("default"))]
group: String,
/// Only consulted by `/api/v1/lock`: when true (the default) the handler
/// keeps waiting and retrying while the lock is held; when false it fails
/// immediately with 409.
#[field(default = true)]
wait: bool,
}
async fn update_lease(
client: Client,
name: &str,
identity: &str,
holder: Option<&str>,
) -> Result<Lease, kube::Error> {
let apply = PatchParams::apply(identity);
let leases: Api<Lease> = Api::default_namespaced(client);
let lease = Lease {
metadata: ObjectMeta {
name: Some(name.into()),
..Default::default()
},
spec: Some(LeaseSpec {
holder_identity: holder.map(|i| i.into()),
..Default::default()
}),
};
leases.patch(name, &apply, &Patch::Apply(&lease)).await
}
async fn wait_lease(client: Client, name: &str) -> Result<(), kube::Error> {
let leases: Api<Lease> = Api::default_namespaced(client);
let params =
WatchParams::default().fields(&format!("metadata.name={name}"));
let mut stream = leases.watch(&params, "0").await?.boxed();
while let Some(event) = stream.try_next().await? {
trace!("Watch lease event: {event:?}");
match event {
WatchEvent::Added(l) | WatchEvent::Modified(l) => match l.spec {
Some(spec) if spec.holder_identity.is_some() => (),
_ => break,
},
WatchEvent::Bookmark(_) => (),
WatchEvent::Deleted(_) => break,
WatchEvent::Error(e) => return Err(kube::Error::Api(e)),
}
}
Ok(())
}
#[rocket::post("/api/v1/lock", data = "<data>")]
pub async fn lock_v1(
lockheader: Result<LockRequestHeader, InvalidHeader>,
data: rocket::form::Result<'_, Form<LockRequest>>,
) -> Result<String, LockError> {
lockheader?;
let data = data?;
let client = Client::try_default().await.inspect_err(|e| {
error!("Could not connect to Kubernetes API server: {e}")
})?;
let lock_name = format!("reboot-lock-{}", data.group);
loop {
match update_lease(
client.clone(),
&lock_name,
&data.hostname,
Some(&data.hostname),
)
.await
{
Ok(_) => break,
Err(kube::Error::Api(e)) => {
if e.code == 409 {
warn!("Lock already held: {}", e.message);
if !data.wait {
return Err(LockError::Conflict(format!(
"Another system is already rebooting: {}\n",
e.message,
)));
} else {
info!("Waiting for lease {lock_name}");
if let Err(e) =
wait_lease(client.clone(), &lock_name).await
{
error!("Error while waiting for lease: {e}");
}
}
} else {
return Err(kube::Error::Api(e).into());
}
},
Err(e) => return Err(e.into()),
}
}
Ok(format!(
"Acquired reboot lock for group {}, host {}\n",
data.group, data.hostname
))
}
#[rocket::post("/api/v1/unlock", data = "<data>")]
pub async fn unlock_v1(
lockheader: Result<LockRequestHeader, InvalidHeader>,
data: rocket::form::Result<'_, Form<LockRequest>>,
) -> Result<(), LockError> {
lockheader?;
let data = data?;
let client = Client::try_default().await.inspect_err(|e| {
error!("Could not connect to Kubernetes API server: {e}")
})?;
let lock_name = format!("reboot-lock-{}", data.group);
update_lease(client.clone(), &lock_name, &data.hostname, None).await?;
Ok(())
}