Streaming download and hash verification works

Richard Feldman 2022-11-21 15:30:37 -05:00
parent ea8bb8820b
commit 7b7f64e6c0
No known key found for this signature in database
GPG Key ID: F1F21AA5B1D9E43B
3 changed files with 122 additions and 145 deletions
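
The heart of this change: instead of buffering the whole tarball in memory and verifying it before unpacking, the download is now streamed straight through a decompressor into tar::Archive::unpack, with a Read wrapper feeding every byte through a BLAKE3 hasher along the way; the base64url-encoded hash is then compared against the hash embedded in the package URL. A minimal standalone sketch of that hash-while-reading pattern, mirroring the HashReader added in this commit (the main driver, the example.tar input, and the unpacked/ output dir are illustrative assumptions, not part of the diff):

use std::io::Read;

// Pass-through reader that hashes every byte it yields, like the diff's HashReader.
struct HashingReader<R: Read> {
    inner: R,
    hasher: blake3::Hasher,
}

impl<R: Read> Read for HashingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.hasher.update(&buf[..n]);
        Ok(n)
    }
}

fn main() -> std::io::Result<()> {
    // Any local tarball stands in for the decompressed HTTP stream.
    let file = std::fs::File::open("example.tar")?;
    let mut reader = HashingReader {
        inner: file,
        hasher: blake3::Hasher::new(),
    };

    // Unpack while hashing: the hasher sees exactly the bytes tar has pulled so far.
    std::fs::create_dir_all("unpacked/")?;
    tar::Archive::new(&mut reader).unpack("unpacked/")?;

    // Drain whatever tar didn't need, so the hash covers the entire stream.
    std::io::copy(&mut reader, &mut std::io::sink())?;

    println!("{}", base64_url::encode(reader.hasher.finalize().as_bytes()));
    Ok(())
}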


@@ -20,6 +20,7 @@ base64-url = "1.4.13"
+reqwest = { version = "0.11.13", features = [ "blocking", "rustls-tls" ] }
 bumpalo.workspace = true
 tempfile.workspace = true
 [dev-dependencies]
 pretty_assertions = "1.3.0"
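
The reqwest dependency added here is used in blocking mode by download_and_hash further down; a TODO there notes that building one Client and reusing it is noticeably cheaper than constructing a new Client per request. A minimal sketch of that reuse pattern, assuming a once_cell dependency and a fetch helper that are not part of this diff:

use once_cell::sync::Lazy;

// One shared blocking Client, built lazily on first use and reused afterwards.
// (Illustrative only: the code in this commit still builds a Client per call.)
static CLIENT: Lazy<reqwest::blocking::Client> = Lazy::new(reqwest::blocking::Client::new);

fn fetch(url: &str) -> reqwest::Result<reqwest::blocking::Response> {
    CLIENT.get(url).send()
}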


@@ -1,15 +1,11 @@
+use crate::https::{self, PackageMetadata, Problem};
+use roc_error_macros::internal_error;
 use std::{
     env, fs,
     path::{Path, PathBuf},
 };
-use roc_error_macros::internal_error;
-use tar::Archive;
-use crate::https::{self, PackageMetadata, Problem};
 const MAX_DOWNLOAD_BYTES: u64 = 32 * 1_000_000_000; // GB
-const TARBALL_BUFFER_SIZE: usize = 16 * 1_000_000; // MB
 #[derive(Copy, Clone, Debug)]
 pub enum RocCacheDir<'a> {
@@ -37,21 +33,43 @@ pub fn install_package<'a>(
     url: &'a str,
 ) -> Result<(PathBuf, Option<&'a str>), Problem> {
     let metadata = PackageMetadata::try_from(url).map_err(Problem::InvalidUrl)?;
-    let dest_dir = match roc_cache_dir {
+    match roc_cache_dir {
         RocCacheDir::Persistent(cache_dir) => {
-            let dest_dir =
-                path_inside_cache(cache_dir, metadata.cache_subfolder, metadata.content_hash);
+            // e.g. ~/.cache/roc/example.com/roc-packages/
+            let parent_dir = cache_dir.join(metadata.cache_subfolder);
+            // e.g. ~/.cache/roc/example.com/roc-packages/jDRlAFAA3738vu3-vMpLUoyxtA86Z7CaZneoOKrihbE
+            let dest_dir = parent_dir.join(metadata.content_hash);
             if dest_dir.exists() {
                 // If the cache dir exists already, we assume it has the correct contents
-                // (it's a cache, after all!) and return early without downloading anything.
-                return Ok((dest_dir, metadata.root_module_filename));
+                // (it's a cache, after all!) and return without downloading anything.
+                Ok((dest_dir, metadata.root_module_filename))
             } else {
-                // Create the destination directory, since it didn't exist already.
-                fs::create_dir_all(&dest_dir).map_err(Problem::IoErr)?;
-            }
+                // Download into a tempdir; only move it to dest_dir if hash verification passes.
+                let tempdir = tempfile::tempdir().map_err(Problem::IoErr)?;
+                let tempdir_path = tempdir.path();
+                let hash = https::download_and_hash(url, tempdir_path, MAX_DOWNLOAD_BYTES)?;
-            dest_dir
+                // Download the tarball into memory and verify it.
+                // The tarball name is the hash of its contents.
+                if hash == metadata.content_hash {
+                    // Now that we've verified the hash, rename the tempdir to the real dir.
+                    // Create the destination dir's parent dir, since it may not exist yet.
+                    fs::create_dir_all(parent_dir).map_err(Problem::IoErr)?;
+                    // This should be super cheap - just an inode change.
+                    fs::rename(tempdir_path, &dest_dir).map_err(Problem::IoErr)?;
+                    // The package's files are now in the cache. We're done!
+                    Ok((dest_dir, metadata.root_module_filename))
+                } else {
+                    Err(Problem::InvalidContentHash {
+                        expected: metadata.content_hash.to_string(),
+                        actual: hash,
+                    })
+                }
+            }
         }
         RocCacheDir::Disallowed => {
             internal_error!(
@@ -60,29 +78,8 @@ pub fn install_package<'a>(
             )
         }
         #[cfg(test)]
-        RocCacheDir::Temp(temp_dir) => temp_dir.path().to_path_buf(),
-    };
-    // Download the tarball into memory and verify it. Early return if it fails verification,
-    // before we would create any directories in the cache.
-    let tarball_bytes = {
-        let mut buf = Vec::with_capacity(TARBALL_BUFFER_SIZE);
-        https::download_and_verify(url, metadata.content_hash, &mut buf, MAX_DOWNLOAD_BYTES)?;
-        buf
-    };
-    Archive::new(tarball_bytes.as_slice())
-        .unpack(&dest_dir)
-        .map_err(Problem::IoErr)?;
-    // The package's files are now in the cache. We're done!
-    Ok((dest_dir, metadata.root_module_filename))
-}
-fn path_inside_cache(roc_cache_dir: &Path, cache_subfolder: &str, content_hash: &str) -> PathBuf {
-    roc_cache_dir.join(cache_subfolder).join(content_hash)
+        RocCacheDir::Temp(temp_dir) => Ok((temp_dir.path().to_path_buf(), None)),
+    }
 }
 #[cfg(windows)]
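
With this change, install_package downloads into a throwaway tempdir and only renames that directory into the cache once the hash matches, so a failed or tampered download never leaves a half-populated cache entry behind. A minimal sketch of the same stage-then-rename flow in isolation (the function name, the main.roc placeholder write, and the omitted download/verify step are illustrative assumptions, not from the diff):

use std::{
    fs, io,
    path::{Path, PathBuf},
};

// Stage new content in a tempdir, then move it into the cache only on success.
fn stage_into_cache(parent_dir: &Path, content_hash: &str) -> io::Result<PathBuf> {
    let dest_dir = parent_dir.join(content_hash);
    if dest_dir.exists() {
        // Cache hit: trust the existing contents, as the diff does.
        return Ok(dest_dir);
    }

    // The tempdir is deleted automatically if we bail out before the rename.
    let tempdir = tempfile::tempdir()?;
    // ... download, unpack, and hash-verify into tempdir.path() here ...
    fs::write(tempdir.path().join("main.roc"), b"# placeholder")?;

    fs::create_dir_all(parent_dir)?;
    // The same cheap rename the diff relies on: the cache entry only becomes
    // visible after everything in it has been verified.
    fs::rename(tempdir.path(), &dest_dir)?;

    Ok(dest_dir)
}

One caveat worth noting: fs::rename is only an inode-level move when the tempdir and the cache live on the same filesystem; across filesystems it fails and the files would have to be copied instead.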


@@ -1,6 +1,7 @@
-use blake3::Hasher;
+use reqwest::Method;
-use std::io::{self, ErrorKind, Read, Write};
+use std::{
+    io::{self, Read},
+    path::Path,
+};
 use crate::tarball::Compression;
@@ -10,9 +11,7 @@ use crate::tarball::Compression;
 // Here are all the officially supported options: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding
 // We can consider supporting more, but that would bloat the `roc` binary more, so
 // let's try to avoid doing that.
-const ACCEPT_ENCODING: &str = "br, gzip, deflate";
 const BROTLI_BUFFER_BYTES: usize = 8 * 1_000_000; // MB
-const DOWNLOAD_CHUNK_SIZE: usize = 4096;
 pub struct PackageMetadata<'a> {
     /// The BLAKE3 hash of the tarball's contents. Also the .tar filename on disk.
@@ -118,56 +117,46 @@ pub enum Problem {
     DownloadTooBig(u64),
 }
 /// Download and decompress the given URL, verifying its contents against the hash in the URL.
+/// Downloads it into a tempfile.
-pub fn download_and_verify(
+pub fn download_and_hash(
     url: &str,
-    content_hash: &str,
-    dest: &mut impl Write,
+    dest_dir: &Path,
     max_download_bytes: u64,
-) -> Result<(), Problem> {
-    let resp = ureq::get(url)
-        .set("Accept-Encoding", ACCEPT_ENCODING)
-        .call()
+) -> Result<String, Problem> {
+    // TODO apparently it really improves performance to construct a Client once and then reuse it,
+    // instead of making a new Client for every request.
+    // Per https://github.com/seanmonstar/reqwest/issues/1454#issuecomment-1026076701
+    let resp = reqwest::blocking::Client::new()
+        .get(url)
+        .send()
         .map_err(Problem::HttpErr)?;
-    match resp.header("Content-Length").map(str::parse) {
-        Some(Ok(content_len)) => {
-            if content_len as u64 > max_download_bytes {
-                return Err(Problem::DownloadTooBig(content_len));
-            }
-            let content_encoding = resp.header("Content-Encoding").unwrap_or_default();
-            let encoding = Encoding::new(content_encoding, url)?;
-            // Use .take to prevent a malicious server from sending back bytes
-            // until system resources are exhausted!
-            let mut reader = resp.into_reader().take(max_download_bytes);
-            // The server can respond with multiple encodings, per
-            // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
-            // ...but we don't support that.
-            let hash = download(encoding, &mut reader, dest)?;
-            // The tarball name is the hash of its contents
-            if hash == content_hash {
-                Ok(())
-            } else {
-                Err(Problem::InvalidContentHash {
-                    expected: content_hash.to_string(),
-                    actual: hash,
-                })
-            }
+    // Some servers don't return Content-Length - e.g. Netlify seems to only sometimes return it.
+    // If they do, and if it says the file is going to be too big, don't bother downloading it!
+    if let Some(content_len) = resp.content_length() {
+        if content_len > max_download_bytes {
+            return Err(Problem::DownloadTooBig(content_len));
+        }
-        Some(Err(_)) => {
-            // The Content-Length header wasn't an integer
-            Err(Problem::InvalidContentLengthHeader)
-        }
-        None => Err(Problem::MissingContentLengthHeader),
     }
+    // The server can respond with multiple encodings, per
+    // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
+    // ...but we don't support that.
+    let encoding = {
+        let content_encoding = match resp.headers().get("content-encoding") {
+            Some(header) => header.to_str().unwrap_or_default(),
+            None => "",
+        };
+        Encoding::new(content_encoding, url)?
+    };
+    // Use .take to prevent a malicious server from sending back bytes
+    // until system resources are exhausted!
+    decompress_into(dest_dir, encoding, resp.take(max_download_bytes))
 }
 /// The content encodings we support
 #[derive(Debug, Clone, Copy)]
 enum Encoding {
     Gzip,
     Brotli,
@@ -213,81 +202,71 @@
     }
 }
+fn hash_and_unpack(dest_dir: &Path, reader: impl Read) -> Result<String, Problem> {
+    let mut hash_reader = HashReader::new(reader);
+    tar::Archive::new(&mut hash_reader)
+        .unpack(dest_dir)
+        .map_err(Problem::IoErr)?;
+    let mut buf = Vec::with_capacity(1024);
+    // Archive::new() doesn't always read all the bytes, but we need to read them all
+    // in order to get the correct hash!
+    hash_reader.read_to_end(&mut buf).map_err(Problem::IoErr)?;
+    Ok(base64_url::encode(hash_reader.finalize().as_bytes()))
+}
-/// Read from the given reader, decompress the bytes using the given Content-Encoding string,
-/// write them to the given writer, and return the base64url-encoded BLAKE3 hash of what was written.
-/// This both writes and hashes incrementally as it reads, so the only extra work that's done
-/// at the end is base64url-encoding the final hash.
-fn download<R: Read, W: Write>(
+fn decompress_into(
+    dest_dir: &Path,
     encoding: Encoding,
-    reader: &mut R,
-    writer: &mut W,
+    reader: impl Read,
 ) -> Result<String, Problem> {
-    let encoding = Encoding::Brotli;
     match encoding {
-        Encoding::Brotli => {
-            let mut brotli_reader = brotli::Decompressor::new(reader, BROTLI_BUFFER_BYTES);
-            write_and_hash(&mut brotli_reader, writer).map_err(Problem::IoErr)
-        }
+        Encoding::Brotli => hash_and_unpack(
+            dest_dir,
+            brotli::Decompressor::new(reader, BROTLI_BUFFER_BYTES),
+        ),
         Encoding::Gzip => {
            // Note: GzDecoder::new immediately parses the gzip header (so, calls read())
-            let mut gzip_reader = flate2::read::GzDecoder::new(reader);
-            write_and_hash(&mut gzip_reader, writer).map_err(Problem::IoErr)
+            hash_and_unpack(dest_dir, flate2::read::GzDecoder::new(reader))
         }
-        Encoding::Deflate => {
-            let mut deflate_reader = flate2::read::DeflateDecoder::new(reader);
-            write_and_hash(&mut deflate_reader, writer).map_err(Problem::IoErr)
-        }
-        Encoding::Uncompressed => write_and_hash(reader, writer).map_err(Problem::IoErr),
+        Encoding::Deflate => hash_and_unpack(dest_dir, flate2::read::DeflateDecoder::new(reader)),
+        Encoding::Uncompressed => hash_and_unpack(dest_dir, reader),
     }
 }
-/// Download the data from the reader into the writer, while hashing
-/// along the way, then return the base64url-enceoded hash once it's done.
-pub fn write_and_hash<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<String> {
-    let mut buf = Vec::with_capacity(DOWNLOAD_CHUNK_SIZE);
-    let mut hasher = Hasher::new();
+/// Read something while calculating its BLAKE3 hash
+struct HashReader<R: Read> {
+    reader: R,
+    hasher: blake3::Hasher,
+}
-    loop {
-        match reader.read(&mut buf) {
-            Ok(0) => {
-                // We ran out of bytes to read, so we're done!
-                return Ok(base64_url::encode(hasher.finalize().as_bytes()));
-            }
-            Ok(_) => {
-                // Incorporate the bytes we just read into the hash.
-                hasher.update(&buf);
-                // Write all the bytes we just read until they've all been written.
-                {
-                    let mut to_write = buf.as_slice();
-                    loop {
-                        match writer.write(to_write) {
-                            Ok(0) => {
-                                // We wrote everything. All done writing!
-                                break;
-                            }
-                            Ok(bytes_written) => {
-                                // Advance the buffer so we don't write the same bytes again!
-                                to_write = &to_write[bytes_written..];
-                            }
-                            Err(err) if err.kind() == ErrorKind::Interrupted => {
-                                // No action needed, just retry on the next iteration of the loop.
-                            }
-                            Err(err) => return Err(err),
-                        }
-                    }
-                }
-                // Reset the buffer for the next read.
-                buf.clear();
-            }
-            Err(err) if err.kind() == ErrorKind::Interrupted => {
-                // No action needed, just retry on the next iteration of the loop.
-            }
-            Err(err) => return Err(err),
+impl<R: Read> HashReader<R> {
+    pub fn new(reader: R) -> Self {
+        Self {
+            reader,
+            hasher: blake3::Hasher::new(),
         }
     }
+    pub fn finalize(&self) -> blake3::Hash {
+        self.hasher.finalize()
+    }
+}
+impl<R: Read> Read for HashReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let bytes_read = self.reader.read(buf)?;
+        self.hasher.update(&buf[0..bytes_read]);
+        Ok(bytes_read)
+    }
+}
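
The resp.take(max_download_bytes) call in download_and_hash is what bounds how much data an untrusted server can force the client to read, since Content-Length is optional and can lie. A tiny illustration of that std Read::take behavior (the endless reader and the 16-byte cap are stand-ins, not values from the diff):

use std::io::Read;

fn main() -> std::io::Result<()> {
    // An endless reader stands in for a server that never stops sending bytes.
    let endless = std::io::repeat(b'a');

    // .take() caps how many bytes can ever be pulled through this reader,
    // just like resp.take(max_download_bytes) in download_and_hash.
    let mut capped = endless.take(16);

    let mut buf = Vec::new();
    capped.read_to_end(&mut buf)?;
    assert_eq!(buf.len(), 16);

    Ok(())
}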