Skip to content

Commit

Permalink
Merge branch 'main' into 698-EnableNodePortIfMissing
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Burch committed Nov 8, 2023
2 parents 8dedabc + f73c28d commit a9e1dad
Show file tree
Hide file tree
Showing 30 changed files with 461 additions and 124 deletions.
272 changes: 248 additions & 24 deletions docker/metrics/dashboards/apps.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package ai.langstream.agents.camel;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import ai.langstream.api.runner.code.AgentCodeRegistry;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.AgentSource;
import ai.langstream.api.runner.code.MetricsReporter;
import ai.langstream.api.runner.code.Record;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -56,7 +60,10 @@ private AgentSource buildAgentSource(String uri, Map<String, Object> componentOp
configs.put("component-uri", uri);
configs.put("component-options", componentOptions);
configs.put("max-buffered-records", 10);
AgentContext agentContext = mock(AgentContext.class);
when(agentContext.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
agentSource.init(configs);
agentSource.setContext(agentContext);
agentSource.start();
return agentSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package ai.langstream.agents.grpc;

import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -61,7 +60,6 @@ abstract class AbstractGrpcAgent extends AbstractAgentCode {
// Schemas received from the server
protected final Map<Integer, Object> serverSchemas = new ConcurrentHashMap<>();

protected AgentContext agentContext;
protected AgentServiceGrpc.AgentServiceBlockingStub blockingStub;

protected final AtomicBoolean restarting = new AtomicBoolean(false);
Expand Down Expand Up @@ -94,11 +92,6 @@ public void start() throws Exception {
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
}

@Override
public void setContext(AgentContext context) {
this.agentContext = context;
}

@Override
protected Map<String, Object> buildAdditionalInfo() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import ai.langstream.ai.agents.commons.JsonRecord;
import ai.langstream.ai.agents.commons.MutableRecord;
import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.AgentProcessor;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.RecordSink;
Expand Down Expand Up @@ -55,7 +54,6 @@ public class HttpRequestAgent extends AbstractAgentCode implements AgentProcesso

private final Map<Schema, Schema> avroKeySchemaCache = new ConcurrentHashMap<>();

private AgentContext agentContext;
private ExecutorService executor;
private HttpClient httpClient;
private String url;
Expand Down Expand Up @@ -115,11 +113,6 @@ public void init(Map<String, Object> configuration) {
.build();
}

@Override
public void setContext(AgentContext context) throws Exception {
this.agentContext = context;
}

@Override
public void process(List<Record> records, RecordSink recordSink) {
for (Record record : records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import ai.langstream.ai.agents.commons.MutableRecord;
import ai.langstream.ai.agents.commons.jstl.JstlEvaluator;
import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.AgentProcessor;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.RecordSink;
Expand Down Expand Up @@ -61,7 +60,6 @@ record FieldDefinition(String name, JstlEvaluator<Object> expressionEvaluator) {

private final Map<Schema, Schema> avroKeySchemaCache = new ConcurrentHashMap<>();

private AgentContext agentContext;
private ExecutorService executor;
private LangServeClient langServeClient;
private String url;
Expand Down Expand Up @@ -133,11 +131,6 @@ public void init(Map<String, Object> configuration) {
.build());
}

@Override
public void setContext(AgentContext context) throws Exception {
this.agentContext = context;
}

@Override
public void process(List<Record> records, RecordSink recordSink) {
for (Record record : records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

import ai.langstream.api.runner.code.AgentCodeRegistry;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.AgentSource;
import ai.langstream.api.runner.code.MetricsReporter;
import ai.langstream.api.runner.code.Record;
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
Expand Down Expand Up @@ -175,6 +179,9 @@ private AgentSource buildAgentSource(String bucket) throws Exception {
configs.put("endpoint", endpoint);
configs.put("bucketName", bucket);
agentSource.init(configs);
AgentContext context = mock(AgentContext.class);
when(context.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
agentSource.setContext(context);
agentSource.start();
return agentSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public void init(Map<String, Object> configuration) throws Exception {
}

@Override
public void setContext(AgentContext context) {
public void setContext(AgentContext context) throws Exception {
super.setContext(context);
String globalAgentId = context.getGlobalAgentId();
statusFileName = globalAgentId + ".webcrawler.status.json";
log.info("Status file is {}", statusFileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import ai.langstream.ai.agents.commons.MutableRecord;
import ai.langstream.ai.agents.commons.jstl.predicate.JstlPredicate;
import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.AgentProcessor;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.RecordSink;
Expand All @@ -38,7 +37,6 @@ record Route(String destination, boolean drop, JstlPredicate predicate) {}

private final List<Route> routes = new ArrayList<>();
private final Map<String, TopicProducer> producers = new HashMap<>();
private AgentContext agentContext;

@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -77,11 +75,6 @@ public void init(Map<String, Object> configuration) {
});
}

@Override
public void setContext(AgentContext context) throws Exception {
this.agentContext = context;
}

@Override
public void start() throws Exception {
routes.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import ai.langstream.ai.agents.commons.jstl.JstlEvaluator;
import ai.langstream.ai.agents.commons.jstl.predicate.JstlPredicate;
import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.AgentProcessor;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.RecordSink;
Expand All @@ -41,7 +40,6 @@ record FieldDefinition(String name, JstlEvaluator<Object> expressionEvaluator) {
private TopicProducer topicProducer;
private String destination;
private JstlPredicate predicate;
private AgentContext agentContext;

private boolean continueProcessing;

Expand Down Expand Up @@ -71,11 +69,6 @@ public ComponentType componentType() {
return ComponentType.PROCESSOR;
}

@Override
public void setContext(AgentContext context) throws Exception {
this.agentContext = context;
}

private Record emitRecord(Record originalRecord) {
MutableRecord mutableRecord =
MutableRecord.recordToMutableRecord(originalRecord, true).copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class GenAIToolKitAgent extends AbstractAgentCode implements AgentProcess
private TransformStepConfig config;
private QueryStepDataSource dataSource;
private ServiceProvider serviceProvider;
private AgentContext agentContext;
private Map<String, Object> configuration;

private TopicProducerStreamingAnswersConsumerFactory streamingAnswersConsumerFactory;
Expand All @@ -67,11 +66,6 @@ public ComponentType componentType() {
return ComponentType.PROCESSOR;
}

@Override
public void setContext(AgentContext context) {
this.agentContext = context;
}

@Override
public void process(List<Record> records, RecordSink recordSink) {
if (records == null || records.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void init(Map<String, Object> configuration) {

@Override
public void setContext(AgentContext context) throws Exception {
super.setContext(context);
this.loopTopicProducer =
context.getTopicConnectionProvider()
.createProducer(context.getGlobalAgentId(), loopTopic, Map.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ public synchronized void accept(
com.azure.ai.openai.models.ChatCompletions chatCompletions) {
List<com.azure.ai.openai.models.ChatChoice> choices = chatCompletions.getChoices();
String answerId = chatCompletions.getId();
log.info("Chat completions chunk: {}", chatCompletions);
log.info("Chat completions chunk:usage: {}", chatCompletions.getUsage());
if (chatCompletions.getUsage() != null) {
totalTokens.addAndGet(chatCompletions.getUsage().getTotalTokens());
completionTokens.addAndGet(chatCompletions.getUsage().getCompletionTokens());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ public void close() throws Exception {
public CompletableFuture<?> write(Record record) {
// naive implementation, no batching
Map<String, Object> context = Map.of();
return writer.upsert(record, context);
return writer.upsert(record, context).thenRun(() -> processed(1, 0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package ai.langstream.agents.vector.datasource.impl;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import ai.langstream.agents.vector.VectorDBSinkAgent;
import ai.langstream.api.runner.code.AgentCodeRegistry;
import ai.langstream.api.runner.code.AgentContext;
import ai.langstream.api.runner.code.MetricsReporter;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.SimpleRecord;
import com.datastax.oss.driver.api.core.CqlSession;
Expand Down Expand Up @@ -74,7 +78,10 @@ void testWrite() throws Exception {
configuration.put("table", "vsearch.products");
configuration.put("mapping", "id=value.id,description=value.description,name=value.name");

AgentContext agentContext = mock(AgentContext.class);
when(agentContext.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
agent.init(configuration);
agent.setContext(agentContext);
agent.start();
List<Record> committed = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -114,7 +121,10 @@ void testWriteAstra() throws Exception {
configuration.put("table", "vsearch.products");
configuration.put("mapping", "id=value.id,description=value.description,name=value.name");

AgentContext agentContext = mock(AgentContext.class);
when(agentContext.getMetricsReporter()).thenReturn(MetricsReporter.DISABLED);
agent.init(configuration);
agent.setContext(agentContext);
agent.start();
List<Record> committed = new CopyOnWriteArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Base class for AgentCode implementations. It provides default implementations for the Agent
* identity and AgentInfo methods.
*/
public abstract class AbstractAgentCode implements AgentCode {
private final AtomicLong totalIn = new AtomicLong();
private final AtomicLong totalOut = new AtomicLong();
private MetricsReporter.Counter totalIn;
private MetricsReporter.Counter totalOut;
private String agentId;
private String agentType;
private long startedAt;
private long lastProcessedAt;

protected AgentContext agentContext;

@Override
public final String agentId() {
return agentId;
Expand All @@ -52,10 +53,31 @@ public final void setMetadata(String id, String agentType, long startedAt) {
this.startedAt = startedAt;
}

@Override
public void setContext(AgentContext context) throws Exception {
this.agentContext = context;

totalIn = MetricsReporter.Counter.NOOP;
totalOut = MetricsReporter.Counter.NOOP;

// this is the main reported for the executor, we can use it to report metrics
// about the whole execution in the pipeline
MetricsReporter reporter = context.getMetricsReporter();
switch (componentType()) {
case SOURCE -> totalOut =
reporter.counter("source_out", "Total number of records emitted by the source");
case SINK -> totalIn =
reporter.counter("sink_in", "Total number of records received by the sink");
}
}

public void processed(long countIn, long countOut) {
if (totalIn == null) {
throw new IllegalStateException("setContext has not been called");
}
lastProcessedAt = System.currentTimeMillis();
totalIn.addAndGet(countIn);
totalOut.addAndGet(countOut);
totalIn.count(countIn);
totalOut.count(countOut);
}

/**
Expand All @@ -76,6 +98,6 @@ public List<AgentStatusResponse> getAgentStatus() {
componentType().name(),
buildAdditionalInfo(),
new AgentStatusResponse.Metrics(
totalIn.get(), totalOut.get(), startedAt(), lastProcessedAt)));
totalIn.value(), totalOut.value(), startedAt(), lastProcessedAt)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@ public Counter counter(String name, String help) {
}
};

default MetricsReporter withAgentName(String prefix) {
default MetricsReporter withAgentName(String agentName) {
return this;
}

Counter counter(String name, String help);

default MetricsReporter withPodName(String podName) {
return this;
}

interface Counter {

Counter NOOP = (value) -> {};
Counter NOOP = new SimpleCounter();

void count(long value);

void count(int value);
long value();
}
}
Loading

0 comments on commit a9e1dad

Please sign in to comment.