Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
feat: make service gateway response configurable (#121)
Browse files Browse the repository at this point in the history
* Now when the agent fails by default sets a new header
"langstream-error-type" which has two possible values: INTERNAL_ERROR or
INVALID_RECORD. The default is INTERNAL_ERROR.
* When the service gateway gets the message it looks for this header and
decide the status code (INVALID_RECORD -> 400, INTERNAL_ERROR -> 500).
**This is a breaking change** since before that it was always returning
200. It also looks the following header until one it's not blank:
langstream-error-message, langstream-error-cause-message,
langstream-error-root-cause-message.
* These new headers are sent to the deadletter along with the existing
headers: error-msg, cause-msg, root-cause-msg (note that those headers
are still present to not break compatibility but they're not checked by
the gateway)
* From the agent perspective, to set an error as INVALID_RECORD it can
be done in this way:
  1. In Java agents, you can emit the result with appropriate type
  2. In Python you can raise an Exception in the sink or processor 
  ```
  def process(self, record):
        logging.info("Processing record" + str(record))
        from langstream import InvalidRecordError
        raise InvalidRecordError("record was not ok:" + str(record))
  ```
  Any other exception will treat the error as INTERNAL_ERROR
  • Loading branch information
nicoloboschi authored Aug 7, 2024
1 parent ac5aeaf commit 8f151de
Show file tree
Hide file tree
Showing 51 changed files with 1,338 additions and 360 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ jobs:
- name: Agents
test_cmd: ./mvnw verify -f langstream-agents $MAVEN_COMMON_SKIP_FLAGS
- name: Other
setup_python: "true"
test_cmd: |
exclude_modules="$(cd langstream-agents && ls -d langstream-* | sed 's/^/!:/g' | tr '\n' ',' | sed 's/,$//'),!langstream-agents,!langstream-webservice,!:langstream-api-gateway,!:langstream-k8s-deployer-operator,!:langstream-runtime-impl"
./mvnw verify -pl $exclude_modules $MAVEN_COMMON_SKIP_FLAGS
# python + unit tests for runtime-impl
./mvnw package -pl ":langstream-runtime-impl" -ntp -Dspotless.skip -Dlicense.skip
steps:
- name: Free Disk Space (Ubuntu)
Expand Down
5 changes: 4 additions & 1 deletion docker/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ elif [ "$only_image" == "cli" ]; then
build_docker_image langstream-cli
elif [ "$only_image" == "api-gateway" ]; then
build_docker_image langstream-api-gateway
else
elif [ "$only_image" == "" ]; then
# Always clean to remove old NARs and cached docker images in the "target" directory
./mvnw clean install -Pdocker -Ddocker.platforms="$(docker_platforms)" $common_flags
docker images | head -n 6
else
echo "Unknown image type: $only_image. Valid values are: control-plane, operator, deployer, runtime-base-docker-image, runtime, runtime-tester, cli, api-gateway"
exit 1
fi


Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package ai.langstream.agents.camel;

import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentSource;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.*;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -240,7 +237,8 @@ public void commit(List<Record> records) throws Exception {
}

@Override
public void permanentFailure(Record record, Exception error) throws Exception {
public void permanentFailure(Record record, Exception error, ErrorTypes errorType)
throws Exception {
CamelRecord camelRecord = (CamelRecord) record;
log.info("Record {} failed", camelRecord);
camelRecord.exchange.setException(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ai.langstream.agents.grpc;

import ai.langstream.api.runner.code.AgentProcessor;
import ai.langstream.api.runner.code.ErrorTypes;
import ai.langstream.api.runner.code.RecordSink;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -112,9 +113,14 @@ private SourceRecordAndResult fromGrpc(
throws IOException {
List<ai.langstream.api.runner.code.Record> resultRecords = new ArrayList<>();
if (result.hasError()) {
// TODO: specialize exception ?
final ErrorTypes errorType;
if (result.hasErrorType()) {
errorType = ErrorTypes.valueOf(result.getErrorType().toUpperCase());
} else {
errorType = ErrorTypes.INTERNAL_ERROR;
}
return new SourceRecordAndResult(
sourceRecord, null, new RuntimeException(result.getError()));
sourceRecord, null, new RuntimeException(result.getError()), errorType);
}
for (Record record : result.getRecordsList()) {
resultRecords.add(fromGrpc(record));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ai.langstream.agents.grpc;

import ai.langstream.api.runner.code.AgentSource;
import ai.langstream.api.runner.code.ErrorTypes;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.util.ConfigurationUtils;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -72,14 +73,19 @@ public List<Record> read() throws Exception {
}

@Override
public void permanentFailure(Record record, Exception error) {
public void permanentFailure(Record record, Exception error, ErrorTypes errorType)
throws Exception {
if (record instanceof GrpcAgentRecord grpcAgentRecord) {
request.onNext(
SourceRequest.newBuilder()
.setPermanentFailure(
PermanentFailure.newBuilder()
.setRecordId(grpcAgentRecord.id())
.setErrorMessage(error.getMessage()))
.setErrorMessage(error.getMessage())
.setErrorType(
errorType == null
? ErrorTypes.INTERNAL_ERROR.toString()
: errorType.toString()))
.build());
} else {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ message TopicProducerResponse {
message PermanentFailure {
int64 record_id = 1;
string error_message = 2;
string error_type = 3;
}

message SourceRequest {
Expand Down Expand Up @@ -110,6 +111,7 @@ message ProcessorResult {
int64 record_id = 1;
optional string error = 2;
repeated Record records = 3;
optional string error_type = 4;
}

message SinkRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void testAvroAndSchema() throws Exception {
@Test
void testPermanentFailure() throws Exception {
List<Record> read = readRecords(source, 1);
source.permanentFailure(read.get(0), new RuntimeException("permanent-failure"));
source.permanentFailure(read.get(0), new RuntimeException("permanent-failure"), null);
assertEquals(testSourceService.permanentFailure.getRecordId(), 42);
assertEquals(testSourceService.permanentFailure.getErrorMessage(), "permanent-failure");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/
package ai.langstream.agents.pulsardlq;

import ai.langstream.api.runner.code.AbstractAgentCode;
import ai.langstream.api.runner.code.AgentSource;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.*;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.util.ConfigurationUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -40,41 +39,20 @@ public class PulsarDLQSource extends AbstractAgentCode implements AgentSource {
private PulsarClient pulsarClient;
private Consumer<byte[]> dlqTopicsConsumer;
private boolean includePartitioned;

private static class SimpleHeader implements Header {
private final String key;
private final Object value;

public SimpleHeader(String key, Object value) {
this.key = key;
this.value = value;
}

@Override
public String key() {
return key;
}

@Override
public Object value() {
return value;
}

@Override
public String valueAsString() {
if (value != null) {
return value.toString();
} else {
return null;
}
}
}
private int timeoutMs;

private static class PulsarRecord implements Record {
private final Message<byte[]> message;
private final Collection<Header> headers = new ArrayList<>();

public PulsarRecord(Message<byte[]> message) {
this.message = message;
Map<String, String> properties = message.getProperties();
if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
headers.add(new SimpleRecord.SimpleHeader(entry.getKey(), entry.getValue()));
}
}
}

@Override
Expand Down Expand Up @@ -103,14 +81,6 @@ public Long timestamp() {

@Override
public Collection<Header> headers() {
Collection<Header> headers = new ArrayList<>();
Map<String, String> properties = message.getProperties();

if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
headers.add(new SimpleHeader(entry.getKey(), entry.getValue()));
}
}
return headers;
}

Expand All @@ -135,6 +105,7 @@ public void init(Map<String, Object> configuration) throws Exception {
dlqSuffix = ConfigurationUtils.getString("dlq-suffix", "-DLQ", configuration);
includePartitioned =
ConfigurationUtils.getBoolean("include-partitioned", false, configuration);
timeoutMs = ConfigurationUtils.getInt("timeout-ms", 0, configuration);
log.info("Initializing PulsarDLQSource with pulsarUrl: {}", pulsarUrl);
log.info("Namespace: {}", namespace);
log.info("Subscription: {}", subscription);
Expand All @@ -144,10 +115,11 @@ public void init(Map<String, Object> configuration) throws Exception {

@Override
public void start() throws Exception {
log.info("Starting pulsar client {}", pulsarUrl);
pulsarClient = PulsarClient.builder().serviceUrl(pulsarUrl).build();
// The maximum lenth of the regex is 50 characters
// Uisng the persistent:// prefix generally works better, but
// it can push the partitioned pattern over the 50 character limit, so
// The maximum length of the regex is 50 characters
// Using the persistent:// prefix generally works better, but
// it can push the partitioned pattern over the 50 characters limit, so
// we drop it for partitioned topics
String patternString = "persistent://" + namespace + "/.*" + dlqSuffix;
if (includePartitioned) {
Expand All @@ -167,15 +139,26 @@ public void start() throws Exception {
@Override
public void close() throws Exception {
super.close();
dlqTopicsConsumer.close();
pulsarClient.close();
if (dlqTopicsConsumer != null) {
dlqTopicsConsumer.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
}

@Override
public List<Record> read() throws Exception {

Message<byte[]> msg = dlqTopicsConsumer.receive();

Message<byte[]> msg;
if (timeoutMs > 0) {
msg = dlqTopicsConsumer.receive(timeoutMs, TimeUnit.MILLISECONDS);
} else {
msg = dlqTopicsConsumer.receive();
}
if (msg == null) {
log.debug("No message received");
return List.of();
}
log.info("Received message: {}", new String(msg.getData()));
Record record = new PulsarRecord(msg);
return List.of(record);
Expand All @@ -185,12 +168,13 @@ public List<Record> read() throws Exception {
public void commit(List<Record> records) throws Exception {
for (Record r : records) {
PulsarRecord record = (PulsarRecord) r;
dlqTopicsConsumer.acknowledge(record.messageId()); // acknowledge the message
dlqTopicsConsumer.acknowledge(record.messageId());
}
}

@Override
public void permanentFailure(Record record, Exception error) throws Exception {
public void permanentFailure(Record record, Exception error, ErrorTypes errorType)
throws Exception {
PulsarRecord pulsarRecord = (PulsarRecord) record;
log.error("Failure on record {}", pulsarRecord, error);
dlqTopicsConsumer.negativeAcknowledge(pulsarRecord.messageId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public void start() throws Exception {

@Override
public void process(List<Record> records, RecordSink recordSink) {
log.info("got to process!{}", records);
for (Record record : records) {
processRecord(record, recordSink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ public void setup(
}

public void startReadingAsync(
Executor executor, Supplier<Boolean> stop, Consumer<String> onMessage) {
Executor executor,
Supplier<Boolean> stop,
Consumer<ConsumePushMessage> onMessage,
Consumer<Throwable> onError) {
if (requestContext == null || reader == null) {
throw new IllegalStateException("Not initialized");
}
Expand All @@ -142,16 +145,23 @@ public void startReadingAsync(
log.debug("[{}] Started reader", logRef);
readMessages(stop, onMessage);
} catch (Throwable ex) {
log.error("[{}] Error reading messages", logRef, ex);
throw new RuntimeException(ex);
} finally {
closeReader();
}
},
executor);
readerFuture.whenComplete(
(v, ex) -> {
if (ex != null) {
log.error("[{}] Error reading messages", logRef, ex);
onError.accept(ex);
}
});
}

private void readMessages(Supplier<Boolean> stop, Consumer<String> onMessage) throws Exception {
private void readMessages(Supplier<Boolean> stop, Consumer<ConsumePushMessage> onMessage)
throws Exception {
while (true) {
if (Thread.interrupted() || interrupted) {
return;
Expand Down Expand Up @@ -182,8 +192,7 @@ private void readMessages(Supplier<Boolean> stop, Consumer<String> onMessage) th
new ConsumePushMessage.Record(
record.key(), record.value(), messageHeaders),
offset);
final String jsonMessage = mapper.writeValueAsString(message);
onMessage.accept(jsonMessage);
onMessage.accept(message);
}
}
}
Expand Down
Loading

0 comments on commit 8f151de

Please sign in to comment.