Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

feat: onedrive source #132

Merged
merged 3 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package ai.langstream.agents.azureblobstorage;

import static ai.langstream.api.util.ConfigurationUtils.*;
import static ai.langstream.api.util.ConfigurationUtils.getInt;

import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
Expand All @@ -31,7 +28,6 @@
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.util.*;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -42,24 +38,11 @@ public class AzureBlobStorageSource
public static class AzureBlobStorageSourceState extends StorageProviderSourceState {}

private BlobContainerClient client;
private int idleTime;

private String deletedObjectsTopic;

private String pathPrefix;
private boolean recursive;

private String sourceActivitySummaryTopic;

private List<String> sourceActivitySummaryEvents;

private int sourceActivitySummaryNumEventsThreshold;
private int sourceActivitySummaryTimeSecondsThreshold;

private boolean deleteObjects;

private Collection<Header> sourceRecordHeaders;

public static final String ALL_FILES = "*";
public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt";
private Set<String> extensions = Set.of();
Expand Down Expand Up @@ -130,32 +113,12 @@ public Class<AzureBlobStorageSourceState> getStateClass() {
@Override
public void initializeClientAndConfig(Map<String, Object> configuration) {
client = createContainerClient(configuration);
idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString());
deletedObjectsTopic = getString("deleted-objects-topic", null, configuration);
deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration);
sourceRecordHeaders =
getMap("source-record-headers", Map.of(), configuration).entrySet().stream()
.map(
entry ->
SimpleRecord.SimpleHeader.of(
entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableList());
pathPrefix = configuration.getOrDefault("path-prefix", "").toString();
if (StringUtils.isNotEmpty(pathPrefix) && !pathPrefix.endsWith("/")) {
pathPrefix += "/";
}
recursive = getBoolean("recursive", false, configuration);
sourceActivitySummaryTopic =
getString("source-activity-summary-topic", null, configuration);
sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration);
sourceActivitySummaryNumEventsThreshold =
getInt("source-activity-summary-events-threshold", 0, configuration);
sourceActivitySummaryTimeSecondsThreshold =
getInt("source-activity-summary-time-seconds-threshold", 30, configuration);
if (sourceActivitySummaryTimeSecondsThreshold < 0) {
throw new IllegalArgumentException(
"source-activity-summary-time-seconds-threshold must be > 0");
}
extensions =
Set.of(
configuration
Expand All @@ -176,36 +139,6 @@ public boolean isDeleteObjects() {
return deleteObjects;
}

@Override
public int getIdleTime() {
return idleTime;
}

@Override
public String getDeletedObjectsTopic() {
return deletedObjectsTopic;
}

@Override
public String getSourceActivitySummaryTopic() {
return sourceActivitySummaryTopic;
}

@Override
public List<String> getSourceActivitySummaryEvents() {
return sourceActivitySummaryEvents;
}

@Override
public int getSourceActivitySummaryNumEventsThreshold() {
return sourceActivitySummaryNumEventsThreshold;
}

@Override
public int getSourceActivitySummaryTimeSecondsThreshold() {
return sourceActivitySummaryTimeSecondsThreshold;
}

@Override
public List<StorageProviderObjectReference> listObjects() throws Exception {
final PagedIterable<BlobItem> blobs;
Expand Down Expand Up @@ -274,11 +207,6 @@ public void deleteObject(String id) throws Exception {
client.getBlobClient(id).deleteIfExists();
}

@Override
public Collection<Header> getSourceRecordHeaders() {
return sourceRecordHeaders;
}

@Override
public boolean isStateStorageRequired() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import io.minio.GetObjectArgs;
import io.minio.GetObjectResponse;
Expand All @@ -31,7 +29,6 @@
import io.minio.Result;
import io.minio.messages.Item;
import java.util.*;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -45,21 +42,12 @@ public static class S3SourceState extends StorageProviderSourceState {}
private String pathPrefix;
private boolean recursive;
private MinioClient minioClient;
private int idleTime;
private String deletedObjectsTopic;
private String sourceActivitySummaryTopic;

private List<String> sourceActivitySummaryEvents;

private int sourceActivitySummaryNumEventsThreshold;
private int sourceActivitySummaryTimeSecondsThreshold;

public static final String ALL_FILES = "*";
public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt";
private Set<String> extensions = Set.of();

private boolean deleteObjects;
private Collection<Header> sourceRecordHeaders;

@Override
public Class<S3SourceState> getStateClass() {
Expand All @@ -82,20 +70,6 @@ public void initializeClientAndConfig(Map<String, Object> configuration) {
}
recursive = getBoolean("recursive", false, configuration);
String region = configuration.getOrDefault("region", "").toString();
idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString());
deletedObjectsTopic = getString("deleted-objects-topic", null, configuration);
sourceActivitySummaryTopic =
getString("source-activity-summary-topic", null, configuration);
sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration);
sourceActivitySummaryNumEventsThreshold =
getInt("source-activity-summary-events-threshold", 0, configuration);
sourceActivitySummaryTimeSecondsThreshold =
getInt("source-activity-summary-time-seconds-threshold", 30, configuration);
if (sourceActivitySummaryTimeSecondsThreshold < 0) {
throw new IllegalArgumentException(
"source-activity-summary-time-seconds-threshold must be > 0");
}

extensions =
Set.of(
configuration
Expand All @@ -104,15 +78,6 @@ public void initializeClientAndConfig(Map<String, Object> configuration) {
.split(","));

deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration);

sourceRecordHeaders =
getMap("source-record-headers", Map.of(), configuration).entrySet().stream()
.map(
entry ->
SimpleRecord.SimpleHeader.of(
entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableList());

log.info(
"Connecting to S3 Bucket at {} in region {} with user {} on path {} with recursive {}",
endpoint,
Expand Down Expand Up @@ -141,36 +106,6 @@ public boolean isDeleteObjects() {
return deleteObjects;
}

@Override
public int getIdleTime() {
return idleTime;
}

@Override
public String getDeletedObjectsTopic() {
return deletedObjectsTopic;
}

@Override
public String getSourceActivitySummaryTopic() {
return sourceActivitySummaryTopic;
}

@Override
public List<String> getSourceActivitySummaryEvents() {
return sourceActivitySummaryEvents;
}

@Override
public int getSourceActivitySummaryNumEventsThreshold() {
return sourceActivitySummaryNumEventsThreshold;
}

@Override
public int getSourceActivitySummaryTimeSecondsThreshold() {
return sourceActivitySummaryTimeSecondsThreshold;
}

@Override
public List<StorageProviderObjectReference> listObjects() throws Exception {
Iterable<Result<Item>> results;
Expand Down Expand Up @@ -236,11 +171,6 @@ public void deleteObject(String id) throws Exception {
minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(id).build());
}

@Override
public Collection<Header> getSourceRecordHeaders() {
return sourceRecordHeaders;
}

@Override
public boolean isStateStorageRequired() {
return false;
Expand Down
Loading
Loading