From 1cadc3a03b385bd1221026a1e1ae9a1b1a176c6a Mon Sep 17 00:00:00 2001 From: Grant Jenks Date: Thu, 28 May 2015 15:08:38 -0700 Subject: [PATCH] Refactor report_to_list * Refactor permissions check into can_change_or_view * Make group-by on multiple display fields work * Clean-up code around display field paths and totals * Generate values_list in a compatible (with history) way * Refactor sorting to use defaults rather than try/catch * Refactor choice_lists so that missing choices don't error * Refactor display_format with formatter function * Apply formatting to totals and arrange columns correctly * Simplify sort_helper based on new default-argument These changes should be entirely backwards compatible. --- report_utils/mixins.py | 500 +++++++++++++++++++++-------------------- 1 file changed, 262 insertions(+), 238 deletions(-) diff --git a/report_utils/mixins.py b/report_utils/mixins.py index 419324e..69cafc7 100644 --- a/report_utils/mixins.py +++ b/report_utils/mixins.py @@ -20,14 +20,14 @@ get_properties_from_model, get_direct_fields_from_model, get_model_from_path_string, - get_custom_fields_from_model,) + get_custom_fields_from_model, +) DisplayField = namedtuple( "DisplayField", "path path_verbose field field_verbose aggregate total group choices field_type", ) - class DataExportMixin(object): def build_sheet(self, data, ws, sheet_name='report', header=None, widths=None): first_row = 1 @@ -116,334 +116,358 @@ def list_to_xlsx_response(self, data, title='report', header=None, return self.build_xlsx_response(wb, title=title) def add_aggregates(self, queryset, display_fields): + agg_funcs = { + 'Avg': Avg, 'Min': Min, 'Max': Max, 'Count': Count, 'Sum': Sum + } + for display_field in display_fields: - if hasattr(display_field, 'aggregate'): - if display_field.aggregate == "Avg": - queryset = queryset.annotate(Avg(display_field.path + display_field.field)) - elif display_field.aggregate == "Max": - queryset = queryset.annotate(Max(display_field.path + display_field.field)) - elif display_field.aggregate == "Min": - queryset = queryset.annotate(Min(display_field.path + display_field.field)) - elif display_field.aggregate == "Count": - queryset = queryset.annotate(Count(display_field.path + display_field.field)) - elif display_field.aggregate == "Sum": - queryset = queryset.annotate(Sum(display_field.path + display_field.field)) + if display_field.aggregate: + func = agg_funcs[display_field.aggregate] + full_name = display_field.path + display_field.field + queryset = queryset.annotate(func(full_name)) + return queryset def report_to_list(self, queryset, display_fields, user, property_filters=[], preview=False): - """ Create list from a report with all data filtering - preview: Return only first 50 - objects: Provide objects for list, instead of running filters - display_fields: a list of fields or a report_builder display field model - Returns list, message in case of issues + """ Create list from a report with all data filtering. + + queryset: initial queryset to generate results + display_fields: list of field references or DisplayField models + user: requesting user + property_filters: ??? + preview: return only first 50 rows + + Returns list, message in case of issues. """ model_class = queryset.model + + def can_change_or_view(model): + """ Return True iff `user` has either change or view permission + for `model`. + """ + try: + model_name = model._meta.model_name + except AttributeError: + # Needed for Django 1.4.* (LTS). + model_name = model._meta.module_name + + app_label = model._meta.app_label + can_change = user.has_perm(app_label + '.change_' + model_name) + can_view = user.has_perm(app_label + '.view_' + model_name) + + return can_change or can_view + + if not can_change_or_view(model_class): + return [], 'Permission Denied' + if isinstance(display_fields, list): - # Make it a report_builder.models.DisplayField like object + # Convert list of strings to DisplayField objects. + new_display_fields = [] + for display_field in display_fields: field_list = display_field.split('__') field = field_list[-1] - path = '__'.join([str(x) for x in field_list[:-1]]) + path = '__'.join(field_list[:-1]) + if path: - path += '__' # Legacy format to append a __ here. + path += '__' # Legacy format to append a __ here. + new_model = get_model_from_path_string(model_class, path) model_field = new_model._meta.get_field_by_name(field)[0] choices = model_field.choices new_display_fields.append(DisplayField( - path, '', field, '', '', None, None, choices, '')) + path, '', field, '', '', None, None, choices, '' + )) + display_fields = new_display_fields + # Build group-by field list. + + group = [df.path + df.field for df in display_fields if df.group] + + # To support group-by with multiple fields, we turn all the other + # fields into aggregations. The default aggregation is `Max`. + + if group: + for field in display_fields: + if (not field.group) and (not field.aggregate): + field.aggregate = 'Max' + message = "" objects = self.add_aggregates(queryset, display_fields) # Display Values + display_field_paths = [] property_list = {} custom_list = {} display_totals = {} - def append_display_total(display_totals, display_field, - display_field_key): - if display_field.total: - display_totals[display_field_key] = {'val': Decimal('0.00')} - for i, display_field in enumerate(display_fields): model = get_model_from_path_string(model_class, display_field.path) + if display_field.field_type == "Invalid": continue - if (user.has_perm(model._meta.app_label + '.change_' + model._meta.model_name) - or user.has_perm(model._meta.app_label + '.view_' + model._meta.model_name) - or not model): - # TODO: clean this up a bit + + if not model or can_change_or_view(model): display_field_key = display_field.path + display_field.field + if display_field.field_type == "Property": property_list[i] = display_field_key - append_display_total(display_totals, display_field, display_field_key) elif display_field.field_type == "Custom Field": custom_list[i] = display_field_key - append_display_total(display_totals, display_field, display_field_key) elif display_field.aggregate == "Avg": display_field_key += '__avg' - display_field_paths += [display_field_key] - append_display_total(display_totals, display_field, display_field_key) elif display_field.aggregate == "Max": display_field_key += '__max' - display_field_paths += [display_field_key] - append_display_total(display_totals, display_field, display_field_key) elif display_field.aggregate == "Min": display_field_key += '__min' - display_field_paths += [display_field_key] - append_display_total(display_totals, display_field, display_field_key) elif display_field.aggregate == "Count": display_field_key += '__count' - display_field_paths += [display_field_key] - append_display_total(display_totals, display_field, display_field_key) elif display_field.aggregate == "Sum": display_field_key += '__sum' - display_field_paths += [display_field_key] - append_display_total(display_totals, display_field, display_field_key) - else: - display_field_paths += [display_field_key] - append_display_total(display_totals, display_field, display_field_key) + + if display_field.field_type not in ('Property', 'Custom Field'): + display_field_paths.append(display_field_key) + + if display_field.total: + display_totals[display_field_key] = Decimal(0) + else: - message += "You don't have permission to " + display_field.name - - try: - model_name = model_class._meta.model_name - except AttributeError: - model_name = model_class._meta.module_name # needed for Django 1.4.* (LTS) - - if user.has_perm(model_class._meta.app_label + '.change_' + model_name) \ - or user.has_perm(model_class._meta.app_label + '.view_' + model_name): - - def increment_total(display_field_key, display_totals, val): - if display_field_key in display_totals: - # Booleans are Numbers - blah - if isinstance(val, Number) and not isinstance(val, bool): - # do decimal math for all numbers - display_totals[display_field_key]['val'] += Decimal(str(val)) - else: - display_totals[display_field_key]['val'] += Decimal('1.00') + message += 'Error: Permission denied on access to {0}.'.format( + display_field.name + ) - # get pk for primary and m2m relations in order to retrieve objects - # for adding properties to report rows + def increment_total(display_field_key, val): + """ Increment display total by `val` if given `display_field_key` in + `display_totals`. + """ + if display_field_key in display_totals: + if isinstance(val, bool): + # True: 1, False: 0 + display_totals[display_field_key] += Decimal(val) + elif isinstance(val, Number): + display_totals[display_field_key] += Decimal(str(val)) + elif val: + display_totals[display_field_key] += Decimal(1) + + # Select pk for primary and m2m relations in order to retrieve objects + # for adding properties to report rows. Group-by queries do not support + # Property nor Custom Field filters. + + if not group: display_field_paths.insert(0, 'pk') + m2m_relations = [] for position, property_path in property_list.items(): property_root = property_path.split('__')[0] root_class = model_class + try: property_root_class = getattr(root_class, property_root) - # django-hstore schema compatibility - except AttributeError: + except AttributeError: # django-hstore schema compatibility continue + if type(property_root_class) == ReverseManyRelatedObjectsDescriptor: display_field_paths.insert(1, '%s__pk' % property_root) m2m_relations.append(property_root) - values_and_properties_list = [] + + if group: + values = objects.values(*group) + values = self.add_aggregates(values, display_fields) + filtered_report_rows = [ + [row[field] for field in display_field_paths] + for row in values + ] + for row in filtered_report_rows: + for pos, field in enumerate(display_field_paths): + increment_total(field, row[pos]) + else: filtered_report_rows = [] - group = None - for df in display_fields: - if df.group: - group = df.path + df.field - break - if group: - filtered_report_rows = self.add_aggregates(objects.values_list(group), display_fields) - else: - values_list = objects.values_list(*display_field_paths) - - if not group: - for row in values_list: - row = list(row) - values_and_properties_list.append(row[1:]) - obj = None # we will get this only if needed for more complex processing - #related_objects - remove_row = False - # filter properties (remove rows with excluded properties) - for property_filter in property_filters: + values_and_properties_list = [] + + values_list = objects.values_list(*display_field_paths) + + for row in values_list: + row = list(row) + values_and_properties_list.append(row[1:]) + obj = None # we will get this only if needed for more complex processing + #related_objects + remove_row = False + # filter properties (remove rows with excluded properties) + for property_filter in property_filters: + if not obj: + obj = model_class.objects.get(pk=row.pop(0)) + root_relation = property_filter.path.split('__')[0] + if root_relation in m2m_relations: + pk = row[0] + if pk is not None: + # a related object exists + m2m_obj = getattr(obj, root_relation).get(pk=pk) + val = reduce(getattr, [property_filter.field], m2m_obj) + else: + val = None + else: + if property_filter.field_type == 'Custom Field': + for relation in property_filter.path.split('__'): + if hasattr(obj, root_relation): + obj = getattr(obj, root_relation) + val = obj.get_custom_value(property_filter.field) + else: + val = reduce(getattr, (property_filter.path + property_filter.field).split('__'), obj) + if property_filter.filter_property(val): + remove_row = True + values_and_properties_list.pop() + break + if not remove_row: + for i, field in enumerate(display_field_paths[1:]): + increment_total(field, row[i + 1]) + + for position, display_property in property_list.items(): if not obj: obj = model_class.objects.get(pk=row.pop(0)) - root_relation = property_filter.path.split('__')[0] + relations = display_property.split('__') + root_relation = relations[0] if root_relation in m2m_relations: - pk = row[0] + pk = row.pop(0) if pk is not None: # a related object exists m2m_obj = getattr(obj, root_relation).get(pk=pk) - val = reduce(getattr, [property_filter.field], m2m_obj) + val = reduce(getattr, relations[1:], m2m_obj) else: val = None else: - if property_filter.field_type == 'Custom Field': - for relation in property_filter.path.split('__'): - if hasattr(obj, root_relation): - obj = getattr(obj, root_relation) - val = obj.get_custom_value(property_filter.field) - else: - val = reduce(getattr, (property_filter.path + property_filter.field).split('__'), obj) - if property_filter.filter_property(val): - remove_row = True - values_and_properties_list.pop() - break - if not remove_row: - # increment totals for fields - for i, field in enumerate(display_field_paths[1:]): - if field in display_totals.keys(): - increment_total(field, display_totals, row[i]) - for position, display_property in property_list.items(): - if not obj: - obj = model_class.objects.get(pk=row.pop(0)) - relations = display_property.split('__') - root_relation = relations[0] - if root_relation in m2m_relations: - pk = row.pop(0) - if pk is not None: - # a related object exists - m2m_obj = getattr(obj, root_relation).get(pk=pk) - val = reduce(getattr, relations[1:], m2m_obj) - else: - val = None - else: - # Could error if a related field doesn't exist - try: - val = reduce(getattr, relations, obj) - except AttributeError: - val = None - values_and_properties_list[-1].insert(position, val) - increment_total(display_property, display_totals, val) - for position, display_custom in custom_list.items(): - if not obj: - obj = model_class.objects.get(pk=row.pop(0)) - val = obj.get_custom_value(display_custom) - values_and_properties_list[-1].insert(position, val) - increment_total(display_custom, display_totals, val) - filtered_report_rows += [values_and_properties_list[-1]] - if preview and len(filtered_report_rows) == 50: - break - if hasattr(display_fields, 'filter'): - sort_fields = display_fields.filter( - sort__gt=0, - ).order_by('-sort').values_list('position', 'sort_reverse') - for sort_field in sort_fields: - sort_value = sort_field[0] - try: - filtered_report_rows = sorted( - filtered_report_rows, - key=lambda x: self.sort_helper(x, sort_value), - reverse=sort_field[1] - ) - # Crappy way to deal with null dates. - except TypeError: - try: - filtered_report_rows = sorted( - filtered_report_rows, - key=lambda x: self.sort_helper( - x, sort_value, date_field=True), - reverse=sort_field[1] - ) - except TypeError: - filtered_report_rows = sorted( - filtered_report_rows, - key=lambda x: self.sort_helper( - x, sort_value, number_field=True), - reverse=sort_field[1] - ) - - values_and_properties_list = filtered_report_rows - else: - values_and_properties_list = [] - message = "Permission Denied" + # Could error if a related field doesn't exist + try: + val = reduce(getattr, relations, obj) + except AttributeError: + val = None + values_and_properties_list[-1].insert(position, val) + increment_total(display_property, val) + + for position, display_custom in custom_list.items(): + if not obj: + obj = model_class.objects.get(pk=row.pop(0)) + val = obj.get_custom_value(display_custom) + values_and_properties_list[-1].insert(position, val) + increment_total(display_custom, val) + + filtered_report_rows.append(values_and_properties_list[-1]) + + if preview and len(filtered_report_rows) == 50: + break + + # Sort results if requested. + + if hasattr(display_fields, 'filter'): + defaults = { + None: unicode, + datetime.date: lambda: datetime.date(datetime.MINYEAR, 1, 1), + datetime.datetime: lambda: datetime.datetime(datetime.MINYEAR, 1, 1), + } + + # Order sort fields in reverse order so that ascending, descending + # sort orders work together (based on Python's stable sort). See + # http://stackoverflow.com/questions/6666748/ for details. + + sort_fields = display_fields.filter(sort__gt=0).order_by('-sort') + sort_values = sort_fields.values_list('position', 'sort_reverse') + + for pos, reverse in sort_values: + column = (row[pos] for row in filtered_report_rows) + type_col = (type(val) for val in column if val is not None) + field_type = next(type_col, None) + default = defaults.get(field_type, field_type)() + + filtered_report_rows = sorted( + filtered_report_rows, + key=lambda row: self.sort_helper(row[pos], default), + reverse=reverse, + ) + + values_and_properties_list = filtered_report_rows + + # Build mapping from display field position to choices list. - # add choice list display and display field formatting choice_lists = {} - display_formats = {} - final_list = [] for df in display_fields: if df.choices and hasattr(df, 'choices_dict'): df_choices = df.choices_dict - # Insert blank and None as valid choices + # Insert blank and None as valid choices. df_choices[''] = '' df_choices[None] = '' - choice_lists.update({df.position: df_choices}) + choice_lists[df.position] = df_choices + + # Build mapping from display field position to format. + + display_formats = {} + + for df in display_fields: if hasattr(df, 'display_format') and df.display_format: - display_formats.update({df.position: df.display_format}) + display_formats[df.position] = df.display_format + + def formatter(value, style): + # Convert value to Decimal to apply numeric formats. + try: + value = Decimal(value) + except Exception: + pass + + try: + return style.string.format(value) + except ValueError: + return value + + # Iterate rows and convert values by choice lists and field formats. + + final_list = [] for row in values_and_properties_list: - # add display totals for grouped result sets - # TODO: dry this up, duplicated logic in non-grouped total routine - if group: - # increment totals for fields - for i, field in enumerate(display_field_paths[1:]): - if field in display_totals.keys(): - increment_total(field, display_totals, row[i]) row = list(row) + for position, choice_list in choice_lists.items(): - row[position] = unicode(choice_list[row[position]]) - for position, display_format in display_formats.items(): - # convert value to be formatted into Decimal in order to apply - # numeric formats - try: - value = Decimal(row[position]) - except: - value = row[position] - # Try to format the value, let it go without formatting for ValueErrors try: - row[position] = display_format.string.format(value) - except ValueError: - row[position] = value + row[position] = unicode(choice_list[row[position]]) + except Exception: + row[position] = unicode(row[position]) + + for pos, style in display_formats.items(): + row[pos] = formatter(row[pos], style) + final_list.append(row) + values_and_properties_list = final_list if display_totals: display_totals_row = [] - fields_and_properties = list(display_field_paths[1:]) - for position, value in property_list.iteritems(): + fields_and_properties = list(display_field_paths[0 if group else 1:]) + + for position, value in property_list.items(): fields_and_properties.insert(position, value) - for i, field in enumerate(fields_and_properties): - if field in display_totals.keys(): - display_totals_row += [display_totals[field]['val']] - else: - display_totals_row += [''] - - # add formatting to display totals - for df in display_fields: - if df.display_format: - try: - value = Decimal(display_totals_row[df.position-1]) - except: - value = display_totals_row[df.position-1] - # Fall back to original value if format string and value - # aren't compatible, e.g. a numerically-oriented format - # string with value which is not numeric. - try: - value = df.display_format.string.format(value) - except ValueError: - pass - display_totals_row[df.position-1] = value - - values_and_properties_list = ( - values_and_properties_list + [ - ['TOTALS'] + (len(fields_and_properties) - 1) * [''] - ] + [display_totals_row] - ) - return values_and_properties_list, message + for field in fields_and_properties: + display_totals_row.append(display_totals.get(field, '')) - def sort_helper(self, x, sort_key, date_field=False, number_field=False): - # If comparing datefields, assume null is the min year - if date_field and x[sort_key] == None: - result = datetime.date(datetime.MINYEAR, 1, 1) - elif number_field and x[sort_key] == None: - result = 0 - else: - result = x[sort_key] - if isinstance(result, string_types): - return result.lower() - elif result is None: - return '' - return result + # Add formatting to display totals. + + for pos, style in display_formats.items(): + display_totals_row[pos] = formatter(display_totals_row[pos], style) + + values_and_properties_list.append( + ['TOTALS'] + (len(fields_and_properties) - 1) * [''] + ) + values_and_properties_list.append(display_totals_row) + + return values_and_properties_list, message + def sort_helper(self, value, default): + if value is None: + value = default + if isinstance(value, string_types): + value = value.lower() + return value class GetFieldsMixin(object): def get_fields(self, model_class, field_name='', path='', path_verbose=''):