From 9a5b3a7315823caf9e55363327a1b1792f8c49ea Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 18 Jul 2023 21:43:40 +0530 Subject: [PATCH] Start work on porting rsync roundtrip tests --- kittens/transfer/algorithm.c | 38 ++++++---- kittens/transfer/rsync.pyi | 16 ++++- kitty_tests/file_transmission.py | 120 +++++++++++++++++++++---------- 3 files changed, 123 insertions(+), 51 deletions(-) diff --git a/kittens/transfer/algorithm.c b/kittens/transfer/algorithm.c index 6e012413c..5310b8d11 100644 --- a/kittens/transfer/algorithm.c +++ b/kittens/transfer/algorithm.c @@ -186,7 +186,7 @@ typedef struct { PyObject_HEAD rolling_checksum rc; uint64_t signature_idx; - size_t block_size; + size_t total_data_in_delta; Rsync rsync; buffer buf, block_buf; PyObject *block_buf_view; @@ -198,16 +198,16 @@ Patcher_init(PyObject *s, PyObject *args, PyObject *kwds) { static char *kwlist[] = {"expected_input_size", NULL}; unsigned long long expected_input_size = 0; if (!PyArg_ParseTupleAndKeywords(args, kwds, "|K", kwlist, &expected_input_size)) return -1; - self->block_size = default_block_size; + self->rsync.block_size = default_block_size; if (expected_input_size > 0) { - self->block_size = (size_t)round(sqrt((double)expected_input_size)); + self->rsync.block_size = (size_t)round(sqrt((double)expected_input_size)); } - const char *err = init_rsync(&self->rsync, self->block_size, 0, 0); + const char *err = init_rsync(&self->rsync, self->rsync.block_size, 0, 0); if (err != NULL) { PyErr_SetString(RsyncError, err); return -1; } - self->block_buf.cap = self->block_size; - self->block_buf.data = malloc(self->block_size); + self->block_buf.cap = self->rsync.block_size; + self->block_buf.data = malloc(self->rsync.block_size); if (self->block_buf.data == NULL) { PyErr_NoMemory(); return -1; } - if (!(self->block_buf_view = PyMemoryView_FromMemory((char*)self->block_buf.data, self->block_size, PyBUF_WRITE))) return -1; + if (!(self->block_buf_view = PyMemoryView_FromMemory((char*)self->block_buf.data, self->rsync.block_size, PyBUF_WRITE))) return -1; return 0; } @@ -225,7 +225,8 @@ static PyObject* signature_header(Patcher *self, PyObject *a2) { FREE_BUFFER_AFTER_FUNCTION Py_buffer dest = {0}; if (PyObject_GetBuffer(a2, &dest, PyBUF_WRITE) == -1) return NULL; - if (dest.len < 12) { + static const ssize_t header_size = 12; + if (dest.len < header_size) { PyErr_SetString(RsyncError, "Output buffer is too small"); } uint8_t *o = dest.buf; @@ -233,8 +234,8 @@ signature_header(Patcher *self, PyObject *a2) { le16b(o + 2, 0); // checksum type le16b(o + 4, 0); // strong hash type le16b(o + 6, 0); // weak hash type - le32b(o + 8, self->block_size); // weak hash type - Py_RETURN_NONE; + le32b(o + 8, self->rsync.block_size); // block size + return PyLong_FromSsize_t(header_size); } static PyObject* @@ -256,7 +257,7 @@ sign_block(Patcher *self, PyObject *args) { le64b(o, self->signature_idx++); le32b(o + 8, weak_hash); le64b(o + 12, strong_hash); - Py_RETURN_NONE; + return PyLong_FromSize_t(signature_block_size); } typedef enum { OpBlock, OpBlockRange, OpHash, OpData } OpType; @@ -304,7 +305,7 @@ unserialize_op(uint8_t *data, size_t len, Operation *op) { static bool write_block(Patcher *self, uint64_t block_index, PyObject *read, PyObject *write) { - FREE_AFTER_FUNCTION PyObject *pos = PyLong_FromUnsignedLongLong((unsigned long long)(self->block_size * block_index)); + FREE_AFTER_FUNCTION PyObject *pos = PyLong_FromUnsignedLongLong((unsigned long long)(self->rsync.block_size * block_index)); if (!pos) return false; FREE_AFTER_FUNCTION PyObject *ret = PyObject_CallFunctionObjArgs(read, pos, self->block_buf_view, NULL); if (ret == NULL) return false; @@ -329,6 +330,7 @@ apply_op(Patcher *self, Operation op, PyObject *read, PyObject *write) { } return true; case OpData: { + self->total_data_in_delta += op.data.len; self->rsync.checksummer.update(self->rsync.checksummer.state, op.data.buf, op.data.len); FREE_AFTER_FUNCTION PyObject *view = PyMemoryView_FromMemory((char*)op.data.buf, op.data.len, PyBUF_READ); if (!view) return false; @@ -371,6 +373,18 @@ finish_delta_data(Patcher *self, PyObject *args UNUSED) { Py_RETURN_NONE; } +static PyObject* +Patcher_block_size(Patcher* self, void* closure UNUSED) { return PyLong_FromSize_t(self->rsync.block_size); } +static PyObject* +Patcher_total_data_in_delta(Patcher* self, void* closure UNUSED) { return PyLong_FromSize_t(self->total_data_in_delta); } + +PyGetSetDef Patcher_getsets[] = { + {"block_size", (getter)Patcher_block_size, NULL, NULL, NULL}, + {"total_data_in_delta", (getter)Patcher_total_data_in_delta, NULL, NULL, NULL}, + {NULL} +}; + + static PyMethodDef Patcher_methods[] = { METHODB(sign_block, METH_VARARGS), METHODB(signature_header, METH_O), diff --git a/kittens/transfer/rsync.pyi b/kittens/transfer/rsync.pyi index 8873f7b5a..b91537ec8 100644 --- a/kittens/transfer/rsync.pyi +++ b/kittens/transfer/rsync.pyi @@ -24,7 +24,19 @@ class Hasher: class Patcher: def __init__(self, expected_input_size: int = 0): ... - def signature_header(self, output: WriteBuffer) -> None: ... - def sign_block(self, block: ReadOnlyBuffer, output: WriteBuffer) -> None: ... + def signature_header(self, output: WriteBuffer) -> int: ... + def sign_block(self, block: ReadOnlyBuffer, output: WriteBuffer) -> int: ... def apply_delta_data(self, data: ReadOnlyBuffer, read: Callable[[int, WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> None: ... def finish_delta_data(self) -> None: ... + + @property + def block_size(self) -> int: ... + @property + def total_data_in_delta(self) -> int: ... + + +class Differ: + + def add_signature_data(self, data: ReadOnlyBuffer) -> None: ... + def finish_signature_data(self) -> None: ... + def next_op(self, read: Callable[[WriteBuffer], int], write: Callable[[ReadOnlyBuffer], None]) -> bool: ... diff --git a/kitty_tests/file_transmission.py b/kitty_tests/file_transmission.py index 3aafd16dd..a10a5bf5b 100644 --- a/kitty_tests/file_transmission.py +++ b/kitty_tests/file_transmission.py @@ -11,7 +11,7 @@ from pathlib import Path from kittens.transfer.main import parse_transfer_args from kittens.transfer.receive import File, files_for_receive -from kittens.transfer.rsync import decode_utf8_buffer, parse_ftc, Hasher +from kittens.transfer.rsync import Differ, Hasher, Patcher, decode_utf8_buffer, parse_ftc from kittens.transfer.utils import cwd_path, expand_home, home_path, set_paths from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, TransmissionType, ZlibDecompressor, iter_file_metadata from kitty.file_transmission import TestFileTransmission as FileTransmission @@ -52,6 +52,87 @@ def serialized_cmd(**fields) -> str: return ans.serialize() +def generate_data(block_size, num_blocks, *extra) -> bytes: + extra = ''.join(extra) + b = b'_' * (block_size * num_blocks) + extra.encode() + ans = bytearray(b) + for i in range(num_blocks): + offset = i * block_size + p = str(i).encode() + ans[offset:offset+len(p)] = p + return bytes(ans) + + +def patch_data(data, *patches): + total_patch_size = 0 + ans = bytearray(data) + for patch in patches: + o, sep, r = patch.partition(':') + r = r.encode() + total_patch_size += len(r) + offset = int(o) + ans[offset:offset+len(r)] = r + return bytes(ans), len(patches), total_patch_size + + +def run_roundtrip_test(self: 'TestFileTransmission', src_data, changed, num_of_patches, total_patch_size): + buf = memoryview(bytearray(30)) + signature = bytearray(0) + p = Patcher(len(src_data)) + n = p.signature_header(buf) + signature.extend(buf[:n]) + src = memoryview(src_data) + bs = p.block_size + while src: + n = p.sign_block(src[:bs], buf) + signature.extend(buf[:n]) + src = src[bs:] + d = Differ() + src = memoryview(signature) + while src: + d.add_signature_data(src[:13]) + src = src[13:] + d.finish_signature_data() + src = memoryview(src_data) + delta = bytearray(0) + def read_into(b): + global src + n = min(len(b), len(src)) + if n > 0: + b[:n] = src[:n] + src = src[n:] + return n + def write_delta(b): + delta.extend(b) + while d.next_op(read_into, write_delta): + pass + delta = memoryview(delta) + + def read_at(pos, output) -> int: + b = changed[pos:] + amt = min(len(output), len(b)) + output[:amt] = b[:amt] + return amt + + output = bytearray(0) + + def write_changes(b): + output.extend(b) + + while delta: + p.apply_delta_data(delta[:11], read_at, write_changes) + p.finish_delta_data() + self.assertEqual(src_data, bytes(output)) + limit = 2 * (p.block_size * num_of_patches) + if limit > -1: + self.assertLess( + p.total_data_in_delta, limit, f'Unexpectedly poor delta performance: {total_patch_size=} {p.total_data_in_delta=} {limit=}') + + +def test_rsync_roundtrip(self: 'TestFileTransmission') -> None: + pass + + class TestFileTransmission(BaseTest): def setUp(self): @@ -92,42 +173,7 @@ class TestFileTransmission(BaseTest): self.ae(a, b) def test_rsync_roundtrip(self): - self.skipTest("TODO: Needs to be ported") - a_path = os.path.join(self.tdir, 'a') - b_path = os.path.join(self.tdir, 'b') - c_path = os.path.join(self.tdir, 'c') - - def files_equal(a_path, c_path): - self.ae(os.path.getsize(a_path), os.path.getsize(c_path)) - with open(c_path, 'rb') as b, open(c_path, 'rb') as c: - self.ae(b.read(), c.read()) - - def patch(old_path, new_path, output_path, max_delta_len=0): - sig_loader = LoadSignature() - for chunk in signature_of_file(old_path): - sig_loader.add_chunk(chunk) - sig_loader.commit() - self.assertTrue(sig_loader.finished) - delta_len = 0 - with PatchFile(old_path, output_path) as patcher: - for chunk in delta_for_file(new_path, sig_loader.signature): - self.assertFalse(patcher.finished) - patcher.write(chunk) - delta_len += len(chunk) - self.assertTrue(patcher.finished) - if max_delta_len: - self.assertLessEqual(delta_len, max_delta_len) - files_equal(output_path, new_path) - - sz = 1024 * 1024 + 37 - with open(a_path, 'wb') as f: - f.write(os.urandom(sz)) - with open(b_path, 'wb') as f: - f.write(os.urandom(sz)) - - patch(a_path, b_path, c_path) - # test size of delta - patch(a_path, a_path, c_path, max_delta_len=256) + test_rsync_roundtrip(self) def test_file_get(self): # send refusal