#!/usr/bin/env python3
# -*- coding: utf8 -*-
#
# © 2013-2014 Institute of Mathematics and Computer Science, University of Latvia
# (LU aģentūra "Latvijas Universitātes Matemātikas un informātikas institūts")
#
# All rights reserved.
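"""
Consolidates the raw semantic frames of entities into summary frames and saves
them back through the Postgres semantic API. Entity IDs are read from stdin
(one per line) or selected via the --dirty / --allpersons options; see --help.
"""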
from __future__ import unicode_literals
import sys, os
sys.path.append(os.path.join(os.path.dirname(__file__), "src"))
from pprint import pprint
import logging as log
import os.path
from datetime import datetime
import os, itertools, getopt
import EntityFrames as EF
from db_config import api_conn_info, instance_name, log_dir
from SemanticApiPostgres import SemanticApiPostgres, PostgresConnection
from ConsolidateFrames import BaseConsolidator
from FrameInfo import FrameInfo
from TextGenerator import get_mentioned_entities, get_frame_data
import Relationships
f_info = FrameInfo(os.path.join(os.path.dirname(__file__), "input/frames-new-modified.xlsx"))
#f_info = FrameInfo("input/frames-new-modified.xlsx")
processID = instance_name + ' ' + str(os.getpid())
def get_entity_frames(e_id_list, api):
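    """
    Generator. Fetches the frames for each entity ID in e_id_list from the API
    and yields one EntityFrames object per entity; errors are logged and re-raised.
    """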
    try:
        for e_id in e_id_list:
            # FIXME - replace w. EntityFrames for Postgres API
            # - remove the need for self.api in EntityFrames (at least when pickling)
            yield EF.EntityFrames(api, e_id)
    except Exception:
        log.exception("Error getting entity frames from the API:")
        raise

def consolidate_frames(entity_list, api):
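    """
    Generator. For each entity: cleans up corrupted blessed relationship summary
    frames, filters out unfinished raw frames, adds secondary relationship frames,
    normalizes and consolidates them against the blessed summary frames, drops
    frames that are still too sparse, builds the frame texts and dates, stores the
    result with set_consolidated_frames() and yields the entity.
    """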
    c = BaseConsolidator()
    #c = Consolidator()

    mentioned_entities = get_mentioned_entities(entity_list, api)  # Read the names and inflections of all entities mentioned in the frames (not just the main ones)

    relationship_entityids = set(Relationships.relationship_entities(api).values())

    for entity in entity_list:
        if entity.entity is not None and entity.frames is not None:
            try:
                # autoheal: check for corrupted summary relationship frames, if found unbless and delete
                if hasattr(entity, 'blessed_summary_frames'):
                    frames = entity.blessed_summary_frames
                    # print(len(entity.frames), entity.frames)
                    i = 0
                    while i < len(frames):
                        frame = frames[i]
                        if frame.get('FrameType') != 3:
                            i += 1
                            continue
                        partner1ID = None
                        partner2ID = None
                        for fd in frame.get('FrameData', tuple()):
                            if fd.get('roleid') == 1:
                                partner1ID = fd.get('entityid')
                            if fd.get('roleid') == 2:
                                partner2ID = fd.get('entityid')
                        if partner1ID in relationship_entityids or partner2ID in relationship_entityids:
                            # unbless this frame if it was blessed
                            frameID = frame.get('FrameId', -1)
                            log.warning('removing corrupted relationship frame %i' % (frameID,))
                            if frameID != -1:
                                if frame.get('Blessed') and not frame.get('IsHidden'):
                                    api.unblessSummaryFact(frameID)
                                api.delete_summary_frames([frameID], commit=True)
                            frames.pop(i)
                            continue
                        i += 1

                frames = list(filter(lambda x: x["FrameData"] is not None and not x['IsUnfinished'], entity.frames))
                log.info("Found %s frames for entity %s", len(frames), entity.entity)
                # show_frame_debuginfo(frames, '2406184', 'before normalization')

                # Add the secondary relationship frames
                frames += Relationships.build_relations(api, entity.entity.get('EntityId'), frames, mentioned_entities)

                EF.normalize_frames(frames, api, mentioned_entities)
                EF.normalize_summary_frames(entity.blessed_summary_frames, api, mentioned_entities)
                # show_frame_debuginfo(frames, '2406184', 'after normalization')

                frames = c.apply(frames, entity.blessed_summary_frames)
                log.info("Finished consolidating frames. Result frame count: %s\n", len(frames))

                frames = list(filter(lambda fr: valid_frame(fr, api), frames))  # Discard frames that still have too little data even after merging
                log.info("Frames after filtering for sparsity: %s\n", len(frames))

                # Building frame descriptions
                for frame in frames:
                    try:
                        frametext, frame["Date"], frame["StartDate"], cv_frame_category = get_frame_data(mentioned_entities, frame)
                        if frame.get('CVFrameCategory'):
                            manual_categories = frame.get('CVFrameCategory').get('manual')
                            if not cv_frame_category:
                                cv_frame_category = {}
                            cv_frame_category['manual'] = manual_categories
                            if manual_categories and isinstance(manual_categories, dict):
                                for entity_id, res in manual_categories.items():
                                    if res:  # the expected format is - "123456":true
                                        cv_frame_category[entity_id] = frame.get('CVFrameCategory').get(entity_id)  # keep the old value
                        frame['CVFrameCategory'] = cv_frame_category
                    except KeyError as e:
                        log.exception("Key error in get_frame_data:\n%s", e)
                        frametext = None

                    if frametext is not None:
                        frametext = frametext.strip()
                    frame["FrameText"] = frametext
            except TypeError:
                log_data = "\n".join([repr(log_item) for log_item in entity.frames])
                log.exception("Error consolidating frames:\n%s", log_data)
                api.setEntityProcessingStatus([entity.entity_id for entity in entity_list], processID, 406)  # invalid data
                raise

            entity.set_consolidated_frames(frames)
            yield entity

def invalid_frame(frame, api=None):
    return not valid_frame(frame, api)


def valid_frame(frame, api=None):
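    """
    Returns True if the frame has enough data to be kept: blessed frames are always
    kept, frames with fewer than two elements are dropped. The per-frame-type role
    checks below are currently disabled by the early return. The optional api
    argument is only used to flag entities whose frame elements are missing.
    """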
    if frame.get('Blessed') == True:
        return True  # If it is already blessed, everything is OK

    if len(frame["FrameData"]) < 2:
        return False  # If there is only one element, there is effectively no fact

    frame_type = frame["FrameType"]
    roles = set()
    for element in frame["FrameData"]:
        try:
            roles.add(f_info.elem_name_from_id(frame_type, element["Key"]-1))
        except KeyError as e:
            log.error("Entity ID %s (used in a frame element) not found! Location: valid_frame() - Data:\n%r\n", element["Value"]["Entity"], frame)
            print("Entity ID %s (used in a frame element) not found! Location: valid_frame() - Data:\n%r\n" % (element["Value"]["Entity"], frame))
            # api.setEntityProcessingStatus(entity_list, processID, 406)  # invalid data - the entity is missing
            if api is not None:
                api.setEntityProcessingStatus([int(element["Value"]["Entity"])], processID, 406)  # invalid data - the entity is missing
            continue

    return True  # FIXME - Gunta's view is that this should not be patched here; if it needs fixing, it should be fixed at the data source

    if frame_type == 0:  # Birth
        if 'Bērns' not in roles: return False
    if frame_type == 1:  # Age
        if 'Persona' not in roles: return False
        if 'Vecums' not in roles: return False
    if frame_type == 2:  # Death
        if 'Mirušais' not in roles: return False
    if frame_type == 3:  # Relationships
        #if ('Partneris_1' not in roles or 'Partneris_2' not in roles) and 'Partneri' not in roles: return False
        # We added abstract relationships ("Jānis is married"), so Partneris_2 is optional
        if 'Partneris_1' not in roles and 'Partneri' not in roles: return False
        if 'Attiecības' not in roles: return False
    if frame_type == 4:  # Alternative name
        if 'Vārds' not in roles: return False
        if 'Entītija' not in roles: return False
    if frame_type == 5:  # Place of residence
        if 'Rezidents' not in roles: return False
        if 'Vieta' not in roles: return False
    if frame_type == 6:  # Education
        if 'Students' not in roles: return False
    if frame_type == 7:  # Occupation
        if 'Persona' not in roles: return False
        if 'Nodarbošanās' not in roles: return False
    if frame_type == 8:  # Origin
        if 'Persona' not in roles: return False
    if frame_type in (9, 10, 11):  # Position, Start of employment, End of employment
        if 'Darbinieks' not in roles: return False
    if frame_type == 12:  # Membership
        if 'Biedrs' not in roles: return False
        if 'Organizācija' not in roles: return False
    if frame_type == 13:  # Elections
        if 'Dalībnieks' not in roles: return False
    if frame_type == 14:  # Support
        if 'Atbalstītājs' not in roles: return False
        if 'Saņēmējs' not in roles: return False
    if frame_type == 15:  # Founding
        if 'Organizācija' not in roles: return False
    if frame_type == 16:  # Participation
        if 'Notikums' not in roles: return False
    if frame_type == 17:  # Finances
        if 'Organizācija' not in roles: return False
    if frame_type == 18:  # Property
        if 'Īpašums' not in roles: return False
        if 'Īpašnieks' not in roles: return False
    if frame_type == 19:  # Debt
        if 'Parādnieks' not in roles and 'Aizdevējs' not in roles: return False
    # if frame_type == 22:  # Achievement
    #     if 'Sasniegums' not in roles: return False
    if frame_type == 23:  # Reporting
        if 'Ziņa' not in roles: return False
    if frame_type == 25:  # Brand
        if 'Organizācija' not in roles: return False
    return True

def save_entity_frames_to_api(api, entity_list):
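    """
    For each entity: deletes its previous non-blessed summary frames, then updates
    or inserts the consolidated frames as summary frames in one transaction and
    sets the entity processing status (201 on success, 410 if errors were collected).
    """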
    already_inserted_keys = set()
    for entity in entity_list:
        # delete previous summary frames
        api.delete_entity_summary_frames_except_blessed(entity.entity_id, commit=False)

        # insert all entity frames [as summary frames]
        summary_frame_ids = []
        error_frames = []
        to_save = entity.cons_frames
        log.info("Save_entity_frames_to_api - summary frames to save for entity %s: %s", entity.entity_id, len(to_save))
        for frame in to_save:
            summary_frame_id = frame.get("SummaryFrameID")
            if summary_frame_id:
                # FIXME - frametext, date and startdate should in principle not be touched here - this is done so that improvements in verbalization etc. reach the consolidated facts
                api.updateSummaryFrameRawFrames(summary_frame_id, frame["SummarizedFrames"], frame.get('FrameText'), frame.get('Date'), frame.get('StartDate'), frame.get('CVFrameCategory'), commit=False)
                summary_frame_ids.append(summary_frame_id)
                # print('updating frame # %s "%s"' % (summary_frame_id, frame.get('FrameText')))
            else:
                key = frame.get('MergeKey')
                if key and key in already_inserted_keys:
                    # If a new blessed fact is being consolidated and several entities with this fact are consolidated in one batch, a race condition arises: the consolidation of the non-first entities does not see that such a fact has just been produced.
                    # We check here so that no duplicates are created.
                    continue
                already_inserted_keys.add(key)
                frame_id = api.insert_summary_frame(frame, commit=False)
                summary_frame_ids.append(frame_id)
                # print('inserting frame # %s "%s"' % (frame_id, frame.get('FrameText')))
                # print('inserted %s' % (frame_id,))

        # commit changes (delete + insert in one transaction)
        api.api.commit()

        log.info("")
        log.info("Save_entity_frames_to_api - completed.")
        log.info(" - list of frame IDs (for %s frames saved):\n%s", len(summary_frame_ids), repr(summary_frame_ids))
        if len(error_frames) > 0:
            log.info(" - %s frames could not be saved, returned errors.", len(error_frames))
            print(" - %s frames could not be saved, returned errors." % (len(error_frames)))
            log.debug("list of frames that could not be saved:")
            print("list of frames that could not be saved:")
            for fr in error_frames:
                log.debug("%s", repr(fr))
                print(repr(fr))
            api.setEntityProcessingStatus([entity.entity_id], processID, 410)  # something went wrong
        if len(error_frames) > 0:
            status = "ERROR"
        else:
            api.setEntityProcessingStatus([entity.entity_id], processID, 201)  # everything OK for this entity
            status = "OK"
        print("%s\t%s" % (entity.entity_id, status))
        sys.stdout.flush()

def process_entities(entity_list, out_dir, api):
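    """
    Runs the full pipeline for a batch of entity IDs: fetches the raw frames,
    consolidates them, logs per-entity statistics and saves the results, updating
    the entity processing status (202/203/205) along the way.
    """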
    api.setEntityProcessingStatus(entity_list, processID, 202)  # starting processing
    data = list(get_entity_frames(entity_list, api))
    api.setEntityProcessingStatus(entity_list, processID, 203)  # raw frames extracted

    gen_frames = consolidate_frames(data, api)
    frames = list(gen_frames)
    for fr in frames:
        log.info("-"*60)
        log.info("Entity name: %s\tId: %s", fr.entity["Name"], fr.entity["EntityId"])
        log.info("Frames (Total/Consolidated): %s / %s", len(fr.frames), len(fr.cons_frames))

    api.setEntityProcessingStatus(entity_list, processID, 205)  # starting to save frames
    save_entity_frames_to_api(api, frames)

def entity_ids_from_stdin():
"""
Generator. Returns Entity_IDs (int) read from stdin.
"""
for e_line in sys.stdin:
if len(e_line.strip()) > 0:
e_id = int(e_line)
yield e_id
# NB! tas nozīmētu ka pirmo entītiju nesāks procesēt, kamēr nebūs padots pilns čunks vai arī EOF.
def split_seq(iterable, size):
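    """
    Generator. Splits an iterable into consecutive lists of at most `size` elements,
    e.g. split_seq(range(5), 2) yields [0, 1], [2, 3], [4].
    """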
    it = iter(iterable)
    item = list(itertools.islice(it, size))
    while item:
        yield item
        item = list(itertools.islice(it, size))

def main():
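    """
    Parses the command-line options, connects to the Postgres semantic API and
    consolidates either the dirty entities / all persons from the database or the
    entity IDs read from stdin (in batch or --single mode).
    """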
    start_logging(log.DEBUG)  # log.INFO
    log.info("Starting %s", sys.argv[0])

    out_dir = "./output"
    log.info("Output directory: %s\n", out_dir)

    single_load = False
    load_all_dirty = False
    load_all_persons = False
    options, remainder = getopt.getopt(sys.argv[1:], 's', ['help', 'dirty', 'single', 'allpersons', 'database='])
    for opt, arg in options:
        if opt == '--help':
            print('Frame consolidation script')
            print('')
            print('Usage: consolidates entities according to an ID list provided over stdin, one ID per line')
            print('--database=<dbname> overrides the database name from the one set in db_config.py')
            print('--single processes each entity as submitted, instead of waiting for a full batch. Normal batch mode is more efficient.')
            print('--dirty instead of waiting for entity IDs, takes all entities marked in the database as "dirty".')
            print('--allpersons fetches a list of all persons in database and consolidates them.')
            quit()
        elif opt == '--database':
            api_conn_info["dbname"] = arg
        elif opt == '--dirty':
            load_all_dirty = True
        elif opt == '--single':
            single_load = True
        elif opt == '--allpersons':
            load_all_persons = True

    conn = PostgresConnection(api_conn_info)
    api = SemanticApiPostgres(conn)

    if load_all_dirty or load_all_persons:
        if load_all_dirty:
            entity_list = list(api.get_dirty_entities())
        elif load_all_persons:
            persons = api.api.query('select entityid from entities where deleted is false and (category = 3 or category = 2)', None)
            entity_list = list(map(lambda x: int(x[0]), persons))  # the cursor returns a list of tuples; we need a list of plain elements
        print('Consolidating %s entities' % len(entity_list))
        if len(entity_list) < 100:
            print(entity_list)
        for nr, chunk in enumerate(split_seq(entity_list, 50)):  # TODO - the chunk size may not be optimal, a different number might give better throughput
            process_entities(chunk, out_dir, api=api)
            if nr % 5 == 4:
                print((nr+1)*len(chunk))
        print('All entities processed')
    else:
        entity_list = entity_ids_from_stdin()
        if single_load or '-single' in sys.argv:  # -single is a legacy option, hard-coded in one of Didzis' web tools, kept so that tool does not have to change
            # real-time processing - process each ID as soon as it is submitted
            while 1:
                try:
                    line = sys.stdin.readline()
                except KeyboardInterrupt:
                    break
                if not line or line == "\n":
                    break
                process_entities([int(line)], out_dir, api=api)
        else:  # batch processing - split everything into portions
            for chunk in split_seq(entity_list, 30):  # TODO - chunk size 30 may not be optimal, a different number might give better throughput
                process_entities(chunk, out_dir, api=api)

    log.info('Work finished.')

def start_logging(log_level = log.ERROR):
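    """
    Sets up logging to a timestamped file in log_dir at the given level,
    creating the log directory if it does not exist.
    """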
    if not os.path.exists(log_dir):
        os.mkdir(log_dir)
    filename = "consolidate_frames-%s.log" % (datetime.now().strftime("%Y_%m_%d-%H_%M"))
    filename = os.path.join(log_dir, filename)

    # Windows + Python 3: the file being logged to needs an explicit encoding.
    log.getLogger().setLevel(log_level)
    handler = log.FileHandler(filename, encoding='utf-8')
    handler.setFormatter(log.Formatter(datefmt="%Y-%m-%d %H:%M:%S", fmt="%(asctime)s: %(name)s: %(levelname)s: %(message)s"))
    log.getLogger().addHandler(handler)
    # log.getLogger().removeHandler('std')

    # log.basicConfig(
    #     filename = filename,
    #     level = log_level,
    #     datefmt = "%Y-%m-%d %H:%M:%S",
    #     format = "%(asctime)s: %(name)s: %(levelname)s: %(message)s",
    # )

    log.getLogger("SemanticApiPostgres").level = log.INFO

def show_frame_debuginfo(frames, frameid, text = 'DEBUG'):
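    """
    Debug helper: prints every frame whose string representation contains the given frameid.
    """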
    print('----', text)
    for frame in frames:
        if frameid in str(frame):
            print(frame)
    print('')

# ----------------------------------------
if __name__ == "__main__":
    main()