Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Aug 6, 2024
1 parent 9db312b commit d67fdba
Show file tree
Hide file tree
Showing 21 changed files with 482 additions and 239 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ jobs:
test_cmd: |
exclude_modules="$(cd langstream-agents && ls -d langstream-* | sed 's/^/!:/g' | tr '\n' ',' | sed 's/,$//'),!langstream-agents,!langstream-webservice,!:langstream-api-gateway,!:langstream-k8s-deployer-operator,!:langstream-runtime-impl"
./mvnw verify -pl $exclude_modules $MAVEN_COMMON_SKIP_FLAGS
# python + unit tests for runtime-impl
./mvnw package -pl langstream-runtime-impl -ntp -Dspotless.skip -Dlicense.skip
steps:
- name: 'Setup: checkout project'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package ai.langstream.agents.grpc;

import ai.langstream.api.runner.code.AgentProcessor;
import ai.langstream.api.runner.code.ErrorTypes;
import ai.langstream.api.runner.code.RecordSink;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -112,9 +113,14 @@ private SourceRecordAndResult fromGrpc(
throws IOException {
List<ai.langstream.api.runner.code.Record> resultRecords = new ArrayList<>();
if (result.hasError()) {
// TODO: specialize exception ?
final ErrorTypes errorType;
if (result.hasErrorType()) {
errorType = ErrorTypes.valueOf(result.getErrorType().toUpperCase());
} else {
errorType = ErrorTypes.INTERNAL_ERROR;
}
return new SourceRecordAndResult(
sourceRecord, null, new RuntimeException(result.getError()));
sourceRecord, null, new RuntimeException(result.getError()), errorType);
}
for (Record record : result.getRecordsList()) {
resultRecords.add(fromGrpc(record));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,12 @@ private static String computeErrorMessage(Exception e) {
final HttpResponse<?> response = httpRequestFailedException.getResponse();
if (response != null) {
Object body = httpRequestFailedException.getResponse().body();
msg += String.format(" with code %d:", response.statusCode());
if (body != null) {
if (body instanceof byte[]) {
body = new String((byte[]) body, StandardCharsets.UTF_8);
}
msg += String.format(": %s", body);
} else {
msg += String.format(": %s", response.statusCode());
}
}
return msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package ai.langstream.cli.commands.gateway;

import ai.langstream.admin.client.HttpRequestFailedException;
import ai.langstream.cli.api.model.Gateways;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.net.URI;
Expand Down Expand Up @@ -131,7 +132,6 @@ private void produceHttp(
getClient()
.getHttpClientFacade()
.http(request, HttpResponse.BodyHandlers.ofString());
log("Response: " + response.statusCode());
log(response.body());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import ai.langstream.tests.util.TestSuites;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
Expand All @@ -47,16 +48,24 @@ public void test() throws Exception {

deployLocalApplicationAndAwaitReady(
tenant, applicationId, "python-processor-with-dlq", appEnv, 2);
String output =
executeCommandOnClient(
CompletableFuture<String> commandResult =
executeCommandOnClientAsync(
"bin/langstream gateway service %s svc -v '{\"my-schema\":true}' --connect-timeout 60"
.formatted(applicationId)
.split(" "));

log.info("Output: {}", output);
Assertions.assertTrue(output.contains("Response: 400"));
Assertions.assertTrue(output.contains("record was not ok:"));

commandResult.whenComplete(
(output, throwable) -> {
if (throwable == null) {
Assertions.fail("Expected exception");
}
CommandExecFailedException failedException =
(CommandExecFailedException) throwable;
log.info("Error: {}", failedException.toString());
String stderr = failedException.getStderr();
Assertions.assertTrue(stderr.contains("with code: 400"));
Assertions.assertTrue(stderr.contains("record was not ok:"));
});
commandResult.get();
deleteAppAndAwaitCleanup(tenant, applicationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -223,17 +223,14 @@ private static Pod getFirstPodFromDeployment(String deploymentName) {

@SneakyThrows
public static String executeCommandOnClient(String... args) {
return executeCommandOnClient(2, TimeUnit.MINUTES, args);
return executeCommandOnClientAsync(args).get(2, TimeUnit.MINUTES);
}

@SneakyThrows
protected static String executeCommandOnClient(long timeout, TimeUnit unit, String... args) {
protected static CompletableFuture<String> executeCommandOnClientAsync(String... args) {
final Pod pod = getFirstPodFromDeployment("langstream-client");
return execInPod(
pod.getMetadata().getName(),
pod.getSpec().getContainers().get(0).getName(),
args)
.get(timeout, unit);
pod.getMetadata().getName(), pod.getSpec().getContainers().get(0).getName(), args);
}

@SneakyThrows
Expand Down Expand Up @@ -275,6 +272,15 @@ public static CompletableFuture<String> execInPod(
return execInPodInNamespace(namespace, podName, containerName, cmds);
}

@AllArgsConstructor
@Getter
@ToString
public static class CommandExecFailedException extends RuntimeException {
private final String command;
private final String stdout;
private final String stderr;
}

public static CompletableFuture<String> execInPodInNamespace(
String namespace, String podName, String containerName, String... cmds) {

Expand All @@ -298,13 +304,17 @@ public void onFailure(Throwable t, Response failureResponse) {
if (!completed.compareAndSet(false, true)) {
return;
}
String errString = error.toString(StandardCharsets.UTF_8);
String outString = out.toString(StandardCharsets.UTF_8);
log.warn(
"Error executing {} encountered; \nstderr: {}\nstdout: {}",
cmd,
error.toString(StandardCharsets.UTF_8),
out.toString(),
errString,
outString,
t);
response.completeExceptionally(t);
CommandExecFailedException commandExecFailedException =
new CommandExecFailedException(cmd, errString, outString);
response.completeExceptionally(commandExecFailedException);
}

@Override
Expand All @@ -313,18 +323,18 @@ public void onExit(int code, Status status) {
return;
}
if (code != 0) {
String errString = error.toString(StandardCharsets.UTF_8);
String outString = out.toString(StandardCharsets.UTF_8);
log.warn(
"Error executing {} encountered; \ncode: {}\n stderr: {}\nstdout: {}",
cmd,
code,
error.toString(StandardCharsets.UTF_8),
out.toString(StandardCharsets.UTF_8));
response.completeExceptionally(
new RuntimeException(
"Command failed with err code: "
+ code
+ ", stderr: "
+ error.toString(StandardCharsets.UTF_8)));
errString,
outString);

CommandExecFailedException commandExecFailedException =
new CommandExecFailedException(cmd, errString, outString);
response.completeExceptionally(commandExecFailedException);
} else {
log.info(
"Command completed {}; \nstderr: {}\nstdout: {}",
Expand All @@ -340,12 +350,13 @@ public void onClose(int rc, String reason) {
if (!completed.compareAndSet(false, true)) {
return;
}
String outString = out.toString(StandardCharsets.UTF_8);
log.info(
"Command completed {}; \nstderr: {}\nstdout: {}",
cmd,
error.toString(StandardCharsets.UTF_8),
out.toString(StandardCharsets.UTF_8));
response.complete(out.toString(StandardCharsets.UTF_8));
outString);
response.complete(outString);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ pipeline:
type: "python-processor"
input: ls-test-topic0
output: ls-test-topic1
errors:
on-failure: dead-letter
configuration:
className: example.FailProcessor
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ class FailProcessor(Processor):

def process(self, record):
logging.info("Processing record" + str(record))
from langstream import InvalidRecordException
raise InvalidRecordException("record was not ok:" + str(record))
from langstream import InvalidRecordError
raise InvalidRecordError("record was not ok:" + str(record))
4 changes: 2 additions & 2 deletions langstream-runtime/langstream-runtime-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@
<goal>exec</goal>
</goals>
<configuration>
<workingDirectory>${project.build.directory}/python</workingDirectory>
<workingDirectory>${project.basedir}/src/main/python</workingDirectory>
<executable>poetry</executable>
<arguments>
<argument>run</argument>
Expand All @@ -501,7 +501,7 @@
<goal>exec</goal>
</goals>
<configuration>
<workingDirectory>${project.build.directory}/python</workingDirectory>
<workingDirectory>${project.basedir}/src/main/python</workingDirectory>
<executable>poetry</executable>
<skip>${skipPythonPackage}</skip>
<arguments>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,8 @@ private static void runProcessorAgent(
new AgentProcessor.SourceRecordAndResult(
sourceRecord,
List.of(),
permanentFailureException));
permanentFailureException,
result.errorType()));
} else {
// in case the source does not throw an exception we mark
// the record as "skipped"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,10 @@ public void commit(List<Record> records) throws Exception {

@Override
public void permanentFailure(Record record, Exception error, ErrorTypes errorType) {
// DLQ
log.error("Permanent failure on record {}", record, error);
errorType = errorType == null ? ErrorTypes.INTERNAL_ERROR : errorType;
log.error("Permanent {} failure on record {}", errorType, record, error);
MutableRecord recordWithError = MutableRecord.recordToMutableRecord(record, false);
String sourceTopic = record.origin();

errorType = errorType == null ? ErrorTypes.INTERNAL_ERROR : errorType;
recordWithError.setProperty(
SystemHeaders.ERROR_HANDLING_ERROR_TYPE.getKey(), errorType.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@
"SimpleRecord",
"AvroValue",
"AgentContext",
"InvalidRecordError"
"InvalidRecordError",
]
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ class AvroValue(object):


class InvalidRecordError(Exception):
pass
pass
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)
from langstream_grpc.proto.agent_pb2_grpc import AgentServiceServicer
from .api import Source, Sink, Processor, Record, Agent, AgentContext, TopicProducer
from .util import SimpleRecord, AvroValue, InvalidRecordError
from .util import SimpleRecord, AvroValue


class RecordWithId(SimpleRecord):
Expand Down Expand Up @@ -161,7 +161,7 @@ async def handle_read_requests(self, context, read_records):
"permanent_failure",
record,
RuntimeError(failure.error_message),
failure.error_type
failure.error_type,
)
request = await context.read()

Expand Down Expand Up @@ -207,13 +207,12 @@ async def process(self, requests: AsyncIterable[ProcessorRequest], _):
yield ProcessorResponse(schema=schema)
grpc_result.records.append(grpc_record)
yield ProcessorResponse(results=[grpc_result])
except InvalidRecordError as invalid_record_error:
grpc_result.error = str(invalid_record_error)
grpc_result.error_type = "INVALID_RECORD"
yield ProcessorResponse(results=[grpc_result])
except Exception as e:
if e.__class__.__name__ == "InvalidRecordError":
grpc_result.error_type = "INVALID_RECORD"
else:
grpc_result.error_type = "INTERNAL_ERROR"
grpc_result.error = str(e)
grpc_result.error_type = "INTERNAL_ERROR"
yield ProcessorResponse(results=[grpc_result])

async def write(self, requests: AsyncIterable[SinkRequest], context):
Expand Down
Loading

0 comments on commit d67fdba

Please sign in to comment.