Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
fix: upgrade netty and jackson mapper (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Aug 23, 2024
1 parent 5774b49 commit 017a52b
Show file tree
Hide file tree
Showing 167 changed files with 1,027 additions and 1,069 deletions.
20 changes: 18 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ jobs:
name: Unit tests (${{ matrix.name }})
runs-on: ubuntu-latest
timeout-minutes: 60
permissions:
checks: write # junit reporter
strategy:
fail-fast: false
matrix:
Expand All @@ -48,20 +50,29 @@ jobs:
- name: Kafka - IT - Group 0
test_cmd: |
export TESTS_RUNTIME_TYPE=kafka
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.excludedGroups=group-1 $MAVEN_COMMON_SKIP_FLAGS
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.excludedGroups=group-1,group-2 $MAVEN_COMMON_SKIP_FLAGS
- name: Kafka - IT - Group 1
test_cmd: |
export TESTS_RUNTIME_TYPE=kafka
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.groups=group-1 $MAVEN_COMMON_SKIP_FLAGS
- name: Kafka - IT - Group 2
test_cmd: |
export TESTS_RUNTIME_TYPE=kafka
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.groups=group-2 $MAVEN_COMMON_SKIP_FLAGS
- name: Pulsar - IT - Group 0
test_cmd: |
export TESTS_RUNTIME_TYPE=pulsar
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.excludedGroups=group-1 $MAVEN_COMMON_SKIP_FLAGS
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.excludedGroups=group-1,group-2 $MAVEN_COMMON_SKIP_FLAGS
- name: Pulsar - IT - Group 1
test_cmd: |
export TESTS_RUNTIME_TYPE=pulsar
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.groups=group-1 $MAVEN_COMMON_SKIP_FLAGS
- name: Pulsar - IT - Group 2
test_cmd: |
export TESTS_RUNTIME_TYPE=pulsar
./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -DintegrationTests.groups=group-2 $MAVEN_COMMON_SKIP_FLAGS
- name: Deployer
test_cmd: ./mvnw verify -f langstream-k8s-deployer -Pdocker $MAVEN_COMMON_SKIP_FLAGS
- name: Api Gateway
Expand Down Expand Up @@ -123,6 +134,11 @@ jobs:
git diff
exit 1
fi
- name: Publish Test Report
uses: mikepenz/action-junit-report@v4
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'

# e2e-tests:
# name: End to End tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import ai.langstream.api.runner.code.*;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.util.ConfigurationUtils;
import ai.langstream.api.util.ObjectMapperFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
Expand All @@ -38,8 +38,6 @@

@Slf4j
public class CamelSource extends AbstractAgentCode implements AgentSource {
private static final ObjectMapper MAPPER = new ObjectMapper();

private String componentUri;
private DefaultCamelContext camelContext;

Expand Down Expand Up @@ -157,7 +155,7 @@ private static Object safeObject(Object v) throws JsonProcessingException {
} else if (v instanceof CharSequence || v instanceof Number || v instanceof Boolean) {
converted = v;
} else {
converted = MAPPER.writeValueAsString(v);
converted = ObjectMapperFactory.getDefaultMapper().writeValueAsString(v);
}
return converted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.api.util.ConfigurationUtils;
import ai.langstream.api.util.ObjectMapperFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -52,7 +52,6 @@

@Slf4j
abstract class AbstractGrpcAgent extends AbstractAgentCode {
protected static final ObjectMapper MAPPER = new ObjectMapper();
protected ManagedChannel channel;

// For each schema sent, we increment the schemaId
Expand Down Expand Up @@ -217,8 +216,10 @@ private synchronized void sendTopicProducerWriteResult(
@Override
protected Map<String, Object> buildAdditionalInfo() {
try {
return MAPPER.readValue(
blockingStub.agentInfo(Empty.getDefaultInstance()).getJsonInfo(), Map.class);
return ObjectMapperFactory.getDefaultMapper()
.readValue(
blockingStub.agentInfo(Empty.getDefaultInstance()).getJsonInfo(),
Map.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package ai.langstream.agents.grpc;

import ai.langstream.api.runner.code.AgentContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import ai.langstream.api.util.ObjectMapperFactory;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand All @@ -32,8 +32,6 @@
public class PythonGrpcServer {
private static final int MAX_TRIALS = 15;

private static final ObjectMapper MAPPER = new ObjectMapper();

private final Path codeDirectory;
private final Map<String, Object> configuration;
private final String agentId;
Expand Down Expand Up @@ -116,8 +114,10 @@ private Process startPythonProcess(boolean runMode, int targetPort) throws IOExc
"langstream_grpc",
runMode ? "run" : "cleanup",
"[::]:%s".formatted(targetPort),
MAPPER.writeValueAsString(configuration),
MAPPER.writeValueAsString(agentContextConfiguration))
ObjectMapperFactory.getDefaultMapper()
.writeValueAsString(configuration),
ObjectMapperFactory.getDefaultMapper()
.writeValueAsString(agentContextConfiguration))
.inheritIO()
.redirectOutput(ProcessBuilder.Redirect.INHERIT)
.redirectError(ProcessBuilder.Redirect.INHERIT);
Expand Down
2 changes: 1 addition & 1 deletion langstream-agents/langstream-agent-http-request/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<groupId>org.wiremock</groupId>
<artifactId>wiremock</artifactId>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import ai.langstream.api.runner.code.RecordSink;
import ai.langstream.api.runtime.ComponentType;
import ai.langstream.api.util.ConfigurationUtils;
import ai.langstream.api.util.ObjectMapperFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.samskivert.mustache.Mustache;
import com.samskivert.mustache.Template;
import java.net.CookieManager;
Expand All @@ -49,7 +49,6 @@
@Slf4j
public class HttpRequestAgent extends AbstractAgentCode implements AgentProcessor {

static final ObjectMapper mapper = new ObjectMapper();
private final Map<Schema, Schema> avroValueSchemaCache = new ConcurrentHashMap<>();

private final Map<Schema, Schema> avroKeySchemaCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -199,7 +198,7 @@ public void processRecord(Record record, RecordSink recordSink) {

private Object parseResponseBody(HttpResponse<String> response) {
try {
return mapper.readValue(response.body(), Map.class);
return ObjectMapperFactory.getDefaultMapper().readValue(response.body(), Map.class);
} catch (JsonProcessingException ex) {
log.debug("Not able to parse response to json: {}, {}", response.body(), ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package ai.langstream.agents.http;

import ai.langstream.api.util.ObjectMapperFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;
import java.net.CookieManager;
Expand All @@ -37,8 +37,6 @@
@Slf4j
public class LangServeClient {

static final ObjectMapper mapper = new ObjectMapper();

final HttpClient httpClient;

final Options options;
Expand Down Expand Up @@ -144,7 +142,8 @@ private Object parseResponseBody(String body, boolean streaming) {
try {
if (body.startsWith("{")) {
Map<String, Object> map =
mapper.readValue(body, new TypeReference<Map<String, Object>>() {});
ObjectMapperFactory.getDefaultMapper()
.readValue(body, new TypeReference<Map<String, Object>>() {});
if (!streaming) {
Object output = map.get("output");
if (output == null || output instanceof String) {
Expand All @@ -159,7 +158,7 @@ private Object parseResponseBody(String body, boolean streaming) {
return map.get(options.contentField);
}
} else if (body.startsWith("\"")) {
body = mapper.readValue(body, String.class);
body = ObjectMapperFactory.getDefaultMapper().readValue(body, String.class);
}
} catch (JsonProcessingException ex) {
log.info("Not able to parse response to json: {}, {}", body, ex);
Expand Down Expand Up @@ -268,7 +267,7 @@ interface StreamingChunksConsumer {

private String buildBody(Map<String, Object> values) throws IOException {
Map<String, Object> request = Map.of("input", values);
return mapper.writeValueAsString(request);
return ObjectMapperFactory.getDefaultMapper().writeValueAsString(request);
}

public CompletableFuture<String> execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.api.runtime.ComponentType;
import ai.langstream.api.util.ConfigurationUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.samskivert.mustache.Mustache;
import com.samskivert.mustache.Template;
import java.io.IOException;
Expand Down Expand Up @@ -55,7 +54,6 @@ record FieldDefinition(String name, JstlEvaluator<Object> expressionEvaluator) {
private int minChunksPerMessage;
private String contentField;
private boolean debug;
static final ObjectMapper mapper = new ObjectMapper();
private final Map<Schema, Schema> avroValueSchemaCache = new ConcurrentHashMap<>();

private final Map<Schema, Schema> avroKeySchemaCache = new ConcurrentHashMap<>();
Expand Down
2 changes: 1 addition & 1 deletion langstream-agents/langstream-agent-webcrawler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<groupId>org.wiremock</groupId>
<artifactId>wiremock</artifactId>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.api.util.ObjectMapperFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.minio.*;
import java.nio.file.Path;
import java.time.Instant;
Expand All @@ -46,7 +46,6 @@
@Slf4j
public class WebCrawlerSource extends AbstractAgentCode implements AgentSource {

public static final ObjectMapper MAPPER = new ObjectMapper();
private int maxUnflushedPages;

private String bucketName;
Expand Down Expand Up @@ -536,7 +535,9 @@ private void sendSourceActivitySummaryIfNeeded() throws Exception {
currentSourceActivitySummary.deletedUrls().size());

// Convert the new object to JSON
String value = MAPPER.writeValueAsString(summaryWithCounts);
String value =
ObjectMapperFactory.getDefaultMapper()
.writeValueAsString(summaryWithCounts);
List<Header> allHeaders = new ArrayList<>(sourceRecordHeaders);
allHeaders.add(
new SimpleRecord.SimpleHeader("recordType", "sourceActivitySummary"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package ai.langstream.ai.agents.commons.state;

import com.fasterxml.jackson.databind.ObjectMapper;
import ai.langstream.api.util.ObjectMapperFactory;
import io.minio.*;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -55,22 +55,20 @@ public static Path computePath(
pathPrefix + "." + agentId + ".status.json");
}

private static final ObjectMapper MAPPER = new ObjectMapper();

private final Path path;

@Override
public void store(T status) throws Exception {
log.info("Storing state to the disk at path {}", path);
MAPPER.writeValue(path.toFile(), status);
ObjectMapperFactory.getDefaultMapper().writeValue(path.toFile(), status);
}

@Override
public T get(Class<T> clazz) throws Exception {
if (Files.exists(path)) {
log.info("Restoring state from {}", path);
try {
return MAPPER.readValue(path.toFile(), clazz);
return ObjectMapperFactory.getDefaultMapper().readValue(path.toFile(), clazz);
} catch (IOException e) {
log.error("Error parsing state file", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
*/
package ai.langstream.ai.agents.commons.state;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MemoryStateStorage<T> implements StateStorage<T> {
private static final ObjectMapper MAPPER = new ObjectMapper();

private T value;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package ai.langstream.ai.agents.commons.state;

import com.fasterxml.jackson.databind.ObjectMapper;
import ai.langstream.api.util.ObjectMapperFactory;
import io.minio.*;
import io.minio.errors.ErrorResponseException;
import io.minio.errors.MinioException;
Expand Down Expand Up @@ -49,8 +49,6 @@ public static String computeObjectName(
return pathPrefix + "." + suffix + ".status.json";
}

private static final ObjectMapper MAPPER = new ObjectMapper();

private final MinioClient minioClient;
private final String bucketName;
private final String objectName;
Expand All @@ -74,7 +72,7 @@ private void makeBucketIfNotExists() {

@Override
public void store(T status) throws Exception {
byte[] content = MAPPER.writeValueAsBytes(status);
byte[] content = ObjectMapperFactory.getDefaultMapper().writeValueAsBytes(status);
log.info("Storing state in {}, {} bytes", objectName, content.length);
putWithRetries(
() ->
Expand Down Expand Up @@ -135,7 +133,7 @@ public T get(Class<T> clazz) throws Exception {
byte[] content = result.readAllBytes();
log.info("Restoring state from {}, {} bytes", objectName, content.length);
try {
return MAPPER.readValue(content, clazz);
return ObjectMapperFactory.getDefaultMapper().readValue(content, clazz);
} catch (IOException e) {
log.error("Error parsing state file", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import ai.langstream.api.runner.code.*;
import ai.langstream.api.runner.code.Record;
import ai.langstream.api.runner.topics.TopicProducer;
import ai.langstream.api.util.ObjectMapperFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -37,7 +37,6 @@
public abstract class StorageProviderSource<T extends StorageProviderSourceState>
extends AbstractAgentCode implements AgentSource {

public static final ObjectMapper MAPPER = new ObjectMapper();
private Map<String, Object> agentConfiguration;
private final Set<String> objectsToCommit = ConcurrentHashMap.newKeySet();
@Getter private StateStorage<T> stateStorage;
Expand Down Expand Up @@ -301,7 +300,9 @@ private void sendSourceActivitySummaryIfNeeded() throws Exception {
currentSourceActivitySummary.deletedObjects().size());

// Convert the new object to JSON
String value = MAPPER.writeValueAsString(summaryWithCounts);
String value =
ObjectMapperFactory.getDefaultMapper()
.writeValueAsString(summaryWithCounts);
SimpleRecord simpleRecord = buildSimpleRecord(value, "sourceActivitySummary");
sourceActivitySummaryProducer.write(simpleRecord).get();
} else {
Expand Down
Loading

0 comments on commit 017a52b

Please sign in to comment.