mirror of https://github.com/uqbar-dao/nectar.git, synced 2024-11-22 19:34:06 +03:00

commit b033b4b965 (parent 3ac646a708)

app store: fetch metadata urls only after catching up on logs
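In outline: while the app store catches up on historical logs, handle_eth_log now runs with startup = true and skips the per-log HTTP metadata fetch; once all logs are processed, a single update_all_metadata pass fetches each listing's metadata from its final URI, and live events after catch-up (startup = false) fetch inline as before. A minimal sketch of the pattern, with simplified types and a hypothetical fetch_metadata helper standing in for the real fetch_metadata_from_url:

use std::collections::HashMap;

// Simplified stand-ins for the real state types in this diff (assumptions).
struct Listing {
    metadata_uri: String,
    metadata: Option<String>, // Option so the startup pass can defer the fetch
}

// Hypothetical helper standing in for fetch_metadata_from_url.
fn fetch_metadata(uri: &str) -> Option<String> {
    Some(format!("metadata fetched from {uri}"))
}

// Mirrors the startup flag added to handle_eth_log: during catch-up we record
// the latest URI but skip the network fetch, since a later log may overwrite it.
fn handle_log(listings: &mut HashMap<String, Listing>, id: &str, uri: &str, startup: bool) {
    let metadata = if !startup { fetch_metadata(uri) } else { None };
    listings.insert(
        id.to_string(),
        Listing { metadata_uri: uri.to_string(), metadata },
    );
}

// Mirrors update_all_metadata: one fetch per listing, against its final URI.
fn update_all_metadata(listings: &mut HashMap<String, Listing>) {
    for listing in listings.values_mut() {
        listing.metadata = fetch_metadata(&listing.metadata_uri);
    }
}

fn main() {
    let mut listings = HashMap::new();
    // Catch-up: two historical logs for one package, no fetches yet.
    handle_log(&mut listings, "app:pkg", "https://old.example/meta.json", true);
    handle_log(&mut listings, "app:pkg", "https://new.example/meta.json", true);
    update_all_metadata(&mut listings); // exactly one fetch, for the final URI
    // A live event after catch-up fetches inline.
    handle_log(&mut listings, "other:pkg", "https://live.example/meta.json", false);
    println!("{:?}", listings["app:pkg"].metadata);
}

The point of the split, per the new doc comment in the diff below, is to avoid repeatedly fetching outdated metadata while replaying logs: a package may change its metadata URI many times across the history, so deferring the fetch means one request per listing instead of one per log.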
@@ -45,12 +45,7 @@ pub fn init_frontend(our: &Address, http_server: &mut server::HttpServer) {
             .expect("failed to bind http path");
     }
     http_server
-        .serve_ui(
-            &our,
-            "ui",
-            vec!["/", "/app/:id", "/publish", "/download/:id", "my-downloads"],
-            config.clone(),
-        )
+        .serve_ui(&our, "ui", vec!["/"], config.clone())
         .expect("failed to serve static UI");

     http_server
@@ -132,7 +132,7 @@ fn handle_message(our: &Address, state: &mut State, message: &Message) -> anyhow
                 return Err(anyhow::anyhow!("foo"));
             };
             let log = serde_json::from_slice(context)?;
-            handle_eth_log(our, state, log)?;
+            handle_eth_log(our, state, log, false)?;
             return Ok(());
         }
     } else {
@@ -234,10 +234,17 @@ fn handle_local_request(state: &mut State, req: ChainRequests) -> anyhow::Result
     Ok(())
 }

-fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Result<()> {
-    let block_number: u64 = log.block_number.ok_or(anyhow::anyhow!("blocknumbaerror"))?;
+fn handle_eth_log(
+    our: &Address,
+    state: &mut State,
+    log: eth::Log,
+    startup: bool,
+) -> anyhow::Result<()> {
+    let block_number: u64 = log
+        .block_number
+        .ok_or(anyhow::anyhow!("log missing block number"))?;
     let note: kimap::Note =
-        kimap::decode_note_log(&log).map_err(|e| anyhow::anyhow!("decodelogerror: {e:?}"))?;
+        kimap::decode_note_log(&log).map_err(|e| anyhow::anyhow!("decode log error: {e:?}"))?;

     let package_id = note
         .parent_path
@@ -293,9 +300,19 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re
         }
     };

+    if is_our_package {
+        state.published.insert(package_id.clone());
+    }
+
+    // if this is a startup event, we don't need to fetch metadata from the URI --
+    // we'll loop over all listings after processing all logs and fetch them as needed.
     // fetch metadata from the URI (currently only handling HTTP(S) URLs!)
     // assert that the metadata hash matches the fetched data
-    let metadata = fetch_metadata_from_url(&metadata_uri, &metadata_hash, 30)?;
+    let metadata = if !startup {
+        Some(fetch_metadata_from_url(&metadata_uri, &metadata_hash, 30)?)
+    } else {
+        None
+    };

     match state.listings.entry(package_id.clone()) {
         std::collections::hash_map::Entry::Occupied(mut listing) => {
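The two retained comments in the hunk above describe what fetch_metadata_from_url is expected to do: pull the document over HTTP(S) and check it against the on-chain metadata hash. A hypothetical sketch of such a hash-verified fetch, using the sha2 and hex crates for illustration; the http_get helper and the exact signature are assumptions, not the real implementation:

use sha2::{Digest, Sha256};

// Stand-in for a real HTTP client call (assumption, not the real API).
fn http_get(uri: &str) -> Result<Vec<u8>, String> {
    Ok(uri.as_bytes().to_vec())
}

// Sketch of a hash-verified fetch in the spirit of fetch_metadata_from_url:
// fetch the bytes, hash them, and reject the document on a mismatch.
// The timeout parameter is unused in this sketch.
fn fetch_verified(uri: &str, expected_hash_hex: &str, _timeout_secs: u64) -> Result<Vec<u8>, String> {
    let bytes = http_get(uri)?;
    let actual = hex::encode(Sha256::digest(&bytes));
    if actual != expected_hash_hex {
        return Err(format!("metadata hash mismatch: got {actual}"));
    }
    Ok(bytes)
}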
@@ -303,40 +320,63 @@ fn handle_eth_log(our: &Address, state: &mut State, log: eth::Log) -> anyhow::Re
             listing.metadata_uri = metadata_uri;
             listing.tba = tba;
             listing.metadata_hash = metadata_hash;
-            listing.metadata = Some(metadata.clone());
+            listing.metadata = metadata.clone();
         }
         std::collections::hash_map::Entry::Vacant(listing) => {
             listing.insert(PackageListing {
                 tba,
                 metadata_uri,
                 metadata_hash,
-                metadata: Some(metadata.clone()),
+                metadata: metadata.clone(),
                 auto_update: false,
             });
         }
     }

-    if is_our_package {
-        state.published.insert(package_id.clone());
+    if !startup {
+        // if auto_update is enabled, send a message to downloads to kick off the update.
+        if let Some(listing) = state.listings.get(&package_id) {
+            if listing.auto_update {
+                print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id));
+                Request::to(("our", "downloads", "app_store", "sys"))
+                    .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest {
+                        package_id: crate::kinode::process::main::PackageId::from_process_lib(
+                            package_id,
+                        ),
+                        metadata: metadata.unwrap().into(),
+                    }))
+                    .send()
+                    .unwrap();
+            }
+        }
     }

     state.last_saved_block = block_number;

-    // if auto_update is enabled, send a message to downloads to kick off the update.
-    if let Some(listing) = state.listings.get(&package_id) {
+    Ok(())
+}
+
+/// after startup, fetch metadata for all listings
+/// we do this as a separate step to not repeatedly fetch outdated metadata
+/// as we process logs.
+fn update_all_metadata(state: &mut State) {
+    for (package_id, listing) in state.listings.iter_mut() {
+        let metadata =
+            fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30).ok();
+        listing.metadata = metadata.clone();
         if listing.auto_update {
-            print_to_terminal(1, &format!("kicking off auto-update for: {}", package_id));
-            let request = DownloadRequests::AutoUpdate(AutoUpdateRequest {
-                package_id: crate::kinode::process::main::PackageId::from_process_lib(package_id),
-                metadata: metadata.into(),
-            });
+            print_to_terminal(0, &format!("kicking off auto-update for: {}", package_id));
             Request::to(("our", "downloads", "app_store", "sys"))
-                .body(&request)
-                .send()?;
+                .body(&DownloadRequests::AutoUpdate(AutoUpdateRequest {
+                    package_id: crate::kinode::process::main::PackageId::from_process_lib(
+                        package_id.clone(),
+                    ),
+                    metadata: metadata.unwrap().into(),
+                }))
+                .send()
+                .unwrap();
         }
     }
-
-    Ok(())
 }

 /// create the filter used for app store getLogs and subscription.
@@ -365,10 +405,11 @@ pub fn fetch_and_subscribe_logs(our: &Address, state: &mut State) {
         &state.kimap.provider,
         &filter.from_block(state.last_saved_block),
     ) {
-        if let Err(e) = handle_eth_log(our, state, log) {
+        if let Err(e) = handle_eth_log(our, state, log, true) {
             print_to_terminal(1, &format!("error ingesting log: {e}"));
         };
     }
+    update_all_metadata(state);
 }

 /// fetch logs from the chain with a given filter
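The remaining hunks are from a different source file, apparently the KNS indexer (note KnsError and handle_pending_notes); there the commit only comments out per-attempt debug prints around the pending-note retry loop. For orientation, a minimal sketch of that retry pattern; the Note type, the value of the attempt cap, and the process stand-in for handle_note are assumptions:

const MAX_PENDING_ATTEMPTS: u8 = 3; // assumed cap; the real constant is defined elsewhere

struct Note {
    block: u64,
}

// Hypothetical stand-in for handle_note: fails while the parent entry is unminted.
fn process(note: &Note) -> Result<(), String> {
    if note.block % 2 == 0 {
        Ok(())
    } else {
        Err("no parent yet".to_string())
    }
}

// Drain the queue; keep failures with an incremented attempt count and drop
// anything past the cap -- the prints the diff silences bracketed these branches.
fn handle_pending(notes: &mut Vec<(Note, u8)>) {
    let mut keep_notes = Vec::new();
    for (note, attempt) in notes.drain(..) {
        if attempt >= MAX_PENDING_ATTEMPTS {
            continue; // exceeded max attempts: drop the note
        }
        if process(&note).is_err() {
            keep_notes.push((note, attempt + 1)); // still awaiting mint; retry later
        }
    }
    notes.extend(keep_notes);
}

fn main() {
    let mut pending = vec![(Note { block: 1 }, 0), (Note { block: 2 }, 0)];
    handle_pending(&mut pending);
    assert_eq!(pending.len(), 1); // the odd-block note stays queued for retry
}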
@@ -275,10 +275,10 @@ fn handle_pending_notes(
     for (note, attempt) in notes.drain(..) {
         if attempt >= MAX_PENDING_ATTEMPTS {
             // skip notes that have exceeded max attempts
-            print_to_terminal(
-                1,
-                &format!("dropping note from block {block} after {attempt} attempts"),
-            );
+            // print_to_terminal(
+            //     1,
+            //     &format!("dropping note from block {block} after {attempt} attempts"),
+            // );
             continue;
         }
         if let Err(e) = handle_note(state, &note) {
@@ -288,10 +288,10 @@ fn handle_pending_notes(
         }
         Some(ee) => match ee {
             KnsError::NoParentError => {
-                print_to_terminal(
-                    1,
-                    &format!("note still awaiting mint; attempt {attempt}"),
-                );
+                // print_to_terminal(
+                //     1,
+                //     &format!("note still awaiting mint; attempt {attempt}"),
+                // );
                 keep_notes.push((note, attempt + 1));
             }
         },
@@ -439,10 +439,10 @@ fn handle_log(
     if let Err(e) = handle_note(state, &decoded) {
         if let Some(KnsError::NoParentError) = e.downcast_ref::<KnsError>() {
            if let Some(block_number) = log.block_number {
-                print_to_terminal(
-                    1,
-                    &format!("adding note to pending_notes for block {block_number}"),
-                );
+                // print_to_terminal(
+                //     1,
+                //     &format!("adding note to pending_notes for block {block_number}"),
+                // );
                 pending_notes
                     .entry(block_number)
                     .or_default()