Merge pull request #140 from uqbar-dao/dr/cargo-clippy

cargo clippy cleanup
This commit is contained in:
dr-frmr 2024-01-10 00:55:50 -03:00 committed by GitHub
commit aef2b2afbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 104 additions and 147 deletions

View File

@ -165,7 +165,7 @@ fn main() {
fs::File::create(format!("{}/src/bootstrapped_processes.rs", pwd.display(),)).unwrap();
writeln!(
bootstrapped_processes,
"pub static BOOTSTRAPPED_PROCESSES: &[(&str, &'static [u8])] = &[",
"pub static BOOTSTRAPPED_PROCESSES: &[(&str, &[u8])] = &[",
)
.unwrap();
let modules_dir = format!("{}/modules", pwd.display());
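
A minimal sketch of the `redundant_static_lifetimes` pattern this hunk fixes; the constant names and bytes are hypothetical:
// On `const` and `static` items the `'static` lifetime is already implied,
// so clippy suggests dropping the explicit annotation.
const WITH_EXPLICIT_LIFETIME: &'static [u8] = b"example"; // flagged by clippy
const WITH_ELIDED_LIFETIME: &[u8] = b"example"; // preferred form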

View File

@ -48,24 +48,21 @@ pub async fn provider(
let connections = Arc::new(Mutex::new(connections));
while let Some(km) = recv_in_client.recv().await {
match &km.message {
Message::Request(req) => {
match handle_request(&our, &km, &req, &connections, &send_to_loop).await {
Ok(()) => {}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("eth: error handling request: {:?}", e),
})
.await;
}
if let Message::Request(req) = &km.message {
match handle_request(&our, &km, req, &connections, &send_to_loop).await {
Ok(()) => {}
Err(e) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("eth: error handling request: {:?}", e),
})
.await;
}
}
_ => {}
}
}
return Err(anyhow::anyhow!("eth: fatal: message receiver closed!"));
Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
}
async fn handle_request(
@ -76,11 +73,11 @@ async fn handle_request(
send_to_loop: &MessageSender,
) -> Result<()> {
if let Ok(action) = serde_json::from_slice::<HttpServerRequest>(&req.body) {
return handle_http_server_request(action, km, connections).await;
handle_http_server_request(action, km, connections).await
} else if let Ok(action) = serde_json::from_slice::<EthRequest>(&req.body) {
return handle_eth_request(action, our, km, connections, send_to_loop).await;
handle_eth_request(action, our, km, connections, send_to_loop).await
} else {
return Err(anyhow::anyhow!("malformed request"));
Err(anyhow::anyhow!("malformed request"))
}
}
@ -96,7 +93,7 @@ async fn handle_http_server_request(
{
if message_type == WsMessageType::Text {
let bytes = &km.lazy_load_blob.as_ref().unwrap().bytes;
let text = std::str::from_utf8(&bytes).unwrap();
let text = std::str::from_utf8(bytes).unwrap();
let mut json: serde_json::Value = serde_json::from_str(text)?;
let mut id = json["id"].as_u64().unwrap();
@ -191,7 +188,7 @@ async fn spawn_provider_read_stream(
.or_insert(WsProviderSubscription::default());
ws_provider_subscription.provider = Some(ws_provider.clone());
ws_provider_subscription.subscription = Some(stream.id.clone());
ws_provider_subscription.subscription = Some(stream.id);
drop(connections_guard);
@ -298,7 +295,7 @@ async fn handle_external_websocket_passthrough(
continue;
};
let id = json["id"].as_u64().unwrap() as u32;
let channel_id = ws_request_ids.get(&id).unwrap().clone();
let channel_id: u32 = *ws_request_ids.get(&id).unwrap();
json["id"] = serde_json::Value::from(id - channel_id);

View File

@ -14,24 +14,15 @@ use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
pub type WsRequestIds = Arc<DashMap<u32, u32>>;
#[derive(Default)]
pub struct WsProviderSubscription {
pub handle: Option<JoinHandle<()>>,
pub provider: Option<Provider<Ws>>,
pub subscription: Option<U256>,
}
impl Default for WsProviderSubscription {
fn default() -> Self {
Self {
handle: None,
provider: None,
subscription: None,
}
}
}
impl WsProviderSubscription {
pub async fn kill(&self) -> () {
pub async fn kill(&self) {
if let Some(provider) = &self.provider {
if let Some(subscription) = &self.subscription {
let _ = provider.unsubscribe(subscription).await;
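
A minimal sketch of the `derivable_impls` and `unused_unit` fixes above, with a hypothetical struct standing in for `WsProviderSubscription`:
// When every field's default is that type's own `Default`, the hand-written
// impl can be replaced with `#[derive(Default)]`.
#[derive(Default)]
struct Subscription {
    handle: Option<u64>,
    id: Option<u32>,
}

impl Subscription {
    // `unused_unit`: an explicit `-> ()` return type is redundant, which is why
    // `pub async fn kill(&self) -> ()` becomes `pub async fn kill(&self)`.
    fn reset(&mut self) {
        self.handle = None;
        self.id = None;
    }
}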

View File

@ -434,7 +434,7 @@ async fn handle_http_request(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_client: executed request, got response"),
content: "http_client: executed request, got response".to_string(),
})
.await;
let _ = send_to_loop
@ -472,7 +472,7 @@ async fn handle_http_request(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_client: executed request but got error"),
content: "http_client: executed request but got error".to_string(),
})
.await;
http_error_message(
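
A minimal sketch of the `useless_format` fix applied in both hunks above, reusing the message text from the diff inside a hypothetical helper:
fn status_messages() -> (String, String) {
    // `format!` with a bare string literal only allocates; `.to_string()` says the same thing
    let flagged = format!("http_client: executed request, got response"); // flagged by clippy
    let preferred = "http_client: executed request, got response".to_string();
    (flagged, preferred)
}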

View File

@ -246,13 +246,11 @@ async fn login_handler(
response.headers_mut().append(http::header::SET_COOKIE, v);
Ok(response)
}
Err(_) => {
return Ok(warp::reply::with_status(
warp::reply::json(&"Failed to generate Auth JWT"),
StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response())
}
Err(_) => Ok(warp::reply::with_status(
warp::reply::json(&"Failed to generate Auth JWT"),
StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response()),
}
}
Err(_) => Ok(warp::reply::with_status(
@ -316,7 +314,7 @@ async fn ws_handler(
let Some(auth_token) = serialized_headers.get("cookie") else {
return Err(warp::reject::not_found());
};
if !auth_cookie_valid(&our, &auth_token, &jwt_secret_bytes) {
if !auth_cookie_valid(&our, auth_token, &jwt_secret_bytes) {
return Err(warp::reject::not_found());
}
}
@ -381,33 +379,33 @@ async fn http_handler(
};
let bound_path = route.handler();
if bound_path.authenticated {
if !auth_cookie_valid(
if bound_path.authenticated
&& !auth_cookie_valid(
&our,
serialized_headers.get("cookie").unwrap_or(&"".to_string()),
&jwt_secret_bytes,
) {
// redirect to login page so they can get an auth token
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!(
"http_server: redirecting request from {socket_addr:?} to login page"
),
})
.await;
return Ok(warp::http::Response::builder()
.status(StatusCode::TEMPORARY_REDIRECT)
.header(
"Location",
format!(
"http://{}/login",
host.unwrap_or(Authority::from_static("localhost"))
),
)
.body(vec![])
.into_response());
}
)
{
// redirect to login page so they can get an auth token
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!(
"http_server: redirecting request from {socket_addr:?} to login page"
),
})
.await;
return Ok(warp::http::Response::builder()
.status(StatusCode::TEMPORARY_REDIRECT)
.header(
"Location",
format!(
"http://{}/login",
host.unwrap_or(Authority::from_static("localhost"))
),
)
.body(vec![])
.into_response());
}
if let Some(ref subdomain) = bound_path.secure_subdomain {
@ -487,8 +485,7 @@ async fn http_handler(
method: method.to_string(),
raw_path: format!(
"http://{}{}",
host.unwrap_or(Authority::from_static("localhost"))
.to_string(),
host.unwrap_or(Authority::from_static("localhost")),
original_path
),
headers: serialized_headers,
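
A minimal sketch of the `collapsible_if` rewrite in the auth check above, with hypothetical boolean flags in place of the real handler state:
fn must_redirect_to_login(authenticated_route: bool, cookie_valid: bool) -> bool {
    // before: if authenticated_route { if !cookie_valid { ... } }
    // after: the nested `if` collapses into a single `&&` condition
    authenticated_route && !cookie_valid
}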

View File

@ -201,7 +201,7 @@ async fn handle_kernel_request(
match parent_caps.get(&cap) {
// TODO I don't think we *have* to verify the sigs here but it doesn't hurt...
Some(sig) => {
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), &sig) {
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => {
valid_capabilities.insert(cap, sig.to_vec());
}
@ -344,7 +344,7 @@ async fn handle_kernel_request(
})
.collect();
entry.capabilities.extend(signed_caps);
let _ = persist_state(&our_name, &send_to_loop, &process_map).await;
let _ = persist_state(&our_name, &send_to_loop, process_map).await;
}
// send 'run' message to a process that's already been initialized
t::KernelCommand::RunProcess(process_id) => {
@ -545,7 +545,7 @@ async fn start_process(
let (send_to_process, recv_in_process) =
mpsc::channel::<Result<t::KernelMessage, t::WrappedSendError>>(PROCESS_CHANNEL_CAPACITY);
let id = &process_metadata.process_id;
if senders.contains_key(&id) {
if senders.contains_key(id) {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
@ -662,7 +662,7 @@ pub async fn kernel(
for (process_id, persisted) in &process_map {
// runtime extensions will have a bytes_handle of "", because they have no
// WASM code saved in filesystem.
if persisted.on_exit.is_restart() && persisted.wasm_bytes_handle != "" {
if persisted.on_exit.is_restart() && !persisted.wasm_bytes_handle.is_empty() {
// read wasm bytes directly from vfs
// start process.
let wasm_bytes = match tokio::fs::read(format!(
@ -970,23 +970,18 @@ pub async fn kernel(
if our.name != kernel_message.source.node {
continue;
}
match kernel_message.message {
t::Message::Request(_) => {
handle_kernel_request(
our.name.clone(),
keypair.clone(),
kernel_message,
send_to_loop.clone(),
send_to_terminal.clone(),
&mut senders,
&mut process_handles,
&mut process_map,
caps_oracle_sender.clone(),
&engine,
).await;
}
_ => {}
}
handle_kernel_request(
our.name.clone(),
keypair.clone(),
kernel_message,
send_to_loop.clone(),
send_to_terminal.clone(),
&mut senders,
&mut process_handles,
&mut process_map,
caps_oracle_sender.clone(),
&engine,
).await;
} else {
// pass message to appropriate runtime module or process
match senders.get(&kernel_message.target.process) {
@ -1088,12 +1083,12 @@ pub async fn kernel(
})
// otherwise verify the signature before returning
} else {
match p.capabilities.get(&cap) {
match p.capabilities.get(cap) {
None => None,
Some(sig) => {
let pk = signature::UnparsedPublicKey::new(&signature::ED25519, keypair.public_key());
match pk.verify(
&rmp_serde::to_vec(&cap).unwrap_or_default(),
&rmp_serde::to_vec(cap).unwrap_or_default(),
sig,
) {
Ok(_) => Some((cap.clone(), sig.clone())),
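
A minimal sketch of the empty-string comparison replaced above (`wasm_bytes_handle != ""` becoming `!wasm_bytes_handle.is_empty()`), using a hypothetical helper:
fn should_restart(on_exit_is_restart: bool, wasm_bytes_handle: &str) -> bool {
    // before: on_exit_is_restart && wasm_bytes_handle != ""
    on_exit_is_restart && !wasm_bytes_handle.is_empty()
}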

View File

@ -145,8 +145,7 @@ impl ProcessState {
inner_request.capabilities = {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self
.caps_oracle
self.caps_oracle
.send(t::CapMessage::GetSome {
on: self.metadata.our.process.clone(),
caps: request
@ -382,7 +381,7 @@ impl ProcessState {
return Some((cap.clone(), sig.clone()));
}
// otherwise only return capabilities that were properly signed
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), &sig) {
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => Some((cap.clone(), sig.clone())),
Err(_) => None,
}
@ -402,7 +401,7 @@ impl ProcessState {
return Some((cap.clone(), sig.clone()));
}
// otherwise only return capabilities that were properly signed
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), &sig) {
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => Some((cap.clone(), sig.clone())),
Err(_) => None,
}
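
A minimal sketch of the `needless_borrow` fixes repeated through this file (`&sig` becoming `sig`), using hypothetical types:
fn signature_is_nonempty(sig: &[u8]) -> bool {
    !sig.is_empty()
}

fn check(signatures: &[Vec<u8>]) -> bool {
    signatures.iter().all(|sig| {
        // before: signature_is_nonempty(&sig); `sig` is already a reference,
        // so the extra `&` is stripped by the compiler and flagged by clippy
        signature_is_nonempty(sig)
    })
}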

View File

@ -153,7 +153,7 @@ async fn handle_request(
Some(db) => db,
};
match db.get(&key) {
match db.get(key) {
Ok(Some(value)) => (
serde_json::to_vec(&KvResponse::Get { key: key.to_vec() }).unwrap(),
Some(value),
@ -195,7 +195,7 @@ async fn handle_request(
db.put(key, blob.bytes)?;
}
Some(tx_id) => {
let mut tx = match txs.get_mut(&tx_id) {
let mut tx = match txs.get_mut(tx_id) {
None => {
return Err(KvError::NoTx);
}
@ -219,7 +219,7 @@ async fn handle_request(
db.delete(key)?;
}
Some(tx_id) => {
let mut tx = match txs.get_mut(&tx_id) {
let mut tx = match txs.get_mut(tx_id) {
None => {
return Err(KvError::NoTx);
}
@ -238,7 +238,7 @@ async fn handle_request(
Some(db) => db,
};
let txs = match txs.remove(&tx_id).map(|(_, tx)| tx) {
let txs = match txs.remove(tx_id).map(|(_, tx)| tx) {
None => {
return Err(KvError::NoTx);
}
@ -421,12 +421,7 @@ async fn check_caps(
return Ok(());
}
let db_path = format!(
"{}/{}/{}",
kv_path,
request.package_id.to_string(),
request.db.to_string()
);
let db_path = format!("{}/{}/{}", kv_path, request.package_id, request.db);
fs::create_dir_all(&db_path).await?;
let db = OptimisticTransactionDB::open_default(&db_path)?;
@ -441,12 +436,7 @@ async fn check_caps(
});
}
let db_path = format!(
"{}/{}/{}",
kv_path,
request.package_id.to_string(),
request.db.to_string()
);
let db_path = format!("{}/{}/{}", kv_path, request.package_id, request.db);
open_kvs.remove(&(request.package_id.clone(), request.db.clone()));
fs::remove_dir_all(&db_path).await?;
@ -497,7 +487,7 @@ fn make_error_message(our_name: String, km: &KernelMessage, error: KvError) -> K
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&KvResponse::Err { error: error }).unwrap(),
body: serde_json::to_vec(&KvResponse::Err { error }).unwrap(),
metadata: None,
capabilities: vec![],
},
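
A minimal sketch of two fixes from this file: dropping `.to_string()` on `Display` values inside `format!`, and the `redundant_field_names` shorthand for `error: error`; the struct and values are hypothetical:
struct ErrResponse {
    error: String,
}

fn build(package_id: &str, db: &str) -> (String, ErrResponse) {
    // before: format!("{}/{}", package_id.to_string(), db.to_string())
    // values that implement `Display` don't need the extra `.to_string()` allocation
    let db_path = format!("{}/{}", package_id, db);
    let error = "NoTx".to_string();
    // before: ErrResponse { error: error }
    (db_path, ErrResponse { error })
}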

View File

@ -119,7 +119,7 @@ async fn main() {
let matches = app.get_matches();
let home_directory_path = matches.get_one::<String>("home").unwrap();
let port = matches.get_one::<u16>("port").unwrap().clone();
let port = *matches.get_one::<u16>("port").unwrap();
#[cfg(not(feature = "simulation-mode"))]
let rpc_url = matches.get_one::<String>("rpc").unwrap();
@ -210,9 +210,9 @@ async fn main() {
);
#[cfg(not(feature = "simulation-mode"))]
let (our, encoded_keyfile, decoded_keyfile) = serve_register_fe(
&home_directory_path,
home_directory_path,
our_ip.to_string(),
http_server_port.clone(),
http_server_port,
rpc_url.clone(),
)
.await;
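
A minimal sketch of the `clone_on_copy` fix above: `u16` is `Copy`, so a dereference replaces `.clone()`. The map is a hypothetical stand-in for the parsed CLI matches:
use std::collections::HashMap;

fn read_port(args: &HashMap<String, u16>) -> u16 {
    let port_ref = args.get("port").unwrap();
    // before: port_ref.clone(); dereferencing a `Copy` value is the idiomatic form
    *port_ref
}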

View File

@ -179,7 +179,7 @@ async fn handle_request(
.map(|word| word.to_uppercase())
.unwrap_or("".to_string());
if !READ_KEYWORDS.contains(&first_word) {
return Err(SqliteError::NotAReadKeyword.into());
return Err(SqliteError::NotAReadKeyword);
}
let parameters = get_json_params(blob)?;
@ -235,7 +235,7 @@ async fn handle_request(
.unwrap_or("".to_string());
if !WRITE_KEYWORDS.contains(&first_word) {
return Err(SqliteError::NotAWriteKeyword.into());
return Err(SqliteError::NotAWriteKeyword);
}
let parameters = get_json_params(blob)?;
@ -243,7 +243,7 @@ async fn handle_request(
match tx_id {
Some(tx_id) => {
txs.entry(tx_id)
.or_insert_with(Vec::new)
.or_default()
.push((statement.clone(), parameters));
}
None => {
@ -440,17 +440,12 @@ async fn check_caps(
return Ok(());
}
let db_path = format!(
"{}/{}/{}",
sqlite_path,
request.package_id.to_string(),
request.db.to_string()
);
let db_path = format!("{}/{}/{}", sqlite_path, request.package_id, request.db);
fs::create_dir_all(&db_path).await?;
let db_file_path = format!("{}/{}.db", db_path, request.db.to_string());
let db_file_path = format!("{}/{}.db", db_path, request.db);
let db = Connection::open(&db_file_path)?;
let db = Connection::open(db_file_path)?;
let _ = db.execute("PRAGMA journal_mode=WAL", []);
open_dbs.insert(
@ -466,12 +461,7 @@ async fn check_caps(
});
}
let db_path = format!(
"{}/{}/{}",
sqlite_path,
request.package_id.to_string(),
request.db.to_string()
);
let db_path = format!("{}/{}/{}", sqlite_path, request.package_id, request.db);
open_dbs.remove(&(request.package_id.clone(), request.db.clone()));
fs::remove_dir_all(&db_path).await?;
@ -522,7 +512,7 @@ fn json_to_sqlite(value: &serde_json::Value) -> Result<SqlValue, SqliteError> {
}
}
serde_json::Value::String(s) => {
match base64::decode(&s) {
match base64::decode(s) {
Ok(decoded_bytes) => {
// convert to SQLite Blob if it's a valid base64 string
Ok(SqlValue::Blob(decoded_bytes))
@ -545,7 +535,7 @@ fn get_json_params(blob: Option<LazyLoadBlob>) -> Result<Vec<SqlValue>, SqliteEr
Some(blob) => match serde_json::from_slice::<serde_json::Value>(&blob.bytes) {
Ok(serde_json::Value::Array(vec)) => vec
.iter()
.map(|value| json_to_sqlite(value))
.map(json_to_sqlite)
.collect::<Result<Vec<_>, _>>(),
_ => Err(SqliteError::InvalidParameters),
},
@ -567,7 +557,7 @@ fn make_error_message(our_name: String, km: &KernelMessage, error: SqliteError)
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&SqliteResponse::Err { error: error }).unwrap(),
body: serde_json::to_vec(&SqliteResponse::Err { error }).unwrap(),
metadata: None,
capabilities: vec![],
},
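
A minimal sketch of two rewrites from this file: `.or_insert_with(Vec::new)` shortened to `.or_default()`, and a closure that only forwards its argument replaced by the function name (`redundant_closure`); the dropped `.into()` calls elsewhere in the hunks are `useless_conversion`, converting an error into its own type. Types here are hypothetical:
use std::collections::HashMap;

fn square(value: &i64) -> i64 {
    value * value
}

fn collect(txs: &mut HashMap<u64, Vec<i64>>, tx_id: u64, params: &[i64]) {
    // before: txs.entry(tx_id).or_insert_with(Vec::new)
    let entry = txs.entry(tx_id).or_default();
    // before: params.iter().map(|value| square(value)); the function name can be passed directly
    entry.extend(params.iter().map(square));
}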

View File

@ -532,14 +532,14 @@ impl std::fmt::Display for Message {
&request.metadata.as_ref().unwrap_or(&"None".into()),
{
let mut caps_string = "[".to_string();
if request.capabilities.len() > 0 {
if !request.capabilities.is_empty() {
caps_string.push_str("\n ");
for cap in request.capabilities.iter() {
caps_string.push_str(&format!("{},\n ", cap.0));
}
caps_string.truncate(caps_string.len() - 4);
}
caps_string.push_str("]");
caps_string.push(']');
caps_string
},
),
@ -562,14 +562,14 @@ impl std::fmt::Display for Message {
},
{
let mut caps_string = "[".to_string();
if response.capabilities.len() > 0 {
if !response.capabilities.is_empty() {
caps_string.push_str("\n ");
for cap in response.capabilities.iter() {
caps_string.push_str(&format!("{},\n ", cap.0));
}
caps_string.truncate(caps_string.len() - 4);
}
caps_string.push_str("]");
caps_string.push(']');
caps_string
},
),
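
A minimal sketch of the two fixes in both `Display` arms above: `len() > 0` becomes `!is_empty()` (`len_zero`), and pushing a one-character string becomes `push` (`single_char_add_str`):
fn format_caps(caps: &[String]) -> String {
    let mut out = "[".to_string();
    // before: if caps.len() > 0 { ... }
    if !caps.is_empty() {
        out.push_str(&caps.join(", "));
    }
    // before: out.push_str("]") for a single closing bracket
    out.push(']');
    out
}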

View File

@ -648,7 +648,7 @@ async fn check_caps(
return Ok(());
}
let (new_package_id, new_drive, _rest) = parse_package_and_drive(&new_path).await?;
let (new_package_id, new_drive, _rest) = parse_package_and_drive(new_path).await?;
let new_drive = format!("/{}/{}", new_package_id, new_drive);
// if both new and old path are within the package_id path, ok
if (src_package_id == package_id) && (src_package_id == new_package_id) {
@ -716,13 +716,11 @@ async fn check_caps(
Ok(())
}
VfsAction::CreateDrive => {
if src_package_id != package_id {
if !has_root_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),
});
}
if src_package_id != package_id && !has_root_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),
});
}
add_capability("read", &drive, &our_node, &source, &mut send_to_caps_oracle).await?;