From c55e1959424dc0374c189fd7aa9d11fcd84ccc04 Mon Sep 17 00:00:00 2001
From: hosted-fornet
Date: Fri, 1 Nov 2024 22:30:29 -0700
Subject: [PATCH] kernel: add restart backoff

---
 kinode/src/kernel/mod.rs     |  36 +++++++++-
 kinode/src/kernel/process.rs | 136 ++++++++++++++++++++++-------------
 2 files changed, 123 insertions(+), 49 deletions(-)

diff --git a/kinode/src/kernel/mod.rs b/kinode/src/kernel/mod.rs
index 8ae81513..22c1cf8a 100644
--- a/kinode/src/kernel/mod.rs
+++ b/kinode/src/kernel/mod.rs
@@ -5,7 +5,7 @@ use std::{
     path::PathBuf,
     sync::Arc,
 };
-use tokio::{sync::mpsc, task::JoinHandle};
+use tokio::{sync::{mpsc, Mutex}, task::JoinHandle};
 use wasmtime::{Config, Engine, WasmBacktraceDetails};
 
 /// Manipulate a single process.
@@ -39,6 +39,23 @@ enum ProcessSender {
     Userspace(t::ProcessMessageSender),
 }
 
+pub type ProcessRestartBackoffs = HashMap<t::ProcessId, Arc<Mutex<Option<RestartBackoff>>>>;
+
+pub struct RestartBackoff {
+    /// if a restart is attempted before this time:
+    /// * wait until `next_soonest_restart_time`
+    /// * increment `consecutive_attempts`
+    /// else, if a restart is attempted after this time:
+    /// * reset `consecutive_attempts` to 0
+    /// in either case:
+    /// * advance `next_soonest_restart_time` by `2 ** consecutive_attempts` seconds
+    next_soonest_restart_time: std::time::Instant,
+    /// how many times in a row the process has tried to restart
+    consecutive_attempts: u32,
+    /// the task that will perform the restart once the wait has elapsed
+    restart_handle: Option<JoinHandle<()>>,
+}
+
 /// persist kernel's process_map state for next bootup
 /// TODO refactor this to hit the DB directly for performance's sake
 async fn persist_state(send_to_loop: &t::MessageSender, process_map: &t::ProcessMap) {
@@ -78,6 +95,7 @@ async fn handle_kernel_request(
     caps_oracle: &t::CapMessageSender,
     engine: &Engine,
     home_directory_path: &PathBuf,
+    process_restart_backoffs: &mut ProcessRestartBackoffs,
 ) -> Option<()> {
     let t::Message::Request(request) = km.message else {
         return None;
@@ -263,6 +281,7 @@
                 caps_oracle,
                 &start_process_metadata,
                 &home_directory_path,
+                process_restart_backoffs,
             )
             .await
             {
@@ -494,6 +513,7 @@ async fn start_process(
     caps_oracle: &t::CapMessageSender,
     process_metadata: &StartProcessMetadata,
     home_directory_path: &PathBuf,
+    process_restart_backoffs: &mut ProcessRestartBackoffs,
 ) -> anyhow::Result<()> {
     let (send_to_process, recv_in_process) =
         mpsc::channel::<Result<t::KernelMessage, t::WrappedSendError>>(PROCESS_CHANNEL_CAPACITY);
@@ -515,6 +535,15 @@
         on_exit: process_metadata.persisted.on_exit.clone(),
         public: process_metadata.persisted.public,
     };
+    let maybe_restart_backoff = if let t::OnExit::Restart = process_metadata.persisted.on_exit {
+        let restart_backoff = process_restart_backoffs
+            .remove(id)
+            .unwrap_or_else(|| Arc::new(Mutex::new(None)));
+        process_restart_backoffs.insert(id.clone(), Arc::clone(&restart_backoff));
+        Some(restart_backoff)
+    } else {
+        None
+    };
     process_handles.insert(
         id.clone(),
         tokio::spawn(process::make_process_loop(
@@ -528,6 +557,7 @@
             caps_oracle.clone(),
             engine.clone(),
             home_directory_path.clone(),
+            maybe_restart_backoff,
         )),
     );
     Ok(())
 }
@@ -599,6 +629,8 @@ pub async fn kernel(
     // keeping only them in the updated post-boot process map
     let mut non_rebooted_processes: HashSet<t::ProcessId> = HashSet::new();
 
+    let mut process_restart_backoffs: ProcessRestartBackoffs = HashMap::new();
+
     for (process_id, persisted) in &process_map {
         // runtime extensions will have a bytes_handle of "", because they have no
        // Wasm code saved in filesystem.
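
NOTE (not part of the patch): each process's backoff slot is an
Arc<Mutex<Option<RestartBackoff>>> shared between the kernel's map and the
spawned process loop, so consecutive-failure counts survive individual
restarts. A minimal standalone sketch of the fetch-or-create pattern used
in start_process above; `Slot`, `slot_for`, and the process id string are
illustrative names, not part of the patch:

    use std::{collections::HashMap, sync::Arc};
    use tokio::sync::Mutex;

    // stand-in for the patch's Arc<Mutex<Option<RestartBackoff>>>
    type Slot = Arc<Mutex<Option<u32>>>;

    /// fetch the existing slot for a process id, or create a fresh one;
    /// the same Arc stays in the map and is handed to each new process loop
    fn slot_for(map: &mut HashMap<String, Slot>, id: &str) -> Slot {
        let slot = map
            .remove(id)
            .unwrap_or_else(|| Arc::new(Mutex::new(None)));
        map.insert(id.to_string(), Arc::clone(&slot));
        slot
    }

    fn main() {
        let mut map: HashMap<String, Slot> = HashMap::new();
        let first = slot_for(&mut map, "chat:app:example.os");
        let second = slot_for(&mut map, "chat:app:example.os");
        // the same underlying slot is returned across "restarts"
        assert!(Arc::ptr_eq(&first, &second));
    }

The remove-then-insert could equivalently be written with
map.entry(id.clone()).or_insert_with(...) plus an Arc clone.
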
@@ -676,6 +708,7 @@ pub async fn kernel(
                 &caps_oracle_sender,
                 &start_process_metadata,
                 &home_directory_path,
+                &mut process_restart_backoffs,
             )
             .await
             {
@@ -925,6 +958,7 @@
             &caps_oracle_sender,
             &engine,
             &home_directory_path,
+            &mut process_restart_backoffs,
         ).await
         {
             // drain process map of processes with OnExit::None
             process_map.retain(|_, persisted| !persisted.on_exit.is_none());
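
NOTE (not part of the patch): a condensed sketch of the restart-with-backoff
mechanism the process.rs hunks below implement, assuming tokio with its
usual runtime features. `Backoff` and `restart_with_backoff` are
illustrative stand-ins; the real RestartBackoff also stores the spawned
task's JoinHandle, which this sketch omits:

    use std::{
        future::Future,
        sync::Arc,
        time::{Duration, Instant},
    };
    use tokio::sync::Mutex;

    // stand-in for RestartBackoff (restart_handle omitted)
    struct Backoff {
        next_soonest_restart_time: Instant,
        consecutive_attempts: u32,
    }

    /// restart immediately if we are outside the backoff window, else spawn
    /// a task that sleeps until the window closes; either way, record when
    /// the next restart would be allowed
    async fn restart_with_backoff<F>(slot: Arc<Mutex<Option<Backoff>>>, restart: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let mut guard = slot.lock().await;
        let now = Instant::now();
        let (wait_till, next_soonest_restart_time, consecutive_attempts) = match &*guard {
            // crashed again inside the window: defer, and double the window
            Some(b) if b.next_soonest_restart_time > now => (
                Some(b.next_soonest_restart_time),
                b.next_soonest_restart_time + Duration::from_secs(2u64.pow(b.consecutive_attempts)),
                b.consecutive_attempts + 1,
            ),
            // first crash, or the last crash was long ago: restart right away
            _ => (None, now + Duration::from_secs(1), 0),
        };
        match wait_till {
            Some(deadline) => {
                tokio::spawn(async move {
                    tokio::time::sleep_until(deadline.into()).await;
                    restart.await;
                });
            }
            None => restart.await,
        }
        *guard = Some(Backoff {
            next_soonest_restart_time,
            consecutive_attempts,
        });
    }

    #[tokio::main]
    async fn main() {
        let slot = Arc::new(Mutex::new(None));
        for i in 0..3 {
            restart_with_backoff(Arc::clone(&slot), async move {
                println!("restart #{i}");
            })
            .await;
        }
        // give deferred restarts time to fire before exiting
        tokio::time::sleep(Duration::from_secs(4)).await;
    }

The first failure restarts immediately and arms a 1-second window; each
failure inside the window defers the restart to the window's edge and
doubles it, so a crash-looping process settles into restarts roughly
1s, 2s, 4s, ... apart, while a process that stays up past its window
resets to immediate restart.
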
diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs
index 72fa8956..371adca3 100644
--- a/kinode/src/kernel/process.rs
+++ b/kinode/src/kernel/process.rs
@@ -5,7 +5,7 @@ use std::{
     path::PathBuf,
     sync::Arc,
 };
-use tokio::{fs, task::JoinHandle};
+use tokio::{fs, sync::Mutex, task::JoinHandle};
 use wasi_common::sync::Dir;
 use wasmtime::{
     component::{Component, Linker, ResourceTable as Table},
@@ -15,6 +15,8 @@ use wasmtime_wasi::{
     pipe::MemoryOutputPipe, DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView,
 };
 
+use super::RestartBackoff;
+
 const STACK_TRACE_SIZE: usize = 5000;
 
 pub struct ProcessContext {
@@ -230,6 +232,7 @@ pub async fn make_process_loop(
     caps_oracle: t::CapMessageSender,
     engine: Engine,
     home_directory_path: PathBuf,
+    maybe_restart_backoff: Option<Arc<Mutex<Option<RestartBackoff>>>>,
 ) -> anyhow::Result<()> {
     // before process can be instantiated, need to await 'run' message from kernel
     let mut pre_boot_queue = Vec::<Result<t::KernelMessage, t::WrappedSendError>>::new();
@@ -388,6 +391,27 @@
         }
         // if restart, tell ourselves to init the app again, with same capabilities
         t::OnExit::Restart => {
+            // OnExit::Restart guarantees start_process created a backoff slot
+            let restart_backoff = maybe_restart_backoff.unwrap();
+            let mut restart_backoff_lock = restart_backoff.lock().await;
+            let now = std::time::Instant::now();
+            let (wait_till, next_soonest_restart_time, consecutive_attempts) =
+                match *restart_backoff_lock {
+                    // no failures on record: restart immediately
+                    None => (None, now + std::time::Duration::from_secs(1), 0),
+                    Some(ref rb) => {
+                        if rb.next_soonest_restart_time <= now {
+                            // outside the backoff window: no need to wait
+                            (None, now + std::time::Duration::from_secs(1), 0)
+                        } else {
+                            // inside the window: must wait, and double the window
+                            let base: u64 = 2;
+                            (
+                                Some(rb.next_soonest_restart_time),
+                                rb.next_soonest_restart_time
+                                    + std::time::Duration::from_secs(base.pow(rb.consecutive_attempts)),
+                                rb.consecutive_attempts + 1,
+                            )
+                        }
+                    }
+                };
+
             // get caps before killing
             let (tx, rx) = tokio::sync::oneshot::channel();
             caps_oracle
@@ -423,53 +447,69 @@
                 .unwrap()
                 .send(&send_to_loop)
                 .await;
-            // then re-initialize with same capabilities
-            t::KernelMessage::builder()
-                .id(rand::random())
-                .source((&our.node, KERNEL_PROCESS_ID.clone()))
-                .target((&our.node, KERNEL_PROCESS_ID.clone()))
-                .message(t::Message::Request(t::Request {
-                    inherit: false,
-                    expects_response: None,
-                    body: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
-                        id: metadata.our.process.clone(),
-                        wasm_bytes_handle: metadata.wasm_bytes_handle,
-                        wit_version: metadata.wit_version,
-                        on_exit: metadata.on_exit,
-                        initial_capabilities,
-                        public: metadata.public,
-                    })
-                    .unwrap(),
-                    metadata: None,
-                    capabilities: vec![],
-                }))
-                .lazy_load_blob(Some(t::LazyLoadBlob {
-                    mime: None,
-                    bytes: wasm_bytes,
-                }))
-                .build()
-                .unwrap()
-                .send(&send_to_loop)
-                .await;
-            // then run
-            t::KernelMessage::builder()
-                .id(rand::random())
-                .source((&our.node, KERNEL_PROCESS_ID.clone()))
-                .target((&our.node, KERNEL_PROCESS_ID.clone()))
-                .message(t::Message::Request(t::Request {
-                    inherit: false,
-                    expects_response: None,
-                    body: serde_json::to_vec(&t::KernelCommand::RunProcess(
-                        metadata.our.process.clone(),
-                    ))
-                    .unwrap(),
-                    metadata: None,
-                    capabilities: vec![],
-                }))
-                .build()
-                .unwrap()
-                .send(&send_to_loop)
-                .await;
+
+            // `async` closures are unstable, so use an async block; it is
+            // consumed exactly once below, either spawned or awaited
+            let reinitialize = async move {
+                // then re-initialize with same capabilities
+                t::KernelMessage::builder()
+                    .id(rand::random())
+                    .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                    .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                    .message(t::Message::Request(t::Request {
+                        inherit: false,
+                        expects_response: None,
+                        body: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
+                            id: metadata.our.process.clone(),
+                            wasm_bytes_handle: metadata.wasm_bytes_handle,
+                            wit_version: metadata.wit_version,
+                            on_exit: metadata.on_exit,
+                            initial_capabilities,
+                            public: metadata.public,
+                        })
+                        .unwrap(),
+                        metadata: None,
+                        capabilities: vec![],
+                    }))
+                    .lazy_load_blob(Some(t::LazyLoadBlob {
+                        mime: None,
+                        bytes: wasm_bytes,
+                    }))
+                    .build()
+                    .unwrap()
+                    .send(&send_to_loop)
+                    .await;
+                // then run
+                t::KernelMessage::builder()
+                    .id(rand::random())
+                    .source((&our.node, KERNEL_PROCESS_ID.clone()))
+                    .target((&our.node, KERNEL_PROCESS_ID.clone()))
+                    .message(t::Message::Request(t::Request {
+                        inherit: false,
+                        expects_response: None,
+                        body: serde_json::to_vec(&t::KernelCommand::RunProcess(
+                            metadata.our.process.clone(),
+                        ))
+                        .unwrap(),
+                        metadata: None,
+                        capabilities: vec![],
+                    }))
+                    .build()
+                    .unwrap()
+                    .send(&send_to_loop)
+                    .await;
+            };
+
+            let restart_handle = match wait_till {
+                // still inside the backoff window: sleep it out, then restart
+                Some(wait_till) => Some(tokio::spawn(async move {
+                    tokio::time::sleep_until(wait_till.into()).await;
+                    reinitialize.await;
+                })),
+                // outside the window: restart immediately
+                None => {
+                    reinitialize.await;
+                    None
+                }
+            };
+            *restart_backoff_lock = Some(RestartBackoff {
+                next_soonest_restart_time,
+                consecutive_attempts,
+                restart_handle,
+            });
         }
         // if requests, fire them
         t::OnExit::Requests(requests) => {