Skip to content

Commit

Permalink
fix: local ai enable/disable (AppFlowy-IO#6151)
Browse files Browse the repository at this point in the history
* chore: local ai state

* chore: fix local ai state when switch workspace

* chore: fix test
  • Loading branch information
appflowy authored Sep 1, 2024
1 parent 6ce77ae commit 1cc41c1
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flutter_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ jobs:
- name: Run Docker-Compose
working-directory: AppFlowy-Cloud
env:
BACKEND_VERSION: 0.6.4-amd64
APPFLOWY_CLOUD_VERSION: 0.6.4-amd64
run: |
if [ "$(docker ps --filter name=appflowy-cloud -q)" == "" ]; then
docker compose pull
Expand Down
27 changes: 26 additions & 1 deletion .github/workflows/rust_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ jobs:
sed -i '' 's|RUST_LOG=.*|RUST_LOG=trace|' .env
sed -i '' 's|API_EXTERNAL_URL=.*|API_EXTERNAL_URL=http://localhost|' .env
- name: Ensure AppFlowy-Cloud is Running with Correct Version
working-directory: AppFlowy-Cloud
env:
APPFLOWY_CLOUD_VERSION: 0.6.4-amd64
run: |
container_id=$(docker ps --filter name=appflowy-cloud-appflowy_cloud-1 -q)
if [ -z "$container_id" ]; then
echo "AppFlowy-Cloud container is not running. Pulling and starting the container..."
docker compose pull
docker compose up -d
echo "Waiting for the container to be ready..."
sleep 10
else
running_image=$(docker inspect --format='{{index .Config.Image}}' "$container_id")
if [ "$running_image" != "appflowy-cloud:$APPFLOWY_CLOUD_VERSION" ]; then
echo "AppFlowy-Cloud is running with an incorrect version. Pulling the correct version..."
docker compose pull
docker compose up -d
echo "Waiting for the container to be ready..."
sleep 10
else
echo "AppFlowy-Cloud is running with the correct version."
fi
fi
- name: Run rust-lib tests
working-directory: frontend/rust-lib
env:
Expand Down Expand Up @@ -107,7 +132,7 @@ jobs:
- name: Run Docker-Compose
working-directory: AppFlowy-Cloud
env:
BACKEND_VERSION: 0.6.4-amd64
APPFLOWY_CLOUD_VERSION: 0.6.4-amd64
run: |
if [ "$(docker ps --filter name=appflowy-cloud -q)" == "" ]; then
docker compose pull
Expand Down
6 changes: 6 additions & 0 deletions frontend/rust-lib/flowy-ai-pub/src/cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub use client_api::entity::ai_dto::{
AppFlowyOfflineAI, CompletionType, CreateTextChatContext, LLMModel, LocalAIConfig, ModelInfo,
RelatedQuestion, RepeatedRelatedQuestion, StringOrMessage,
};
pub use client_api::entity::billing_dto::SubscriptionPlan;
pub use client_api::entity::{
ChatAuthorType, ChatMessage, ChatMessageMetadata, ChatMessageType, ChatMetadataContentType,
ChatMetadataData, MessageCursor, QAChatMessage, QuestionStreamValue, RepeatedChatMessage,
Expand Down Expand Up @@ -98,4 +99,9 @@ pub trait ChatCloudService: Send + Sync + 'static {
) -> Result<(), FlowyError> {
Ok(())
}

  /// Fetch the active subscription plans for the given workspace.
  ///
  /// Used by callers to decide whether workspace-gated features (e.g. local
  /// AI) are entitled. Implementations without a billing backend return an
  /// unsupported error instead of an empty list.
  async fn get_workspace_plan(
    &self,
    workspace_id: &str,
  ) -> Result<Vec<SubscriptionPlan>, FlowyError>;
}
6 changes: 1 addition & 5 deletions frontend/rust-lib/flowy-ai/src/ai_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ impl AIManager {

pub async fn close_chat(&self, chat_id: &str) -> Result<(), FlowyError> {
trace!("close chat: {}", chat_id);

if self.local_ai_controller.is_running() {
info!("[AI Plugin] notify close chat: {}", chat_id);
self.local_ai_controller.close_chat(chat_id);
}
self.local_ai_controller.close_chat(chat_id);
Ok(())
}

Expand Down
69 changes: 59 additions & 10 deletions frontend/rust-lib/flowy-ai/src/local_ai/local_llm_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use appflowy_plugin::manager::PluginManager;
use appflowy_plugin::util::is_apple_silicon;
use flowy_ai_pub::cloud::{
AppFlowyOfflineAI, ChatCloudService, ChatMessageMetadata, ChatMetadataContentType, LLMModel,
LocalAIConfig,
LocalAIConfig, SubscriptionPlan,
};
use flowy_error::{FlowyError, FlowyResult};
use flowy_sqlite::kv::KVStorePreferences;
Expand All @@ -26,7 +26,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::select;
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace};
use tracing::{debug, error, info, instrument, trace, warn};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LLMSetting {
Expand All @@ -50,6 +50,7 @@ pub struct LocalAIController {
current_chat_id: ArcSwapOption<String>,
store_preferences: Arc<KVStorePreferences>,
user_service: Arc<dyn AIUserService>,
cloud_service: Arc<dyn ChatCloudService>,
}

impl Deref for LocalAIController {
Expand All @@ -70,7 +71,7 @@ impl LocalAIController {
let local_ai = Arc::new(AppFlowyLocalAI::new(plugin_manager));
let res_impl = LLMResourceServiceImpl {
user_service: user_service.clone(),
cloud_service,
cloud_service: cloud_service.clone(),
store_preferences: store_preferences.clone(),
};

Expand Down Expand Up @@ -107,6 +108,7 @@ impl LocalAIController {
current_chat_id,
store_preferences,
user_service,
cloud_service,
};

let rag_enabled = this.is_rag_enabled();
Expand Down Expand Up @@ -151,6 +153,33 @@ impl LocalAIController {
  /// Re-apply the local-AI enabled/disabled state for the current workspace.
  ///
  /// First toggles the chat plugin to match the locally stored preference.
  /// When enabled, additionally spawns a detached background task that asks
  /// the cloud service for the workspace's subscription plans and
  /// force-disables local AI if the workspace has no `AiLocal` plan.
  pub async fn refresh(&self) -> FlowyResult<()> {
    let is_enabled = self.is_enabled();
    self.enable_chat_plugin(is_enabled).await?;

    if is_enabled {
      // Clone handles up front: the spawned task must own ('static) its data.
      let local_ai = self.local_ai.clone();
      let workspace_id = self.user_service.workspace_id()?;
      let cloned_service = self.cloud_service.clone();
      let store_preferences = self.store_preferences.clone();
      tokio::spawn(async move {
        // The preference key is scoped per workspace, so each workspace keeps
        // its own enable/disable state.
        let key = local_ai_enabled_key(&workspace_id);
        match cloned_service.get_workspace_plan(&workspace_id).await {
          Ok(plans) => {
            trace!("[AI Plugin] workspace:{} plans: {:?}", workspace_id, plans);
            if !plans.contains(&SubscriptionPlan::AiLocal) {
              info!(
                "disable local ai plugin for workspace: {}. reason: no plan found",
                workspace_id
              );
              // Best effort: persist the disabled flag and tear down the
              // running plugin. Errors are intentionally swallowed because
              // this task runs detached from the caller.
              let _ = store_preferences.set_bool(&key, false);
              let _ = local_ai.destroy_chat_plugin().await;
            }
          },
          Err(err) => {
            // A failed plan lookup is non-fatal: keep the current state.
            warn!("[AI Plugin]: failed to get workspace plan: {:?}", err);
          },
        }
      });
    }

    Ok(())
  }

Expand All @@ -165,23 +194,27 @@ impl LocalAIController {

/// Indicate whether the local AI plugin is running.
pub fn is_running(&self) -> bool {
if !self.is_enabled() {
return false;
}
self.local_ai.get_plugin_running_state().is_ready()
}

/// Indicate whether the local AI is enabled.
/// AppFlowy store the value in local storage isolated by workspace id. Each workspace can have
/// different settings.
pub fn is_enabled(&self) -> bool {
if let Ok(key) = self.local_ai_enabled_key() {
if let Ok(key) = self
.user_service
.workspace_id()
.map(|workspace_id| local_ai_enabled_key(&workspace_id))
{
self.store_preferences.get_bool(&key).unwrap_or(true)
} else {
false
}
}

fn local_ai_enabled_key(&self) -> FlowyResult<String> {
let workspace_id = self.user_service.workspace_id()?;
Ok(format!("{}:{}", APPFLOWY_LOCAL_AI_ENABLED, workspace_id))
}

/// Indicate whether the local AI chat is enabled. In the future, we can support multiple
/// AI plugin.
pub fn is_chat_enabled(&self) -> bool {
Expand Down Expand Up @@ -225,6 +258,10 @@ impl LocalAIController {
}

pub fn close_chat(&self, chat_id: &str) {
if !self.is_running() {
return;
}
info!("[AI Plugin] notify close chat: {}", chat_id);
let weak_ctrl = Arc::downgrade(&self.local_ai);
let chat_id = chat_id.to_string();
tokio::spawn(async move {
Expand Down Expand Up @@ -285,6 +322,13 @@ impl LocalAIController {
}

pub fn get_chat_plugin_state(&self) -> LocalAIPluginStatePB {
if !self.is_enabled() {
return LocalAIPluginStatePB {
state: RunningStatePB::Stopped,
offline_ai_ready: false,
};
}

let offline_ai_ready = self.local_ai_resource.is_offline_app_ready();
let state = self.local_ai.get_plugin_running_state();
LocalAIPluginStatePB {
Expand Down Expand Up @@ -317,7 +361,8 @@ impl LocalAIController {
}

pub async fn toggle_local_ai(&self) -> FlowyResult<bool> {
let key = self.local_ai_enabled_key()?;
let workspace_id = self.user_service.workspace_id()?;
let key = local_ai_enabled_key(&workspace_id);
let enabled = !self.store_preferences.get_bool(&key).unwrap_or(true);
self.store_preferences.set_bool(&key, enabled)?;

Expand Down Expand Up @@ -552,3 +597,7 @@ impl LLMResourceService for LLMResourceServiceImpl {
.get_bool_or_default(APPFLOWY_LOCAL_AI_CHAT_RAG_ENABLED)
}
}

/// Build the per-workspace preference key that persists whether local AI is
/// enabled, so each workspace keeps an independent setting.
fn local_ai_enabled_key(workspace_id: &str) -> String {
  format!("{APPFLOWY_LOCAL_AI_ENABLED}:{workspace_id}")
}
9 changes: 8 additions & 1 deletion frontend/rust-lib/flowy-ai/src/middleware/chat_service_mw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::collections::HashMap;
use flowy_ai_pub::cloud::{
ChatCloudService, ChatMessage, ChatMessageMetadata, ChatMessageType, CompletionType,
CreateTextChatContext, LocalAIConfig, MessageCursor, RelatedQuestion, RepeatedChatMessage,
RepeatedRelatedQuestion, StreamAnswer, StreamComplete,
RepeatedRelatedQuestion, StreamAnswer, StreamComplete, SubscriptionPlan,
};
use flowy_error::{FlowyError, FlowyResult};
use futures::{stream, Sink, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -323,4 +323,11 @@ impl ChatCloudService for AICloudServiceMiddleware {
.await
}
}

async fn get_workspace_plan(
&self,
workspace_id: &str,
) -> Result<Vec<SubscriptionPlan>, FlowyError> {
self.cloud_service.get_workspace_plan(workspace_id).await
}
}
13 changes: 12 additions & 1 deletion frontend/rust-lib/flowy-core/src/integrate/trait_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use collab_integrate::collab_builder::{
};
use flowy_ai_pub::cloud::{
ChatCloudService, ChatMessage, ChatMessageMetadata, LocalAIConfig, MessageCursor,
RepeatedChatMessage, StreamAnswer, StreamComplete,
RepeatedChatMessage, StreamAnswer, StreamComplete, SubscriptionPlan,
};
use flowy_database_pub::cloud::{
DatabaseAIService, DatabaseCloudService, DatabaseSnapshot, EncodeCollabByOid, SummaryRowContent,
Expand Down Expand Up @@ -693,6 +693,17 @@ impl ChatCloudService for ServerProvider {
.get_local_ai_config(workspace_id)
.await
}

async fn get_workspace_plan(
&self,
workspace_id: &str,
) -> Result<Vec<SubscriptionPlan>, FlowyError> {
self
.get_server()?
.chat_service()
.get_workspace_plan(workspace_id)
.await
}
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions frontend/rust-lib/flowy-core/src/integrate/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl UserStatusCallback for UserStatusCallbackImpl {
.initialize(user_id, authenticator == &Authenticator::Local)
.await?;
self.document_manager.initialize(user_id).await?;
self.ai_manager.initialize(&user_workspace.id).await?;
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion frontend/rust-lib/flowy-database2/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,15 @@ impl DatabaseManager {
if let Some(editor) = editors.get(&database_id) {
editor.close_view(view_id).await;
// when there is no opening views, mark the database to be removed.
trace!(
"{} has {} opening views",
database_id,
editor.num_of_opening_views().await
);
should_remove = editor.num_of_opening_views().await == 0;
}

if should_remove {
trace!("remove database editor:{}", database_id);
if let Some(editor) = editors.remove(&database_id) {
editor.close_database().await;
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ impl DatabaseEditor {
}

pub async fn close_database(&self) {
info!("Close database: {}", self.database_id);
info!("close database editor: {}", self.database_id);
let cancellation = self.database_cancellation.read().await;
if let Some(cancellation) = &*cancellation {
info!("Cancel database operation");
Expand Down
14 changes: 13 additions & 1 deletion frontend/rust-lib/flowy-server/src/af_cloud/impls/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use client_api::entity::{
};
use flowy_ai_pub::cloud::{
ChatCloudService, ChatMessage, ChatMessageMetadata, ChatMessageType, LocalAIConfig, StreamAnswer,
StreamComplete,
StreamComplete, SubscriptionPlan,
};
use flowy_error::FlowyError;
use futures_util::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -217,4 +217,16 @@ where
.await?;
Ok(())
}

async fn get_workspace_plan(
&self,
workspace_id: &str,
) -> Result<Vec<SubscriptionPlan>, FlowyError> {
let plans = self
.inner
.try_get_client()?
.get_active_workspace_subscriptions(workspace_id)
.await?;
Ok(plans)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,6 @@ where
&self,
workspace_id: String,
) -> Result<Vec<SubscriptionPlan>, FlowyError> {
let workspace_id = workspace_id.to_string();
let try_get_client = self.server.try_get_client();
let client = try_get_client?;
let plans = client
Expand Down
11 changes: 11 additions & 0 deletions frontend/rust-lib/flowy-server/src/default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use client_api::entity::ai_dto::{CompletionType, LocalAIConfig, RepeatedRelatedQ
use client_api::entity::{ChatMessageType, MessageCursor, RepeatedChatMessage};
use flowy_ai_pub::cloud::{
ChatCloudService, ChatMessage, ChatMessageMetadata, StreamAnswer, StreamComplete,
SubscriptionPlan,
};
use flowy_error::FlowyError;
use lib_infra::async_trait::async_trait;
Expand Down Expand Up @@ -106,4 +107,14 @@ impl ChatCloudService for DefaultChatCloudServiceImpl {
.with_context("Get local ai config is not supported in local server."),
)
}

async fn get_workspace_plan(
&self,
_workspace_id: &str,
) -> Result<Vec<SubscriptionPlan>, FlowyError> {
Err(
FlowyError::not_support()
.with_context("Get local ai config is not supported in local server."),
)
}
}

0 comments on commit 1cc41c1

Please sign in to comment.