Support csw dcat harvest (#2800)
Add a CswDcat backend that uses the DCAT export served by the GeoNetwork CSW endpoint
maudetes authored Oct 19, 2023
1 parent 5724b00 commit 23ea32b
Showing 9 changed files with 894 additions and 7 deletions.
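
In short, the new CswDcatBackend automates the request/parse cycle sketched below: it POSTs a CSW 2.0.2 GetRecords query asking the server for the DCAT output schema, then loads each returned record into an rdflib graph. This is a standalone sketch, not udata code: the endpoint URL is a placeholder for a GeoNetwork-style CSW service, and paging and error handling (which the real backend in udata/harvest/backends/dcat.py below does cover) are omitted.

import requests
import xml.etree.ElementTree as ET
from rdflib import Graph

CSW_URL = 'https://example.org/geonetwork/srv/eng/csw'  # placeholder endpoint
CSW_NS = 'http://www.opengis.net/cat/csw/2.0.2'
DCAT_SCHEMA = 'http://www.w3.org/ns/dcat#'

# First page of a GetRecords query, asking the server to serialize records as DCAT
body = f'''<csw:GetRecords xmlns:csw="{CSW_NS}"
    xmlns:gmd="http://www.isotc211.org/2005/gmd"
    service="CSW" version="2.0.2" resultType="results"
    startPosition="1" outputSchema="{DCAT_SCHEMA}">
  <csw:Query typeNames="gmd:MD_Metadata">
    <csw:ElementSetName>full</csw:ElementSetName>
  </csw:Query>
</csw:GetRecords>'''

response = requests.post(CSW_URL, data=body, headers={'Content-Type': 'application/xml'})
response.raise_for_status()
tree = ET.fromstring(response.text)
results = tree.find('csw:SearchResults', {'csw': CSW_NS})
if results is None:
    raise SystemExit('No csw:SearchResults element in the response')

graph = Graph()
for record in results:  # each child element is one record, serialized as RDF/XML
    graph.parse(data=ET.tostring(record), format='xml')
print(f'{len(graph)} triples harvested from the first page')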
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
- Topics changes:
- Topics creation, update and deletion are now open to all users [#2898](https://github.com/opendatateam/udata/pull/2898)
- Topics are now `db.Owned` and searchable by `id` in dataset search [#2901](https://github.com/opendatateam/udata/pull/2901)
- Add support for a CSW harvester using DCAT format [#2800](https://github.com/opendatateam/udata/pull/2800)
- Remove `deleted` api field that does not exist [#2903](https://github.com/opendatateam/udata/pull/2903)
- Add `created_at` field to topic's model [#2904](https://github.com/opendatateam/udata/pull/2904)
- Topics can now be filtered by `tag` field [#2904](https://github.com/opendatateam/udata/pull/2904)
1 change: 1 addition & 0 deletions setup.py
@@ -44,6 +44,7 @@ def pip(filename):
],
'udata.harvesters': [
'dcat = udata.harvest.backends.dcat:DcatBackend',
'csw-dcat = udata.harvest.backends.dcat:CswDcatBackend',
],
'udata.avatars': [
'internal = udata.features.identicon.backends:internal',
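
The 'csw-dcat' name registered above is what udata uses to look the backend class up at runtime through the 'udata.harvesters' entry point group. As a quick sanity check after installation, the registration can be inspected with the standard library alone; a sketch assuming Python 3.10+ for the selectable entry_points API, and udata's own plugin loader may differ in its details.

from importlib.metadata import entry_points

harvesters = entry_points(group='udata.harvesters')
backend_cls = next(ep for ep in harvesters if ep.name == 'csw-dcat').load()
print(backend_cls)  # expected: <class 'udata.harvest.backends.dcat.CswDcatBackend'>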
10 changes: 7 additions & 3 deletions udata/commands/dcat.py
@@ -8,7 +8,7 @@
from udata.commands import cli, green, yellow, cyan, echo, magenta
from udata.core.dataset.factories import DatasetFactory
from udata.core.dataset.rdf import dataset_from_rdf
from udata.harvest.backends.dcat import DcatBackend
from udata.harvest.backends.dcat import DcatBackend, CswDcatBackend
from udata.rdf import namespace_manager

log = logging.getLogger(__name__)
@@ -24,7 +24,8 @@ def grp():
@click.argument('url')
@click.option('-q', '--quiet', is_flag=True, help='Ignore warnings')
@click.option('-i', '--rid', help='Inspect specific remote id (contains)')
def parse_url(url, quiet=False, rid=''):
@click.option('-c', '--csw', is_flag=True, help='The target is a CSW endpoint')
def parse_url(url, csw, quiet=False, rid=''):
'''Parse the datasets in a DCAT format located at URL (debug)'''
if quiet:
verbose_loggers = ['rdflib', 'udata.core.dataset']
@@ -46,7 +47,10 @@ def _create(cls, model_class, *args, **kwargs):
echo(cyan('Parsing url {}'.format(url)))
source = MockSource()
source.url = url
backend = DcatBackend(source, dryrun=True)
if csw:
backend = CswDcatBackend(source, dryrun=True)
else:
backend = DcatBackend(source, dryrun=True)
backend.job = MockJob()
format = backend.get_format()
echo(yellow('Detected format: {}'.format(format)))
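
With the new flag, the debug command can be pointed straight at a CSW endpoint, presumably along the lines of udata dcat parse-url --csw https://example.org/geonetwork/srv/eng/csw (the exact invocation depends on how the dcat command group is mounted on the udata CLI); without --csw the behaviour is unchanged and the plain DcatBackend is used.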
2 changes: 0 additions & 2 deletions udata/harvest/backends/base.py
@@ -268,8 +268,6 @@ def process(self, item):
raise NotImplementedError

def add_item(self, identifier, *args, **kwargs):
if identifier is None:
raise ValueError('DCT.identifier is required for all DCAT.Dataset records')
item = HarvestItem(remote_id=str(identifier), args=args, kwargs=kwargs)
self.job.items.append(item)
return item
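
The identifier guard removed here is not lost: it resurfaces below in DcatBackend.process(), which rejects items whose remote_id is the string 'None'. The practical effect is that a record missing DCT.identifier no longer aborts the whole listing phase when add_item() is called; it is queued like any other item and fails at processing time, where the harvester can presumably record the error against that single item rather than the whole job.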
94 changes: 93 additions & 1 deletion udata/harvest/backends/dcat.py
@@ -2,8 +2,9 @@

import requests

from rdflib import Graph, URIRef, BNode
from rdflib import Graph, URIRef
from rdflib.namespace import RDF
import xml.etree.ElementTree as ET
from typing import List

from udata.rdf import (
@@ -35,6 +36,9 @@
(HYDRA.PagedCollection, HYDRA.nextPage)
)

CSW_NAMESPACE = 'http://www.opengis.net/cat/csw/2.0.2'
OWS_NAMESPACE = 'http://www.opengis.net/ows'

# Useful to patch essential failing URIs
URIS_TO_REPLACE = {
# See https://github.com/etalab/data.gouv.fr/issues/1151
@@ -119,6 +123,8 @@ def get_node_from_item(self, graph, item):
raise ValueError(f'Unable to find dataset with DCT.identifier:{item.remote_id}')

def process(self, item):
if item.remote_id == 'None':
raise ValueError('The DCT.identifier is missing on this DCAT.Dataset record')
graph = Graph(namespace_manager=namespace_manager)
data = self.job.data['graphs'][item.kwargs['page']]
format = self.job.data['format']
@@ -129,3 +135,89 @@ def process(self, item):
dataset = self.get_dataset(item.remote_id)
dataset = dataset_from_rdf(graph, dataset, node=node)
return dataset


class CswDcatBackend(DcatBackend):
display_name = 'CSW-DCAT'

DCAT_SCHEMA = 'http://www.w3.org/ns/dcat#'

def parse_graph(self, url: str, fmt: str) -> List[Graph]:
body = '''<csw:GetRecords xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
xmlns:gmd="http://www.isotc211.org/2005/gmd"
service="CSW" version="2.0.2" resultType="results"
startPosition="{start}" maxPosition="200"
outputSchema="{schema}">
<csw:Query typeNames="gmd:MD_Metadata">
<csw:ElementSetName>full</csw:ElementSetName>
<ogc:SortBy xmlns:ogc="http://www.opengis.net/ogc">
<ogc:SortProperty>
<ogc:PropertyName>identifier</ogc:PropertyName>
<ogc:SortOrder>ASC</ogc:SortOrder>
</ogc:SortProperty>
</ogc:SortBy>
</csw:Query>
</csw:GetRecords>'''
headers = {'Content-Type': 'application/xml'}

graphs = []
page = 0
start = 1
response = requests.post(url, data=body.format(start=start, schema=self.DCAT_SCHEMA),
headers=headers)
response.raise_for_status()
content = response.text
tree = ET.fromstring(content)
if tree.tag == '{' + OWS_NAMESPACE + '}ExceptionReport':
raise ValueError(f'Failed to query CSW:\n{content}')
while tree:
graph = Graph(namespace_manager=namespace_manager)
search_results = tree.find('csw:SearchResults', {'csw': CSW_NAMESPACE})
if not search_results:
log.error(f'No search results found for {url} on page {page}')
break
for child in search_results:
subgraph = Graph(namespace_manager=namespace_manager)
subgraph.parse(data=ET.tostring(child), format=fmt)
graph += subgraph

for node in subgraph.subjects(RDF.type, DCAT.Dataset):
id = subgraph.value(node, DCT.identifier)
kwargs = {'nid': str(node), 'page': page}
kwargs['type'] = 'uriref' if isinstance(node, URIRef) else 'blank'
self.add_item(id, **kwargs)
graphs.append(graph)
page += 1

next_record = int(search_results.attrib['nextRecord'])
matched_count = int(search_results.attrib['numberOfRecordsMatched'])
returned_count = int(search_results.attrib['numberOfRecordsReturned'])

# Break conditions copied gratefully from
# noqa https://github.com/geonetwork/core-geonetwork/blob/main/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java#L338-L369
break_conditions = (
# standard CSW: A value of 0 means all records have been returned.
next_record == 0,

# Misbehaving CSW server returning a next record > matched count
next_record > matched_count,

# No results returned already
returned_count == 0,

# Current next record is lower than previous one
next_record < start,

# Enough items have been harvested already
self.max_items and len(self.job.items) >= self.max_items
)

if any(break_conditions):
break

start = next_record
tree = ET.fromstring(
requests.post(url, data=body.format(start=start, schema=self.DCAT_SCHEMA),
headers=headers).text)

return graphs
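
Two ElementTree details the paging loop above leans on, illustrated here with a toy response rather than udata code: find() resolves the csw: prefix through the namespace mapping passed as its second argument, and an Element with no child elements tests falsy, which is what the 'if not search_results' and 'while tree' checks rely on (truth-testing an Element is deprecated in recent Python versions, so the explicit length check below is the safer spelling).

import xml.etree.ElementTree as ET

CSW_NS = 'http://www.opengis.net/cat/csw/2.0.2'
xml = ('<csw:GetRecordsResponse xmlns:csw="%s">'
       '<csw:SearchResults nextRecord="0" numberOfRecordsMatched="0" numberOfRecordsReturned="0"/>'
       '</csw:GetRecordsResponse>' % CSW_NS)

tree = ET.fromstring(xml)
results = tree.find('csw:SearchResults', {'csw': CSW_NS})  # prefix resolved via the mapping
print(results is not None)  # True: the element was found
print(len(results) == 0)    # True: no record children, so a bare truth test would treat it as empty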