add handling for set/stop mirror/auto_update

This commit is contained in:
dr-frmr 2024-01-26 02:30:54 -03:00
parent 0b27d041b4
commit cdcbf8e57b
No known key found for this signature in database
2 changed files with 113 additions and 47 deletions

View File

@ -318,7 +318,22 @@ fn handle_local_request(
Ok(()) => LocalResponse::UninstallResponse(UninstallResponse::Success),
Err(_) => LocalResponse::UninstallResponse(UninstallResponse::Failure),
},
_ => todo!(),
LocalRequest::StartMirroring(package) => match state.start_mirroring(package) {
Ok(()) => LocalResponse::MirrorResponse(MirrorResponse::Success),
Err(_) => LocalResponse::MirrorResponse(MirrorResponse::Failure),
},
LocalRequest::StopMirroring(package) => match state.stop_mirroring(package) {
Ok(()) => LocalResponse::MirrorResponse(MirrorResponse::Success),
Err(_) => LocalResponse::MirrorResponse(MirrorResponse::Failure),
},
LocalRequest::StartAutoUpdate(package) => match state.start_auto_update(package) {
Ok(()) => LocalResponse::AutoUpdateResponse(AutoUpdateResponse::Success),
Err(_) => LocalResponse::AutoUpdateResponse(AutoUpdateResponse::Failure),
},
LocalRequest::StopAutoUpdate(package) => match state.stop_auto_update(package) {
Ok(()) => LocalResponse::AutoUpdateResponse(AutoUpdateResponse::Success),
Err(_) => LocalResponse::AutoUpdateResponse(AutoUpdateResponse::Failure),
},
}
}

View File

@ -19,44 +19,6 @@ sol! {
pub type PackageHash = String;
/// this process's saved state
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
/// the address of the contract we are using to read package listings
/// this is set by runtime distro at boot-time
pub contract_address: Option<String>,
/// the last block at which we saved the state of the listings to disk.
/// we don't want to save the state every time we get a new listing,
/// so we only save it every so often and then mark the block at which
/// that last occurred here. when we boot, we can read logs starting
/// from this block and rebuild latest state.
pub last_saved_block: u64,
/// we keep the full state of the package manager here, calculated from
/// the listings contract logs. in the future, we'll offload this and
/// only track a certain number of packages...
listed_packages: HashMap<PackageHash, PackageListing>,
/// we keep the full state of the packages we have downloaded here.
/// in order to keep this synchronized with our filesystem, we will
/// ingest apps on disk if we have to rebuild our state. this is also
/// updated every time we download, create, or uninstall a package.
downloaded_packages: HashMap<PackageId, PackageState>,
}
/// state of an individual package we have downloaded
#[derive(Debug, Serialize, Deserialize)]
pub struct PackageState {
/// the node we last downloaded the package from
/// this is "us" if we don't know the source (usually cause it's a local install)
pub mirrored_from: Option<NodeId>,
/// the version of the package we have downloaded
pub our_version: String,
/// are we serving this package to others?
pub mirroring: bool,
/// if we get a listing data update, will we try to download it?
pub auto_update: bool,
pub metadata: Option<OnchainPackageMetadata>,
}
/// listing information derived from metadata hash in listing event
#[derive(Debug, Serialize, Deserialize)]
pub struct PackageListing {
@ -80,6 +42,7 @@ pub struct OnchainPackageMetadata {
pub versions: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RequestedPackage {
pub mirror: bool,
pub auto_update: bool,
@ -87,6 +50,44 @@ pub struct RequestedPackage {
pub desired_version_hash: Option<String>,
}
/// state of an individual package we have downloaded
#[derive(Debug, Serialize, Deserialize)]
pub struct PackageState {
/// the node we last downloaded the package from
/// this is "us" if we don't know the source (usually cause it's a local install)
pub mirrored_from: Option<NodeId>,
/// the version of the package we have downloaded
pub our_version: String,
/// are we serving this package to others?
pub mirroring: bool,
/// if we get a listing data update, will we try to download it?
pub auto_update: bool,
pub metadata: Option<OnchainPackageMetadata>,
}
/// this process's saved state
#[derive(Debug, Serialize, Deserialize)]
pub struct State {
/// the address of the contract we are using to read package listings
/// this is set by runtime distro at boot-time
pub contract_address: Option<String>,
/// the last block at which we saved the state of the listings to disk.
/// we don't want to save the state every time we get a new listing,
/// so we only save it every so often and then mark the block at which
/// that last occurred here. when we boot, we can read logs starting
/// from this block and rebuild latest state.
pub last_saved_block: u64,
/// we keep the full state of the package manager here, calculated from
/// the listings contract logs. in the future, we'll offload this and
/// only track a certain number of packages...
listed_packages: HashMap<PackageHash, PackageListing>, // TODO use `kv`
/// we keep the full state of the packages we have downloaded here.
/// in order to keep this synchronized with our filesystem, we will
/// ingest apps on disk if we have to rebuild our state. this is also
/// updated every time we download, create, or uninstall a package.
downloaded_packages: HashMap<PackageId, PackageState>, // TODO use `kv`
}
impl State {
/// To create a new state, we populate the downloaded_packages map
/// with all packages parseable from our filesystem.
@ -110,6 +111,50 @@ impl State {
self.downloaded_packages.get(package)
}
// saves state
pub fn start_mirroring(&mut self, package_id: &PackageId) -> anyhow::Result<()> {
let package_state = self
.downloaded_packages
.get_mut(package_id)
.ok_or(anyhow::anyhow!("package not found"))?;
package_state.mirroring = true;
crate::set_state(&bincode::serialize(self)?);
Ok(())
}
// saves state
pub fn stop_mirroring(&mut self, package_id: &PackageId) -> anyhow::Result<()> {
let package_state = self
.downloaded_packages
.get_mut(package_id)
.ok_or(anyhow::anyhow!("package not found"))?;
package_state.mirroring = false;
crate::set_state(&bincode::serialize(self)?);
Ok(())
}
// saves state
pub fn start_auto_update(&mut self, package_id: &PackageId) -> anyhow::Result<()> {
let package_state = self
.downloaded_packages
.get_mut(package_id)
.ok_or(anyhow::anyhow!("package not found"))?;
package_state.auto_update = true;
crate::set_state(&bincode::serialize(self)?);
Ok(())
}
// saves state
pub fn stop_auto_update(&mut self, package_id: &PackageId) -> anyhow::Result<()> {
let package_state = self
.downloaded_packages
.get_mut(package_id)
.ok_or(anyhow::anyhow!("package not found"))?;
package_state.auto_update = false;
crate::set_state(&bincode::serialize(self)?);
Ok(())
}
/// Done in response to any new onchain listing update other than 'delete'
fn update_listing(&mut self, listing: PackageListing) {
self.listed_packages.insert(
@ -131,9 +176,10 @@ impl State {
path: "/".to_string(),
action: vfs::VfsAction::ReadDir,
})?)
.send_and_await_response(3)?? else {
return Err(anyhow::anyhow!("vfs: bad response"));
};
.send_and_await_response(3)??
else {
return Err(anyhow::anyhow!("vfs: bad response"));
};
let response = serde_json::from_slice::<vfs::VfsResponse>(&body)?;
let vfs::VfsResponse::ReadDir(entries) = response else {
return Err(anyhow::anyhow!("vfs: unexpected response: {:?}", response));
@ -141,12 +187,16 @@ impl State {
let mut downloaded_packages = HashMap::new();
for entry in entries {
// ignore non-package dirs
let Ok(package_id) = entry.path[1..].parse::<PackageId>() else { continue };
let Ok(package_id) = entry.path[1..].parse::<PackageId>() else {
continue;
};
if entry.file_type == vfs::FileType::Directory {
let zip_file = vfs::File {
path: format!("/{}/pkg/{}.zip", package_id, package_id),
};
let Ok(zip_file_bytes) = zip_file.read() else { continue };
let Ok(zip_file_bytes) = zip_file.read() else {
continue;
};
// generate entry from this data
// for the version hash, take the SHA-256 hash of the zip file
let our_version = generate_version_hash(&zip_file_bytes);
@ -198,7 +248,8 @@ impl State {
})?)
.blob(blob.clone())
.send_and_await_response(5)??;
let vfs::VfsResponse::Ok = serde_json::from_slice::<vfs::VfsResponse>(response.body())? else {
let vfs::VfsResponse::Ok = serde_json::from_slice::<vfs::VfsResponse>(response.body())?
else {
return Err(anyhow::anyhow!(
"cannot add NewPackage: do not have capability to access vfs"
));
@ -241,8 +292,8 @@ impl State {
for entry in &manifest {
let process_id = format!("{}:{}", entry.process_name, package_id);
let Ok(parsed_new_process_id) = process_id.parse::<ProcessId>() else {
continue;
};
continue;
};
Request::new()
.target(("our", "kernel", "distro", "sys"))
.body(serde_json::to_vec(&kt::KernelCommand::KillProcess(