Source code for flask_appbuilder.models.group

from __future__ import unicode_literals

import calendar
import datetime
from functools import reduce
from itertools import groupby
import logging

from flask_appbuilder._compat import as_unicode
from flask_babel import lazy_gettext as _

from .. import const as c

log = logging.getLogger(__name__)


def aggregate(label=""):
    """
    Use this decorator to set a label for your aggregation functions on charts.

    :param label:
        The label to complement with the column
    """

    def wrap(f):
        f._label = label
        return f

    return wrap


[docs] @aggregate(_("Count of")) def aggregate_count(items, col): """ Function to use on Group by Charts. accepts a list and returns the count of the list's items """ return len(list(items))
[docs] @aggregate(_("Sum of")) def aggregate_sum(items, col): """ Function to use on Group by Charts. accepts a list and returns the sum of the list's items """ return sum(getattr(item, col) for item in items)
[docs] @aggregate(_("Avg. of")) def aggregate_avg(items, col): """ Function to use on Group by Charts. accepts a list and returns the average of the list's items """ try: return aggregate_sum(items, col) / aggregate_count(items, col) except Exception: log.warning(c.LOGMSG_WAR_DBI_AVG_ZERODIV) return 0.0
class BaseGroupBy(object): column_name = "" name = "" aggregate_func = None aggregate_col = "" def __init__( self, column_name, name, aggregate_func=aggregate_count, aggregate_col="" ): """ Constructor. :param column_name: Model field name :param name: The group by name """ self.column_name = column_name self.name = name self.aggregate_func = aggregate_func self.aggregate_col = aggregate_col def apply(self, data): """ Override this to implement you own new filters """ pass def get_group_col(self, item): return getattr(item, self.column_name) def get_format_group_col(self, item): return item def get_aggregate_col_name(self): if self.aggregate_col: return self.aggregate_func.__name__ + "_" + self.aggregate_col else: return self.aggregate_func.__name__ def __repr__(self): return self.name class GroupByCol(BaseGroupBy): def _apply(self, data): data = sorted(data, key=self.get_group_col) json_data = dict() json_data["cols"] = [ {"id": self.column_name, "label": self.column_name, "type": "string"}, { "id": self.aggregate_func.__name__ + "_" + self.column_name, "label": self.aggregate_func.__name__ + "_" + self.column_name, "type": "number", }, ] json_data["rows"] = [] for grouped, items in groupby(data, self.get_group_col): aggregate_value = self.aggregate_func(items, self.aggregate_col) json_data["rows"].append( { "c": [ {"v": self.get_format_group_col(grouped)}, {"v": aggregate_value}, ] } ) return json_data def apply(self, data): data = sorted(data, key=self.get_group_col) return [ [ self.get_format_group_col(grouped), self.aggregate_func(items, self.aggregate_col), ] for (grouped, items) in groupby(data, self.get_group_col) ] class GroupByDateYear(BaseGroupBy): def apply(self, data): data = sorted(data, key=self.get_group_col) return [ [ self.get_format_group_col(grouped), self.aggregate_func(items, self.aggregate_col), ] for (grouped, items) in groupby(data, self.get_group_col) ] def get_group_col(self, item): value = getattr(item, self.column_name) if value: return value.year class GroupByDateMonth(BaseGroupBy): def apply(self, data): data = sorted(data, key=self.get_group_col) return [ [ self.get_format_group_col(grouped), self.aggregate_func(items, self.aggregate_col), ] for (grouped, items) in groupby(data, self.get_group_col) if grouped ] def get_group_col(self, item): value = getattr(item, self.column_name) if value: return value.year, value.month def get_format_group_col(self, item): return calendar.month_name[item[1]] + " " + str(item[0]) class BaseProcessData(object): """ Base class to process data. It will group data by one or many columns or functions. The aggregation is made by an already defined function, or by a custom function :group_bys_cols: A list of columns or functions to group data. :aggr_by_cols: A list of tuples [(<AGGR FUNC>,'<COLNAME>'),...]. :formatter_by_cols: A dict. """ group_bys_cols = None # ['<COLNAME>',<FUNC>, ....] aggr_by_cols = None # [(<AGGR FUNC>,'<COLNAME>'),...] formatter_by_cols = {} # {<FUNC>: '<COLNAME>',...} def __init__(self, group_by_cols, aggr_by_cols, formatter_by_cols): self.group_bys_cols = group_by_cols self.aggr_by_cols = aggr_by_cols self.formatter_by_cols = formatter_by_cols def attrgetter(self, *items): if len(items) == 1: attr = items[0] def g(obj): return self.resolve_attr(obj, attr) else: def g(obj): return tuple(self.resolve_attr(obj, attr) for attr in items) return g def resolve_attr(self, obj, attr): if not hasattr(obj, attr): # it's an inner obj attr return reduce(getattr, attr.split("."), obj) if hasattr(getattr(obj, attr), "__call__"): # its a function return getattr(obj, attr)() else: # it's an attribute return getattr(obj, attr) def format_columns(self, *values): if len(values) == 1: return self.format_column(self.group_bys_cols[0], values[0]) else: return tuple( self.format_column(item, value) for item, value in (self.group_bys_cols, values) ) def format_column(self, item, value): if item in self.formatter_by_cols: return self.formatter_by_cols[item](value) else: return value def apply(self, data): pass def to_dict(self, data): ret = [] for item in data: row = {} if not isinstance(item[0], tuple): row[self.group_bys_cols[0]] = str(item[0]) else: for group_col_data, i in zip(item[0], enumerate(item[0])): row[self.group_bys_cols[i]] = str(group_col_data) for col_data, i in zip(item[1:], enumerate(item[1:])): log.debug("%s,%s", col_data, i) key = self.aggr_by_cols[i].__name__ + self.aggr_by_cols[i] if isinstance(col_data, datetime.date): row[key] = str(col_data) else: row[key] = col_data ret.append(row) return ret def to_json(self, data, labels=None): """ Will return a dict with Google JSON structure for charts The Google structure:: { cols: [{id:<COL_NAME>, label:<LABEL FOR COL>, type: <COL TYPE>}, ...] rows: [{c: [{v: <COL VALUE}, ...], ... ] } :param data: :param labels: dict with labels to include on Google JSON strcut :return: dict with Google JSON structure """ labels = labels or dict() json_data = dict() json_data["cols"] = [] # Create Structure to identify the grouped columns for group_col in self.group_bys_cols: label = "" or as_unicode(labels[group_col]) json_data["cols"].append( {"id": group_col, "label": label, "type": "string"} ) # Create Structure to identify the Aggregated columns for aggr_col in self.aggr_by_cols: if isinstance(aggr_col, tuple): label_key = aggr_col[0].__name__ + aggr_col[1] aggr_col = aggr_col[1] else: label_key = aggr_col label = "" or as_unicode(labels[label_key]) json_data["cols"].append({"id": aggr_col, "label": label, "type": "number"}) # Create Structure with the data json_data["rows"] = [] for item in data: row = {"c": []} if not isinstance(item[0], tuple): row["c"].append({"v": "{0}".format(item[0])}) else: for group_col_data in item[0]: row["c"].append({"v": "{0}".format(group_col_data)}) for col_data in item[1:]: if isinstance(col_data, datetime.date): row["c"].append({"v": "{0}".format(col_data)}) else: row["c"].append({"v": col_data}) json_data["rows"].append(row) return json_data class DirectProcessData(BaseProcessData): def apply(self, data, sort=True): group_by = self.group_bys_cols[0] if sort: data = sorted(data, key=self.attrgetter(group_by)) result = [] for item in data: result_item = [self.format_columns(self.attrgetter(group_by)(item))] for aggr_by_col in self.aggr_by_cols: result_item.append(self.attrgetter(aggr_by_col)(item)) result.append(result_item) return result class GroupByProcessData(BaseProcessData): """ Groups by data by chosen columns (property group_bys_cols). :data: A list of objects :sort: boolean, if true python will sort the data :return: A List of lists with group column and aggregation """ def apply(self, data, sort=True): if sort: data = sorted(data, key=self.attrgetter(*self.group_bys_cols)) result = [] for grouped, items in groupby(data, key=self.attrgetter(*self.group_bys_cols)): items = list(items) result_item = [self.format_columns(grouped)] for aggr_by_col in self.aggr_by_cols: result_item.append(aggr_by_col[0](items, aggr_by_col[1])) result.append(result_item) return result