From 0972b09d8589c177836f566d0631947b0747868b Mon Sep 17 00:00:00 2001 From: hosted-fornet Date: Fri, 30 Aug 2024 09:00:22 -0700 Subject: [PATCH] make cache entries go stale --- kinode/src/eth/mod.rs | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index ae355bab..f410338f 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -163,11 +163,9 @@ struct ModuleState { print_tx: PrintSender, /// cache of ETH requests request_cache: RequestCache, - /// duration since we sent last eth_blockNumber request - since_last_block_number: Arc>>, } -type RequestCache = Arc, EthResponse>>>; +type RequestCache = Arc, (EthResponse, Instant)>>>; const DELAY_MS: u64 = 1_000; const MAX_REQUEST_CACHE_LEN: usize = 500; @@ -253,7 +251,6 @@ pub async fn provider( send_to_loop, print_tx, request_cache: Arc::new(Mutex::new(IndexMap::new())), - since_last_block_number: Arc::new(Mutex::new(None)), }; // convert saved configs into data structure that we will use to route queries @@ -620,7 +617,6 @@ async fn handle_eth_action( let response_channels = state.response_channels.clone(); let print_tx = state.print_tx.clone(); let mut request_cache = Arc::clone(&state.request_cache); - let mut since_last_block_number = Arc::clone(&state.since_last_block_number); tokio::spawn(async move { match tokio::time::timeout( std::time::Duration::from_secs(timeout), @@ -633,7 +629,6 @@ async fn handle_eth_action( &mut receiver, &print_tx, &mut request_cache, - &mut since_last_block_number, ), ) .await @@ -653,7 +648,6 @@ async fn handle_eth_action( &mut receiver, &print_tx, &mut request_cache, - &mut since_last_block_number, ), ) .await @@ -719,7 +713,6 @@ async fn fulfill_request( remote_request_receiver: &mut ProcessMessageReceiver, print_tx: &PrintSender, request_cache: &mut RequestCache, - since_last_block_number: &mut Arc>>, ) -> EthResponse { let serialized_action = serde_json::to_vec(eth_action).unwrap(); let EthAction::Request { @@ -731,15 +724,12 @@ async fn fulfill_request( return EthResponse::Err(EthError::PermissionDenied); // will never hit }; { - let since_last_block_number = since_last_block_number.lock().await; - if since_last_block_number.is_some_and(|t| t.elapsed() > Duration::from_millis(DELAY_MS)) - || method != "eth_blockNumber" - { - let mut request_cache = request_cache.lock().await; - if let Some(cache_hit) = request_cache.shift_remove(&serialized_action) { - // refresh cache entry (it is most recently accessed) & return it + let mut request_cache = request_cache.lock().await; + if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) { + // refresh cache entry (it is most recently accessed) & return it + if time_of_hit.elapsed() > Duration::from_millis(DELAY_MS) { println!("cache hit\r"); - request_cache.insert(serialized_action, cache_hit.clone()); + request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit)); return cache_hit; } } @@ -812,7 +802,7 @@ async fn fulfill_request( // drop 10% oldest cache entries request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10); } - request_cache.insert(serialized_action, response.clone()); + request_cache.insert(serialized_action, (response.clone(), Instant::now())); return response; } Err(rpc_error) => {