From 967003afb6952adb505363f94040a5c61e9fad2f Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 30 Nov 2023 10:03:49 +0100 Subject: [PATCH] [webcrawler] Support emitting non HTML documents (like PDFs...) --- .../agents/webcrawler/WebCrawlerSource.java | 11 ++-- .../agents/webcrawler/crawler/Document.java | 2 +- .../agents/webcrawler/crawler/WebCrawler.java | 58 +++++++++++++++---- .../crawler/WebCrawlerConfiguration.java | 1 + .../webcrawler/crawler/WebCrawlerTest.java | 47 +++++++++++++++ 5 files changed, 102 insertions(+), 17 deletions(-) diff --git a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java index 02aa0226a..f44e64fcc 100644 --- a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java +++ b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/WebCrawlerSource.java @@ -41,7 +41,6 @@ import io.minio.errors.ErrorResponseException; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -269,7 +268,7 @@ public List read() throws Exception { processed(0, 1); return List.of( new WebCrawlerSourceRecord( - document.content().getBytes(StandardCharsets.UTF_8), document.url())); + document.content(), document.url(), document.contentType())); } private void checkReindexIsNeeded() { @@ -336,10 +335,12 @@ public void commit(List records) { private static class WebCrawlerSourceRecord implements Record { private final byte[] read; private final String url; + private final String contentType; - public WebCrawlerSourceRecord(byte[] read, String url) { + public WebCrawlerSourceRecord(byte[] read, String url, String contentType) { this.read = read; this.url = url; + this.contentType = contentType; } /** @@ -370,7 +371,9 @@ public Long timestamp() { @Override public Collection
headers() { - return List.of(new SimpleRecord.SimpleHeader("url", url)); + return List.of( + new SimpleRecord.SimpleHeader("url", url), + new SimpleRecord.SimpleHeader("content_type", contentType)); } @Override diff --git a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/Document.java b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/Document.java index 01b04f09a..b19cb3e59 100644 --- a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/Document.java +++ b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/Document.java @@ -15,4 +15,4 @@ */ package ai.langstream.agents.webcrawler.crawler; -public record Document(String url, String content) {} +public record Document(String url, byte[] content, String contentType) {} diff --git a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java index 8405b2aeb..a45879520 100644 --- a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java +++ b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawler.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -160,14 +161,17 @@ public boolean runCycle() throws Exception { connect.timeout(configuration.getHttpTimeout()); boolean redirectedToForbiddenDomain = false; - Document document; + Document document = null; + String contentType = null; + byte[] binaryContent = null; try { document = connect.get(); Connection.Response response = connect.response(); + contentType = response.contentType(); int statusCode = response.statusCode(); if (statusCode >= 300 && statusCode < 400) { String location = response.header("Location"); - if (!location.equals(current)) { + if (!Objects.equals(location, current)) { if (isUrlForbidden(location)) { redirectedToForbiddenDomain = true; log.warn( @@ -200,17 +204,44 @@ public boolean runCycle() throws Exception { // we did something return true; } catch (UnsupportedMimeTypeException notHtml) { - log.info( - "Url {} lead to a {} content-type document. Skipping", - current, - notHtml.getMimeType()); - discardUrl(current, reference); + if (configuration.isAllowNonHtmlContent()) { + log.info( + "Url {} lead to a {} content-type document. allow-not-html-content is true, so we are processing it", + current, + notHtml.getMimeType()); + handleThrottling(current); + + // download again the file, this is a little inefficient but currently + // this is not the most common case, we can improve it later + + // downloadUrl takes care of retrying + HttpResponse httpResponse = downloadUrl(current); + contentType = + httpResponse + .headers() + .firstValue("content-type") + .orElse("application/octet-stream"); + binaryContent = httpResponse.body(); + visitor.visit( + new ai.langstream.agents.webcrawler.crawler.Document( + current, binaryContent, contentType)); + + handleThrottling(current); + + return true; + } else { + log.info( + "Url {} lead to a {} content-type document. Skipping", + current, + notHtml.getMimeType()); + discardUrl(current, reference); - // prevent from being banned for flooding - handleThrottling(current); + // prevent from being banned for flooding + handleThrottling(current); - // we did something - return true; + // we did something + return true; + } } catch (IOException e) { log.info("Error while crawling url: {}, IO Error: {}", current, e + ""); @@ -240,7 +271,10 @@ public boolean runCycle() throws Exception { }); } visitor.visit( - new ai.langstream.agents.webcrawler.crawler.Document(current, document.html())); + new ai.langstream.agents.webcrawler.crawler.Document( + current, + document.html().getBytes(StandardCharsets.UTF_8), + contentType)); } // prevent from being banned for flooding diff --git a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerConfiguration.java b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerConfiguration.java index b3c97b52a..1ec3e622f 100644 --- a/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerConfiguration.java +++ b/langstream-agents/langstream-agent-webcrawler/src/main/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerConfiguration.java @@ -39,6 +39,7 @@ public class WebCrawlerConfiguration { @Builder.Default private boolean handleCookies = true; @Builder.Default private boolean handleRobotsFile = true; @Builder.Default private boolean scanHtmlDocuments = true; + @Builder.Default private boolean allowNonHtmlContent = false; @Builder.Default private Set allowedTags = Set.of("a"); diff --git a/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java b/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java index 06620638b..be0aed44f 100644 --- a/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java +++ b/langstream-agents/langstream-agent-webcrawler/src/test/java/ai/langstream/agents/webcrawler/crawler/WebCrawlerTest.java @@ -322,4 +322,51 @@ void testNetworkErrorsEventuallyFail(WireMockRuntimeInfo vmRuntimeInfo) throws E assertEquals(0, status.getPendingUrls().size()); assertEquals(2, status.getUrls().size()); } + + @Test + void testBinaryContent(WireMockRuntimeInfo vmRuntimeInfo) throws Exception { + + byte[] mockPdf = new byte[] {1, 2, 3, 4, 5}; + stubFor( + get("/index.html") + .willReturn( + okForContentType( + "text/html", + """ + link + """))); + stubFor( + get("/document.pdf") + .willReturn( + aResponse() + .withHeader("content-type", "application/pdf") + .withBody(mockPdf))); + + WebCrawlerConfiguration configuration = + WebCrawlerConfiguration.builder() + .allowedDomains(Set.of(vmRuntimeInfo.getHttpBaseUrl())) + .allowNonHtmlContent(true) + .handleRobotsFile(false) + .maxErrorCount(5) + .build(); + WebCrawlerStatus status = new WebCrawlerStatus(); + List documents = new ArrayList<>(); + WebCrawler crawler = new WebCrawler(configuration, status, documents::add); + crawler.crawl(vmRuntimeInfo.getHttpBaseUrl() + "/index.html"); + crawler.runCycle(); + + assertEquals(1, documents.size()); + assertEquals(vmRuntimeInfo.getHttpBaseUrl() + "/index.html", documents.get(0).url()); + assertEquals(1, status.getPendingUrls().size()); + assertEquals(2, status.getUrls().size()); + + crawler.runCycle(); + + assertEquals(vmRuntimeInfo.getHttpBaseUrl() + "/document.pdf", documents.get(1).url()); + assertArrayEquals(mockPdf, documents.get(1).content()); + assertEquals("application/pdf", documents.get(1).contentType()); + + assertEquals(0, status.getPendingUrls().size()); + assertEquals(2, status.getUrls().size()); + } }