[DAPHNE-#522] DaphneLib complex control flow (#574)
- So far, DaphneLib (DAPHNE's Python API) did not directly support complex control flow.
- Using Python's control flow statements was not possible due to the lazy evaluation of DaphneLib, or did not result in the generation of the appropriate DaphneDSL control flow constructs.
- Now, DaphneLib supports lazily evaluated complex control flow statements.
  - if-then-else
  - for/while/do-while loops
  - user-defined functions
- Added numerous script-level test cases comparing the outputs of equivalent DaphneDSL and DaphneLib scripts.
  - Some of those needed to be commented out, due to current limitations in DaphneDSL.
- Added basic documentation (API reference).
- Closes #522.
lachezar-n authored Oct 3, 2023
1 parent 02c0686 commit cba0f69
Showing 74 changed files with 1,834 additions and 35 deletions.
59 changes: 58 additions & 1 deletion doc/DaphneLib/APIRef.md
@@ -18,7 +18,7 @@ limitations under the License.

This document is a hand-crafted reference of the DaphneLib API.
A general introduction to [DaphneLib (DAPHNE's Python API)](/doc/DaphneLib/Overview.md) can be found in a separate document.
DaphneLib offers numerous methods for *obtaining DAPHNE matrices and frames* as well as for *building complex computations* based on them.
DaphneLib offers numerous methods for *obtaining DAPHNE matrices and frames* as well as for *building complex computations* based on them, including complex control flow with if-then-else, loops, and user-defined functions.
Ultimately, DaphneLib will support all [DaphneDSL built-in functions](/doc/DaphneDSL/Builtins.md) on matrices and frames.
Furthermore, **we also plan to create a library of higher-level primitives** allowing users to productively implement integrated data analysis pipelines at a much higher level of abstraction.

@@ -207,3 +207,60 @@ In the following, we describe only the latter.
**Input/output:**

- **`print`**`()`

### `DaphneContext` API Reference

**Logical operators**

Logical *and* (`&&`) and *or* (`||`) operators can be used in the conditions of while-loops and do-while-loops as well as in the predicates of if-then-else statements.
*Note that these logical operators may be provided by other means than the `DaphneContext` in the future.*

- **`logical_and`**`(left_operand: Scalar, right_operand: Scalar) -> Scalar`
- **`logical_or`**`(left_operand: Scalar, right_operand: Scalar) -> Scalar`
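
The following stand-alone sketch illustrates why dedicated methods are useful here. Python's `and`/`or` keywords cannot be overloaded, so applied to lazy nodes they act on object truthiness instead of building an expression. `LazyScalar` and the two module-level functions are hypothetical stand-ins written for this illustration; they are not DaphneLib classes and only mimic the idea of assembling a `&&`/`||` expression lazily.

```python
class LazyScalar:
    """Hypothetical stand-in for a lazily evaluated scalar node (not DaphneLib's Scalar)."""

    def __init__(self, expr: str):
        self.expr = expr  # expression text assembled so far

    def __gt__(self, other):
        return LazyScalar(f"({self.expr} > {_expr(other)})")

    def __lt__(self, other):
        return LazyScalar(f"({self.expr} < {_expr(other)})")


def _expr(value) -> str:
    # Render either a lazy node or a plain Python literal as expression text.
    return value.expr if isinstance(value, LazyScalar) else str(value)


def logical_and(left, right) -> LazyScalar:
    return LazyScalar(f"({_expr(left)} && {_expr(right)})")


def logical_or(left, right) -> LazyScalar:
    return LazyScalar(f"({_expr(left)} || {_expr(right)})")


x = LazyScalar("x")
pred = logical_and(x > 0, x < 10)
print(pred.expr)   # ((x > 0) && (x < 10))

# Python's own `and` simply returns the second operand, because any plain
# object is truthy; the first comparison is silently dropped.
eager = (x > 0) and (x < 10)
print(eager.expr)  # (x < 10)
```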

## Building Complex Control Structures

Complex control structures like if-then-else, for-loops, while-loops, and do-while-loops can be built using methods of the `DaphneContext`.
These control structures can be used to manipulate matrices, frames, and scalars, and are lazily evaluated.
Furthermore, user-defined functions can be created to build reusable code, which is likewise lazily evaluated.
User-defined functions can manipulate matrices, frames, and scalars, too.

### `DaphneContext` API Reference

**If-then-else**

- **`cond`**`(input_nodes, pred, then_fn, else_fn)`
    * input_nodes: Iterable[VALID_COMPUTED_TYPES]
    * pred: Callable *(0 arguments, 1 return value)*
    * then_fn: Callable *(n arguments, n return values, n=[1, ...])*
    * else_fn: Callable *(n arguments, n return values, n=[1, ...])*
    * returns: Tuple[VALID_COMPUTED_TYPES] *(length n)*
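
The calling convention above can be pictured with a small eager stand-in written in plain Python. `cond_eager` is a hypothetical helper for illustration only: it runs immediately on ordinary Python values, whereas DaphneLib's `cond` builds a lazily evaluated block. The pass-through behavior when no else-branch is given is an assumption of this sketch.

```python
import inspect
from typing import Callable, Iterable, Optional, Tuple


def cond_eager(input_nodes: Iterable, pred: Callable, then_fn: Callable,
               else_fn: Optional[Callable] = None) -> Tuple:
    """Eager stand-in mirroring the documented argument and return counts."""
    nodes = list(input_nodes)
    # pred takes no arguments; each branch takes one argument per input node.
    if len(inspect.signature(pred).parameters) != 0:
        raise ValueError("pred must take 0 arguments")
    for fn in (then_fn, else_fn):
        if fn is not None and len(inspect.signature(fn).parameters) != len(nodes):
            raise ValueError("branch functions must take one argument per input node")
    if pred():
        result = then_fn(*nodes)
    elif else_fn is not None:
        result = else_fn(*nodes)
    else:
        result = tuple(nodes)  # assumption: without else-branch, inputs pass through
    # Always hand back a tuple of length n, even for a single return value.
    return result if isinstance(result, tuple) else (result,)


threshold = 5
a, b = cond_eager([1, 2],
                  lambda: threshold > 3,            # predicate reads the outer scope
                  lambda x, y: (x + 10, y + 10),    # then-branch
                  lambda x, y: (x - 10, y - 10))    # else-branch
print(a, b)  # 11 12
```

Note that the predicate receives no arguments and therefore has to refer to variables from its enclosing scope.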

**Loops**

- **`for_loop`**`(input_nodes, callback, start, end, step)`
    * input_nodes: Iterable[VALID_COMPUTED_TYPES]
    * callback: Callable *(n+1 arguments, n return values, n=[1, ...]; the last argument is the iteration variable)*
    * start: int
    * end: int
    * step: Union[int, None]
    * returns: Tuple[VALID_COMPUTED_TYPES] *(length n)*
- **`while_loop`**`(input_nodes, cond, callback)`
    * input_nodes: Iterable[VALID_COMPUTED_TYPES]
    * cond: Callable *(n arguments, 1 return value, n=[1, ...])*
    * callback: Callable *(n arguments, n return values)*
    * returns: Tuple[VALID_COMPUTED_TYPES] *(length n)*
- **`do_while_loop`**`(input_nodes, cond, callback)`
    * input_nodes: Iterable[VALID_COMPUTED_TYPES]
    * cond: Callable *(n arguments, 1 return value, n=[1, ...])*
    * callback: Callable *(n arguments, n return values)*
    * returns: Tuple[VALID_COMPUTED_TYPES] *(length n)*
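
To make the callback signatures concrete, the sketch below emulates the three loops eagerly on plain Python values. All three `*_eager` helpers are hypothetical and exist only for this illustration; DaphneLib itself evaluates loops lazily. The sketch also assumes that `end` is inclusive and that a missing `step` means 1, which is an assumption rather than something stated in this reference.

```python
def _as_tuple(result) -> tuple:
    # Callbacks may return a single value or a tuple of n values.
    return result if isinstance(result, tuple) else (result,)


def for_loop_eager(input_nodes, callback, start, end, step=None):
    state = tuple(input_nodes)
    step = 1 if step is None else step
    # Assumption: the range includes `end`.
    stop = end + 1 if step > 0 else end - 1
    for i in range(start, stop, step):
        # The iteration variable is passed as the last argument.
        state = _as_tuple(callback(*state, i))
    return state


def while_loop_eager(input_nodes, cond, callback):
    state = tuple(input_nodes)
    while cond(*state):          # condition is checked before each iteration
        state = _as_tuple(callback(*state))
    return state


def do_while_loop_eager(input_nodes, cond, callback):
    state = tuple(input_nodes)
    while True:                  # body runs at least once
        state = _as_tuple(callback(*state))
        if not cond(*state):
            break
    return state


(total,) = for_loop_eager([0], lambda acc, i: acc + i, 1, 4)
print(total)  # 10

(n,) = while_loop_eager([100], lambda x: x < 10, lambda x: x * 2)
print(n)      # 100

(m,) = do_while_loop_eager([100], lambda x: x < 10, lambda x: x * 2)
print(m)      # 200
```

The last two calls differ only in when the condition is checked: the while-loop never runs its body here, while the do-while-loop runs it once.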

**User-defined functions**

- **`@function`**, **`function`**`(callback)`
    * callback: Callable
        - Arguments that should be handled as `Scalar` or `Frame` require type hints; all arguments without hints are handled as `Matrix` objects.
        Hinting `Matrix` is optional.
        Wrong or missing type hints can trigger errors before and during computation (lazy evaluation).
    * returns: Tuple[VALID_COMPUTED_TYPES] *(length equals the number of return values of callback)*
    * If the decorator `@function` is used, the *callback* is defined right below it like a regular Python function.
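
The type-hint rule described above can be sketched with Python's `inspect` module. The placeholder classes and the helper `classify_arguments` are hypothetical and written only for this illustration; how DaphneLib actually inspects the callback is not shown in this reference.

```python
import inspect
from typing import Callable, Dict


# Placeholder classes standing in for DaphneLib's node types.
class Matrix:
    pass


class Frame:
    pass


class Scalar:
    pass


def classify_arguments(callback: Callable) -> Dict[str, type]:
    """Map each parameter to a node type; unhinted parameters default to Matrix."""
    kinds = {}
    for name, param in inspect.signature(callback).parameters.items():
        hint = param.annotation
        if hint is inspect.Parameter.empty or hint is Matrix:
            kinds[name] = Matrix          # hinting Matrix is optional
        elif hint in (Frame, Scalar):
            kinds[name] = hint            # these need an explicit hint
        else:
            raise TypeError(f"unsupported type hint for '{name}': {hint!r}")
    return kinds


def scale(data, factor: Scalar, meta: Frame):
    return data


print({k: v.__name__ for k, v in classify_arguments(scale).items()})
# {'data': 'Matrix', 'factor': 'Scalar', 'meta': 'Frame'}
```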
5 changes: 4 additions & 1 deletion src/api/python/context/__init__.py
@@ -22,5 +22,8 @@
# -------------------------------------------------------------

from api.python.context.daphne_context import DaphneContext
from api.python.operator.nodes.matrix import Matrix
from api.python.operator.nodes.frame import Frame
from api.python.operator.nodes.scalar import Scalar

__all__ = ["DaphneContext"]
__all__ = ["DaphneContext", "Matrix", "Frame", "Scalar"]
116 changes: 112 additions & 4 deletions src/api/python/context/daphne_context.py
@@ -21,19 +21,28 @@
#
# -------------------------------------------------------------

__all__ = ["DaphneContext"]
__all__ = ["DaphneContext", "Matrix", "Frame", "Scalar"]

from api.python.operator.nodes.frame import Frame
from api.python.operator.nodes.matrix import Matrix
from api.python.operator.nodes.scalar import Scalar
from api.python.utils.consts import VALID_INPUT_TYPES, TMP_PATH, F64, F32, SI64, SI32, SI8, UI64, UI32, UI8
from api.python.operator.nodes.for_loop import ForLoop
from api.python.operator.nodes.cond import Cond
from api.python.operator.nodes.while_loop import WhileLoop
from api.python.operator.nodes.do_while_loop import DoWhileLoop
from api.python.operator.nodes.multi_return import MultiReturn
from api.python.utils.consts import VALID_INPUT_TYPES, VALID_COMPUTED_TYPES, TMP_PATH, F64, F32, SI64, SI32, SI8, UI64, UI32, UI8

import numpy as np
import pandas as pd

from typing import Sequence, Dict, Union, List
from typing import Sequence, Dict, Union, List, Callable, Tuple, Optional, Iterable

class DaphneContext(object):
    _functions: dict

    def __init__(self):
        self._functions = dict()

    def readMatrix(self, file: str) -> Matrix:
        """Reads a matrix from a file.
@@ -157,4 +166,103 @@ def sample(self, range, size, withReplacement: bool, seed = -1) -> 'Matrix':
        return Matrix(self, 'sample', [range, size, withReplacement, seed])

    def diagMatrix(self, arg: Matrix) -> 'Matrix':
        return Matrix(self, 'diagMatrix', [arg])
        return Matrix(self, 'diagMatrix', [arg])

    def for_loop(self, input_nodes: Iterable[VALID_COMPUTED_TYPES], callback: Callable, start: int, end: int, step: Optional[int] = None) -> Tuple[VALID_COMPUTED_TYPES]:
        """
        Generates a for-loop block for lazy evaluation.
        The generated block/operation cannot be directly computed
        but any of the outputs can.
        :param input_nodes: matrices for manipulation
        :param callback: body functionality (n+1 arguments, n return values, n=[1, ...])
        :param start
        :param end
        :param step
        :return: manipulated matrices (length n)
        """
        named_input_nodes = {
            "start": start,
            "end": end,
            "step": step
        }
        return tuple(ForLoop(self, callback, input_nodes, named_input_nodes))

    def cond(self, input_nodes: Iterable[VALID_COMPUTED_TYPES], pred: Callable, then_fn: Callable, else_fn: Callable = None) -> Tuple[VALID_COMPUTED_TYPES]:
        """
        Generates an if-then-else statement block for lazy evaluation.
        The generated block/operation cannot be directly computed
        but any of the outputs can.
        :param input_nodes: matrices for manipulation
        :param pred: the predicate (0 arguments, 1 return value)
        :param then_fn: callable to be performed if pred evaluates to true (n arguments, n return values, n=[1, ...])
        :param else_fn: callable to be performed if pred evaluates to false (n arguments, n return values)
        :return: manipulated matrices (length n)
        """
        return tuple(Cond(self, pred, then_fn, else_fn, input_nodes))

    def while_loop(self, input_nodes: Iterable[VALID_COMPUTED_TYPES], cond: Callable, callback: Callable) -> Tuple[VALID_COMPUTED_TYPES]:
        """
        Generates a while-loop block for lazy evaluation.
        The generated block/operation cannot be directly computed
        but any of the outputs can.
        :param input_nodes: matrices for manipulation
        :param cond: the condition (n arguments, 1 return value)
        :param callback: callable to be performed as long as cond evaluates to true (n arguments, n return values, n=[1, ...])
        :return: manipulated matrices (length n)
        """
        return tuple(WhileLoop(self, cond, callback, input_nodes))

    def do_while_loop(self, input_nodes: Iterable[VALID_COMPUTED_TYPES], cond: Callable, callback: Callable) -> Tuple[VALID_COMPUTED_TYPES]:
        """
        Generates a do-while-loop block for lazy evaluation.
        The generated block/operation cannot be directly computed
        but any of the outputs can.
        :param input_nodes: matrices for manipulation
        :param cond: the condition (n arguments, 1 return value)
        :param callback: callable to be performed as long as cond evaluates to true (n arguments, n return values, n=[1, ...])
        :return: manipulated matrices (length n)
        """
        return tuple(DoWhileLoop(self, cond, callback, input_nodes))

    def logical_and(self, left_operand: 'Scalar', right_operand: 'Scalar'):
        """
        Logical AND operation for lazy evaluation.
        :param left_operand
        :param right_operand
        :return new Scalar
        """
        return Scalar(self, '&&', [left_operand, right_operand])

    def logical_or(self, left_operand: 'Scalar', right_operand: 'Scalar'):
        """
        Logical OR operation for lazy evaluation.
        :param left_operand
        :param right_operand
        :return new Scalar
        """
        return Scalar(self, '||', [left_operand, right_operand])

    def function(self, callback: Callable):
        """
        Generates a user-defined function for lazy evaluation.
        The generated function cannot be directly computed
        but any of the outputs can by using indexing.
        :param callback: callable with user-defined instructions
        :return: output nodes (matrices, scalars or frames)
        """
        # generate function definition
        function_name, callback_outputs = MultiReturn.define_function(self, callback)
        # generate function for calling
        def dctx_function(*args):
            output_nodes = list()
            for node in callback_outputs:
                if isinstance(node, Matrix):
                    output_nodes.append(Matrix(self, ''))
                elif isinstance(node, Frame):
                    output_nodes.append(Frame(self, ''))
                elif isinstance(node, Scalar):
                    output_nodes.append(Scalar(self, ''))
            return tuple(MultiReturn(self, function_name, output_nodes, args))

        return dctx_function
171 changes: 171 additions & 0 deletions src/api/python/operator/nodes/cond.py
@@ -0,0 +1,171 @@
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Modifications Copyright 2022 The DAPHNE Consortium
#
# -------------------------------------------------------------

from api.python.operator.operation_node import OperationNode
from api.python.operator.nodes.matrix import Matrix
from api.python.operator.nodes.frame import Frame
from api.python.operator.nodes.scalar import Scalar
from api.python.script_building.dag import OutputType
from api.python.utils.consts import VALID_INPUT_TYPES
from api.python.script_building.nested_script import NestedDaphneDSLScript
from api.python.utils import analyzer

from typing import TYPE_CHECKING, Dict, Iterable, Sequence, Tuple, Callable
import textwrap

if TYPE_CHECKING:
    # to avoid cyclic dependencies during runtime
    from context.daphne_context import DaphneContext


class Cond(OperationNode):
    _outputs: Iterable['OperationNode']

    def __init__(self, daphne_context: 'DaphneContext', pred: Callable, then_fn: Callable, else_fn: Callable,
                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None) -> 'Cond':
        """
        Operational node that represents if-then-else statement functionality.
        Its reserved variable is left unused and is not added to the generated script.
        :param daphne_context:
        :param pred: function with single scalar output representing the predicate
        :param then_fn: function for the logic of the TRUE-block
        :param else_fn: function for the logic of the FALSE-block
        :param unnamed_input_nodes: operation nodes that are up for manipulation
        """
        self.nested_level = 0  # default value
        _named_input_nodes = dict()
        # cast the iterable to list for consistency and to avoid additional copying
        _unnamed_input_nodes = list(unnamed_input_nodes)
        # analyze if the passed functions fulfill the requirements
        if else_fn is not None:
            if analyzer.get_number_argument(then_fn) != analyzer.get_number_argument(else_fn):
                raise ValueError(f"{then_fn} and {else_fn} do not have the same number of arguments")
            elif analyzer.get_number_argument(then_fn) != len(unnamed_input_nodes):
                raise ValueError(f"{then_fn} and {else_fn} do not have the same number of arguments as input nodes")
        else:
            if analyzer.get_number_argument(then_fn) != len(unnamed_input_nodes):
                raise ValueError(f"{then_fn} does not have the same number of arguments as input nodes")

        if analyzer.get_number_argument(pred) != 0:
            raise ValueError(f"{pred} has more than 0 arguments")
        elif isinstance(pred(), tuple):
            raise ValueError(f"{pred} has more than 1 return value")

        # spare storing the arguments additionally by redefining the functions
        self.then_fn = lambda: then_fn(*unnamed_input_nodes)
        self.else_fn = None
        if else_fn:
            self.else_fn = lambda: else_fn(*unnamed_input_nodes)
        # get the variables in outer scope to the according functions
        outer_vars_then = analyzer.get_outer_scope_variables(then_fn)
        outer_vars_else = analyzer.get_outer_scope_variables(else_fn) if else_fn else dict()
        outer_vars_both = {**outer_vars_then, **outer_vars_else}
        outer_vars_pred = analyzer.get_outer_scope_variables(pred)
        # append the outer scope variables to input nodes so these
        # can be defined upfront by the depth-first-search pass
        for node in outer_vars_both.values():
            if node:
                _unnamed_input_nodes.append(node)
        for node in outer_vars_pred.values():
            if node:
                _unnamed_input_nodes.append(node)
        # evaluate the predicate upfront
        _named_input_nodes.update({'pred': pred()})
        # TODO: decide if here is the best place for this piece of code: maybe just after the first analysis
        # initiate the output operation nodes
        self._outputs = list()
        for node in unnamed_input_nodes:
            new_matrix_node = None
            if isinstance(node, Matrix):
                new_matrix_node = Matrix(daphne_context, None, [node], copy=True)
            elif isinstance(node, Frame):
                new_matrix_node = Frame(daphne_context, None, [node], copy=True)
            elif isinstance(node, Scalar):
                new_matrix_node = Scalar(daphne_context, None, [node], copy=True)
            else:
                raise ValueError(f"Unsupported input node type {type(node)}")
            new_matrix_node._source_node = self
            self._outputs.append(new_matrix_node)

        super().__init__(daphne_context, 'cond', unnamed_input_nodes=_unnamed_input_nodes, named_input_nodes=_named_input_nodes,
                         output_type=OutputType.NONE)

    def __getitem__(self, index) -> Tuple['Matrix']:
        return self._outputs[index]

    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
                  named_input_vars: Dict[str, str]) -> str:
        """
        Generates the DaphneDSL code block for the if-then-else statement.
        Here the 'then_fn' and 'else_fn' (if not None) are being
        evaluated and then the code lines for their according functionalities
        are generated and added inside the if-then-else code structure.
        :param var_name: variable name reserved for the operation node - NOT used
        :param unnamed_input_vars:
        :param named_input_vars:
        :return:
        """
        # get tuple of output operation nodes for the 'then_fn'
        then_fn_outputs = self.then_fn()
        if not isinstance(then_fn_outputs, tuple):
            then_fn_outputs = (then_fn_outputs, )
        # generate the code lines for the 'then_fn' functionality
        then_script = NestedDaphneDSLScript(self.daphne_context, self.nested_level + 1)
        # get the inner scope variable names storing the output operation nodes
        then_names = then_script.build_code(then_fn_outputs)
        # store the generated code lines as string
        then_body = then_script.daphnedsl_script
        # assignment of the inner scope variable names to the variables of the outer scope
        for i, name in enumerate(then_names):
            then_body += f"{unnamed_input_vars[i]}={name};\n"
        # pack all code lines in the if-statement structure
        multiline_str = str()
        multiline_str += f"if ({named_input_vars['pred']}) {{\n"
        multiline_str += textwrap.indent(then_body, prefix=" ")
        multiline_str += "}"

        if self.else_fn:
            # get tuple of output operation nodes for the 'else_fn'
            else_fn_outputs = self.else_fn()
            if not isinstance(else_fn_outputs, tuple):
                else_fn_outputs = (else_fn_outputs,)
            # generate the code lines for the 'else_fn' functionality
            else_script = NestedDaphneDSLScript(self.daphne_context, self.nested_level + 1)
            # get the inner scope variable names storing the output operation nodes
            else_names = else_script.build_code(else_fn_outputs)
            # store the generated code lines as string
            else_body = else_script.daphnedsl_script
            # assignment of the inner scope variable names to the variables of the outer scope
            for i, name in enumerate(else_names):
                else_body += f"{unnamed_input_vars[i]}={name};\n"
            # pack all code lines in the else-statement structure
            multiline_str += " else {\n"
            multiline_str += textwrap.indent(else_body, prefix=" ")
            multiline_str += "}"

        return multiline_str

    def compute(self) -> None:
        raise NotImplementedError("'Cond' node is not intended to be computed")
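
The string assembly performed by `code_line` can be tried in isolation with the stand-alone sketch below. `build_if_else` is a hypothetical helper written for this illustration, and the variable names `V1` to `V4` are made up; it takes branch bodies that are already rendered as text, whereas the real method obtains them from `NestedDaphneDSLScript`. The four-space indent is a choice of this sketch.

```python
import textwrap
from typing import Optional


def build_if_else(pred_var: str, then_body: str, else_body: Optional[str] = None) -> str:
    """Wrap pre-rendered branch bodies in an if/else block of generated script text."""
    block = f"if ({pred_var}) {{\n"
    # Indent every line of the branch body by one nesting level.
    block += textwrap.indent(then_body, prefix="    ")
    block += "}"
    if else_body is not None:
        block += " else {\n"
        block += textwrap.indent(else_body, prefix="    ")
        block += "}"
    return block


# Each body ends by assigning the inner result back to the outer variable (V2),
# which mirrors the assignment loop in code_line.
script = build_if_else("V1", "V3=V2+1;\nV2=V3;\n", "V4=V2-1;\nV2=V4;\n")
print(script)
```

Writing the result back to the outer-scope variable at the end of each branch is what lets code after the block see the updated value regardless of which branch ran.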