-
Notifications
You must be signed in to change notification settings - Fork 1
/
sparql.py
120 lines (93 loc) · 4.3 KB
/
sparql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from hashlib import md5
from rdflib import Graph
from rdflib.plugins.sparql.results.csvresults import CSVResultSerializer
from SPARQLWrapper import SPARQLWrapper, CSV
from SPARQLWrapper.SPARQLExceptions import EndPointInternalError, EndPointNotFound, SPARQLWrapperException, Unauthorized
from logger import ErrorLogger
import re
class SPARQL:
    """Build and execute the SPARQL query for one side of a configuration.

    The instance is bound to either the ``'source'`` or the ``'target'``
    section of ``config``. Every setting (endpoint, graph, variables,
    restriction, raw query, ...) is read from ``config`` through its
    ``get_*`` accessors using that side name.
    """

    def __init__(self, config, type):
        """Bind the helper to ``config`` and to one side of it.

        Args:
            config: Configuration object exposing the ``get_*`` accessors
                used by this class.
            type: Either ``'source'`` or ``'target'``. The parameter name
                shadows the builtin but is kept for caller compatibility.

        Raises:
            ValueError: If ``type`` is neither ``'source'`` nor ``'target'``.
        """
        if type not in ('source', 'target'):
            # ValueError is a subclass of Exception, so callers that catch
            # the previously raised bare Exception keep working.
            raise ValueError("Wrong type (not source or target) specified")

        self.config = config
        self.type = type
        # The hash of the normalised, un-paged query identifies this query
        # and is handed to the error logger.
        self.query_hash = self.get_query_hash()
        self.sparql_error_logger = ErrorLogger('SparqlErrorLogger', 'sparql_errors', self.query_hash)

    def build_query(self, offset, limit=None):
        """Return the SPARQL query string for the configured side.

        If the configuration provides a raw query it is used verbatim;
        otherwise the query is assembled from prefixes, the two selected
        variables, the graph and the WHERE clause. ``OFFSET``/``LIMIT``
        are only rendered for ``'remote'`` endpoints; for any other
        endpoint type an empty string is appended in their place (so the
        result may end in trailing spaces).

        Args:
            offset: Value rendered into the ``OFFSET`` clause.
            limit: Value rendered into the ``LIMIT`` clause, or ``None`` to
                omit the clause entirely.

        Returns:
            The query as a single string.
        """
        raw_query = self.config.get_rawquery(self.type)
        if raw_query is not None:
            head = raw_query
        else:
            query_prefixes = self.build_prefixes()
            query_select = 'SELECT DISTINCT ?{} ?{}'.format(
                self.config.get_var_uri(self.type),
                self.config.get_var_shape(self.type),
            )
            query_from = 'FROM <{}>'.format(self.config.get_graph(self.type))
            query_where = self.build_where()
            head = '{} {} {} {}'.format(query_prefixes, query_select, query_from, query_where)

        is_remote = self.config.get_endpoint_type(self.type) == 'remote'

        # The separator is emitted even when the clause is empty; this keeps
        # the generated string (and therefore the query hash) unchanged.
        query_offset = 'OFFSET {}'.format(offset) if is_remote else ''
        query = '{} {}'.format(head, query_offset)
        if limit is None:
            return query

        query_limit = 'LIMIT {}'.format(limit) if is_remote else ''
        return '{} {}'.format(query, query_limit)

    def build_prefixes(self):
        """Return all configured prefixes as space-separated ``PREFIX`` declarations.

        Each configured prefix is expected to be a mapping with the keys
        ``'label'`` and ``'namespace'``. An empty string is returned when
        the configuration has no prefixes.
        """
        prefixes = self.config.get_prefixes()
        if prefixes is None:
            return ''

        return ' '.join(
            'PREFIX {}: <{}>'.format(prefix['label'], prefix['namespace'])
            for prefix in prefixes
        )

    def build_where(self):
        """Return the ``WHERE { ... }`` clause built from restriction and property.

        Each configured pattern is followed by ``' . '``; patterns that are
        ``None`` are skipped, which yields ``'WHERE {}'`` when both are unset.
        """
        restriction = self.config.get_restriction(self.type)
        property_pattern = self.config.get_property(self.type)

        query_where = 'WHERE {'
        if restriction is not None:
            query_where += restriction + ' . '
        if property_pattern is not None:
            query_where += property_pattern + ' . '
        query_where += '}'
        return query_where

    def clean_query(self, query):
        """Normalise the whitespace of ``query`` so equivalent layouts compare equal.

        Newlines become spaces, runs of spaces collapse to one, and the
        spacing around braces is unified to ``' {'`` and ``'} '``.
        """
        query = re.sub('\n', ' ', query)
        query = re.sub('[ ]+', ' ', query)
        query = re.sub('[ ]?{[ ]?', ' {', query)
        query = re.sub('[ ]?}[ ]?', '} ', query)
        return query

    def query(self, offset, limit=None):
        """Run the query against the configured endpoint.

        For a ``'remote'`` endpoint the query is sent through SPARQLWrapper
        with CSV as the return format and the value of ``sparql.query()``
        is returned. For a ``'local'`` endpoint the configured file is
        parsed as N-Triples into an rdflib graph, queried, and the result
        is returned serialised as CSV.

        Args:
            offset: Passed on to ``build_query``.
            limit: Passed on to ``build_query``.

        Returns:
            The query result as described above, or ``None`` when the
            endpoint type is neither ``'remote'`` nor ``'local'`` or when
            the remote request raised a SPARQLWrapper error (the error is
            printed and swallowed).
        """
        query = self.build_query(offset, limit)
        endpoint_type = self.config.get_endpoint_type(self.type)

        if endpoint_type == 'remote':
            sparql = SPARQLWrapper(self.config.get_endpoint(self.type))
            sparql.customHttpHeaders['Accept-Encoding'] = 'gzip'
            sparql.setQuery(query)
            sparql.setReturnFormat(CSV)

            try:
                return sparql.query()
            except (EndPointNotFound, Unauthorized, EndPointInternalError, SPARQLWrapperException) as e:
                # Best effort: report the failure and fall through to None.
                print(e)
        elif endpoint_type == 'local':
            endpoint = self.config.get_endpoint(self.type)
            # str.replace returns a new string; the result has to be kept,
            # otherwise the 'file://' prefix is never removed.
            endpoint = endpoint.replace('file://', '')

            graph = Graph()
            graph.parse(endpoint, format='nt')
            result = graph.query(query)
            return result.serialize(format='csv')

        return None

    def get_query_hash(self):
        """Return the MD5 hex digest of the normalised query built with offset 0 and no limit."""
        query = self.clean_query(self.build_query(0))
        return md5(query.encode('utf-8')).hexdigest()