mv sync seq to revision manager

This commit is contained in:
appflowy 2022-01-01 16:16:06 +08:00
parent 09ef0927f7
commit a654623c12
18 changed files with 446 additions and 478 deletions

View File

@ -92,7 +92,7 @@ pub(crate) async fn create_view(
let delta_data = Bytes::from(params.view_data);
let md5 = format!("{:x}", md5::compute(&delta_data));
let revision = Revision::new(&view.id, 0, 0, delta_data, RevType::Remote, user_id, md5);
let revision = Revision::new(&view.id, 0, 0, delta_data, user_id, md5);
let repeated_revision = RepeatedRevision::new(vec![revision]);
let mut create_doc_params = CreateDocParams::new();
create_doc_params.set_revisions(repeated_revision.try_into().unwrap());

View File

@ -249,9 +249,9 @@ async fn doc_create() {
let bytes = delta.to_bytes();
let md5 = md5(&bytes);
let revision = if i == 0 {
Revision::new(&doc_id, i, i, bytes, RevType::Remote, &user_id, md5)
Revision::new(&doc_id, i, i, bytes, &user_id, md5)
} else {
Revision::new(&doc_id, i - 1, i, bytes, RevType::Remote, &user_id, md5)
Revision::new(&doc_id, i - 1, i, bytes, &user_id, md5)
};
revisions.push(revision);
}

View File

@ -21,7 +21,7 @@ class Revision extends $pb.GeneratedMessage {
..a<$core.List<$core.int>>(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'deltaData', $pb.PbFieldType.OY)
..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'md5')
..aOS(5, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
..e<RevType>(6, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: RevType.Local, valueOf: RevType.valueOf, enumValues: RevType.values)
..e<RevType>(6, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'ty', $pb.PbFieldType.OE, defaultOrMaker: RevType.DeprecatedLocal, valueOf: RevType.valueOf, enumValues: RevType.values)
..aOS(7, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'userId')
..hasRequiredFields = false
;

View File

@ -9,13 +9,28 @@
import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class RevisionState extends $pb.ProtobufEnum {
static const RevisionState StateLocal = RevisionState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'StateLocal');
static const RevisionState Ack = RevisionState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Ack');
static const $core.List<RevisionState> values = <RevisionState> [
StateLocal,
Ack,
];
static final $core.Map<$core.int, RevisionState> _byValue = $pb.ProtobufEnum.initByValue(values);
static RevisionState? valueOf($core.int value) => _byValue[value];
const RevisionState._($core.int v, $core.String n) : super(v, n);
}
class RevType extends $pb.ProtobufEnum {
static const RevType Local = RevType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Local');
static const RevType Remote = RevType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Remote');
static const RevType DeprecatedLocal = RevType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeprecatedLocal');
static const RevType DeprecatedRemote = RevType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeprecatedRemote');
static const $core.List<RevType> values = <RevType> [
Local,
Remote,
DeprecatedLocal,
DeprecatedRemote,
];
static final $core.Map<$core.int, RevType> _byValue = $pb.ProtobufEnum.initByValue(values);
@ -24,18 +39,3 @@ class RevType extends $pb.ProtobufEnum {
const RevType._($core.int v, $core.String n) : super(v, n);
}
class RevState extends $pb.ProtobufEnum {
static const RevState StateLocal = RevState._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'StateLocal');
static const RevState Ack = RevState._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Ack');
static const $core.List<RevState> values = <RevState> [
StateLocal,
Ack,
];
static final $core.Map<$core.int, RevState> _byValue = $pb.ProtobufEnum.initByValue(values);
static RevState? valueOf($core.int value) => _byValue[value];
const RevState._($core.int v, $core.String n) : super(v, n);
}

View File

@ -8,28 +8,28 @@
import 'dart:core' as $core;
import 'dart:convert' as $convert;
import 'dart:typed_data' as $typed_data;
@$core.Deprecated('Use revTypeDescriptor instead')
const RevType$json = const {
'1': 'RevType',
'2': const [
const {'1': 'Local', '2': 0},
const {'1': 'Remote', '2': 1},
],
};
/// Descriptor for `RevType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revTypeDescriptor = $convert.base64Decode('CgdSZXZUeXBlEgkKBUxvY2FsEAASCgoGUmVtb3RlEAE=');
@$core.Deprecated('Use revStateDescriptor instead')
const RevState$json = const {
'1': 'RevState',
@$core.Deprecated('Use revisionStateDescriptor instead')
const RevisionState$json = const {
'1': 'RevisionState',
'2': const [
const {'1': 'StateLocal', '2': 0},
const {'1': 'Ack', '2': 1},
],
};
/// Descriptor for `RevState`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revStateDescriptor = $convert.base64Decode('CghSZXZTdGF0ZRIOCgpTdGF0ZUxvY2FsEAASBwoDQWNrEAE=');
/// Descriptor for `RevisionState`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revisionStateDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblN0YXRlEg4KClN0YXRlTG9jYWwQABIHCgNBY2sQAQ==');
@$core.Deprecated('Use revTypeDescriptor instead')
const RevType$json = const {
'1': 'RevType',
'2': const [
const {'1': 'DeprecatedLocal', '2': 0},
const {'1': 'DeprecatedRemote', '2': 1},
],
};
/// Descriptor for `RevType`. Decode as a `google.protobuf.EnumDescriptorProto`.
final $typed_data.Uint8List revTypeDescriptor = $convert.base64Decode('CgdSZXZUeXBlEhMKD0RlcHJlY2F0ZWRMb2NhbBAAEhQKEERlcHJlY2F0ZWRSZW1vdGUQAQ==');
@$core.Deprecated('Use revisionDescriptor instead')
const Revision$json = const {
'1': 'Revision',

View File

@ -152,15 +152,7 @@ impl ClientDocumentEditor {
let delta_data = delta.to_bytes();
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
let user_id = self.user.user_id()?;
let revision = Revision::new(
&self.doc_id,
base_rev_id,
rev_id,
delta_data,
RevType::Local,
&user_id,
md5,
);
let revision = Revision::new(&self.doc_id, base_rev_id, rev_id, delta_data, &user_id, md5);
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(rev_id.into())
}

View File

@ -4,7 +4,7 @@ use crate::{
disk::{Persistence, RevisionDiskCache},
memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
},
sql_tables::{RevTableState, RevisionChangeset},
sql_tables::{RevisionChangeset, RevisionTableState},
};
use dashmap::DashMap;
use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
@ -25,7 +25,6 @@ pub struct RevisionCache {
doc_id: String,
disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
memory_cache: Arc<RevisionMemoryCache>,
sync_seq: Arc<RevisionSyncSeq>,
latest_rev_id: AtomicI64,
}
@ -33,86 +32,31 @@ impl RevisionCache {
pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
let disk_cache = Arc::new(Persistence::new(user_id, pool));
let memory_cache = Arc::new(RevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
let sync_seq = Arc::new(RevisionSyncSeq::new());
let doc_id = doc_id.to_owned();
Self {
doc_id,
disk_cache,
memory_cache,
sync_seq,
latest_rev_id: AtomicI64::new(0),
}
}
pub fn read_revisions(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
self.disk_cache.read_revisions(doc_id, None)
}
#[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
pub fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
let disk_cache = self.disk_cache.clone();
let conn = disk_cache.db_pool().get().map_err(internal_error)?;
let records = revisions
.into_iter()
.map(|revision| RevisionRecord {
revision,
state: RevisionState::StateLocal,
})
.collect::<Vec<_>>();
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = disk_cache.delete_revisions(doc_id, None, &*conn)?;
let _ = disk_cache.write_revisions(records, &*conn)?;
Ok(())
})
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_local_revision(&self, revision: Revision) -> FlowyResult<()> {
if self.memory_cache.contains(&revision.rev_id) {
return Err(FlowyError::internal().context(format!("Duplicate local revision id: {}", revision.rev_id)));
}
let rev_id = revision.rev_id;
let record = RevisionRecord {
revision,
state: RevisionState::StateLocal,
};
let _ = self.memory_cache.add_revision(&record).await;
self.sync_seq.add_revision(record).await?;
let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_remote_revision(&self, revision: Revision) -> FlowyResult<()> {
pub async fn add(&self, revision: Revision, state: RevisionState) -> FlowyResult<RevisionRecord> {
if self.memory_cache.contains(&revision.rev_id) {
return Err(FlowyError::internal().context(format!("Duplicate remote revision id: {}", revision.rev_id)));
}
let rev_id = revision.rev_id;
let record = RevisionRecord {
revision,
state: RevisionState::Ack,
};
self.memory_cache.add_revision(&record).await;
let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
Ok(())
let record = RevisionRecord { revision, state };
self.memory_cache.add(&record).await;
self.set_latest_rev_id(rev_id);
Ok(record)
}
#[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))]
pub async fn ack_revision(&self, rev_id: i64) {
if self.sync_seq.ack_revision(&rev_id).await.is_ok() {
self.memory_cache.ack_revision(&rev_id).await;
}
}
pub async fn ack(&self, rev_id: i64) { self.memory_cache.ack(&rev_id).await; }
pub async fn latest_revision(&self) -> Revision {
let rev_id = self.latest_rev_id.load(SeqCst);
self.get_revision(rev_id).await.unwrap().revision
}
pub async fn get_revision(&self, rev_id: i64) -> Option<RevisionRecord> {
match self.memory_cache.get_revision(&rev_id).await {
None => match self.disk_cache.read_revisions(&self.doc_id, Some(vec![rev_id])) {
pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
match self.memory_cache.get(&rev_id).await {
None => match self.disk_cache.read_revision_records(&self.doc_id, Some(vec![rev_id])) {
Ok(mut records) => {
if records.is_empty() {
tracing::warn!("Can't find revision in {} with rev_id: {}", &self.doc_id, rev_id);
@ -129,13 +73,22 @@ impl RevisionCache {
}
}
pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
self.disk_cache.read_revision_records(doc_id, None)
}
pub async fn latest_revision(&self) -> Revision {
let rev_id = self.latest_rev_id.load(SeqCst);
self.get(rev_id).await.unwrap().revision
}
pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
let mut records = self.memory_cache.get_revisions_in_range(&range).await?;
let mut records = self.memory_cache.get_with_range(&range).await?;
let range_len = range.len() as usize;
if records.len() != range_len {
let disk_cache = self.disk_cache.clone();
let doc_id = self.doc_id.clone();
records = spawn_blocking(move || disk_cache.read_revisions_with_range(&doc_id, &range))
records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&doc_id, &range))
.await
.map_err(internal_error)??;
@ -149,42 +102,44 @@ impl RevisionCache {
.collect::<Vec<Revision>>())
}
pub(crate) fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
let sync_seq = self.sync_seq.clone();
#[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
pub fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
let disk_cache = self.disk_cache.clone();
let doc_id = self.doc_id.clone();
FutureResult::new(async move {
match sync_seq.next_sync_revision().await {
None => match sync_seq.next_sync_rev_id().await {
None => Ok(None),
Some(rev_id) => {
let records = disk_cache.read_revisions(&doc_id, Some(vec![rev_id]))?;
let mut revisions = records
.into_iter()
.map(|record| record.revision)
.collect::<Vec<Revision>>();
Ok(revisions.pop())
},
},
Some((_, record)) => Ok(Some(record.revision)),
}
let conn = disk_cache.db_pool().get().map_err(internal_error)?;
let records = revisions
.into_iter()
.map(|revision| RevisionRecord {
revision,
state: RevisionState::Local,
})
.collect::<Vec<_>>();
conn.immediate_transaction::<_, FlowyError, _>(|| {
let _ = disk_cache.delete_revision_records(doc_id, None, &*conn)?;
let _ = disk_cache.write_revision_records(records, &*conn)?;
Ok(())
})
}
#[inline]
fn set_latest_rev_id(&self, rev_id: i64) {
let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
}
}
impl RevisionMemoryCacheDelegate for Arc<Persistence> {
fn receive_checkpoint(&self, records: Vec<RevisionRecord>) -> FlowyResult<()> {
fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()> {
let conn = &*self.pool.get().map_err(internal_error)?;
self.write_revisions(records, &conn)
self.write_revision_records(records, &conn)
}
fn receive_ack(&self, doc_id: &str, rev_id: i64) {
let changeset = RevisionChangeset {
doc_id: doc_id.to_string(),
rev_id: rev_id.into(),
state: RevTableState::Acked,
state: RevisionTableState::Ack,
};
match self.update_revisions(vec![changeset]) {
match self.update_revision_record(vec![changeset]) {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
}
@ -200,70 +155,3 @@ pub struct RevisionRecord {
impl RevisionRecord {
pub fn ack(&mut self) { self.state = RevisionState::Ack; }
}
struct RevisionSyncSeq {
revs_map: Arc<DashMap<i64, RevisionRecord>>,
local_revs: Arc<RwLock<VecDeque<i64>>>,
}
impl std::default::Default for RevisionSyncSeq {
fn default() -> Self {
let local_revs = Arc::new(RwLock::new(VecDeque::new()));
RevisionSyncSeq {
revs_map: Arc::new(DashMap::new()),
local_revs,
}
}
}
impl RevisionSyncSeq {
fn new() -> Self { RevisionSyncSeq::default() }
async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
// The last revision's rev_id must be greater than the new one.
if let Some(rev_id) = self.local_revs.read().await.back() {
if *rev_id >= record.revision.rev_id {
return Err(OTError::revision_id_conflict()
.context(format!("The new revision's id must be greater than {}", rev_id)));
}
}
self.local_revs.write().await.push_back(record.revision.rev_id);
self.revs_map.insert(record.revision.rev_id, record);
Ok(())
}
async fn ack_revision(&self, rev_id: &i64) -> FlowyResult<()> {
if let Some(pop_rev_id) = self.next_sync_rev_id().await {
if &pop_rev_id != rev_id {
let desc = format!(
"The ack rev_id:{} is not equal to the current rev_id:{}",
rev_id, pop_rev_id
);
// tracing::error!("{}", desc);
return Err(FlowyError::internal().context(desc));
}
tracing::debug!("pop revision {}", pop_rev_id);
self.revs_map.remove(&pop_rev_id);
let _ = self.local_revs.write().await.pop_front();
}
Ok(())
}
async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
match self.local_revs.read().await.front() {
None => None,
Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
}
}
async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionSyncSeq {
#[allow(dead_code)]
pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
#[allow(dead_code)]
pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
}

View File

@ -1,6 +1,6 @@
use crate::services::doc::revision::RevisionRecord;
use crate::sql_tables::{RevTableSql, RevisionChangeset};
use crate::sql_tables::{RevisionChangeset, RevisionTableSql};
use diesel::SqliteConnection;
use flowy_collaboration::entities::revision::RevisionRange;
use flowy_database::ConnectionPool;
@ -9,15 +9,29 @@ use std::{fmt::Debug, sync::Arc};
pub trait RevisionDiskCache: Sync + Send {
type Error: Debug;
fn write_revisions(&self, revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), Self::Error>;
fn read_revisions(&self, doc_id: &str, rev_ids: Option<Vec<i64>>) -> Result<Vec<RevisionRecord>, Self::Error>;
fn read_revisions_with_range(
fn write_revision_records(
&self,
revisions: Vec<RevisionRecord>,
conn: &SqliteConnection,
) -> Result<(), Self::Error>;
// Read all the records if the rev_ids is None
fn read_revision_records(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error>;
fn read_revision_records_with_range(
&self,
doc_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error>;
fn update_revisions(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()>;
fn delete_revisions(
fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()>;
// Delete all the records if the rev_ids is None
fn delete_revision_records(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
@ -35,45 +49,53 @@ pub(crate) struct Persistence {
impl RevisionDiskCache for Persistence {
type Error = FlowyError;
fn write_revisions(&self, revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), Self::Error> {
let _ = RevTableSql::create_rev_table(revisions, conn)?;
fn write_revision_records(
&self,
revisions: Vec<RevisionRecord>,
conn: &SqliteConnection,
) -> Result<(), Self::Error> {
let _ = RevisionTableSql::create(revisions, conn)?;
Ok(())
}
fn read_revisions(&self, doc_id: &str, rev_ids: Option<Vec<i64>>) -> Result<Vec<RevisionRecord>, Self::Error> {
fn read_revision_records(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = self.pool.get().map_err(internal_error)?;
let records = RevTableSql::read_rev_tables(&self.user_id, doc_id, rev_ids, &*conn)?;
let records = RevisionTableSql::read(&self.user_id, doc_id, rev_ids, &*conn)?;
Ok(records)
}
fn read_revisions_with_range(
fn read_revision_records_with_range(
&self,
doc_id: &str,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, Self::Error> {
let conn = &*self.pool.get().map_err(internal_error)?;
let revisions = RevTableSql::read_rev_tables_with_range(&self.user_id, doc_id, range.clone(), conn)?;
let revisions = RevisionTableSql::read_with_range(&self.user_id, doc_id, range.clone(), conn)?;
Ok(revisions)
}
fn update_revisions(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
let conn = &*self.pool.get().map_err(internal_error)?;
let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
for changeset in changesets {
let _ = RevTableSql::update_rev_table(changeset, conn)?;
let _ = RevisionTableSql::update(changeset, conn)?;
}
Ok(())
})?;
Ok(())
}
fn delete_revisions(
fn delete_revision_records(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<(), Self::Error> {
let _ = RevTableSql::delete_rev_tables(doc_id, rev_ids, conn)?;
let _ = RevisionTableSql::delete(doc_id, rev_ids, conn)?;
Ok(())
}

View File

@ -6,7 +6,7 @@ use std::{sync::Arc, time::Duration};
use tokio::{sync::RwLock, task::JoinHandle};
pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync {
fn receive_checkpoint(&self, records: Vec<RevisionRecord>) -> FlowyResult<()>;
fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()>;
fn receive_ack(&self, doc_id: &str, rev_id: i64);
}
@ -31,7 +31,7 @@ impl RevisionMemoryCache {
pub(crate) fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
pub(crate) async fn add_revision(&self, record: &RevisionRecord) {
pub(crate) async fn add(&self, record: &RevisionRecord) {
if let Some(rev_id) = self.pending_write_revs.read().await.last() {
if *rev_id >= record.revision.rev_id {
tracing::error!("Duplicated revision added to memory_cache");
@ -44,7 +44,7 @@ impl RevisionMemoryCache {
self.make_checkpoint().await;
}
pub(crate) async fn ack_revision(&self, rev_id: &i64) {
pub(crate) async fn ack(&self, rev_id: &i64) {
match self.revs_map.get_mut(rev_id) {
None => {},
Some(mut record) => record.ack(),
@ -59,14 +59,11 @@ impl RevisionMemoryCache {
}
}
pub(crate) async fn get_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
pub(crate) async fn get(&self, rev_id: &i64) -> Option<RevisionRecord> {
self.revs_map.get(&rev_id).map(|r| r.value().clone())
}
pub(crate) async fn get_revisions_in_range(
&self,
range: &RevisionRange,
) -> Result<Vec<RevisionRecord>, FlowyError> {
pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result<Vec<RevisionRecord>, FlowyError> {
let revs = range
.iter()
.flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone()))
@ -106,7 +103,7 @@ impl RevisionMemoryCache {
},
});
if delegate.receive_checkpoint(save_records).is_ok() {
if delegate.checkpoint_tick(save_records).is_ok() {
revs_write_guard.clear();
drop(revs_write_guard);
}

View File

@ -1,5 +1,9 @@
use crate::{errors::FlowyError, services::doc::revision::RevisionCache};
use crate::{
errors::FlowyError,
services::doc::{revision::RevisionCache, RevisionRecord},
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_collaboration::{
entities::{
doc::DocumentInfo,
@ -11,9 +15,11 @@ use flowy_error::FlowyResult;
use lib_infra::future::FutureResult;
use lib_ot::{
core::{Operation, OperationTransformable},
errors::OTError,
rich_text::RichTextDelta,
};
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::RwLock;
pub trait RevisionServer: Send + Sync {
fn fetch_document(&self, doc_id: &str) -> FutureResult<DocumentInfo, FlowyError>;
@ -24,16 +30,19 @@ pub struct RevisionManager {
user_id: String,
rev_id_counter: RevIdCounter,
cache: Arc<RevisionCache>,
sync_seq: Arc<RevisionSyncSeq>,
}
impl RevisionManager {
pub fn new(user_id: &str, doc_id: &str, cache: Arc<RevisionCache>) -> Self {
let rev_id_counter = RevIdCounter::new(0);
let sync_seq = Arc::new(RevisionSyncSeq::new());
Self {
doc_id: doc_id.to_string(),
user_id: user_id.to_owned(),
rev_id_counter,
cache,
sync_seq,
}
}
@ -56,52 +65,122 @@ impl RevisionManager {
self.cache.reset_document(&self.doc_id, revisions.into_inner())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
assert_eq!(revision.ty, RevType::Remote);
self.rev_id_counter.set(revision.rev_id);
let _ = self.cache.add_remote_revision(revision.clone()).await?;
let _ = self.cache.add(revision.clone(), RevisionState::Ack).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, revision))]
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
assert_eq!(revision.ty, RevType::Local);
let _ = self.cache.add_local_revision(revision.clone()).await?;
let record = self.cache.add(revision.clone(), RevisionState::Local).await?;
self.sync_seq.add_revision(record).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
self.cache.ack_revision(rev_id).await;
if self.sync_seq.ack(&rev_id).await.is_ok() {
self.cache.ack(rev_id).await;
}
Ok(())
}
pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub fn next_rev_id(&self) -> (i64, i64) {
let cur = self.rev_id_counter.value();
let next = self.rev_id_counter.next();
(cur, next)
}
pub fn update_rev_id_counter_value(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
debug_assert!(range.doc_id == self.doc_id);
let revisions = self.cache.revisions_in_range(range.clone()).await?;
Ok(revisions)
}
pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> { self.cache.next_sync_revision() }
pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
let sync_seq = self.sync_seq.clone();
let cache = self.cache.clone();
FutureResult::new(async move {
match sync_seq.next_sync_revision().await {
None => match sync_seq.next_sync_rev_id().await {
None => Ok(None),
Some(rev_id) => Ok(cache.get(rev_id).await.map(|record| record.revision)),
},
Some((_, record)) => Ok(Some(record.revision)),
}
})
}
pub async fn latest_revision(&self) -> Revision { self.cache.latest_revision().await }
pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
self.cache.get_revision(rev_id).await.map(|record| record.revision)
self.cache.get(rev_id).await.map(|record| record.revision)
}
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionManager {
pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
struct RevisionSyncSeq {
revs_map: Arc<DashMap<i64, RevisionRecord>>,
local_revs: Arc<RwLock<VecDeque<i64>>>,
}
impl std::default::Default for RevisionSyncSeq {
fn default() -> Self {
let local_revs = Arc::new(RwLock::new(VecDeque::new()));
RevisionSyncSeq {
revs_map: Arc::new(DashMap::new()),
local_revs,
}
}
}
impl RevisionSyncSeq {
fn new() -> Self { RevisionSyncSeq::default() }
async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
// The last revision's rev_id must be greater than the new one.
if let Some(rev_id) = self.local_revs.read().await.back() {
if *rev_id >= record.revision.rev_id {
return Err(OTError::revision_id_conflict()
.context(format!("The new revision's id must be greater than {}", rev_id)));
}
}
self.local_revs.write().await.push_back(record.revision.rev_id);
self.revs_map.insert(record.revision.rev_id, record);
Ok(())
}
async fn ack(&self, rev_id: &i64) -> FlowyResult<()> {
if let Some(pop_rev_id) = self.next_sync_rev_id().await {
if &pop_rev_id != rev_id {
let desc = format!(
"The ack rev_id:{} is not equal to the current rev_id:{}",
rev_id, pop_rev_id
);
// tracing::error!("{}", desc);
return Err(FlowyError::internal().context(desc));
}
tracing::debug!("pop revision {}", pop_rev_id);
self.revs_map.remove(&pop_rev_id);
let _ = self.local_revs.write().await.pop_front();
}
Ok(())
}
async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
match self.local_revs.read().await.front() {
None => None,
Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
}
}
async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
}
struct RevisionLoader {
@ -113,7 +192,7 @@ struct RevisionLoader {
impl RevisionLoader {
async fn load(&self) -> Result<Vec<Revision>, FlowyError> {
let records = self.cache.read_revisions(&self.doc_id)?;
let records = self.cache.batch_get(&self.doc_id)?;
let revisions: Vec<Revision>;
if records.is_empty() {
let doc = self.server.fetch_document(&self.doc_id).await?;
@ -124,18 +203,20 @@ impl RevisionLoader {
doc.base_rev_id,
doc.rev_id,
delta_data,
RevType::Remote,
&self.user_id,
doc_md5,
);
let _ = self.cache.add_local_revision(revision.clone()).await?;
let _ = self.cache.add(revision.clone(), RevisionState::Ack).await?;
revisions = vec![revision];
} else {
for record in &records {
match record.state {
RevisionState::StateLocal => match self.cache.add_local_revision(record.revision.clone()).await {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
RevisionState::Local => {
//
match self.cache.add(record.revision.clone(), RevisionState::Local).await {
Ok(_) => {},
Err(e) => tracing::error!("{}", e),
}
},
RevisionState::Ack => {},
}
@ -180,3 +261,16 @@ fn correct_delta_if_need(delta: &mut RichTextDelta) {
delta.ops.push(Operation::Insert("\n".into()));
}
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionSyncSeq {
#[allow(dead_code)]
pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
#[allow(dead_code)]
pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.local_revs.clone() }
}
#[cfg(feature = "flowy_unit_test")]
impl RevisionManager {
pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
}

View File

@ -217,7 +217,6 @@ pub(crate) async fn handle_push_rev(
local_base_rev_id,
local_rev_id,
client_prime.to_bytes(),
RevType::Remote,
&user_id,
md5.clone(),
);
@ -229,7 +228,6 @@ pub(crate) async fn handle_push_rev(
local_base_rev_id,
local_rev_id,
server_prime.to_bytes(),
RevType::Local,
&user_id,
md5,
)))

View File

@ -1,29 +1,34 @@
use crate::{
errors::FlowyError,
services::doc::revision::RevisionRecord,
sql_tables::{doc::RevTable, mk_revision_record_from_table, RevTableState, RevTableType, RevisionChangeset},
sql_tables::{
doc::RevisionTable,
mk_revision_record_from_table,
RevTableType,
RevisionChangeset,
RevisionTableState,
},
};
use diesel::update;
use flowy_collaboration::entities::revision::RevisionRange;
use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection};
pub struct RevTableSql {}
pub struct RevisionTableSql {}
impl RevTableSql {
pub(crate) fn create_rev_table(revisions: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
impl RevisionTableSql {
pub(crate) fn create(revision_records: Vec<RevisionRecord>, conn: &SqliteConnection) -> Result<(), FlowyError> {
// Batch insert: https://diesel.rs/guides/all-about-inserts.html
let records = revisions
let records = revision_records
.into_iter()
.map(|record| {
let rev_ty: RevTableType = record.revision.ty.into();
let rev_state: RevTableState = record.state.into();
let rev_state: RevisionTableState = record.state.into();
(
dsl::doc_id.eq(record.revision.doc_id),
dsl::base_rev_id.eq(record.revision.base_rev_id),
dsl::rev_id.eq(record.revision.rev_id),
dsl::data.eq(record.revision.delta_data),
dsl::state.eq(rev_state),
dsl::ty.eq(rev_ty),
dsl::ty.eq(RevTableType::Local),
)
})
.collect::<Vec<_>>();
@ -32,7 +37,7 @@ impl RevTableSql {
Ok(())
}
pub(crate) fn update_rev_table(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
pub(crate) fn update(changeset: RevisionChangeset, conn: &SqliteConnection) -> Result<(), FlowyError> {
let filter = dsl::rev_table
.filter(dsl::rev_id.eq(changeset.rev_id.as_ref()))
.filter(dsl::doc_id.eq(changeset.doc_id));
@ -41,7 +46,7 @@ impl RevTableSql {
Ok(())
}
pub(crate) fn read_rev_tables(
pub(crate) fn read(
user_id: &str,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
@ -51,7 +56,7 @@ impl RevTableSql {
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));
}
let rows = sql.order(dsl::rev_id.asc()).load::<RevTable>(conn)?;
let rows = sql.order(dsl::rev_id.asc()).load::<RevisionTable>(conn)?;
let records = rows
.into_iter()
.map(|row| mk_revision_record_from_table(user_id, row))
@ -60,7 +65,7 @@ impl RevTableSql {
Ok(records)
}
pub(crate) fn read_rev_tables_with_range(
pub(crate) fn read_with_range(
user_id: &str,
doc_id: &str,
range: RevisionRange,
@ -71,7 +76,7 @@ impl RevTableSql {
.filter(dsl::rev_id.le(range.end))
.filter(dsl::doc_id.eq(doc_id))
.order(dsl::rev_id.asc())
.load::<RevTable>(conn)?;
.load::<RevisionTable>(conn)?;
let revisions = rev_tables
.into_iter()
@ -80,11 +85,7 @@ impl RevTableSql {
Ok(revisions)
}
pub(crate) fn delete_rev_tables(
doc_id: &str,
rev_ids: Option<Vec<i64>>,
conn: &SqliteConnection,
) -> Result<(), FlowyError> {
pub(crate) fn delete(doc_id: &str, rev_ids: Option<Vec<i64>>, conn: &SqliteConnection) -> Result<(), FlowyError> {
let mut sql = dsl::rev_table.filter(dsl::doc_id.eq(doc_id)).into_boxed();
if let Some(rev_ids) = rev_ids {
sql = sql.filter(dsl::rev_id.eq_any(rev_ids));

View File

@ -1,4 +1,5 @@
use crate::services::doc::revision::RevisionRecord;
use bytes::Bytes;
use diesel::sql_types::Integer;
use flowy_collaboration::{
entities::revision::{RevId, RevType, Revision, RevisionState},
@ -8,99 +9,80 @@ use flowy_database::schema::rev_table;
#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
#[table_name = "rev_table"]
pub(crate) struct RevTable {
pub(crate) struct RevisionTable {
id: i32,
pub(crate) doc_id: String,
pub(crate) base_rev_id: i64,
pub(crate) rev_id: i64,
pub(crate) data: Vec<u8>,
pub(crate) state: RevTableState,
pub(crate) ty: RevTableType,
pub(crate) state: RevisionTableState,
pub(crate) ty: RevTableType, // Deprecated
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
pub enum RevTableState {
pub enum RevisionTableState {
Local = 0,
Acked = 1,
Ack = 1,
}
impl std::default::Default for RevTableState {
fn default() -> Self { RevTableState::Local }
impl std::default::Default for RevisionTableState {
fn default() -> Self { RevisionTableState::Local }
}
impl std::convert::From<i32> for RevTableState {
impl std::convert::From<i32> for RevisionTableState {
fn from(value: i32) -> Self {
match value {
0 => RevTableState::Local,
1 => RevTableState::Acked,
0 => RevisionTableState::Local,
1 => RevisionTableState::Ack,
o => {
log::error!("Unsupported rev state {}, fallback to RevState::Local", o);
RevTableState::Local
RevisionTableState::Local
},
}
}
}
impl RevTableState {
impl RevisionTableState {
pub fn value(&self) -> i32 { *self as i32 }
}
impl_sql_integer_expression!(RevTableState);
impl_sql_integer_expression!(RevisionTableState);
impl std::convert::From<RevTableState> for RevisionState {
fn from(s: RevTableState) -> Self {
impl std::convert::From<RevisionTableState> for RevisionState {
fn from(s: RevisionTableState) -> Self {
match s {
RevTableState::Local => RevisionState::StateLocal,
RevTableState::Acked => RevisionState::Ack,
RevisionTableState::Local => RevisionState::Local,
RevisionTableState::Ack => RevisionState::Ack,
}
}
}
impl std::convert::From<RevisionState> for RevTableState {
impl std::convert::From<RevisionState> for RevisionTableState {
fn from(s: RevisionState) -> Self {
match s {
RevisionState::StateLocal => RevTableState::Local,
RevisionState::Ack => RevTableState::Acked,
RevisionState::Local => RevisionTableState::Local,
RevisionState::Ack => RevisionTableState::Ack,
}
}
}
pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevTable) -> RevisionRecord {
pub(crate) fn mk_revision_record_from_table(user_id: &str, table: RevisionTable) -> RevisionRecord {
let md5 = md5(&table.data);
let revision = Revision {
base_rev_id: table.base_rev_id,
rev_id: table.rev_id,
delta_data: table.data,
let revision = Revision::new(
&table.doc_id,
table.base_rev_id,
table.rev_id,
Bytes::from(table.data),
&user_id,
md5,
doc_id: table.doc_id,
ty: table.ty.into(),
user_id: user_id.to_owned(),
};
);
RevisionRecord {
revision,
state: table.state.into(),
}
}
impl std::convert::From<RevType> for RevTableType {
fn from(ty: RevType) -> Self {
match ty {
RevType::Local => RevTableType::Local,
RevType::Remote => RevTableType::Remote,
}
}
}
impl std::convert::From<RevTableType> for RevType {
fn from(ty: RevTableType) -> Self {
match ty {
RevTableType::Local => RevType::Local,
RevTableType::Remote => RevType::Remote,
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
#[repr(i32)]
#[sql_type = "Integer"]
@ -130,8 +112,26 @@ impl RevTableType {
}
impl_sql_integer_expression!(RevTableType);
impl std::convert::From<RevType> for RevTableType {
fn from(ty: RevType) -> Self {
match ty {
RevType::DeprecatedLocal => RevTableType::Local,
RevType::DeprecatedRemote => RevTableType::Remote,
}
}
}
impl std::convert::From<RevTableType> for RevType {
fn from(ty: RevTableType) -> Self {
match ty {
RevTableType::Local => RevType::DeprecatedLocal,
RevTableType::Remote => RevType::DeprecatedRemote,
}
}
}
pub struct RevisionChangeset {
pub(crate) doc_id: String,
pub(crate) rev_id: RevId,
pub(crate) state: RevTableState,
pub(crate) state: RevisionTableState,
}

View File

@ -67,7 +67,7 @@ impl EditorTest {
self.editor.replace(interval, s).await.unwrap();
},
EditorScript::AssertRevisionState(rev_id, state) => {
let record = cache.get_revision(rev_id).await.unwrap();
let record = cache.get(rev_id).await.unwrap();
assert_eq!(record.state, state);
},
EditorScript::AssertCurrentRevId(rev_id) => {

View File

@ -21,7 +21,7 @@ pub struct Revision {
pub doc_id: String,
#[pb(index = 6)]
pub ty: RevType,
ty: RevType, // Deprecated
#[pb(index = 7)]
pub user_id: String,
@ -42,29 +42,11 @@ impl Revision {
pub fn is_initial(&self) -> bool { self.rev_id == 0 }
pub fn initial_revision(user_id: &str, doc_id: &str, delta_data: Bytes) -> Self {
let user_id = user_id.to_owned();
let doc_id = doc_id.to_owned();
let md5 = md5(&delta_data);
Self {
base_rev_id: 0,
rev_id: 0,
delta_data: delta_data.to_vec(),
md5,
doc_id,
ty: RevType::Local,
user_id,
}
Self::new(doc_id, 0, 0, delta_data, user_id, md5)
}
pub fn new(
doc_id: &str,
base_rev_id: i64,
rev_id: i64,
delta_data: Bytes,
ty: RevType,
user_id: &str,
md5: String,
) -> Revision {
pub fn new(doc_id: &str, base_rev_id: i64, rev_id: i64, delta_data: Bytes, user_id: &str, md5: String) -> Revision {
let user_id = user_id.to_owned();
let doc_id = doc_id.to_owned();
let delta_data = delta_data.to_vec();
@ -81,7 +63,7 @@ impl Revision {
delta_data,
md5,
doc_id,
ty,
ty: RevType::DeprecatedLocal,
user_id,
}
}
@ -160,22 +142,6 @@ impl std::fmt::Display for RevId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) }
}
// Deprecated
// TODO: remove RevType
#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
pub enum RevType {
Local = 0,
Remote = 1,
}
impl RevType {
pub fn is_local(&self) -> bool { self == &RevType::Local }
}
impl std::default::Default for RevType {
fn default() -> Self { RevType::Local }
}
#[derive(Debug, Clone, Default, ProtoBuf)]
pub struct RevisionRange {
#[pb(index = 1)]
@ -214,6 +180,16 @@ pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevisionState {
StateLocal = 0,
Ack = 1,
Local = 0,
Ack = 1,
}
#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
pub enum RevType {
DeprecatedLocal = 0,
DeprecatedRemote = 1,
}
impl std::default::Default for RevType {
fn default() -> Self { RevType::DeprecatedLocal }
}

View File

@ -164,7 +164,7 @@ impl Revision {
self.ty
}
pub fn clear_ty(&mut self) {
self.ty = RevType::Local;
self.ty = RevType::DeprecatedLocal;
}
// Param is passed by value, moved
@ -264,7 +264,7 @@ impl ::protobuf::Message for Revision {
if !self.doc_id.is_empty() {
my_size += ::protobuf::rt::string_size(5, &self.doc_id);
}
if self.ty != RevType::Local {
if self.ty != RevType::DeprecatedLocal {
my_size += ::protobuf::rt::enum_size(6, self.ty);
}
if !self.user_id.is_empty() {
@ -291,7 +291,7 @@ impl ::protobuf::Message for Revision {
if !self.doc_id.is_empty() {
os.write_string(5, &self.doc_id)?;
}
if self.ty != RevType::Local {
if self.ty != RevType::DeprecatedLocal {
os.write_enum(6, ::protobuf::ProtobufEnum::value(&self.ty))?;
}
if !self.user_id.is_empty() {
@ -391,7 +391,7 @@ impl ::protobuf::Clear for Revision {
self.delta_data.clear();
self.md5.clear();
self.doc_id.clear();
self.ty = RevType::Local;
self.ty = RevType::DeprecatedLocal;
self.user_id.clear();
self.unknown_fields.clear();
}
@ -956,10 +956,60 @@ impl ::protobuf::reflect::ProtobufValue for RevisionRange {
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevisionState {
StateLocal = 0,
Ack = 1,
}
impl ::protobuf::ProtobufEnum for RevisionState {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<RevisionState> {
match value {
0 => ::std::option::Option::Some(RevisionState::StateLocal),
1 => ::std::option::Option::Some(RevisionState::Ack),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [RevisionState] = &[
RevisionState::StateLocal,
RevisionState::Ack,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<RevisionState>("RevisionState", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for RevisionState {
}
impl ::std::default::Default for RevisionState {
fn default() -> Self {
RevisionState::StateLocal
}
}
impl ::protobuf::reflect::ProtobufValue for RevisionState {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevType {
Local = 0,
Remote = 1,
DeprecatedLocal = 0,
DeprecatedRemote = 1,
}
impl ::protobuf::ProtobufEnum for RevType {
@ -969,16 +1019,16 @@ impl ::protobuf::ProtobufEnum for RevType {
fn from_i32(value: i32) -> ::std::option::Option<RevType> {
match value {
0 => ::std::option::Option::Some(RevType::Local),
1 => ::std::option::Option::Some(RevType::Remote),
0 => ::std::option::Option::Some(RevType::DeprecatedLocal),
1 => ::std::option::Option::Some(RevType::DeprecatedRemote),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [RevType] = &[
RevType::Local,
RevType::Remote,
RevType::DeprecatedLocal,
RevType::DeprecatedRemote,
];
values
}
@ -996,7 +1046,7 @@ impl ::std::marker::Copy for RevType {
impl ::std::default::Default for RevType {
fn default() -> Self {
RevType::Local
RevType::DeprecatedLocal
}
}
@ -1006,56 +1056,6 @@ impl ::protobuf::reflect::ProtobufValue for RevType {
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum RevState {
StateLocal = 0,
Ack = 1,
}
impl ::protobuf::ProtobufEnum for RevState {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<RevState> {
match value {
0 => ::std::option::Option::Some(RevState::StateLocal),
1 => ::std::option::Option::Some(RevState::Ack),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [RevState] = &[
RevState::StateLocal,
RevState::Ack,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::EnumDescriptor> = ::protobuf::rt::LazyV2::INIT;
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new_pb_name::<RevState>("RevState", file_descriptor_proto())
})
}
}
impl ::std::marker::Copy for RevState {
}
impl ::std::default::Default for RevState {
fn default() -> Self {
RevState::StateLocal
}
}
impl ::protobuf::reflect::ProtobufValue for RevState {
fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
::protobuf::reflect::ReflectValueRef::Enum(::protobuf::ProtobufEnum::descriptor(self))
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\x0erevision.proto\"\xbc\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\
\x18\x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\
@ -1067,58 +1067,58 @@ static file_descriptor_proto_data: &'static [u8] = b"\
\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\
\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\
\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\
\x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Re\
mote\x10\x01*#\n\x08RevState\x12\x0e\n\nStateLocal\x10\0\x12\x07\n\x03Ac\
k\x10\x01J\xe8\x07\n\x06\x12\x04\0\0\x1d\x01\n\x08\n\x01\x0c\x12\x03\0\0\
\x12\n\n\n\x02\x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\
\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\
\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\
\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\
\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\
\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\
\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\
\x04\x19\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\
\0\x02\x02\x01\x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\
\x05\x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\
\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\
\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\
\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\
\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\
\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\
\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\
\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\
\x02\x05\x03\x12\x03\x08\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\t\x04\
\x17\n\x0c\n\x05\x04\0\x02\x06\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\0\x02\
\x06\x01\x12\x03\t\x0b\x12\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\t\x15\
\x16\n\n\n\x02\x04\x01\x12\x04\x0b\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\
\x0b\x08\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x0c\x04\x20\n\x0c\n\x05\
\x04\x01\x02\0\x04\x12\x03\x0c\x04\x0c\n\x0c\n\x05\x04\x01\x02\0\x06\x12\
\x03\x0c\r\x15\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\x16\x1b\n\x0c\n\
\x05\x04\x01\x02\0\x03\x12\x03\x0c\x1e\x1f\n\n\n\x02\x04\x02\x12\x04\x0e\
\0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0e\x08\r\n\x0b\n\x04\x04\x02\
\x02\0\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\
\t\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\
\x02\0\x03\x12\x03\x0f\x12\x13\n\n\n\x02\x04\x03\x12\x04\x11\0\x15\x01\n\
\n\n\x03\x04\x03\x01\x12\x03\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\
\x03\x12\x04\x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\
\x05\x04\x03\x02\0\x01\x12\x03\x12\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\
\x12\x03\x12\x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x14\n\
\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x13\x04\t\n\x0c\n\x05\x04\x03\x02\
\x01\x01\x12\x03\x13\n\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\
\x12\x13\n\x0b\n\x04\x04\x03\x02\x02\x12\x03\x14\x04\x12\n\x0c\n\x05\x04\
\x03\x02\x02\x05\x12\x03\x14\x04\t\n\x0c\n\x05\x04\x03\x02\x02\x01\x12\
\x03\x14\n\r\n\x0c\n\x05\x04\x03\x02\x02\x03\x12\x03\x14\x10\x11\n\n\n\
\x02\x05\0\x12\x04\x16\0\x19\x01\n\n\n\x03\x05\0\x01\x12\x03\x16\x05\x0c\
\n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\
\x12\x03\x17\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x17\x0c\r\n\x0b\n\
\x04\x05\0\x02\x01\x12\x03\x18\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\
\x03\x18\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x18\r\x0e\n\n\n\x02\
\x05\x01\x12\x04\x1a\0\x1d\x01\n\n\n\x03\x05\x01\x01\x12\x03\x1a\x05\r\n\
\x0b\n\x04\x05\x01\x02\0\x12\x03\x1b\x04\x13\n\x0c\n\x05\x05\x01\x02\0\
\x01\x12\x03\x1b\x04\x0e\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x1b\x11\
\x12\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1c\x04\x0c\n\x0c\n\x05\x05\x01\
\x02\x01\x01\x12\x03\x1c\x04\x07\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\
\x1c\n\x0bb\x06proto3\
\x01(\x03R\x03end*(\n\rRevisionState\x12\x0e\n\nStateLocal\x10\0\x12\x07\
\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\n\x0fDeprecatedLocal\x10\0\x12\
\x14\n\x10DeprecatedRemote\x10\x01J\xe8\x07\n\x06\x12\x04\0\0\x1d\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\n\x01\n\n\n\
\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\
\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\
\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\
\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\
\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\
\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\
\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\
\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\
\x02\x02\x03\x12\x03\x05\x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\
\x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\
\0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\
\x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\
\x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\
\x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\
\n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\
\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\
\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\x0b\n\x04\x04\0\x02\
\x06\x12\x03\t\x04\x17\n\x0c\n\x05\x04\0\x02\x06\x05\x12\x03\t\x04\n\n\
\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\t\x0b\x12\n\x0c\n\x05\x04\0\x02\x06\
\x03\x12\x03\t\x15\x16\n\n\n\x02\x04\x01\x12\x04\x0b\0\r\x01\n\n\n\x03\
\x04\x01\x01\x12\x03\x0b\x08\x18\n\x0b\n\x04\x04\x01\x02\0\x12\x03\x0c\
\x04\x20\n\x0c\n\x05\x04\x01\x02\0\x04\x12\x03\x0c\x04\x0c\n\x0c\n\x05\
\x04\x01\x02\0\x06\x12\x03\x0c\r\x15\n\x0c\n\x05\x04\x01\x02\0\x01\x12\
\x03\x0c\x16\x1b\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x0c\x1e\x1f\n\n\n\
\x02\x04\x02\x12\x04\x0e\0\x10\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0e\x08\
\r\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\x02\
\0\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0f\n\x0f\
\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0f\x12\x13\n\n\n\x02\x04\x03\x12\
\x04\x11\0\x15\x01\n\n\n\x03\x04\x03\x01\x12\x03\x11\x08\x15\n\x0b\n\x04\
\x04\x03\x02\0\x12\x03\x12\x04\x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\
\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x12\x0b\x11\n\x0c\n\x05\
\x04\x03\x02\0\x03\x12\x03\x12\x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\
\x03\x13\x04\x14\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x13\x04\t\n\x0c\
\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\n\x0f\n\x0c\n\x05\x04\x03\x02\x01\
\x03\x12\x03\x13\x12\x13\n\x0b\n\x04\x04\x03\x02\x02\x12\x03\x14\x04\x12\
\n\x0c\n\x05\x04\x03\x02\x02\x05\x12\x03\x14\x04\t\n\x0c\n\x05\x04\x03\
\x02\x02\x01\x12\x03\x14\n\r\n\x0c\n\x05\x04\x03\x02\x02\x03\x12\x03\x14\
\x10\x11\n\n\n\x02\x05\0\x12\x04\x16\0\x19\x01\n\n\n\x03\x05\0\x01\x12\
\x03\x16\x05\x12\n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\x04\x13\n\x0c\n\x05\
\x05\0\x02\0\x01\x12\x03\x17\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\
\x17\x11\x12\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x18\x04\x0c\n\x0c\n\x05\
\x05\0\x02\x01\x01\x12\x03\x18\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\
\x03\x18\n\x0b\n\n\n\x02\x05\x01\x12\x04\x1a\0\x1d\x01\n\n\n\x03\x05\x01\
\x01\x12\x03\x1a\x05\x0c\n\x0b\n\x04\x05\x01\x02\0\x12\x03\x1b\x04\x18\n\
\x0c\n\x05\x05\x01\x02\0\x01\x12\x03\x1b\x04\x13\n\x0c\n\x05\x05\x01\x02\
\0\x02\x12\x03\x1b\x16\x17\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1c\x04\
\x19\n\x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\x1c\x04\x14\n\x0c\n\x05\x05\
\x01\x02\x01\x02\x12\x03\x1c\x17\x18b\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -20,11 +20,11 @@ message RevisionRange {
int64 start = 2;
int64 end = 3;
}
enum RevType {
Local = 0;
Remote = 1;
}
enum RevState {
enum RevisionState {
StateLocal = 0;
Ack = 1;
}
enum RevType {
DeprecatedLocal = 0;
DeprecatedRemote = 1;
}

View File

@ -88,8 +88,8 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
| "NetworkType"
| "UserEvent"
| "UserNotification"
| "RevisionState"
| "RevType"
| "RevState"
| "DocumentClientWSDataType"
| "DocumentServerWSDataType"
| "TrashType"