diff --git a/frontend/.vscode/launch.json b/frontend/.vscode/launch.json index 1bc6978a44e9..f759352c153f 100644 --- a/frontend/.vscode/launch.json +++ b/frontend/.vscode/launch.json @@ -12,7 +12,7 @@ "program": "./lib/main.dart", "type": "dart", "env": { - "RUST_LOG": "debug", + "RUST_LOG": "trace", "RUST_BACKTRACE": "1" }, // uncomment the following line to testing performance. diff --git a/frontend/appflowy_flutter/pubspec.yaml b/frontend/appflowy_flutter/pubspec.yaml index f1b8f1f95dee..6f8f8439e7ca 100644 --- a/frontend/appflowy_flutter/pubspec.yaml +++ b/frontend/appflowy_flutter/pubspec.yaml @@ -15,7 +15,7 @@ publish_to: "none" # Remove this line if you wish to publish to pub.dev # In iOS, build-name is used as CFBundleShortVersionString while build-number used as CFBundleVersion. # Read more about iOS versioning at # https://developer.apple.com/library/archive/documentation/General/Reference/InfoPlistKeyReference/Articles/CoreFoundationKeys.html -version: 0.4.0 +version: 0.4.1 environment: flutter: ">=3.18.0-0.2.pre" diff --git a/frontend/rust-lib/event-integration/src/lib.rs b/frontend/rust-lib/event-integration/src/lib.rs index 0edb91139c0c..646b3c0d9c2c 100644 --- a/frontend/rust-lib/event-integration/src/lib.rs +++ b/frontend/rust-lib/event-integration/src/lib.rs @@ -53,10 +53,8 @@ impl EventIntegrationTest { let path = path_buf.to_str().unwrap().to_string(); let device_id = uuid::Uuid::new_v4().to_string(); - let level = "trace"; - std::env::set_var("RUST_LOG", level); let config = AppFlowyCoreConfig::new(path.clone(), path, device_id, name).log_filter( - level, + "trace", vec![ "flowy_test".to_string(), "tokio".to_string(), @@ -79,27 +77,16 @@ impl EventIntegrationTest { } } - pub fn get_appflowy_cloud_server(&self) -> Arc { - self - .appflowy_core - .server_provider - .get_appflowy_cloud_server() - .unwrap() + pub fn get_server(&self) -> Arc { + self.appflowy_core.server_provider.get_server().unwrap() } pub async fn wait_ws_connected(&self) { - if self - .get_appflowy_cloud_server() - .get_ws_state() - .is_connected() - { + if self.get_server().get_ws_state().is_connected() { return; } - let mut ws_state = self - .get_appflowy_cloud_server() - .subscribe_ws_state() - .unwrap(); + let mut ws_state = self.get_server().subscribe_ws_state().unwrap(); loop { select! { _ = sleep(Duration::from_secs(20)) => { @@ -121,7 +108,7 @@ impl EventIntegrationTest { oid: &str, collay_type: CollabType, ) -> Result { - let server = self.server_provider.get_appflowy_cloud_server().unwrap(); + let server = self.server_provider.get_server().unwrap(); let workspace_id = self.get_current_workspace().await.id; let uid = self.get_user_profile().await?.id; let doc_state = server diff --git a/frontend/rust-lib/event-integration/tests/asset/040_local.zip b/frontend/rust-lib/event-integration/tests/asset/040_local.zip index 452498260635..ba6b01f82fcb 100644 Binary files a/frontend/rust-lib/event-integration/tests/asset/040_local.zip and b/frontend/rust-lib/event-integration/tests/asset/040_local.zip differ diff --git a/frontend/rust-lib/event-integration/tests/user/af_cloud_test/import_af_data_folder_test.rs b/frontend/rust-lib/event-integration/tests/user/af_cloud_test/import_af_data_folder_test.rs index 516def105ac5..3e0160c1facb 100644 --- a/frontend/rust-lib/event-integration/tests/user/af_cloud_test/import_af_data_folder_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/af_cloud_test/import_af_data_folder_test.rs @@ -1,5 +1,6 @@ use crate::util::unzip_history_user_db; use assert_json_diff::assert_json_include; +use collab_database::rows::database_row_document_id_from_row_id; use collab_entity::CollabType; use event_integration::user_event::user_localhost_af_cloud; use event_integration::{document_data_from_document_doc_state, EventIntegrationTest}; @@ -45,6 +46,9 @@ async fn import_appflowy_data_folder_into_new_view_test() { assert_eq!(views.len(), 2); assert_eq!(views[1].name, import_container_name); + // the 040_local should be an empty document, so try to get the document data + let _ = test.get_document_data(&views[1].id).await; + let local_child_views = test.get_view(&views[1].id).await.child_views; assert_eq!(local_child_views.len(), 1); assert_eq!(local_child_views[0].name, "Document1"); @@ -61,6 +65,14 @@ async fn import_appflowy_data_folder_into_new_view_test() { assert_eq!(document2_child_views[0].name, "Grid1"); assert_eq!(document2_child_views[1].name, "Grid2"); + let rows = test.get_database(&document2_child_views[1].id).await.rows; + assert_eq!(rows.len(), 3); + + // In the 040_local, only the first row has a document with content + let row_document_id = database_row_document_id_from_row_id(&rows[0].id); + let row_document_data = test.get_document_data(&row_document_id).await; + assert_json_include!(actual: json!(row_document_data), expected: expected_row_doc_json()); + drop(cleaner); } @@ -370,3 +382,46 @@ fn expected_doc_2_json() -> Value { "page_id": "ZVogdaK9yO" }) } + +fn expected_row_doc_json() -> Value { + json!( { + "blocks": { + "eSBQHZ28e0": { + "children": "RbLAaE9UDJ", + "data": {}, + "external_id": null, + "external_type": null, + "id": "eSBQHZ28e0", + "parent": "", + "ty": "page" + }, + "eUIL6qjgj3": { + "children": "fUnGRcvPEA", + "data": { + "delta": [ + { + "insert": "document in database row" + } + ] + }, + "external_id": "-DliEUjHr2", + "external_type": "text", + "id": "eUIL6qjgj3", + "parent": "eSBQHZ28e0", + "ty": "paragraph" + } + }, + "meta": { + "children_map": { + "RbLAaE9UDJ": [ + "eUIL6qjgj3" + ], + "fUnGRcvPEA": [] + }, + "text_map": { + "-DliEUjHr2": "[{\"insert\":\"document in database row\"}]" + } + }, + "page_id": "eSBQHZ28e0" + }) +} diff --git a/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs b/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs index 2d7ee184c766..88ea6ac1ba72 100644 --- a/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs +++ b/frontend/rust-lib/event-integration/tests/user/migration_test/version_test.rs @@ -1,6 +1,7 @@ use event_integration::EventIntegrationTest; use flowy_core::DEFAULT_NAME; use flowy_folder::entities::ViewLayoutPB; +use std::time::Duration; use crate::util::unzip_history_user_db; @@ -140,10 +141,7 @@ async fn collab_db_backup_test() { assert_eq!(backups.len(), 1); assert_eq!( backups[0], - format!( - "collab_db_{}", - chrono::Local::now().format("%Y%m%d").to_string() - ) + format!("collab_db_{}", chrono::Local::now().format("%Y%m%d")) ); drop(cleaner); } @@ -157,7 +155,15 @@ async fn delete_outdated_collab_db_backup_test() { EventIntegrationTest::new_with_user_data_path(user_db_path, DEFAULT_NAME.to_string()).await; let uid = test.get_user_profile().await.unwrap().id; + // saving the backup is a background task, so we need to wait for it to finish + // 2 seconds should be enough for the background task to finish + tokio::time::sleep(Duration::from_secs(2)).await; let backups = test.user_manager.get_collab_backup_list(uid); + + if backups.len() != 10 { + dbg!("backups: {:?}", backups.clone()); + } + assert_eq!(backups.len(), 10); assert_eq!(backups[0], "collab_db_0.4.0_20231202"); assert_eq!(backups[1], "collab_db_0.4.0_20231203"); @@ -170,10 +176,7 @@ async fn delete_outdated_collab_db_backup_test() { assert_eq!(backups[8], "collab_db_0.4.0_20231210"); assert_eq!( backups[9], - format!( - "collab_db_{}", - chrono::Local::now().format("%Y%m%d").to_string() - ) + format!("collab_db_{}", chrono::Local::now().format("%Y%m%d")) ); drop(cleaner); } diff --git a/frontend/rust-lib/event-integration/tests/util.rs b/frontend/rust-lib/event-integration/tests/util.rs index 77eedf91ea7e..37364765a92d 100644 --- a/frontend/rust-lib/event-integration/tests/util.rs +++ b/frontend/rust-lib/event-integration/tests/util.rs @@ -27,7 +27,7 @@ use flowy_user::entities::{AuthenticatorPB, UpdateUserProfilePayloadPB}; use flowy_user::errors::FlowyError; use flowy_user::event_map::UserEvent::*; -use flowy_user_deps::cloud::{UserCloudService, UserCloudServiceProvider}; +use flowy_user_deps::cloud::UserCloudService; use flowy_user_deps::entities::Authenticator; pub fn get_supabase_config() -> Option { diff --git a/frontend/rust-lib/flowy-core/src/integrate/log.rs b/frontend/rust-lib/flowy-core/src/integrate/log.rs index 10286ee74942..0503ee26ed1f 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/log.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/log.rs @@ -19,7 +19,7 @@ pub(crate) fn create_log_filter(level: String, with_crates: Vec) -> Stri .map(|crate_name| format!("{}={}", crate_name, level)) .collect::>(); filters.push(format!("flowy_core={}", level)); - filters.push(format!("flowy_folder2={}", level)); + filters.push(format!("flowy_folder={}", level)); filters.push(format!("collab_sync={}", level)); filters.push(format!("collab_folder={}", level)); filters.push(format!("collab_persistence={}", level)); @@ -28,7 +28,7 @@ pub(crate) fn create_log_filter(level: String, with_crates: Vec) -> Stri filters.push(format!("collab_integrate={}", level)); filters.push(format!("collab={}", level)); filters.push(format!("flowy_user={}", level)); - filters.push(format!("flowy_document2={}", level)); + filters.push(format!("flowy_document={}", level)); filters.push(format!("flowy_database2={}", level)); filters.push(format!("flowy_server={}", level)); filters.push(format!("flowy_notification={}", "info")); diff --git a/frontend/rust-lib/flowy-core/src/integrate/server.rs b/frontend/rust-lib/flowy-core/src/integrate/server.rs index e79d7322bb48..04c7a461dd4e 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/server.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/server.rs @@ -33,6 +33,12 @@ pub enum Server { Supabase = 2, } +impl Server { + pub fn is_local(&self) -> bool { + matches!(self, Server::Local) + } +} + impl Display for Server { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -49,7 +55,6 @@ impl Display for Server { /// Each server implements the [AppFlowyServer] trait, which provides the [UserCloudService], etc. pub struct ServerProvider { config: AppFlowyCoreConfig, - server: RwLock, providers: RwLock>>, pub(crate) encryption: RwLock>, #[allow(dead_code)] @@ -57,7 +62,7 @@ pub struct ServerProvider { pub(crate) user_enable_sync: RwLock, /// The authenticator type of the user. - pub(crate) user_authenticator: RwLock, + authenticator: RwLock, pub(crate) uid: Arc>>, } @@ -70,10 +75,9 @@ impl ServerProvider { let encryption = EncryptionImpl::new(None); Self { config, - server: RwLock::new(server), providers: RwLock::new(HashMap::new()), user_enable_sync: RwLock::new(true), - user_authenticator: RwLock::new(Authenticator::Local), + authenticator: RwLock::new(Authenticator::from(server)), encryption: RwLock::new(Arc::new(encryption)), store_preferences, uid: Default::default(), @@ -81,30 +85,32 @@ impl ServerProvider { } pub fn get_server_type(&self) -> Server { - self.server.read().clone() + match &*self.authenticator.read() { + Authenticator::Local => Server::Local, + Authenticator::AppFlowyCloud => Server::AppFlowyCloud, + Authenticator::Supabase => Server::Supabase, + } } - pub fn set_server_type(&self, server_type: Server) { - let old_server_type = self.server.read().clone(); - if server_type != old_server_type { + pub fn set_authenticator(&self, authenticator: Authenticator) { + let old_server_type = self.get_server_type(); + *self.authenticator.write() = authenticator; + let new_server_type = self.get_server_type(); + + if old_server_type != new_server_type { self.providers.write().remove(&old_server_type); } - - *self.server.write() = server_type; - } - - pub fn get_user_authenticator(&self) -> Authenticator { - self.user_authenticator.read().clone() } - pub fn get_appflowy_cloud_server(&self) -> FlowyResult> { - let server = self.get_server(&Server::AppFlowyCloud)?; - Ok(server) + pub fn get_authenticator(&self) -> Authenticator { + self.authenticator.read().clone() } /// Returns a [AppFlowyServer] trait implementation base on the provider_type. - pub fn get_server(&self, server_type: &Server) -> FlowyResult> { - if let Some(provider) = self.providers.read().get(server_type) { + pub fn get_server(&self) -> FlowyResult> { + let server_type = self.get_server_type(); + + if let Some(provider) = self.providers.read().get(&server_type) { return Ok(provider.clone()); } diff --git a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs index 75fbec9ec2b5..60f1a103d2a8 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs @@ -32,7 +32,7 @@ use crate::integrate::server::{Server, ServerProvider}; impl FileStorageService for ServerProvider { fn create_object(&self, object: StorageObject) -> FutureResult { - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { let storage = server?.file_storage().ok_or(FlowyError::internal())?; storage.create_object(object).await @@ -40,7 +40,7 @@ impl FileStorageService for ServerProvider { } fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> { - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { let storage = server?.file_storage().ok_or(FlowyError::internal())?; storage.delete_object_by_url(object_url).await @@ -48,7 +48,7 @@ impl FileStorageService for ServerProvider { } fn get_object_by_url(&self, object_url: String) -> FutureResult { - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { let storage = server?.file_storage().ok_or(FlowyError::internal())?; storage.get_object_by_url(object_url).await @@ -58,42 +58,40 @@ impl FileStorageService for ServerProvider { impl UserCloudServiceProvider for ServerProvider { fn set_token(&self, token: &str) -> Result<(), FlowyError> { - let server = self.get_server(&self.get_server_type())?; + let server = self.get_server()?; server.set_token(token)?; Ok(()) } fn subscribe_token_state(&self) -> Option> { - let server = self.get_server(&self.get_server_type()).ok()?; + let server = self.get_server().ok()?; server.subscribe_token_state() } fn set_enable_sync(&self, uid: i64, enable_sync: bool) { - if let Ok(server) = self.get_server(&self.get_server_type()) { + if let Ok(server) = self.get_server() { server.set_enable_sync(uid, enable_sync); *self.user_enable_sync.write() = enable_sync; *self.uid.write() = Some(uid); } } - fn set_user_authenticator(&self, authenticator: &Authenticator) { - debug!("set user authenticator: {:?}", authenticator); - *self.user_authenticator.write() = authenticator.clone(); - } - /// When user login, the provider type is set by the [Authenticator] and save to disk for next use. /// /// Each [Authenticator] has a corresponding [Server]. The [Server] is used /// to create a new [AppFlowyServer] if it doesn't exist. Once the [Server] is set, /// it will be used when user open the app again. /// - fn set_authenticator(&self, authenticator: Authenticator) { - let server_type: Server = authenticator.into(); - self.set_server_type(server_type.clone()); + fn set_user_authenticator(&self, authenticator: &Authenticator) { + self.set_authenticator(authenticator.clone()); + } + + fn get_user_authenticator(&self) -> Authenticator { + self.get_authenticator() } fn set_network_reachable(&self, reachable: bool) { - if let Ok(server) = self.get_server(&self.get_server_type()) { + if let Ok(server) = self.get_server() { server.set_network_reachable(reachable); } } @@ -103,16 +101,10 @@ impl UserCloudServiceProvider for ServerProvider { self.encryption.write().set_secret(secret); } - fn get_authenticator(&self) -> Authenticator { - let server_type = self.get_server_type(); - Authenticator::from(server_type) - } - /// Returns the [UserCloudService] base on the current [Server]. /// Creates a new [AppFlowyServer] if it doesn't exist. fn get_user_service(&self) -> Result, FlowyError> { - let server_type = self.get_server_type(); - let user_service = self.get_server(&server_type)?.user_service(); + let user_service = self.get_server()?.user_service(); Ok(user_service) } @@ -131,19 +123,19 @@ impl UserCloudServiceProvider for ServerProvider { impl FolderCloudService for ServerProvider { fn create_workspace(&self, uid: i64, name: &str) -> FutureResult { - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); let name = name.to_string(); FutureResult::new(async move { server?.folder_service().create_workspace(uid, &name).await }) } fn open_workspace(&self, workspace_id: &str) -> FutureResult<(), Error> { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server?.folder_service().open_workspace(&workspace_id).await }) } fn get_all_workspace(&self) -> FutureResult, Error> { - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server?.folder_service().get_all_workspace().await }) } @@ -153,7 +145,7 @@ impl FolderCloudService for ServerProvider { uid: &i64, ) -> FutureResult, Error> { let uid = *uid; - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); let workspace_id = workspace_id.to_string(); FutureResult::new(async move { server? @@ -169,7 +161,7 @@ impl FolderCloudService for ServerProvider { limit: usize, ) -> FutureResult, Error> { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server? .folder_service() @@ -187,7 +179,7 @@ impl FolderCloudService for ServerProvider { ) -> FutureResult { let object_id = object_id.to_string(); let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server? .folder_service() @@ -202,7 +194,7 @@ impl FolderCloudService for ServerProvider { objects: Vec, ) -> FutureResult<(), Error> { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server? .folder_service() @@ -213,7 +205,7 @@ impl FolderCloudService for ServerProvider { fn service_name(&self) -> String { self - .get_server(&self.get_server_type()) + .get_server() .map(|provider| provider.folder_service().service_name()) .unwrap_or_default() } @@ -227,7 +219,7 @@ impl DatabaseCloudService for ServerProvider { workspace_id: &str, ) -> FutureResult { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); let database_id = object_id.to_string(); FutureResult::new(async move { server? @@ -244,7 +236,7 @@ impl DatabaseCloudService for ServerProvider { workspace_id: &str, ) -> FutureResult { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server? .database_service() @@ -258,7 +250,7 @@ impl DatabaseCloudService for ServerProvider { object_id: &str, limit: usize, ) -> FutureResult, Error> { - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); let database_id = object_id.to_string(); FutureResult::new(async move { server? @@ -277,7 +269,7 @@ impl DocumentCloudService for ServerProvider { ) -> FutureResult { let workspace_id = workspace_id.to_string(); let document_id = document_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); FutureResult::new(async move { server? .document_service() @@ -293,7 +285,7 @@ impl DocumentCloudService for ServerProvider { workspace_id: &str, ) -> FutureResult, Error> { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); let document_id = document_id.to_string(); FutureResult::new(async move { server? @@ -309,7 +301,7 @@ impl DocumentCloudService for ServerProvider { workspace_id: &str, ) -> FutureResult, Error> { let workspace_id = workspace_id.to_string(); - let server = self.get_server(&self.get_server_type()); + let server = self.get_server(); let document_id = document_id.to_string(); FutureResult::new(async move { server? @@ -328,7 +320,7 @@ impl CollabCloudPluginProvider for ServerProvider { #[instrument(level = "debug", skip(self, context), fields(server_type = %self.get_server_type()))] fn get_plugins(&self, context: CollabPluginProviderContext) -> Fut>> { // If the user is local, we don't need to create a sync plugin. - if self.user_authenticator.read().is_local() { + if self.get_server_type().is_local() { debug!( "User authenticator is local, skip create sync plugin for: {}", context @@ -343,7 +335,7 @@ impl CollabCloudPluginProvider for ServerProvider { collab_object, local_collab, } => { - if let Ok(server) = self.get_server(&Server::AppFlowyCloud) { + if let Ok(server) = self.get_server() { to_fut(async move { let mut plugins: Vec> = vec![]; @@ -394,7 +386,7 @@ impl CollabCloudPluginProvider for ServerProvider { } => { let mut plugins: Vec> = vec![]; if let Some(remote_collab_storage) = self - .get_server(&Server::Supabase) + .get_server() .ok() .and_then(|provider| provider.collab_storage(&collab_object)) { diff --git a/frontend/rust-lib/flowy-document/src/manager.rs b/frontend/rust-lib/flowy-document/src/manager.rs index ed4d5051529e..51eac389a0c3 100644 --- a/frontend/rust-lib/flowy-document/src/manager.rs +++ b/frontend/rust-lib/flowy-document/src/manager.rs @@ -8,7 +8,7 @@ use collab::core::origin::CollabOrigin; use collab::preclude::Collab; use collab_document::blocks::DocumentData; use collab_document::document::Document; -use collab_document::document_data::{default_document_collab_data, default_document_data}; +use collab_document::document_data::default_document_data; use collab_document::YrsDocAction; use collab_entity::CollabType; use lru::LruCache; @@ -86,25 +86,45 @@ impl DocumentManager { /// /// if the document already exists, return the existing document. /// if the data is None, will create a document with default data. + #[instrument(level = "info", skip(self, data))] pub async fn create_document( &self, uid: i64, doc_id: &str, data: Option, ) -> FlowyResult<()> { - tracing::trace!("create a document: {:?}", doc_id); if self.is_doc_exist(doc_id).unwrap_or(false) { Err(FlowyError::new( ErrorCode::RecordAlreadyExists, format!("document {} already exists", doc_id), )) } else { - let encoded_collab_v1 = - doc_state_from_document_data(doc_id, data.unwrap_or_else(default_document_data))?; - let collab = self - .collab_for_document(uid, doc_id, encoded_collab_v1.doc_state.to_vec(), false) - .await?; - collab.lock().flush(); + let result: Result = self + .cloud_service + .get_document_doc_state(doc_id, &self.user.workspace_id()?) + .await; + + match result { + Ok(data) => { + let collab = self.collab_for_document(uid, doc_id, data, false).await?; + collab.lock().flush(); + }, + Err(err) => { + if err.is_record_not_found() { + let doc_state = + doc_state_from_document_data(doc_id, data.unwrap_or_else(default_document_data))? + .doc_state + .to_vec(); + let collab = self + .collab_for_document(uid, doc_id, doc_state, false) + .await?; + collab.lock().flush(); + } else { + return Err(err); + } + }, + } + Ok(()) } } @@ -119,29 +139,10 @@ impl DocumentManager { let mut doc_state = vec![]; if !self.is_doc_exist(doc_id)? { // Try to get the document from the cloud service - let result: Result = self + doc_state = self .cloud_service .get_document_doc_state(doc_id, &self.user.workspace_id()?) - .await; - - doc_state = match result { - Ok(data) => data, - Err(err) => { - if err.is_record_not_found() { - // The document's ID exists in the cloud, but its content does not. - // This occurs when user A's document hasn't finished syncing and user B tries to open it. - // As a result, a blank document is created for user B. - event!( - tracing::Level::INFO, - "can't find the document in the cloud, doc_id: {}", - doc_id - ); - default_document_collab_data(doc_id).doc_state.to_vec() - } else { - return Err(err); - } - }, - } + .await?; } let uid = self.user.user_id()?; diff --git a/frontend/rust-lib/flowy-document/tests/document/util.rs b/frontend/rust-lib/flowy-document/tests/document/util.rs index 96a1e61040c5..b38fd66e045d 100644 --- a/frontend/rust-lib/flowy-document/tests/document/util.rs +++ b/frontend/rust-lib/flowy-document/tests/document/util.rs @@ -21,7 +21,7 @@ use collab_integrate::RocksCollabDB; use flowy_document::document::MutexDocument; use flowy_document::manager::{DocumentManager, DocumentUser}; use flowy_document_deps::cloud::*; -use flowy_error::FlowyError; +use flowy_error::{ErrorCode, FlowyError}; use flowy_storage::{FileStorageService, StorageObject}; use lib_infra::async_trait::async_trait; use lib_infra::future::{to_fut, Fut, FutureResult}; @@ -135,10 +135,16 @@ pub struct LocalTestDocumentCloudServiceImpl(); impl DocumentCloudService for LocalTestDocumentCloudServiceImpl { fn get_document_doc_state( &self, - _document_id: &str, + document_id: &str, _workspace_id: &str, ) -> FutureResult { - FutureResult::new(async move { Ok(vec![]) }) + let document_id = document_id.to_string(); + FutureResult::new(async move { + Err(FlowyError::new( + ErrorCode::RecordNotFound, + format!("Document {} not found", document_id), + )) + }) } fn get_document_snapshots( diff --git a/frontend/rust-lib/flowy-folder/src/manager.rs b/frontend/rust-lib/flowy-folder/src/manager.rs index 6017c58672f8..1d569373ab52 100644 --- a/frontend/rust-lib/flowy-folder/src/manager.rs +++ b/frontend/rust-lib/flowy-folder/src/manager.rs @@ -398,6 +398,7 @@ impl FolderManager { params: CreateViewParams, ) -> FlowyResult { let view_layout: ViewLayout = params.layout.clone().into(); + // TODO(nathan): remove orphan view. Just use for create document in row let handler = self.get_handler(&view_layout)?; let user_id = self.user.user_id()?; handler diff --git a/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs b/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs index 6cb0b617bf4c..97585f52b058 100644 --- a/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs +++ b/frontend/rust-lib/flowy-server/src/local_server/impls/document.rs @@ -2,7 +2,7 @@ use anyhow::Error; use collab::core::collab::CollabDocState; use flowy_document_deps::cloud::*; -use flowy_error::FlowyError; +use flowy_error::{ErrorCode, FlowyError}; use lib_infra::future::FutureResult; pub(crate) struct LocalServerDocumentCloudServiceImpl(); @@ -10,10 +10,16 @@ pub(crate) struct LocalServerDocumentCloudServiceImpl(); impl DocumentCloudService for LocalServerDocumentCloudServiceImpl { fn get_document_doc_state( &self, - _document_id: &str, + document_id: &str, _workspace_id: &str, ) -> FutureResult { - FutureResult::new(async move { Ok(vec![]) }) + let document_id = document_id.to_string(); + FutureResult::new(async move { + Err(FlowyError::new( + ErrorCode::RecordNotFound, + format!("Document {} not found", document_id), + )) + }) } fn get_document_snapshots( diff --git a/frontend/rust-lib/flowy-user-deps/src/cloud.rs b/frontend/rust-lib/flowy-user-deps/src/cloud.rs index 997f282b4b4a..5be76a8eb0f4 100644 --- a/frontend/rust-lib/flowy-user-deps/src/cloud.rs +++ b/frontend/rust-lib/flowy-user-deps/src/cloud.rs @@ -86,16 +86,15 @@ pub trait UserCloudServiceProvider: Send + Sync + 'static { /// * `enable_sync`: A boolean indicating whether synchronization should be enabled or disabled. fn set_enable_sync(&self, uid: i64, enable_sync: bool); - /// Sets the authentication type for a user. The authentication type is the type when user sign in or sign up. - fn set_user_authenticator(&self, authenticator: &Authenticator); - /// Sets the authenticator when user sign in or sign up. /// /// # Arguments /// * `authenticator`: An `Authenticator` object. - fn set_authenticator(&self, authenticator: Authenticator); + fn set_user_authenticator(&self, authenticator: &Authenticator); + + fn get_user_authenticator(&self) -> Authenticator; - /// Sets the network reachability status. + /// Sets the network reachability statset_user_authenticatorus. /// /// # Arguments /// * `reachable`: A boolean indicating whether the network is reachable. @@ -107,12 +106,6 @@ pub trait UserCloudServiceProvider: Send + Sync + 'static { /// * `secret`: A `String` representing the encryption secret. fn set_encrypt_secret(&self, secret: String); - /// Retrieves the current authenticator. - /// - /// # Returns - /// The current `Authenticator` object. - fn get_authenticator(&self) -> Authenticator; - /// Retrieves the user-specific cloud service. /// /// # Returns diff --git a/frontend/rust-lib/flowy-user/src/event_handler.rs b/frontend/rust-lib/flowy-user/src/event_handler.rs index 05ffed13f6c7..51bfc5d67ec1 100644 --- a/frontend/rust-lib/flowy-user/src/event_handler.rs +++ b/frontend/rust-lib/flowy-user/src/event_handler.rs @@ -43,8 +43,16 @@ pub async fn sign_in_with_email_password_handler( let params: SignInParams = data.into_inner().try_into()?; let auth_type = params.auth_type.clone(); - let user_profile: UserProfilePB = manager.sign_in(params, auth_type).await?.into(); - data_result_ok(user_profile) + let old_authenticator = manager.cloud_services.get_user_authenticator(); + match manager.sign_in(params, auth_type).await { + Ok(profile) => data_result_ok(UserProfilePB::from(profile)), + Err(err) => { + manager + .cloud_services + .set_user_authenticator(&old_authenticator); + return Err(err); + }, + } } #[tracing::instrument( @@ -63,10 +71,18 @@ pub async fn sign_up( ) -> DataResult { let manager = upgrade_manager(manager)?; let params: SignUpParams = data.into_inner().try_into()?; - let auth_type = params.auth_type.clone(); + let authenticator = params.auth_type.clone(); - let user_profile = manager.sign_up(auth_type, BoxAny::new(params)).await?; - data_result_ok(user_profile.into()) + let old_authenticator = manager.cloud_services.get_user_authenticator(); + match manager.sign_up(authenticator, BoxAny::new(params)).await { + Ok(profile) => data_result_ok(UserProfilePB::from(profile)), + Err(err) => { + manager + .cloud_services + .set_user_authenticator(&old_authenticator); + return Err(err); + }, + } } #[tracing::instrument(level = "debug", skip(manager))] @@ -135,7 +151,6 @@ pub async fn set_appearance_setting( if setting.theme.is_empty() { setting.theme = APPEARANCE_DEFAULT_THEME.to_string(); } - store_preferences.set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?; Ok(()) } diff --git a/frontend/rust-lib/flowy-user/src/manager.rs b/frontend/rust-lib/flowy-user/src/manager.rs index a437beb9bfee..23568a78c427 100644 --- a/frontend/rust-lib/flowy-user/src/manager.rs +++ b/frontend/rust-lib/flowy-user/src/manager.rs @@ -287,7 +287,8 @@ impl UserManager { params: SignInParams, authenticator: Authenticator, ) -> Result { - self.update_authenticator(&authenticator).await; + self.cloud_services.set_user_authenticator(&authenticator); + let response: AuthResponse = self .cloud_services .get_user_service()? @@ -326,10 +327,6 @@ impl UserManager { Ok(user_profile) } - pub(crate) async fn update_authenticator(&self, authenticator: &Authenticator) { - self.cloud_services.set_authenticator(authenticator.clone()); - } - /// Manages the user sign-up process, potentially migrating data if necessary. /// /// This asynchronous function interacts with an external authentication service to register and sign up a user @@ -346,7 +343,7 @@ impl UserManager { // sign out the current user if there is one let migration_user = self.get_migration_user(&authenticator).await; - self.update_authenticator(&authenticator).await; + self.cloud_services.set_user_authenticator(&authenticator); let auth_service = self.cloud_services.get_user_service()?; let response: AuthResponse = auth_service.sign_up(params).await?; let new_user_profile = UserProfile::from((&response, &authenticator)); @@ -722,7 +719,7 @@ impl UserManager { authenticator: &Authenticator, email: &str, ) -> Result { - self.update_authenticator(authenticator).await; + self.cloud_services.set_user_authenticator(authenticator); let auth_service = self.cloud_services.get_user_service()?; let url = auth_service @@ -737,8 +734,8 @@ impl UserManager { oauth_provider: &str, ) -> Result { self - .update_authenticator(&Authenticator::AppFlowyCloud) - .await; + .cloud_services + .set_user_authenticator(&Authenticator::AppFlowyCloud); let auth_service = self.cloud_services.get_user_service()?; let url = auth_service .generate_oauth_url_with_provider(oauth_provider) diff --git a/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs b/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs index 8d28ce849543..55ef069586d0 100644 --- a/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs +++ b/frontend/rust-lib/flowy-user/src/services/data_import/appflowy_data_import.rs @@ -135,7 +135,6 @@ pub(crate) fn import_appflowy_data_folder( &mut all_imported_object_ids, &imported_collab_by_oid, &row_object_ids, - &document_object_ids, )?; // the object ids now only contains the document collab object ids @@ -143,7 +142,7 @@ pub(crate) fn import_appflowy_data_folder( if let Some(imported_collab) = imported_collab_by_oid.get(object_id) { let new_object_id = old_to_new_id_map.lock().renew_id(object_id); document_object_ids.lock().insert(new_object_id.clone()); - tracing::debug!("import from: {}, to: {}", object_id, new_object_id,); + debug!("import from: {}, to: {}", object_id, new_object_id,); import_collab_object( imported_collab, session.user_id, @@ -184,6 +183,9 @@ pub(crate) fn import_appflowy_data_folder( collab_write_txn, )?; + document_object_ids + .lock() + .insert(import_container_view_id.clone()); let import_container_view = ViewBuilder::new(session.user_id, session.user_workspace.id.clone()) .with_view_id(import_container_view_id) @@ -191,6 +193,7 @@ pub(crate) fn import_appflowy_data_folder( .with_name(name) .with_child_views(child_views) .build(); + Ok(vec![import_container_view]) }, } @@ -256,7 +259,6 @@ fn migrate_databases<'a, W>( imported_object_ids: &mut Vec, imported_collab_by_oid: &HashMap, row_object_ids: &Mutex>, - document_object_ids: &Mutex>, ) -> Result<(), PersistenceError> where W: YrsDocAction<'a>, @@ -265,7 +267,6 @@ where // Migrate databases let mut database_object_ids = vec![]; let imported_database_row_object_ids = RwLock::new(HashSet::new()); - let imported_database_row_document_object_ids = RwLock::new(HashSet::new()); for object_id in &mut *imported_object_ids { if let Some(database_collab) = imported_collab_by_oid.get(object_id) { @@ -298,9 +299,6 @@ where row_order.id = RowId::from(new_row_id); imported_database_row_object_ids.write().insert(old_row_id); - imported_database_row_document_object_ids - .write() - .insert(old_row_document_id); }); // collect the ids @@ -310,13 +308,7 @@ where .map(|order| order.id.clone().into_inner()) .collect::>(); - let new_row_document_ids = new_row_ids - .iter() - .map(|id| database_row_document_id_from_row_id(id)) - .collect::>(); - row_object_ids.lock().extend(new_row_ids); - document_object_ids.lock().extend(new_row_document_ids); }); let new_object_id = old_to_new_id_map.lock().renew_id(object_id); @@ -333,22 +325,10 @@ where } } let imported_database_row_object_ids = imported_database_row_object_ids.read(); - let imported_database_row_document_object_ids = imported_database_row_document_object_ids.read(); - - debug!( - "imported_database_row_object_ids: {:?}", - imported_database_row_object_ids - ); - - debug!( - "imported_database_row_document_object_ids: {:?}", - imported_database_row_document_object_ids - ); // remove the database object ids from the object ids imported_object_ids.retain(|id| !database_object_ids.contains(id)); imported_object_ids.retain(|id| !imported_database_row_object_ids.contains(id)); - imported_object_ids.retain(|id| !imported_database_row_document_object_ids.contains(id)); for imported_row_id in &*imported_database_row_object_ids { if let Some(imported_collab) = imported_collab_by_oid.get(imported_row_id) { @@ -369,18 +349,15 @@ where } let imported_row_document_id = database_row_document_id_from_row_id(imported_row_id); - if let Some(imported_collab) = imported_collab_by_oid.get(&imported_row_document_id) { + if imported_collab_by_oid + .get(&imported_row_document_id) + .is_some() + { let new_row_document_id = old_to_new_id_map.lock().renew_id(&imported_row_document_id); info!( - "import database row document from: {}, to: {}", + "map row document from: {}, to: {}", imported_row_document_id, new_row_document_id, ); - import_collab_object( - imported_collab, - session.user_id, - &new_row_document_id, - collab_write_txn, - ); } } Ok(()) diff --git a/frontend/rust-lib/flowy-user/src/services/db.rs b/frontend/rust-lib/flowy-user/src/services/db.rs index 36e7c5bf36e9..61f8c6b26954 100644 --- a/frontend/rust-lib/flowy-user/src/services/db.rs +++ b/frontend/rust-lib/flowy-user/src/services/db.rs @@ -258,7 +258,9 @@ impl CollabDBZipBackup { } // Clean up old backups - self.clean_old_backups()?; + if let Err(err) = self.clean_old_backups() { + error!("Clean up old backups failed: {:?}", err); + } Ok(()) } @@ -292,7 +294,7 @@ impl CollabDBZipBackup { let path = entry.path(); if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("zip") { if let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) { - if let Some(timestamp_str) = file_name.split("_").last() { + if let Some(timestamp_str) = file_name.split('_').last() { match latest_zip { Some((latest_timestamp, _)) if timestamp_str > latest_timestamp.as_str() => { latest_zip = Some((timestamp_str.to_string(), path)); @@ -343,6 +345,7 @@ impl CollabDBZipBackup { // Remove backups older than 10 days let threshold_str = threshold_date.format(zip_time_format()).to_string(); + info!("Current backup: {:?}", backups.len()); // If there are more than 10 backups, remove the oldest ones while backups.len() > 10 { if let Some((date_str, path)) = backups.first() {