diff --git a/scrapy_webarchive/cdxj.py b/scrapy_webarchive/cdxj.py
index 9dc927b..cfc4011 100644
--- a/scrapy_webarchive/cdxj.py
+++ b/scrapy_webarchive/cdxj.py
@@ -1,10 +1,10 @@
 import json
 import re
-from typing import List
+from dataclasses import dataclass, field
+from typing import Any, List
 
 from cdxj_indexer.main import CDXJIndexer
 
-# based on https://github.com/internetarchive/cdx-summary/blob/main/cdxsummary/parser.py
 CDXREC = re.compile(
     r"^(?P<surt>(?P<host>[^\)\s]+)\)(?P<path>[^\?\s]+)?(\?(?P<query>\S+))?)"
     r"\s(?P<datetime>(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})(?P<hour>\d{2})(?P<minute>\d{2})(?P<second>\d{2})(?:\d{3})?)"
@@ -12,21 +12,37 @@
 )
 
 
+@dataclass
 class CdxjRecord:
-    def _parse(self, line):
+    wacz_file: Any
+    surt: str
+    host: str
+    path: str = ""
+    query: str = ""
+    datetime: str = ""
+    year: str = ""
+    month: str = ""
+    day: str = ""
+    hour: str = ""
+    minute: str = ""
+    second: str = ""
+    data: dict = field(default_factory=dict)
+
+    @staticmethod
+    def _parse(line: str):
         return CDXREC.match(line)
 
-    def __init__(self, cdxline):
-        m = self._parse(cdxline.strip())
+    @classmethod
+    def from_cdxline(cls, cdxline: str, wacz_file):
+        m = cls._parse(cdxline.strip())
 
         if not m:
             raise ValueError(f"Invalid CDXJ line: '{cdxline.strip()}'")
 
-        for key, value in m.groupdict(default="").items():
-            if key == "data":
-                value = json.loads(value)
+        parsed_data = m.groupdict(default="")
+        parsed_data['data'] = json.loads(parsed_data['data'])
 
-            setattr(self, key, value)
+        return cls(**parsed_data, wacz_file=wacz_file)
 
     def __str__(self):
         return str(self.__dict__)
diff --git a/scrapy_webarchive/downloadermiddlewares.py b/scrapy_webarchive/downloadermiddlewares.py
index ff1fdb2..83cdc30 100644
--- a/scrapy_webarchive/downloadermiddlewares.py
+++ b/scrapy_webarchive/downloadermiddlewares.py
@@ -1,4 +1,5 @@
 import re
+from typing import IO, List
 
 from scrapy import signals
 from scrapy.crawler import Crawler
@@ -9,7 +10,7 @@
 from scrapy.spiders import Spider
 from scrapy.statscollectors import StatsCollector
 from smart_open import open
-from typing_extensions import Self, Union
+from typing_extensions import Self
 
 from scrapy_webarchive.wacz import MultiWaczFile, WaczFile
 from scrapy_webarchive.warc import record_transformer
@@ -22,6 +23,8 @@ class WaczMiddleware:
     Loads the index fully into memory, but lazily loads pages.
     This helps to work with large archives, including remote ones.
""" + + wacz: WaczFile | MultiWaczFile def __init__(self, settings: Settings, stats: StatsCollector) -> None: self.stats = stats @@ -43,22 +46,40 @@ def from_crawler(cls, crawler: Crawler) -> Self: def spider_opened(self, spider: Spider) -> None: tp = {"timeout": self.timeout} - self.wacz: Union[WaczFile, MultiWaczFile] - - if len(self.wacz_urls) == 1: - spider.logger.info(f"[WACZDownloader] Opening WACZ {self.wacz_urls[0]}") - self.wacz = WaczFile(open(self.wacz_urls[0], "rb", transport_params=tp)) + multiple_entries = len(self.wacz_urls) != 1 + + def open_wacz_file(wacz_url: str) -> IO[bytes] | None: + spider.logger.info(f"[WACZDownloader] Opening WACZ {wacz_url}") + + try: + return open(wacz_url, "rb", transport_params=tp) + except OSError: + spider.logger.error(f"[WACZDownloader] Could not open WACZ {wacz_url}") + return None + + if not multiple_entries: + wacz_url = self.wacz_urls[0] + wacz_file = open_wacz_file(wacz_url) + if wacz_file: + self.wacz = WaczFile(wacz_file) else: - spider.logger.info(f"[WACZDownloader] Opening WACZs {self.wacz_urls}") - self.wacz = MultiWaczFile( - [open(u, "rb", transport_params=tp) for u in self.wacz_urls] - ) + wacz_files: List[IO[bytes]] = [] + + for wacz_url in self.wacz_urls: + wacz_file = open_wacz_file(wacz_url) + if wacz_file: + wacz_files.append(wacz_file) + + if wacz_files: + self.wacz = MultiWaczFile(wacz_files) def process_request(self, request: Request, spider: Spider): + if not hasattr(self, 'wacz'): + self.stats.set_value("wacz/no_valid_sources", True, spider=spider) + raise IgnoreRequest() + # ignore blacklisted pages (to avoid crawling e.g. redirects from whitelisted pages to unwanted ones) - if hasattr(spider, "archive_blacklist_regexp") and re.search( - spider.archive_blacklist_regexp, request.url - ): + if hasattr(spider, "archive_blacklist_regexp") and re.search(spider.archive_blacklist_regexp, request.url): self.stats.inc_value("wacz/request_blacklisted", spider=spider) raise IgnoreRequest() @@ -68,17 +89,22 @@ def process_request(self, request: Request, spider: Spider): raise IgnoreRequest() # get record from existing index entry, or else lookup by URL - record = self.wacz.get_record(request.meta.get("wacz_index_entry", request.url)) - if record: - response = record_transformer.response_for_record(record) - - if not response: - self.stats.inc_value("wacz/response_not_recognized", spider=spider) - raise IgnoreRequest() - - self.stats.inc_value("wacz/hit", spider=spider) - return response + if request.meta.get("cdxj_record"): + warc_record = self.wacz.get_warc_from_cdxj_record(cdxj_record=request.meta["cdxj_record"]) else: - # when page not found in archive, return 404, and record it in a statistic + warc_record = self.wacz.get_warc_from_url(url=request.url) + + # When page not found in archive, return 404, and record it in a statistic + if not warc_record: self.stats.inc_value("wacz/response_not_found", spider=spider) return Response(url=request.url, status=404) + + # Record found + response = record_transformer.response_for_record(warc_record) + + if not response: + self.stats.inc_value("wacz/response_not_recognized", spider=spider) + raise IgnoreRequest() + + self.stats.inc_value("wacz/hit", spider=spider) + return response diff --git a/scrapy_webarchive/middleware.py b/scrapy_webarchive/middleware.py index f14393d..6891182 100644 --- a/scrapy_webarchive/middleware.py +++ b/scrapy_webarchive/middleware.py @@ -1,4 +1,5 @@ import re +from typing import IO, List from urllib.parse import urlparse from scrapy import Request, Spider, 
@@ -7,13 +8,15 @@
 from scrapy.settings import Settings
 from scrapy.statscollectors import StatsCollector
 from smart_open import open
-from typing_extensions import Iterable, Self, Union
+from typing_extensions import Iterable, Self
 
 from scrapy_webarchive.wacz import MultiWaczFile, WaczFile
 from scrapy_webarchive.warc import record_transformer
 
 
 class WaczCrawlMiddleware:
+    wacz: WaczFile | MultiWaczFile
+
     def __init__(self, settings: Settings, stats: StatsCollector) -> None:
         self.stats = stats
         wacz_url = settings.get("SW_WACZ_SOURCE_URL", None)
@@ -37,24 +40,42 @@ def spider_opened(self, spider: Spider) -> None:
             return
 
         tp = {"timeout": self.timeout}
-        self.wacz: Union[WaczFile, MultiWaczFile]
+        multiple_entries = len(self.wacz_urls) != 1
+
+        def open_wacz_file(wacz_url: str) -> IO[bytes] | None:
+            spider.logger.info(f"[WACZDownloader] Opening WACZ {wacz_url}")
+
+            try:
+                return open(wacz_url, "rb", transport_params=tp)
+            except OSError:
+                spider.logger.error(f"[WACZDownloader] Could not open WACZ {wacz_url}")
+                return None
 
-        if len(self.wacz_urls) == 1:
-            spider.logger.info(f"[WACZDownloader] Opening WACZ {self.wacz_urls[0]}")
-            self.wacz = WaczFile(open(self.wacz_urls[0], "rb", transport_params=tp))
+        if not multiple_entries:
+            wacz_url = self.wacz_urls[0]
+            wacz_file = open_wacz_file(wacz_url)
+            if wacz_file:
+                self.wacz = WaczFile(wacz_file)
         else:
-            spider.logger.info(f"[WACZDownloader] Opening WACZs {self.wacz_urls}")
-            self.wacz = MultiWaczFile(
-                [open(u, "rb", transport_params=tp) for u in self.wacz_urls]
-            )
+            wacz_files: List[IO[bytes]] = []
+
+            for wacz_url in self.wacz_urls:
+                wacz_file = open_wacz_file(wacz_url)
+                if wacz_file:
+                    wacz_files.append(wacz_file)
+
+            if wacz_files:
+                self.wacz = MultiWaczFile(wacz_files)
 
     def process_start_requests(self, start_requests: Iterable[Request], spider: Spider):
-        if not self.crawl:
+        if not self.crawl or not hasattr(self, 'wacz'):
            for request in start_requests:
                yield request
-        else:  # ignore original start requests, just yield all responses found
+
+        # Ignore original start requests, just yield all responses found
+        else:
             for entry in self.wacz.iter_index():
-                url = entry["url"]
+                url = entry.data["url"]
 
                 # filter out off-site responses
                 if hasattr(spider, "allowed_domains") and urlparse(url).hostname not in spider.allowed_domains:
@@ -71,6 +92,6 @@ def process_start_requests(self, start_requests: Iterable[Request], spider: Spid
                 yield record_transformer.request_for_record(
                     entry,
                     flags=["wacz_start_request"],
-                    meta={"wacz_index_entry": entry},
+                    meta={"cdxj_record": entry},
                     dont_filter=True,
                 )
diff --git a/scrapy_webarchive/wacz.py b/scrapy_webarchive/wacz.py
index 13dc962..d9357c6 100644
--- a/scrapy_webarchive/wacz.py
+++ b/scrapy_webarchive/wacz.py
@@ -3,8 +3,10 @@
 import os
 import zipfile
 from collections import defaultdict
+from typing import IO, Generator, List
 
 from warc import WARCReader as BaseWARCReader
+from warc.warc import WARCRecord
 
 from scrapy_webarchive.cdxj import CdxjRecord, write_cdxj_index
 from scrapy_webarchive.utils import get_current_timestamp
@@ -19,18 +21,13 @@ class WARCReader(BaseWARCReader):
 
 class WaczFileCreator:
     """Handles creating WACZ archives"""
 
-    def __init__(
-        self,
-        store,
-        warc_fname: str,
-        cdxj_fname: str = "index.cdxj",
-    ) -> None:
+    def __init__(self, store, warc_fname: str, cdxj_fname: str = "index.cdxj") -> None:
         self.store = store
         self.warc_fname = warc_fname
         self.cdxj_fname = cdxj_fname
 
-    def create(self):
-        """Create the WACZ file from the WARC and CDXJ index"""
+    def create(self) -> None:
+        """Create the WACZ file from the WARC and CDXJ index and save it in the configured store"""
         # Write cdxj index to a temporary file
         write_cdxj_index(output=self.cdxj_fname, inputs=[self.warc_fname])
@@ -46,7 +43,7 @@ def create(self):
         self.store.persist_file(self.get_wacz_fname(), zip_buffer, info=None)
 
     def create_wacz_zip(self) -> io.BytesIO:
-        """Create the WACZ zip file and return the in-memory buffer."""
+        """Create the WACZ zip file and return the in-memory buffer"""
 
         zip_buffer = io.BytesIO()
 
@@ -57,110 +54,64 @@ def create_wacz_zip(self) -> io.BytesIO:
         return zip_buffer
 
     def write_to_zip(self, zip_file: zipfile.ZipFile, filename: str, destination: str) -> None:
-        """Helper function to write a file into the ZIP archive."""
+        """Helper function to write a file into the ZIP archive"""
         with open(filename, "rb") as file_handle:
             zip_file.writestr(destination + os.path.basename(filename), file_handle.read())
 
     def cleanup_files(self, *files: str) -> None:
-        """Remove files from the filesystem."""
+        """Remove files from the filesystem"""
         for file in files:
             os.remove(file)
 
     def get_wacz_fname(self) -> str:
-        """Generate WACZ filename based on the WARC filename."""
+        """Generate WACZ filename based on the WARC filename"""
         return f"archive-{get_current_timestamp()}.wacz"
 
 
-class MultiWaczFile:
-    """
-    The MultiWACZ file format is not yet finalized, hence instead of pointing to a
-    MultiWACZ file, this just works with the multiple WACZ files.
-
-    Supports the same things as WACZFile, but handles multiple WACZ files underneath.
-    """
-
-    def __init__(self, wacz_files):
-        self.waczs = [WaczFile(f) for f in wacz_files]
-
-    def load_index(self):
-        for f in self.waczs:
-            f.load_index()
-
-    def get_record(self, url_or_record, **kwargs):
-        if not isinstance(url_or_record, str):
-            return url_or_record["_wacz_file"].get_record(url_or_record, **kwargs)
-        for f in self.waczs:
-            r = f.get_record(url_or_record, **kwargs)
-            if r:
-                return r
-
-    def iter_index(self):
-        for f in self.waczs:
-            for r in f.iter_index():
-                yield {**r, "_wacz_file": f}
-
-
 class WaczFile:
     """
     Handles looking up pages in the index, and iterating over all pages in the index.
     Can also iterate over all entries in each WARC embedded in the archive.
""" - index = None - - def __init__(self, file): + def __init__(self, file: IO[bytes]): self.wacz_file = zipfile.ZipFile(file) + self.index = self._parse_index(self._get_index(self.wacz_file)) - def _find_in_index(self, url, **kwargs): - if not self.index: - self.load_index() - + def _find_in_index(self, url: str) -> CdxjRecord | None: records = self.index.get(url, []) - # allow filtering on all given fields - for k, v in kwargs.items(): - records = [r for r in records if r.get(k) == v] - if len(records) > 0: - # if multiple entries are present, the last one is most likely to be relevant - return records[-1] - # nothing found - return None - def load_index(self): - self.index = self._parse_index(self._get_index(self.wacz_file)) + # If multiple entries are present, the last one is most likely to be relevant + return records[-1] if records else None - def get_record(self, url_or_cdxjrecord, **kwargs): - if isinstance(url_or_cdxjrecord, str): - record = self._find_in_index(url_or_cdxjrecord, **kwargs) - if not record: - return None - else: - record = url_or_cdxjrecord + def get_warc_from_cdxj_record(self, cdxj_record: CdxjRecord) -> WARCRecord | None: + warc_file: gzip.GzipFile | IO[bytes] try: - warc_file = self.wacz_file.open("archive/" + record["filename"]) + warc_file = self.wacz_file.open("archive/" + cdxj_record.data["filename"]) except KeyError: return None - warc_file.seek(int(record["offset"])) - if record["filename"].endswith(".gz"): - warc_file = gzip.open(warc_file) - reader = WARCReader(warc_file) - return reader.read_record() + warc_file.seek(int(cdxj_record.data["offset"])) + if cdxj_record.data["filename"].endswith(".gz"): + warc_file = gzip.open(warc_file) - def iter_index(self): - if not self.index: - self.load_index() + return WARCReader(warc_file).read_record() - for records in self.index.values(): - for record in records: - yield record + def get_warc_from_url(self, url: str) -> WARCRecord | None: + cdxj_record = self._find_in_index(url) + return self.get_warc_from_cdxj_record(cdxj_record) if cdxj_record else None + def iter_index(self) -> Generator[CdxjRecord, None, None]: + for cdxj_records in self.index.values(): + for cdxj_record in cdxj_records: + yield cdxj_record @staticmethod - def _get_index(wacz_file): + def _get_index(wacz_file: zipfile.ZipFile) -> gzip.GzipFile | IO[bytes]: """Opens the index file from the WACZ archive, checking for .cdxj, .cdxj.gz, .cdx. and .cdx.gz""" index_paths = [ @@ -183,13 +134,40 @@ def _get_index(wacz_file): raise FileNotFoundError("No valid index file found.") - @staticmethod - def _parse_index(index_file): - records = defaultdict(list) + def _parse_index(self, index_file: gzip.GzipFile | IO[bytes]) -> dict[str, List[CdxjRecord]]: + cdxj_records = defaultdict(list) for line in index_file: - record = CdxjRecord(line.decode()) - url = record.data["url"] - records[url].append(record.data) + cdxj_record = CdxjRecord.from_cdxline(line.decode(), wacz_file=self) + cdxj_records[cdxj_record.data["url"]].append(cdxj_record) + + return cdxj_records + + +class MultiWaczFile: + """ + The MultiWACZ file format is not yet finalized, hence instead of pointing to a + MultiWACZ file, this just works with the multiple WACZ files. + + Supports the same things as WACZFile, but handles multiple WACZ files underneath. 
+ """ + + def __init__(self, wacz_files: List[IO[bytes]]) -> None: + self.waczs = [WaczFile(wacz_file) for wacz_file in wacz_files] + + def get_warc_from_cdxj_record(self, cdxj_record: CdxjRecord) -> WARCRecord | None: + return cdxj_record.wacz_file.get_warc_from_cdxj_record(cdxj_record) if cdxj_record.wacz_file else None + + def get_warc_from_url(self, url: str) -> WARCRecord | None: + for wacz in self.waczs: + warc_record = wacz.get_warc_from_url(url) + if warc_record: + return warc_record + + return None - return records + def iter_index(self) -> Generator[CdxjRecord, None, None]: + for wacz in self.waczs: + for cdxj_record in wacz.iter_index(): + cdxj_record.wacz_file = wacz + yield cdxj_record diff --git a/scrapy_webarchive/warc.py b/scrapy_webarchive/warc.py index 168fca8..5f0cc83 100644 --- a/scrapy_webarchive/warc.py +++ b/scrapy_webarchive/warc.py @@ -13,6 +13,7 @@ from warcio.statusandheaders import StatusAndHeaders from warcio.warcwriter import WARCWriter +from scrapy_webarchive.cdxj import CdxjRecord from scrapy_webarchive.exceptions import WaczMiddlewareException from scrapy_webarchive.utils import get_current_timestamp, header_lines_to_dict @@ -160,26 +161,26 @@ class WarcRecordTransformer: response_types = ResponseTypes() - def request_for_record(self, record: dict, **kwargs): + def request_for_record(self, cdxj_record: CdxjRecord, **kwargs): # TODO: locate request in WACZ and include all relevant things (like headers) - return Request(url=record["url"], method=record.get("method", "GET"), **kwargs) + return Request(url=cdxj_record.data["url"], method=cdxj_record.data.get("method", "GET"), **kwargs) - def response_for_record(self, record: WARCRecord, **kwargs): + def response_for_record(self, warc_record: WARCRecord, **kwargs): # We expect a response. # https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#warc-type-mandatory - if record.type != "response": - raise WaczMiddlewareException(f"Unexpected record type: {record.type}") + if warc_record.type != "response": + raise WaczMiddlewareException(f"Unexpected record type: {warc_record.type}") # We only know how to handle application/http. # https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#content-type - record_content_type = (record["Content-Type"] or "").split(";", 1)[0] + record_content_type = (warc_record["Content-Type"] or "").split(";", 1)[0] if record_content_type != "application/http": raise WaczMiddlewareException(f"Unexpected record content-type: {record_content_type}") # There is a date field in record['WARC-Date'], but don't have a use for it now. 
         # https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#warc-date-mandatory
-        payload = record.payload.read()
+        payload = warc_record.payload.read()
         payload_parts = payload.split(b"\r\n\r\n", 1)
         header_lines = payload_parts[0] if len(payload_parts) > 0 else ""
         body = payload_parts[1] if len(payload_parts) > 1 else None
@@ -196,7 +197,7 @@ def response_for_record(self, record: WARCRecord, **kwargs):
         response_cls = self.response_types.from_headers(headers)
 
         return response_cls(
-            url=record.url,
+            url=warc_record.url,
             status=int(status.decode()),
             protocol=protocol.decode(),
             headers=headers,
diff --git a/tests/test_cdxj.py b/tests/test_cdxj.py
index 4749c2a..ef6aaa3 100644
--- a/tests/test_cdxj.py
+++ b/tests/test_cdxj.py
@@ -8,7 +8,7 @@ def test_cdxj_record_valid():
     valid_cdxj_line = "com,example)/index 20241003000000 {\"url\": \"http://example.com/index\", \"status\": \"200\"}"
 
     # Create a CdxjRecord object
-    record = CdxjRecord(valid_cdxj_line)
+    record = CdxjRecord.from_cdxline(valid_cdxj_line, wacz_file=None)
 
     # Test extracted data from the CDXJ line
     assert record.surt == "com,example)/index"
@@ -28,7 +28,7 @@ def test_cdxj_record_invalid_format():
 
     # Test that the invalid line raises a ValueError
     with pytest.raises(ValueError, match=r"Invalid CDXJ line:"):
-        CdxjRecord(invalid_cdxj_line)
+        CdxjRecord.from_cdxline(invalid_cdxj_line, wacz_file=None)
 
 
 def test_cdxj_record_invalid_json_data():
@@ -37,13 +37,13 @@
 
     # Test that the invalid JSON raises a ValueError
     with pytest.raises(ValueError):
-        CdxjRecord(invalid_cdxj_line)
+        CdxjRecord.from_cdxline(invalid_cdxj_line, wacz_file=None)
 
 
 def test_cdxj_record_empty_line():
     # Test that an empty line raises a ValueError
     with pytest.raises(ValueError, match=r"Invalid CDXJ line:"):
-        CdxjRecord('')
+        CdxjRecord.from_cdxline('', wacz_file=None)
 
 
 def test_cdxj_record_no_data_field():
@@ -52,4 +52,4 @@
 
     # Test that no data field raises a ValueError
     with pytest.raises(ValueError, match=r"Invalid CDXJ line:"):
-        CdxjRecord(no_data_cdxj_line)
+        CdxjRecord.from_cdxline(no_data_cdxj_line, wacz_file=None)
diff --git a/tests/test_warc.py b/tests/test_warc.py
index 027c214..535e3db 100644
--- a/tests/test_warc.py
+++ b/tests/test_warc.py
@@ -10,6 +10,7 @@
 from warc.warc import WARCRecord
 from warcio.recordloader import ArcWarcRecordLoader
 
+from scrapy_webarchive.cdxj import CdxjRecord
 from scrapy_webarchive.exceptions import WaczMiddlewareException
 from scrapy_webarchive.warc import WarcFileWriter, generate_warc_fname, record_transformer
 
@@ -34,15 +35,20 @@ def warc_record_request():
 
 class TestWarcRecordTransformer:
     def test_request_for_record(self):
-        record = {
+        record = CdxjRecord(
+            wacz_file=None,
+            surt="com,example)/index",
+            host="example",
+            data={
                 "url": "http://example.com",
-            "mime": "text/html",
-            "status": "200",
-            "digest": "sha1:AA7J5JETQ4H7GG22MU2NCAUO6LM2EPEU",
-            "length": "2302",
-            "offset": "384",
-            "filename": "example-20241007095844-00000-BA92-CKXFG4FF6H.warc.gz",
-        }
+                "mime": "text/html",
+                "status": "200",
+                "digest": "sha1:AA7J5JETQ4H7GG22MU2NCAUO6LM2EPEU",
+                "length": "2302",
+                "offset": "384",
+                "filename": "example-20241007095844-00000-BA92-CKXFG4FF6H.warc.gz",
+            }
+        )
         request = record_transformer.request_for_record(record)
 
         assert isinstance(request, Request)
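
A minimal usage sketch of the API surface this patch introduces, for reference only (not part of the diff). It assumes a local example.wacz exists and that its index contains http://example.com/index; both names are hypothetical.

    from smart_open import open

    from scrapy_webarchive.wacz import WaczFile
    from scrapy_webarchive.warc import record_transformer

    # WaczFile now parses the CDXJ index eagerly in __init__, so lookups
    # work immediately after construction (no separate load_index() step).
    wacz = WaczFile(open("example.wacz", "rb"))  # hypothetical archive path

    # iter_index() yields CdxjRecord dataclass instances; the JSON block of
    # each CDXJ line is parsed into the .data dict.
    for cdxj_record in wacz.iter_index():
        print(cdxj_record.data["url"])

    # get_warc_from_url() returns the matching WARC record, or None when
    # the URL is not in the index.
    warc_record = wacz.get_warc_from_url("http://example.com/index")  # hypothetical URL
    if warc_record:
        # The transformer turns a WARC response record into a Scrapy Response.
        response = record_transformer.response_for_record(warc_record)
        print(response.status)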