diff --git a/frontend/appflowy_flutter/lib/startup/startup.dart b/frontend/appflowy_flutter/lib/startup/startup.dart index 60b18fd8d61d..3741380567c8 100644 --- a/frontend/appflowy_flutter/lib/startup/startup.dart +++ b/frontend/appflowy_flutter/lib/startup/startup.dart @@ -14,6 +14,7 @@ import 'deps_resolver.dart'; import 'entry_point.dart'; import 'launch_configuration.dart'; import 'plugin/plugin.dart'; +import 'tasks/file_storage_task.dart'; import 'tasks/prelude.dart'; final getIt = GetIt.instance; @@ -126,6 +127,7 @@ class FlowyRunner { InitRustSDKTask(customApplicationPath: applicationDataDirectory), // Load Plugins, like document, grid ... const PluginLoadTask(), + const FileStorageTask(), // init the app widget // ignore in test mode diff --git a/frontend/appflowy_flutter/lib/startup/tasks/file_storage_task.dart b/frontend/appflowy_flutter/lib/startup/tasks/file_storage_task.dart new file mode 100644 index 000000000000..ee794ba9003b --- /dev/null +++ b/frontend/appflowy_flutter/lib/startup/tasks/file_storage_task.dart @@ -0,0 +1,149 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:ffi'; +import 'dart:isolate'; + +import 'package:appflowy_backend/dispatch/dispatch.dart'; +import 'package:appflowy_backend/log.dart'; +import 'package:appflowy_backend/protobuf/flowy-error/errors.pb.dart'; +import 'package:appflowy_backend/protobuf/flowy-storage/protobuf.dart'; +import 'package:appflowy_result/appflowy_result.dart'; +import 'package:flutter/foundation.dart'; +import 'package:fixnum/fixnum.dart'; + +import '../startup.dart'; + +class FileStorageTask extends LaunchTask { + const FileStorageTask(); + + @override + Future initialize(LaunchContext context) async { + context.getIt.registerSingleton( + FileStorageService(), + dispose: (service) async { + await service.dispose(); + }, + ); + } + + @override + Future dispose() async {} +} + +class FileStorageService { + FileStorageService() { + _port.handler = _controller.add; + _subscription = _controller.stream.listen( + (event) { + final fileProgress = FileProgress.fromJsonString(event); + if (fileProgress != null) { + Log.debug( + "Upload progress: file: ${fileProgress.fileUrl} ${fileProgress.progress}", + ); + final notifier = _notifierList[fileProgress.fileUrl]; + if (notifier != null) { + notifier.value = fileProgress; + } + } + }, + ); + + final payload = RegisterStreamPB()..port = Int64(_port.sendPort.nativePort); + FileStorageEventRegisterStream(payload).send(); + } + + final Map> _notifierList = {}; + final RawReceivePort _port = RawReceivePort(); + final StreamController _controller = StreamController.broadcast(); + late StreamSubscription _subscription; + + AutoRemoveNotifier onFileProgress({required String fileUrl}) { + _notifierList.remove(fileUrl)?.dispose(); + + final notifier = AutoRemoveNotifier( + FileProgress(fileUrl: fileUrl, progress: 0), + notifierList: _notifierList, + fileId: fileUrl, + ); + _notifierList[fileUrl] = notifier; + + // trigger the initial file state + getFileState(fileUrl); + + return notifier; + } + + Future> getFileState(String url) { + final payload = QueryFilePB()..url = url; + return FileStorageEventQueryFile(payload).send(); + } + + Future dispose() async { + // dispose all notifiers + for (final notifier in _notifierList.values) { + notifier.dispose(); + } + + await _controller.close(); + await _subscription.cancel(); + _port.close(); + } +} + +class FileProgress { + FileProgress({ + required this.fileUrl, + required this.progress, + this.error, + }); + + static FileProgress? fromJson(Map? json) { + if (json == null) { + return null; + } + + try { + if (json.containsKey('file_url') && json.containsKey('progress')) { + return FileProgress( + fileUrl: json['file_url'] as String, + progress: (json['progress'] as num).toDouble(), + error: json['error'] as String?, + ); + } + } catch (e) { + Log.error('unable to parse file progress: $e'); + } + return null; + } + + // Method to parse a JSON string and return a FileProgress object or null + static FileProgress? fromJsonString(String jsonString) { + try { + final Map jsonMap = jsonDecode(jsonString); + return FileProgress.fromJson(jsonMap); + } catch (e) { + return null; + } + } + + final double progress; + final String fileUrl; + final String? error; +} + +class AutoRemoveNotifier extends ValueNotifier { + AutoRemoveNotifier( + super.value, { + required this.fileId, + required Map> notifierList, + }) : _notifierList = notifierList; + + final String fileId; + final Map> _notifierList; + + @override + void dispose() { + _notifierList.remove(fileId); + super.dispose(); + } +} diff --git a/frontend/appflowy_flutter/packages/appflowy_backend/lib/dispatch/dispatch.dart b/frontend/appflowy_flutter/packages/appflowy_backend/lib/dispatch/dispatch.dart index 746e95fbd8ca..552c6a268f20 100644 --- a/frontend/appflowy_flutter/packages/appflowy_backend/lib/dispatch/dispatch.dart +++ b/frontend/appflowy_flutter/packages/appflowy_backend/lib/dispatch/dispatch.dart @@ -17,6 +17,7 @@ import 'package:appflowy_backend/protobuf/flowy-folder/protobuf.dart'; import 'package:appflowy_backend/protobuf/flowy-search/protobuf.dart'; import 'package:appflowy_backend/protobuf/flowy-user/protobuf.dart'; import 'package:appflowy_backend/protobuf/flowy-ai/protobuf.dart'; +import 'package:appflowy_backend/protobuf/flowy-storage/protobuf.dart'; import 'package:appflowy_result/appflowy_result.dart'; import 'package:ffi/ffi.dart'; import 'package:isolates/isolates.dart'; @@ -38,6 +39,7 @@ part 'dart_event/flowy-config/dart_event.dart'; part 'dart_event/flowy-date/dart_event.dart'; part 'dart_event/flowy-search/dart_event.dart'; part 'dart_event/flowy-ai/dart_event.dart'; +part 'dart_event/flowy-storage/dart_event.dart'; enum FFIException { RequestIsEmpty, diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.lock b/frontend/appflowy_tauri/src-tauri/Cargo.lock index 141b73bf963b..87b923e7dc13 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.lock +++ b/frontend/appflowy_tauri/src-tauri/Cargo.lock @@ -172,7 +172,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bincode", @@ -192,7 +192,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bytes", @@ -837,7 +837,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "again", "anyhow", @@ -888,7 +888,7 @@ dependencies = [ [[package]] name = "client-api-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "collab-entity", "collab-rt-entity", @@ -901,7 +901,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "futures-channel", "futures-util", @@ -1149,7 +1149,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bincode", @@ -1174,7 +1174,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "async-trait", @@ -1571,7 +1571,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", @@ -2542,6 +2542,7 @@ dependencies = [ name = "flowy-storage" version = "0.1.0" dependencies = [ + "allo-isolate", "anyhow", "async-trait", "bytes", @@ -2553,12 +2554,15 @@ dependencies = [ "flowy-notification", "flowy-sqlite", "flowy-storage-pub", + "futures-util", "fxhash", + "lib-dispatch", "lib-infra", "mime_guess", "protobuf", "serde", "serde_json", + "strum_macros 0.25.2", "tokio", "tracing", "url", @@ -3117,7 +3121,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "futures-util", @@ -3134,7 +3138,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", @@ -3566,7 +3570,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bytes", @@ -3857,6 +3861,7 @@ dependencies = [ "chrono", "futures", "futures-core", + "futures-util", "md5", "pin-project", "tempfile", @@ -6169,7 +6174,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", diff --git a/frontend/appflowy_tauri/src-tauri/Cargo.toml b/frontend/appflowy_tauri/src-tauri/Cargo.toml index 73cb2c466bc0..0e1560190bb5 100644 --- a/frontend/appflowy_tauri/src-tauri/Cargo.toml +++ b/frontend/appflowy_tauri/src-tauri/Cargo.toml @@ -53,7 +53,7 @@ collab-user = { version = "0.2" } # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "72652086f5dfb6f22c67616f8873794cb5f28e1a" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" } [dependencies] serde_json.workspace = true diff --git a/frontend/appflowy_tauri/src/services/backend/index.ts b/frontend/appflowy_tauri/src/services/backend/index.ts index 1cc17c1e1bff..3e02ff7183a7 100644 --- a/frontend/appflowy_tauri/src/services/backend/index.ts +++ b/frontend/appflowy_tauri/src/services/backend/index.ts @@ -6,3 +6,4 @@ export * from "./models/flowy-error"; export * from "./models/flowy-config"; export * from "./models/flowy-date"; export * from "./models/flowy-search"; +export * from "./models/flowy-storage"; diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.lock b/frontend/appflowy_web_app/src-tauri/Cargo.lock index 12dd07429e96..d572c25cfb46 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.lock +++ b/frontend/appflowy_web_app/src-tauri/Cargo.lock @@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bincode", @@ -183,7 +183,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bytes", @@ -811,7 +811,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "again", "anyhow", @@ -862,7 +862,7 @@ dependencies = [ [[package]] name = "client-api-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "collab-entity", "collab-rt-entity", @@ -875,7 +875,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "futures-channel", "futures-util", @@ -1132,7 +1132,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bincode", @@ -1157,7 +1157,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "async-trait", @@ -1561,7 +1561,7 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", @@ -2572,6 +2572,7 @@ dependencies = [ name = "flowy-storage" version = "0.1.0" dependencies = [ + "allo-isolate", "anyhow", "async-trait", "bytes", @@ -2583,12 +2584,15 @@ dependencies = [ "flowy-notification", "flowy-sqlite", "flowy-storage-pub", + "futures-util", "fxhash", + "lib-dispatch", "lib-infra", "mime_guess", "protobuf", "serde", "serde_json", + "strum_macros 0.25.3", "tokio", "tracing", "url", @@ -3184,7 +3188,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "futures-util", @@ -3201,7 +3205,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", @@ -3638,7 +3642,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bytes", @@ -3934,6 +3938,7 @@ dependencies = [ "chrono", "futures", "futures-core", + "futures-util", "md5", "pin-project", "tempfile", @@ -6233,7 +6238,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", diff --git a/frontend/appflowy_web_app/src-tauri/Cargo.toml b/frontend/appflowy_web_app/src-tauri/Cargo.toml index c58a87ca13d6..24bfc9d359a9 100644 --- a/frontend/appflowy_web_app/src-tauri/Cargo.toml +++ b/frontend/appflowy_web_app/src-tauri/Cargo.toml @@ -52,7 +52,7 @@ collab-user = { version = "0.2" } # Run the script: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "72652086f5dfb6f22c67616f8873794cb5f28e1a" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" } [dependencies] serde_json.workspace = true diff --git a/frontend/appflowy_web_app/src/application/services/tauri-services/backend/index.ts b/frontend/appflowy_web_app/src/application/services/tauri-services/backend/index.ts index 38a126a402df..79bcb7085035 100644 --- a/frontend/appflowy_web_app/src/application/services/tauri-services/backend/index.ts +++ b/frontend/appflowy_web_app/src/application/services/tauri-services/backend/index.ts @@ -4,4 +4,5 @@ export * from "./models/flowy-folder"; export * from "./models/flowy-document"; export * from "./models/flowy-error"; export * from "./models/flowy-config"; -export * from "./models/flowy-date"; \ No newline at end of file +export * from "./models/flowy-date"; +export * from "./models/flowy-storage"; diff --git a/frontend/rust-lib/Cargo.lock b/frontend/rust-lib/Cargo.lock index dfc5249c780b..4d35e97c45e9 100644 --- a/frontend/rust-lib/Cargo.lock +++ b/frontend/rust-lib/Cargo.lock @@ -163,7 +163,7 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "app-error" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bincode", @@ -183,7 +183,7 @@ dependencies = [ [[package]] name = "appflowy-ai-client" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bytes", @@ -729,7 +729,7 @@ dependencies = [ [[package]] name = "client-api" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "again", "anyhow", @@ -780,7 +780,7 @@ dependencies = [ [[package]] name = "client-api-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "collab-entity", "collab-rt-entity", @@ -793,7 +793,7 @@ dependencies = [ [[package]] name = "client-websocket" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "futures-channel", "futures-util", @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "collab-rt-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bincode", @@ -1035,7 +1035,7 @@ dependencies = [ [[package]] name = "collab-rt-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "async-trait", @@ -1281,7 +1281,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.8.0", + "phf 0.11.2", "smallvec", ] @@ -1395,7 +1395,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "database-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", @@ -2378,6 +2378,7 @@ dependencies = [ name = "flowy-storage" version = "0.1.0" dependencies = [ + "allo-isolate", "anyhow", "async-trait", "bytes", @@ -2389,13 +2390,16 @@ dependencies = [ "flowy-notification", "flowy-sqlite", "flowy-storage-pub", + "futures-util", "fxhash", + "lib-dispatch", "lib-infra", "mime_guess", "protobuf", "rand 0.8.5", "serde", "serde_json", + "strum_macros 0.25.2", "tokio", "tracing", "url", @@ -2795,7 +2799,7 @@ dependencies = [ [[package]] name = "gotrue" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "futures-util", @@ -2812,7 +2816,7 @@ dependencies = [ [[package]] name = "gotrue-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", @@ -3177,7 +3181,7 @@ dependencies = [ [[package]] name = "infra" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "bytes", @@ -3370,6 +3374,7 @@ dependencies = [ "chrono", "futures", "futures-core", + "futures-util", "md5", "pin-project", "rand 0.8.5", @@ -4138,7 +4143,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" dependencies = [ - "phf_macros", + "phf_macros 0.8.0", "phf_shared 0.8.0", "proc-macro-hack", ] @@ -4158,6 +4163,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ + "phf_macros 0.11.2", "phf_shared 0.11.2", ] @@ -4225,6 +4231,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "phf_macros" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3444646e286606587e49f3bcf1679b8cef1dc2c5ecc29ddacaffc305180d464b" +dependencies = [ + "phf_generator 0.11.2", + "phf_shared 0.11.2", + "proc-macro2", + "quote", + "syn 2.0.47", +] + [[package]] name = "phf_shared" version = "0.8.0" @@ -5377,7 +5396,7 @@ dependencies = [ [[package]] name = "shared-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=72652086f5dfb6f22c67616f8873794cb5f28e1a#72652086f5dfb6f22c67616f8873794cb5f28e1a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Cloud?rev=5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755#5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" dependencies = [ "anyhow", "app-error", diff --git a/frontend/rust-lib/Cargo.toml b/frontend/rust-lib/Cargo.toml index 0c85f38847ad..efc0b5a76c93 100644 --- a/frontend/rust-lib/Cargo.toml +++ b/frontend/rust-lib/Cargo.toml @@ -100,8 +100,8 @@ dashmap = "6.0.1" # Run the script.add_workspace_members: # scripts/tool/update_client_api_rev.sh new_rev_id # ⚠️⚠️⚠️️ -client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "72652086f5dfb6f22c67616f8873794cb5f28e1a" } -client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "72652086f5dfb6f22c67616f8873794cb5f28e1a" } +client-api = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" } +client-api-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Cloud", rev = "5b5e561afbb161e25d77f3ae5f2ae50ffa2b1755" } [profile.dev] opt-level = 0 diff --git a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs index 7376ff238edd..87e36deac62a 100644 --- a/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs +++ b/frontend/rust-lib/event-integration-test/tests/document/af_cloud_test/file_upload_test.rs @@ -29,7 +29,7 @@ async fn af_cloud_upload_big_file_test() { .unwrap(); let mut rx = rx.unwrap(); - while let Some(state) = rx.recv().await { + while let Ok(state) = rx.recv().await { if let FileUploadState::Uploading { progress } = state { if progress > 0.1 { break; @@ -52,7 +52,7 @@ async fn af_cloud_upload_big_file_test() { .unwrap() { let timeout_duration = Duration::from_secs(180); - while let Some(state) = match timeout(timeout_duration, rx.recv()).await { + while let Ok(state) = match timeout(timeout_duration, rx.recv()).await { Ok(result) => result, Err(_) => { panic!("Timed out waiting for file upload completion"); @@ -114,7 +114,7 @@ async fn af_cloud_upload_6_files_test() { .await .retain(|upload| upload.file_id != file_id); } - while let Some(value) = receiver.recv().await { + while let Ok(value) = receiver.recv().await { if let FileUploadState::Finished { file_id } = value { cloned_uploads .lock() diff --git a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs index d21cc981a55c..1bed5471d4b7 100644 --- a/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs +++ b/frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs @@ -91,6 +91,15 @@ impl StorageCloudService for ServerProvider { .await } + async fn parse_object_url_v1(&self, url: &str) -> Option<(String, String, String)> { + self + .get_server() + .ok()? + .file_storage()? + .parse_object_url_v1(url) + .await + } + async fn create_upload( &self, workspace_id: &str, diff --git a/frontend/rust-lib/flowy-core/src/lib.rs b/frontend/rust-lib/flowy-core/src/lib.rs index db6a5d0ce8ed..182b2b7918f3 100644 --- a/frontend/rust-lib/flowy-core/src/lib.rs +++ b/frontend/rust-lib/flowy-core/src/lib.rs @@ -271,6 +271,7 @@ impl AppFlowyCore { Arc::downgrade(&document_manager), Arc::downgrade(&search_manager), Arc::downgrade(&ai_manager), + Arc::downgrade(&storage_manager), ), )); diff --git a/frontend/rust-lib/flowy-core/src/module.rs b/frontend/rust-lib/flowy-core/src/module.rs index 2c75cb376ec6..196fbce93543 100644 --- a/frontend/rust-lib/flowy-core/src/module.rs +++ b/frontend/rust-lib/flowy-core/src/module.rs @@ -5,6 +5,7 @@ use flowy_database2::DatabaseManager; use flowy_document::manager::DocumentManager as DocumentManager2; use flowy_folder::manager::FolderManager; use flowy_search::services::manager::SearchManager; +use flowy_storage::manager::StorageManager; use flowy_user::user_manager::UserManager; use lib_dispatch::prelude::AFPlugin; @@ -15,6 +16,7 @@ pub fn make_plugins( document_manager2: Weak, search_manager: Weak, ai_manager: Weak, + file_storage_manager: Weak, ) -> Vec { let store_preferences = user_session .upgrade() @@ -28,6 +30,7 @@ pub fn make_plugins( let date_plugin = flowy_date::event_map::init(); let search_plugin = flowy_search::event_map::init(search_manager); let ai_plugin = flowy_ai::event_map::init(ai_manager); + let file_storage_plugin = flowy_storage::event_map::init(file_storage_manager); vec![ user_plugin, folder_plugin, @@ -37,5 +40,6 @@ pub fn make_plugins( date_plugin, search_plugin, ai_plugin, + file_storage_plugin, ] } diff --git a/frontend/rust-lib/flowy-database2/src/manager.rs b/frontend/rust-lib/flowy-database2/src/manager.rs index 5280e07e4279..2119db8cfd5a 100644 --- a/frontend/rust-lib/flowy-database2/src/manager.rs +++ b/frontend/rust-lib/flowy-database2/src/manager.rs @@ -1,7 +1,6 @@ use anyhow::anyhow; use arc_swap::ArcSwapOption; use async_trait::async_trait; -use std::borrow::BorrowMut; use std::collections::HashMap; use std::sync::{Arc, Weak}; @@ -129,42 +128,6 @@ impl DatabaseManager { Ok(()) } - //FIXME: we need to initialize sync plugin for newly created collabs - #[allow(dead_code)] - fn initialize_plugins( - &self, - uid: i64, - object_id: &str, - collab_type: CollabType, - collab: Arc>, - ) -> FlowyResult>> - where - T: BorrowMut + Send + Sync + 'static, - { - //FIXME: unfortunately UserDatabaseCollabService::build_collab_with_config is broken by - // design as it assumes that we can split collab building process, which we cannot because: - // 1. We should not be able to run plugins ie. SyncPlugin over not-fully initialized collab, - // and that's what originally build_collab_with_config did. - // 2. We cannot fully initialize collab from UserDatabaseCollabService, because - // WorkspaceDatabase itself requires UserDatabaseCollabService as constructor parameter. - // Ideally we should never need to initialize plugins that require collab instance as part of - // that collab construction process itself - it means that we should redesign SyncPlugin to only - // be fired once a collab is fully initialized. - let workspace_id = self - .user - .workspace_id() - .map_err(|err| DatabaseError::Internal(err.into()))?; - let object = self - .collab_builder - .collab_object(&workspace_id, uid, object_id, collab_type)?; - let collab = self.collab_builder.finalize( - object, - CollabBuilderConfig::default().sync_enable(true), - collab, - )?; - Ok(collab) - } - #[instrument( name = "database_initialize_with_new_user", level = "debug", diff --git a/frontend/rust-lib/flowy-database2/src/services/database/database_observe.rs b/frontend/rust-lib/flowy-database2/src/services/database/database_observe.rs index 038f5a12cef2..270bdfb65d63 100644 --- a/frontend/rust-lib/flowy-database2/src/services/database/database_observe.rs +++ b/frontend/rust-lib/flowy-database2/src/services/database/database_observe.rs @@ -33,7 +33,6 @@ pub(crate) async fn observe_sync_state(database_id: &str, database: &Arc>, diff --git a/frontend/rust-lib/flowy-server/src/af_cloud/impls/file_storage.rs b/frontend/rust-lib/flowy-server/src/af_cloud/impls/file_storage.rs index d1ecd224494c..552ee82c121a 100644 --- a/frontend/rust-lib/flowy-server/src/af_cloud/impls/file_storage.rs +++ b/frontend/rust-lib/flowy-server/src/af_cloud/impls/file_storage.rs @@ -59,6 +59,11 @@ where Ok(url) } + async fn parse_object_url_v1(&self, url: &str) -> Option<(String, String, String)> { + let value = self.0.try_get_client().ok()?.parse_blob_url_v1(url)?; + Some(value) + } + async fn create_upload( &self, workspace_id: &str, diff --git a/frontend/rust-lib/flowy-storage-pub/src/cloud.rs b/frontend/rust-lib/flowy-storage-pub/src/cloud.rs index 7fb40c09fb27..84a109683396 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/cloud.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/cloud.rs @@ -52,6 +52,8 @@ pub trait StorageCloudService: Send + Sync { file_id: &str, ) -> FlowyResult; + async fn parse_object_url_v1(&self, url: &str) -> Option<(String, String, String)>; + async fn create_upload( &self, workspace_id: &str, diff --git a/frontend/rust-lib/flowy-storage-pub/src/storage.rs b/frontend/rust-lib/flowy-storage-pub/src/storage.rs index 12124504b91f..e97b0cc65deb 100644 --- a/frontend/rust-lib/flowy-storage-pub/src/storage.rs +++ b/frontend/rust-lib/flowy-storage-pub/src/storage.rs @@ -3,8 +3,10 @@ use async_trait::async_trait; pub use client_api_entity::{CompletedPartRequest, CreateUploadResponse, UploadPartResponse}; use flowy_error::{FlowyError, FlowyResult}; use lib_infra::box_any::BoxAny; +use serde::Serialize; +use std::fmt::Display; use std::ops::{Deref, DerefMut}; -use tokio::sync::mpsc; +use tokio::sync::broadcast; use tracing::error; #[async_trait] @@ -38,12 +40,12 @@ pub trait StorageService: Send + Sync { } pub struct FileProgressReceiver { - pub rx: mpsc::Receiver, + pub rx: broadcast::Receiver, pub file_id: String, } impl Deref for FileProgressReceiver { - type Target = mpsc::Receiver; + type Target = broadcast::Receiver; fn deref(&self) -> &Self::Target { &self.rx @@ -63,31 +65,46 @@ pub enum FileUploadState { Finished { file_id: String }, } +#[derive(Clone, Debug, Serialize)] +pub struct FileProgress { + pub file_url: String, + pub progress: f64, + pub error: Option, +} + +impl Display for FileProgress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "FileProgress: {} - {}", self.file_url, self.progress) + } +} + #[derive(Debug)] pub struct ProgressNotifier { - tx: mpsc::Sender, + file_id: String, + tx: broadcast::Sender, pub current_value: Option, } impl ProgressNotifier { - pub fn new() -> (Self, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(5); - ( - ProgressNotifier { - tx, - current_value: None, - }, - rx, - ) + pub fn new(file_id: String) -> Self { + let (tx, _) = broadcast::channel(100); + ProgressNotifier { + file_id, + tx, + current_value: None, + } + } + + pub fn subscribe(&self) -> FileProgressReceiver { + FileProgressReceiver { + rx: self.tx.subscribe(), + file_id: self.file_id.clone(), + } } pub async fn notify(&mut self, progress: FileUploadState) { self.current_value = Some(progress.clone()); - // if self.tx.reserve().await.is_err() { - // return; - // } - - if let Err(err) = self.tx.send(progress).await { + if let Err(err) = self.tx.send(progress) { error!("Failed to send progress notification: {:?}", err); } } diff --git a/frontend/rust-lib/flowy-storage/Cargo.toml b/frontend/rust-lib/flowy-storage/Cargo.toml index d5a0ae0ff208..ba451fb5adb1 100644 --- a/frontend/rust-lib/flowy-storage/Cargo.toml +++ b/frontend/rust-lib/flowy-storage/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +lib-dispatch = { workspace = true } flowy-storage-pub.workspace = true serde_json.workspace = true serde.workspace = true @@ -23,6 +24,9 @@ flowy-notification = { workspace = true } flowy-derive.workspace = true protobuf = { workspace = true } dashmap.workspace = true +strum_macros = "0.25.2" +allo-isolate = { version = "^0.1", features = ["catch-unwind"] } +futures-util = "0.3.30" [dev-dependencies] tokio = { workspace = true, features = ["full"] } diff --git a/frontend/rust-lib/flowy-storage/Flowy.toml b/frontend/rust-lib/flowy-storage/Flowy.toml index f2e1792f46db..b0097e98d2be 100644 --- a/frontend/rust-lib/flowy-storage/Flowy.toml +++ b/frontend/rust-lib/flowy-storage/Flowy.toml @@ -1,2 +1,3 @@ # Check out the FlowyConfig (located in flowy_toml.rs) for more details. -proto_input = ["src/notification.rs"] \ No newline at end of file +proto_input = ["src/entities.rs", "src/event_map.rs", "src/notification.rs"] +event_files = ["src/event_map.rs"] diff --git a/frontend/rust-lib/flowy-storage/src/entities.rs b/frontend/rust-lib/flowy-storage/src/entities.rs new file mode 100644 index 000000000000..1decf5bc665d --- /dev/null +++ b/frontend/rust-lib/flowy-storage/src/entities.rs @@ -0,0 +1,22 @@ +use flowy_derive::ProtoBuf; + +#[derive(Default, ProtoBuf, Clone, Debug)] +pub struct RegisterStreamPB { + #[pb(index = 1)] + pub port: i64, +} + +#[derive(Default, ProtoBuf, Clone, Debug)] +pub struct QueryFilePB { + #[pb(index = 1)] + pub url: String, +} + +#[derive(Default, ProtoBuf, Clone, Debug)] +pub struct FileStatePB { + #[pb(index = 1)] + pub file_id: String, + + #[pb(index = 2)] + pub is_finish: bool, +} diff --git a/frontend/rust-lib/flowy-storage/src/event_handler.rs b/frontend/rust-lib/flowy-storage/src/event_handler.rs new file mode 100644 index 000000000000..8b34918e6b55 --- /dev/null +++ b/frontend/rust-lib/flowy-storage/src/event_handler.rs @@ -0,0 +1,38 @@ +use crate::entities::{FileStatePB, QueryFilePB, RegisterStreamPB}; +use crate::manager::StorageManager; +use flowy_error::{FlowyError, FlowyResult}; +use lib_dispatch::prelude::{data_result_ok, AFPluginData, AFPluginState, DataResult}; +use std::sync::{Arc, Weak}; + +fn upgrade_storage_manager( + ai_manager: AFPluginState>, +) -> FlowyResult> { + let manager = ai_manager + .upgrade() + .ok_or(FlowyError::internal().with_context("The storage manager is already dropped"))?; + Ok(manager) +} + +#[tracing::instrument(level = "debug", skip_all, err)] +pub(crate) async fn register_stream_handler( + data: AFPluginData, + storage_manager: AFPluginState>, +) -> Result<(), FlowyError> { + let manager = upgrade_storage_manager(storage_manager)?; + let data = data.into_inner(); + manager.register_file_progress_stream(data.port).await; + Ok(()) +} + +#[tracing::instrument(level = "debug", skip_all, err)] +pub(crate) async fn query_file_handler( + data: AFPluginData, + storage_manager: AFPluginState>, +) -> DataResult { + let manager = upgrade_storage_manager(storage_manager)?; + let data = data.into_inner(); + let pb = manager.query_file_state(&data.url).await.ok_or_else(|| { + FlowyError::record_not_found().with_context(format!("File not found: {}", data.url)) + })?; + data_result_ok(pb) +} diff --git a/frontend/rust-lib/flowy-storage/src/event_map.rs b/frontend/rust-lib/flowy-storage/src/event_map.rs new file mode 100644 index 000000000000..0509f268849b --- /dev/null +++ b/frontend/rust-lib/flowy-storage/src/event_map.rs @@ -0,0 +1,25 @@ +use crate::event_handler::{query_file_handler, register_stream_handler}; +use crate::manager::StorageManager; +use flowy_derive::{Flowy_Event, ProtoBuf_Enum}; +use lib_dispatch::prelude::*; +use std::sync::Weak; +use strum_macros::Display; + +pub fn init(manager: Weak) -> AFPlugin { + AFPlugin::new() + .name("file-storage") + .state(manager) + .event(FileStorageEvent::RegisterStream, register_stream_handler) + .event(FileStorageEvent::QueryFile, query_file_handler) +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)] +#[event_err = "FlowyError"] +pub enum FileStorageEvent { + /// Create a new workspace + #[event(input = "RegisterStreamPB")] + RegisterStream = 0, + + #[event(input = "QueryFilePB", output = "FileStatePB")] + QueryFile = 1, +} diff --git a/frontend/rust-lib/flowy-storage/src/lib.rs b/frontend/rust-lib/flowy-storage/src/lib.rs index 3d5e3033d396..ca0b5d05ce8b 100644 --- a/frontend/rust-lib/flowy-storage/src/lib.rs +++ b/frontend/rust-lib/flowy-storage/src/lib.rs @@ -1,3 +1,6 @@ +mod entities; +mod event_handler; +pub mod event_map; mod file_cache; pub mod manager; mod notification; diff --git a/frontend/rust-lib/flowy-storage/src/manager.rs b/frontend/rust-lib/flowy-storage/src/manager.rs index fb4ff4a57a2d..9a13b5c2d077 100644 --- a/frontend/rust-lib/flowy-storage/src/manager.rs +++ b/frontend/rust-lib/flowy-storage/src/manager.rs @@ -1,3 +1,4 @@ +use crate::entities::FileStatePB; use crate::file_cache::FileTempStorage; use crate::notification::{make_notification, StorageNotification}; use crate::sqlite_sql::{ @@ -6,6 +7,7 @@ use crate::sqlite_sql::{ update_upload_file_upload_id, UploadFilePartTable, UploadFileTable, }; use crate::uploader::{FileUploader, FileUploaderRunner, Signal, UploadTask, UploadTaskQueue}; +use allo_isolate::Isolate; use async_trait::async_trait; use dashmap::DashMap; use flowy_error::{ErrorCode, FlowyError, FlowyResult}; @@ -13,10 +15,11 @@ use flowy_sqlite::DBConnection; use flowy_storage_pub::chunked_byte::{ChunkedBytes, MIN_CHUNK_SIZE}; use flowy_storage_pub::cloud::{ObjectIdentity, ObjectValue, StorageCloudService}; use flowy_storage_pub::storage::{ - CompletedPartRequest, CreatedUpload, FileProgressReceiver, FileUploadState, ProgressNotifier, - StorageService, UploadPartResponse, + CompletedPartRequest, CreatedUpload, FileProgress, FileProgressReceiver, FileUploadState, + ProgressNotifier, StorageService, UploadPartResponse, }; use lib_infra::box_any::BoxAny; +use lib_infra::isolate_stream::{IsolateSink, SinkExt}; use lib_infra::util::timestamp; use std::io::ErrorKind; use std::path::{Path, PathBuf}; @@ -24,7 +27,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::watch; +use tokio::sync::{broadcast, watch}; use tracing::{debug, error, info, instrument, trace}; pub trait StorageUserService: Send + Sync + 'static { @@ -34,10 +37,14 @@ pub trait StorageUserService: Send + Sync + 'static { fn get_application_root_dir(&self) -> &str; } +type GlobalNotifier = broadcast::Sender; pub struct StorageManager { pub storage_service: Arc, + cloud_service: Arc, + user_service: Arc, uploader: Arc, progress_notifiers: Arc>, + global_notifier: GlobalNotifier, } impl Drop for StorageManager { @@ -56,17 +63,19 @@ impl StorageManager { "{}/cache_files", user_service.get_application_root_dir() )); + let (global_notifier, _) = broadcast::channel(1000); let temp_storage = Arc::new(FileTempStorage::new(temp_storage_path)); let (notifier, notifier_rx) = watch::channel(Signal::Proceed); let task_queue = Arc::new(UploadTaskQueue::new(notifier)); let progress_notifiers = Arc::new(DashMap::new()); let storage_service = Arc::new(StorageServiceImpl { - cloud_service, + cloud_service: cloud_service.clone(), user_service: user_service.clone(), temp_storage, task_queue: task_queue.clone(), is_exceed_storage_limit: is_exceed_storage_limit.clone(), progress_notifiers: progress_notifiers.clone(), + global_notifier: global_notifier.clone(), }); let uploader = Arc::new(FileUploader::new( @@ -80,11 +89,12 @@ impl StorageManager { )); let weak_uploader = Arc::downgrade(&uploader); + let cloned_user_service = user_service.clone(); tokio::spawn(async move { // Start uploading after 20 seconds tokio::time::sleep(Duration::from_secs(20)).await; if let Some(uploader) = weak_uploader.upgrade() { - if let Err(err) = prepare_upload_task(uploader, user_service).await { + if let Err(err) = prepare_upload_task(uploader, cloned_user_service).await { error!("prepare upload task failed: {}", err); } } @@ -92,11 +102,51 @@ impl StorageManager { Self { storage_service, + cloud_service, + user_service, uploader, progress_notifiers, + global_notifier, } } + pub async fn register_file_progress_stream(&self, port: i64) { + info!("register file progress stream: {}", port); + let mut sink = IsolateSink::new(Isolate::new(port)); + let mut rx = self.global_notifier.subscribe(); + tokio::spawn(async move { + while let Ok(progress) = rx.recv().await { + if let Ok(s) = serde_json::to_string(&progress) { + if let Err(err) = sink.send(s).await { + error!("[File]: send file progress failed: {}", err); + } + } + } + }); + } + + pub async fn query_file_state(&self, url: &str) -> Option { + let (workspace_id, parent_dir, file_id) = self.cloud_service.parse_object_url_v1(url).await?; + let current_workspace_id = self.user_service.workspace_id().ok()?; + if workspace_id != current_workspace_id { + return None; + } + + let uid = self.user_service.user_id().ok()?; + let mut conn = self.user_service.sqlite_connection(uid).ok()?; + let is_finish = is_upload_completed(&mut conn, &workspace_id, &parent_dir, &file_id).ok()?; + + if let Err(err) = self.global_notifier.send(FileProgress { + file_url: url.to_string(), + progress: if is_finish { 1.0 } else { 0.0 }, + error: None, + }) { + error!("[File] send global notifier failed: {}", err); + } + + Some(FileStatePB { file_id, is_finish }) + } + pub async fn initialize(&self, _workspace_id: &str) { self.enable_storage_write_access(); } @@ -167,6 +217,7 @@ pub struct StorageServiceImpl { task_queue: Arc, is_exceed_storage_limit: Arc, progress_notifiers: Arc>, + global_notifier: GlobalNotifier, } #[async_trait] @@ -299,15 +350,11 @@ impl StorageService for StorageServiceImpl { .await; } - let (notifier, receiver) = ProgressNotifier::new(); - let receiver = FileProgressReceiver { - rx: receiver, - file_id: file_id.to_string(), - }; + let notifier = ProgressNotifier::new(file_id.to_string()); + let receiver = notifier.subscribe(); self .progress_notifiers .insert(file_id.to_string(), notifier); - Ok::<_, FlowyError>((CreatedUpload { url, file_id }, Some(receiver))) }, Err(err) => { @@ -333,6 +380,7 @@ impl StorageService for StorageServiceImpl { chunks, file_record, self.progress_notifiers.clone(), + self.global_notifier.clone(), ) .await { @@ -359,6 +407,7 @@ impl StorageService for StorageServiceImpl { &self.temp_storage, upload_file, self.progress_notifiers.clone(), + self.global_notifier.clone(), ) .await?; } else { @@ -381,20 +430,16 @@ impl StorageService for StorageServiceImpl { let workspace_id = self.user_service.workspace_id()?; is_upload_completed(&mut conn, &workspace_id, parent_idr, file_id).unwrap_or(false) }; + if is_completed { return Ok(None); } - let (notifier, receiver) = ProgressNotifier::new(); - let receiver = FileProgressReceiver { - rx: receiver, - file_id: file_id.to_string(), - }; - - self + let notifier = self .progress_notifiers - .insert(file_id.to_string(), notifier); - Ok(Some(receiver)) + .entry(file_id.to_string()) + .or_insert_with(|| ProgressNotifier::new(file_id.to_string())); + Ok(Some(notifier.subscribe())) } } @@ -437,6 +482,7 @@ async fn start_upload( mut chunked_bytes: ChunkedBytes, upload_file: &UploadFileTable, progress_notifiers: Arc>, + global_notifier: GlobalNotifier, ) -> FlowyResult<()> { // 4. gather existing completed parts let mut conn = user_service.sqlite_connection(user_service.user_id()?)?; @@ -540,6 +586,22 @@ async fn start_upload( upload_file.file_id, progress ); + + let file_url = cloud_service + .get_object_url_v1( + &upload_file.workspace_id, + &upload_file.parent_dir, + &upload_file.file_id, + ) + .await?; + if let Err(err) = global_notifier.send(FileProgress { + file_url, + progress, + error: None, + }) { + error!("[File] send global notifier failed: {}", err); + } + if let Some(mut notifier) = progress_notifiers.get_mut(&upload_file.file_id) { notifier .notify(FileUploadState::Uploading { progress }) @@ -573,6 +635,7 @@ async fn start_upload( &upload_file, completed_parts, &progress_notifiers, + &global_notifier, ) .await; if let Err(err) = complete_upload_result { @@ -594,6 +657,7 @@ async fn resume_upload( temp_storage: &Arc, upload_file: UploadFileTable, progress_notifiers: Arc>, + global_notifier: GlobalNotifier, ) -> FlowyResult<()> { trace!( "[File] resume upload for workspace: {}, parent_dir: {}, file_id: {}, local_file_path:{}", @@ -613,6 +677,7 @@ async fn resume_upload( chunked_bytes, &upload_file, progress_notifiers, + global_notifier, ) .await?; }, @@ -680,6 +745,7 @@ async fn complete_upload( upload_file: &UploadFileTable, parts: Vec, progress_notifiers: &Arc>, + global_notifier: &GlobalNotifier, ) -> Result<(), FlowyError> { trace!( "[File]: completing file upload: {}, num parts: {}", @@ -707,6 +773,22 @@ async fn complete_upload( .await; } + let file_url = cloud_service + .get_object_url_v1( + &upload_file.workspace_id, + &upload_file.parent_dir, + &upload_file.file_id, + ) + .await?; + + if let Err(err) = global_notifier.send(FileProgress { + file_url, + progress: 1.0, + error: None, + }) { + error!("[File] send global notifier failed: {}", err); + } + let conn = user_service.sqlite_connection(user_service.user_id()?)?; update_upload_file_completed(conn, &upload_file.upload_id)?; if let Err(err) = temp_storage diff --git a/frontend/rust-lib/lib-infra/Cargo.toml b/frontend/rust-lib/lib-infra/Cargo.toml index 416df9b04d4a..f07a6156213b 100644 --- a/frontend/rust-lib/lib-infra/Cargo.toml +++ b/frontend/rust-lib/lib-infra/Cargo.toml @@ -24,6 +24,7 @@ atomic_refcell = "0.1" allo-isolate = { version = "^0.1", features = ["catch-unwind"], optional = true } futures = "0.3.30" cfg-if = "1.0.0" +futures-util = "0.3.30" [dev-dependencies] rand = "0.8.5" diff --git a/frontend/rust-lib/lib-infra/src/isolate_stream.rs b/frontend/rust-lib/lib-infra/src/isolate_stream.rs index 3f2be5477bb5..358214e98550 100644 --- a/frontend/rust-lib/lib-infra/src/isolate_stream.rs +++ b/frontend/rust-lib/lib-infra/src/isolate_stream.rs @@ -1,6 +1,7 @@ use allo_isolate::{IntoDart, Isolate}; use anyhow::anyhow; use futures::Sink; +pub use futures_util::sink::SinkExt; use pin_project::pin_project; use std::pin::Pin; use std::task::{Context, Poll};