Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Aug 23, 2024
1 parent 7287ca7 commit e8cca88
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,17 @@
import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.*;
import java.io.ByteArrayOutputStream;
import java.util.*;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.io.ByteArrayOutputStream;
import java.util.*;

@Slf4j
public class DropboxSource extends StorageProviderSource<DropboxSource.DropboxSourceState> {

public static class DropboxSourceState extends StorageProviderSourceState {
}
public static class DropboxSourceState extends StorageProviderSourceState {}

private DbxClientV2 client;

Expand All @@ -51,14 +49,16 @@ public Class<DropboxSourceState> getStateClass() {

@Override
public void initializeClientAndConfig(Map<String, Object> configuration) {
String accessToken = ConfigurationUtils.requiredField(configuration, "access-token", () -> "dropbox source");
String clientIdentifier = ConfigurationUtils.getString("client-identifier", "langstream-source", configuration);
DbxRequestConfig config = DbxRequestConfig.newBuilder(clientIdentifier)
.withAutoRetryEnabled()
.build();
String accessToken =
ConfigurationUtils.requiredField(
configuration, "access-token", () -> "dropbox source");
String clientIdentifier =
ConfigurationUtils.getString(
"client-identifier", "langstream-source", configuration);
DbxRequestConfig config =
DbxRequestConfig.newBuilder(clientIdentifier).withAutoRetryEnabled().build();
client = new DbxClientV2(config, accessToken);
initializeConfig(configuration);

}

void initializeConfig(Map<String, Object> configuration) {
Expand Down Expand Up @@ -91,8 +91,8 @@ public Collection<StorageProviderObjectReference> listObjects() throws Exception
return collect;
}


private void collectFiles(String path, List<StorageProviderObjectReference> collect) throws Exception {
private void collectFiles(String path, List<StorageProviderObjectReference> collect)
throws Exception {
log.debug("Listing path {}", path);
ListFolderResult result = client.files().listFolder(path);
while (true) {
Expand All @@ -112,18 +112,26 @@ private void collectFiles(String path, List<StorageProviderObjectReference> coll
if (!extensions.isEmpty()) {
final String extension;
if (file.getName().contains(".")) {
extension = file.getName().substring(file.getName().lastIndexOf('.') + 1);
extension =
file.getName().substring(file.getName().lastIndexOf('.') + 1);
} else {
extension = "";
}
if (!extensions.contains(extension)) {
log.info("Skipping file with extension {} (extension {})", file.getPathDisplay(), extension);
log.info(
"Skipping file with extension {} (extension {})",
file.getPathDisplay(),
extension);
continue;
}
}
if (log.isDebugEnabled()) {
log.debug("Adding file {}, id {}, size {}, digest {}, path {}", file.getName(), file.getId(),
file.getSize(), file.getContentHash(),
log.debug(
"Adding file {}, id {}, size {}, digest {}, path {}",
file.getName(),
file.getId(),
file.getSize(),
file.getContentHash(),
file.getPathDisplay());
}
collect.add(new DropboxObject(file));
Expand All @@ -144,7 +152,8 @@ public byte[] downloadObject(StorageProviderObjectReference object) throws Excep
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
log.info("Downloading file {}", file.getFile().getPathDisplay());
try (DbxDownloader<FileMetadata> downloader = client.files().download(file.getFile().getPathDisplay());) {
try (DbxDownloader<FileMetadata> downloader =
client.files().download(file.getFile().getPathDisplay()); ) {
downloader.download(baos);
}
return baos.toByteArray();
Expand Down Expand Up @@ -188,8 +197,7 @@ public String contentDigest() {
public Collection<Header> additionalRecordHeaders() {
return List.of(
SimpleRecord.SimpleHeader.of("dropbox-path", file.getPathDisplay()),
SimpleRecord.SimpleHeader.of("dropbox-filename", file.getName())
);
SimpleRecord.SimpleHeader.of("dropbox-filename", file.getName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@
import ai.langstream.impl.agents.AbstractComposableAgentProvider;
import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Set;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

/**
* Implements support for Storage provider source Agents.
*/
/** Implements support for Storage provider source Agents. */
@Slf4j
public class StorageProviderSourceAgentProvider extends AbstractComposableAgentProvider {

Expand Down Expand Up @@ -532,8 +528,7 @@ Entra MS registered application ID (client ID).
""")
@Data
@EqualsAndHashCode(callSuper = true)
public static class DropboxSourceConfiguration
extends StorageProviderSourceBaseConfiguration {
public static class DropboxSourceConfiguration extends StorageProviderSourceBaseConfiguration {

@ConfigProperty(
required = true,
Expand Down Expand Up @@ -564,7 +559,7 @@ public static class DropboxSourceConfiguration
description =
"""
The root directory to read from.
Use a leading slash.
Use a leading slash.
Examples: /my-root/ or /my-root/sub-folder
""",
defaultValue = "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@
*/
package ai.langstream.agents;

import static ai.langstream.testrunners.AbstractApplicationRunner.INTEGRATION_TESTS_GROUP1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

import ai.langstream.api.runner.topics.TopicConsumer;
import ai.langstream.api.util.ConfigurationUtils;
import ai.langstream.testrunners.AbstractGenericStreamingApplicationRunner;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.DeleteErrorException;
import com.dropbox.core.v2.files.FileMetadata;
import com.microsoft.graph.models.*;
import com.microsoft.graph.serviceclient.GraphServiceClient;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
Expand All @@ -35,17 +41,6 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static ai.langstream.testrunners.AbstractApplicationRunner.INTEGRATION_TESTS_GROUP1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

@Slf4j
@Testcontainers
@Tag(INTEGRATION_TESTS_GROUP1)
Expand All @@ -61,17 +56,15 @@ class DropboxSourceIT extends AbstractGenericStreamingApplicationRunner {
@Test
public void test() throws Exception {

DbxRequestConfig config = DbxRequestConfig.newBuilder("langstream-test")
.withAutoRetryEnabled()
.build();
DbxRequestConfig config =
DbxRequestConfig.newBuilder("langstream-test").withAutoRetryEnabled().build();
DbxClientV2 client = new DbxClientV2(config, ACCESS_TOKEN);
try {
client.files().deleteV2("/langstream-test");
} catch (DeleteErrorException e) {
}
client.files().createFolderV2("/langstream-test");


final String appId = "app-" + UUID.randomUUID().toString().substring(0, 4);

String tenant = "tenant";
Expand Down Expand Up @@ -102,23 +95,21 @@ public void test() throws Exception {
id: step2
output: "${globals.output-topic}"
"""
.formatted(
ACCESS_TOKEN,
localstack.getEndpointOverride(S3)));
.formatted(ACCESS_TOKEN, localstack.getEndpointOverride(S3)));

List<String> fileIds = new ArrayList<>();
List<byte[]> docs =
List.of(
DropboxSourceIT.class.getResourceAsStream("/doc1.docx").readAllBytes(),
DropboxSourceIT.class
.getResourceAsStream("/doc2.docx")
.readAllBytes());
DropboxSourceIT.class.getResourceAsStream("/doc2.docx").readAllBytes());

for (int i = 0; i < 2; i++) {
byte[] bytes = docs.get(i);

FileMetadata fileMetadata = client.files().upload("/langstream-test/test-" + i + ".docx")
.uploadAndFinish(new ByteArrayInputStream(bytes));
FileMetadata fileMetadata =
client.files()
.upload("/langstream-test/test-" + i + ".docx")
.uploadAndFinish(new ByteArrayInputStream(bytes));

fileIds.add(fileMetadata.getId());
}
Expand Down Expand Up @@ -147,7 +138,6 @@ tenant, appId, application, buildInstanceYaml(), expectedAgents)) {
.contains("This is a another document"));
});


client.files().deleteV2("/langstream-test/test-0.docx");
executeAgentRunners(applicationRuntime);
waitForMessages(deletedDocumentsConsumer, List.of(fileIds.get(0)));
Expand Down

0 comments on commit e8cca88

Please sign in to comment.