Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Aug 22, 2024
1 parent 05182c5 commit 91ceed1
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package ai.langstream.agents.azureblobstorage;

import static ai.langstream.api.util.ConfigurationUtils.*;
import static ai.langstream.api.util.ConfigurationUtils.getInt;

import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
Expand All @@ -31,7 +28,6 @@
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.util.*;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import io.minio.GetObjectArgs;
import io.minio.GetObjectResponse;
Expand All @@ -31,7 +29,6 @@
import io.minio.Result;
import io.minio.messages.Item;
import java.util.*;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package ai.langstream.ai.agents.commons.storage.provider;

import static ai.langstream.api.util.ConfigurationUtils.*;
import static ai.langstream.api.util.ConfigurationUtils.getInt;

import ai.langstream.ai.agents.commons.state.StateStorage;
import ai.langstream.ai.agents.commons.state.StateStorageProvider;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState.ObjectDetail;
Expand All @@ -26,14 +29,10 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static ai.langstream.api.util.ConfigurationUtils.*;
import static ai.langstream.api.util.ConfigurationUtils.getInt;

@Slf4j
public abstract class StorageProviderSource<T extends StorageProviderSourceState>
extends AbstractAgentCode implements AgentSource {
Expand All @@ -46,7 +45,6 @@ public abstract class StorageProviderSource<T extends StorageProviderSourceState
private TopicProducer deletedObjectsProducer;
private TopicProducer sourceActivitySummaryProducer;


private int idleTime;

private String deletedObjectsTopic;
Expand All @@ -58,7 +56,6 @@ public abstract class StorageProviderSource<T extends StorageProviderSourceState
private int sourceActivitySummaryNumEventsThreshold;
private int sourceActivitySummaryTimeSecondsThreshold;


public abstract Class<T> getStateClass();

public abstract void initializeClientAndConfig(Map<String, Object> configuration);
Expand All @@ -67,7 +64,6 @@ public abstract class StorageProviderSource<T extends StorageProviderSourceState

public abstract boolean isDeleteObjects();


public abstract Collection<StorageProviderObjectReference> listObjects() throws Exception;

public abstract byte[] downloadObject(StorageProviderObjectReference object) throws Exception;
Expand Down Expand Up @@ -293,8 +289,7 @@ private void sendSourceActivitySummaryIfNeeded() throws Exception {
if (emit) {
if (sourceActivitySummaryProducer != null) {
log.info(
"Emitting source activity summary to topic {}",
sourceActivitySummaryTopic);
"Emitting source activity summary to topic {}", sourceActivitySummaryTopic);
// Create a new SourceActivitySummaryWithCounts object directly
SourceActivitySummaryWithCounts summaryWithCounts =
new SourceActivitySummaryWithCounts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
import ai.langstream.api.runner.code.Header;
import ai.langstream.api.runner.code.SimpleRecord;
import ai.langstream.api.util.ConfigurationUtils;
import com.google.api.gax.paging.Page;
import com.google.auth.oauth2.GoogleCredentials;
Expand All @@ -31,7 +29,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -139,6 +138,7 @@ public String getBucketName() {
public boolean isDeleteObjects() {
return false;
}

@Override
public Collection<StorageProviderObjectReference> listObjects() throws Exception {
Map<String, List<String>> tree = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
import com.microsoft.graph.models.File;
import com.microsoft.graph.serviceclient.GraphServiceClient;
import com.microsoft.kiota.ApiException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientUtil {


public interface DriveItemCollector {
void collect(String driveId, DriveItem item, String digest);
}
Expand All @@ -39,8 +37,8 @@ public static void collectDriveItems(
request -> {
request.queryParameters.select =
new String[] {
"id", "name", "size", "cTag", "eTag", "file",
"folder"
"id", "name", "size", "cTag", "eTag", "file",
"folder"
};
request.queryParameters.orderby = new String[] {"name asc"};
})
Expand Down Expand Up @@ -100,18 +98,21 @@ public static void collectDriveItems(
}
}

public static byte[] downloadDriveItem(GraphServiceClient client, String driveId, DriveItem item) throws Exception {
public static byte[] downloadDriveItem(
GraphServiceClient client, String driveId, DriveItem item) throws Exception {
Objects.requireNonNull(item.getFile());
String mimeType = item.getFile().getMimeType();
log.info("Downloading file {} ({}) from drive {}, mime type {}",
log.info(
"Downloading file {} ({}) from drive {}, mime type {}",
item.getName(),
item.getId(),
driveId,
mimeType);
try {
return downloadDriveItem(client, driveId, item, true);
} catch (ApiException e) {
log.info("Downloading file {} ({}) from drive {} as pdf failed, trying with default format. error: {}",
log.info(
"Downloading file {} ({}) from drive {} as pdf failed, trying with default format. error: {}",
item.getName(),
item.getId(),
driveId,
Expand All @@ -120,40 +121,32 @@ public static byte[] downloadDriveItem(GraphServiceClient client, String driveId
return downloadDriveItem(client, driveId, item, false);
} catch (ApiException ex) {
log.error(
"Error downloading file "
+ item.getName()
+ " ("
+ item.getId()
+ ")",
e);
"Error downloading file " + item.getName() + " (" + item.getId() + ")", e);
}
throw e;
}
}

private static byte[] downloadDriveItem(GraphServiceClient client, String driveId, DriveItem item, boolean exportAsPdf) throws IOException {
private static byte[] downloadDriveItem(
GraphServiceClient client, String driveId, DriveItem item, boolean exportAsPdf)
throws IOException {

try (InputStream in =
client.drives()
.byDriveId(driveId)
.items()
.byDriveItemId(item.getId())
.content()
.get(
requestConfiguration -> {
Objects.requireNonNull(
requestConfiguration.queryParameters);
if (exportAsPdf) {
requestConfiguration.queryParameters.format = "pdf";
}
}); ) {
client.drives()
.byDriveId(driveId)
.items()
.byDriveItemId(item.getId())
.content()
.get(
requestConfiguration -> {
Objects.requireNonNull(requestConfiguration.queryParameters);
if (exportAsPdf) {
requestConfiguration.queryParameters.format = "pdf";
}
}); ) {
if (in == null) {
throw new IllegalStateException(
"No content for file "
+ item.getName()
+ " ("
+ item.getId()
+ ")");
"No content for file " + item.getName() + " (" + item.getId() + ")");
}
return in.readAllBytes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package ai.langstream.agents.ms365.onedrive;

import static ai.langstream.api.util.ConfigurationUtils.requiredNonEmptyField;

import ai.langstream.agents.ms365.ClientUtil;
import ai.langstream.agents.ms365.sharepoint.SharepointSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderObjectReference;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSource;
import ai.langstream.ai.agents.commons.storage.provider.StorageProviderSourceState;
Expand All @@ -27,22 +28,15 @@
import com.azure.identity.ClientSecretCredentialBuilder;
import com.microsoft.graph.models.*;
import com.microsoft.graph.serviceclient.GraphServiceClient;
import com.microsoft.graph.sites.getallsites.GetAllSitesGetResponse;
import java.util.*;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.io.InputStream;
import java.util.List;
import java.util.*;

import static ai.langstream.api.util.ConfigurationUtils.requiredNonEmptyField;

@Slf4j
public class OneDriveSource
extends StorageProviderSource<OneDriveSource.SharepointSourceState> {
public class OneDriveSource extends StorageProviderSource<OneDriveSource.SharepointSourceState> {

public static class SharepointSourceState extends StorageProviderSourceState {}

Expand Down Expand Up @@ -117,35 +111,46 @@ public boolean isDeleteObjects() {
public Collection<StorageProviderObjectReference> listObjects() throws Exception {
List<StorageProviderObjectReference> collect = new ArrayList<>();
for (String user : users) {
Drive userDrive = client.users().byUserId(user)
.drive()
.get();
Drive userDrive = client.users().byUserId(user).drive().get();
if (userDrive == null) {
log.warn("No drive found for user {}", user);
continue;
}

Objects.requireNonNull(userDrive.getId());
final String rootPath = "root:" + pathPrefix;
log.info("Listing items for user {} in drive {} at path {}", user, userDrive.getId(), rootPath);
DriveItem rootItem = client.drives().byDriveId(userDrive.getId()).items().byDriveItemId(rootPath).get();
log.info(
"Listing items for user {} in drive {} at path {}",
user,
userDrive.getId(),
rootPath);
DriveItem rootItem =
client.drives()
.byDriveId(userDrive.getId())
.items()
.byDriveItemId(rootPath)
.get();
if (rootItem == null) {
log.warn("No root item found for user drive {} of user {}", userDrive.getId(), user);
log.warn(
"No root item found for user drive {} of user {}", userDrive.getId(), user);
continue;
}
int beforeLength = collect.size();
ClientUtil.collectDriveItems(
client, userDrive.getId(), rootItem, includeMimeTypes, excludeMimeTypes, new ClientUtil.DriveItemCollector() {
client,
userDrive.getId(),
rootItem,
includeMimeTypes,
excludeMimeTypes,
new ClientUtil.DriveItemCollector() {
@Override
public void collect(String driveId, DriveItem item, String digest) {
OneDriveObject oneDriveObject = new OneDriveObject(item, user, digest, driveId);
OneDriveObject oneDriveObject =
new OneDriveObject(item, user, digest, driveId);
collect.add(oneDriveObject);
}
});
log.info(
"Found {} items for user {}",
collect.size() - beforeLength,
user);
log.info("Found {} items for user {}", collect.size() - beforeLength, user);
}
return collect;
}
Expand All @@ -165,7 +170,7 @@ public String id() {

@Override
public long size() {
return item.getSize() == null ?-1 : item.getSize();
return item.getSize() == null ? -1 : item.getSize();
}

@Override
Expand All @@ -186,7 +191,8 @@ public Collection<Header> additionalRecordHeaders() {
@Override
public byte[] downloadObject(StorageProviderObjectReference object) throws Exception {
OneDriveObject oneDriveObject = (OneDriveObject) object;
return ClientUtil.downloadDriveItem(client, oneDriveObject.getDriveId(), oneDriveObject.getItem());
return ClientUtil.downloadDriveItem(
client, oneDriveObject.getDriveId(), oneDriveObject.getItem());
}

@Override
Expand Down
Loading

0 comments on commit 91ceed1

Please sign in to comment.