diff --git a/Justfile b/Justfile index 88baa2de..fd679205 100644 --- a/Justfile +++ b/Justfile @@ -30,3 +30,8 @@ run-server: make-ui: @just release cd ui/desktop && npm run bundle:default + +# Setup langfuse server +langfuse-server: + #!/usr/bin/env bash + ./scripts/setup_langfuse.sh \ No newline at end of file diff --git a/crates/goose-cli/Cargo.toml b/crates/goose-cli/Cargo.toml index e5525f0d..befcf2ee 100644 --- a/crates/goose-cli/Cargo.toml +++ b/crates/goose-cli/Cargo.toml @@ -50,9 +50,14 @@ async-trait = "0.1" rustyline = "15.0.0" rust_decimal = "1.36.0" rust_decimal_macros = "1.36.0" -tracing = "0.1.41" +tracing = "0.1" +chrono = "0.4" +parking_lot = "0.12.3" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json", "time"] } +tracing-appender = "0.2" [dev-dependencies] tempfile = "3" temp-env = { version = "0.3.6", features = ["async_closure"] } - +test-case = "3.3" +tokio = { version = "1.0", features = ["rt", "macros"] } \ No newline at end of file diff --git a/crates/goose-cli/src/logging.rs b/crates/goose-cli/src/logging.rs new file mode 100644 index 00000000..0af6e201 --- /dev/null +++ b/crates/goose-cli/src/logging.rs @@ -0,0 +1,228 @@ +use anyhow::{Context, Result}; +use std::fs; +use std::path::PathBuf; +use tracing_appender::rolling::Rotation; +use tracing_subscriber::{ + filter::LevelFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, + Registry, +}; + +use goose::tracing::langfuse_layer; + +/// Returns the directory where log files should be stored. +/// Creates the directory structure if it doesn't exist. 
+fn get_log_directory() -> Result { + let home = std::env::var("HOME").context("HOME environment variable not set")?; + let base_log_dir = PathBuf::from(home) + .join(".config") + .join("goose") + .join("logs") + .join("cli"); // Add cli-specific subdirectory + + // Create date-based subdirectory + let now = chrono::Local::now(); + let date_dir = base_log_dir.join(now.format("%Y-%m-%d").to_string()); + + // Ensure log directory exists + fs::create_dir_all(&date_dir).context("Failed to create log directory")?; + + Ok(date_dir) +} + +/// Sets up the logging infrastructure for the application. +/// This includes: +/// - File-based logging with JSON formatting (DEBUG level) +/// - Console output for development (INFO level) +/// - Optional Langfuse integration (DEBUG level) +pub fn setup_logging(session_name: Option<&str>) -> Result<()> { + // Set up file appender for goose module logs + let log_dir = get_log_directory()?; + let timestamp = chrono::Local::now().format("%Y%m%d_%H%M%S").to_string(); + + // Create non-rolling file appender for detailed logs + let file_appender = tracing_appender::rolling::RollingFileAppender::new( + Rotation::NEVER, + log_dir, + &format!("{}.log", session_name.unwrap_or(×tamp)), + ); + + // Create JSON file logging layer with all logs (DEBUG and above) + let file_layer = fmt::layer() + .with_target(true) + .with_level(true) + .with_writer(file_appender) + .with_ansi(false) + .with_file(true) + .pretty(); + + // Create console logging layer for development - INFO and above only + let console_layer = fmt::layer() + .with_target(true) + .with_level(true) + .with_ansi(true) + .with_file(true) + .with_line_number(true) + .pretty(); + + // Base filter + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { + // Set default levels for different modules + EnvFilter::new("") + .add_directive("goose=debug".parse().unwrap()) + // Set goose-cli to INFO + .add_directive("goose_cli=info".parse().unwrap()) + // Set everything else to 
WARN + .add_directive(LevelFilter::WARN.into()) + }); + + // Build the subscriber with required layers + let subscriber = Registry::default() + .with(file_layer.with_filter(env_filter)) // Gets all logs + .with(console_layer.with_filter(LevelFilter::INFO)); // Controls log levels + + // Initialize with Langfuse if available + if let Some(langfuse) = langfuse_layer::create_langfuse_observer() { + subscriber + .with(langfuse.with_filter(LevelFilter::DEBUG)) + .try_init() + .context("Failed to set global subscriber")?; + } else { + subscriber + .try_init() + .context("Failed to set global subscriber")?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use tempfile::TempDir; + use test_case::test_case; + use tokio::runtime::Runtime; + + fn setup_temp_home() -> TempDir { + let temp_dir = TempDir::new().unwrap(); + env::set_var("HOME", temp_dir.path()); + temp_dir + } + + #[test] + fn test_log_directory_creation() { + let _temp_dir = setup_temp_home(); + let log_dir = get_log_directory().unwrap(); + assert!(log_dir.exists()); + assert!(log_dir.is_dir()); + + // Verify directory structure + let path_components: Vec<_> = log_dir.components().collect(); + assert!(path_components.iter().any(|c| c.as_os_str() == "goose")); + assert!(path_components.iter().any(|c| c.as_os_str() == "logs")); + assert!(path_components.iter().any(|c| c.as_os_str() == "cli")); + } + + #[test_case(Some("test_session") ; "with session name")] + #[test_case(None ; "without session name")] + fn test_log_file_name(session_name: Option<&str>) { + let _rt = Runtime::new().unwrap(); + let _temp_dir = setup_temp_home(); + + // Create a test-specific log directory and file + let log_dir = get_log_directory().unwrap(); + let timestamp = chrono::Local::now().format("%Y%m%d_%H%M%S").to_string(); + let file_name = format!("{}.log", session_name.unwrap_or(×tamp)); + + // Create the log file + let file_path = log_dir.join(&file_name); + fs::write(&file_path, "test").unwrap(); + + 
// Verify the file exists and has the correct name + let entries = fs::read_dir(log_dir).unwrap(); + let log_files: Vec<_> = entries + .filter_map(Result::ok) + .filter(|e| e.path().extension().map_or(false, |ext| ext == "log")) + .collect(); + + assert_eq!(log_files.len(), 1, "Expected exactly one log file"); + + let log_file_name = log_files[0].file_name().to_string_lossy().into_owned(); + println!("Log file name: {}", log_file_name); + + if let Some(name) = session_name { + assert_eq!(log_file_name, format!("{}.log", name)); + } else { + // Extract just the filename without extension for comparison + let name_without_ext = log_file_name.trim_end_matches(".log"); + // Verify it's a valid timestamp format + assert_eq!( + name_without_ext.len(), + 15, + "Expected 15 characters (YYYYMMDD_HHMMSS)" + ); + assert!( + name_without_ext[8..9].contains('_'), + "Expected underscore at position 8" + ); + assert!( + name_without_ext + .chars() + .all(|c| c.is_ascii_digit() || c == '_'), + "Expected only digits and underscore" + ); + } + } + + #[tokio::test] + async fn test_langfuse_layer_creation() { + let _temp_dir = setup_temp_home(); + + // Store original environment variables (both sets) + let original_vars = [ + ("LANGFUSE_PUBLIC_KEY", env::var("LANGFUSE_PUBLIC_KEY").ok()), + ("LANGFUSE_SECRET_KEY", env::var("LANGFUSE_SECRET_KEY").ok()), + ("LANGFUSE_HOST", env::var("LANGFUSE_HOST").ok()), + ( + "LANGFUSE_INIT_PROJECT_PUBLIC_KEY", + env::var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY").ok(), + ), + ( + "LANGFUSE_INIT_PROJECT_SECRET_KEY", + env::var("LANGFUSE_INIT_PROJECT_SECRET_KEY").ok(), + ), + ]; + + // Clear all Langfuse environment variables + for (var, _) in &original_vars { + env::remove_var(var); + } + + // Test without any environment variables + assert!(langfuse_layer::create_langfuse_observer().is_none()); + + // Test with standard Langfuse variables + env::set_var("LANGFUSE_PUBLIC_KEY", "test_public_key"); + env::set_var("LANGFUSE_SECRET_KEY", "test_secret_key"); + 
assert!(langfuse_layer::create_langfuse_observer().is_some()); + + // Clear and test with init project variables + env::remove_var("LANGFUSE_PUBLIC_KEY"); + env::remove_var("LANGFUSE_SECRET_KEY"); + env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test_public_key"); + env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test_secret_key"); + assert!(langfuse_layer::create_langfuse_observer().is_some()); + + // Test fallback behavior + env::remove_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY"); + assert!(langfuse_layer::create_langfuse_observer().is_none()); + + // Restore original environment variables + for (var, value) in original_vars { + match value { + Some(val) => env::set_var(var, val), + None => env::remove_var(var), + } + } + } +} diff --git a/crates/goose-cli/src/main.rs b/crates/goose-cli/src/main.rs index d21f06e2..83321752 100644 --- a/crates/goose-cli/src/main.rs +++ b/crates/goose-cli/src/main.rs @@ -4,6 +4,7 @@ use goose::agents::AgentFactory; mod commands; mod log_usage; +mod logging; mod profile; mod prompt; mod session; @@ -14,6 +15,7 @@ use commands::configure::handle_configure; use commands::mcp::run_server; use commands::session::build_session; use commands::version::print_version; +use logging::setup_logging; use profile::has_no_profiles; use std::io::{self, Read}; @@ -268,6 +270,8 @@ async fn main() -> Result<()> { } let mut session = build_session(name, profile, agent, resume).await; + setup_logging(session.session_file().file_stem().and_then(|s| s.to_str()))?; + let _ = session.start().await; return Ok(()); } diff --git a/crates/goose-server/Cargo.toml b/crates/goose-server/Cargo.toml index a572ced8..f0020ea5 100644 --- a/crates/goose-server/Cargo.toml +++ b/crates/goose-server/Cargo.toml @@ -20,7 +20,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" futures = "0.3" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json", "time"] } +tracing-appender = "0.2" 
tokio-stream = "0.1" anyhow = "1.0" bytes = "1.5" @@ -37,4 +38,5 @@ path = "src/main.rs" [dev-dependencies] serial_test = "3.2.0" tower = "0.5" -async-trait = "0.1" \ No newline at end of file +async-trait = "0.1" +tempfile = "3.15.0" \ No newline at end of file diff --git a/crates/goose-server/src/commands/agent.rs b/crates/goose-server/src/commands/agent.rs index 43b55428..277b3081 100644 --- a/crates/goose-server/src/commands/agent.rs +++ b/crates/goose-server/src/commands/agent.rs @@ -5,8 +5,8 @@ use tower_http::cors::{Any, CorsLayer}; use tracing::info; pub async fn run() -> Result<()> { - // Initialize tracing for logging - tracing_subscriber::fmt::init(); + // Initialize logging + crate::logging::setup_logging()?; // Load configuration let settings = configuration::Settings::new()?; diff --git a/crates/goose-server/src/logging.rs b/crates/goose-server/src/logging.rs new file mode 100644 index 00000000..b195c17f --- /dev/null +++ b/crates/goose-server/src/logging.rs @@ -0,0 +1,98 @@ +use anyhow::{Context, Result}; +use std::fs; +use std::path::PathBuf; +use tracing_appender::rolling::Rotation; +use tracing_subscriber::{ + filter::LevelFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, + Registry, +}; + +use goose::tracing::langfuse_layer; + +/// Returns the directory where log files should be stored. +/// Creates the directory structure if it doesn't exist. 
+fn get_log_directory() -> Result { + let home = std::env::var("HOME").context("HOME environment variable not set")?; + let base_log_dir = PathBuf::from(home) + .join(".config") + .join("goose") + .join("logs") + .join("server"); // Add server-specific subdirectory + + // Create date-based subdirectory + let now = chrono::Local::now(); + let date_dir = base_log_dir.join(now.format("%Y-%m-%d").to_string()); + + // Ensure log directory exists + fs::create_dir_all(&date_dir).context("Failed to create log directory")?; + + Ok(date_dir) +} + +/// Sets up the logging infrastructure for the application. +/// This includes: +/// - File-based logging with JSON formatting (DEBUG level) +/// - Console output for development (INFO level) +/// - Optional Langfuse integration (DEBUG level) +pub fn setup_logging() -> Result<()> { + // Set up file appender for goose module logs + let log_dir = get_log_directory()?; + let timestamp = chrono::Local::now().format("%Y%m%d_%H%M%S").to_string(); + + // Create non-rolling file appender for detailed logs + let file_appender = tracing_appender::rolling::RollingFileAppender::new( + Rotation::NEVER, + log_dir, + &format!("goosed_{}.log", timestamp), + ); + + // Create JSON file logging layer + let file_layer = fmt::layer() + .with_target(true) + .with_level(true) + .with_writer(file_appender) + .with_ansi(false) + .with_file(true); + + // Create console logging layer for development - INFO and above only + let console_layer = fmt::layer() + .with_target(true) + .with_level(true) + .with_ansi(true) + .with_file(true) + .with_line_number(true) + .pretty(); + + // Base filter for all logging + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { + // Set default levels for different modules + EnvFilter::new("") + // Set goose module to INFO only + .add_directive("goose=debug".parse().unwrap()) + // Set goose-server to INFO + .add_directive("goose_server=info".parse().unwrap()) + // Set tower-http to INFO for request logging + 
.add_directive("tower_http=info".parse().unwrap()) + // Set everything else to WARN + .add_directive(LevelFilter::WARN.into()) + }); + + // Build the subscriber with required layers + let subscriber = Registry::default() + .with(file_layer.with_filter(env_filter)) + .with(console_layer.with_filter(LevelFilter::INFO)); + + // Initialize with Langfuse if available + if let Some(langfuse) = langfuse_layer::create_langfuse_observer() { + subscriber + .with(langfuse.with_filter(LevelFilter::DEBUG)) + .try_init() + .context("Failed to set global subscriber")?; + } else { + subscriber + .try_init() + .context("Failed to set global subscriber")?; + } + + Ok(()) +} diff --git a/crates/goose-server/src/main.rs b/crates/goose-server/src/main.rs index 339a9c68..4647c755 100644 --- a/crates/goose-server/src/main.rs +++ b/crates/goose-server/src/main.rs @@ -1,6 +1,7 @@ mod commands; mod configuration; mod error; +mod logging; mod routes; mod state; diff --git a/crates/goose/Cargo.toml b/crates/goose/Cargo.toml index b4d16cf2..4523d542 100644 --- a/crates/goose/Cargo.toml +++ b/crates/goose/Cargo.toml @@ -59,6 +59,10 @@ xcap = "0.0.14" libc = "=0.2.167" lazy_static = "1.5" kill_tree = "0.2.4" +tracing = "0.1" +tracing-subscriber = "0.3" +wiremock = "0.6.0" + keyring = { version = "3.6.1", features = [ "apple-native", @@ -75,7 +79,6 @@ tower-service = "0.3" [dev-dependencies] sysinfo = "0.32.1" -wiremock = "0.6.0" mockito = "1.2" tempfile = "3.8" mockall = "0.11" @@ -92,4 +95,4 @@ path = "examples/databricks_oauth.rs" [[bench]] name = "tokenization_benchmark" -harness = false +harness = false \ No newline at end of file diff --git a/crates/goose/src/agents/capabilities.rs b/crates/goose/src/agents/capabilities.rs index 75ecec7e..cea2b0db 100644 --- a/crates/goose/src/agents/capabilities.rs +++ b/crates/goose/src/agents/capabilities.rs @@ -2,6 +2,7 @@ use rust_decimal_macros::dec; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; +use tracing::{debug, 
instrument}; use super::system::{SystemConfig, SystemError, SystemInfo, SystemResult}; use crate::prompt_template::load_prompt_file; @@ -201,6 +202,7 @@ impl Capabilities { } /// Dispatch a single tool call to the appropriate client + #[instrument(skip(self, tool_call), fields(input, output))] pub async fn dispatch_tool_call(&self, tool_call: ToolCall) -> ToolResult> { let client = self .get_client_for_tool(&tool_call.name) @@ -213,10 +215,17 @@ impl Capabilities { .ok_or_else(|| ToolError::NotFound(tool_call.name.clone()))?; let client_guard = client.lock().await; - client_guard - .call_tool(tool_name, tool_call.arguments) + let result = client_guard + .call_tool(tool_name, tool_call.clone().arguments) .await .map(|result| result.content) - .map_err(|e| ToolError::ExecutionError(e.to_string())) + .map_err(|e| ToolError::ExecutionError(e.to_string())); + + debug!( + "input" = serde_json::to_string(&tool_call).unwrap(), + "output" = serde_json::to_string(&result).unwrap(), + ); + + result } } diff --git a/crates/goose/src/agents/default.rs b/crates/goose/src/agents/default.rs index f10d7ddf..b032c56c 100644 --- a/crates/goose/src/agents/default.rs +++ b/crates/goose/src/agents/default.rs @@ -3,6 +3,7 @@ use futures::stream::BoxStream; use serde_json::json; use std::collections::HashMap; use tokio::sync::Mutex; +use tracing::{debug, instrument}; use super::Agent; use crate::agents::capabilities::Capabilities; @@ -192,10 +193,12 @@ impl Agent for DefaultAgent { Ok(Value::Null) } + #[instrument(skip(self, messages), fields(user_message))] async fn reply( &self, messages: &[Message], ) -> anyhow::Result>> { + let reply_span = tracing::Span::current(); let mut capabilities = self.capabilities.lock().await; let tools = capabilities.get_prefixed_tools().await?; let system_prompt = capabilities.get_system_prompt().await; @@ -204,6 +207,15 @@ impl Agent for DefaultAgent { .get_model_config() .get_estimated_limit(); + // Set the user_message field in the span instead of 
creating a new event + if let Some(content) = messages + .last() + .and_then(|msg| msg.content.first()) + .and_then(|c| c.as_text()) + { + debug!("user_message" = &content); + } + // Update conversation history for the start of the reply let mut messages = self .prepare_inference( @@ -218,6 +230,7 @@ impl Agent for DefaultAgent { .await?; Ok(Box::pin(async_stream::try_stream! { + let _reply_guard = reply_span.enter(); loop { // Get completion from provider let (response, usage) = capabilities.provider().complete( diff --git a/crates/goose/src/lib.rs b/crates/goose/src/lib.rs index 78bf6aad..010e7018 100644 --- a/crates/goose/src/lib.rs +++ b/crates/goose/src/lib.rs @@ -4,3 +4,4 @@ pub mod message; pub mod prompt_template; pub mod providers; pub mod token_counter; +pub mod tracing; diff --git a/crates/goose/src/providers/anthropic.rs b/crates/goose/src/providers/anthropic.rs index 61f1094b..9329cd90 100644 --- a/crates/goose/src/providers/anthropic.rs +++ b/crates/goose/src/providers/anthropic.rs @@ -11,7 +11,7 @@ use super::base::{Provider, Usage}; use super::configs::{AnthropicProviderConfig, ModelConfig, ProviderModelConfig}; use super::model_pricing::cost; use super::model_pricing::model_pricing_for; -use super::utils::get_model; +use super::utils::{emit_debug_trace, get_model}; use crate::message::{Message, MessageContent}; use mcp_core::content::Content; use mcp_core::role::Role; @@ -193,6 +193,18 @@ impl Provider for AnthropicProvider { self.config.model_config() } + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -238,14 +250,14 @@ impl Provider for AnthropicProvider { } // Make request - let response = self.post(payload).await?; + let response = self.post(payload.clone()).await?; // Parse response let message = Self::parse_anthropic_response(response.clone())?; let usage = 
self.get_usage(&response)?; let model = get_model(&response); let cost = cost(&usage, &model_pricing_for(&model)); - + emit_debug_trace(&self.config, &payload, &response, &usage, cost); Ok((message, ProviderUsage::new(model, usage, cost))) } diff --git a/crates/goose/src/providers/databricks.rs b/crates/goose/src/providers/databricks.rs index ab7434bd..a959d30a 100644 --- a/crates/goose/src/providers/databricks.rs +++ b/crates/goose/src/providers/databricks.rs @@ -74,6 +74,18 @@ impl Provider for DatabricksProvider { self.config.model_config() } + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -121,8 +133,7 @@ impl Provider for DatabricksProvider { .collect(), ); - // Make request - let response = self.post(payload).await?; + let response = self.post(payload.clone()).await?; // Raise specific error if context length is exceeded if let Some(error) = response.get("error") { @@ -139,7 +150,7 @@ impl Provider for DatabricksProvider { let usage = self.get_usage(&response)?; let model = get_model(&response); let cost = cost(&usage, &model_pricing_for(&model)); - + super::utils::emit_debug_trace(&self.config, &payload, &response, &usage, cost); Ok((message, ProviderUsage::new(model, usage, cost))) } diff --git a/crates/goose/src/providers/google.rs b/crates/goose/src/providers/google.rs index fa703d8a..c074d55b 100644 --- a/crates/goose/src/providers/google.rs +++ b/crates/goose/src/providers/google.rs @@ -276,6 +276,18 @@ impl Provider for GoogleProvider { self.config.model_config() } + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -309,7 +321,7 @@ impl Provider for GoogleProvider { } // Make request - let response = 
self.post(Value::Object(payload)).await?; + let response = self.post(Value::Object(payload.clone())).await?; // Parse response let message = self.google_response_to_message(unescape_json_values(&response))?; let usage = self.get_usage(&response)?; @@ -317,6 +329,7 @@ Some(model_version) => model_version.as_str().unwrap_or_default().to_string(), None => self.config.model.model_name.clone(), }; + super::utils::emit_debug_trace(&self.config, &payload, &response, &usage, None); let provider_usage = ProviderUsage::new(model, usage, None); Ok((message, provider_usage)) } diff --git a/crates/goose/src/providers/groq.rs b/crates/goose/src/providers/groq.rs index 72b8a47c..dad096ad 100644 --- a/crates/goose/src/providers/groq.rs +++ b/crates/goose/src/providers/groq.rs @@ -52,6 +52,18 @@ impl Provider for GroqProvider { self.config.model_config() } + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -65,12 +77,12 @@ impl Provider for GroqProvider { tools, )?; - let response = self.post(payload).await?; + let response = self.post(payload.clone()).await?; let message = openai_response_to_message(response.clone())?; let usage = self.get_usage(&response)?; let model = get_model(&response); - + super::utils::emit_debug_trace(&self.config, &payload, &response, &usage, None); Ok((message, ProviderUsage::new(model, usage, None))) } diff --git a/crates/goose/src/providers/ollama.rs b/crates/goose/src/providers/ollama.rs index cbf8758a..e160cd76 100644 --- a/crates/goose/src/providers/ollama.rs +++ b/crates/goose/src/providers/ollama.rs @@ -47,6 +47,18 @@ impl Provider for OllamaProvider { self.config.model_config() 
} + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -55,14 +67,14 @@ impl Provider for OllamaProvider { ) -> Result<(Message, ProviderUsage)> { let payload = create_openai_request_payload(&self.config.model, system, messages, tools)?; - let response = self.post(payload).await?; + let response = self.post(payload.clone()).await?; // Parse response let message = openai_response_to_message(response.clone())?; let usage = self.get_usage(&response)?; let model = get_model(&response); let cost = None; - + super::utils::emit_debug_trace(&self.config, &payload, &response, &usage, cost); Ok((message, ProviderUsage::new(model, usage, cost))) } diff --git a/crates/goose/src/providers/openai.rs b/crates/goose/src/providers/openai.rs index d875ca5e..c1eb076b 100644 --- a/crates/goose/src/providers/openai.rs +++ b/crates/goose/src/providers/openai.rs @@ -10,7 +10,7 @@ use super::configs::OpenAiProviderConfig; use super::configs::{ModelConfig, ProviderModelConfig}; use super::model_pricing::cost; use super::model_pricing::model_pricing_for; -use super::utils::{get_model, handle_response}; +use super::utils::{emit_debug_trace, get_model, handle_response}; use crate::message::Message; use crate::providers::openai_utils::{ check_openai_context_length_error, create_openai_request_payload, get_openai_usage, @@ -58,6 +58,18 @@ impl Provider for OpenAiProvider { self.config.model_config() } + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -68,7 +80,7 @@ impl Provider for OpenAiProvider { let payload = create_openai_request_payload(&self.config.model, system, messages, tools)?; // Make request - let response = self.post(payload).await?; + let response = 
self.post(payload.clone()).await?; // Raise specific error if context length is exceeded if let Some(error) = response.get("error") { @@ -83,7 +95,7 @@ impl Provider for OpenAiProvider { let usage = self.get_usage(&response)?; let model = get_model(&response); let cost = cost(&usage, &model_pricing_for(&model)); - + emit_debug_trace(&self.config, &payload, &response, &usage, cost); Ok((message, ProviderUsage::new(model, usage, cost))) } diff --git a/crates/goose/src/providers/openrouter.rs b/crates/goose/src/providers/openrouter.rs index e33885e3..45c9a79f 100644 --- a/crates/goose/src/providers/openrouter.rs +++ b/crates/goose/src/providers/openrouter.rs @@ -10,7 +10,7 @@ use super::configs::OpenAiProviderConfig; use super::configs::{ModelConfig, ProviderModelConfig}; use super::model_pricing::cost; use super::model_pricing::model_pricing_for; -use super::utils::{get_model, handle_response}; +use super::utils::{emit_debug_trace, get_model, handle_response}; use crate::message::Message; use crate::providers::openai_utils::{ check_openai_context_length_error, create_openai_request_payload_with_concat_response_content, @@ -61,6 +61,18 @@ impl Provider for OpenRouterProvider { self.config.model_config() } + #[tracing::instrument( + skip(self, system, messages, tools), + fields( + model_config, + input, + output, + input_tokens, + output_tokens, + total_tokens, + cost + ) + )] async fn complete( &self, system: &str, @@ -76,7 +88,7 @@ impl Provider for OpenRouterProvider { )?; // Make request - let response = self.post(payload).await?; + let response = self.post(payload.clone()).await?; // Raise specific error if context length is exceeded if let Some(error) = response.get("error") { @@ -91,7 +103,7 @@ impl Provider for OpenRouterProvider { let usage = self.get_usage(&response)?; let model = get_model(&response); let cost = cost(&usage, &model_pricing_for(&model)); - + emit_debug_trace(&self.config, &payload, &response, &usage, cost); Ok((message, 
ProviderUsage::new(model, usage, cost))) } diff --git a/crates/goose/src/providers/utils.rs b/crates/goose/src/providers/utils.rs index abc021b9..c67bda38 100644 --- a/crates/goose/src/providers/utils.rs +++ b/crates/goose/src/providers/utils.rs @@ -1,8 +1,11 @@ +use super::base::Usage; use anyhow::{anyhow, Error, Result}; use regex::Regex; use reqwest::{Response, StatusCode}; +use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use serde_json::{json, Map, Value}; +use tracing::debug; use mcp_core::content::ImageContent; @@ -115,6 +118,30 @@ pub fn unescape_json_values(value: &Value) -> Value { } } +pub fn emit_debug_trace( + model_config: &T, + payload: &impl serde::Serialize, + response: &Value, + usage: &Usage, + cost: Option, +) { + // Handle both Map and Value payload types + let payload_str = match serde_json::to_value(payload) { + Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(), + Err(_) => serde_json::to_string_pretty(&payload).unwrap_or_default(), + }; + + debug!( + model_config = %serde_json::to_string_pretty(model_config).unwrap_or_default(), + input = %payload_str, + output = %serde_json::to_string_pretty(response).unwrap_or_default(), + input_tokens = ?usage.input_tokens.unwrap_or_default(), + output_tokens = ?usage.output_tokens.unwrap_or_default(), + total_tokens = ?usage.total_tokens.unwrap_or_default(), + cost = ?cost.unwrap_or_default() + ); +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/goose/src/tracing/langfuse_layer.rs b/crates/goose/src/tracing/langfuse_layer.rs new file mode 100644 index 00000000..ba41aa9d --- /dev/null +++ b/crates/goose/src/tracing/langfuse_layer.rs @@ -0,0 +1,502 @@ +use crate::tracing::observation_layer::{BatchManager, ObservationLayer, SpanTracker}; +use chrono::Utc; +use reqwest::{Client, StatusCode}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::env; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use 
uuid::Uuid; + +const DEFAULT_LANGFUSE_URL: &str = "http://localhost:3000"; + +#[derive(Debug, Serialize, Deserialize)] +struct LangfuseIngestionResponse { + successes: Vec, + errors: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +struct LangfuseIngestionSuccess { + id: String, + status: i32, +} + +#[derive(Debug, Serialize, Deserialize)] +struct LangfuseIngestionError { + id: String, + status: i32, + message: Option, + error: Option, +} + +#[derive(Debug, Clone)] +pub struct LangfuseBatchManager { + pub batch: Vec, + pub client: Client, + pub base_url: String, + pub public_key: String, + pub secret_key: String, +} + +impl LangfuseBatchManager { + pub fn new(public_key: String, secret_key: String, base_url: String) -> Self { + Self { + batch: Vec::new(), + client: Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .expect("Failed to create HTTP client"), + base_url, + public_key, + secret_key, + } + } + + pub fn spawn_sender(manager: Arc>) { + const BATCH_INTERVAL: Duration = Duration::from_secs(5); + + tokio::spawn(async move { + loop { + tokio::time::sleep(BATCH_INTERVAL).await; + if let Err(e) = manager.lock().await.send() { + tracing::error!( + error.msg = %e, + error.type = %std::any::type_name_of_val(&e), + "Failed to send batch to Langfuse" + ); + } + } + }); + } + + pub async fn send_async(&mut self) -> Result<(), Box> { + if self.batch.is_empty() { + return Ok(()); + } + + let payload = json!({ "batch": self.batch }); + let url = format!("{}/api/public/ingestion", self.base_url); + + let response = self + .client + .post(&url) + .basic_auth(&self.public_key, Some(&self.secret_key)) + .json(&payload) + .send() + .await?; + + match response.status() { + status if status.is_success() => { + let response_body: LangfuseIngestionResponse = response.json().await?; + + for error in &response_body.errors { + tracing::error!( + id = %error.id, + status = error.status, + message = error.message.as_deref().unwrap_or("No message"), + error = 
?error.error, + "Partial failure in batch ingestion" + ); + } + + if !response_body.successes.is_empty() { + self.batch.clear(); + } + + if response_body.successes.is_empty() && !response_body.errors.is_empty() { + Err("Langfuse ingestion failed for all items".into()) + } else { + Ok(()) + } + } + status @ (StatusCode::BAD_REQUEST + | StatusCode::UNAUTHORIZED + | StatusCode::FORBIDDEN + | StatusCode::NOT_FOUND + | StatusCode::METHOD_NOT_ALLOWED) => { + let err_text = response.text().await.unwrap_or_default(); + Err(format!("Langfuse API error: {}: {}", status, err_text).into()) + } + status => { + let err_text = response.text().await.unwrap_or_default(); + Err(format!("Unexpected status code: {}: {}", status, err_text).into()) + } + } + } +} + +impl BatchManager for LangfuseBatchManager { + fn add_event(&mut self, event_type: &str, body: Value) { + self.batch.push(json!({ + "id": Uuid::new_v4().to_string(), + "timestamp": Utc::now().to_rfc3339(), + "type": event_type, + "body": body + })); + } + + fn send(&mut self) -> Result<(), Box> { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(self.send_async()) + }) + } + + fn is_empty(&self) -> bool { + self.batch.is_empty() + } +} + +pub fn create_langfuse_observer() -> Option { + let public_key = env::var("LANGFUSE_PUBLIC_KEY") + .or_else(|_| env::var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY")) + .unwrap_or_default(); // Use empty string if not found + + let secret_key = env::var("LANGFUSE_SECRET_KEY") + .or_else(|_| env::var("LANGFUSE_INIT_PROJECT_SECRET_KEY")) + .unwrap_or_default(); // Use empty string if not found + + // Return None if either key is empty + if public_key.is_empty() || secret_key.is_empty() { + return None; + } + + let base_url = env::var("LANGFUSE_URL").unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string()); + + let batch_manager = Arc::new(Mutex::new(LangfuseBatchManager::new( + public_key, secret_key, base_url, + ))); + + if !cfg!(test) { + 
LangfuseBatchManager::spawn_sender(batch_manager.clone()); + } + + Some(ObservationLayer { + batch_manager, + span_tracker: Arc::new(Mutex::new(SpanTracker::new())), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::collections::HashMap; + use tokio::sync::Mutex; + use tracing::dispatcher; + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + struct TestFixture { + original_subscriber: Option, + original_env_vars: HashMap, + mock_server: Option, + } + + impl TestFixture { + async fn new() -> Self { + Self { + original_subscriber: Some(dispatcher::get_default(dispatcher::Dispatch::clone)), + original_env_vars: Self::save_env_vars(), + mock_server: None, + } + } + + fn save_env_vars() -> HashMap { + [ + "LANGFUSE_PUBLIC_KEY", + "LANGFUSE_INIT_PROJECT_PUBLIC_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_INIT_PROJECT_SECRET_KEY", + "LANGFUSE_URL", + ] + .iter() + .filter_map(|&var| env::var(var).ok().map(|val| (var.to_string(), val))) + .collect() + } + + async fn with_mock_server(mut self) -> Self { + self.mock_server = Some(MockServer::start().await); + self + } + + fn mock_server_uri(&self) -> String { + self.mock_server + .as_ref() + .expect("Mock server not initialized") + .uri() + } + + async fn mock_response(&self, status: u16, body: Value) { + Mock::given(method("POST")) + .and(path("/api/public/ingestion")) + .respond_with(ResponseTemplate::new(status).set_body_json(body)) + .mount(self.mock_server.as_ref().unwrap()) + .await; + } + } + + impl Drop for TestFixture { + fn drop(&mut self) { + // Restore original subscriber + if let Some(subscriber) = &self.original_subscriber { + let _ = dispatcher::set_global_default(subscriber.clone()); + } + + // Restore environment + for var in [ + "LANGFUSE_PUBLIC_KEY", + "LANGFUSE_INIT_PROJECT_PUBLIC_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_INIT_PROJECT_SECRET_KEY", + "LANGFUSE_URL", + ] { + if let Some(value) = 
self.original_env_vars.get(var) { + env::set_var(var, value); + } else { + env::remove_var(var); + } + } + } + } + + fn create_test_event() -> Value { + json!({ + "name": "test_span", + "type": "SPAN" + }) + } + + #[tokio::test] + async fn test_batch_manager_creation() { + let _fixture = TestFixture::new().await; + + let manager = LangfuseBatchManager::new( + "test-public".to_string(), + "test-secret".to_string(), + "http://test.local".to_string(), + ); + + assert_eq!(manager.public_key, "test-public"); + assert_eq!(manager.secret_key, "test-secret"); + assert_eq!(manager.base_url, "http://test.local"); + assert!(manager.batch.is_empty()); + } + + #[tokio::test] + async fn test_add_event() { + let _fixture = TestFixture::new().await; + let mut manager = LangfuseBatchManager::new( + "test-public".to_string(), + "test-secret".to_string(), + "http://test.local".to_string(), + ); + + manager.add_event("test-event", create_test_event()); + + assert_eq!(manager.batch.len(), 1); + let event = &manager.batch[0]; + assert_eq!(event["type"], "test-event"); + assert_eq!(event["body"], create_test_event()); + assert!(event["id"].as_str().is_some()); + assert!(event["timestamp"].as_str().is_some()); + } + + #[tokio::test] + async fn test_batch_send_success() { + let fixture = TestFixture::new().await.with_mock_server().await; + + fixture + .mock_response( + 200, + json!({ + "successes": [{"id": "1", "status": 200}], + "errors": [] + }), + ) + .await; + + let mut manager = LangfuseBatchManager::new( + "test-public".to_string(), + "test-secret".to_string(), + fixture.mock_server_uri(), + ); + + manager.add_event("test-event", create_test_event()); + + let result = manager.send_async().await; + assert!(result.is_ok()); + assert!(manager.batch.is_empty()); + } + + #[tokio::test] + async fn test_batch_send_partial_failure() { + let fixture = TestFixture::new().await.with_mock_server().await; + + fixture + .mock_response( + 200, + json!({ + "successes": [{"id": "1", "status": 200}], 
+ "errors": [{"id": "2", "status": 400, "message": "Invalid data"}] + }), + ) + .await; + + let mut manager = LangfuseBatchManager::new( + "test-public".to_string(), + "test-secret".to_string(), + fixture.mock_server_uri(), + ); + + manager.add_event("test-event", create_test_event()); + + let result = manager.send_async().await; + assert!(result.is_ok()); + assert!(manager.batch.is_empty()); + } + + #[tokio::test] + async fn test_batch_send_complete_failure() { + let fixture = TestFixture::new().await.with_mock_server().await; + + fixture + .mock_response( + 200, + json!({ + "successes": [], + "errors": [{"id": "1", "status": 400, "message": "Invalid data"}] + }), + ) + .await; + + let mut manager = LangfuseBatchManager::new( + "test-public".to_string(), + "test-secret".to_string(), + fixture.mock_server_uri(), + ); + + manager.add_event("test-event", create_test_event()); + + let result = manager.send_async().await; + assert!(result.is_err()); + assert!(!manager.batch.is_empty()); + } + + #[tokio::test] + async fn test_create_langfuse_observer() { + let fixture = TestFixture::new().await.with_mock_server().await; + + // Test 1: No environment variables set - remove all possible variables + for var in &[ + "LANGFUSE_PUBLIC_KEY", + "LANGFUSE_INIT_PROJECT_PUBLIC_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_INIT_PROJECT_SECRET_KEY", + "LANGFUSE_URL", + ] { + env::remove_var(var); + } + + let observer = create_langfuse_observer(); + assert!( + observer.is_none(), + "Observer should be None without environment variables" + ); + + // Test 2: Only public key set (regular) + env::set_var("LANGFUSE_PUBLIC_KEY", "test-public-key"); + let observer = create_langfuse_observer(); + assert!( + observer.is_none(), + "Observer should be None with only public key" + ); + env::remove_var("LANGFUSE_PUBLIC_KEY"); + + // Test 3: Only secret key set (regular) + env::set_var("LANGFUSE_SECRET_KEY", "test-secret-key"); + let observer = create_langfuse_observer(); + assert!( + 
observer.is_none(), + "Observer should be None with only secret key" + ); + env::remove_var("LANGFUSE_SECRET_KEY"); + + // Test 4: Only public key set (init project) + env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test-public-key"); + let observer = create_langfuse_observer(); + assert!( + observer.is_none(), + "Observer should be None with only init project public key" + ); + env::remove_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY"); + + // Test 5: Only secret key set (init project) + env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test-secret-key"); + let observer = create_langfuse_observer(); + assert!( + observer.is_none(), + "Observer should be None with only init project secret key" + ); + env::remove_var("LANGFUSE_INIT_PROJECT_SECRET_KEY"); + + // Test 6: Both regular keys set (should succeed) + env::set_var("LANGFUSE_PUBLIC_KEY", "test-public-key"); + env::set_var("LANGFUSE_SECRET_KEY", "test-secret-key"); + env::set_var("LANGFUSE_URL", fixture.mock_server_uri()); + let observer = create_langfuse_observer(); + assert!( + observer.is_some(), + "Observer should be Some with both regular keys set" + ); + + // Clean up regular keys + env::remove_var("LANGFUSE_PUBLIC_KEY"); + env::remove_var("LANGFUSE_SECRET_KEY"); + + // Test 7: Both init project keys set (should succeed) + env::set_var("LANGFUSE_INIT_PROJECT_PUBLIC_KEY", "test-public-key"); + env::set_var("LANGFUSE_INIT_PROJECT_SECRET_KEY", "test-secret-key"); + let observer = create_langfuse_observer(); + assert!( + observer.is_some(), + "Observer should be Some with both init project keys set" + ); + + // Verify the observer has an empty batch manager + let batch_manager = observer.unwrap().batch_manager; + assert!(batch_manager.lock().await.is_empty()); + } + #[tokio::test] + async fn test_batch_manager_spawn_sender() { + let fixture = TestFixture::new().await.with_mock_server().await; + + fixture + .mock_response( + 200, + json!({ + "successes": [{"id": "1", "status": 200}], + "errors": [] + }), + ) + 
.await; + + let manager = Arc::new(Mutex::new(LangfuseBatchManager::new( + "test-public".to_string(), + "test-secret".to_string(), + fixture.mock_server_uri(), + ))); + + manager + .lock() + .await + .add_event("test-event", create_test_event()); + + // Instead of spawning the sender which uses blocking operations, + // test the async send directly + let result = manager.lock().await.send_async().await; + assert!(result.is_ok()); + assert!(manager.lock().await.batch.is_empty()); + } +} diff --git a/crates/goose/src/tracing/mod.rs b/crates/goose/src/tracing/mod.rs new file mode 100644 index 00000000..caa6bcd9 --- /dev/null +++ b/crates/goose/src/tracing/mod.rs @@ -0,0 +1,7 @@ +pub mod langfuse_layer; +mod observation_layer; + +pub use langfuse_layer::{create_langfuse_observer, LangfuseBatchManager}; +pub use observation_layer::{ + flatten_metadata, map_level, BatchManager, ObservationLayer, SpanData, SpanTracker, +}; diff --git a/crates/goose/src/tracing/observation_layer.rs b/crates/goose/src/tracing/observation_layer.rs new file mode 100644 index 00000000..d8f22518 --- /dev/null +++ b/crates/goose/src/tracing/observation_layer.rs @@ -0,0 +1,514 @@ +use chrono::Utc; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::field::{Field, Visit}; +use tracing::{span, Event, Id, Level, Metadata, Subscriber}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct SpanData { + pub observation_id: String, // Langfuse requires ids to be UUID v4 strings + pub name: String, + pub start_time: String, + pub level: String, + pub metadata: serde_json::Map, + pub parent_span_id: Option, +} + +pub fn map_level(level: &Level) -> &'static str { + match *level { + Level::ERROR => "ERROR", + Level::WARN => "WARNING", + Level::INFO => "DEFAULT", + Level::DEBUG => "DEBUG", + 
Level::TRACE => "DEBUG", + } +} + +pub fn flatten_metadata( + metadata: serde_json::Map, +) -> serde_json::Map { + let mut flattened = serde_json::Map::new(); + for (key, value) in metadata { + match value { + Value::String(s) => { + flattened.insert(key, json!(s)); + } + Value::Object(mut obj) => { + if let Some(text) = obj.remove("text") { + flattened.insert(key, text); + } else { + flattened.insert(key, json!(obj)); + } + } + _ => { + flattened.insert(key, value); + } + } + } + flattened +} + +pub trait BatchManager: Send + Sync + 'static { + fn add_event(&mut self, event_type: &str, body: Value); + fn send(&mut self) -> Result<(), Box>; + fn is_empty(&self) -> bool; +} + +#[derive(Debug)] +pub struct SpanTracker { + active_spans: HashMap, // span_id -> observation_id. span_id in Tracing is u64 whereas Langfuse requires UUID v4 strings + current_trace_id: Option, +} + +impl SpanTracker { + pub fn new() -> Self { + Self { + active_spans: HashMap::new(), + current_trace_id: None, + } + } + + pub fn add_span(&mut self, span_id: u64, observation_id: String) { + self.active_spans.insert(span_id, observation_id); + } + + pub fn get_span(&self, span_id: u64) -> Option<&String> { + self.active_spans.get(&span_id) + } + + pub fn remove_span(&mut self, span_id: u64) -> Option { + self.active_spans.remove(&span_id) + } +} + +#[derive(Clone)] +pub struct ObservationLayer { + pub batch_manager: Arc>, + pub span_tracker: Arc>, +} + +impl ObservationLayer { + pub async fn handle_span(&self, span_id: u64, span_data: SpanData) { + let observation_id = span_data.observation_id.clone(); + + { + let mut spans = self.span_tracker.lock().await; + spans.add_span(span_id, observation_id.clone()); + } + + // Get parent ID if it exists + let parent_id = if let Some(parent_span_id) = span_data.parent_span_id { + let spans = self.span_tracker.lock().await; + spans.get_span(parent_span_id).cloned() + } else { + None + }; + + let trace_id = self.ensure_trace_id().await; + + // Create the 
span observation + let mut batch = self.batch_manager.lock().await; + batch.add_event( + "observation-create", + json!({ + "id": observation_id, + "traceId": trace_id, + "type": "SPAN", + "name": span_data.name, + "startTime": span_data.start_time, + "parentObservationId": parent_id, + "metadata": span_data.metadata, + "level": span_data.level + }), + ); + } + + pub async fn handle_span_close(&self, span_id: u64) { + let observation_id = { + let mut spans = self.span_tracker.lock().await; + spans.remove_span(span_id) + }; + + if let Some(observation_id) = observation_id { + let trace_id = self.ensure_trace_id().await; + let mut batch = self.batch_manager.lock().await; + batch.add_event( + "observation-update", + json!({ + "id": observation_id, + "type": "SPAN", + "traceId": trace_id, + "endTime": Utc::now().to_rfc3339() + }), + ); + } + } + + pub async fn ensure_trace_id(&self) -> String { + let mut spans = self.span_tracker.lock().await; + if let Some(id) = spans.current_trace_id.clone() { + return id; + } + + let trace_id = Uuid::new_v4().to_string(); + spans.current_trace_id = Some(trace_id.clone()); + + let mut batch = self.batch_manager.lock().await; + batch.add_event( + "trace-create", + json!({ + "id": trace_id, + "name": Utc::now().timestamp().to_string(), + "timestamp": Utc::now().to_rfc3339(), + "input": {}, + "metadata": {}, + "tags": [], + "public": false + }), + ); + + trace_id + } + + pub async fn handle_record(&self, span_id: u64, metadata: serde_json::Map) { + let observation_id = { + let spans = self.span_tracker.lock().await; + spans.get_span(span_id).cloned() + }; + + if let Some(observation_id) = observation_id { + let trace_id = self.ensure_trace_id().await; + + let mut update = json!({ + "id": observation_id, + "traceId": trace_id, + "type": "SPAN" + }); + + // Handle special fields + if let Some(val) = metadata.get("input") { + update["input"] = val.clone(); + } + + if let Some(val) = metadata.get("output") { + update["output"] = val.clone(); 
+ } + + if let Some(val) = metadata.get("model_config") { + update["metadata"] = json!({ "model_config": val }); + } + + // Handle any remaining metadata + let remaining_metadata: serde_json::Map = metadata + .iter() + .filter(|(k, _)| !["input", "output", "model_config"].contains(&k.as_str())) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + if !remaining_metadata.is_empty() { + let flattened = flatten_metadata(remaining_metadata); + if update.get("metadata").is_some() { + // If metadata exists (from model_config), merge with it + if let Some(obj) = update["metadata"].as_object_mut() { + for (k, v) in flattened { + obj.insert(k, v); + } + } + } else { + // Otherwise set it directly + update["metadata"] = json!(flattened); + } + } + + let mut batch = self.batch_manager.lock().await; + batch.add_event("span-update", update); + } + } +} + +impl Layer for ObservationLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn enabled(&self, metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool { + metadata.target().starts_with("goose::") + } + + fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { + let span_id = id.into_u64(); + + let parent_span_id = ctx + .span_scope(id) + .and_then(|scope| scope.skip(1).next()) + .map(|parent| parent.id().into_u64()); + + let mut visitor = JsonVisitor::new(); + attrs.record(&mut visitor); + + let span_data = SpanData { + observation_id: Uuid::new_v4().to_string(), + name: attrs.metadata().name().to_string(), + start_time: Utc::now().to_rfc3339(), + level: map_level(attrs.metadata().level()).to_owned(), + metadata: visitor.recorded_fields, + parent_span_id, + }; + + let layer = self.clone(); + tokio::spawn(async move { layer.handle_span(span_id, span_data).await }); + } + + fn on_close(&self, id: Id, _ctx: Context<'_, S>) { + let span_id = id.into_u64(); + let layer = self.clone(); + tokio::spawn(async move { layer.handle_span_close(span_id).await }); + } + + fn on_record(&self, 
span: &Id, values: &span::Record<'_>, _ctx: Context<'_, S>) { + let span_id = span.into_u64(); + let mut visitor = JsonVisitor::new(); + values.record(&mut visitor); + let metadata = visitor.recorded_fields; + + if !metadata.is_empty() { + let layer = self.clone(); + tokio::spawn(async move { layer.handle_record(span_id, metadata).await }); + } + } + + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let mut visitor = JsonVisitor::new(); + event.record(&mut visitor); + let metadata = visitor.recorded_fields; + + if let Some(span_id) = ctx.lookup_current().map(|span| span.id().into_u64()) { + let layer = self.clone(); + tokio::spawn(async move { layer.handle_record(span_id, metadata).await }); + } + } +} + +#[derive(Debug)] +struct JsonVisitor { + recorded_fields: serde_json::Map, +} + +impl JsonVisitor { + fn new() -> Self { + Self { + recorded_fields: serde_json::Map::new(), + } + } + + fn insert_value(&mut self, field: &Field, value: Value) { + self.recorded_fields.insert(field.name().to_string(), value); + } +} + +macro_rules! 
record_field { + ($fn_name:ident, $type:ty) => { + fn $fn_name(&mut self, field: &Field, value: $type) { + self.insert_value(field, Value::from(value)); + } + }; +} + +impl Visit for JsonVisitor { + record_field!(record_i64, i64); + record_field!(record_u64, u64); + record_field!(record_bool, bool); + record_field!(record_str, &str); + + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + self.insert_value(field, Value::String(format!("{:?}", value))); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use tokio::sync::mpsc; + use tracing::dispatcher; + + struct TestFixture { + original_subscriber: Option, + events: Option>>>, + } + + impl TestFixture { + fn new() -> Self { + Self { + original_subscriber: Some(dispatcher::get_default(dispatcher::Dispatch::clone)), + events: None, + } + } + + fn with_test_layer(mut self) -> (Self, ObservationLayer) { + let events = Arc::new(Mutex::new(Vec::new())); + let mock_manager = MockBatchManager::new(events.clone()); + + let layer = ObservationLayer { + batch_manager: Arc::new(Mutex::new(mock_manager)), + span_tracker: Arc::new(Mutex::new(SpanTracker::new())), + }; + + self.events = Some(events); + (self, layer) + } + + async fn get_events(&self) -> Vec<(String, Value)> { + self.events + .as_ref() + .expect("Events not initialized") + .lock() + .await + .clone() + } + } + + impl Drop for TestFixture { + fn drop(&mut self) { + if let Some(subscriber) = &self.original_subscriber { + let _ = dispatcher::set_global_default(subscriber.clone()); + } + } + } + + struct MockBatchManager { + events: Arc>>, + sender: mpsc::UnboundedSender<(String, Value)>, + } + + impl MockBatchManager { + fn new(events: Arc>>) -> Self { + let (sender, mut receiver) = mpsc::unbounded_channel(); + let events_clone = events.clone(); + + tokio::spawn(async move { + while let Some((event_type, body)) = receiver.recv().await { + events_clone.lock().await.push((event_type, body)); + } + }); + + Self { events, 
sender }
        }
    }

    impl BatchManager for MockBatchManager {
        // Forward events through the channel; the spawned task in `new`
        // appends them to the shared `events` vec for later inspection.
        fn add_event(&mut self, event_type: &str, body: Value) {
            self.sender
                .send((event_type.to_string(), body))
                .expect("Failed to send event");
        }

        // The real manager flushes over HTTP; the mock records in memory,
        // so there is nothing to flush.
        fn send(&mut self) -> Result<(), Box<dyn std::error::Error>> {
            Ok(())
        }

        fn is_empty(&self) -> bool {
            futures::executor::block_on(async { self.events.lock().await.is_empty() })
        }
    }

    // Minimal SpanData (fresh UUID/timestamp, no parent, empty metadata)
    // for driving the layer's handlers directly.
    fn create_test_span_data() -> SpanData {
        SpanData {
            observation_id: Uuid::new_v4().to_string(),
            name: "test_span".to_string(),
            start_time: Utc::now().to_rfc3339(),
            level: "DEFAULT".to_string(),
            metadata: serde_json::Map::new(),
            parent_span_id: None,
        }
    }

    // Time allowed for MockBatchManager's forwarding task to drain the
    // channel. The handlers themselves are awaited directly, so only the
    // in-process mpsc hop needs to settle; 500ms is ample. (The previous
    // 6-second value added ~18s of pure sleep across these tests.)
    const TEST_WAIT_DURATION: Duration = Duration::from_millis(500);

    #[tokio::test]
    async fn test_span_creation() {
        let (fixture, layer) = TestFixture::new().with_test_layer();
        let span_id = 1u64;
        let span_data = create_test_span_data();

        layer.handle_span(span_id, span_data.clone()).await;
        tokio::time::sleep(TEST_WAIT_DURATION).await;

        let events = fixture.get_events().await;
        assert_eq!(events.len(), 2); // trace-create and observation-create

        let (event_type, body) = &events[1];
        assert_eq!(event_type, "observation-create");
        assert_eq!(body["id"], span_data.observation_id);
        assert_eq!(body["name"], "test_span");
        assert_eq!(body["type"], "SPAN");
    }

    #[tokio::test]
    async fn test_span_close() {
        let (fixture, layer) = TestFixture::new().with_test_layer();
        let span_id = 1u64;
        let span_data = create_test_span_data();

        layer.handle_span(span_id, span_data.clone()).await;
        layer.handle_span_close(span_id).await;
        tokio::time::sleep(TEST_WAIT_DURATION).await;

        let events = fixture.get_events().await;
        assert_eq!(events.len(), 3); // trace-create, observation-create, observation-update

        let (event_type, body) = &events[2];
        assert_eq!(event_type, "observation-update");
        assert_eq!(body["id"], span_data.observation_id);
        assert!(body["endTime"].as_str().is_some());
} + + #[tokio::test] + async fn test_record_handling() { + let (fixture, layer) = TestFixture::new().with_test_layer(); + let span_id = 1u64; + let span_data = create_test_span_data(); + + layer.handle_span(span_id, span_data.clone()).await; + + let mut metadata = serde_json::Map::new(); + metadata.insert("input".to_string(), json!("test input")); + metadata.insert("output".to_string(), json!("test output")); + metadata.insert("custom_field".to_string(), json!("custom value")); + + layer.handle_record(span_id, metadata).await; + tokio::time::sleep(TEST_WAIT_DURATION).await; + + let events = fixture.get_events().await; + assert_eq!(events.len(), 3); // trace-create, observation-create, span-update + + let (event_type, body) = &events[2]; + assert_eq!(event_type, "span-update"); + assert_eq!(body["input"], "test input"); + assert_eq!(body["output"], "test output"); + assert_eq!(body["metadata"]["custom_field"], "custom value"); + } + + #[test] + fn test_flatten_metadata() { + let _fixture = TestFixture::new(); + let mut metadata = serde_json::Map::new(); + metadata.insert("simple".to_string(), json!("value")); + metadata.insert( + "complex".to_string(), + json!({ + "text": "inner value" + }), + ); + + let flattened = flatten_metadata(metadata); + assert_eq!(flattened["simple"], "value"); + assert_eq!(flattened["complex"], "inner value"); + } +} diff --git a/scripts/.env.langfuse.local b/scripts/.env.langfuse.local new file mode 100644 index 00000000..866ab3f4 --- /dev/null +++ b/scripts/.env.langfuse.local @@ -0,0 +1,11 @@ +# These variables are default initialization variables for locally hosted Langfuse server +LANGFUSE_INIT_PROJECT_NAME=goose-local +LANGFUSE_INIT_PROJECT_PUBLIC_KEY=publickey-local +LANGFUSE_INIT_PROJECT_SECRET_KEY=secretkey-local +LANGFUSE_INIT_USER_EMAIL=local@block.xyz +LANGFUSE_INIT_USER_NAME=localdev +LANGFUSE_INIT_USER_PASSWORD=localpwd + +LANGFUSE_INIT_ORG_ID=local-id +LANGFUSE_INIT_ORG_NAME=local-org +LANGFUSE_INIT_PROJECT_ID=goose diff 
--git a/scripts/langfuse-docker-compose.yaml b/scripts/langfuse-docker-compose.yaml new file mode 100644 index 00000000..59f8b4a3 --- /dev/null +++ b/scripts/langfuse-docker-compose.yaml @@ -0,0 +1,152 @@ +services: + langfuse-worker: + image: langfuse/langfuse-worker:3 + restart: always + depends_on: &langfuse-depends-on + postgres: + condition: service_healthy + minio: + condition: service_healthy + redis: + condition: service_healthy + clickhouse: + condition: service_healthy + ports: + - "3030:3030" + environment: &langfuse-worker-env + DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres + SALT: "mysalt" + ENCRYPTION_KEY: "0000000000000000000000000000000000000000000000000000000000000000" # generate via `openssl rand -hex 32` + TELEMETRY_ENABLED: ${TELEMETRY_ENABLED:-true} + LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: ${LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES:-true} + CLICKHOUSE_MIGRATION_URL: ${CLICKHOUSE_MIGRATION_URL:-clickhouse://clickhouse:9000} + CLICKHOUSE_URL: ${CLICKHOUSE_URL:-http://clickhouse:8123} + CLICKHOUSE_USER: ${CLICKHOUSE_USER:-clickhouse} + CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD:-clickhouse} + CLICKHOUSE_CLUSTER_ENABLED: ${CLICKHOUSE_CLUSTER_ENABLED:-false} + LANGFUSE_S3_EVENT_UPLOAD_BUCKET: ${LANGFUSE_S3_EVENT_UPLOAD_BUCKET:-langfuse} + LANGFUSE_S3_EVENT_UPLOAD_REGION: ${LANGFUSE_S3_EVENT_UPLOAD_REGION:-auto} + LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID:-minio} + LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY:-miniosecret} + LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: ${LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT:-http://minio:9000} + LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE:-true} + LANGFUSE_S3_EVENT_UPLOAD_PREFIX: ${LANGFUSE_S3_EVENT_UPLOAD_PREFIX:-events/} + LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: ${LANGFUSE_S3_MEDIA_UPLOAD_BUCKET:-langfuse} + LANGFUSE_S3_MEDIA_UPLOAD_REGION: ${LANGFUSE_S3_MEDIA_UPLOAD_REGION:-auto} + 
LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID:-minio} + LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY:-miniosecret} + LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: ${LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT:-http://minio:9000} + LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE:-true} + LANGFUSE_S3_MEDIA_UPLOAD_PREFIX: ${LANGFUSE_S3_MEDIA_UPLOAD_PREFIX:-media/} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_AUTH: ${REDIS_AUTH:-myredissecret} + + langfuse-web: + image: langfuse/langfuse:3 + restart: always + depends_on: *langfuse-depends-on + ports: + - "3000:3000" + environment: + <<: *langfuse-worker-env + NEXTAUTH_URL: http://localhost:3000 + NEXTAUTH_SECRET: mysecret + LANGFUSE_INIT_ORG_ID: ${LANGFUSE_INIT_ORG_ID:-} + LANGFUSE_INIT_ORG_NAME: ${LANGFUSE_INIT_ORG_NAME:-} + LANGFUSE_INIT_PROJECT_ID: ${LANGFUSE_INIT_PROJECT_ID:-} + LANGFUSE_INIT_PROJECT_NAME: ${LANGFUSE_INIT_PROJECT_NAME:-} + LANGFUSE_INIT_PROJECT_PUBLIC_KEY: ${LANGFUSE_INIT_PROJECT_PUBLIC_KEY:-} + LANGFUSE_INIT_PROJECT_SECRET_KEY: ${LANGFUSE_INIT_PROJECT_SECRET_KEY:-} + LANGFUSE_INIT_USER_EMAIL: ${LANGFUSE_INIT_USER_EMAIL:-} + LANGFUSE_INIT_USER_NAME: ${LANGFUSE_INIT_USER_NAME:-} + LANGFUSE_INIT_USER_PASSWORD: ${LANGFUSE_INIT_USER_PASSWORD:-} + LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED: ${LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED:-false} + LANGFUSE_READ_FROM_POSTGRES_ONLY: ${LANGFUSE_READ_FROM_POSTGRES_ONLY:-false} + LANGFUSE_READ_FROM_CLICKHOUSE_ONLY: ${LANGFUSE_READ_FROM_CLICKHOUSE_ONLY:-true} + LANGFUSE_RETURN_FROM_CLICKHOUSE: ${LANGFUSE_RETURN_FROM_CLICKHOUSE:-true} + + clickhouse: + image: clickhouse/clickhouse-server + restart: always + user: "101:101" + container_name: clickhouse + hostname: clickhouse + environment: + CLICKHOUSE_DB: default + CLICKHOUSE_USER: clickhouse + CLICKHOUSE_PASSWORD: clickhouse + volumes: + - langfuse_clickhouse_data:/var/lib/clickhouse + - 
langfuse_clickhouse_logs:/var/log/clickhouse-server + ports: + - "8123:8123" + - "9000:9000" + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1 + interval: 5s + timeout: 5s + retries: 10 + start_period: 1s + + minio: + image: minio/minio + restart: always + container_name: minio + entrypoint: sh + # create the 'langfuse' bucket before starting the service + command: -c 'mkdir -p /data/langfuse && minio server --address ":9000" --console-address ":9001" /data' + environment: + MINIO_ROOT_USER: minio + MINIO_ROOT_PASSWORD: miniosecret + ports: + - "9090:9000" + - "9091:9001" + volumes: + - langfuse_minio_data:/data + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 1s + timeout: 5s + retries: 5 + start_period: 1s + + redis: + image: redis:7 + restart: always + command: > + --requirepass ${REDIS_AUTH:-myredissecret} + ports: + - 6379:6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 3s + timeout: 10s + retries: 10 + + postgres: + image: postgres:${POSTGRES_VERSION:-latest} + restart: always + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 3s + timeout: 3s + retries: 10 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: postgres + ports: + - 5432:5432 + volumes: + - langfuse_postgres_data:/var/lib/postgresql/data + +volumes: + langfuse_postgres_data: + driver: local + langfuse_clickhouse_data: + driver: local + langfuse_clickhouse_logs: + driver: local + langfuse_minio_data: + driver: local diff --git a/scripts/setup_langfuse.sh b/scripts/setup_langfuse.sh new file mode 100755 index 00000000..dd782896 --- /dev/null +++ b/scripts/setup_langfuse.sh @@ -0,0 +1,118 @@ +#!/bin/bash + +# setup_langfuse.sh +# +# This script sets up and runs Langfuse locally for development and testing purposes. +# +# Key functionalities: +# 1. Downloads the latest docker-compose.yaml from the Langfuse repository +# 2. 
Starts Langfuse using Docker Compose with default initialization variables
# 3. Waits for the service to be available
# 4. Launches a browser to open the local Langfuse UI
# 5. Prints login credentials from the environment file
#
# Usage:
#   ./setup_langfuse.sh
#
# Requirements:
#   - Docker
#   - curl
#   - A .env.langfuse.local file in the same directory as this script
#
# Note: This script is intended for local development use only.

set -e

SCRIPT_DIR=$(realpath "$(dirname "${BASH_SOURCE[0]}")")
LANGFUSE_DOCKER_COMPOSE_URL="https://raw.githubusercontent.com/langfuse/langfuse/main/docker-compose.yml"
LANGFUSE_DOCKER_COMPOSE_FILE="langfuse-docker-compose.yaml"
LANGFUSE_ENV_FILE="$SCRIPT_DIR/.env.langfuse.local"

# Fallback project keys, kept in sync with the defaults in
# scripts/.env.langfuse.local; used only when the env file is missing or
# does not define the keys.
DEFAULT_PUBLIC_KEY="publickey-local"
DEFAULT_SECRET_KEY="secretkey-local"

# Fail fast when a required CLI tool is not installed.
check_dependencies() {
    local dependencies=("curl" "docker")
    local missing_dependencies=()

    for cmd in "${dependencies[@]}"; do
        if ! command -v "$cmd" &> /dev/null; then
            missing_dependencies+=("$cmd")
        fi
    done

    if [ ${#missing_dependencies[@]} -ne 0 ]; then
        echo "Missing dependencies: ${missing_dependencies[*]}"
        exit 1
    fi
}

# Fetch the upstream docker-compose file only when no local copy exists.
# The repository ships a customized langfuse-docker-compose.yaml (it wires
# the LANGFUSE_INIT_* variables into the web container); unconditionally
# re-downloading would clobber those customizations.
download_docker_compose() {
    if [ -f "$SCRIPT_DIR/$LANGFUSE_DOCKER_COMPOSE_FILE" ]; then
        echo "Using existing $LANGFUSE_DOCKER_COMPOSE_FILE"
        return 0
    fi
    if ! curl --fail --location --output "$SCRIPT_DIR/$LANGFUSE_DOCKER_COMPOSE_FILE" "$LANGFUSE_DOCKER_COMPOSE_URL"; then
        echo "Failed to download docker-compose file from $LANGFUSE_DOCKER_COMPOSE_URL"
        exit 1
    fi
}

start_docker_compose() {
    docker compose --env-file "$LANGFUSE_ENV_FILE" -f "$LANGFUSE_DOCKER_COMPOSE_FILE" up --detach
}

# Poll the web UI until it answers. A cold start of the full stack
# (postgres, clickhouse migrations, minio, redis, web) can take a few
# minutes, so allow up to 90 attempts at 2s intervals before giving up.
wait_for_service() {
    echo "Waiting for Langfuse to start..."
    local retries=90
    local count=0
    until curl --silent http://localhost:3000 > /dev/null; do
        ((count++))
        if [ "$count" -ge "$retries" ]; then
            echo "Max retries reached. Langfuse did not start in time."
            exit 1
        fi
        sleep 2
    done
    echo "Langfuse is now available!"
}

# Open the Langfuse UI in the default browser (best effort, per OS).
launch_browser() {
    if [[ "$OSTYPE" == "linux-gnu"* ]]; then
        xdg-open "http://localhost:3000"
    elif [[ "$OSTYPE" == "darwin"* ]]; then
        open "http://localhost:3000"
    else
        echo "Please open http://localhost:3000 to view Langfuse traces."
    fi
}

# Emit `export` lines for the project API keys: read from the env file
# when present, otherwise fall back to the DEFAULT_* values defined above.
# (Previously the fallbacks referenced $DEFAULT_PUBLIC_KEY/$DEFAULT_SECRET_KEY
# without ever defining them, producing empty exports.)
get_project_keys() {
    local public_key
    local secret_key

    if [ -f "$LANGFUSE_ENV_FILE" ]; then
        # Try to get keys from env file
        public_key=$(grep -E "^LANGFUSE_INIT_PROJECT_PUBLIC_KEY=" "$LANGFUSE_ENV_FILE" | cut -d'=' -f2)
        secret_key=$(grep -E "^LANGFUSE_INIT_PROJECT_SECRET_KEY=" "$LANGFUSE_ENV_FILE" | cut -d'=' -f2)
    fi

    echo "export LANGFUSE_INIT_PROJECT_PUBLIC_KEY=${public_key:-$DEFAULT_PUBLIC_KEY}"
    echo "export LANGFUSE_INIT_PROJECT_SECRET_KEY=${secret_key:-$DEFAULT_SECRET_KEY}"
}

# Print the local login credentials and the project keys to copy into the
# caller's environment.
print_login_variables() {
    if [ -f "$LANGFUSE_ENV_FILE" ]; then
        echo "If not already logged in use the following credentials to log in:"
        grep -E "LANGFUSE_INIT_USER_EMAIL|LANGFUSE_INIT_USER_PASSWORD" "$LANGFUSE_ENV_FILE"
    else
        echo "Langfuse environment file with local credentials not found."
    fi
    echo
    echo "Project keys (add these to your environment):"
    get_project_keys
    echo
}

check_dependencies
pushd "$SCRIPT_DIR" > /dev/null
download_docker_compose
start_docker_compose
wait_for_service
print_login_variables
launch_browser
popd > /dev/null