Skip to content

Commit

Permalink
fix: open duplicated database (AppFlowy-IO#6197)
Browse files Browse the repository at this point in the history
* fix: open duplicated database

* chore: update logs
  • Loading branch information
appflowy authored Sep 5, 2024
1 parent 8688183 commit 7ea71fc
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ class BoardBloc extends Bloc<BoardEvent, BoardState> {
(event, emit) async {
await event.when(
initial: () async {
emit(BoardState.initial(viewId));
_startListening();
await _openDatabase(emit);
emit(BoardState.initial(viewId));
},
createRow: (groupId, position, title, targetRowId) async {
final primaryField = databaseController.fieldController.fieldInfos
Expand Down
2 changes: 1 addition & 1 deletion frontend/appflowy_flutter/macos/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ SPEC CHECKSUMS:
HotKey: e96d8a2ddbf4591131e2bb3f54e69554d90cdca6
hotkey_manager: c32bf0bfe8f934b7bc17ab4ad5c4c142960b023c
irondash_engine_context: da62996ee25616d2f01bbeb85dc115d813359478
local_notifier: e9506bc66fc70311e8bc7291fb70f743c081e4ff
local_notifier: c6c371695f914641ab7bc8601944f7e358632d0b
package_info_plus: fa739dd842b393193c5ca93c26798dff6e3d0e0c
path_provider_foundation: 3784922295ac71e43754bd15e0653ccfd36a147c
ReachabilitySwift: 7f151ff156cea1481a8411701195ac6a984f4979
Expand Down
1 change: 1 addition & 0 deletions frontend/rust-lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions frontend/rust-lib/flowy-database2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ strum = "0.25"
strum_macros = "0.25"
validator = { workspace = true, features = ["derive"] }
tokio-util.workspace = true
tokio-retry = "0.3"

[dev-dependencies]
event-integration-test = { path = "../event-integration-test", default-features = false }
Expand Down
30 changes: 21 additions & 9 deletions frontend/rust-lib/flowy-database2/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ use crate::services::cell::stringify_cell;
use crate::services::database::DatabaseEditor;
use crate::services::database_view::DatabaseLayoutDepsResolver;
use crate::services::field::translate_type_option::translate::TranslateTypeOption;
use tokio::sync::RwLock as TokioRwLock;

use crate::services::field_settings::default_field_settings_by_layout_map;
use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
use tokio::sync::RwLock as TokioRwLock;
use tokio_retry::strategy::ExponentialBackoff;
use tokio_retry::Retry;

pub trait DatabaseUser: Send + Sync {
fn user_id(&self) -> Result<i64, FlowyError>;
Expand Down Expand Up @@ -212,7 +213,7 @@ impl DatabaseManager {

#[instrument(level = "trace", skip_all, err)]
pub async fn open_database(&self, database_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
let lock = self.workspace_database()?;
let workspace_database = self.workspace_database()?;
if let Some(database_editor) = self.removing_editor.lock().await.remove(database_id) {
self
.editors
Expand All @@ -223,12 +224,23 @@ impl DatabaseManager {
}

trace!("create database editor:{}", database_id);
let database = lock
.read()
.await
.get_or_create_database(database_id)
.await
.ok_or_else(|| FlowyError::collab_not_sync().with_context("open database error"))?;
// When the user opens the database from the left-side bar, it may fail because the workspace database
// hasn't finished syncing yet. In such cases, get_or_create_database will return None.
// The workaround is to add a retry mechanism to attempt fetching the database again.
let database = Retry::spawn(
ExponentialBackoff::from_millis(2).factor(1000).take(3),
|| async {
trace!("retry to open database:{}", database_id);
let database = workspace_database
.read()
.await
.get_or_create_database(database_id)
.await
.ok_or_else(|| FlowyError::collab_not_sync().with_context("open database error"))?;
Ok::<_, FlowyError>(database)
},
)
.await?;

let editor = DatabaseEditor::new(
self.user.clone(),
Expand Down
22 changes: 20 additions & 2 deletions frontend/rust-lib/flowy-storage-pub/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::Serialize;
use std::fmt::Display;
use std::ops::{Deref, DerefMut};
use tokio::sync::broadcast;
use tracing::error;
use tracing::warn;

#[async_trait]
pub trait StorageService: Send + Sync {
Expand Down Expand Up @@ -72,6 +72,24 @@ pub struct FileProgress {
pub error: Option<String>,
}

impl FileProgress {
pub fn new_progress(file_url: String, progress: f64) -> Self {
FileProgress {
file_url,
progress,
error: None,
}
}

pub fn new_error(file_url: String, error: String) -> Self {
FileProgress {
file_url,
progress: 0.0,
error: Some(error),
}
}
}

impl Display for FileProgress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "FileProgress: {} - {}", self.file_url, self.progress)
Expand Down Expand Up @@ -105,7 +123,7 @@ impl ProgressNotifier {
pub async fn notify(&mut self, progress: FileUploadState) {
self.current_value = Some(progress.clone());
if let Err(err) = self.tx.send(progress) {
error!("Failed to send progress notification: {:?}", err);
warn!("Failed to send progress notification: {:?}", err);
}
}
}
Expand Down
28 changes: 12 additions & 16 deletions frontend/rust-lib/flowy-storage/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,10 @@ impl StorageManager {
let mut conn = self.user_service.sqlite_connection(uid).ok()?;
let is_finish = is_upload_completed(&mut conn, &workspace_id, &parent_dir, &file_id).ok()?;

if let Err(err) = self.global_notifier.send(FileProgress {
file_url: url.to_string(),
progress: if is_finish { 1.0 } else { 0.0 },
error: None,
}) {
if let Err(err) = self.global_notifier.send(FileProgress::new_progress(
url.to_string(),
if is_finish { 1.0 } else { 0.0 },
)) {
error!("[File] send global notifier failed: {}", err);
}

Expand Down Expand Up @@ -594,11 +593,8 @@ async fn start_upload(
&upload_file.file_id,
)
.await?;
if let Err(err) = global_notifier.send(FileProgress {
file_url,
progress,
error: None,
}) {

if let Err(err) = global_notifier.send(FileProgress::new_progress(file_url, progress)) {
error!("[File] send global notifier failed: {}", err);
}

Expand Down Expand Up @@ -765,7 +761,6 @@ async fn complete_upload(
Ok(_) => {
info!("[File] completed upload file: {}", upload_file.file_id);
if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) {
info!("[File]: notify upload:{} finished", upload_file.file_id);
notifier
.notify(FileUploadState::Finished {
file_id: upload_file.file_id.clone(),
Expand All @@ -781,11 +776,12 @@ async fn complete_upload(
)
.await?;

if let Err(err) = global_notifier.send(FileProgress {
file_url,
progress: 1.0,
error: None,
}) {
let progress = FileProgress::new_progress(file_url, 1.0);
info!(
"[File]: notify upload progress:{}, {}",
upload_file.file_id, progress
);
if let Err(err) = global_notifier.send(progress) {
error!("[File] send global notifier failed: {}", err);
}

Expand Down
13 changes: 6 additions & 7 deletions frontend/rust-lib/flowy-storage/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,12 @@ impl FileUploader {
return None;
}

trace!(
"[File] Max concurrent uploads: {}, current: {}",
self.max_uploads,
self
.current_uploads
.load(std::sync::atomic::Ordering::SeqCst)
);
let current_uploads = self
.current_uploads
.load(std::sync::atomic::Ordering::SeqCst);
if current_uploads > 0 {
trace!("[File] current upload tasks: {}", current_uploads)
}

if self
.current_uploads
Expand Down

0 comments on commit 7ea71fc

Please sign in to comment.