Skip to content

Commit

Permalink
WebCrawler: use disk to store status if configured (#643)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Oct 25, 2023
1 parent f0ed083 commit 7ed56ee
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 91 deletions.
6 changes: 2 additions & 4 deletions docker/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ elif [ "$only_image" == "cli" ]; then
elif [ "$only_image" == "api-gateway" ]; then
build_docker_image langstream-api-gateway
else
# Build all artifacts
./mvnw install -T 1C -Ddocker.platforms="$(docker_platforms)" $common_flags
# Build docker images
./mvnw package -Pdocker -Ddocker.platforms="$(docker_platforms)" $common_flags
# Always clean to remove old NARs and cached docker images in the "target" directory
./mvnw clean install -Pdocker -Ddocker.platforms="$(docker_platforms)" $common_flags
docker images | head -n 6
fi

Expand Down
17 changes: 0 additions & 17 deletions examples/applications/webcrawler-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,6 @@ You can find the credentials in the Astra DB console when you create a Token.
The examples/secrets/secrets.yaml resolves those environment variables for you.
When you go in production you are supposed to create a dedicated secrets.yaml file for each environment.

## Configure an S3 bucket to store the status of the Crawler

The Web Crawling Source Connector requires an S3 bucket to store the status of the crawler.
It doesn't copy the contents of the web pages, it only stores some metadata.

If you are using AWS S3, you can use the following environment variables:

```
export S3_BUCKET_NAME=...
export S3_ENDPOINT=https://s3.amazonaws.com
export S3_ACCESS_KEY=...
export S3_SECRET=...
```

The default configuration uses the internal MinIO service deployed in the local Kubernetes cluster;
this is useful for testing purposes only and works only when you deploy LangStream locally.


## Configure the pipeline

Expand Down
7 changes: 2 additions & 5 deletions examples/applications/webcrawler-source/crawler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ pipeline:
http-timeout: 10000
handle-cookies: true
max-unflushed-pages: 100
bucketName: "${secrets.s3.bucket-name}"
endpoint: "${secrets.s3.endpoint}"
access-key: "${secrets.s3.access-key}"
secret-key: "${secrets.s3.secret}"
region: "${secrets.s3.region}"
# store data directly on the agent disk, no need for external S3 storage
state-storage: disk
- name: "Extract text"
type: "text-extractor"
- name: "Normalise text"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,23 @@
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.errors.ErrorResponseException;
import io.minio.errors.InsufficientDataException;
import io.minio.errors.InternalException;
import io.minio.errors.InvalidResponseException;
import io.minio.errors.ServerException;
import io.minio.errors.XmlParserException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -72,10 +70,12 @@ public class WebCrawlerSource extends AbstractAgentCode implements AgentSource {
private boolean handleRobotsFile;
private boolean scanHtmlDocuments;
private Set<String> seedUrls;
private Map<String, Object> agentConfiguration;
private MinioClient minioClient;
private int reindexIntervalSeconds;

@Getter private String statusFileName;
Optional<Path> localDiskPath;

private WebCrawler crawler;

Expand All @@ -85,7 +85,7 @@ public class WebCrawlerSource extends AbstractAgentCode implements AgentSource {

private final BlockingQueue<Document> foundDocuments = new LinkedBlockingQueue<>();

private final StatusStorage statusStorage = new S3StatusStorage();
private StatusStorage statusStorage;

private Runnable onReindexStart;

Expand All @@ -99,12 +99,8 @@ public void setOnReindexStart(Runnable onReindexStart) {

@Override
public void init(Map<String, Object> configuration) throws Exception {
bucketName = configuration.getOrDefault("bucketName", "langstream-source").toString();
String endpoint =
getString("endpoint", "http://minio-endpoint.-not-set:9090", configuration);
String username = getString("access-key", "minioadmin", configuration);
String password = getString("secret-key", "minioadmin", configuration);
String region = getString("region", "", configuration);
agentConfiguration = configuration;

allowedDomains = getSet("allowed-domains", configuration);
forbiddenPaths = getSet("forbidden-paths", configuration);
maxUrls = getInt("max-urls", 1000, configuration);
Expand All @@ -123,11 +119,6 @@ public void init(Map<String, Object> configuration) throws Exception {

boolean handleCookies = getBoolean("handle-cookies", true, configuration);

log.info(
"Connecting to S3 Bucket at {} in region {} with user {}",
endpoint,
region,
username);
log.info("allowed-domains: {}", allowedDomains);
log.info("forbidden-paths: {}", forbiddenPaths);
log.info("seed-urls: {}", seedUrls);
Expand All @@ -140,15 +131,6 @@ public void init(Map<String, Object> configuration) throws Exception {
log.info("min-time-between-requests: {}", minTimeBetweenRequests);
log.info("reindex-interval-seconds: {}", reindexIntervalSeconds);

MinioClient.Builder builder =
MinioClient.builder().endpoint(endpoint).credentials(username, password);
if (!region.isBlank()) {
builder.region(region);
}
minioClient = builder.build();

makeBucketIfNotExists(bucketName);

WebCrawlerConfiguration webCrawlerConfiguration =
WebCrawlerConfiguration.builder()
.allowedDomains(allowedDomains)
Expand All @@ -174,18 +156,49 @@ public void setContext(AgentContext context) {
String globalAgentId = context.getGlobalAgentId();
statusFileName = globalAgentId + ".webcrawler.status.json";
log.info("Status file is {}", statusFileName);
final String agentId = agentId();
localDiskPath = context.getPersistentStateDirectoryForAgent(agentId);
String stateStorage = getString("state-storage", "s3", agentConfiguration);
if (stateStorage.equals("disk")) {
if (!localDiskPath.isPresent()) {
throw new IllegalArgumentException(
"No local disk path available for agent "
+ agentId
+ " and state-storage was set to 'disk'");
}
log.info("Using local disk storage");

statusStorage = new LocalDiskStatusStorage();
} else {
log.info("Using S3 storage");
bucketName = getString("bucketName", "langstream-source", agentConfiguration);
String endpoint =
getString(
"endpoint", "http://minio-endpoint.-not-set:9090", agentConfiguration);
String username = getString("access-key", "minioadmin", agentConfiguration);
String password = getString("secret-key", "minioadmin", agentConfiguration);
String region = getString("region", "", agentConfiguration);

log.info(
"Connecting to S3 Bucket at {} in region {} with user {}",
endpoint,
region,
username);

MinioClient.Builder builder =
MinioClient.builder().endpoint(endpoint).credentials(username, password);
if (!region.isBlank()) {
builder.region(region);
}
minioClient = builder.build();

makeBucketIfNotExists(bucketName);
statusStorage = new S3StatusStorage();
}
}

private void makeBucketIfNotExists(String bucketName)
throws ServerException,
InsufficientDataException,
ErrorResponseException,
IOException,
NoSuchAlgorithmException,
InvalidKeyException,
InvalidResponseException,
XmlParserException,
InternalException {
@SneakyThrows
private void makeBucketIfNotExists(String bucketName) {
if (!minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build())) {
log.info("Creating bucket {}", bucketName);
minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
Expand Down Expand Up @@ -297,15 +310,12 @@ private List<Record> sleepForNoResults() throws Exception {

@Override
protected Map<String, Object> buildAdditionalInfo() {
return Map.of(
"bucketName",
bucketName,
"statusFileName",
statusFileName,
"seed-Urls",
seedUrls,
"allowed-domains",
allowedDomains);
Map<String, Object> additionalInfo = new HashMap<>();
additionalInfo.put("seed-Urls", seedUrls);
additionalInfo.put("allowed-domains", allowedDomains);
additionalInfo.put("statusFileName", statusFileName);
additionalInfo.put("bucketName", bucketName);
return additionalInfo;
}

@Override
Expand Down Expand Up @@ -398,16 +408,47 @@ public Status getCurrentStatus() throws Exception {
try {
return MAPPER.readValue(content, Status.class);
} catch (IOException e) {
log.error("Error parsing status file, restarting from scratch", e);
log.error("Error parsing status file", e);
return null;
}
} catch (ErrorResponseException e) {
if (e.errorResponse().code().equals("NoSuchKey")) {
log.info("No status file found, starting from scratch");
return new Status(List.of(), List.of(), null, null, Map.of());
}
throw e;
}
}
}

private class LocalDiskStatusStorage implements StatusStorage {
private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public void storeStatus(Status status) throws Exception {
final Path fullPath = computeFullPath();
log.info("Storing status to the disk at path {}", fullPath);
MAPPER.writeValue(fullPath.toFile(), status);
}

private Path computeFullPath() {
final Path fullPath = localDiskPath.get().resolve(statusFileName);
return fullPath;
}

@Override
public Status getCurrentStatus() throws Exception {
final Path fullPath = computeFullPath();
if (Files.exists(fullPath)) {
log.info("Restoring status from {}", fullPath);
try {
return MAPPER.readValue(fullPath.toFile(), Status.class);
} catch (IOException e) {
log.error("Error parsing status file", e);
return null;
}
} else {
return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,44 @@ protected ConnectionImplementation computeOutput(
}
}

protected Map<String, DiskSpec> computeDisks(
AgentConfiguration agentConfiguration,
Module module,
Pipeline pipeline,
ExecutionPlan physicalApplicationInstance,
ComputeClusterRuntime clusterRuntime,
StreamingClusterRuntime streamingClusterRuntime) {
final DiskSpec disk =
computeDisk(
agentConfiguration,
module,
pipeline,
physicalApplicationInstance,
clusterRuntime,
streamingClusterRuntime);
if (disk == null) {
return Map.of();
}
return Map.of(agentConfiguration.getId(), disk);
}

protected DiskSpec computeDisk(
AgentConfiguration agentConfiguration,
Module module,
Pipeline pipeline,
ExecutionPlan physicalApplicationInstance,
ComputeClusterRuntime clusterRuntime,
StreamingClusterRuntime streamingClusterRuntime) {
if (agentConfiguration.getResources() != null
&& agentConfiguration.getResources().disk() != null
&& agentConfiguration.getResources().disk().enabled() != null
&& agentConfiguration.getResources().disk().enabled()) {
return agentConfiguration.getResources().disk();
} else {
return null;
}
}

/**
* Allow to override the component type
*
Expand Down Expand Up @@ -188,14 +226,13 @@ public AgentNode createImplementation(
streamingClusterRuntime);
boolean composable = isComposable(agentConfiguration);
Map<String, DiskSpec> disks =
agentConfiguration.getResources() != null
&& agentConfiguration.getResources().disk() != null
&& agentConfiguration.getResources().disk().enabled() != null
&& agentConfiguration.getResources().disk().enabled()
? Map.of(
agentConfiguration.getId(),
agentConfiguration.getResources().disk())
: Map.of();
computeDisks(
agentConfiguration,
module,
pipeline,
executionPlan,
clusterRuntime,
streamingClusterRuntime);
return new DefaultAgentNode(
agentConfiguration.getId(),
agentType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,11 @@ private static Pod getPod(
}

public static void validateApplicationId(String applicationId) throws IllegalArgumentException {
if (applicationId.length() <= 1) {
throw new IllegalArgumentException(
("Application id '%s' is too short. Must be at least 2 characters long.")
.formatted(applicationId));
}
if (!CRDConstants.RESOURCE_NAME_PATTERN.matcher(applicationId).matches()) {
throw new IllegalArgumentException(
("Application id '%s' contains illegal characters. Allowed characters are alphanumeric and "
Expand Down
Loading

0 comments on commit 7ed56ee

Please sign in to comment.