zstd: use Rust-backed bindings for its support

Summary:
This allows us to remove the Python binding without breaking existing zstd
users (commitcloud bundles). It might also make the future Rust migration
easier.

Using the `zstd` crate for its streaming APIs. `zstdelta`'s APIs cannot be used
since it requires decompressed length to be known, which wouldn't work for
streaming compressed data.

Note: For easier implementation, the Python land no longer processes data
in a streaming way. This is probably fine for the current bundle use-case.
In the long term, we might want to revisit the bundle format entirely.

As we're here, also expose zstdelta's APIs and add a test for it.

Reviewed By: DurhamG

Differential Revision: D8342421

fbshipit-source-id: 89902d551f4616469d6e1bc9b334a1c37c884775
This commit is contained in:
Jun Wu 2018-06-12 13:16:06 -07:00 committed by Facebook Github Bot
parent dedad2a212
commit f4fdd7deea
6 changed files with 228 additions and 63 deletions

View File

@ -0,0 +1,21 @@
[package]
name = "zstd"
version = "0.1.0"
[lib]
crate-type = ["cdylib"]
[profile.release]
lto = true
[dependencies]
zstd = "0.4.18"
zstdelta = { path = "../../../lib/zstdelta/" }
[dependencies.cpython]
version = "0.2"
default-features = false
features = ["extension-module-2-7"]
[dependencies.python27-sys]
version = "0.2"

View File

@ -0,0 +1,64 @@
#![allow(non_camel_case_types)]
#[macro_use]
extern crate cpython;
extern crate python27_sys;
extern crate zstd;
extern crate zstdelta;
mod pybuf;
use cpython::{exc, PyBytes, PyErr, PyObject, PyResult, Python};
use pybuf::SimplePyBuf;
use std::io;
use zstd::stream::{decode_all, encode_all};
use zstdelta::{apply, diff};
// Define the Python extension module `zstd` (entry point `initzstd` for
// Python 2, `PyInit_zstd` for Python 3) and register its four functions.
py_module_initializer!(zstd, initzstd, PyInit_zstd, |py, m| {
    // Reconstruct full data from `base` plus a zstdelta-format `delta`.
    m.add(
        py,
        "apply",
        py_fn!(py, apply_py(base: &PyObject, delta: &PyObject)),
    )?;
    // Produce a zstdelta-format delta transforming `base` into `data`.
    m.add(
        py,
        "diff",
        py_fn!(py, diff_py(base: &PyObject, data: &PyObject)),
    )?;
    // Decompress one complete zstd frame in a single call (not streaming).
    m.add(py, "decode_all", py_fn!(py, decode_all_py(data: &PyObject)))?;
    // Compress `data` into a single zstd frame at the given `level`.
    m.add(
        py,
        "encode_all",
        py_fn!(py, encode_all_py(data: &PyObject, level: i32)),
    )?;
    Ok(())
});
/// Translate an `io::Result<Vec<u8>>` into a `PyResult<PyBytes>`.
///
/// On success the byte vector is copied into a new Python `bytes` object;
/// on failure the I/O error is surfaced as a Python `RuntimeError`.
fn convert(py: Python, result: io::Result<Vec<u8>>) -> PyResult<PyBytes> {
    match result {
        Ok(out) => Ok(PyBytes::new(py, &out)),
        Err(err) => Err(PyErr::new::<exc::RuntimeError, _>(py, format!("{}", err))),
    }
}
/// Python-facing wrapper around `zstdelta::diff`: compute the delta that
/// transforms `base` into `data`, borrowing both buffers zero-copy.
fn diff_py(py: Python, base: &PyObject, data: &PyObject) -> PyResult<PyBytes> {
    let base_buf = SimplePyBuf::new(py, base);
    let data_buf = SimplePyBuf::new(py, data);
    let result = diff(base_buf.as_ref(), data_buf.as_ref());
    convert(py, result)
}
/// Python-facing wrapper around `zstdelta::apply`: reconstruct the full
/// data from `base` plus `delta`, borrowing both buffers zero-copy.
fn apply_py(py: Python, base: &PyObject, delta: &PyObject) -> PyResult<PyBytes> {
    let base_buf = SimplePyBuf::new(py, base);
    let delta_buf = SimplePyBuf::new(py, delta);
    let result = apply(base_buf.as_ref(), delta_buf.as_ref());
    convert(py, result)
}
/// Python-facing wrapper around `zstd::stream::decode_all`: decompress one
/// complete zstd frame held in `data` (zero-copy input, not streaming).
fn decode_all_py(py: Python, data: &PyObject) -> PyResult<PyBytes> {
    let input = SimplePyBuf::new(py, data);
    let decoded = decode_all(io::Cursor::new(input.as_ref()));
    convert(py, decoded)
}
/// Python-facing wrapper around `zstd::stream::encode_all`: compress `data`
/// into a single zstd frame at compression `level` (zero-copy input).
fn encode_all_py(py: Python, data: &PyObject, level: i32) -> PyResult<PyBytes> {
    let input = SimplePyBuf::new(py, data);
    let encoded = encode_all(io::Cursor::new(input.as_ref()), level);
    convert(py, encoded)
}

View File

@ -0,0 +1,100 @@
// Copyright 2017 Facebook, Inc.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
//! A simple `Py_buffer` wrapper that allows zero-copy reading of Python
//! owned memory.
// The objects in memory have a relationship like:
//
// ```text
// SimplePyBuf<T> | Raw Data Python object
// +-----------+ | +-------+ +-----------+
// | Py_buffer | | | | <-- owns -- _ |
// | +-------+ | | | | +-----------+
// | | *buf -- points to --> | |
// | | len | | | | |
// | +-------+ | | +-------+
// +-----------+ |
// |
// Rust-managed | Python-managed
// ```
//
// Notes:
// - Raw data is owned by (embedded in, or pointed by) the Python object.
// Raw data gets freed when the Python object is destructed.
// - Py_buffer is not a Python object but a Python-defined C struct allowing
// native code to access "Raw data" directly. When constructing Py_buffer
// from a Python object, the refcount of that Python object increases.
// The refcount decreases when Py_buffer gets destructed via PyBuffer_Release.
// - Py_buffer is used to expose the raw pointer and length.
// - Memory alignment is up to the actual implementation of "Python object".
// For a mmap buffer, the libc mmap function guarantees that.
use cpython::{PyObject, Python};
use python27_sys as cpy;
use std::marker::PhantomData;
use std::mem;
use std::slice;
/// A thin wrapper over a `Py_buffer` that gives zero-copy `&[T]` access to
/// memory owned by a Python object (see the diagram above). `PhantomData<T>`
/// records the element type without storing any `T` values.
pub struct SimplePyBuf<T>(cpy::Py_buffer, PhantomData<T>);
// Since the buffer is read-only and Python cannot move the raw buffer (because
// we own the Py_buffer struct), it's safe to share and use SimplePyBuf in
// other threads.
unsafe impl<T> Send for SimplePyBuf<T> {}
unsafe impl<T> Sync for SimplePyBuf<T> {}
impl<T: Copy> SimplePyBuf<T> {
    /// Borrow `obj`'s underlying buffer without copying.
    ///
    /// `PyObject_GetBuffer` increments `obj`'s refcount, so the raw memory
    /// stays alive for the lifetime of the returned value; the refcount is
    /// released again in `Drop`.
    ///
    /// # Panics
    ///
    /// Panics if `obj` is not a `bytes` or `buffer` object, or if the
    /// buffer cannot be obtained.
    pub fn new(_py: Python, obj: &PyObject) -> Self {
        // Note about GC on obj:
        //
        // Practically, obj here is some low-level, non-container type like
        // bytes or memoryview that does not support GC (i.e. does not have
        // Py_TPFLAGS_HAVE_GC set). refcount is the only way to release them.
        // So no need to pay extra attention to them - SimplePyBuf will get
        // refcount right and that's enough.
        //
        // Otherwise (obj is a container type that does support GC), whoever
        // owns this SimplePyBuf in the Rust world needs to do one of the
        // following:
        // - implement tp_traverse in its Python class
        // - call PyObject_GC_UnTrack to let GC ignore obj
        // Note about buffer mutability:
        //
        // The code here wants to access the buffer without taking the Python
        // GIL. Therefore `obj` should be a read-only object. That is true for
        // Python bytes or buffer(some_other_immutable_obj). For now,
        // explicitly whitelist those two types. Beware that `PyBuffer_Check`
        // won't guarantee its inner object is also immutable.
        unsafe {
            if cpy::PyBytes_Check(obj.as_ptr()) == 0 && cpy::PyBuffer_Check(obj.as_ptr()) == 0 {
                panic!("potentially unsafe type");
            }
            // A zeroed Py_buffer is the documented "empty" starting state for
            // PyObject_GetBuffer; PhantomData is zero-sized, so zeroing the
            // whole struct is fine.
            let mut buf = mem::zeroed::<SimplePyBuf<T>>();
            let r = cpy::PyObject_GetBuffer(obj.as_ptr(), &mut buf.0, cpy::PyBUF_SIMPLE);
            if r == -1 {
                panic!("failed to get Py_buffer");
            }
            buf
        }
    }
}
impl<T> AsRef<[T]> for SimplePyBuf<T> {
    /// View the borrowed Python memory as a `&[T]` slice, without copying.
    #[inline]
    fn as_ref(&self) -> &[T] {
        // Length in whole elements; integer division silently drops any
        // trailing partial element if the byte length is not a multiple of
        // size_of::<T>() (callers in this crate appear to use T = u8, where
        // this is exact — confirm before using other element types).
        let len = self.0.len as usize / mem::size_of::<T>();
        // SAFETY: buf/len come from a live Py_buffer that we own; the Python
        // object keeps the memory alive and in place until Drop releases it.
        unsafe { slice::from_raw_parts(self.0.buf as *const T, len) }
    }
}
impl<T> Drop for SimplePyBuf<T> {
    /// Release the Py_buffer, dropping the refcount taken in `new`.
    fn drop(&mut self) {
        // PyBuffer_Release touches the owning object's refcount and so must
        // run with the GIL held; acquire it here because the wrapper may be
        // dropped from a thread that does not currently hold the GIL.
        let _gil = Python::acquire_gil();
        unsafe { cpy::PyBuffer_Release(&mut self.0) }
    }
}

View File

@ -3969,10 +3969,10 @@ class _zstdengine(compressionengine):
# Not all installs have the zstd module available. So defer importing
# until first access.
try:
from . import zstd
from .rust import zstd
# Force delayed import.
zstd.__version__
zstd.decode_all
return zstd
except ImportError:
return None
@ -4008,82 +4008,35 @@ class _zstdengine(compressionengine):
level = opts.get("level", 3)
zstd = self._module
z = zstd.ZstdCompressor(level=level).compressobj()
buf = stringio()
for chunk in it:
data = z.compress(chunk)
if data:
yield data
buf.write(chunk)
yield z.flush()
yield zstd.encode_all(buf.getvalue(), level)
def decompressorreader(self, fh):
zstd = self._module
dctx = zstd.ZstdDecompressor()
return chunkbuffer(dctx.read_from(fh))
def itervalues():
buf = fh.read()
yield zstd.decode_all(buf)
return chunkbuffer(itervalues())
class zstdrevlogcompressor(object):
def __init__(self, zstd, level=3):
# Writing the content size adds a few bytes to the output. However,
# it allows decompression to be more optimal since we can
# pre-allocate a buffer to hold the result.
self._cctx = zstd.ZstdCompressor(level=level, write_content_size=True)
self._dctx = zstd.ZstdDecompressor()
self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
self._zstd = zstd
self._level = level
def compress(self, data):
insize = len(data)
# Caller handles empty input case.
assert insize > 0
if insize < 50:
return None
elif insize <= 1000000:
compressed = self._cctx.compress(data)
if len(compressed) < insize:
return compressed
return None
else:
z = self._cctx.compressobj()
chunks = []
pos = 0
while pos < insize:
pos2 = pos + self._compinsize
chunk = z.compress(data[pos:pos2])
if chunk:
chunks.append(chunk)
pos = pos2
chunks.append(z.flush())
if sum(map(len, chunks)) < insize:
return "".join(chunks)
return None
return self._zstd.encode_all(data, self._level)
def decompress(self, data):
insize = len(data)
try:
# This was measured to be faster than other streaming
# decompressors.
dobj = self._dctx.decompressobj()
chunks = []
pos = 0
while pos < insize:
pos2 = pos + self._decompinsize
chunk = dobj.decompress(data[pos:pos2])
if chunk:
chunks.append(chunk)
pos = pos2
# Frame should be exhausted, so no finish() API.
return "".join(chunks)
except Exception as e:
raise error.RevlogError(_("revlog decompress error: %s") % str(e))
return self._zstd.decode_all(data)
def revlogcompressor(self, opts=None):
opts = opts or {}
return self.zstdrevlogcompressor(self._module, level=opts.get("level", 3))
return self.zstdrevlogcompressor(self._module)
compengines.register(_zstdengine())

View File

@ -1557,6 +1557,9 @@ rustextmodules = [
package="mercurial.rust",
manifest="mercurial/rust/treestate/Cargo.toml",
),
RustExtension(
"zstd", package="mercurial.rust", manifest="mercurial/rust/zstd/Cargo.toml"
),
]
rustextbinaries = [RustBinary("scm_daemon", manifest="exec/scm_daemon/Cargo.toml")]

24
tests/test-zstdelta.py Normal file
View File

@ -0,0 +1,24 @@
from __future__ import absolute_import
import os
import unittest
import silenttestrunner
from mercurial.rust import zstd
class testzstd(unittest.TestCase):
    def testdelta(self):
        # Build two large, mostly-identical blobs: a one-byte insertion at
        # offset 1000 and a 500-byte deletion near offset 90000.
        original = os.urandom(100000)
        modified = original[:1000] + "x" + original[1000:90000] + original[90500:]
        delta = zstd.diff(original, modified)
        # Near-identical inputs should produce a tiny delta.
        self.assertLess(len(delta), 100)
        # Applying the delta to the base must reproduce the modified data.
        self.assertEqual(zstd.apply(original, delta), modified)
# Run the tests with the repository's quiet test runner when invoked directly.
if __name__ == "__main__":
    silenttestrunner.main(__name__)