#!/usr/bin/env python3
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: MIT
from os import getcwd, listdir, mkdir, makedirs
from jsonpath_ng import parse
from os.path import splitext, isfile, join, abspath, basename, isdir, split, relpath, commonpath, dirname, exists
from jinja2 import Environment, FileSystemLoader
from box import Box
import re
import yaml
import json
import argparse
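# NOTE: a minimal sketch of the instructions file layout, inferred from the
# parsing logic below (common_overlays, yaml_files, overlays, documents,
# query, value, action, on_missing, document_query, document_index). The
# names and values are illustrative placeholders, not taken from the
# upstream documentation.
#
#   common_overlays:                # applied to every yaml_files entry
#     - query: metadata.labels      # jsonpath-style query (string or list)
#       value:
#         team: platform            # hypothetical value to merge at the query path
#       action: merge               # one of merge | replace | delete
#       on_missing:
#         action: inject            # inject | ignore (ignore is the implicit default)
#         inject_path: metadata.labels
#   yaml_files:
#     - name: example               # optional
#       path: manifests/deploy.yaml # a file, or a directory of .yml/.yaml files
#       overlays:                   # applied to every document in the file
#         - query: spec.replicas
#           value: 3
#           action: replace
#       documents:                  # per-document overlays; path is the doc index
#         - path: 0
#           overlays:
#             - query: metadata.name
#               value: renamed
#               action: replace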
def get_args():
"""
Collects command line arguments
"""
parser = argparse.ArgumentParser(
        description='yot (YAML overlay tool) allows for templating of overlay instruction data with Jinja2 and the application of rendered overlays "over the top" of a yaml file. yot only produces valid yaml documents on output.'
)
parser.add_argument(
'-d',
'--default-values-file',
required=False,
action='append',
help='Path to your default values file. If not set, you must pass a values file of "defaults.yaml" or "defaults.yml" within a path from the "-v" option. Takes multiple default values files in case you would like to separate out some of the values. After the first default values file, each subsequent file passed with "-d" will be merged with the values from the first. If a "defaults.yaml" or "defaults.yml" file is discovered in one of your "-v" paths, it will be merged with these values last.'
)
parser.add_argument(
'-v',
'--values-path',
required=False,
action='append',
help='Values file path. May be a path to a file or directory containing value files ending in either .yml or .yaml. This option can be provided multiple times as required. A file named "defaults.yaml" or "defaults.yml" is required within the path(s) if not using the "-d" option, and you may have only 1 default value file in that scenario. Additional values files are merged over the defaults.yaml file values. Each values file is treated as a unique "site" and will render your instructions differently based on its values'
)
parser.add_argument(
'-i',
'--instruction-file',
required=True,
        help='Instruction file path, e.g. ./instructions.yaml (required).'
)
parser.add_argument(
'-o',
'--output-directory',
default=join(getcwd(),"output"),
help='Path to directory to write the overlayed yaml files to. If value files were supplied in addition to a defaults.yaml/.yml then the rendered templates will land in <output dir>/<addl value file name>.'
)
parser.add_argument(
'-s',
'--stdout',
action='store_true',
default=False,
help='Render output to stdout. Templated instructions files will still be output to the "--output-directory."'
)
parser.add_argument(
'-r',
'--dump-rendered-instructions',
action='store_true',
default=False,
help='If using a templated instructions file, you can dump the rendered instructions to stdout to allow for reviewing how they were rendered prior to a full run of yot. Equivalent to a dry-run. Exits with return code 0 prior to processing instructions'
)
args = parser.parse_args()
return args
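# Example invocations (a sketch based only on the flags defined above; the
# file and directory names are placeholders):
#
#   ./yot -i instructions.yaml -d defaults.yaml -v values/ -o ./output
#   ./yot -i instructions.yaml -v values/ --stdout
#   ./yot -i instructions.yaml -d defaults.yaml -r   # dump rendered instructions and exit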
def get_file_paths(path):
"""
returns a list of .yml or .yaml files based on whether a path is a dir or
file
"""
try:
files = []
if isfile(abspath(path)):
# path is single file
## check for 'yml' or 'yaml'
if (splitext(abspath(path))[1] == '.yml') or (splitext(abspath(path))[1] == '.yaml'):
files = [abspath(path)]
else:
# path is dir, get all .yml and .yaml files (no traversal into subdirs currently)
files = [join(abspath(path), filename) for filename in listdir(path) if isfile(join(path, filename)) and ((splitext(filename)[1] == '.yml') or (splitext(filename)[1] == '.yaml'))]
except:
print(f'error: No file or directory found at {path}')
exit(1)
return files
def get_files(path, type='files'):
""" Gets a list of files from either a directory or an absolute path with file extension of .yaml or .yml. Can be a list or single string. type: can be files | dir and changes behavior
type=files: looks for all .yml, .yaml in a path and creates a list of all absolute paths.
type=dir: checks if the path is a dir and then returns a list of dirs
"""
files = []
if type == 'files':
if isinstance(path, list):
# path is a list of paths
for i in path:
files.extend(get_file_paths(i))
else:
# path is not a list of paths
files.extend(get_file_paths(path))
elif type == 'dir':
if isinstance(path, list):
for p in path:
if isdir(p):
files.append(get_file_paths(abspath(p)))
else:
                    print(f'Path must be a directory: {abspath(p)}\n ...ignoring')
else:
if isdir(path):
files.append(get_file_paths(abspath(path)))
else:
print(f'Path must be a directory {abspath(path)}\n ...ignoring')
return files
def get_values(files, template=False):
"""
Collects values from a list of yaml files returned from the get_files()
function.
template=True will attempt to render the values file as a jinja2 template
"""
values = dict()
for f in files:
# open the value file
data_file = open(f, 'r')
if not template:
all_yaml_data = yaml.load_all(data_file, Loader=yaml.FullLoader)
elif template:
paths = [split(f)[0], getcwd()]
j2 = Environment(loader=FileSystemLoader(paths, followlinks=True))
template = j2.get_template(basename(f))
pre = template.render()
# Re-render values with the pre-rendered values file as variables
post = template.render(yaml.load(pre, Loader=yaml.FullLoader))
all_yaml_data = yaml.load_all(post, Loader=yaml.FullLoader)
        # yaml.load_all returns all yaml docs in a file as a generator, so
        ## collect them into a list (avoid shadowing the yaml module name)
        yaml_docs = [doc for doc in all_yaml_data]
values.update({f: yaml_docs})
return values
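# Sketch of the two-pass rendering performed above when template=True, using
# hypothetical keys:
#
#   region: us-east-1
#   bucket: "myapp-{{ region }}"
#
# Pass 1 renders the file with no variables, so "{{ region }}" collapses to an
# empty string; that output is loaded as YAML and fed back into pass 2, which
# renders bucket as "myapp-us-east-1". This allows a values file to reference
# its own keys through Jinja2 expressions.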
def get_output_dir(path, values_path):
"""
Sets up the base output directory structure and returns the absolute path
"""
try:
out = abspath(path)
except:
if path is None:
print(f"error: --output-directory option was not provided: '{path}'")
exit(1)
# prep the output directories
if not isdir(out):
mkdir(out)
    if values_path is not None:
if not isdir(join(out, 'rendered_instructions')):
mkdir(join(out, 'rendered_instructions'))
return out
def merge_defaults(defaults, key, values):
"""
Merges add'l site values over the top of defaults
"""
merged = Box(defaults.copy(), box_dots=True)
file_values = {}
# ensure we've collected all possible yaml docs in value file
    for doc in values[key]:
        file_values.update(doc)
addl_values = Box(file_values, box_dots=True)
merged.merge_update(addl_values)
return merged.to_dict()
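# Illustration of the merge above (hypothetical values): with defaults of
# {'app': {'name': 'web', 'replicas': 1}} and a site values file containing
# {'app': {'replicas': 3}}, merge_defaults() returns
# {'app': {'name': 'web', 'replicas': 3}} -- site values win, and keys present
# only in the defaults are preserved by Box's merge_update().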
def make_output_dirs(output_path, paths):
"""
Creates output paths in output dir if add'l value files provided. Otherwise,
drops outputs to the output dir
"""
if not isdir(join(output_path, 'yaml_files')):
mkdir(join(output_path, 'yaml_files'))
for i in paths:
dir = join(join(output_path, 'yaml_files'),splitext(basename(i))[0])
if not isdir(dir):
mkdir(dir)
return
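# Resulting output layout, assuming the default --output-directory of ./output
# and hypothetical site value files east.yaml and west.yaml:
#
#   output/
#     rendered_instructions/        # created by get_output_dir() when -v is used
#       east_instructions.yaml
#       west_instructions.yaml
#     yaml_files/
#       east/...                    # overlayed copies of each yaml_files path
#       west/...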
def render_instructions(defaults, values, instructions_path, stdout):
"""
Renders a templated instruction file with default and site specific values.
defaults is the merged collected default values (-d) or
(defaults.yaml in -v path)
values is all the collected site value files (-v)
instructions_path is the path to the templated instruction file (-i)
"""
if not stdout:
print('Rendering instructions template...\n')
instruction_set = dict()
j2 = Environment(loader=FileSystemLoader(split(instructions_path[0])[0]))
if len(values.keys()) > 1:
# we have site specific value files
for i in values.keys():
merged_vars = merge_defaults(defaults['values'], i, values)
if not stdout:
print(f'Merged default values from "{relpath(defaults["file_path"])}" with "{relpath(i)}"')
template = j2.get_template(basename(instructions_path[0]))
instruct = template.render(merged_vars)
instruction_set.update({splitext(basename(i))[0]: instruct})
if not stdout:
print(f'Rendered instructions for "{i}" values\n')
else:
# only defaults file values (single-site)
template = j2.get_template(basename(instructions_path[0]))
instruct = template.render(defaults['values'])
instruction_set.update({splitext(basename(defaults['file_path']))[0]: instruct})
if not stdout:
print(f'Rendered instructions for "default" values')
return instruction_set
def load_instructions(rendered_instructions):
"""
Loads up instructions file into dict of {value_file_name, instruction_yaml}
"""
instruction_yaml = dict()
for i in rendered_instructions.keys():
try:
instructions = yaml.load(rendered_instructions[i], Loader=yaml.FullLoader)
except:
print(f'\n\nCould not load instructions for {i} instruction set. Invalid yaml.\nDid you forget to pass in values files?\nrendered_instructions:\n{rendered_instructions[i]}\nProblem loading the above instruction set.\n\n')
exit(1)
instruction_yaml.update({i: instructions})
return instruction_yaml
def transform_data(key, desired_value, orig_val, action):
"""
Manipulates data based on the provided action
Accepts the actions of "merge", "replace", and "delete"
"""
transformed = orig_val
# print(f'the orig_val: {transformed}')
# print(f'the key: {key}')
# handle types
if isinstance(transformed, dict):
# print(f'im a dict {orig_val}')
if action == "merge":
transformed.merge_update(desired_value)
elif action == "replace":
transformed = desired_value
elif isinstance(transformed, list):
# print(f'im a list: {orig_val}')
if action == "merge":
transformed.extend(desired_value)
elif action == "replace":
transformed = desired_value
elif isinstance(transformed, str):
# print(f'im a str: {orig_val}')
if action == "merge":
transformed = transformed + desired_value
elif action == "replace":
# print(f'replace string {desired_value}')
transformed = desired_value
else:
# handle anything else that might come back such as None
# if transformed is None:
# print('im None')
transformed = desired_value
# print(transformed)
return transformed
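# Behavior of transform_data() by type and action (illustrative values; the
# dict and list cases assume Box/list objects as loaded elsewhere in this
# script):
#   dict + merge   : {'a': 1} with {'b': 2}  -> {'a': 1, 'b': 2}
#   list + merge   : [1, 2]   with [3]       -> [1, 2, 3]
#   str  + merge   : 'foo'    with 'bar'     -> 'foobar'
#   any  + replace : original value          -> desired value
#   None/other     : the desired value is used as-is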
def test_for_key(key, manifest):
"""
Uses jsonpath-ng to perform jsonpath queries on yaml files
Returns a list
    If there are no matches, item [0] of the list will be True; otherwise item
    [0] will be False, meaning the query is not missing.
    If matches are found, item [0] will be False and item [1] will be a list of
    matches from the jsonpath query on the data, which is used later to modify
    the value of those matches with the desired value
"""
jq = parse(key).find(json.loads(json.dumps(manifest)))
if len(jq) > 0:
out = [False]
val = [Box({str(match.full_path): match.value}, box_dots=True) for match in jq]
out.append(val)
return out
return [True]
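# Example return values (hypothetical manifest):
#   test_for_key('metadata.name', {'metadata': {'name': 'web'}})
#     -> [False, [Box({'metadata.name': 'web'})]]   # match found
#   test_for_key('metadata.missing', {'metadata': {'name': 'web'}})
#     -> [True]                                      # no match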
def perform_overlayment(action, manifest, overlay_expression, overlay_value, on_missing, inject_path, manifest_path, loop_count, stdout=False):
"""
Top-level overlayment work starts here. Returns an updated/modified
yaml file
"""
# handle an empty yaml doc
if manifest is None:
manifest = {}
if action == "delete":
is_missing = test_for_key(overlay_expression, manifest)
if not is_missing[0]:
for i in is_missing[1]:
for k in i.to_dict().keys():
del manifest[k]
# print('Performed delete action')
# implicit ignore on missing
elif (action == "merge") or (action == "replace"):
is_missing = test_for_key(overlay_expression, manifest)
if not is_missing[0]:
# the overlay expression exists in the manifest, act on match paths
# print(is_missing[1])
for match in is_missing[1]:
for k in match.to_dict().keys():
manifest[k] = transform_data(k, overlay_value, match[k], action)
else:
# handle on_missing, pass overlay_value twice,
## because it'll update over itself
if on_missing == "inject":
if len(inject_path) > 0:
                    valid_expression = re.compile(r'^([a-z]*[A-Z]*[0-9]*\.*)+$')
for inject_p in inject_path:
if valid_expression.match(inject_p):
add_key = ""
for k in inject_p.split('.'):
if len(add_key) == 0:
# covers first iteration
add_key += k
else:
add_key += ('.' + k)
# force the Box to create the key
manifest[add_key] = {}
# make sure we dont merge a str with itself
if isinstance(overlay_value, str) or isinstance(overlay_value, list):
manifest[inject_p] = transform_data(inject_p, overlay_value, "", action)
else:
manifest[inject_p] = transform_data(inject_p, overlay_value, overlay_value, action)
# if action == "merge":
# print('Performed merge action')
# elif action == "replace":
# print('Performed replace action')
else:
                    valid_expression = re.compile(r'^([a-z]*[A-Z]*[0-9]*\.*)+$')
if valid_expression.match(overlay_expression):
add_key = ""
for k in overlay_expression.split('.'):
if len(add_key) == 0:
# covers first iteration
add_key += k
else:
add_key += ('.' + k)
try:
# if we're unable to set something, then it's not there and needs creating
test = manifest[add_key]
except:
# force the Box to create the key
manifest[add_key] = {}
# now inject, but make sure we don't merge a str with itself
if isinstance(overlay_value, str) or isinstance(overlay_value, list):
manifest[overlay_expression] = transform_data(overlay_expression, overlay_value, "", action)
else:
manifest[overlay_expression] = transform_data(overlay_expression, overlay_value, overlay_value, action)
else:
if not stdout:
                            print(f'warning: while searching with the query "{overlay_expression}", no matches were found in the yaml file "{relpath(manifest_path)}" at document {loop_count}, and the query is not a valid fully-qualified path.\n Please specify an on_missing.inject_path to properly inject this value {overlay_value}.\n ...ignoring inject action\n')
elif (on_missing != "ignore"):
print(f'"{on_missing} is not a valid option for key "on_missing.action"\n...exiting')
exit(1)
else:
print(f'invalid action {action}. Must be one of \'delete\', \'merge\', or \'replace\'.\n ...exiting')
exit(1)
return manifest
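# Sketch of the on_missing handling above, with a hypothetical overlay:
#   query: metadata.labels.team
#   value: platform
#   action: merge
#   on_missing: {action: inject, inject_path: metadata.labels.team}
# When the query has no match, the dotted inject_path is validated against
# valid_expression, the intermediate keys are created through Box's box_dots
# support, and the value is written at metadata.labels.team. Without an
# inject_path, the query itself is used as the injection path when it is a
# plain dotted expression; otherwise a warning is printed and the inject is
# skipped.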
def test_document_query(document_query_key, manifest):
"""
Takes in the document_query_key and a yaml manifest for checking if we
should apply the overlay to a document.
    Requires that all conditions in each item match. If a group of conditions
    all match, then we add a True to the matched list. If True is in the
    matched list, we return True; otherwise we return False
"""
matched = []
# Loop through the groupings of document_query conditions
for i in document_query_key:
try:
conditions = i["conditions"]
except:
print(f'the document_query item did not have a "conditions" key:\n{i}\n...exiting')
exit(1)
condition_group_matched = []
for c in i["conditions"]:
            # succeed early, since only one of these 'i' groupings needs to be True
if True in matched:
return True
# check for a match
matches = test_for_key(c["key"], manifest)
if not matches[0]:
# print(matches[1])
for m in matches[1]:
for v in m.to_dict().values():
# print(v)
if c["value"] == v:
# we have a match in manifest
condition_group_matched.append(True)
# print(f'matched {c["value"]} with query {c["key"]}')
else:
# no match, stop processing because they all need to have valid matches if op is 'and'
condition_group_matched.append(False)
# print(f'no match on {c["value"]} with query {c["key"]}')
break
if False in condition_group_matched:
# print('exited document_query grouping check because of a non-match')
break
elif matches[0]:
# no match, stop processing because they all need to have matches if op is 'and'
condition_group_matched.append(False)
# print(f'no matches returned from query {q["key"]}')
break
if False in condition_group_matched:
matched.append(False)
if False not in condition_group_matched and len(condition_group_matched) > 0 and True in condition_group_matched:
matched.append(True)
# print(f'processed document_query results for item:\n{i["conditions"]}')
# print(f'condition_group_matched: {condition_group_matched}\n')
# Processing complete, setup return
# print(f'matched: {matched}')
if len(matched) == 0:
# we didn't have any proper conditions
# print('returned "False" from "document_query"')
return False
elif True in matched:
# one of the condition groupings has matched
# print('returned "True" from "document_query"')
return True
elif True not in matched:
# print('returned "False" from "document_query"')
return False
else:
# catch all
# print('returned "False" from all document_query items')
return False
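# document_query semantics (illustrative): conditions inside a single item are
# AND'ed together, while the items themselves are OR'ed. For example:
#   document_query:
#     - conditions:
#         - key: kind
#           value: Deployment
#         - key: metadata.name
#           value: web
#     - conditions:
#         - key: kind
#           value: Service
# matches any document that is a Deployment named "web", or any Service.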
def process_manifests(overlays, manifests, manifest_path, type, stdout=False, document=''):
"""
Type can be either 'common' for common_overlays and overlays or 'document' for documents.overlays, which are processed slightly differently
Contains the logic for processing each yaml_file entry in the instructions
"""
## document is the doc index path in a multi-doc yaml file
for overlay in overlays:
overlay_expression = overlay["query"]
# if we don't have a string or list in the query key, exit with error
## otherwise convert the string to a list for processing
if not (isinstance(overlay_expression, str)) and not (isinstance(overlay_expression, list)):
print(f'error: the query of {overlay_expression} is not a string or list.')
exit(1)
elif isinstance(overlay_expression, str):
overlay_expression = [overlay_expression]
value = overlay["value"]
# handle data types as required
if (isinstance(value, str)) or (isinstance(value, list)):
overlay_value = value
elif isinstance(value, dict):
overlay_value = Box(overlay["value"], box_dots=True)
else:
            # "type" is a parameter of this function, so report the class directly
            print(f'value of {value} is an unsupported data type: {value.__class__.__name__}')
            exit(1)
action = overlay["action"]
# loop over all the queries for this overlay
for expression in overlay_expression:
# on_missing does not apply to an action of delete
if (action == "replace") or (action == "merge"):
if type == "common":
# see if we have 'document_query' set
try:
# key and value are required if document_query set
# document_query = overlay["document_query"]["key"]
# document_query_value = overlay["document_query"]["value"]
document_query = overlay["document_query"]
document_query_key = True
except:
document_query_key = False
# see if we have a 'document_index' set
try:
document_index = overlay["document_index"]
document_index_key = True
except:
document_index_key = False
# see if we have on_missing set
try:
on_missing = overlay["on_missing"]["action"]
try:
# see if we have an inject_path
inject_path = overlay["on_missing"]["inject_path"]
# make it a list if it's a string
if isinstance(inject_path, str):
inject_path = [inject_path]
elif not isinstance(inject_path, list):
print(f'error: inject_path must be a string or list: {inject_path}.\n...ignoring inject')
on_missing = "ignore"
except:
# validate overlay_expression otherwise set to empty
inject_path = []
except:
# implicit default action of ignore
on_missing = "ignore"
# default inject_path to nothing
inject_path = []
else:
on_missing = None
inject_path = []
if type == "common":
manifest_count = 0 # to provide a doc path index on errors/warnings
for manifest in manifests[manifest_path]:
# print(f'manifest count is {manifest_count}')
# if we have a qualifier like document_index or document_query,
## act on the manifest only if we have a proper match
# if document_query_key and document_index_key:
if document_query_key and document_index_key:
# print('we have a document_query key and document_index key')
# we have both qualifiers
if manifest_count in document_index:
doc_query_valid = test_document_query(document_query, manifest)
# print(doc_query)
if doc_query_valid:
# queries successfully matched
manifest.merge_update(perform_overlayment(action, manifest, expression, overlay_value, on_missing, inject_path, manifest_path, manifest_count, stdout))
elif document_index_key and not document_query_key:
# print('we have a document_index')
if manifest_count in document_index:
# do the overlayment, because it's an index match
manifest.merge_update(perform_overlayment(action, manifest, expression, overlay_value, on_missing, inject_path, manifest_path, manifest_count, stdout))
elif document_query_key and not document_index_key:
# print('we have a document_query key')
doc_query_valid = test_document_query(document_query, manifest)
# print(doc_query_valid)
if doc_query_valid:
# queries successfully matched
manifest.merge_update(perform_overlayment(action, manifest, expression, overlay_value, on_missing, inject_path, manifest_path, manifest_count, stdout))
else:
# process all yaml-docs in manifest with top-level common_overlays and overlays first
# print(manifest)
manifest.merge_update(perform_overlayment(action, manifest, expression, overlay_value, on_missing, inject_path, manifest_path, manifest_count, stdout))
manifest_count += 1
elif type == "document":
# we don't take document_query or document_index for the documents key
try:
# it's possible to not have the specified doc index when on_missing.action == inject
manifests[manifest_path][document].merge_update(perform_overlayment(action, manifests[manifest_path][document], expression, overlay_value, on_missing, inject_path, manifest_path, document, stdout))
except:
# the doc path didn't exist, so lets create one
## start at item 0 and work our way up to the specified path
for i in range(0, document + 1, 1):
# See if the doc index is there, otherwise create empties until we hit our specified doc index
try:
test = manifests[manifest_path][i]
except:
manifests[manifest_path].append({})
# now we're ready to inject a new yaml doc
manifests[manifest_path][document].merge_update(perform_overlayment(action, manifests[manifest_path][document], expression, overlay_value, on_missing, inject_path, manifest_path, document, stdout))
return manifests
def process_instructions(instructions, output_dir, stdout, dryrun):
"""
Kicks off the processing of all the overlay procedures
Also creates any additional output directory structures
"""
    # Setup output dir structure preservation to avoid filename collisions
all_instruction_paths = [k for k in instructions.keys()]
all_yaml_paths = list()
for i in all_instruction_paths:
all_yaml_paths.extend([abspath(p["path"]) for p in instructions[i]["yaml_files"]])
common_path = commonpath(all_yaml_paths)
unique_paths = list(set([dirname(p.split(common_path)[1]) for p in all_yaml_paths]))
if dryrun:
for instruction_path in all_instruction_paths:
if instruction_path != "untemplated":
print(f'{instruction_path} rendered instruction set:\n{yaml.dump(instructions[instruction_path])}')
else:
print(f'This instruction file is not templated.\n{instruction_path} instruction set:\n{yaml.dump(instructions[instruction_path])}')
exit(0)
if not stdout:
for dir in unique_paths:
makedirs(join(output_dir, 'yaml_files') + dir, exist_ok=True)
for instruction_path in all_instruction_paths:
# dump the instructions, only when templated, and before other work in
## case something goes wrong along the way, we can see what was in there
if instruction_path != "untemplated":
instruction_file = (instruction_path + '_instructions.yaml')
output_path = join(join(output_dir, 'rendered_instructions'), instruction_file)
output = open(output_path, 'w')
yaml.dump(instructions[instruction_path], output)
output.close()
if not stdout:
print(f'Wrote rendered instructions file "{output_path}"\n')
# see if we have common_overlays
try:
common_overlays = instructions[instruction_path]["common_overlays"]
common_overlays_key = True
except:
common_overlays = []
common_overlays_key = False
for yaml_file in instructions[instruction_path]["yaml_files"]:
# print(f'processing {yaml_file["path"]}')
manifests = Box(box_dots=True)
## following try statement may be useful for debugging or verbose outputs
# try:
# # see if we have a name key
# print(f'Processing "{yaml_file["name"]}" from "{instruction_path}" instructions in "{yaml_file["path"]}":')
# except:
# # no name key and we really don't care about it
# pass
manifest_path = abspath(yaml_file["path"])
dir_path_yaml_files = [] # for when the yaml_file["path"] is a dir
if exists(manifest_path):
if isfile(manifest_path):
manifests.merge_update(get_values(get_files(manifest_path))) # manifests is plural because we will have multi-docs loaded into a list here
path_is_dir = False
# print(f'manifests:\n{yaml.dump(manifests.to_dict())}')
elif isdir(manifest_path):
dir_path_yaml_files = get_files(manifest_path)
path_is_dir = True
# for p in dir_path_yaml_files:
# manifests.merge_update(get_values(get_files(p)))
# print(f'manifests:\n{yaml.dump(manifests.to_dict())}')
else:
print(f'The path {manifest_path} is invalid.')
exit(1)
try:
isinstance(yaml_file['overlays'], list)
overlays_key = True
except:
# we didn't have overlays key
overlays_key = False
# we may not have overlays at top level in multi-doc yaml (optional)
if overlays_key:
# Run the common_overlays on file first by extending the list with
## the file's overlays
overlays = common_overlays.copy()
overlays.extend(yaml_file["overlays"])
if len(dir_path_yaml_files) == 0 and not path_is_dir:
manifests.merge_update(process_manifests(overlays, manifests, manifest_path, 'common', stdout))
# now process multi-doc yamls with specific overlays
try:
isinstance(yaml_file['documents'], dict)
documents_key = True
except:
# we didn't have documents key
documents_key = False
if documents_key:
for doc in yaml_file['documents']:
## following try statement may be useful for debugging or verbose outputs
# try:
# print(f'Processing "{doc["name"]}" from "{path}" instructions in document "{doc["path"]}":')
# except:
# # don't care if name is not there
# pass
document = doc["path"]
# if we don't have top-level overlays key we need to apply common_overlays here
if not overlays_key and common_overlays_key:
overlays = list()
overlays = common_overlays.copy()
try:
overlays.extend(doc["overlays"])
except:
# just take the common_overlays
pass
else:
overlays = []
overlays.extend(doc["overlays"])
# handle dir as path
if len(dir_path_yaml_files) == 0 and not path_is_dir:
# normal yaml file as path
manifests.merge_update(process_manifests(overlays, manifests, manifest_path, 'document', stdout, document))
# we didn't have a 'overlays' or 'documents' key for this path, apply common_overlays only if we have them
if not overlays_key and not documents_key and common_overlays_key and not path_is_dir:
overlays = common_overlays.copy()
manifests.merge_update(process_manifests(overlays, manifests, manifest_path, 'common', stdout))
# Add yaml_files from a path that was a directory to be processed properly later
if path_is_dir and len(dir_path_yaml_files) > 0:
for p in dir_path_yaml_files:
add_yaml_file = {'name': '', 'path': p, 'overlays': [], 'documents': []}
try:
add_yaml_file["overlays"] = yaml_file["overlays"]
except:
del add_yaml_file["overlays"]
try:
add_yaml_file["documents"] = yaml_file["documents"]
except:
del add_yaml_file["documents"]
try:
add_yaml_file["name"] = yaml_file["name"]
except:
del add_yaml_file["name"]
# print(yaml.dump(add_yaml_file))
# add the yaml_file to be processed
instructions[instruction_path]["yaml_files"].extend([add_yaml_file])
# all instructions are now done for this yaml_file
# choose correct output location
            # the 'or instruction_path == "defaults"' check is a temporary band-aid until the -d arg is fully implemented
if not path_is_dir:
if not stdout:
if instruction_path == "untemplated" or instruction_path == "defaults":
opath = join(output_dir, 'yaml_files')
                        # drop files preserving source pathing to avoid filename collisions
opath += dirname(list(manifests.to_dict().keys())[0].split(common_path)[1])
else:
opath = join(join(output_dir, 'yaml_files'), instruction_path)
opath += dirname(list(manifests.to_dict().keys())[0].split(common_path)[1])
# print(yaml.dump(manifests.to_dict()))
# prep for overlayed output dump
for f in manifests.to_dict().keys():
new_manifests = list()
for data in manifests[f]:
new_manifests.append(data.to_dict())
# handle dir as a path
if not stdout:
if not isdir(opath):
makedirs(opath, exist_ok=True)
output_path = join(opath, basename(f))
output = open(output_path, 'w')
yaml.dump_all(new_manifests, output)
output.close()
print(f'Wrote yaml file "{output_path}"')
else:
print(yaml.dump_all(new_manifests, explicit_start=True))
return
def main():
"""
Main program loop
"""
# if -d is not passed, we expect the following in the -v path
default_file_basename = ['defaults.yaml', 'defaults.yml']
args = get_args()
instructions_file = get_files(args.instruction_file)
output_dir = get_output_dir(args.output_directory, args.values_path)
stdout = args.stdout
dryrun = args.dump_rendered_instructions
# do we have values to render instructions with?
if args.values_path:
# we have template values
value_files = get_files(args.values_path)
# template=True means to attempt to render the value file as a template
tmp_values = get_values(value_files, template=True)
values = tmp_values.copy()
discovered_default_values = {}
# handle discovered defaults.yaml/yml files in values path
for i in tmp_values.keys():
if basename(i) in default_file_basename:
discovered_default_values.update({i: values[i]})
del values[i] # clean out the defaults from site values
else:
# set empty dicts for further processing, because we have no site values and no discovered defaults.yaml/yml files
values = {}
discovered_default_values = {}
if args.default_values_file:
if not stdout:
print(f'\nCollecting default values...\n')
default_values = get_values(get_files(args.default_values_file), template=True)
# merge all the defaults together
tmp_defaults = {}
for i in args.default_values_file:
file_defaults = {}
# merge multi-doc yaml default files
for data in default_values[abspath(i)]:
file_defaults.update(data)
# now merge the file's defaults with tmp_defaults
tmp_defaults.update(file_defaults)
if not stdout:
print(f'Merged default values from "{relpath(i)}"')
if len(discovered_default_values.keys()) > 0:
for i in discovered_default_values.keys():
file_defaults = {}
for data in discovered_default_values[abspath(i)]:
file_defaults.update(data)
# now merge the discovered default data with those specified with -d
tmp_defaults.update(file_defaults)
if not stdout:
print(f'Merged default values from "{relpath(i)}"')
for_output = [relpath(i) for i in args.default_values_file]
for_output.extend([relpath(i) for i in discovered_default_values.keys()])
defaults = {'file_path': ', '.join(for_output), 'values': tmp_defaults}
else:
if len(discovered_default_values.keys()) > 0:
# move the discovered defaults into defaults format
tmp_defaults = {}
for i in discovered_default_values.keys():
file_defaults = {}
for data in discovered_default_values[abspath(i)]:
file_defaults.update(data)
tmp_defaults.update(file_defaults)
if not stdout:
print(f'Merged discovered default values from "{relpath(i)}"')
for_output = [relpath(i) for i in discovered_default_values.keys()]
defaults = {'file_path': ', '.join(for_output), 'values': tmp_defaults}
else:
# we have no values
defaults = {}
if len(defaults.keys()) > 0:
# load up rendered instructions as yaml
instructions = render_instructions(defaults, values, instructions_file, stdout)
try:
# see if we had value files and create output structure
            if len(value_files) > 0:
# add'l output dirs created per basename of value files minus file extension
make_output_dirs(output_dir, value_files)
except:
# we didn't have values files
pass
else:
# have no template values
instructions = {'untemplated': open(instructions_file[0], 'r')}
instruct_yaml = load_instructions(instructions)
process_instructions(instruct_yaml, output_dir, stdout, dryrun)
return 0
if __name__ == "__main__":
exit(main())