#!/usr/bin/env python3
#created by Juan Garces
import os
import glob
import requests
import jmespath
import hashlib
import sys
from loguru import logger
logger.remove()
logger.add(sys.stdout, colorize=True, level="INFO", format="<blue>{time:HH:mm:ss!UTC}</blue>: <lvl>{message}</lvl>")
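#expected environment variables, provided by the github action inputs and the runner (see the __main__ block below):
#  INPUT_JAMF_URL, INPUT_JAMF_AUTH_TYPE, INPUT_JAMF_USERNAME, INPUT_JAMF_PASSWORD,
#  INPUT_SCRIPT_DIR, INPUT_EA_SCRIPT_DIR, INPUT_PREFIX, INPUT_SCRIPT_EXTENSIONS, INPUT_DELETE,
#  GITHUB_WORKSPACE, GITHUB_REF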
#function to get the token
@logger.catch
def get_jamf_token(url, auth_type, username, password):
if auth_type == "auth":
token_request = requests.post(url=f"{url}/uapi/auth/tokens", auth=(username,password))
elif auth_type =='oauth':
data = {"client_id": username,"client_secret": password, "grant_type": "client_credentials"}
token_request = requests.post(url=f"{url}/api/oauth/token", data=data)
if token_request.status_code == requests.codes.ok:
if auth_type == "auth":
logger.success(f"got the token! it expires in: {token_request.json()['expires']}")
return token_request.json()['token']
elif auth_type == "oauth":
logger.success(f"got the token! it expires in: {token_request.json()['expires_in']}")
return token_request.json()['access_token']
    elif token_request.status_code == requests.codes.not_found:
        logger.error('failed to retrieve a valid token, please check the url')
        raise Exception("failed to retrieve a valid token, please check the url")
elif token_request.status_code == requests.codes.unauthorized:
logger.error('failed to retrieve a valid token, please check the credentials')
raise Exception("failed to retrieve a valid token, please check the credentials")
else:
logger.error('failed to retrieve a valid token')
logger.error(token_request.text)
raise Exception("failed to retrieve a valid token, please check the credentials")
#function to invalidate a token so it can't be used after we're done
@logger.catch
def invalidate_jamf_token(url, token):
header = {"Authorization": f"Bearer {token}"}
token_request = requests.post(url=f"{url}/uapi/auth/invalidateToken", headers=header)
if token_request.status_code == requests.codes.no_content:
logger.success("token invalidated succesfully")
return True
else:
logger.warning("failed to invalidate the token, maybe it's already expired?")
logger.warning(token_request.text)
#function to create a new script
@logger.catch
def create_jamf_script(url, token, payload):
header = {"Authorization": f"Bearer {token}"}
script_request = requests.post(url=f"{url}/uapi/v1/scripts", headers=header, json=payload)
if script_request.status_code == requests.codes.created:
logger.success("script created")
return True
else:
logger.warning("failed to create the script")
logger.debug(f"status code for create: {script_request.status_code}")
logger.warning(script_request.text)
sys.exit(1)
#function to update an already existing script
@logger.catch
def update_jamf_script(url, token, payload):
header = {"Authorization": f"Bearer {token}"}
script_request = requests.put(url=f"{url}/uapi/v1/scripts/{payload['id']}", headers=header, json=payload)
if script_request.status_code in [requests.codes.accepted, requests.codes.ok]:
logger.success("script was updated succesfully")
return True
else:
logger.warning("failed to update the script")
logger.debug(f"status code for put: {script_request.status_code}")
logger.warning(script_request.text)
sys.exit(1)
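#function to delete an existing script by its jamf id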
@logger.catch
def delete_jamf_script(url, token, id):
header = {"Authorization": f"Bearer {token}"}
script_request = requests.delete(url=f"{url}/uapi/v1/scripts/{id}", headers=header)
if script_request.status_code in [requests.codes.ok, requests.codes.accepted, requests.codes.no_content]:
logger.success("script was deleted succesfully")
return True
else:
logger.warning("failed to delete the script")
logger.debug(f"status code for delete: {script_request.status_code}")
logger.warning(script_request.text)
sys.exit(1)
#retrieves all scripts as json, paging through the results
@logger.catch
def get_all_jamf_scripts(url, token, scripts=None, page=0):
    #avoid a mutable default argument so repeated calls start with a fresh list
    if scripts is None:
        scripts = []
    header = {"Authorization": f"Bearer {token}"}
page_size=50
params = {"page": page, "page-size": page_size, "sort": "name:asc"}
script_list = requests.get(url=f"{url}/uapi/v1/scripts", headers=header, params=params)
if script_list.status_code == requests.codes.ok:
script_list = script_list.json()
logger.info(f"we got {len(script_list['results'])+page} of {script_list['totalCount']} results")
page+=1
if (page*page_size) < script_list['totalCount']:
logger.info("seems there's more to grab")
scripts.extend(script_list['results'])
return get_all_jamf_scripts(url, token, scripts, page)
else:
logger.info("reached the end of our search")
scripts.extend(script_list['results'])
logger.success(f"retrieved {len(scripts)} total scripts")
return scripts
else:
logger.error(f"status code: {script_list.status_code}")
logger.error("error retrevieving script list")
logger.error(script_list.text)
raise Exception("error retrevieving script list")
#search for the script name and return the json for it
@logger.catch
def find_jamf_script(url, token, script_name, page = 0):
header = {"Authorization": f"Bearer {token}"}
page_size=50
params = {"page": page, "page-size": page_size, "sort": "name:asc"}
script_list = requests.get(url=f"{url}/uapi/v1/scripts", headers=header, params=params)
if script_list.status_code == requests.codes.ok:
script_list = script_list.json()
logger.info(f"we have searched {len(script_list['results'])+page} of {script_list['totalCount']} results")
script_search = jmespath.search(f"results[?name == '{script_name}']", script_list)
if len(script_search) == 1:
logger.info('found the script, returning it')
return script_search[0]
elif len(script_search) == 0 and (page*page_size) < script_list['totalCount']:
logger.info("couldn't find the script in this page, seems there's more to look through")
return find_jamf_script(url, token, script_name, page+1)
else:
logger.info(f"did not find any script named {script_name}")
return "n/a"
else:
logger.error(f"status code: {script_list.status_code}")
logger.error("error retrevieving script list")
logger.error(script_list.text)
raise Exception("failed to find the script, please investigate!")
#function to find an EA script using the filename as the script name
@logger.catch
def find_ea_script(ea_name):
    #note: url, username and password are the module level globals set in the __main__ block
    #the classic api returns xml by default, so explicitly ask for json
    header = {"Accept": "application/json"}
    ea_script = requests.get(url=f"{url}/JSSResource/computerextensionattributes/name/{ea_name}", headers=header, auth=(username, password))
if ea_script.status_code == requests.codes.ok:
return ea_script.json()['computer_extension_attribute']
elif ea_script.status_code == requests.codes.not_found:
logger.warning(f"Found no script with name: {ea_name}")
return None
else:
logger.error("encountered an error retriving the extension attribute, stopping")
logger.error(ea_script.text)
raise Exception("encountered an error retriving the extension attribute, stopping")
#function to create an EA script
@logger.catch
def create_ea_script(payload, id):
    #the classic api expects xml for writes, so pass the headers and send the payload through as xml data
    headers = {"Accept": "text/xml", "Content-Type": "text/xml"}
    ea_script = requests.post(url=f"{url}/JSSResource/computerextensionattributes/id/{id}", data=payload, headers=headers, auth=(username, password))
    if ea_script.status_code in [requests.codes.ok, requests.codes.created]:
        return "success"
    else:
        logger.error("encountered an error creating the extension attribute, stopping")
        logger.error(ea_script.text)
        raise Exception("encountered an error creating the extension attribute, stopping")
#function to update an existing EA script
@logger.catch
def update_ea_script(payload, id):
    #the classic api expects xml for writes, so pass the headers and send the payload through as xml data
    headers = {"Accept": "text/xml", "Content-Type": "text/xml"}
    ea_script = requests.put(url=f"{url}/JSSResource/computerextensionattributes/id/{id}", data=payload, headers=headers, auth=(username, password))
    if ea_script.status_code == requests.codes.ok:
        return "success"
    else:
        logger.error("encountered an error updating the extension attribute, stopping")
        logger.error(ea_script.text)
        raise Exception("encountered an error updating the extension attribute, stopping")
#function to compare scripts and see if they have changed. If they haven't, there's no need to update
@logger.catch
def compare_scripts(new, old):
md5_new = hashlib.md5(new.encode())
logger.info(f"hash of the of github script: {md5_new.hexdigest()}")
md5_old = hashlib.md5(old.encode())
logger.info(f"hash of the of jamf script: {md5_old.hexdigest()}")
if md5_new.hexdigest() == md5_old.hexdigest():
logger.info("scripts are the same")
return True
else:
logger.warning("scripts are different")
return False
#retrieves a list of files given a folder path and a list of valid file extensions to look for
@logger.catch
def find_local_scripts(script_dir, script_extensions):
script_list = []
logger.info(f"searching for files ending in {script_extensions} in {script_dir}")
for file_type in script_extensions:
script_list.extend(glob.glob(f"{script_dir}/**/*.{file_type}", recursive = True))
logger.info("found these: ", script_dir)
logger.info(script_list)
return script_list
#strips out the path and extension to get the script's name
@logger.catch
def get_script_name(script_path):
return script_path.split('/')[-1].rsplit('.', 1)[0]
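#main sync routine: compares the scripts in the repo against the scripts in jamf,
#creating, updating or (optionally) deleting them as needed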
@logger.catch
def push_scripts():
#grab the token from jamf
    logger.info('grabbing the token from jamf')
token = get_jamf_token(url,auth_type, username, password)
logger.info('checking the list of local scripts to upload or create')
scripts = {}
    #this retrieves the full path of the scripts we're trying to sync from github
scripts['github'] = find_local_scripts(script_dir, script_extensions)
    #simplify this list down to just the name of each script, stripping out the path and extension
scripts['github_simple_name'] = []
for script in scripts['github']:
scripts['github_simple_name'].append(get_script_name(script).lower())
    logger.info('double-checking for duplicate script names')
for count, script in enumerate(scripts['github_simple_name']):
if scripts['github_simple_name'].count(script) >= 2:
logger.error(f"the script name {script} is duplicated {scripts['github_simple_name'].count(script)} times, please give it a unique name")
#logger.error(scripts['github'][count])
sys.exit(1)
#continue if no dupes are found
logger.success("nice, no duplicate script names, we can continue")
logger.info('now checking jamf for its list of scripts')
scripts['jamf'] = get_all_jamf_scripts(url, token)
logger.info("setting all script names to lower case to avoid false positives in our search.")
logger.info("worry not, this won't affect the actual naming :)")
#save the scripts name all in lower_case
for script in scripts['jamf']:
script['lower_case_name'] = script['name'].lower()
    #make a copy of the jamf scripts list, we'll use it to determine which scripts to delete later on
    scripts['to_delete'] = scripts['jamf'].copy()
logger.info("processing each script now")
for count, script in enumerate(scripts['github']):
logger.info("----------------------")
logger.info(f"script {count+1} of {len(scripts['github'])}")
logger.info(f"path of the script: {script}")
script_name = get_script_name(script)
if enable_prefix == "false":
#don't use the prefix
logger.info(f"script name is: {script_name}")
else:
#use the branch name as prefix
prefix = branch.split('/')[-1]
script_name = f"{prefix}_{script_name}"
logger.info(f"the new script name: {script_name}")
#check to see if the script name exists in jamf
logger.info(f"now let's see if {script_name} exists in jamf already")
script_search = jmespath.search(f"[?lower_case_name == '{script_name.lower()}']", scripts['jamf'])
if len(script_search) == 0:
logger.info("it doesn't exist, lets create it")
#it doesn't exist, we can create it
with open(script, 'r') as upload_script:
payload = {"name": script_name, "info": "", "notes": "created via github action", "priority": "AFTER" , "categoryId": "1", "categoryName":"", "parameter4":"", "parameter5":"", "parameter6":"", "parameter7":"", "parameter8":"", "parameter9":"", "parameter10":"", "parameter11":"", "osRequirements":"", "scriptContents":f"{upload_script.read()}"}
create_jamf_script(url, token, payload)
elif len(script_search) == 1:
jamf_script = script_search.pop()
del jamf_script['lower_case_name']
scripts['to_delete'].remove(jamf_script)
logger.info("it does exist, lets compare them")
#it does exists, lets see if has changed
with open(script, 'r') as upload_script:
script_text = upload_script.read()
if not compare_scripts(script_text, jamf_script['scriptContents']):
logger.info("the local version is different than the one in jamf, updating jamf")
#the hash of the scripts is not the same, so we'll update it
jamf_script['scriptContents'] = script_text
update_jamf_script(url, token, jamf_script)
else:
logger.info("we're skipping this one.")
if delete == 'true':
logger.warning(f"we have {len(scripts['to_delete'])} scripts left to delete")
for script in scripts['to_delete']:
logger.info(f"attempting to delete script {script['name']} in jamf")
delete_jamf_script(url, token, script['id'])
logger.info("expiring the token so it can't be used further")
invalidate_jamf_token(url, token)
logger.success("finished with the scripts")
def push_ea_scripts():
return ""
#run this thing
if __name__ == "__main__":
logger.info('reading environment variables')
url = os.getenv('INPUT_JAMF_URL')
auth_type = os.getenv("INPUT_JAMF_AUTH_TYPE")
if auth_type not in ["auth","oauth"]:
logger.error("please use 'auth' or 'oauth' as they auth_type")
#if using oauth, we're just going to re-use the same variables as they are similar enough.
#client_id is username
username = os.getenv('INPUT_JAMF_USERNAME')
#client_secret is password
password = os.getenv('INPUT_JAMF_PASSWORD')
script_dir = os.getenv('INPUT_SCRIPT_DIR')
ea_script_dir = os.getenv('INPUT_EA_SCRIPT_DIR')
workspace_dir = os.getenv('GITHUB_WORKSPACE')
if script_dir != workspace_dir:
script_dir = f"{workspace_dir}/{script_dir}"
enable_prefix = os.getenv('INPUT_PREFIX')
branch = os.getenv('GITHUB_REF')
script_extensions = os.getenv('INPUT_SCRIPT_EXTENSIONS')
delete = os.getenv('INPUT_DELETE')
script_extensions = script_extensions.split()
logger.info(f"url is: {url}")
logger.info(f"workspace dir is: {workspace_dir}")
logger.info(f"script_dir is: {script_dir}")
logger.info(f"branch is set to: {branch}")
logger.info(f"script_deletion is: {delete}")
logger.info(f"scripts_extensions are: {script_extensions}")
if enable_prefix == 'false':
logger.warning('prefix is disabled')
else:
logger.warning(f"prefix enabled, using: {branch.split('/')[-1]}")
#run the block to push the "normal" scripts to jamf
push_scripts()
    #check to see if we have any EA scripts to push over
if ea_script_dir != 'false':
logger.info("we have some EA scripts to process")
push_ea_scripts()
else:
logger.warning("no EA script folder set, skipping")
logger.success("we're done!")