Start work on porting rsync roundtrip tests

This commit is contained in:
Kovid Goyal 2023-07-18 21:43:40 +05:30
parent 05373c6e2a
commit 9a5b3a7315
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 123 additions and 51 deletions

View File

@ -186,7 +186,7 @@ typedef struct {
PyObject_HEAD PyObject_HEAD
rolling_checksum rc; rolling_checksum rc;
uint64_t signature_idx; uint64_t signature_idx;
size_t block_size; size_t total_data_in_delta;
Rsync rsync; Rsync rsync;
buffer buf, block_buf; buffer buf, block_buf;
PyObject *block_buf_view; PyObject *block_buf_view;
@ -198,16 +198,16 @@ Patcher_init(PyObject *s, PyObject *args, PyObject *kwds) {
static char *kwlist[] = {"expected_input_size", NULL}; static char *kwlist[] = {"expected_input_size", NULL};
unsigned long long expected_input_size = 0; unsigned long long expected_input_size = 0;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|K", kwlist, &expected_input_size)) return -1; if (!PyArg_ParseTupleAndKeywords(args, kwds, "|K", kwlist, &expected_input_size)) return -1;
self->block_size = default_block_size; self->rsync.block_size = default_block_size;
if (expected_input_size > 0) { if (expected_input_size > 0) {
self->block_size = (size_t)round(sqrt((double)expected_input_size)); self->rsync.block_size = (size_t)round(sqrt((double)expected_input_size));
} }
const char *err = init_rsync(&self->rsync, self->block_size, 0, 0); const char *err = init_rsync(&self->rsync, self->rsync.block_size, 0, 0);
if (err != NULL) { PyErr_SetString(RsyncError, err); return -1; } if (err != NULL) { PyErr_SetString(RsyncError, err); return -1; }
self->block_buf.cap = self->block_size; self->block_buf.cap = self->rsync.block_size;
self->block_buf.data = malloc(self->block_size); self->block_buf.data = malloc(self->rsync.block_size);
if (self->block_buf.data == NULL) { PyErr_NoMemory(); return -1; } if (self->block_buf.data == NULL) { PyErr_NoMemory(); return -1; }
if (!(self->block_buf_view = PyMemoryView_FromMemory((char*)self->block_buf.data, self->block_size, PyBUF_WRITE))) return -1; if (!(self->block_buf_view = PyMemoryView_FromMemory((char*)self->block_buf.data, self->rsync.block_size, PyBUF_WRITE))) return -1;
return 0; return 0;
} }
@ -225,7 +225,8 @@ static PyObject*
signature_header(Patcher *self, PyObject *a2) { signature_header(Patcher *self, PyObject *a2) {
FREE_BUFFER_AFTER_FUNCTION Py_buffer dest = {0}; FREE_BUFFER_AFTER_FUNCTION Py_buffer dest = {0};
if (PyObject_GetBuffer(a2, &dest, PyBUF_WRITE) == -1) return NULL; if (PyObject_GetBuffer(a2, &dest, PyBUF_WRITE) == -1) return NULL;
if (dest.len < 12) { static const ssize_t header_size = 12;
if (dest.len < header_size) {
PyErr_SetString(RsyncError, "Output buffer is too small"); PyErr_SetString(RsyncError, "Output buffer is too small");
} }
uint8_t *o = dest.buf; uint8_t *o = dest.buf;
@ -233,8 +234,8 @@ signature_header(Patcher *self, PyObject *a2) {
le16b(o + 2, 0); // checksum type le16b(o + 2, 0); // checksum type
le16b(o + 4, 0); // strong hash type le16b(o + 4, 0); // strong hash type
le16b(o + 6, 0); // weak hash type le16b(o + 6, 0); // weak hash type
le32b(o + 8, self->block_size); // weak hash type le32b(o + 8, self->rsync.block_size); // block size
Py_RETURN_NONE; return PyLong_FromSsize_t(header_size);
} }
static PyObject* static PyObject*
@ -256,7 +257,7 @@ sign_block(Patcher *self, PyObject *args) {
le64b(o, self->signature_idx++); le64b(o, self->signature_idx++);
le32b(o + 8, weak_hash); le32b(o + 8, weak_hash);
le64b(o + 12, strong_hash); le64b(o + 12, strong_hash);
Py_RETURN_NONE; return PyLong_FromSize_t(signature_block_size);
} }
typedef enum { OpBlock, OpBlockRange, OpHash, OpData } OpType; typedef enum { OpBlock, OpBlockRange, OpHash, OpData } OpType;
@ -304,7 +305,7 @@ unserialize_op(uint8_t *data, size_t len, Operation *op) {
static bool static bool
write_block(Patcher *self, uint64_t block_index, PyObject *read, PyObject *write) { write_block(Patcher *self, uint64_t block_index, PyObject *read, PyObject *write) {
FREE_AFTER_FUNCTION PyObject *pos = PyLong_FromUnsignedLongLong((unsigned long long)(self->block_size * block_index)); FREE_AFTER_FUNCTION PyObject *pos = PyLong_FromUnsignedLongLong((unsigned long long)(self->rsync.block_size * block_index));
if (!pos) return false; if (!pos) return false;
FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(read, pos, self->block_buf_view, NULL); FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(read, pos, self->block_buf_view, NULL);
if (ret == NULL) return false; if (ret == NULL) return false;
@ -329,6 +330,7 @@ apply_op(Patcher *self, Operation op, PyObject *read, PyObject *write) {
} }
return true; return true;
case OpData: { case OpData: {
self->total_data_in_delta += op.data.len;
self->rsync.checksummer.update(self->rsync.checksummer.state, op.data.buf, op.data.len); self->rsync.checksummer.update(self->rsync.checksummer.state, op.data.buf, op.data.len);
FREE_AFTER_FUNCTION PyObject *view = PyMemoryView_FromMemory((char*)op.data.buf, op.data.len, PyBUF_READ); FREE_AFTER_FUNCTION PyObject *view = PyMemoryView_FromMemory((char*)op.data.buf, op.data.len, PyBUF_READ);
if (!view) return false; if (!view) return false;
@ -371,6 +373,18 @@ finish_delta_data(Patcher *self, PyObject *args UNUSED) {
Py_RETURN_NONE; Py_RETURN_NONE;
} }
static PyObject*
Patcher_block_size(Patcher* self, void* closure UNUSED) { return PyLong_FromSize_t(self->rsync.block_size); }
static PyObject*
Patcher_total_data_in_delta(Patcher* self, void* closure UNUSED) { return PyLong_FromSize_t(self->total_data_in_delta); }
PyGetSetDef Patcher_getsets[] = {
{"block_size", (getter)Patcher_block_size, NULL, NULL, NULL},
{"total_data_in_delta", (getter)Patcher_total_data_in_delta, NULL, NULL, NULL},
{NULL}
};
static PyMethodDef Patcher_methods[] = { static PyMethodDef Patcher_methods[] = {
METHODB(sign_block, METH_VARARGS), METHODB(sign_block, METH_VARARGS),
METHODB(signature_header, METH_O), METHODB(signature_header, METH_O),

View File

@ -24,7 +24,19 @@ class Hasher:
class Patcher: class Patcher:
def __init__(self, expected_input_size: int = 0): ... def __init__(self, expected_input_size: int = 0): ...
def signature_header(self, output: WriteBuffer) -> None: ... def signature_header(self, output: WriteBuffer) -> int: ...
def sign_block(self, block: ReadOnlyBuffer, output: WriteBuffer) -> None: ... def sign_block(self, block: ReadOnlyBuffer, output: WriteBuffer) -> int: ...
def apply_delta_data(self, data: ReadOnlyBuffer, read: Callable[[int, WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> None: ... def apply_delta_data(self, data: ReadOnlyBuffer, read: Callable[[int, WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> None: ...
def finish_delta_data(self) -> None: ... def finish_delta_data(self) -> None: ...
@property
def block_size(self) -> int: ...
@property
def total_data_in_delta(self) -> int: ...
class Differ:
def add_signature_data(self, data: ReadOnlyBuffer) -> None: ...
def finish_signature_data(self) -> None: ...
def next_op(self, read: Callable[[WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> bool: ...

View File

@ -11,7 +11,7 @@ from pathlib import Path
from kittens.transfer.main import parse_transfer_args from kittens.transfer.main import parse_transfer_args
from kittens.transfer.receive import File, files_for_receive from kittens.transfer.receive import File, files_for_receive
from kittens.transfer.rsync import decode_utf8_buffer, parse_ftc, Hasher from kittens.transfer.rsync import Differ, Hasher, Patcher, decode_utf8_buffer, parse_ftc
from kittens.transfer.utils import cwd_path, expand_home, home_path, set_paths from kittens.transfer.utils import cwd_path, expand_home, home_path, set_paths
from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, TransmissionType, ZlibDecompressor, iter_file_metadata from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, TransmissionType, ZlibDecompressor, iter_file_metadata
from kitty.file_transmission import TestFileTransmission as FileTransmission from kitty.file_transmission import TestFileTransmission as FileTransmission
@ -52,6 +52,87 @@ def serialized_cmd(**fields) -> str:
return ans.serialize() return ans.serialize()
def generate_data(block_size, num_blocks, *extra) -> bytes:
extra = ''.join(extra)
b = b'_' * (block_size * num_blocks) + extra.encode()
ans = bytearray(b)
for i in range(num_blocks):
offset = i * block_size
p = str(i).encode()
ans[offset:offset+len(p)] = p
return bytes(ans)
def patch_data(data, *patches):
total_patch_size = 0
ans = bytearray(data)
for patch in patches:
o, sep, r = patch.partition(':')
r = r.encode()
total_patch_size += len(r)
offset = int(o)
ans[offset:offset+len(r)] = r
return bytes(ans), len(patches), total_patch_size
def run_roundtrip_test(self: 'TestFileTransmission', src_data, changed, num_of_patches, total_patch_size):
buf = memoryview(bytearray(30))
signature = bytearray(0)
p = Patcher(len(src_data))
n = p.signature_header(buf)
signature.extend(buf[:n])
src = memoryview(src_data)
bs = p.block_size
while src:
n = p.sign_block(src[:bs], buf)
signature.extend(buf[:n])
src = src[bs:]
d = Differ()
src = memoryview(signature)
while src:
d.add_signature_data(src[:13])
src = src[13:]
d.finish_signature_data()
src = memoryview(src_data)
delta = bytearray(0)
def read_into(b):
global src
n = min(len(b), len(src))
if n > 0:
b[:n] = src[:n]
src = src[n:]
return n
def write_delta(b):
delta.extend(b)
while d.next_op(read_into, write_delta):
pass
delta = memoryview(delta)
def read_at(pos, output) -> int:
b = changed[pos:]
amt = min(len(output), len(b))
output[:amt] = b[:amt]
return amt
output = bytearray(0)
def write_changes(b):
output.extend(b)
while delta:
p.apply_delta_data(delta[:11], read_at, write_changes)
p.finish_delta_data()
self.assertEqual(src_data, bytes(output))
limit = 2 * (p.block_size * num_of_patches)
if limit > -1:
self.assertLess(
p.total_data_in_delta, limit, f'Unexpectedly poor delta performance: {total_patch_size=} {p.total_data_in_delta=} {limit=}')
def test_rsync_roundtrip(self: 'TestFileTransmission') -> None:
pass
class TestFileTransmission(BaseTest): class TestFileTransmission(BaseTest):
def setUp(self): def setUp(self):
@ -92,42 +173,7 @@ class TestFileTransmission(BaseTest):
self.ae(a, b) self.ae(a, b)
def test_rsync_roundtrip(self): def test_rsync_roundtrip(self):
self.skipTest("TODO: Needs to be ported") test_rsync_roundtrip(self)
a_path = os.path.join(self.tdir, 'a')
b_path = os.path.join(self.tdir, 'b')
c_path = os.path.join(self.tdir, 'c')
def files_equal(a_path, c_path):
self.ae(os.path.getsize(a_path), os.path.getsize(c_path))
with open(c_path, 'rb') as b, open(c_path, 'rb') as c:
self.ae(b.read(), c.read())
def patch(old_path, new_path, output_path, max_delta_len=0):
sig_loader = LoadSignature()
for chunk in signature_of_file(old_path):
sig_loader.add_chunk(chunk)
sig_loader.commit()
self.assertTrue(sig_loader.finished)
delta_len = 0
with PatchFile(old_path, output_path) as patcher:
for chunk in delta_for_file(new_path, sig_loader.signature):
self.assertFalse(patcher.finished)
patcher.write(chunk)
delta_len += len(chunk)
self.assertTrue(patcher.finished)
if max_delta_len:
self.assertLessEqual(delta_len, max_delta_len)
files_equal(output_path, new_path)
sz = 1024 * 1024 + 37
with open(a_path, 'wb') as f:
f.write(os.urandom(sz))
with open(b_path, 'wb') as f:
f.write(os.urandom(sz))
patch(a_path, b_path, c_path)
# test size of delta
patch(a_path, a_path, c_path, max_delta_len=256)
def test_file_get(self): def test_file_get(self):
# send refusal # send refusal