diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/AzureBlobStorageSource.java b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/AzureBlobStorageSource.java index b8b023ff5..e459ae652 100644 --- a/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/AzureBlobStorageSource.java +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/AzureBlobStorageSource.java @@ -16,13 +16,10 @@ package ai.langstream.agents.azureblobstorage; import static ai.langstream.api.util.ConfigurationUtils.*; -import static ai.langstream.api.util.ConfigurationUtils.getInt; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState; -import ai.langstream.api.runner.code.Header; -import ai.langstream.api.runner.code.SimpleRecord; import ai.langstream.api.util.ConfigurationUtils; import com.azure.core.http.rest.PagedIterable; import com.azure.storage.blob.BlobContainerClient; @@ -31,7 +28,6 @@ import com.azure.storage.blob.models.ListBlobsOptions; import com.azure.storage.common.StorageSharedKeyCredential; import java.util.*; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -42,24 +38,11 @@ public class AzureBlobStorageSource public static class AzureBlobStorageSourceState extends StorageProviderSourceState {} private BlobContainerClient client; - private int idleTime; - - private String deletedObjectsTopic; - private String pathPrefix; private boolean recursive; - private String sourceActivitySummaryTopic; - - private List sourceActivitySummaryEvents; - - private int 
sourceActivitySummaryNumEventsThreshold; - private int sourceActivitySummaryTimeSecondsThreshold; - private boolean deleteObjects; - private Collection
sourceRecordHeaders; - public static final String ALL_FILES = "*"; public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt"; private Set extensions = Set.of(); @@ -130,32 +113,12 @@ public Class getStateClass() { @Override public void initializeClientAndConfig(Map configuration) { client = createContainerClient(configuration); - idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); - deletedObjectsTopic = getString("deleted-objects-topic", null, configuration); deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration); - sourceRecordHeaders = - getMap("source-record-headers", Map.of(), configuration).entrySet().stream() - .map( - entry -> - SimpleRecord.SimpleHeader.of( - entry.getKey(), entry.getValue())) - .collect(Collectors.toUnmodifiableList()); pathPrefix = configuration.getOrDefault("path-prefix", "").toString(); if (StringUtils.isNotEmpty(pathPrefix) && !pathPrefix.endsWith("/")) { pathPrefix += "/"; } recursive = getBoolean("recursive", false, configuration); - sourceActivitySummaryTopic = - getString("source-activity-summary-topic", null, configuration); - sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration); - sourceActivitySummaryNumEventsThreshold = - getInt("source-activity-summary-events-threshold", 0, configuration); - sourceActivitySummaryTimeSecondsThreshold = - getInt("source-activity-summary-time-seconds-threshold", 30, configuration); - if (sourceActivitySummaryTimeSecondsThreshold < 0) { - throw new IllegalArgumentException( - "source-activity-summary-time-seconds-threshold must be > 0"); - } extensions = Set.of( configuration @@ -176,36 +139,6 @@ public boolean isDeleteObjects() { return deleteObjects; } - @Override - public int getIdleTime() { - return idleTime; - } - - @Override - public String getDeletedObjectsTopic() { - return deletedObjectsTopic; - } - - @Override - public String getSourceActivitySummaryTopic() { - return 
sourceActivitySummaryTopic; - } - - @Override - public List getSourceActivitySummaryEvents() { - return sourceActivitySummaryEvents; - } - - @Override - public int getSourceActivitySummaryNumEventsThreshold() { - return sourceActivitySummaryNumEventsThreshold; - } - - @Override - public int getSourceActivitySummaryTimeSecondsThreshold() { - return sourceActivitySummaryTimeSecondsThreshold; - } - @Override public List listObjects() throws Exception { final PagedIterable blobs; @@ -274,11 +207,6 @@ public void deleteObject(String id) throws Exception { client.getBlobClient(id).deleteIfExists(); } - @Override - public Collection
getSourceRecordHeaders() { - return sourceRecordHeaders; - } - @Override public boolean isStateStorageRequired() { return false; diff --git a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java index a3633945f..6cd6f622e 100644 --- a/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java +++ b/langstream-agents/langstream-agent-s3/src/main/java/ai/langstream/agents/s3/S3Source.java @@ -20,8 +20,6 @@ import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState; -import ai.langstream.api.runner.code.Header; -import ai.langstream.api.runner.code.SimpleRecord; import ai.langstream.api.util.ConfigurationUtils; import io.minio.GetObjectArgs; import io.minio.GetObjectResponse; @@ -31,7 +29,6 @@ import io.minio.Result; import io.minio.messages.Item; import java.util.*; -import java.util.stream.Collectors; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -45,21 +42,12 @@ public static class S3SourceState extends StorageProviderSourceState {} private String pathPrefix; private boolean recursive; private MinioClient minioClient; - private int idleTime; - private String deletedObjectsTopic; - private String sourceActivitySummaryTopic; - - private List sourceActivitySummaryEvents; - - private int sourceActivitySummaryNumEventsThreshold; - private int sourceActivitySummaryTimeSecondsThreshold; public static final String ALL_FILES = "*"; public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt"; private Set extensions = Set.of(); private boolean deleteObjects; - private Collection
sourceRecordHeaders; @Override public Class getStateClass() { @@ -82,20 +70,6 @@ public void initializeClientAndConfig(Map configuration) { } recursive = getBoolean("recursive", false, configuration); String region = configuration.getOrDefault("region", "").toString(); - idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); - deletedObjectsTopic = getString("deleted-objects-topic", null, configuration); - sourceActivitySummaryTopic = - getString("source-activity-summary-topic", null, configuration); - sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration); - sourceActivitySummaryNumEventsThreshold = - getInt("source-activity-summary-events-threshold", 0, configuration); - sourceActivitySummaryTimeSecondsThreshold = - getInt("source-activity-summary-time-seconds-threshold", 30, configuration); - if (sourceActivitySummaryTimeSecondsThreshold < 0) { - throw new IllegalArgumentException( - "source-activity-summary-time-seconds-threshold must be > 0"); - } - extensions = Set.of( configuration @@ -104,15 +78,6 @@ public void initializeClientAndConfig(Map configuration) { .split(",")); deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration); - - sourceRecordHeaders = - getMap("source-record-headers", Map.of(), configuration).entrySet().stream() - .map( - entry -> - SimpleRecord.SimpleHeader.of( - entry.getKey(), entry.getValue())) - .collect(Collectors.toUnmodifiableList()); - log.info( "Connecting to S3 Bucket at {} in region {} with user {} on path {} with recursive {}", endpoint, @@ -141,36 +106,6 @@ public boolean isDeleteObjects() { return deleteObjects; } - @Override - public int getIdleTime() { - return idleTime; - } - - @Override - public String getDeletedObjectsTopic() { - return deletedObjectsTopic; - } - - @Override - public String getSourceActivitySummaryTopic() { - return sourceActivitySummaryTopic; - } - - @Override - public List getSourceActivitySummaryEvents() { - 
return sourceActivitySummaryEvents; - } - - @Override - public int getSourceActivitySummaryNumEventsThreshold() { - return sourceActivitySummaryNumEventsThreshold; - } - - @Override - public int getSourceActivitySummaryTimeSecondsThreshold() { - return sourceActivitySummaryTimeSecondsThreshold; - } - @Override public List listObjects() throws Exception { Iterable> results; @@ -236,11 +171,6 @@ public void deleteObject(String id) throws Exception { minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(id).build()); } - @Override - public Collection
getSourceRecordHeaders() { - return sourceRecordHeaders; - } - @Override public boolean isStateStorageRequired() { return false; diff --git a/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java b/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java index 0a2e11779..674b19a43 100644 --- a/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java +++ b/langstream-agents/langstream-agents-commons-storage-provider/src/main/java/ai/langstream/ai/agents/commons/storage/provider/StorageProviderSource.java @@ -15,6 +15,9 @@ */ package ai.langstream.ai.agents.commons.storage.provider; +import static ai.langstream.api.util.ConfigurationUtils.*; +import static ai.langstream.api.util.ConfigurationUtils.getInt; + import ai.langstream.ai.agents.commons.state.StateStorage; import ai.langstream.ai.agents.commons.state.StateStorageProvider; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState.ObjectDetail; @@ -25,6 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -41,25 +45,24 @@ public abstract class StorageProviderSource getStateClass(); - - public abstract void initializeClientAndConfig(Map configuration); + private int idleTime; - public abstract String getBucketName(); - - public abstract boolean isDeleteObjects(); + private String deletedObjectsTopic; + private Collection
sourceRecordHeaders; + private String sourceActivitySummaryTopic; - public abstract int getIdleTime(); + private List sourceActivitySummaryEvents; - public abstract String getDeletedObjectsTopic(); + private int sourceActivitySummaryNumEventsThreshold; + private int sourceActivitySummaryTimeSecondsThreshold; - public abstract String getSourceActivitySummaryTopic(); + public abstract Class getStateClass(); - public abstract List getSourceActivitySummaryEvents(); + public abstract void initializeClientAndConfig(Map configuration); - public abstract int getSourceActivitySummaryNumEventsThreshold(); + public abstract String getBucketName(); - public abstract int getSourceActivitySummaryTimeSecondsThreshold(); + public abstract boolean isDeleteObjects(); public abstract Collection listObjects() throws Exception; @@ -67,8 +70,6 @@ public abstract class StorageProviderSource getSourceRecordHeaders(); - public abstract boolean isStateStorageRequired(); @Getter @@ -97,6 +98,27 @@ public class SourceActivitySummaryWithCounts { public void init(Map configuration) { agentConfiguration = configuration; initializeClientAndConfig(configuration); + + idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); + deletedObjectsTopic = getString("deleted-objects-topic", null, configuration); + sourceRecordHeaders = + getMap("source-record-headers", Map.of(), configuration).entrySet().stream() + .map( + entry -> + SimpleRecord.SimpleHeader.of( + entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableList()); + sourceActivitySummaryTopic = + getString("source-activity-summary-topic", null, configuration); + sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration); + sourceActivitySummaryNumEventsThreshold = + getInt("source-activity-summary-events-threshold", 0, configuration); + sourceActivitySummaryTimeSecondsThreshold = + getInt("source-activity-summary-time-seconds-threshold", 30, configuration); + if 
(sourceActivitySummaryTimeSecondsThreshold < 0) { + throw new IllegalArgumentException( + "source-activity-summary-time-seconds-threshold must be >= 0"); + } } @Override @@ -114,7 +136,6 @@ public void setContext(AgentContext context) throws Exception { throw new IllegalStateException("State storage is required but not configured"); } - String deletedObjectsTopic = getDeletedObjectsTopic(); if (deletedObjectsTopic != null) { deletedObjectsProducer = agentContext @@ -123,7 +144,6 @@ public void setContext(AgentContext context) throws Exception { agentContext.getGlobalAgentId(), deletedObjectsTopic, Map.of()); deletedObjectsProducer.start(); } - String sourceActivitySummaryTopic = getSourceActivitySummaryTopic(); if (sourceActivitySummaryTopic != null) { sourceActivitySummaryProducer = agentContext @@ -147,7 +167,6 @@ public List read() throws Exception { if (object == null) { log.info("No objects to emit"); checkDeletedObjects(objects, bucketName); - final int idleTime = getIdleTime(); log.info("sleeping for {} seconds", idleTime); Thread.sleep(idleTime * 1000L); return List.of(); @@ -187,7 +206,7 @@ public List read() throws Exception { } processed(0, 1); - List
allHeaders = new ArrayList<>(getSourceRecordHeaders()); + List
allHeaders = new ArrayList<>(sourceRecordHeaders); allHeaders.addAll(object.additionalRecordHeaders()); allHeaders.add(new SimpleRecord.SimpleHeader("name", name)); allHeaders.add(new SimpleRecord.SimpleHeader("bucket", bucketName)); @@ -212,7 +231,6 @@ private void sendSourceActivitySummaryIfNeeded() throws Exception { if (currentSourceActivitySummary == null) { return; } - List sourceActivitySummaryEvents = getSourceActivitySummaryEvents(); int countEvents = 0; long firstEventTs = Long.MAX_VALUE; if (sourceActivitySummaryEvents.contains("new")) { @@ -249,19 +267,16 @@ private void sendSourceActivitySummaryIfNeeded() throws Exception { long now = System.currentTimeMillis(); boolean emit = false; - int sourceActivitySummaryTimeSecondsThreshold = - getSourceActivitySummaryTimeSecondsThreshold(); boolean isTimeForStartSummaryOver = now >= firstEventTs + sourceActivitySummaryTimeSecondsThreshold * 1000L; if (!isTimeForStartSummaryOver) { // no time yet, but we have enough events to send - int sourceActivitySummaryNumThreshold = getSourceActivitySummaryNumEventsThreshold(); - if (sourceActivitySummaryNumThreshold > 0 - && countEvents >= sourceActivitySummaryNumThreshold) { + if (sourceActivitySummaryNumEventsThreshold > 0 + && countEvents >= sourceActivitySummaryNumEventsThreshold) { log.info( "Emitting source activity summary, events {} with threshold of {}", countEvents, - sourceActivitySummaryNumThreshold); + sourceActivitySummaryNumEventsThreshold); emit = true; } } else { @@ -274,8 +289,7 @@ private void sendSourceActivitySummaryIfNeeded() throws Exception { if (emit) { if (sourceActivitySummaryProducer != null) { log.info( - "Emitting source activity summary to topic {}", - getSourceActivitySummaryTopic()); + "Emitting source activity summary to topic {}", sourceActivitySummaryTopic); // Create a new SourceActivitySummaryWithCounts object directly SourceActivitySummaryWithCounts summaryWithCounts = new SourceActivitySummaryWithCounts( @@ -302,7 +316,7 @@ private 
void sendSourceActivitySummaryIfNeeded() throws Exception { private SimpleRecord buildSimpleRecord(String value, String recordType) { // Add record type to the headers - List
allHeaders = new ArrayList<>(getSourceRecordHeaders()); + List
allHeaders = new ArrayList<>(sourceRecordHeaders); allHeaders.add(new SimpleRecord.SimpleHeader("recordType", recordType)); allHeaders.add(new SimpleRecord.SimpleHeader("recordSource", "storageProvider")); diff --git a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java index 332be2c9a..ce461f501 100644 --- a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java +++ b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/cloudstorage/GoogleCloudStorageSource.java @@ -21,8 +21,6 @@ import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState; -import ai.langstream.api.runner.code.Header; -import ai.langstream.api.runner.code.SimpleRecord; import ai.langstream.api.util.ConfigurationUtils; import com.google.api.gax.paging.Page; import com.google.auth.oauth2.GoogleCredentials; @@ -31,7 +29,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.*; -import java.util.stream.Collectors; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -46,20 +43,9 @@ public static class GCSSourceState extends StorageProviderSourceState {} private Storage gcsClient; private AutoRefreshGoogleCredentials credentials; - private int idleTime; - - private String deletedObjectsTopic; private String pathPrefix; private boolean recursive; private boolean deleteObjects; - private Collection
sourceRecordHeaders; - private String sourceActivitySummaryTopic; - - private List sourceActivitySummaryEvents; - - private int sourceActivitySummaryNumEventsThreshold; - private int sourceActivitySummaryTimeSecondsThreshold; - public static final String ALL_FILES = "*"; public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt"; private Set extensions = Set.of(); @@ -102,32 +88,12 @@ public void initializeClientAndConfig(Map configuration) { "service-account-json", () -> "google cloud storage service"), bucketName); - idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); - deletedObjectsTopic = getString("deleted-objects-topic", null, configuration); deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration); - sourceRecordHeaders = - getMap("source-record-headers", Map.of(), configuration).entrySet().stream() - .map( - entry -> - SimpleRecord.SimpleHeader.of( - entry.getKey(), entry.getValue())) - .collect(Collectors.toUnmodifiableList()); pathPrefix = configuration.getOrDefault("path-prefix", "").toString(); if (StringUtils.isNotEmpty(pathPrefix) && !pathPrefix.endsWith("/")) { pathPrefix += "/"; } recursive = getBoolean("recursive", false, configuration); - sourceActivitySummaryTopic = - getString("source-activity-summary-topic", null, configuration); - sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration); - sourceActivitySummaryNumEventsThreshold = - getInt("source-activity-summary-events-threshold", 0, configuration); - sourceActivitySummaryTimeSecondsThreshold = - getInt("source-activity-summary-time-seconds-threshold", 30, configuration); - if (sourceActivitySummaryTimeSecondsThreshold < 0) { - throw new IllegalArgumentException( - "source-activity-summary-time-seconds-threshold must be > 0"); - } extensions = Set.of( configuration @@ -148,36 +114,6 @@ public boolean isDeleteObjects() { return deleteObjects; } - @Override - public int 
getIdleTime() { - return idleTime; - } - - @Override - public String getDeletedObjectsTopic() { - return deletedObjectsTopic; - } - - @Override - public String getSourceActivitySummaryTopic() { - return sourceActivitySummaryTopic; - } - - @Override - public List getSourceActivitySummaryEvents() { - return sourceActivitySummaryEvents; - } - - @Override - public int getSourceActivitySummaryNumEventsThreshold() { - return sourceActivitySummaryNumEventsThreshold; - } - - @Override - public int getSourceActivitySummaryTimeSecondsThreshold() { - return sourceActivitySummaryTimeSecondsThreshold; - } - @Override public List listObjects() throws Exception { Page blobs = gcsClient.list(bucketName, Storage.BlobListOption.prefix(pathPrefix)); @@ -233,11 +169,6 @@ public void deleteObject(String id) throws Exception { gcsClient.delete(bucketName, id); } - @Override - public Collection
getSourceRecordHeaders() { - return sourceRecordHeaders; - } - @Override public boolean isStateStorageRequired() { return false; diff --git a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java index 81619c96c..1dfe718ba 100644 --- a/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java +++ b/langstream-agents/langstream-agents-google/src/main/java/ai/langstream/agents/google/drive/GoogleDriveSource.java @@ -38,7 +38,6 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.*; -import java.util.stream.Collectors; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -51,17 +50,6 @@ public static class GDriveSourceState extends StorageProviderSourceState {} private GDriveClient client; private List rootParents; - private int idleTime; - - private String deletedObjectsTopic; - private Collection
sourceRecordHeaders; - private String sourceActivitySummaryTopic; - - private List sourceActivitySummaryEvents; - - private int sourceActivitySummaryNumEventsThreshold; - private int sourceActivitySummaryTimeSecondsThreshold; - private Set includeMimeTypes = Set.of(); private Set excludeMimeTypes = Set.of(); @@ -130,26 +118,6 @@ public void initializeClientAndConfig(Map configuration) { void initializeConfig(Map configuration) { rootParents = ConfigurationUtils.getList("root-parents", configuration); - idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); - deletedObjectsTopic = getString("deleted-objects-topic", null, configuration); - sourceRecordHeaders = - getMap("source-record-headers", Map.of(), configuration).entrySet().stream() - .map( - entry -> - SimpleRecord.SimpleHeader.of( - entry.getKey(), entry.getValue())) - .collect(Collectors.toUnmodifiableList()); - sourceActivitySummaryTopic = - getString("source-activity-summary-topic", null, configuration); - sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration); - sourceActivitySummaryNumEventsThreshold = - getInt("source-activity-summary-events-threshold", 0, configuration); - sourceActivitySummaryTimeSecondsThreshold = - getInt("source-activity-summary-time-seconds-threshold", 30, configuration); - if (sourceActivitySummaryTimeSecondsThreshold < 0) { - throw new IllegalArgumentException( - "source-activity-summary-time-seconds-threshold must be > 0"); - } includeMimeTypes = ConfigurationUtils.getSet("include-mime-types", configuration); excludeMimeTypes = new HashSet<>(ConfigurationUtils.getList("exclude-mime-types", configuration)); @@ -171,36 +139,6 @@ public boolean isDeleteObjects() { return false; } - @Override - public int getIdleTime() { - return idleTime; - } - - @Override - public String getDeletedObjectsTopic() { - return deletedObjectsTopic; - } - - @Override - public String getSourceActivitySummaryTopic() { - return 
sourceActivitySummaryTopic; - } - - @Override - public List getSourceActivitySummaryEvents() { - return sourceActivitySummaryEvents; - } - - @Override - public int getSourceActivitySummaryNumEventsThreshold() { - return sourceActivitySummaryNumEventsThreshold; - } - - @Override - public int getSourceActivitySummaryTimeSecondsThreshold() { - return sourceActivitySummaryTimeSecondsThreshold; - } - @Override public Collection listObjects() throws Exception { Map> tree = new HashMap<>(); @@ -365,11 +303,6 @@ public void deleteObject(String id) throws Exception { throw new UnsupportedOperationException(); } - @Override - public Collection
getSourceRecordHeaders() { - return sourceRecordHeaders; - } - @Override public boolean isStateStorageRequired() { return true; diff --git a/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/ClientUtil.java b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/ClientUtil.java new file mode 100644 index 000000000..2421c1ff3 --- /dev/null +++ b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/ClientUtil.java @@ -0,0 +1,169 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.agents.ms365; + +import com.microsoft.graph.models.DriveItem; +import com.microsoft.graph.models.File; +import com.microsoft.graph.serviceclient.GraphServiceClient; +import com.microsoft.kiota.ApiException; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ClientUtil { + + public interface DriveItemCollector { + void collect(String driveId, DriveItem item, String digest); + } + + @SneakyThrows + public static void collectDriveItems( + GraphServiceClient client, + final String driveId, + DriveItem parentItem, + Set includeMimeTypes, + Set excludeMimeTypes, + DriveItemCollector collector) { + List children = + client.drives() + .byDriveId(driveId) + .items() + .byDriveItemId(parentItem.getId()) + .children() + .get( + request -> { + request.queryParameters.select = + new String[] { + "id", "name", "size", "cTag", "eTag", "file", + "folder" + }; + request.queryParameters.orderby = new String[] {"name asc"}; + }) + .getValue(); + if (children != null) { + for (DriveItem item : children) { + Objects.requireNonNull(item.getId()); + if (log.isDebugEnabled()) { + log.debug( + "File {} ({}), size {}, ctag {}", + item.getName(), + item.getId(), + item.getSize(), + item.getCTag()); + } + if (item.getFolder() != null) { + log.debug("Folder {} ({})", item.getName(), item.getId()); + collectDriveItems( + client, driveId, item, includeMimeTypes, excludeMimeTypes, collector); + } else { + log.debug("File {} ({})", item.getName(), item.getId()); + File file = item.getFile(); + Objects.requireNonNull(file); + if (!includeMimeTypes.isEmpty() + && !includeMimeTypes.contains(file.getMimeType())) { + log.debug( + "Skipping file {} ({}) due to mime type {}", + item.getName(), + item.getId(), + file.getMimeType()); + continue; + } + if 
(!excludeMimeTypes.isEmpty() + && excludeMimeTypes.contains(file.getMimeType())) { + log.debug( + "Skipping file {} ({}) due to excluded mime type {}", + item.getName(), + item.getId(), + file.getMimeType()); + continue; + } + final String digest; + if (item.getCTag() != null) { + digest = item.getCTag(); + } else if (item.getETag() != null) { + log.warn( + "Using file eTag as digest for {}, this might be end up in duplicated processing", + item.getId()); + digest = item.getETag(); + } else { + log.error("Not able to compute a digest for {}, skipping", item.getId()); + continue; + } + collector.collect(driveId, item, digest); + } + } + } + } + + public static byte[] downloadDriveItem( + GraphServiceClient client, String driveId, DriveItem item) throws Exception { + Objects.requireNonNull(item.getFile()); + String mimeType = item.getFile().getMimeType(); + log.info( + "Downloading file {} ({}) from drive {}, mime type {}", + item.getName(), + item.getId(), + driveId, + mimeType); + try { + return downloadDriveItem(client, driveId, item, true); + } catch (ApiException e) { + log.info( + "Downloading file {} ({}) from drive {} as pdf failed, trying with default format. 
error: {}", + item.getName(), + item.getId(), + driveId, + e.getMessage()); + try { + return downloadDriveItem(client, driveId, item, false); + } catch (ApiException ex) { + e.addSuppressed(ex); + log.error("Error downloading file {} ({})", item.getName(), item.getId(), e); + } + throw e; + } + } + + private static byte[] downloadDriveItem( + GraphServiceClient client, String driveId, DriveItem item, boolean exportAsPdf) + throws IOException { + + try (InputStream in = + client.drives() + .byDriveId(driveId) + .items() + .byDriveItemId(item.getId()) + .content() + .get( + requestConfiguration -> { + Objects.requireNonNull(requestConfiguration.queryParameters); + if (exportAsPdf) { + requestConfiguration.queryParameters.format = "pdf"; + } + }); ) { + if (in == null) { + throw new IllegalStateException( + "No content for file " + item.getName() + " (" + item.getId() + ")"); + } + return in.readAllBytes(); + } + } +} diff --git a/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/Microsoft365AgentsCodeProvider.java b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/Microsoft365AgentsCodeProvider.java index 1304c7653..1bcc74a90 100644 --- a/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/Microsoft365AgentsCodeProvider.java +++ b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/Microsoft365AgentsCodeProvider.java @@ -15,6 +15,7 @@ */ package ai.langstream.agents.ms365; +import ai.langstream.agents.ms365.onedrive.OneDriveSource; import ai.langstream.agents.ms365.sharepoint.SharepointSource; import ai.langstream.api.runner.code.AgentCode; import ai.langstream.api.runner.code.AgentCodeProvider; @@ -23,7 +24,8 @@ public class Microsoft365AgentsCodeProvider implements AgentCodeProvider { public static final String SHAREPOINT_SOURCE = "ms365-sharepoint-source"; - private static final List AGENTS = List.of(SHAREPOINT_SOURCE); + public static final String 
ONEDRIVE_SOURCE = "ms365-onedrive-source"; + private static final List AGENTS = List.of(SHAREPOINT_SOURCE, ONEDRIVE_SOURCE); @Override public boolean supports(String agentType) { @@ -35,6 +37,8 @@ public AgentCode createInstance(String agentType) { switch (agentType) { case SHAREPOINT_SOURCE: return new SharepointSource(); + case ONEDRIVE_SOURCE: + return new OneDriveSource(); default: throw new IllegalArgumentException("Unsupported agent type: " + agentType); }
diff --git a/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/onedrive/OneDriveSource.java b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/onedrive/OneDriveSource.java
new file mode 100644
index 000000000..4c6958edb
--- /dev/null
+++ b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/onedrive/OneDriveSource.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.agents.ms365.onedrive;
+
+import static ai.langstream.api.util.ConfigurationUtils.requiredNonEmptyField;
+
+import ai.langstream.agents.ms365.ClientUtil;
+import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
+import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
+import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
+import ai.langstream.api.runner.code.Header;
+import ai.langstream.api.runner.code.SimpleRecord;
+import ai.langstream.api.util.ConfigurationUtils;
+import com.azure.identity.ClientSecretCredential;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.microsoft.graph.models.*;
+import com.microsoft.graph.serviceclient.GraphServiceClient;
+import java.util.*;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Source agent that lists and downloads files from the OneDrive drives of the configured users
+ * through the Microsoft Graph client.
+ */
+@Slf4j
+public class OneDriveSource extends StorageProviderSource {
+
+    // NOTE(review): this name looks carried over from the SharePoint source. It is public, so it
+    // is left unchanged.
+    public static class SharepointSourceState extends StorageProviderSourceState {}
+
+    private GraphServiceClient client;
+
+    private List<String> users;
+    private String pathPrefix;
+
+    private Set includeMimeTypes = Set.of();
+
+    private Set excludeMimeTypes = Set.of();
+
+    @Override
+    public Class getStateClass() {
+        return SharepointSourceState.class;
+    }
+
+    /**
+     * Builds the Graph client from the {@code ms-tenant-id}, {@code ms-client-id} and {@code
+     * ms-client-secret} settings, then loads the remaining settings via {@link #initializeConfig}.
+     */
+    @Override
+    public void initializeClientAndConfig(Map configuration) {
+        String tenantId =
+                requiredNonEmptyField(configuration, "ms-tenant-id", () -> "onedrive source");
+        String clientId =
+                requiredNonEmptyField(configuration, "ms-client-id", () -> "onedrive source");
+        String clientSecret =
+                requiredNonEmptyField(configuration, "ms-client-secret", () -> "onedrive source");
+
+        // this is the default scope for the graph api, the actual scopes are bound to the client
+        // application
+        final String[] scopes = new String[] {"https://graph.microsoft.com/.default"};
+        final ClientSecretCredential credential =
+                new ClientSecretCredentialBuilder()
+                        .clientId(clientId)
+                        .tenantId(tenantId)
+                        .clientSecret(clientSecret)
+                        .build();
+
+        client = new GraphServiceClient(credential, scopes);
+        initializeConfig(configuration);
+    }
+
+    /**
+     * Reads {@code users}, {@code path-prefix}, {@code include-mime-types} and {@code
+     * exclude-mime-types} from the configuration.
+     *
+     * @throws IllegalArgumentException if the {@code users} list is empty
+     */
+    void initializeConfig(Map configuration) {
+        users = ConfigurationUtils.getList("users", configuration);
+        if (users.isEmpty()) {
+            throw new IllegalArgumentException("At least one user principal must be specified");
+        }
+        // A key that is present but mapped to null falls back to "/" rather than failing on
+        // toString().
+        final Object configuredPrefix = configuration.get("path-prefix");
+        pathPrefix = configuredPrefix == null ? "/" : configuredPrefix.toString();
+        if (StringUtils.isNotEmpty(pathPrefix) && !pathPrefix.endsWith("/")) {
+            pathPrefix += "/";
+        }
+
+        includeMimeTypes = ConfigurationUtils.getSet("include-mime-types", configuration);
+        excludeMimeTypes =
+                new HashSet<>(ConfigurationUtils.getList("exclude-mime-types", configuration));
+        if (includeMimeTypes.isEmpty()) {
+            log.info("Filtering out files with mime types: {}", excludeMimeTypes);
+        } else {
+            log.info("Filtering only files with mime types: {}", includeMimeTypes);
+        }
+    }
+
+    @Override
+    public String getBucketName() {
+        return "";
+    }
+
+    @Override
+    public boolean isDeleteObjects() {
+        return false;
+    }
+
+    /**
+     * Collects the drive items found under the configured path prefix in each user's drive. A user
+     * with no drive, or with no item at that path, is logged and skipped.
+     */
+    @Override
+    public Collection listObjects() throws Exception {
+        List collect = new ArrayList<>();
+        for (String user : users) {
+            Drive userDrive = client.users().byUserId(user).drive().get();
+            if (userDrive == null) {
+                log.warn("No drive found for user {}", user);
+                continue;
+            }
+
+            Objects.requireNonNull(userDrive.getId());
+            final String rootPath = "root:" + pathPrefix;
+            log.info(
+                    "Listing items for user {} in drive {} at path {}",
+                    user,
+                    userDrive.getId(),
+                    rootPath);
+            DriveItem rootItem =
+                    client.drives()
+                            .byDriveId(userDrive.getId())
+                            .items()
+                            .byDriveItemId(rootPath)
+                            .get();
+            if (rootItem == null) {
+                log.warn(
+                        "No root item found for user drive {} of user {}", userDrive.getId(), user);
+                continue;
+            }
+            int beforeLength = collect.size();
+            ClientUtil.collectDriveItems(
+                    client,
+                    userDrive.getId(),
+                    rootItem,
+                    includeMimeTypes,
+                    excludeMimeTypes,
+                    new ClientUtil.DriveItemCollector() {
+                        @Override
+                        public void collect(String driveId, DriveItem item, String digest) {
+                            OneDriveObject oneDriveObject =
+                                    new OneDriveObject(item, user, digest, driveId);
+                            collect.add(oneDriveObject);
+                        }
+                    });
+            log.info("Found {} items for user {}", collect.size() - beforeLength, user);
+        }
+        return collect;
+    }
+
+    /** Reference to one drive item together with the user and drive it was listed from. */
+    @AllArgsConstructor
+    @Data
+    private static class OneDriveObject implements StorageProviderObjectReference {
+        private final DriveItem item;
+        private final String user;
+        private final String contentDigest;
+        private final String driveId;
+
+        @Override
+        public String id() {
+            return driveId + "_" + item.getId();
+        }
+
+        @Override
+        public long size() {
+            return item.getSize() == null ? -1 : item.getSize();
+        }
+
+        @Override
+        public String contentDigest() {
+            return contentDigest;
+        }
+
+        @Override
+        public Collection<Header> additionalRecordHeaders() {
+            return List.of(
+                    SimpleRecord.SimpleHeader.of("onedrive-item-name", item.getName()),
+                    SimpleRecord.SimpleHeader.of("onedrive-item-id", item.getId()),
+                    SimpleRecord.SimpleHeader.of("onedrive-user", user),
+                    SimpleRecord.SimpleHeader.of("onedrive-drive-id", driveId));
+        }
+    }
+
+    /** Downloads the content of a reference previously returned by {@link #listObjects()}. */
+    @Override
+    public byte[] downloadObject(StorageProviderObjectReference object) throws Exception {
+        OneDriveObject oneDriveObject = (OneDriveObject) object;
+        return ClientUtil.downloadDriveItem(
+                client, oneDriveObject.getDriveId(), oneDriveObject.getItem());
+    }
+
+    /** Deletion is not supported by this source. */
+    @Override
+    public void deleteObject(String id) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isStateStorageRequired() {
+        return true;
+    }
+}
diff --git a/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/sharepoint/SharepointSource.java b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/sharepoint/SharepointSource.java index 4c8a804e5..6f0920e3b 100644 --- a/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/sharepoint/SharepointSource.java +++ b/langstream-agents/langstream-agents-ms365/src/main/java/ai/langstream/agents/ms365/sharepoint/SharepointSource.java @@ -16,8 +16,8 @@ package ai.langstream.agents.ms365.sharepoint; import static ai.langstream.api.util.ConfigurationUtils.*; -import static ai.langstream.api.util.ConfigurationUtils.getInt; +import ai.langstream.agents.ms365.ClientUtil; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource; import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState; @@ -29,13 +29,10 @@ import com.microsoft.graph.models.*; import com.microsoft.graph.serviceclient.GraphServiceClient; import com.microsoft.graph.sites.getallsites.GetAllSitesGetResponse; -import 
java.io.InputStream; import java.util.*; import java.util.List; -import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -48,17 +45,6 @@ public static class SharepointSourceState extends StorageProviderSourceState {} private List includeOnlySites; - private int idleTime; - - private String deletedObjectsTopic; - private Collection
sourceRecordHeaders; - private String sourceActivitySummaryTopic; - - private List sourceActivitySummaryEvents; - - private int sourceActivitySummaryNumEventsThreshold; - private int sourceActivitySummaryTimeSecondsThreshold; - private Set includeMimeTypes = Set.of(); private Set excludeMimeTypes = Set.of(); @@ -93,26 +79,7 @@ public void initializeClientAndConfig(Map configuration) { void initializeConfig(Map configuration) { includeOnlySites = ConfigurationUtils.getList("sites", configuration); - idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); - deletedObjectsTopic = getString("deleted-objects-topic", null, configuration); - sourceRecordHeaders = - getMap("source-record-headers", Map.of(), configuration).entrySet().stream() - .map( - entry -> - SimpleRecord.SimpleHeader.of( - entry.getKey(), entry.getValue())) - .collect(Collectors.toUnmodifiableList()); - sourceActivitySummaryTopic = - getString("source-activity-summary-topic", null, configuration); - sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration); - sourceActivitySummaryNumEventsThreshold = - getInt("source-activity-summary-events-threshold", 0, configuration); - sourceActivitySummaryTimeSecondsThreshold = - getInt("source-activity-summary-time-seconds-threshold", 30, configuration); - if (sourceActivitySummaryTimeSecondsThreshold < 0) { - throw new IllegalArgumentException( - "source-activity-summary-time-seconds-threshold must be > 0"); - } + includeMimeTypes = ConfigurationUtils.getSet("include-mime-types", configuration); excludeMimeTypes = new HashSet<>(ConfigurationUtils.getList("exclude-mime-types", configuration)); @@ -133,36 +100,6 @@ public boolean isDeleteObjects() { return false; } - @Override - public int getIdleTime() { - return idleTime; - } - - @Override - public String getDeletedObjectsTopic() { - return deletedObjectsTopic; - } - - @Override - public String getSourceActivitySummaryTopic() { - return 
sourceActivitySummaryTopic; - } - - @Override - public List getSourceActivitySummaryEvents() { - return sourceActivitySummaryEvents; - } - - @Override - public int getSourceActivitySummaryNumEventsThreshold() { - return sourceActivitySummaryNumEventsThreshold; - } - - @Override - public int getSourceActivitySummaryTimeSecondsThreshold() { - return sourceActivitySummaryTimeSecondsThreshold; - } - @Override public Collection listObjects() throws Exception { @@ -214,7 +151,20 @@ public Collection listObjects() throws Exception log.warn("No root item found for drive {}", drive.getId()); continue; } - collectItems(site.getId(), drive.getId(), rootItem, collect); + ClientUtil.collectDriveItems( + client, + drive.getId(), + rootItem, + includeMimeTypes, + excludeMimeTypes, + new ClientUtil.DriveItemCollector() { + @Override + public void collect(String driveId, DriveItem item, String digest) { + SharepointObject sharepointObject = + new SharepointObject(item, site.getId(), digest, driveId); + collect.add(sharepointObject); + } + }); } log.info( "Found {} items in site {} ({})", @@ -225,108 +175,22 @@ public Collection listObjects() throws Exception return collect; } - @SneakyThrows - private void collectItems( - final String siteId, - final String driveId, - DriveItem parentItem, - List collect) { - List children = - client.drives() - .byDriveId(driveId) - .items() - .byDriveItemId(parentItem.getId()) - .children() - .get( - request -> { - request.queryParameters.select = - new String[] { - "id", "name", "size", "cTag", "eTag", "file", - "folder" - }; - request.queryParameters.orderby = new String[] {"name asc"}; - }) - .getValue(); - if (children != null) { - for (DriveItem item : children) { - Objects.requireNonNull(item.getId()); - if (log.isDebugEnabled()) { - log.debug( - "File {} ({}), size {}, ctag {}, parents {}, last modified time {}", - item.getName(), - item.getId(), - item.getSize(), - item.getCTag()); - } - if (item.getFolder() != null) { - log.debug("Folder 
{} ({})", item.getName(), item.getId()); - collectItems(siteId, driveId, item, collect); - } else { - log.debug("File {} ({})", item.getName(), item.getId()); - File file = item.getFile(); - Objects.requireNonNull(file); - if (!includeMimeTypes.isEmpty() - && !includeMimeTypes.contains(file.getMimeType())) { - log.debug( - "Skipping file {} ({}) due to mime type {}", - item.getName(), - item.getId(), - file.getMimeType()); - continue; - } - if (!excludeMimeTypes.isEmpty() - && excludeMimeTypes.contains(file.getMimeType())) { - log.debug( - "Skipping file {} ({}) due to excluded mime type {}", - item.getName(), - item.getId(), - file.getMimeType()); - continue; - } - final String digest; - if (item.getCTag() != null) { - digest = item.getCTag(); - } else if (item.getETag() != null) { - log.warn( - "Using file eTag as digest for {}, this might be end up in duplicated processing", - item.getId()); - digest = item.getETag(); - } else { - log.error("Not able to compute a digest for {}, skipping", item.getId()); - continue; - } - SharepointObject sharepointObject = - new SharepointObject( - item.getId(), - siteId, - item.getSize() == null ? -1 : item.getSize(), - digest, - driveId, - item.getName()); - collect.add(sharepointObject); - } - } - } - } - @AllArgsConstructor @Data private static class SharepointObject implements StorageProviderObjectReference { - private final String itemId; + private final DriveItem item; private final String siteId; - private final long size; private final String contentDigest; private final String driveId; - private final String name; @Override public String id() { - return siteId + "_" + itemId; + return siteId + "_" + item.getId(); } @Override public long size() { - return size; + return item.getSize() == null ? -1 : item.getSize(); } @Override @@ -337,8 +201,8 @@ public String contentDigest() { @Override public Collection
additionalRecordHeaders() { return List.of( - SimpleRecord.SimpleHeader.of("sharepoint-item-name", name), - SimpleRecord.SimpleHeader.of("sharepoint-item-id", itemId), + SimpleRecord.SimpleHeader.of("sharepoint-item-name", item.getName()), + SimpleRecord.SimpleHeader.of("sharepoint-item-id", item.getId()), SimpleRecord.SimpleHeader.of("sharepoint-drive-id", driveId), SimpleRecord.SimpleHeader.of("sharepoint-site-id", siteId)); } @@ -347,39 +211,8 @@ public Collection
additionalRecordHeaders() { @Override public byte[] downloadObject(StorageProviderObjectReference object) throws Exception { SharepointObject sharepointObject = (SharepointObject) object; - try { - try (InputStream in = - client.drives() - .byDriveId(sharepointObject.getDriveId()) - .items() - .byDriveItemId(sharepointObject.getItemId()) - .content() - .get( - requestConfiguration -> { - Objects.requireNonNull( - requestConfiguration.queryParameters); - requestConfiguration.queryParameters.format = "pdf"; - }); ) { - if (in == null) { - throw new IllegalStateException( - "No content for file " - + sharepointObject.getName() - + " (" - + sharepointObject.id() - + ")"); - } - return in.readAllBytes(); - } - } catch (Exception e) { - log.error( - "Error downloading file " - + sharepointObject.getName() - + " (" - + sharepointObject.id() - + ")", - e); - throw e; - } + return ClientUtil.downloadDriveItem( + client, sharepointObject.getDriveId(), sharepointObject.getItem()); } @Override @@ -387,11 +220,6 @@ public void deleteObject(String id) throws Exception { throw new UnsupportedOperationException(); } - @Override - public Collection
getSourceRecordHeaders() { - return sourceRecordHeaders; - } - @Override public boolean isStateStorageRequired() { return true; diff --git a/langstream-agents/langstream-agents-ms365/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-agents-ms365/src/main/resources/META-INF/ai.langstream.agents.index index a23075487..05b36f3b7 100644 --- a/langstream-agents/langstream-agents-ms365/src/main/resources/META-INF/ai.langstream.agents.index +++ b/langstream-agents/langstream-agents-ms365/src/main/resources/META-INF/ai.langstream.agents.index @@ -1 +1,2 @@ ms365-sharepoint-source +ms365-onedrive-source diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java index 2c2820f29..8b27262da 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceAgentProvider.java @@ -23,7 +23,6 @@ import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import java.util.Map; import java.util.Set; import lombok.Data; import lombok.EqualsAndHashCode; @@ -40,6 +39,7 @@ public class StorageProviderSourceAgentProvider extends AbstractComposableAgentP protected static final String GOOGLE_DRIVE_SOURCE = "google-drive-source"; protected static final String MS_365_SHAREPOINT_SOURCE = "ms365-sharepoint-source"; + protected static final String MS_365_ONEDRIVE_SOURCE = "ms365-onedrive-source"; public StorageProviderSourceAgentProvider() { super( @@ -48,7 +48,8 @@ public StorageProviderSourceAgentProvider() { 
AZURE_BLOB_STORAGE_SOURCE, GCS_SOURCE, GOOGLE_DRIVE_SOURCE, - MS_365_SHAREPOINT_SOURCE), + MS_365_SHAREPOINT_SOURCE, + MS_365_ONEDRIVE_SOURCE), List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); } @@ -70,6 +71,8 @@ protected Class getAgentConfigModelClass(String type) { return GoogleDriveSourceConfiguration.class; case MS_365_SHAREPOINT_SOURCE: return MS365SharepointSourceConfiguration.class; + case MS_365_ONEDRIVE_SOURCE: + return MS365OneDriveSourceConfiguration.class; default: throw new IllegalArgumentException("Unknown agent type: " + type); } @@ -78,7 +81,7 @@ protected Class getAgentConfigModelClass(String type) { @EqualsAndHashCode(callSuper = true) @AgentConfig(name = "S3 Source", description = "Reads data from S3 bucket") @Data - public static class S3SourceConfiguration extends StateStorageBasedConfiguration { + public static class S3SourceConfiguration extends StorageProviderSourceBaseConfiguration { protected static final String DEFAULT_BUCKET_NAME = "langstream-source"; protected static final String DEFAULT_ENDPOINT = "http://minio-endpoint.-not-set:9090"; @@ -89,24 +92,24 @@ public static class S3SourceConfiguration extends StateStorageBasedConfiguration @ConfigProperty( description = """ - The name of the bucket to read from. - """, + The name of the bucket to read from. + """, defaultValue = DEFAULT_BUCKET_NAME) private String bucketName = DEFAULT_BUCKET_NAME; @ConfigProperty( description = """ - The endpoint of the S3 server. - """, + The endpoint of the S3 server. + """, defaultValue = DEFAULT_ENDPOINT) private String endpoint = DEFAULT_ENDPOINT; @ConfigProperty( description = """ - Access key for the S3 server. - """, + Access key for the S3 server. + """, defaultValue = DEFAULT_ACCESSKEY) @JsonProperty("access-key") private String accessKey = DEFAULT_ACCESSKEY; @@ -114,8 +117,8 @@ public static class S3SourceConfiguration extends StateStorageBasedConfiguration @ConfigProperty( description = """ - Secret key for the S3 server. 
- """, + Secret key for the S3 server. + """, defaultValue = DEFAULT_SECRETKEY) @JsonProperty("secret-key") private String secretKey = DEFAULT_SECRETKEY; @@ -128,15 +131,6 @@ public static class S3SourceConfiguration extends StateStorageBasedConfiguration """) private String region = ""; - @ConfigProperty( - defaultValue = "5", - description = - """ - Time in seconds to sleep after polling for new files. - """) - @JsonProperty("idle-time") - private int idleTime; - @ConfigProperty( defaultValue = DEFAULT_FILE_EXTENSIONS, description = @@ -150,62 +144,11 @@ public static class S3SourceConfiguration extends StateStorageBasedConfiguration defaultValue = "true", description = """ - Whether to delete objects after processing. - """) + Whether to delete objects after processing. + """) @JsonProperty("delete-objects") private boolean deleteObjects; - @ConfigProperty( - description = - """ - Write a message to this topic when an object has been detected as deleted for any reason. - """) - @JsonProperty("deleted-objects-topic") - private String deletedObjectsTopic; - - @ConfigProperty( - description = - """ - Write a message to this topic periodically with a summary of the activity in the source. - """) - @JsonProperty("source-activity-summary-topic") - private String sourceActivitySummaryTopic; - - @ConfigProperty( - description = - """ - List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted') - To include all: 'new,updated,deleted'. - Use this property to disable the source activity summary (by leaving default to empty). - """) - @JsonProperty("source-activity-summary-events") - private String sourceActivitySummaryEvents; - - @ConfigProperty( - defaultValue = "60", - description = - """ - Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet. 
- """) - @JsonProperty("source-activity-summary-events-threshold") - private int sourceActivitySummaryNumEventsThreshold; - - @ConfigProperty( - description = - """ - Trigger source activity summary emission every time this time threshold has been reached. - """) - @JsonProperty("source-activity-summary-time-seconds-threshold") - private int sourceActivitySummaryTimeSecondsThreshold; - - @ConfigProperty( - description = - """ - Additional headers to add to emitted records. - """) - @JsonProperty("source-record-headers") - private Map sourceRecordHeaders; - @ConfigProperty( description = """ @@ -228,14 +171,15 @@ Use this property to disable the source activity summary (by leaving default to name = "Azure Blob Storage Source", description = """ - Reads data from Azure blobs. There are three supported ways to authenticate: - - SAS token - - Storage account name and key - - Storage account connection string - """) + Reads data from Azure blobs. There are three supported ways to authenticate: + - SAS token + - Storage account name and key + - Storage account connection string + """) @Data @EqualsAndHashCode(callSuper = true) - public static class AzureBlobStorageConfiguration extends StateStorageBasedConfiguration { + public static class AzureBlobStorageConfiguration + extends StorageProviderSourceBaseConfiguration { @ConfigProperty( defaultValue = "langstream-azure-source", @@ -256,44 +200,35 @@ public static class AzureBlobStorageConfiguration extends StateStorageBasedConfi @ConfigProperty( description = """ - Azure SAS token. If not provided, storage account name and key must be provided. - """) + Azure SAS token. If not provided, storage account name and key must be provided. + """) @JsonProperty("sas-token") private String sasToken; @ConfigProperty( description = """ - Azure storage account name. If not provided, SAS token must be provided. - """) + Azure storage account name. If not provided, SAS token must be provided. 
+ """) @JsonProperty("storage-account-name") private String storageAccountName; @ConfigProperty( description = """ - Azure storage account key. If not provided, SAS token must be provided. - """) + Azure storage account key. If not provided, SAS token must be provided. + """) @JsonProperty("storage-account-key") private String storageAccountKey; @ConfigProperty( description = """ - Azure storage account connection string. If not provided, SAS token must be provided. - """) + Azure storage account connection string. If not provided, SAS token must be provided. + """) @JsonProperty("storage-account-connection-string") private String storageAccountConnectionString; - @ConfigProperty( - defaultValue = "5", - description = - """ - Time in seconds to sleep after polling for new files. - """) - @JsonProperty("idle-time") - private int idleTime; - @ConfigProperty( defaultValue = "pdf,docx,html,htm,md,txt", description = @@ -307,62 +242,11 @@ public static class AzureBlobStorageConfiguration extends StateStorageBasedConfi defaultValue = "true", description = """ - Whether to delete objects after processing. - """) + Whether to delete objects after processing. + """) @JsonProperty("delete-objects") private boolean deleteObjects; - @ConfigProperty( - description = - """ - Write a message to this topic when an object has been detected as deleted for any reason. - """) - @JsonProperty("deleted-objects-topic") - private String deletedObjectsTopic; - - @ConfigProperty( - description = - """ - Write a message to this topic periodically with a summary of the activity in the source. - """) - @JsonProperty("source-activity-summary-topic") - private String sourceActivitySummaryTopic; - - @ConfigProperty( - description = - """ - List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted') - To include all: 'new,updated,deleted'. - Use this property to disable the source activity summary (by leaving default to empty). 
- """) - @JsonProperty("source-activity-summary-events") - private String sourceActivitySummaryEvents; - - @ConfigProperty( - defaultValue = "60", - description = - """ - Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet. - """) - @JsonProperty("source-activity-summary-events-threshold") - private int sourceActivitySummaryNumEventsThreshold; - - @ConfigProperty( - description = - """ - Trigger source activity summary emission every time this time threshold has been reached. - """) - @JsonProperty("source-activity-summary-time-seconds-threshold") - private int sourceActivitySummaryTimeSecondsThreshold; - - @ConfigProperty( - description = - """ - Additional headers to add to emitted records. - """) - @JsonProperty("source-record-headers") - private Map sourceRecordHeaders; - @ConfigProperty( description = """ @@ -385,11 +269,12 @@ Use this property to disable the source activity summary (by leaving default to name = "Google Cloud Storage Source", description = """ - Reads data from Google Cloud Storage. The only authentication supported is via service account JSON. - """) + Reads data from Google Cloud Storage. The only authentication supported is via service account JSON. + """) @Data @EqualsAndHashCode(callSuper = true) - public static class GoogleCloudStorageConfiguration extends StateStorageBasedConfiguration { + public static class GoogleCloudStorageConfiguration + extends StorageProviderSourceBaseConfiguration { @ConfigProperty( defaultValue = "langstream-gcs-source", @@ -409,15 +294,6 @@ public static class GoogleCloudStorageConfiguration extends StateStorageBasedCon @JsonProperty("service-account-json") private String serviceAccountJson; - @ConfigProperty( - defaultValue = "5", - description = - """ - Time in seconds to sleep after polling for new files. 
- """) - @JsonProperty("idle-time") - private int idleTime; - @ConfigProperty( defaultValue = "pdf,docx,html,htm,md,txt", description = @@ -431,63 +307,11 @@ public static class GoogleCloudStorageConfiguration extends StateStorageBasedCon defaultValue = "true", description = """ - Whether to delete objects after processing. - """) + Whether to delete objects after processing. + """) @JsonProperty("delete-objects") private boolean deleteObjects; - @ConfigProperty( - defaultValue = "true", - description = - """ - Write a message to this topic when an object has been detected as deleted for any reason. - """) - @JsonProperty("deleted-objects-topic") - private String deletedObjectsTopic; - - @ConfigProperty( - description = - """ - Write a message to this topic periodically with a summary of the activity in the source. - """) - @JsonProperty("source-activity-summary-topic") - private String sourceActivitySummaryTopic; - - @ConfigProperty( - description = - """ - List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted') - To include all: 'new,updated,deleted'. - Use this property to disable the source activity summary (by leaving default to empty). - """) - @JsonProperty("source-activity-summary-events") - private String sourceActivitySummaryEvents; - - @ConfigProperty( - defaultValue = "60", - description = - """ - Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet. - """) - @JsonProperty("source-activity-summary-events-threshold") - private int sourceActivitySummaryNumEventsThreshold; - - @ConfigProperty( - description = - """ - Trigger source activity summary emission every time this time threshold has been reached. - """) - @JsonProperty("source-activity-summary-time-seconds-threshold") - private int sourceActivitySummaryTimeSecondsThreshold; - - @ConfigProperty( - description = - """ - Additional headers to add to emitted records. 
- """) - @JsonProperty("source-record-headers") - private Map sourceRecordHeaders; - @ConfigProperty( description = """ @@ -510,11 +334,12 @@ Use this property to disable the source activity summary (by leaving default to name = "Google Drive Source", description = """ - Reads data from Google Drive. The only authentication supported is via service account JSON. - """) + Reads data from Google Drive. The only authentication supported is via service account JSON. + """) @Data @EqualsAndHashCode(callSuper = true) - public static class GoogleDriveSourceConfiguration extends StateStorageBasedConfiguration { + public static class GoogleDriveSourceConfiguration + extends StorageProviderSourceBaseConfiguration { @ConfigProperty( required = true, @@ -525,103 +350,108 @@ public static class GoogleDriveSourceConfiguration extends StateStorageBasedConf @JsonProperty("service-account-json") private String serviceAccountJson; - @ConfigProperty( - defaultValue = "5", - description = - """ - Time in seconds to sleep after polling for new files. - """) - @JsonProperty("idle-time") - private int idleTime; - @ConfigProperty( description = """ - Filter by parent folders. Comma separated list of folder IDs. Only children will be processed. - """) + Filter by parent folders. Comma separated list of folder IDs. Only children will be processed. + """) @JsonProperty("root-parents") private List rootParents; @ConfigProperty( description = """ - Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed. - """) + Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed. + """) @JsonProperty("include-mime-types") private List includeMimeTypes; @ConfigProperty( description = """ - Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed. - Note that folders are always discarded. - """) + Filter out mime types. 
Comma separated list of mime types. Only files with different mime types will be processed. + Note that folders are always discarded. + """) @JsonProperty("exclude-mime-types") private List excludeMimeTypes; + } + + @AgentConfig( + name = "MS 365 Sharepoint Source", + description = + """ + Reads data from MS 365 Sharepoint documents. The only authentication supported is application credentials and client secret. + Permissions must be set as "Application permissions" in the registered application. (not Delegated) + """) + @Data + @EqualsAndHashCode(callSuper = true) + public static class MS365SharepointSourceConfiguration + extends StorageProviderSourceBaseConfiguration { @ConfigProperty( - defaultValue = "true", + required = true, description = """ - Write a message to this topic when an object has been detected as deleted for any reason. + Entra MS registered application ID (client ID). """) - @JsonProperty("deleted-objects-topic") - private String deletedObjectsTopic; + @JsonProperty("ms-client-id") + private String clientId; @ConfigProperty( + required = true, description = """ - Write a message to this topic periodically with a summary of the activity in the source. + Entra MS registered application's tenant ID. """) - @JsonProperty("source-activity-summary-topic") - private String sourceActivitySummaryTopic; + @JsonProperty("ms-tenant-id") + private String tenantId; @ConfigProperty( + required = true, description = """ - List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted') - To include all: 'new,updated,deleted'. - Use this property to disable the source activity summary (by leaving default to empty). + Entra MS registered application's client secret value. 
""") - @JsonProperty("source-activity-summary-events") - private String sourceActivitySummaryEvents; + @JsonProperty("ms-client-secret") + private String clientSecret; @ConfigProperty( - defaultValue = "60", description = """ - Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet. - """) - @JsonProperty("source-activity-summary-events-threshold") - private int sourceActivitySummaryNumEventsThreshold; + Filter by sites. By default, all sites are included. + """) + @JsonProperty("sites") + private List sites; @ConfigProperty( description = """ - Trigger source activity summary emission every time this time threshold has been reached. - """) - @JsonProperty("source-activity-summary-time-seconds-threshold") - private int sourceActivitySummaryTimeSecondsThreshold; + Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed. + """) + @JsonProperty("include-mime-types") + private List includeMimeTypes; @ConfigProperty( description = """ - Additional headers to add to emitted records. - """) - @JsonProperty("source-record-headers") - private Map sourceRecordHeaders; + Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed. + Note that folders are always discarded. + """) + @JsonProperty("exclude-mime-types") + private List excludeMimeTypes; } @AgentConfig( - name = "MS 365 Sharepoint Source", + name = "MS 365 OneDrive Source", description = """ - Reads data from MS 365 Sharepoint documents. The only authentication supported is application credentials and client secret. - Permissions must be set as "Application permissions" in the registered application. (not Delegated) - """) + Reads data from MS 365 OneDrive documents. The only authentication supported is application credentials and client secret. 
+ Permissions must be set as "Application permissions" in the registered application. (not Delegated) + """) @Data @EqualsAndHashCode(callSuper = true) - public static class MS365SharepointSourceConfiguration extends StateStorageBasedConfiguration { + public static class MS365OneDriveSourceConfiguration + extends StorageProviderSourceBaseConfiguration { @ConfigProperty( required = true, @@ -651,89 +481,38 @@ Entra MS registered application ID (client ID). private String clientSecret; @ConfigProperty( - defaultValue = "5", - description = - """ - Time in seconds to sleep after polling for new files. - """) - @JsonProperty("idle-time") - private int idleTime; - - @ConfigProperty( + required = true, description = """ - Filter by sites. By default, all sites are included. - """) - @JsonProperty("sites") - private List sites; + Users drives to read from. + """) + @JsonProperty("users") + private List users; @ConfigProperty( description = """ - Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed. - """) + Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed. + """) @JsonProperty("include-mime-types") private List includeMimeTypes; @ConfigProperty( description = """ - Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed. - Note that folders are always discarded. - """) + Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed. + Note that folders are always discarded. + """) @JsonProperty("exclude-mime-types") private List excludeMimeTypes; - @ConfigProperty( - defaultValue = "true", - description = - """ - Write a message to this topic when an object has been detected as deleted for any reason. 
- """) - @JsonProperty("deleted-objects-topic") - private String deletedObjectsTopic; - - @ConfigProperty( - description = - """ - Write a message to this topic periodically with a summary of the activity in the source. - """) - @JsonProperty("source-activity-summary-topic") - private String sourceActivitySummaryTopic; - - @ConfigProperty( - description = - """ - List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted') - To include all: 'new,updated,deleted'. - Use this property to disable the source activity summary (by leaving default to empty). - """) - @JsonProperty("source-activity-summary-events") - private String sourceActivitySummaryEvents; - - @ConfigProperty( - defaultValue = "60", - description = - """ - Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet. - """) - @JsonProperty("source-activity-summary-events-threshold") - private int sourceActivitySummaryNumEventsThreshold; - - @ConfigProperty( - description = - """ - Trigger source activity summary emission every time this time threshold has been reached. - """) - @JsonProperty("source-activity-summary-time-seconds-threshold") - private int sourceActivitySummaryTimeSecondsThreshold; - @ConfigProperty( description = """ - Additional headers to add to emitted records. - """) - @JsonProperty("source-record-headers") - private Map sourceRecordHeaders; + The root directory to read from. + Do not use a leading slash. To specify a directory, include a trailing slash. 
""", + defaultValue = "/") + @JsonProperty("path-prefix") + private String pathPrefix; } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StateStorageBasedConfiguration.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceBaseConfiguration.java similarity index 51% rename from langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StateStorageBasedConfiguration.java rename to langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceBaseConfiguration.java index 9df7e39ed..3f9f94d31 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StateStorageBasedConfiguration.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/StorageProviderSourceBaseConfiguration.java @@ -17,10 +17,11 @@ import ai.langstream.api.doc.ConfigProperty; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; import lombok.Data; @Data -public class StateStorageBasedConfiguration { +public class StorageProviderSourceBaseConfiguration { @ConfigProperty( description = """ @@ -82,4 +83,64 @@ State storage type (s3, disk). """) @JsonProperty("state-storage-s3-region") private String stateStorageS3Region; + + @ConfigProperty( + description = + """ + Write a message to this topic when an object has been detected as deleted for any reason. + """) + @JsonProperty("deleted-objects-topic") + private String deletedObjectsTopic; + + @ConfigProperty( + description = + """ + Write a message to this topic periodically with a summary of the activity in the source. 
+ """) + @JsonProperty("source-activity-summary-topic") + private String sourceActivitySummaryTopic; + + @ConfigProperty( + description = + """ + List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted') + To include all: 'new,updated,deleted'. + Use this property to disable the source activity summary (by leaving default to empty). + """) + @JsonProperty("source-activity-summary-events") + private String sourceActivitySummaryEvents; + + @ConfigProperty( + defaultValue = "60", + description = + """ + Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet. + """) + @JsonProperty("source-activity-summary-events-threshold") + private int sourceActivitySummaryNumEventsThreshold; + + @ConfigProperty( + description = + """ + Trigger source activity summary emission every time this time threshold has been reached. + """) + @JsonProperty("source-activity-summary-time-seconds-threshold") + private int sourceActivitySummaryTimeSecondsThreshold; + + @ConfigProperty( + description = + """ + Additional headers to add to emitted records. + """) + @JsonProperty("source-record-headers") + private Map sourceRecordHeaders; + + @ConfigProperty( + defaultValue = "5", + description = + """ + Time in seconds to sleep after polling for new files. 
+ """) + @JsonProperty("idle-time") + private int idleTime; } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java index 7e3f3009d..7c9fb5acf 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java @@ -258,8 +258,7 @@ public void testDocumentation() { "deleted-objects-topic" : { "description" : "Write a message to this topic when an object has been detected as deleted for any reason.", "required" : false, - "type" : "string", - "defaultValue" : "true" + "type" : "string" }, "file-extensions" : { "description" : "Comma separated list of file extensions to filter by.", @@ -364,8 +363,7 @@ public void testDocumentation() { "deleted-objects-topic" : { "description" : "Write a message to this topic when an object has been detected as deleted for any reason.", "required" : false, - "type" : "string", - "defaultValue" : "true" + "type" : "string" }, "exclude-mime-types" : { "description" : "Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed.\\nNote that folders are always discarded.", @@ -476,6 +474,140 @@ public void testDocumentation() { } } }, + "ms365-onedrive-source" : { + "name" : "MS 365 OneDrive Source", + "description" : "Reads data from MS 365 OneDrive documents. The only authentication supported is application credentials and client secret.\\nPermissions must be set as \\"Application permissions\\" in the registered application. 
(not Delegated)", + "properties" : { + "deleted-objects-topic" : { + "description" : "Write a message to this topic when an object has been detected as deleted for any reason.", + "required" : false, + "type" : "string" + }, + "exclude-mime-types" : { + "description" : "Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed.\\nNote that folders are always discarded.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Filter out mime types. Comma separated list of mime types. Only files with different mime types will be processed.\\nNote that folders are always discarded.", + "required" : false, + "type" : "string" + } + }, + "idle-time" : { + "description" : "Time in seconds to sleep after polling for new files.", + "required" : false, + "type" : "integer", + "defaultValue" : "5" + }, + "include-mime-types" : { + "description" : "Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Filter by mime types. Comma separated list of mime types. Only files with these mime types will be processed.", + "required" : false, + "type" : "string" + } + }, + "ms-client-id" : { + "description" : "Entra MS registered application ID (client ID).", + "required" : true, + "type" : "string" + }, + "ms-client-secret" : { + "description" : "Entra MS registered application's client secret value.", + "required" : true, + "type" : "string" + }, + "ms-tenant-id" : { + "description" : "Entra MS registered application's tenant ID.", + "required" : true, + "type" : "string" + }, + "path-prefix" : { + "description" : "The root directory to read from.\\nDo not use a leading slash. 
To specify a directory, include a trailing slash.", + "required" : false, + "type" : "string", + "defaultValue" : "/" + }, + "source-activity-summary-events" : { + "description" : "List of events (comma separated) to include in the source activity summary. ('new', 'updated', 'deleted')\\nTo include all: 'new,updated,deleted'.\\nUse this property to disable the source activity summary (by leaving default to empty).", + "required" : false, + "type" : "string" + }, + "source-activity-summary-events-threshold" : { + "description" : "Trigger source activity summary emission when this number of events have been detected, even if the time threshold has not been reached yet.", + "required" : false, + "type" : "integer", + "defaultValue" : "60" + }, + "source-activity-summary-time-seconds-threshold" : { + "description" : "Trigger source activity summary emission every time this time threshold has been reached.", + "required" : false, + "type" : "integer" + }, + "source-activity-summary-topic" : { + "description" : "Write a message to this topic periodically with a summary of the activity in the source.", + "required" : false, + "type" : "string" + }, + "source-record-headers" : { + "description" : "Additional headers to add to emitted records.", + "required" : false, + "type" : "object" + }, + "state-storage" : { + "description" : "State storage type (s3, disk).", + "required" : false, + "type" : "string" + }, + "state-storage-file-prefix" : { + "description" : "Prepend a prefix to the state storage file. (valid for all types)", + "required" : false, + "type" : "string" + }, + "state-storage-file-prepend-tenant" : { + "description" : "Prepend tenant to the state storage file. 
(valid for all types)", + "required" : false, + "type" : "string" + }, + "state-storage-s3-access-key" : { + "description" : "State storage S3 access key.", + "required" : false, + "type" : "string" + }, + "state-storage-s3-bucket" : { + "description" : "State storage S3 bucket.", + "required" : false, + "type" : "string" + }, + "state-storage-s3-endpoint" : { + "description" : "State storage S3 endpoint.", + "required" : false, + "type" : "string" + }, + "state-storage-s3-region" : { + "description" : "State storage S3 region.", + "required" : false, + "type" : "string" + }, + "state-storage-s3-secret-key" : { + "description" : "State storage S3 secret key.", + "required" : false, + "type" : "string" + }, + "users" : { + "description" : "Users drives to read from.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Users drives to read from.", + "required" : true, + "type" : "string" + } + } + } + }, "ms365-sharepoint-source" : { "name" : "MS 365 Sharepoint Source", "description" : "Reads data from MS 365 Sharepoint documents. The only authentication supported is application credentials and client secret.\\nPermissions must be set as \\"Application permissions\\" in the registered application. (not Delegated)", @@ -483,8 +615,7 @@ public void testDocumentation() { "deleted-objects-topic" : { "description" : "Write a message to this topic when an object has been detected as deleted for any reason.", "required" : false, - "type" : "string", - "defaultValue" : "true" + "type" : "string" }, "exclude-mime-types" : { "description" : "Filter out mime types. Comma separated list of mime types. 
Only files with different mime types will be processed.\\nNote that folders are always discarded.", diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSOneDriveSourceIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSOneDriveSourceIT.java new file mode 100644 index 000000000..2dc21eec2 --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSOneDriveSourceIT.java @@ -0,0 +1,204 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.agents; + +import static ai.langstream.testrunners.AbstractApplicationRunner.INTEGRATION_TESTS_GROUP1; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3; + +import ai.langstream.api.runner.topics.TopicConsumer; +import ai.langstream.testrunners.AbstractGenericStreamingApplicationRunner; +import com.azure.identity.ClientSecretCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.microsoft.graph.models.*; +import com.microsoft.graph.serviceclient.GraphServiceClient; +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +@Testcontainers +@Tag(INTEGRATION_TESTS_GROUP1) +@Disabled +class MSOneDriveSourceIT extends AbstractGenericStreamingApplicationRunner { + @Container + private static final LocalStackContainer localstack = + new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.2.0")) + .withServices(S3); + + private static final String TENANT_ID = ""; + private static final String CLIENT_ID = ""; + private static final String CLIENT_SECRET = ""; + + private static final String USER = ""; + + @Test + public void test() throws Exception { + GraphServiceClient client = newClient(); + + final String appId = "app-" + UUID.randomUUID().toString().substring(0, 4); + + String tenant = "tenant"; + Drive drive = client.users().byUserId(USER).drive().get(); + DriveItem rootItem = 
client.drives().byDriveId(drive.getId()).root().get(); + + DriveItem folderItem = new DriveItem(); + folderItem.setName("LangStream test"); + folderItem.setFolder(new Folder()); + List children = + client.drives() + .byDriveId(drive.getId()) + .items() + .byDriveItemId(rootItem.getId()) + .children() + .get() + .getValue(); + for (DriveItem child : children) { + client.drives().byDriveId(drive.getId()).items().byDriveItemId(child.getId()).delete(); + } + String folderId = + client.drives() + .byDriveId(drive.getId()) + .items() + .byDriveItemId(rootItem.getId()) + .children() + .post(folderItem) + .getId(); + + String[] expectedAgents = new String[] {appId + "-step1", appId + "-step2"}; + Map application = + Map.of( + "module.yaml", + """ + module: "module-1" + id: "pipeline-1" + topics: + - name: "${globals.output-topic}" + creation-mode: create-if-not-exists + - name: "deleted-documents" + creation-mode: create-if-not-exists + pipeline: + - type: "ms365-onedrive-source" + id: "step1" + configuration: + ms-client-id: %s + ms-client-secret: %s + ms-tenant-id: %s + users: [%s] + state-storage: s3 + state-storage-s3-bucket: "test-state-bucket" + state-storage-s3-endpoint: "%s" + deleted-objects-topic: "deleted-objects" + idle-time: 1 + - type: text-extractor + id: step2 + output: "${globals.output-topic}" + """ + .formatted( + CLIENT_ID, + CLIENT_SECRET, + TENANT_ID, + USER, + localstack.getEndpointOverride(S3))); + + List fileIds = new ArrayList<>(); + List itemIds = new ArrayList<>(); + List docs = + List.of( + MSOneDriveSourceIT.class.getResourceAsStream("/doc1.docx").readAllBytes(), + MSOneDriveSourceIT.class.getResourceAsStream("/doc2.docx").readAllBytes()); + + for (int i = 0; i < 2; i++) { + byte[] bytes = docs.get(i); + DriveItem driveItem = new DriveItem(); + driveItem.setName("test-" + i + ".docx"); + driveItem.setFile(new File()); + DriveItem item = + client.drives() + .byDriveId(drive.getId()) + .items() + .byDriveItemId(folderId) + .children() + 
.post(driveItem); + + client.drives() + .byDriveId(drive.getId()) + .items() + .byDriveItemId(item.getId()) + .content() + .put(new ByteArrayInputStream(bytes)); + fileIds.add(drive.getId() + "_" + item.getId()); + itemIds.add(item.getId()); + } + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, appId, application, buildInstanceYaml(), expectedAgents)) { + + try (TopicConsumer deletedDocumentsConsumer = createConsumer("deleted-objects"); + TopicConsumer consumer = + createConsumer(applicationRuntime.getGlobal("output-topic")); ) { + + executeAgentRunners(applicationRuntime, 5); + waitForMessages( + consumer, + 2, + (consumerRecords, objects) -> { + assertEquals(2, consumerRecords.size()); + assertEquals(fileIds.get(0), consumerRecords.get(0).key()); + assertTrue( + ((String) consumerRecords.get(0).value()) + .contains("This is a document")); + assertEquals(fileIds.get(1), consumerRecords.get(1).key()); + assertTrue( + ((String) consumerRecords.get(1).value()) + .contains("This is a another document")); + }); + + client.drives() + .byDriveId(drive.getId()) + .items() + .byDriveItemId(itemIds.get(0)) + .delete(); + executeAgentRunners(applicationRuntime); + waitForMessages(deletedDocumentsConsumer, List.of(fileIds.get(0))); + } + } + } + + private static GraphServiceClient newClient() { + final String[] scopes = new String[] {"https://graph.microsoft.com/.default"}; + final ClientSecretCredential credential = + new ClientSecretCredentialBuilder() + .clientId(CLIENT_ID) + .tenantId(TENANT_ID) + .clientSecret(CLIENT_SECRET) + .build(); + + return new GraphServiceClient(credential, scopes); + } +} diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSSharepointSourceIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSSharepointSourceIT.java index e0b4ad3ed..997f22d43 100644 --- 
a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSSharepointSourceIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/agents/MSSharepointSourceIT.java @@ -53,7 +53,7 @@ class MSSharepointSourceIT extends AbstractGenericStreamingApplicationRunner { private static final String CLIENT_ID = ""; private static final String CLIENT_SECRET = ""; - private static final String SITE_ID = "sito2"; + private static final String SITE_ID = ""; @Test public void test() throws Exception {