mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-11-30 01:44:58 +03:00
make cache entries go stale
This commit is contained in:
parent
dc267a41f9
commit
0972b09d85
@ -163,11 +163,9 @@ struct ModuleState {
|
|||||||
print_tx: PrintSender,
|
print_tx: PrintSender,
|
||||||
/// cache of ETH requests
|
/// cache of ETH requests
|
||||||
request_cache: RequestCache,
|
request_cache: RequestCache,
|
||||||
/// duration since we sent last eth_blockNumber request
|
|
||||||
since_last_block_number: Arc<Mutex<Option<Instant>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type RequestCache = Arc<Mutex<IndexMap<Vec<u8>, EthResponse>>>;
|
type RequestCache = Arc<Mutex<IndexMap<Vec<u8>, (EthResponse, Instant)>>>;
|
||||||
|
|
||||||
const DELAY_MS: u64 = 1_000;
|
const DELAY_MS: u64 = 1_000;
|
||||||
const MAX_REQUEST_CACHE_LEN: usize = 500;
|
const MAX_REQUEST_CACHE_LEN: usize = 500;
|
||||||
@ -253,7 +251,6 @@ pub async fn provider(
|
|||||||
send_to_loop,
|
send_to_loop,
|
||||||
print_tx,
|
print_tx,
|
||||||
request_cache: Arc::new(Mutex::new(IndexMap::new())),
|
request_cache: Arc::new(Mutex::new(IndexMap::new())),
|
||||||
since_last_block_number: Arc::new(Mutex::new(None)),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// convert saved configs into data structure that we will use to route queries
|
// convert saved configs into data structure that we will use to route queries
|
||||||
@ -620,7 +617,6 @@ async fn handle_eth_action(
|
|||||||
let response_channels = state.response_channels.clone();
|
let response_channels = state.response_channels.clone();
|
||||||
let print_tx = state.print_tx.clone();
|
let print_tx = state.print_tx.clone();
|
||||||
let mut request_cache = Arc::clone(&state.request_cache);
|
let mut request_cache = Arc::clone(&state.request_cache);
|
||||||
let mut since_last_block_number = Arc::clone(&state.since_last_block_number);
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
std::time::Duration::from_secs(timeout),
|
std::time::Duration::from_secs(timeout),
|
||||||
@ -633,7 +629,6 @@ async fn handle_eth_action(
|
|||||||
&mut receiver,
|
&mut receiver,
|
||||||
&print_tx,
|
&print_tx,
|
||||||
&mut request_cache,
|
&mut request_cache,
|
||||||
&mut since_last_block_number,
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@ -653,7 +648,6 @@ async fn handle_eth_action(
|
|||||||
&mut receiver,
|
&mut receiver,
|
||||||
&print_tx,
|
&print_tx,
|
||||||
&mut request_cache,
|
&mut request_cache,
|
||||||
&mut since_last_block_number,
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@ -719,7 +713,6 @@ async fn fulfill_request(
|
|||||||
remote_request_receiver: &mut ProcessMessageReceiver,
|
remote_request_receiver: &mut ProcessMessageReceiver,
|
||||||
print_tx: &PrintSender,
|
print_tx: &PrintSender,
|
||||||
request_cache: &mut RequestCache,
|
request_cache: &mut RequestCache,
|
||||||
since_last_block_number: &mut Arc<Mutex<Option<Instant>>>,
|
|
||||||
) -> EthResponse {
|
) -> EthResponse {
|
||||||
let serialized_action = serde_json::to_vec(eth_action).unwrap();
|
let serialized_action = serde_json::to_vec(eth_action).unwrap();
|
||||||
let EthAction::Request {
|
let EthAction::Request {
|
||||||
@ -731,15 +724,12 @@ async fn fulfill_request(
|
|||||||
return EthResponse::Err(EthError::PermissionDenied); // will never hit
|
return EthResponse::Err(EthError::PermissionDenied); // will never hit
|
||||||
};
|
};
|
||||||
{
|
{
|
||||||
let since_last_block_number = since_last_block_number.lock().await;
|
let mut request_cache = request_cache.lock().await;
|
||||||
if since_last_block_number.is_some_and(|t| t.elapsed() > Duration::from_millis(DELAY_MS))
|
if let Some((cache_hit, time_of_hit)) = request_cache.shift_remove(&serialized_action) {
|
||||||
|| method != "eth_blockNumber"
|
// refresh cache entry (it is most recently accessed) & return it
|
||||||
{
|
if time_of_hit.elapsed() > Duration::from_millis(DELAY_MS) {
|
||||||
let mut request_cache = request_cache.lock().await;
|
|
||||||
if let Some(cache_hit) = request_cache.shift_remove(&serialized_action) {
|
|
||||||
// refresh cache entry (it is most recently accessed) & return it
|
|
||||||
println!("cache hit\r");
|
println!("cache hit\r");
|
||||||
request_cache.insert(serialized_action, cache_hit.clone());
|
request_cache.insert(serialized_action, (cache_hit.clone(), time_of_hit));
|
||||||
return cache_hit;
|
return cache_hit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -812,7 +802,7 @@ async fn fulfill_request(
|
|||||||
// drop 10% oldest cache entries
|
// drop 10% oldest cache entries
|
||||||
request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10);
|
request_cache.drain(0..MAX_REQUEST_CACHE_LEN / 10);
|
||||||
}
|
}
|
||||||
request_cache.insert(serialized_action, response.clone());
|
request_cache.insert(serialized_action, (response.clone(), Instant::now()));
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
Err(rpc_error) => {
|
Err(rpc_error) => {
|
||||||
|
Loading…
Reference in New Issue
Block a user