forked from cisagov/saver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
create_sld_to_agency_name_and_id_mapping.py
executable file
·113 lines (92 loc) · 3.47 KB
/
create_sld_to_agency_name_and_id_mapping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
import csv
import datetime
import yaml
from pymongo import MongoClient
# YAML file containing the credentials/URI for the scan database.
DB_CONFIG_FILE = '/run/secrets/scan_write_creds.yml'
# Directory containing data baked into the container image.
INCLUDE_DATA_DIR = '/home/saver/include/'
# Directory shared with other containers in the composition.
SHARED_DATA_DIR = '/home/saver/shared/'
# CSV mapping agency names to CyHy stakeholder IDs.
AGENCIES_FILE = INCLUDE_DATA_DIR + 'agencies.csv'
# CSV of current federal second-level domains (from dotgov data).
CURRENT_FEDERAL_FILE = SHARED_DATA_DIR + \
    'artifacts/current-federal_modified.csv'
def db_from_config(config_filename):
    """Return a MongoDB database handle built from a YAML config file.

    Parameters
    ----------
    config_filename : str
        Path to a YAML file with a 'database' section containing 'uri'
        and 'name' keys.

    Returns
    -------
    pymongo.database.Database or None
        The configured database handle, or None if the config file is
        empty or does not have the expected format.
    """
    db = None
    with open(config_filename, 'r') as stream:
        # safe_load is sufficient for a plain-data config file and
        # avoids constructing arbitrary Python objects from YAML.
        config = yaml.safe_load(stream)

    if config is not None:
        try:
            db_uri = config['database']['uri']
            db_name = config['database']['name']
        except KeyError:
            print('Incorrect database config file format: '
                  '{}'.format(config_filename))
            # Bug fix: previously execution fell through here and
            # raised a NameError on the unbound db_uri.  Return None
            # so the caller can detect the bad config instead.
            return None

        db_connection = MongoClient(host=db_uri, tz_aware=True)
        db = db_connection[db_name]
    return db
def main():
    """Rebuild the SLD-to-agency mapping collection in the scan database.

    Reads the agency mapping CSV and the current-federal CSV, upserts
    one record per federal second-level domain into the 'domains'
    collection, then deletes any stale records from previous runs.
    """
    # Import the agency mapping data.  Column 0 is the agency name,
    # column 1 is the corresponding CyHy stakeholder ID.
    with open(AGENCIES_FILE, 'r', newline='') as agencies_file:
        csvreader = csv.reader(agencies_file)
        agency_mapping = {row[0]: row[1] for row in csvreader}

    # Set up the scan database connection
    db = db_from_config(DB_CONFIG_FILE)
    if db is None:
        # Robustness fix: db_from_config returns None for an empty or
        # malformed config file; bail out instead of crashing below.
        print('Unable to configure database connection from '
              '{}'.format(DB_CONFIG_FILE))
        return

    # Import the current-federal data and create the records to be
    # inserted into the database.
    #
    # I hate using update_one() in a loop like this.  Once we move to
    # Mongo 4 we can use a transaction to atomically (1) drop all the
    # rows from the collection and (2) use insert_many() to insert all
    # the new data.  That will be much cleaner!
    #
    # Use a timezone-aware timestamp: utcnow() is deprecated and its
    # naive result is ambiguous; the Mongo client is tz_aware=True so
    # an aware UTC datetime round-trips correctly.
    now = datetime.datetime.now(datetime.timezone.utc)
    with open(CURRENT_FEDERAL_FILE, 'r', newline='') as current_federal_file:
        csvreader = csv.DictReader(current_federal_file)
        for row in csvreader:
            domain = row['Domain Name'].lower()
            # Normalize the agency name so it matches the spellings
            # used in the agency mapping file.
            agency = row['Agency'].replace(
                '&', 'and'
            ).replace(
                '/', ' '
            ).replace(
                'U. S.', 'U.S.'
            ).replace(
                ',', ''
            )

            cyhy_id = agency
            is_cyhy_stakeholder = False
            if agency in agency_mapping:
                # The agency is in the agency mapping file, so it is
                # mapped to a CyHy stakeholder
                cyhy_id = agency_mapping[agency]
                is_cyhy_stakeholder = True

            record = {
                '_id': domain,
                'agency': {
                    'id': cyhy_id,
                    'name': agency
                },
                'cyhy_stakeholder': is_cyhy_stakeholder,
                'scan_date': now
            }

            # Add this result to the database via an upsert
            res = db.domains.update_one({
                '_id': domain
            }, {
                '$set': record
            }, upsert=True)
            if not res.acknowledged:
                print(f'Unable to write new SLD record for {domain} to '
                      f'"{db.name}" database on {db.client.address[0]}.')

    # Now delete any entries whose scan_date is not now; those are
    # leftovers from a previous run for domains no longer present.
    res = db.domains.delete_many({
        'scan_date': {
            '$ne': now
        }
    })
    if not res.acknowledged:
        print(f'Unable to delete old SLD records in "{db.name}" database '
              f'on {db.client.address[0]}.')
    else:
        print(f'Deleted {res.deleted_count} old SLD records from '
              f'"{db.name}" database on {db.client.address[0]}.')
# Run the mapping rebuild only when executed as a script, not on import.
if __name__ == '__main__':
    main()