mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-22 16:11:38 +03:00
Merge branch 'develop' into bp/newapps
This commit is contained in:
commit
65f4fcccaa
@ -609,7 +609,7 @@ fn handle_eth_sub_event(
|
||||
event: eth::SubscriptionResult,
|
||||
) -> Result<(), AppStoreLogError> {
|
||||
let eth::SubscriptionResult::Log(log) = event else {
|
||||
return Err(AppStoreLogError::DecodeLogError);
|
||||
return Ok(()); // not a log event
|
||||
};
|
||||
state.ingest_contract_event(*log, true)
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
use crate::{utils, DownloadRequest, LocalRequest};
|
||||
use crate::{KIMAP_ADDRESS, VFS_TIMEOUT};
|
||||
use alloy_sol_types::SolEvent;
|
||||
use kinode_process_lib::kernel_types::Erc721Metadata;
|
||||
use kinode_process_lib::{
|
||||
eth, kernel_types as kt, kimap, println, vfs, Address, NodeId, PackageId, Request,
|
||||
@ -17,7 +16,7 @@ use std::str::FromStr;
|
||||
pub enum AppStoreLogError {
|
||||
NoBlockNumber,
|
||||
GetNameError,
|
||||
DecodeLogError,
|
||||
DecodeLogError(kimap::DecodeLogError),
|
||||
PackageHashMismatch,
|
||||
InvalidPublisherName,
|
||||
MetadataNotFound,
|
||||
@ -30,7 +29,7 @@ impl std::fmt::Display for AppStoreLogError {
|
||||
match self {
|
||||
AppStoreLogError::NoBlockNumber => write!(f, "log with no block number"),
|
||||
AppStoreLogError::GetNameError => write!(f, "no corresponding name for namehash found"),
|
||||
AppStoreLogError::DecodeLogError => write!(f, "error decoding log data"),
|
||||
AppStoreLogError::DecodeLogError(e) => write!(f, "error decoding log data: {e:?}"),
|
||||
AppStoreLogError::PackageHashMismatch => write!(f, "mismatched package hash"),
|
||||
AppStoreLogError::InvalidPublisherName => write!(f, "invalid publisher name"),
|
||||
AppStoreLogError::MetadataNotFound => write!(f, "metadata not found"),
|
||||
@ -401,21 +400,25 @@ impl State {
|
||||
//
|
||||
let metadata_uri = String::from_utf8_lossy(¬e.data).to_string();
|
||||
|
||||
// generate ~metadata-hash notehash
|
||||
let hash_note = format!("~metadata-hash.{}", note.parent_path);
|
||||
let (tba, metadata_hash) = if update_listings {
|
||||
// generate ~metadata-hash full-path
|
||||
let hash_note = format!("~metadata-hash.{}", note.parent_path);
|
||||
|
||||
// owner can change which we don't track (yet?) so don't save, need to get when desired
|
||||
let (tba, _owner, data) = self.kimap.get(&hash_note).map_err(|e| {
|
||||
println!("Couldn't find {hash_note}: {e:?}");
|
||||
AppStoreLogError::MetadataHashMismatch
|
||||
})?;
|
||||
// owner can change which we don't track (yet?) so don't save, need to get when desired
|
||||
let (tba, _owner, data) = self.kimap.get(&hash_note).map_err(|e| {
|
||||
println!("Couldn't find {hash_note}: {e:?}");
|
||||
AppStoreLogError::MetadataHashMismatch
|
||||
})?;
|
||||
|
||||
let Some(hash_note) = data else {
|
||||
return Err(AppStoreLogError::MetadataNotFound);
|
||||
let Some(hash_note) = data else {
|
||||
return Err(AppStoreLogError::MetadataNotFound);
|
||||
};
|
||||
|
||||
(tba, String::from_utf8_lossy(&hash_note).to_string())
|
||||
} else {
|
||||
(eth::Address::ZERO, "".to_string())
|
||||
};
|
||||
|
||||
let metadata_hash = String::from_utf8_lossy(&hash_note).to_string();
|
||||
|
||||
// fetch metadata from the URI (currently only handling HTTP(S) URLs!)
|
||||
// assert that the metadata hash matches the fetched data
|
||||
let metadata = if update_listings {
|
||||
@ -431,10 +434,10 @@ impl State {
|
||||
match self.packages.entry(package_id) {
|
||||
std::collections::hash_map::Entry::Occupied(mut listing) => {
|
||||
let listing = listing.get_mut();
|
||||
listing.tba = tba;
|
||||
listing.metadata_uri = metadata_uri;
|
||||
listing.metadata_hash = metadata_hash;
|
||||
if update_listings {
|
||||
listing.tba = tba;
|
||||
listing.metadata_hash = metadata_hash;
|
||||
listing.metadata = metadata;
|
||||
}
|
||||
}
|
||||
@ -460,14 +463,41 @@ impl State {
|
||||
/// of stale metadata.
|
||||
pub fn update_listings(&mut self) {
|
||||
for (package_id, listing) in self.packages.iter_mut() {
|
||||
// if publisher is `sys`, we can skip
|
||||
if package_id.publisher() == "sys" {
|
||||
continue;
|
||||
}
|
||||
|
||||
// generate ~metadata-hash full-path
|
||||
let hash_note = format!(
|
||||
"~metadata-hash.{}",
|
||||
package_id.to_string().replace(":", ".")
|
||||
);
|
||||
|
||||
// owner can change which we don't track (yet?) so don't save, need to get when desired
|
||||
let Ok((tba, _owner, data)) = self.kimap.get(&hash_note) else {
|
||||
println!("Couldn't find {hash_note}");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(hash_note) = data else {
|
||||
println!("No data for {hash_note}");
|
||||
continue;
|
||||
};
|
||||
|
||||
let metadata_hash = String::from_utf8_lossy(&hash_note).to_string();
|
||||
|
||||
if let Ok(metadata) =
|
||||
utils::fetch_metadata_from_url(&listing.metadata_uri, &listing.metadata_hash, 30)
|
||||
utils::fetch_metadata_from_url(&listing.metadata_uri, &metadata_hash, 30)
|
||||
{
|
||||
if let Some(package_state) = &listing.state {
|
||||
auto_update(&self.our, package_id, &metadata, package_state);
|
||||
}
|
||||
listing.metadata = Some(metadata);
|
||||
}
|
||||
|
||||
listing.tba = tba;
|
||||
listing.metadata_hash = metadata_hash;
|
||||
}
|
||||
// kinode_process_lib::set_state(&serde_json::to_vec(self).unwrap());
|
||||
}
|
||||
|
@ -96,16 +96,17 @@ pub fn app_store_filter(state: &State) -> eth::Filter {
|
||||
pub fn fetch_and_subscribe_logs(state: &mut State) {
|
||||
let filter = app_store_filter(state);
|
||||
// get past logs, subscribe to new ones.
|
||||
// subscribe first so we don't miss any logs
|
||||
state.kimap.provider.subscribe_loop(1, filter.clone());
|
||||
for log in fetch_logs(
|
||||
&state.kimap.provider,
|
||||
&filter.clone().from_block(state.last_saved_block),
|
||||
&filter.from_block(state.last_saved_block),
|
||||
) {
|
||||
if let Err(e) = state.ingest_contract_event(log, false) {
|
||||
print_to_terminal(1, &format!("error ingesting log: {e}"));
|
||||
};
|
||||
}
|
||||
state.update_listings();
|
||||
state.kimap.provider.subscribe_loop(1, filter);
|
||||
}
|
||||
|
||||
/// fetch logs from the chain with a given filter
|
||||
|
@ -110,22 +110,18 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
),
|
||||
);
|
||||
|
||||
// subscribe to logs first, so no logs are missed
|
||||
println!("subscribing to new logs...");
|
||||
eth_provider.subscribe_loop(1, mints_filter.clone());
|
||||
eth_provider.subscribe_loop(2, notes_filter.clone());
|
||||
listen_to_new_blocks(); // sub_id: 3
|
||||
|
||||
// if block in state is < current_block, get logs from that part.
|
||||
println!("syncing old logs...");
|
||||
fetch_and_process_logs(ð_provider, &our, &mut state, mints_filter.clone());
|
||||
fetch_and_process_logs(ð_provider, &our, &mut state, notes_filter.clone());
|
||||
println!("done syncing old logs.");
|
||||
|
||||
let current_block = eth_provider.get_block_number().unwrap();
|
||||
println!("current block: {}", current_block);
|
||||
state.last_block = current_block;
|
||||
|
||||
println!("subscribing to new logs...");
|
||||
subscribe_to_logs(ð_provider, mints_filter.clone(), 1);
|
||||
subscribe_to_logs(ð_provider, notes_filter.clone(), 2);
|
||||
listen_to_new_blocks(); // sub_id: 3
|
||||
println!("subscribed to logs successfully");
|
||||
|
||||
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
|
||||
|
||||
loop {
|
||||
@ -227,9 +223,9 @@ fn handle_eth_message(
|
||||
Ok(Err(e)) => {
|
||||
println!("got eth subscription error ({e:?}), resubscribing");
|
||||
if e.id == 1 {
|
||||
subscribe_to_logs(ð_provider, mints_filter.clone(), 1);
|
||||
eth_provider.subscribe_loop(1, mints_filter.clone());
|
||||
} else if e.id == 2 {
|
||||
subscribe_to_logs(ð_provider, notes_filter.clone(), 2);
|
||||
eth_provider.subscribe_loop(2, notes_filter.clone());
|
||||
} else if e.id == 3 {
|
||||
listen_to_new_blocks();
|
||||
}
|
||||
@ -527,16 +523,3 @@ fn listen_to_new_blocks() {
|
||||
.send()
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn subscribe_to_logs(eth_provider: ð::Provider, filter: eth::Filter, sub_id: u64) {
|
||||
loop {
|
||||
match eth_provider.subscribe(sub_id, filter.clone()) {
|
||||
Ok(()) => break,
|
||||
Err(_) => {
|
||||
println!("failed to subscribe to chain! trying again in 5s...");
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,6 @@ pub async fn vfs(
|
||||
// Clone Arcs for the new task
|
||||
let our_node = our_node.clone();
|
||||
let send_to_loop = send_to_loop.clone();
|
||||
let send_to_terminal = send_to_terminal.clone();
|
||||
let send_to_caps_oracle = send_to_caps_oracle.clone();
|
||||
let open_files = open_files.clone();
|
||||
let vfs_path = vfs_path.clone();
|
||||
@ -118,9 +117,6 @@ pub async fn vfs(
|
||||
)
|
||||
.await
|
||||
{
|
||||
Printout::new(1, format!("vfs: {e}"))
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
KernelMessage::builder()
|
||||
.id(km_id)
|
||||
.source((our_node.as_str(), VFS_PROCESS_ID.clone()))
|
||||
|
Loading…
Reference in New Issue
Block a user