Skip to content

Commit

Permalink
FST-250 bf build arguments validation
Browse files Browse the repository at this point in the history
  • Loading branch information
artnowo-alle committed Jan 19, 2022
1 parent 233fe90 commit 348d9ef
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 38 deletions.
52 changes: 30 additions & 22 deletions bigflow/build/operate.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,11 @@ def build_dags(
start_time: str,
workflow_id: typing.Optional[str] = None,
):
logger.info("Building airflow DAGs...")
clear_dags_leftovers(project_spec)

image_version = bf_commons.build_docker_image_tag(project_spec.docker_repository, project_spec.version)
create_image_version_file(str(project_spec.project_dir), image_version)

# TODO: Move common frunctions from bigflow.cli to bigflow.commons (or other shared module)
from bigflow.cli import _valid_datetime, walk_workflows
_valid_datetime(start_time)
# TODO: Move common functions from bigflow.cli to bigflow.commons (or other shared module)
from bigflow.cli import walk_workflows

cnt = 0
logger.debug('Loading workflow(s)...')
workflows = []
for root_package in project_spec.packages:
if "." in root_package:
# leaf package
Expand All @@ -133,18 +127,32 @@ def build_dags(
for workflow in walk_workflows(project_spec.project_dir / root_package):
if workflow_id is not None and workflow_id != workflow.workflow_id:
continue
logger.info("Generating DAG file for %s", workflow.workflow_id)
cnt += 1
bigflow.dagbuilder.generate_dag_file(
str(project_spec.project_dir),
image_version,
workflow,
start_time,
project_spec.version,
root_package,
)
workflows.append((workflow, root_package))

if not workflows:
if not workflow_id:
raise Exception('No workflow found')
else:
raise Exception("Workflow '{}' not found".format(workflow_id))

logger.info("Building airflow DAGs...")
clear_dags_leftovers(project_spec)

image_version = bf_commons.build_docker_image_tag(project_spec.docker_repository, project_spec.version)
create_image_version_file(str(project_spec.project_dir), image_version)

for (workflow, package) in workflows:
logger.info("Generating DAG file for %s", workflow.workflow_id)
bigflow.dagbuilder.generate_dag_file(
str(project_spec.project_dir),
image_version,
workflow,
start_time,
project_spec.version,
package,
)

logger.info("Geneated %d DAG files", cnt)
logger.info("Generated %d DAG files", len(workflows))


def _rmtree(p: Path):
Expand Down Expand Up @@ -194,7 +202,7 @@ def build_project(
workflow_id: typing.Optional[str] = None,
):
logger.info("Build the project")
build_dags(project_spec, start_time, workflow_id=workflow_id)
build_package(project_spec)
build_image(project_spec)
build_dags(project_spec, start_time, workflow_id=workflow_id)
logger.info("Project was built")
17 changes: 1 addition & 16 deletions bigflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,21 +303,6 @@ def _create_build_package_parser(subparsers):
subparsers.add_parser('build-package', description='Builds .whl package from local sources.')


def _valid_datetime(dt):
    """
    Validate a start-time string and return it unchanged.

    Accepted values are the literal 'NOW', a '%Y-%m-%d %H:%M:%S' datetime,
    or a '%Y-%m-%d' date.

    :param dt: the string to validate
    :return: ``dt`` itself for every accepted value, including 'NOW'
    :raises ValueError: if ``dt`` matches none of the accepted forms
    """
    accepted_formats = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d")

    if dt != 'NOW':
        for fmt in accepted_formats:
            try:
                datetime.strptime(dt, fmt)
            except ValueError:
                # Not this format; try the next one before giving up.
                continue
            break
        else:
            # Raised outside the ``except`` clause so the failed strptime
            # attempts are not chained onto the error shown to the caller.
            raise ValueError("Not a valid date: '{0}'.".format(dt))

    # Every accepted value is handed back, so the result is consistent
    # regardless of which form was supplied.
    return dt

def _add_build_dags_parser_arguments(parser):
parser.add_argument('-w', '--workflow',
type=str,
Expand All @@ -330,7 +315,7 @@ def _add_build_dags_parser_arguments(parser):
'For workflows triggered hourly -- datetime in format: Y-m-d H:M:S, for example 2020-01-01 00:00:00. '
'For workflows triggered daily -- date in format: Y-m-d, for example 2020-01-01. '
'If empty or set as NOW, current hour is used.',
type=_valid_datetime)
type=bf_commons.valid_datetime)


def _create_build_dags_parser(subparsers):
Expand Down
21 changes: 21 additions & 0 deletions bigflow/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,24 @@ def as_timedelta(v: None | str | Number | timedelta) -> timedelta | None:
return None
else:
return timedelta(seconds=float(v))


def valid_datetime(dt: str) -> str:
    """
    Validates provided datetime string and returns it unchanged.

    Raises ValueError for strings that are none of:
    * 'NOW'
    * valid '%Y-%m-%d %H:%M:%S'
    * valid '%Y-%m-%d'

    :param dt: the string to validate
    :return: ``dt`` itself when it is one of the accepted forms
    :raises ValueError: when ``dt`` matches none of the accepted forms
    """
    if dt == 'NOW':
        return dt

    # Formats are tried in order; the first one that parses wins.
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            datetime.strptime(dt, fmt)
        except ValueError:
            continue
        return dt

    # Raised outside any ``except`` clause on purpose: the individual
    # strptime failures are noise and should not be chained onto this error.
    raise ValueError("Not a valid date: '{0}'.".format(dt))
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import bigflow

# Workflow with id 'the_workflow' and an empty definition list.
# NOTE(review): presumably a minimal fixture for the workflow-discovery
# tests added in this commit -- confirm against the test that loads it.
the_workflow = bigflow.Workflow(
workflow_id='the_workflow',
definition=[]
)
39 changes: 39 additions & 0 deletions test/test_commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,45 @@ def test_should_raise_error_when_given_file_does_not_exists(self):
# when
decode_version_number_from_file_name(Path('/Users/image-0.1122123.0.tar'))

def test_valid_datetime_should_pass_for_NOW(self):
    # when
    result = valid_datetime('NOW')

    # then
    # no error is raised and the value is handed back unchanged
    self.assertEqual(result, 'NOW')

def test_valid_datetime_should_pass_for_valid_YmdHMS(self):
    # when
    result = valid_datetime('2022-01-01 12:34:56')

    # then
    # no error is raised and the value is handed back unchanged
    self.assertEqual(result, '2022-01-01 12:34:56')

def test_valid_datetime_should_raise_error_for_invalid_YmdHMS(self):
    # given
    # a well-shaped datetime whose hour, minute and second are out of range
    out_of_range_datetime = '2022-01-01 34:56:78'

    # when / then
    self.assertRaises(ValueError, valid_datetime, out_of_range_datetime)

def test_valid_datetime_should_pass_for_valid_Ymd(self):
    # when
    result = valid_datetime('2022-01-01')

    # then
    # no error is raised and the value is handed back unchanged
    self.assertEqual(result, '2022-01-01')

def test_valid_datetime_should_raise_error_for_invalid_Ymd(self):
    # given
    # a well-shaped date whose month and day are out of range
    out_of_range_date = '2022-23-45'

    # when / then
    self.assertRaises(ValueError, valid_datetime, out_of_range_date)

def test_valid_datetime_should_raise_error_for_other_string(self):
    # given
    # text that resembles neither 'NOW' nor any accepted date format
    arbitrary_text = 'foo bar baz'

    # when / then
    self.assertRaises(ValueError, valid_datetime, arbitrary_text)

def _touch_file(self, file_name: str, content: str = ''):
workdir = Path(os.getcwd())
f = workdir / file_name
Expand Down

0 comments on commit 348d9ef

Please sign in to comment.