Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
feat: dropbox source (#133)
Browse files Browse the repository at this point in the history
- New `dropbox-source`
```
- type: "dropbox-source"
                                    id: "step1"
                                    configuration:
                                        access-token: sl.xxxxx
                                        path-prefix: /langstream-test
                                        state-storage: s3
                                        state-storage-s3-bucket: "test-state-bucket"
                                        state-storage-s3-endpoint: "%s"
                                        deleted-objects-topic: "deleted-objects"
```

- `access-token` for the app (Follow
[this](https://github.com/dropbox/dropbox-sdk-java?tab=readme-ov-file#register-a-dropbox-api-app))
- `path-prefix` for reading from a different root path
- `extensions` , list for extensions to filter in (by defaut all are
included)
  • Loading branch information
nicoloboschi authored Aug 23, 2024
1 parent 017a52b commit d2d10c6
Show file tree
Hide file tree
Showing 10 changed files with 677 additions and 5 deletions.
105 changes: 105 additions & 0 deletions langstream-agents/langstream-agents-dropbox/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright DataStax, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>langstream-agents</artifactId>
<groupId>ai.langstream</groupId>
<version>0.23.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>langstream-agents-dropbox</artifactId>
<dependencyManagement>

</dependencyManagement>


<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>langstream-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>langstream-agents-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ai.langstream</groupId>
<artifactId>langstream-agents-commons-storage-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>com.dropbox.core</groupId>
<artifactId>dropbox-core-sdk</artifactId>
<version>7.0.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<classifier>nar</classifier>
</configuration>
<executions>
<execution>
<id>default-nar</id>
<phase>package</phase>
<goals>
<goal>nar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.agents.dropbox;

import ai.langstream.api.runner.code.AgentCode;
import ai.langstream.api.runner.code.AgentCodeProvider;
import java.util.List;

public class DropboxAgentsCodeProvider implements AgentCodeProvider {

public static final String DROPBOX_SOURCE = "dropbox-source";
private static final List<String> AGENTS = List.of(DROPBOX_SOURCE);

@Override
public boolean supports(String agentType) {
return AGENTS.contains(agentType);
}

@Override
public AgentCode createInstance(String agentType) {
switch (agentType) {
case DROPBOX_SOURCE:
return new DropboxSource();
default:
throw new IllegalArgumentException("Unsupported agent type: " + agentType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.agents.dropbox;

import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.dropbox.core.DbxDownloader;
import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.*;
import java.io.ByteArrayOutputStream;
import java.util.*;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Slf4j
public class DropboxSource extends StorageProviderSource<DropboxSource.DropboxSourceState> {

public static class DropboxSourceState extends StorageProviderSourceState {}

private DbxClientV2 client;

private String pathPrefix;
private Set<String> extensions;

@Override
public Class<DropboxSourceState> getStateClass() {
return DropboxSourceState.class;
}

@Override
public void initializeClientAndConfig(Map<String, Object> configuration) {
String accessToken =
ConfigurationUtils.requiredField(
configuration, "access-token", () -> "dropbox source");
String clientIdentifier =
ConfigurationUtils.getString(
"client-identifier", "langstream-source", configuration);
DbxRequestConfig config =
DbxRequestConfig.newBuilder(clientIdentifier).withAutoRetryEnabled().build();
client = new DbxClientV2(config, accessToken);
initializeConfig(configuration);
}

void initializeConfig(Map<String, Object> configuration) {
pathPrefix = configuration.getOrDefault("path-prefix", "").toString();
if (StringUtils.isNotEmpty(pathPrefix) && pathPrefix.endsWith("/")) {
pathPrefix = pathPrefix.substring(0, pathPrefix.length() - 1);
}

extensions = ConfigurationUtils.getSet("extensions", configuration);
if (extensions.isEmpty()) {
log.info("No extensions filter set, getting all files");
}
}

@Override
public String getBucketName() {
return "";
}

@Override
public boolean isDeleteObjects() {
return false;
}

@Override
public Collection<StorageProviderObjectReference> listObjects() throws Exception {
List<StorageProviderObjectReference> collect = new ArrayList<>();
collectFiles(pathPrefix, collect);
log.info("Found {} files", collect.size());
return collect;
}

private void collectFiles(String path, List<StorageProviderObjectReference> collect)
throws Exception {
log.debug("Listing path {}", path);
ListFolderResult result = client.files().listFolder(path);
while (true) {
for (Metadata metadata : result.getEntries()) {
if (metadata instanceof DeletedMetadata) {
continue;
} else if (metadata instanceof FolderMetadata folder) {
collectFiles(folder.getPathDisplay(), collect);
} else if (metadata instanceof FileMetadata file) {
if (log.isDebugEnabled()) {
log.debug("found file {}", file);
}
if (file.getContentHash() == null) {
log.warn("No content hash for file {}", file.getPathDisplay());
continue;
}
if (!extensions.isEmpty()) {
final String extension;
if (file.getName().contains(".")) {
extension =
file.getName().substring(file.getName().lastIndexOf('.') + 1);
} else {
extension = "";
}
if (!extensions.contains(extension)) {
log.info(
"Skipping file with extension {} (extension {})",
file.getPathDisplay(),
extension);
continue;
}
}
if (log.isDebugEnabled()) {
log.debug(
"Adding file {}, id {}, size {}, digest {}, path {}",
file.getName(),
file.getId(),
file.getSize(),
file.getContentHash(),
file.getPathDisplay());
}
collect.add(new DropboxObject(file));
} else {
log.warn("Unknown metadata type {}", metadata);
}
}
if (!result.getHasMore()) {
break;
}
result = client.files().listFolderContinue(result.getCursor());
}
}

@Override
public byte[] downloadObject(StorageProviderObjectReference object) throws Exception {
DropboxObject file = (DropboxObject) object;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
log.info("Downloading file {}", file.getFile().getPathDisplay());
try (DbxDownloader<FileMetadata> downloader =
client.files().download(file.getFile().getPathDisplay()); ) {
downloader.download(baos);
}
return baos.toByteArray();
} catch (Exception e) {
log.error("Error downloading file {}", file.getFile().getPathDisplay(), e);
throw e;
}
}

@Override
public void deleteObject(String id) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public boolean isStateStorageRequired() {
return true;
}

@AllArgsConstructor
@Getter
private static class DropboxObject implements StorageProviderObjectReference {
private final FileMetadata file;

@Override
public String id() {
return file.getId();
}

@Override
public long size() {
return file.getSize();
}

@Override
public String contentDigest() {
return file.getContentHash();
}

@Override
public Collection<Header> additionalRecordHeaders() {
return List.of(
SimpleRecord.SimpleHeader.of("dropbox-path", file.getPathDisplay()),
SimpleRecord.SimpleHeader.of("dropbox-filename", file.getName()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dropbox-source
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ai.langstream.agents.dropbox.DropboxAgentsCodeProvider
5 changes: 1 addition & 4 deletions langstream-agents/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
<artifactId>langstream-agents</artifactId>
<name>LangStream - Agents</name>
<packaging>pom</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<modules>
<module>langstream-agents-commons</module>
<module>langstream-agents-commons-state-storage</module>
Expand All @@ -48,5 +44,6 @@
<module>langstream-agent-azure-blob-storage-source</module>
<module>langstream-agents-google</module>
<module>langstream-agents-ms365</module>
<module>langstream-agents-dropbox</module>
</modules>
</project>
Loading

0 comments on commit d2d10c6

Please sign in to comment.