Paginate through extensions from the blob store (#12614)

This PR updates the background task used to fetch extensions from the
blob store to account for the possibility that the result set will be
paginated.

Will now paginate through all of the results and collect them up before
proceeding to determining which extensions need to be synced.

Release Notes:

- N/A
This commit is contained in:
Marshall Bowers 2024-06-03 17:17:46 -04:00 committed by GitHub
parent 14c2fab8ab
commit afc0650a49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -239,61 +239,74 @@ async fn fetch_extensions_from_blob_store(
) -> anyhow::Result<()> {
log::info!("fetching extensions from blob store");
let list = blob_store_client
.list_objects()
.bucket(blob_store_bucket)
.prefix("extensions/")
.send()
.await?;
let mut next_marker = None;
let mut published_versions = HashMap::<String, Vec<String>>::default();
let objects = list.contents.unwrap_or_default();
loop {
let list = blob_store_client
.list_objects()
.bucket(blob_store_bucket)
.prefix("extensions/")
.set_marker(next_marker.clone())
.send()
.await?;
let objects = list.contents.unwrap_or_default();
log::info!("fetched {} object(s) from blob store", objects.len());
let mut published_versions = HashMap::<&str, Vec<&str>>::default();
for object in &objects {
let Some(key) = object.key.as_ref() else {
continue;
};
let mut parts = key.split('/');
let Some(_) = parts.next().filter(|part| *part == "extensions") else {
continue;
};
let Some(extension_id) = parts.next() else {
continue;
};
let Some(version) = parts.next() else {
continue;
};
if parts.next() == Some("manifest.json") {
published_versions
.entry(extension_id)
.or_default()
.push(version);
for object in &objects {
let Some(key) = object.key.as_ref() else {
continue;
};
let mut parts = key.split('/');
let Some(_) = parts.next().filter(|part| *part == "extensions") else {
continue;
};
let Some(extension_id) = parts.next() else {
continue;
};
let Some(version) = parts.next() else {
continue;
};
if parts.next() == Some("manifest.json") {
published_versions
.entry(extension_id.to_owned())
.or_default()
.push(version.to_owned());
}
}
if let (Some(true), Some(last_object)) = (list.is_truncated, objects.last()) {
next_marker.clone_from(&last_object.key);
} else {
break;
}
}
log::info!("found {} published extensions", published_versions.len());
let known_versions = app_state.db.get_known_extension_versions().await?;
let mut new_versions = HashMap::<&str, Vec<NewExtensionVersion>>::default();
let empty = Vec::new();
for (extension_id, published_versions) in published_versions {
for (extension_id, published_versions) in &published_versions {
let known_versions = known_versions.get(extension_id).unwrap_or(&empty);
for published_version in published_versions {
if known_versions
.binary_search_by_key(&published_version, String::as_str)
.binary_search_by_key(&published_version, |known_version| known_version)
.is_err()
{
if let Some(extension) = fetch_extension_manifest(
blob_store_client,
blob_store_bucket,
extension_id,
published_version,
&extension_id,
&published_version,
)
.await
.log_err()
{
new_versions
.entry(extension_id)
.entry(&extension_id)
.or_default()
.push(extension);
}