
[feat] langfuse tracing layer #498

Merged
merged 28 commits on Jan 8, 2025
Changes from 11 commits

Commits (28)
2c4d9d1
working langfuse initial implementation
ahau-square Dec 18, 2024
8cf258b
simplified tracing implementation
ahau-square Dec 19, 2024
7e19d72
merged
ahau-square Dec 19, 2024
ed5a321
remove file logging
ahau-square Dec 19, 2024
928398c
add file writing layer to tracing
ahau-square Dec 22, 2024
ff8238d
use generation type instead
ahau-square Dec 24, 2024
c67af0b
refactor observation layer
ajgray-stripe Dec 23, 2024
e3f5ce2
consolidate tool traces
ahau-square Jan 3, 2025
fe67482
consolidate tool tracing
ahau-square Jan 3, 2025
e96c084
migrated tracing into core goose
ahau-square Jan 3, 2025
697b1e2
add langfuse starter script and update justfile
ahau-square Jan 3, 2025
926e412
tracing added to server
ahau-square Jan 6, 2025
6c21f7b
Merge branch 'v1.0' into ahau/1.0-tracing-langfuse
ahau-square Jan 7, 2025
3ca25e7
formatting
ahau-square Jan 7, 2025
7cd41f9
save cli session traces to have same session name for log file name
ahau-square Jan 7, 2025
7fa0ebc
add tracing on other providers
ahau-square Jan 7, 2025
6beb3f6
reformat
ahau-square Jan 7, 2025
c5df60a
add tests
ahau-square Jan 7, 2025
375b33e
formatting
ahau-square Jan 7, 2025
cc509aa
merged
ahau-square Jan 7, 2025
8876ae9
Merge v1.0: Resolve conflicts with tracing implementation
ahau-square Jan 7, 2025
75f9719
Merge branch 'v1.0' into ahau/1.0-tracing-langfuse
ahau-square Jan 7, 2025
7652e27
merged with cli working with tracing
ahau-square Jan 7, 2025
b1efec5
tracing added back into server after merge
ahau-square Jan 7, 2025
9409351
fix tests
ahau-square Jan 7, 2025
a25ed46
remove agent.rs mistakenly added in merge
ahau-square Jan 8, 2025
f646c13
fix log levels on log file
ahau-square Jan 8, 2025
aa7780f
format
ahau-square Jan 8, 2025
5 changes: 5 additions & 0 deletions Justfile
@@ -25,3 +25,8 @@ run-ui:
run-server:
@echo "Running server..."
cargo run -p goose-server

# Setup langfuse server
langfuse-server:
#!/usr/bin/env bash
./scripts/setup_langfuse.sh
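
With this recipe in place, a local Langfuse instance can presumably be started by running "just langfuse-server" from the repository root; the actual setup work is delegated to scripts/setup_langfuse.sh, which is not shown in this diff.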
6 changes: 4 additions & 2 deletions crates/goose-cli/Cargo.toml
@@ -30,14 +30,16 @@ serde_yaml = "0.9"
dirs = "4.0"
strum = "0.26"
strum_macros = "0.26"
reqwest = "0.11.27"
reqwest = { version = "0.11.27", features = ["json"] }
rand = "0.8.5"
async-trait = "0.1"
rustyline = "15.0.0"
rust_decimal = "1.36.0"
rust_decimal_macros = "1.36.0"
tracing = "0.1"
chrono = "0.4"
parking_lot = "0.12.3"

[dev-dependencies]
tempfile = "3"
temp-env = { version = "0.3.6", features = ["async_closure"] }

3 changes: 3 additions & 0 deletions crates/goose-cli/src/main.rs
@@ -17,6 +17,7 @@ use commands::session::build_session;
use commands::version::print_version;
use profile::has_no_profiles;
use std::io::{self, Read};
use goose::logging::setup_logging;

mod log_usage;

@@ -195,6 +196,8 @@ enum CliProviderVariant {

#[tokio::main]
async fn main() -> Result<()> {
setup_logging()?;

let cli = Cli::parse();

if cli.version {
3 changes: 3 additions & 0 deletions crates/goose/Cargo.toml
@@ -47,6 +47,9 @@ xcap = "0.0.14"
libc = "=0.2.167"
lazy_static = "1.5"
kill_tree = "0.2.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt", "json", "time"] }
tracing-appender = "0.2"

keyring = { version = "3.6.1", features = [
"apple-native",
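The env-filter, json, and time features added here back the log filtering and structured file output wired up in logging.rs later in this diff. As a rough standalone sketch (not goose's actual wiring), the same EnvFilter fallback can be exercised like this, with RUST_LOG overriding the goose=debug default when it is set:

    use tracing_subscriber::EnvFilter;

    fn main() {
        // Mirrors the fallback used in logging.rs below: RUST_LOG wins if set,
        // otherwise default to debug-level events from the goose crate.
        let filter = EnvFilter::try_from_default_env()
            .unwrap_or_else(|_| EnvFilter::new("goose=debug"));

        tracing_subscriber::fmt()
            .with_env_filter(filter)
            .json() // structured output; enabled by the "json" feature added above
            .init();

        tracing::debug!(target: "goose", "this event passes the goose=debug directive");
    }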
43 changes: 38 additions & 5 deletions crates/goose/src/agent.rs
@@ -5,6 +5,7 @@ use rust_decimal_macros::dec;
use serde_json::json;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tracing::{debug, instrument, warn};
Collaborator comment: will have some conflicts with this file with the mcp changes, but we can forward port easily i think


use crate::errors::{AgentError, AgentResult};
use crate::message::{Message, ToolRequest};
@@ -107,11 +108,13 @@ impl Agent {
}

/// Dispatch a single tool call to the appropriate system
#[instrument(skip(self, tool_call), fields(input, output))]
async fn dispatch_tool_call(
&self,
tool_call: AgentResult<ToolCall>,
) -> AgentResult<Vec<Content>> {
let call = tool_call?;

let system = self
.get_system_for_tool(&call.name)
.ok_or_else(|| AgentError::ToolNotFound(call.name.clone()))?;
@@ -122,8 +125,13 @@
.nth(1)
.ok_or_else(|| AgentError::InvalidToolName(call.name.clone()))?;
let system_tool_call = ToolCall::new(tool_name, call.arguments);
let result = system.call(system_tool_call.clone()).await;

tracing::debug!("input"=serde_json::to_string(&system_tool_call).unwrap(),
"output"=serde_json::to_string(&result).unwrap(),
);

system.call(system_tool_call).await
result
}

fn get_system_prompt(&self) -> AgentResult<String> {
@@ -213,7 +221,12 @@ impl Agent {
let mut status_content: Vec<String> = Vec::new();

if approx_count > target_limit {
println!("[WARNING] Token budget exceeded. Current count: {} \n Difference: {} tokens over buget. Removing context", approx_count, approx_count - target_limit);
warn!(
approx_count,
target_limit,
overage = approx_count - target_limit,
"Token budget exceeded, removing context"
);

// Get token counts for each resourcee
let mut system_token_counts = HashMap::new();
@@ -326,9 +339,20 @@ impl Agent {

/// Create a stream that yields each message as it's generated by the agent.
/// This includes both the assistant's responses and any tool responses.
#[instrument(skip(self, messages), fields(user_message))]
pub async fn reply(&self, messages: &[Message]) -> Result<BoxStream<'_, Result<Message>>> {
let reply_span = tracing::Span::current();
let mut messages = messages.to_vec();
let tools = self.get_prefixed_tools();

// Set the user_message field in the span instead of creating a new event
if let Some(content) = messages.last()
.and_then(|msg| msg.content.first())
.and_then(|c| c.as_text())
{
reply_span.record("user_message", &content);
}

let system_prompt = self.get_system_prompt()?;
let estimated_limit = self.provider.get_model_config().get_estimated_limit();

@@ -344,6 +368,8 @@
.await?;

Ok(Box::pin(async_stream::try_stream! {
let _reply_guard = reply_span.enter();

loop {
// Get completion from provider
let (response, usage) = self.provider.complete(
@@ -367,19 +393,26 @@
.filter_map(|content| content.as_tool_request())
.collect();

let request_count = tool_requests.len();
if tool_requests.is_empty() {
// No more tool calls, end the reply loop
break;
}

// Then dispatch each in parallel
let futures: Vec<_> = tool_requests
.iter()
.map(|request| self.dispatch_tool_call(request.tool_call.clone()))
.map(|request| {
self.dispatch_tool_call(request.tool_call.clone())
})
.collect();



// Process all the futures in parallel but wait until all are finished
let outputs = futures::future::join_all(futures).await;
let outputs = futures::future::join_all(futures)
.await;

debug!("All tool requests completed");

// Create a message with the responses
let mut message_tool_response = Message::user();
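The pattern used above, declaring span fields such as input, output, and user_message in #[instrument] and filling them once the values exist, is presumably what lets the observation and Langfuse layers added elsewhere in this PR attach tool inputs and outputs to the right span. A minimal, self-contained sketch of that pattern using only the tracing and tracing-subscriber crates (both added by this PR) follows; the function and field names are invented for illustration and are not goose APIs:

    use tracing::{info, instrument};

    // Declare `output` as an empty span field so it can be recorded later,
    // mirroring how dispatch_tool_call/reply declare fields up front.
    #[instrument(fields(output = tracing::field::Empty))]
    fn handle(payload: String) -> String {
        let result = payload.to_uppercase();
        // Record into the current span; a layer such as goose's ObservationLayer
        // (or the Langfuse layer) can then read the field off the span.
        tracing::Span::current().record("output", result.as_str());
        info!("request handled");
        result
    }

    fn main() {
        // Any subscriber works for the demo; goose installs its own layers via setup_logging().
        tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
        handle("tool call".to_string());
    }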
2 changes: 2 additions & 0 deletions crates/goose/src/lib.rs
@@ -9,3 +9,5 @@ pub mod prompt_template;
pub mod providers;
pub mod systems;
pub mod token_counter;
pub mod tracing;
pub mod logging;
100 changes: 100 additions & 0 deletions crates/goose/src/logging.rs
@@ -0,0 +1,100 @@
use anyhow::{Context, Result};
use serde_json::Value;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing_appender::rolling::Rotation;
use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter, Registry};
use tracing::dispatcher::set_global_default;

use crate::tracing::{langfuse_layer, observation_layer::{BatchManager, ObservationLayer, SpanTracker}};

struct ConsoleLogger {
batch: Vec<Value>,
}

impl ConsoleLogger {
fn new() -> Self {
Self {
batch: Vec::new(),
}
}
}

impl BatchManager for ConsoleLogger {
fn add_event(&mut self, _event_type: &str, body: Value) {
self.batch.push(body);
}

fn send(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.batch.clear();
Ok(())
}
}

fn get_log_directory() -> Result<PathBuf> {
let home = std::env::var("HOME").context("HOME environment variable not set")?;
let base_log_dir = PathBuf::from(home).join(".config").join("goose").join("logs");

// Create date-based subdirectory
let now = chrono::Local::now();
let date_dir = base_log_dir.join(now.format("%Y-%m-%d").to_string());

// Ensure log directory exists
fs::create_dir_all(&date_dir).context("Failed to create log directory")?;

Ok(date_dir)
}

fn create_observation_layer() -> ObservationLayer {
let batch_manager = Arc::new(Mutex::new(ConsoleLogger::new()));
ObservationLayer {
batch_manager,
span_tracker: Arc::new(Mutex::new(SpanTracker::new())),
}
}

pub fn setup_logging() -> Result<()> {
// Set up file appender for goose module logs
let log_dir = get_log_directory()?;
let timestamp = chrono::Local::now().format("%Y%m%d_%H%M%S%.3f").to_string();

// Create non-rolling file appender
let file_appender = tracing_appender::rolling::RollingFileAppender::new(
Rotation::NEVER,
log_dir,
&format!("{}.log", timestamp),
);

// Create JSON file logging layer
let file_layer = fmt::layer()
.with_target(true)
.with_level(true)
.with_writer(file_appender)
.with_ansi(false)
.with_file(true);

// Update filter to include debug level
let filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("goose=debug"));

// Build the base subscriber
let subscriber = Registry::default()
.with(file_layer)
.with(filter)
.with(create_observation_layer());

// Set up the dispatcher
let dispatcher = if let Some(langfuse) = langfuse_layer::create_langfuse_observer() {
subscriber.with(langfuse).into()
} else {
subscriber.into()
};

// Set the subscriber as the default
set_global_default(dispatcher)
.map_err(|e| anyhow::anyhow!("Failed to set global subscriber: {}", e))?;

Ok(())
}
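
ConsoleLogger above appears to be a deliberately minimal BatchManager: add_event buffers observation events and send simply clears the buffer, so span data flows through the ObservationLayer without being shipped anywhere. A hedged sketch of an alternative sink follows; the trait shape is inferred from the impl above (the real definition lives in crate::tracing::observation_layer and is not part of this diff), and StderrBatcher is a hypothetical name:

    use serde_json::{json, Value};

    // Assumed shape of the BatchManager trait, inferred from the ConsoleLogger
    // impl above; the real definition is not shown in this diff.
    trait BatchManager {
        fn add_event(&mut self, event_type: &str, body: Value);
        fn send(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    }

    // Hypothetical sink that prints each batch to stderr before discarding it,
    // useful for inspecting what an observation layer would otherwise ship to Langfuse.
    struct StderrBatcher {
        batch: Vec<(String, Value)>,
    }

    impl BatchManager for StderrBatcher {
        fn add_event(&mut self, event_type: &str, body: Value) {
            self.batch.push((event_type.to_string(), body));
        }

        fn send(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            for (event_type, body) in self.batch.drain(..) {
                eprintln!("{}", json!({ "type": event_type, "body": body }));
            }
            Ok(())
        }
    }

    fn main() {
        let mut sink = StderrBatcher { batch: Vec::new() };
        sink.add_event("span-create", json!({ "name": "demo-span" }));
        sink.send().expect("flush failed");
    }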
14 changes: 11 additions & 3 deletions crates/goose/src/providers/databricks.rs
Expand Up @@ -105,6 +105,7 @@ impl DatabricksProvider {

#[async_trait]
impl Provider for DatabricksProvider {
#[tracing::instrument(skip(self, system, messages, tools), fields(model_config, input, output, input_tokens, output_tokens, total_tokens, cost))]
async fn complete(
&self,
system: &str,
@@ -147,8 +148,7 @@
.collect(),
);

// Make request
let response = self.post(payload).await?;
let response = self.post(payload.clone()).await?;

// Raise specific error if context length is exceeded
if let Some(error) = response.get("error") {
@@ -165,7 +165,15 @@
let usage = Self::get_usage(&response)?;
let model = get_model(&response);
let cost = cost(&usage, &model_pricing_for(&model));

tracing::debug!(
model_config = %serde_json::to_string_pretty(&self.config).unwrap_or_default(),
input = %serde_json::to_string_pretty(&payload).unwrap_or_default(),
output = %serde_json::to_string_pretty(&response).unwrap_or_default(),
input_tokens = ?usage.input_tokens.unwrap_or_default(),
output_tokens = ?usage.output_tokens.unwrap_or_default(),
total_tokens = ?usage.total_tokens.unwrap_or_default(),
cost = ?cost.unwrap_or_default()
);
Ok((message, ProviderUsage::new(model, usage, cost)))
}
