diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index fdae4f2339..7a4cd9fd23 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -222,9 +222,6 @@ CREATE TABLE "buffer_operations" ( "epoch" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, "lamport_timestamp" INTEGER NOT NULL, - "local_timestamp" INTEGER NOT NULL, - "version" BLOB NOT NULL, - "is_undo" BOOLEAN NOT NULL, "value" BLOB NOT NULL, PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id) ); diff --git a/crates/collab/migrations/20230819154600_add_channel_buffers.sql b/crates/collab/migrations/20230819154600_add_channel_buffers.sql index fec18ddb8d..5e6e7ce339 100644 --- a/crates/collab/migrations/20230819154600_add_channel_buffers.sql +++ b/crates/collab/migrations/20230819154600_add_channel_buffers.sql @@ -10,10 +10,7 @@ CREATE TABLE "buffer_operations" ( "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE, "epoch" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, - "local_timestamp" INTEGER NOT NULL, "lamport_timestamp" INTEGER NOT NULL, - "version" BYTEA NOT NULL, - "is_undo" BOOLEAN NOT NULL, "value" BYTEA NOT NULL, PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id) ); diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index b0df905ecb..83f5b87079 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -1,6 +1,5 @@ use super::*; use prost::Message; -use std::ops::Range; use text::{EditOperation, InsertionTimestamp, UndoOperation}; impl Database { @@ -234,6 +233,7 @@ impl Database { let serialization_version: i32 = buffer .find_related(buffer_snapshot::Entity) .select_only() + .column(buffer_snapshot::Column::OperationSerializationVersion) .filter(buffer_snapshot::Column::Epoch.eq(buffer.epoch)) .into_values::<_, QueryVersion>() .one(&*tx) @@ -326,11 +326,7 @@ impl Database { let mut text_buffer = text::Buffer::new(0, 0, base_text); text_buffer - .apply_ops( - operations - .into_iter() - .filter_map(deserialize_wire_operation), - ) + .apply_ops(operations.into_iter().filter_map(operation_from_wire)) .unwrap(); let base_text = text_buffer.text(); @@ -363,71 +359,122 @@ fn operation_to_storage( buffer: &buffer::Model, _format: i32, ) -> Option { - match operation.variant.as_ref()? { - proto::operation::Variant::Edit(operation) => { - let value = edit_operation_to_storage(&operation.ranges, &operation.new_text); - let version = version_to_storage(&operation.version); - Some(buffer_operation::ActiveModel { - buffer_id: ActiveValue::Set(buffer.id), - epoch: ActiveValue::Set(buffer.epoch), - replica_id: ActiveValue::Set(operation.replica_id as i32), - lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32), - local_timestamp: ActiveValue::Set(operation.local_timestamp as i32), - is_undo: ActiveValue::Set(false), - version: ActiveValue::Set(version), - value: ActiveValue::Set(value), - }) - } - proto::operation::Variant::Undo(operation) => { - let value = undo_operation_to_storage(&operation.counts); - let version = version_to_storage(&operation.version); - Some(buffer_operation::ActiveModel { - buffer_id: ActiveValue::Set(buffer.id), - epoch: ActiveValue::Set(buffer.epoch), - replica_id: ActiveValue::Set(operation.replica_id as i32), - lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32), - local_timestamp: ActiveValue::Set(operation.local_timestamp as i32), - is_undo: ActiveValue::Set(true), - version: ActiveValue::Set(version), - value: ActiveValue::Set(value), - }) - } - proto::operation::Variant::UpdateSelections(_) => None, - proto::operation::Variant::UpdateDiagnostics(_) => None, - proto::operation::Variant::UpdateCompletionTriggers(_) => None, - } + let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? { + proto::operation::Variant::Edit(operation) => ( + operation.replica_id, + operation.lamport_timestamp, + storage::Operation { + local_timestamp: operation.local_timestamp, + version: version_to_storage(&operation.version), + is_undo: false, + edit_ranges: operation + .ranges + .iter() + .map(|range| storage::Range { + start: range.start, + end: range.end, + }) + .collect(), + edit_texts: operation.new_text.clone(), + undo_counts: Vec::new(), + }, + ), + proto::operation::Variant::Undo(operation) => ( + operation.replica_id, + operation.lamport_timestamp, + storage::Operation { + local_timestamp: operation.local_timestamp, + version: version_to_storage(&operation.version), + is_undo: true, + edit_ranges: Vec::new(), + edit_texts: Vec::new(), + undo_counts: operation + .counts + .iter() + .map(|entry| storage::UndoCount { + replica_id: entry.replica_id, + local_timestamp: entry.local_timestamp, + count: entry.count, + }) + .collect(), + }, + ), + _ => None?, + }; + + Some(buffer_operation::ActiveModel { + buffer_id: ActiveValue::Set(buffer.id), + epoch: ActiveValue::Set(buffer.epoch), + replica_id: ActiveValue::Set(replica_id as i32), + lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32), + value: ActiveValue::Set(value.encode_to_vec()), + }) } fn operation_from_storage( row: buffer_operation::Model, _format_version: i32, ) -> Result { - let version = version_from_storage(&row.version)?; - let operation = if row.is_undo { - let counts = undo_operation_from_storage(&row.value)?; + let operation = + storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?; + let version = version_from_storage(&operation.version); + Ok(if operation.is_undo { proto::operation::Variant::Undo(proto::operation::Undo { replica_id: row.replica_id as u32, - local_timestamp: row.local_timestamp as u32, + local_timestamp: operation.local_timestamp as u32, lamport_timestamp: row.lamport_timestamp as u32, version, - counts, + counts: operation + .undo_counts + .iter() + .map(|entry| proto::UndoCount { + replica_id: entry.replica_id, + local_timestamp: entry.local_timestamp, + count: entry.count, + }) + .collect(), }) } else { - let (ranges, new_text) = edit_operation_from_storage(&row.value)?; proto::operation::Variant::Edit(proto::operation::Edit { replica_id: row.replica_id as u32, - local_timestamp: row.local_timestamp as u32, + local_timestamp: operation.local_timestamp as u32, lamport_timestamp: row.lamport_timestamp as u32, version, - ranges, - new_text, + ranges: operation + .edit_ranges + .into_iter() + .map(|range| proto::Range { + start: range.start, + end: range.end, + }) + .collect(), + new_text: operation.edit_texts, }) - }; - Ok(operation) + }) +} + +fn version_to_storage(version: &Vec) -> Vec { + version + .iter() + .map(|entry| storage::VectorClockEntry { + replica_id: entry.replica_id, + timestamp: entry.timestamp, + }) + .collect() +} + +fn version_from_storage(version: &Vec) -> Vec { + version + .iter() + .map(|entry| proto::VectorClockEntry { + replica_id: entry.replica_id, + timestamp: entry.timestamp, + }) + .collect() } // This is currently a manual copy of the deserialization code in the client's langauge crate -pub fn deserialize_wire_operation(operation: proto::Operation) -> Option { +pub fn operation_from_wire(operation: proto::Operation) -> Option { match operation.variant? { proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation { timestamp: InsertionTimestamp { @@ -435,8 +482,14 @@ pub fn deserialize_wire_operation(operation: proto::Operation) -> Option Some(text::Operation::Undo { @@ -449,7 +502,7 @@ pub fn deserialize_wire_operation(operation: proto::Operation) -> Option Option Range { - text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize) -} - -fn deserialize_wire_version(message: &[proto::VectorClockEntry]) -> clock::Global { +fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global { let mut version = clock::Global::new(); for entry in message { version.observe(clock::Local { @@ -486,15 +535,23 @@ fn deserialize_wire_version(message: &[proto::VectorClockEntry]) -> clock::Globa mod storage { #![allow(non_snake_case)] - use prost::Message; - pub const SERIALIZATION_VERSION: i32 = 1; #[derive(Message)] - pub struct VectorClock { - #[prost(message, repeated, tag = "1")] - pub entries: Vec, + pub struct Operation { + #[prost(uint32, tag = "1")] + pub local_timestamp: u32, + #[prost(message, repeated, tag = "2")] + pub version: Vec, + #[prost(bool, tag = "3")] + pub is_undo: bool, + #[prost(message, repeated, tag = "4")] + pub edit_ranges: Vec, + #[prost(string, repeated, tag = "5")] + pub edit_texts: Vec, + #[prost(message, repeated, tag = "6")] + pub undo_counts: Vec, } #[derive(Message)] @@ -505,14 +562,6 @@ mod storage { pub timestamp: u32, } - #[derive(Message)] - pub struct TextEdit { - #[prost(message, repeated, tag = "1")] - pub ranges: Vec, - #[prost(string, repeated, tag = "2")] - pub texts: Vec, - } - #[derive(Message)] pub struct Range { #[prost(uint64, tag = "1")] @@ -521,12 +570,6 @@ mod storage { pub end: u64, } - #[derive(Message)] - pub struct Undo { - #[prost(message, repeated, tag = "1")] - pub entries: Vec, - } - #[derive(Message)] pub struct UndoCount { #[prost(uint32, tag = "1")] @@ -537,82 +580,3 @@ mod storage { pub count: u32, } } - -fn version_to_storage(version: &Vec) -> Vec { - storage::VectorClock { - entries: version - .iter() - .map(|entry| storage::VectorClockEntry { - replica_id: entry.replica_id, - timestamp: entry.timestamp, - }) - .collect(), - } - .encode_to_vec() -} - -fn version_from_storage(bytes: &[u8]) -> Result> { - let clock = storage::VectorClock::decode(bytes).map_err(|error| anyhow!("{}", error))?; - Ok(clock - .entries - .into_iter() - .map(|entry| proto::VectorClockEntry { - replica_id: entry.replica_id, - timestamp: entry.timestamp, - }) - .collect()) -} - -fn edit_operation_to_storage(ranges: &[proto::Range], texts: &[String]) -> Vec { - storage::TextEdit { - ranges: ranges - .iter() - .map(|range| storage::Range { - start: range.start, - end: range.end, - }) - .collect(), - texts: texts.to_vec(), - } - .encode_to_vec() -} - -fn edit_operation_from_storage(bytes: &[u8]) -> Result<(Vec, Vec)> { - let edit = storage::TextEdit::decode(bytes).map_err(|error| anyhow!("{}", error))?; - let ranges = edit - .ranges - .into_iter() - .map(|range| proto::Range { - start: range.start, - end: range.end, - }) - .collect(); - Ok((ranges, edit.texts)) -} - -fn undo_operation_to_storage(counts: &Vec) -> Vec { - storage::Undo { - entries: counts - .iter() - .map(|entry| storage::UndoCount { - replica_id: entry.replica_id, - local_timestamp: entry.local_timestamp, - count: entry.count, - }) - .collect(), - } - .encode_to_vec() -} - -fn undo_operation_from_storage(bytes: &[u8]) -> Result> { - let undo = storage::Undo::decode(bytes).map_err(|error| anyhow!("{}", error))?; - Ok(undo - .entries - .iter() - .map(|entry| proto::UndoCount { - replica_id: entry.replica_id, - local_timestamp: entry.local_timestamp, - count: entry.count, - }) - .collect()) -} diff --git a/crates/collab/src/db/tables/buffer_operation.rs b/crates/collab/src/db/tables/buffer_operation.rs index 59626c1e77..37bd4bedfe 100644 --- a/crates/collab/src/db/tables/buffer_operation.rs +++ b/crates/collab/src/db/tables/buffer_operation.rs @@ -12,9 +12,6 @@ pub struct Model { pub lamport_timestamp: i32, #[sea_orm(primary_key)] pub replica_id: i32, - pub local_timestamp: i32, - pub version: Vec, - pub is_undo: bool, pub value: Vec, }