Mirror of https://github.com/uqbar-dao/nectar.git, synced 2024-12-30 03:52:50 +03:00
kernel: add restart backoff (WIP: not yet compiling)
commit c55e195942
parent 1c197c6ae3
@@ -5,7 +5,7 @@ use std::{
     path::PathBuf,
     sync::Arc,
 };
-use tokio::{sync::mpsc, task::JoinHandle};
+use tokio::{sync::{mpsc, Mutex}, task::JoinHandle};
 use wasmtime::{Config, Engine, WasmBacktraceDetails};
 
 /// Manipulate a single process.
@@ -39,6 +39,23 @@ enum ProcessSender {
     Userspace(t::ProcessMessageSender),
 }
 
+pub type ProcessRestartBackoffs = HashMap<t::ProcessId, Arc<Mutex<Option<RestartBackoff>>>>;
+
+pub struct RestartBackoff {
+    /// if try to restart before this:
+    ///  * wait till `next_soonest_restart_time`
+    ///  * increment `consecutive_attempts`
+    /// else if try to restart after this:
+    ///  * set `consecutive_attempts = 0`
+    /// and in either case:
+    ///  * set `next_soonest_restart_time += 2 ** consecutive_attempts` seconds
+    next_soonest_restart_time: std::time::Instant,
+    /// how many times has process tried to restart in a row
+    consecutive_attempts: u32,
+    /// task that will do the restart after wait time has elapsed
+    restart_handle: Option<JoinHandle<()>>,
+}
+
 /// persist kernel's process_map state for next bootup
 /// TODO refactor this to hit the DB directly for performance's sake
 async fn persist_state(send_to_loop: &t::MessageSender, process_map: &t::ProcessMap) {
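Concretely, the schedule this doc comment implies for a process that crashes the instant it restarts is: an immediate first restart, then enforced waits of roughly 1s, 1s, 2s, 4s, 8s, and so on. A minimal standalone simulation of that schedule (not part of the commit; it mirrors the match added to make_process_loop below):

    use std::time::{Duration, Instant};

    fn main() {
        // (next_soonest_restart_time, consecutive_attempts), as in RestartBackoff
        let mut state: Option<(Instant, u32)> = None;
        let mut now = Instant::now();
        for crash in 1..=6 {
            let (wait_till, next, attempts) = match state {
                // crashed again before the earliest allowed restart: back off
                Some((soonest, n)) if soonest > now => {
                    (Some(soonest), soonest + Duration::from_secs(2u64.pow(n)), n + 1)
                }
                // first crash, or enough time has passed: restart immediately
                _ => (None, now + Duration::from_secs(1), 0),
            };
            match wait_till {
                Some(t) => {
                    println!("crash {crash}: wait {:?}", t.saturating_duration_since(now));
                    now = t; // pretend we slept until the earliest allowed restart
                }
                None => println!("crash {crash}: restart immediately"),
            }
            state = Some((next, attempts));
        }
    }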
@@ -78,6 +95,7 @@ async fn handle_kernel_request(
     caps_oracle: &t::CapMessageSender,
     engine: &Engine,
     home_directory_path: &PathBuf,
+    process_restart_backoffs: &mut ProcessRestartBackoffs,
 ) -> Option<()> {
     let t::Message::Request(request) = km.message else {
         return None;
@@ -263,6 +281,7 @@ async fn handle_kernel_request(
         caps_oracle,
         &start_process_metadata,
         &home_directory_path,
+        &mut process_restart_backoffs,
     )
     .await
     {
@@ -494,6 +513,7 @@ async fn start_process(
     caps_oracle: &t::CapMessageSender,
     process_metadata: &StartProcessMetadata,
     home_directory_path: &PathBuf,
+    process_restart_backoffs: &mut ProcessRestartBackoffs,
 ) -> anyhow::Result<()> {
     let (send_to_process, recv_in_process) =
         mpsc::channel::<Result<t::KernelMessage, t::WrappedSendError>>(PROCESS_CHANNEL_CAPACITY);
@@ -515,6 +535,15 @@ async fn start_process(
         on_exit: process_metadata.persisted.on_exit.clone(),
         public: process_metadata.persisted.public,
     };
+    let maybe_restart_backoff = if let t::OnExit::Restart = process_metadata.persisted.on_exit {
+        let restart_backoff = process_restart_backoffs
+            .remove(id)
+            .unwrap_or_else(|| Arc::new(Mutex::new(None)));
+        process_restart_backoffs.insert(id.clone(), Arc::clone(&restart_backoff));
+        Some(restart_backoff)
+    } else {
+        None
+    };
     process_handles.insert(
         id.clone(),
         tokio::spawn(process::make_process_loop(
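The remove / unwrap_or_else / insert round trip above preserves one shared backoff cell per process across restarts. An equivalent shape (a hypothetical refactor, not in the commit) using the standard HashMap Entry API, assuming the same surrounding types:

    let maybe_restart_backoff = if let t::OnExit::Restart = process_metadata.persisted.on_exit {
        Some(Arc::clone(
            process_restart_backoffs
                .entry(id.clone())
                .or_insert_with(|| Arc::new(Mutex::new(None))),
        ))
    } else {
        None
    };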
@@ -528,6 +557,7 @@ async fn start_process(
             caps_oracle.clone(),
             engine.clone(),
             home_directory_path.clone(),
+            maybe_restart_backoff,
         )),
     );
     Ok(())
@@ -599,6 +629,8 @@ pub async fn kernel(
     // keeping only them in the updated post-boot process map
     let mut non_rebooted_processes: HashSet<t::ProcessId> = HashSet::new();
 
+    let mut process_restart_backoffs: ProcessRestartBackoffs = HashMap::new();
+
     for (process_id, persisted) in &process_map {
         // runtime extensions will have a bytes_handle of "", because they have no
         // Wasm code saved in filesystem.
@@ -676,6 +708,7 @@ pub async fn kernel(
         &caps_oracle_sender,
         &start_process_metadata,
         &home_directory_path,
+        &mut process_restart_backoffs,
     )
     .await
     {
@@ -925,6 +958,7 @@ pub async fn kernel(
         &caps_oracle_sender,
         &engine,
         &home_directory_path,
+        &mut process_restart_backoffs,
     ).await {
     // drain process map of processes with OnExit::None
     process_map.retain(|_, persisted| !persisted.on_exit.is_none());

(the diff continues in a second file; its path was not captured in this extract)
@@ -5,7 +5,7 @@ use std::{
     path::PathBuf,
     sync::Arc,
 };
-use tokio::{fs, task::JoinHandle};
+use tokio::{fs, sync::Mutex, task::JoinHandle};
 use wasi_common::sync::Dir;
 use wasmtime::{
     component::{Component, Linker, ResourceTable as Table},
@@ -15,6 +15,8 @@ use wasmtime_wasi::{
     pipe::MemoryOutputPipe, DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView,
 };
 
+use super::RestartBackoff;
+
 const STACK_TRACE_SIZE: usize = 5000;
 
 pub struct ProcessContext {
@@ -230,6 +232,7 @@ pub async fn make_process_loop(
     caps_oracle: t::CapMessageSender,
     engine: Engine,
     home_directory_path: PathBuf,
+    maybe_restart_backoff: Option<Arc<Mutex<Option<RestartBackoff>>>>,
 ) -> anyhow::Result<()> {
     // before process can be instantiated, need to await 'run' message from kernel
     let mut pre_boot_queue = Vec::<Result<t::KernelMessage, t::WrappedSendError>>::new();
@@ -388,6 +391,27 @@ pub async fn make_process_loop(
         }
         // if restart, tell ourselves to init the app again, with same capabilities
         t::OnExit::Restart => {
+            let restart_backoff = maybe_restart_backoff.unwrap();
+            let restart_backoff_lock = restart_backoff.lock().await;
+            let now = std::time::Instant::now();
+            let (wait_till, next_soonest_restart_time, consecutive_attempts) = match *restart_backoff_lock {
+                None => (None, now + std::time::Duration::from_secs(1), 0),
+                Some(ref rb) => {
+                    if rb.next_soonest_restart_time <= now {
+                        // no need to wait
+                        (None, now + std::time::Duration::from_secs(1), 0)
+                    } else {
+                        // must wait
+                        let base: u64 = 2;
+                        (
+                            Some(rb.next_soonest_restart_time.clone()),
+                            rb.next_soonest_restart_time.clone() + std::time::Duration::from_secs(base.pow(rb.consecutive_attempts)),
+                            rb.consecutive_attempts.clone() + 1,
+                        )
+                    }
+                },
+            };
+
             // get caps before killing
             let (tx, rx) = tokio::sync::oneshot::channel();
             caps_oracle
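Two small nits in this hunk: `Instant` and `u32` are both `Copy`, so the `.clone()` calls are redundant, and `restart_backoff_lock` needs a `mut` binding because the arm assigns through it further down. The same three-way decision, sketched as a standalone helper (the name and signature are my own, not the commit's):

    use std::time::{Duration, Instant};

    // returns (wait_till, next_soonest_restart_time, consecutive_attempts)
    fn plan_restart(prior: Option<(Instant, u32)>, now: Instant) -> (Option<Instant>, Instant, u32) {
        match prior {
            // restarting too soon: wait, and double the next enforced backoff
            Some((soonest, attempts)) if soonest > now => (
                Some(soonest),
                soonest + Duration::from_secs(2u64.pow(attempts)),
                attempts + 1,
            ),
            // first failure, or the process ran long enough: reset the backoff
            _ => (None, now + Duration::from_secs(1), 0),
        }
    }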
@@ -423,6 +447,8 @@ pub async fn make_process_loop(
                 .unwrap()
                 .send(&send_to_loop)
                 .await;
+
+            let reinitialize = async || {
             // then re-initialize with same capabilities
             t::KernelMessage::builder()
                 .id(rand::random())
@@ -470,6 +496,20 @@ pub async fn make_process_loop(
                 .unwrap()
                 .send(&send_to_loop)
                 .await;
+            };
+
+            let restart_handle = match wait_till {
+                Some(wait_till) => Some(tokio::spawn(reinitialize)),
+                None => {
+                    reinitialize().await;
+                    None
+                }
+            }
+            *restart_backoff_lock = RestartBackoff {
+                next_soonest_restart_time,
+                consecutive_attempts,
+                restart_handle,
+            };
         }
         // if requests, fire them
         t::OnExit::Requests(requests) => {
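As the commit title says, this does not compile yet; the tail of the Restart arm has four concrete issues. `restart_backoff_lock` is assigned through below, so its binding needs `mut`; the `match wait_till` expression is missing its trailing semicolon; the slot's type is `Option<RestartBackoff>`, so the assignment needs `Some(...)`; and `async ||` closures are unstable, while `tokio::spawn(reinitialize)` hands a closure to a function that wants a future, and never actually waits out `wait_till`. One way the tail could compile (a sketch, not the author's eventual fix; it assumes `reinitialize` is rebuilt as an owned `async move` block whose captures are `Send + 'static`):

    let mut restart_backoff_lock = restart_backoff.lock().await; // mut: assigned below
    // ... compute wait_till / next_soonest_restart_time / consecutive_attempts,
    //     fetch caps, send the kill message, and build `reinitialize` as an
    //     `async move` block, as in the diff above ...
    let restart_handle = match wait_till {
        // too soon: spawn a task that sleeps out the backoff, then re-initializes
        Some(wait_till) => Some(tokio::spawn(async move {
            tokio::time::sleep_until(wait_till.into()).await; // std Instant -> tokio Instant
            reinitialize.await;
        })),
        // allowed to restart now: re-initialize inline
        None => {
            reinitialize.await;
            None
        }
    };
    *restart_backoff_lock = Some(RestartBackoff {
        next_soonest_restart_time,
        consecutive_attempts,
        restart_handle,
    });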