Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
feat: onedrive source (#132)
Browse files Browse the repository at this point in the history
- New agent source `ms365-onedrive-source`
Example usage:
```
- type: "ms365-onedrive-source"
  id: "step1"
  configuration:
    ms-client-id: CLIENT_ID
    ms-client-secret: SECRET
    ms-tenant-id: TENANT
    users: "your-principal-email"
    path-prefix: /MyDocuments
    state-storage: s3
    state-storage-s3-bucket: "test-state-bucket"
    state-storage-s3-endpoint: "%s"
    deleted-objects-topic: "deleted-objects"
    idle-time: 1
```

To configure the credentials, you need to create a registered application
(this
[guide](https://docs.airbyte.com/integrations/sources/microsoft-sharepoint#step-1-set-up-sharepoint-application)
is helpful).


notes:
1. Auth is only supported via client secret (no delegated users or
client certificates)
2. Shared drives are not supported
3. It gets files from the configured user principals. Since
authentication is done via app credentials, the "current user" drive is
not accessible from the UI, so it makes no sense to use it.
  • Loading branch information
nicoloboschi authored Aug 23, 2024
1 parent f367357 commit 5774b49
Show file tree
Hide file tree
Showing 15 changed files with 960 additions and 840 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package ai.langstream.agents.azureblobstorage;

import static ai.langstream.api.util.ConfigurationUtils.*;
import static ai.langstream.api.util.ConfigurationUtils.getInt;

import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
Expand All @@ -31,7 +28,6 @@
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.util.*;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -42,24 +38,11 @@ public class AzureBlobStorageSource
public static class AzureBlobStorageSourceState extends StorageProviderSourceState {}

private BlobContainerClient client;
private int idleTime;

private String deletedObjectsTopic;

private String pathPrefix;
private boolean recursive;

private String sourceActivitySummaryTopic;

private List<String> sourceActivitySummaryEvents;

private int sourceActivitySummaryNumEventsThreshold;
private int sourceActivitySummaryTimeSecondsThreshold;

private boolean deleteObjects;

private Collection<Header> sourceRecordHeaders;

public static final String ALL_FILES = "*";
public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt";
private Set<String> extensions = Set.of();
Expand Down Expand Up @@ -130,32 +113,12 @@ public Class<AzureBlobStorageSourceState> getStateClass() {
@Override
public void initializeClientAndConfig(Map<String, Object> configuration) {
client = createContainerClient(configuration);
idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString());
deletedObjectsTopic = getString("deleted-objects-topic", null, configuration);
deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration);
sourceRecordHeaders =
getMap("source-record-headers", Map.of(), configuration).entrySet().stream()
.map(
entry ->
SimpleRecord.SimpleHeader.of(
entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableList());
pathPrefix = configuration.getOrDefault("path-prefix", "").toString();
if (StringUtils.isNotEmpty(pathPrefix) && !pathPrefix.endsWith("/")) {
pathPrefix += "/";
}
recursive = getBoolean("recursive", false, configuration);
sourceActivitySummaryTopic =
getString("source-activity-summary-topic", null, configuration);
sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration);
sourceActivitySummaryNumEventsThreshold =
getInt("source-activity-summary-events-threshold", 0, configuration);
sourceActivitySummaryTimeSecondsThreshold =
getInt("source-activity-summary-time-seconds-threshold", 30, configuration);
if (sourceActivitySummaryTimeSecondsThreshold < 0) {
throw new IllegalArgumentException(
"source-activity-summary-time-seconds-threshold must be > 0");
}
extensions =
Set.of(
configuration
Expand All @@ -176,36 +139,6 @@ public boolean isDeleteObjects() {
return deleteObjects;
}

@Override
public int getIdleTime() {
return idleTime;
}

@Override
public String getDeletedObjectsTopic() {
return deletedObjectsTopic;
}

@Override
public String getSourceActivitySummaryTopic() {
return sourceActivitySummaryTopic;
}

@Override
public List<String> getSourceActivitySummaryEvents() {
return sourceActivitySummaryEvents;
}

@Override
public int getSourceActivitySummaryNumEventsThreshold() {
return sourceActivitySummaryNumEventsThreshold;
}

@Override
public int getSourceActivitySummaryTimeSecondsThreshold() {
return sourceActivitySummaryTimeSecondsThreshold;
}

@Override
public List<StorageProviderObjectReference> listObjects() throws Exception {
final PagedIterable<BlobItem> blobs;
Expand Down Expand Up @@ -274,11 +207,6 @@ public void deleteObject(String id) throws Exception {
client.getBlobClient(id).deleteIfExists();
}

@Override
public Collection<Header> getSourceRecordHeaders() {
return sourceRecordHeaders;
}

@Override
public boolean isStateStorageRequired() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import io.minio.GetObjectArgs;
import io.minio.GetObjectResponse;
Expand All @@ -31,7 +29,6 @@
import io.minio.Result;
import io.minio.messages.Item;
import java.util.*;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -45,21 +42,12 @@ public static class S3SourceState extends StorageProviderSourceState {}
private String pathPrefix;
private boolean recursive;
private MinioClient minioClient;
private int idleTime;
private String deletedObjectsTopic;
private String sourceActivitySummaryTopic;

private List<String> sourceActivitySummaryEvents;

private int sourceActivitySummaryNumEventsThreshold;
private int sourceActivitySummaryTimeSecondsThreshold;

public static final String ALL_FILES = "*";
public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt";
private Set<String> extensions = Set.of();

private boolean deleteObjects;
private Collection<Header> sourceRecordHeaders;

@Override
public Class<S3SourceState> getStateClass() {
Expand All @@ -82,20 +70,6 @@ public void initializeClientAndConfig(Map<String, Object> configuration) {
}
recursive = getBoolean("recursive", false, configuration);
String region = configuration.getOrDefault("region", "").toString();
idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString());
deletedObjectsTopic = getString("deleted-objects-topic", null, configuration);
sourceActivitySummaryTopic =
getString("source-activity-summary-topic", null, configuration);
sourceActivitySummaryEvents = getList("source-activity-summary-events", configuration);
sourceActivitySummaryNumEventsThreshold =
getInt("source-activity-summary-events-threshold", 0, configuration);
sourceActivitySummaryTimeSecondsThreshold =
getInt("source-activity-summary-time-seconds-threshold", 30, configuration);
if (sourceActivitySummaryTimeSecondsThreshold < 0) {
throw new IllegalArgumentException(
"source-activity-summary-time-seconds-threshold must be > 0");
}

extensions =
Set.of(
configuration
Expand All @@ -104,15 +78,6 @@ public void initializeClientAndConfig(Map<String, Object> configuration) {
.split(","));

deleteObjects = ConfigurationUtils.getBoolean("delete-objects", true, configuration);

sourceRecordHeaders =
getMap("source-record-headers", Map.of(), configuration).entrySet().stream()
.map(
entry ->
SimpleRecord.SimpleHeader.of(
entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableList());

log.info(
"Connecting to S3 Bucket at {} in region {} with user {} on path {} with recursive {}",
endpoint,
Expand Down Expand Up @@ -141,36 +106,6 @@ public boolean isDeleteObjects() {
return deleteObjects;
}

@Override
public int getIdleTime() {
return idleTime;
}

@Override
public String getDeletedObjectsTopic() {
return deletedObjectsTopic;
}

@Override
public String getSourceActivitySummaryTopic() {
return sourceActivitySummaryTopic;
}

@Override
public List<String> getSourceActivitySummaryEvents() {
return sourceActivitySummaryEvents;
}

@Override
public int getSourceActivitySummaryNumEventsThreshold() {
return sourceActivitySummaryNumEventsThreshold;
}

@Override
public int getSourceActivitySummaryTimeSecondsThreshold() {
return sourceActivitySummaryTimeSecondsThreshold;
}

@Override
public List<StorageProviderObjectReference> listObjects() throws Exception {
Iterable<Result<Item>> results;
Expand Down Expand Up @@ -236,11 +171,6 @@ public void deleteObject(String id) throws Exception {
minioClient.removeObject(RemoveObjectArgs.builder().bucket(bucketName).object(id).build());
}

@Override
public Collection<Header> getSourceRecordHeaders() {
return sourceRecordHeaders;
}

@Override
public boolean isStateStorageRequired() {
return false;
Expand Down
Loading

0 comments on commit 5774b49

Please sign in to comment.