-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
176 lines (147 loc) · 5.19 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import csv
import os
import threading
import uuid
import boto3
import botocore
def get_child_session(account_id, role_name, session=None):
"""
get session, with error handling, allows for passing in an sts client. This allows Account A > B > C where A cannot assume a role directly to C
:param account_id:
:param role_name:
:param session=None:
:return:
"""
# “/“ + name if not name.startswith(“/“) else name
try:
# allow for a to b to c if given sts client.
if session == None:
session = boto3.session.Session()
client = session.client('sts')
response = client.get_caller_identity()
# remove the first slash
role_name = role_name[1:] if role_name.startswith("/") else role_name
# never have a slash in front of the role name
role_arn = 'arn:aws:iam::' + account_id + ':role/' + role_name
print("Creating new session with role: {} from {}".format(role_arn, response['Arn']))
response = client.assume_role(
RoleArn=role_arn,
RoleSessionName=str(uuid.uuid1())
)
credentials = response['Credentials']
session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
return session
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'AccessDenied':
print(e)
# raise Exception(e)
elif 'Not authorized to perform sts:AssumeRole' in str(e):
print(e)
# raise Exception(f"ERROR:Not authorized to perform sts:AssumeRole on {role_arn}")
else:
print(e)
# raise Exception(e)
finally:
pass
def get_org_accounts(session):
"""
return a list of all accounts in the organization
:param session:
:return:
"""
org_client = session.client('organizations')
account_ids = []
response = org_client.list_accounts()
for account in response['Accounts']:
account_ids.append(account['Id'])
while 'NextToken' in response:
response = org_client.list_accounts(NextToken=response['NextToken'])
for account in response['Accounts']:
account_ids.append(account['Id'])
return account_ids
def worker(account, session):
"""
function to run inside threads, new session required for each thread. caught errors when only using 1 argument
:param account:
:param session:
:return:
"""
vpc = None
session = boto3.session.Session()
try:
print(f"Processing Account: {account}")
role_name = os.environ.get('RoleName', 'OrganizationAccountAccessRole')
child_session = get_child_session(account_id=account, role_name=role_name, session=session)
ec2 = child_session.client('ec2')
region_list = [region['RegionName'] for region in ec2.describe_regions()['Regions']]
for region in region_list:
ec2 = child_session.client('ec2', region_name=region)
vpcs = []
response = ec2.describe_vpcs()
for vpc in response['Vpcs']:
vpcs.append(vpc)
while 'NextToken' in response:
response = ec2.describe_vpcs(NextToken=response['NextToken'])
for vpc in response['Vpcs']:
vpcs.append(vpc)
for vpc in vpcs:
if vpc['IsDefault'] == True: continue
# if vpc['OwnerId'] != account: continue
vpc_dict = {'AccountId': account, 'VpcId': vpc['VpcId'], 'CIDR': vpc['CidrBlock'], 'Region': region}
print(vpc_dict)
final_result.append(vpc_dict)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'OptInRequired':
print(e)
pass
except Exception as e:
raise e
def get_headers(results):
"""
getting keys from downstream result so that custom logic added after won't required updating in multiple places
:param results:
:return:
"""
headers = []
for d in results:
for key in d.keys():
headers.append(key)
headers = list(set(headers))
return headers
def write_csv(results):
"""
write to csv
:param results:
:return:
"""
headers = get_headers(results)
output_ec2 = 'output.csv'
with open(output_ec2, 'w') as csvfile:
fieldnames = headers
writer = csv.DictWriter(csvfile, fieldnames=fieldnames, lineterminator='\n')
writer.writeheader()
for result in results:
row = result
writer.writerow(row)
def main():
global final_result
threads = []
final_result = []
session = boto3.session.Session()
org_accounts = get_org_accounts(session)
for account in org_accounts:
t = threading.Thread(target=worker, args=(account, None))
threads.append(t)
t.start()
# wait for threads to finish
for thread in threads:
thread.join()
print(len(final_result))
write_csv(final_result)
return
if __name__ == '__main__':
main()