Source code for flask_appbuilder.models.generic

__author__ = "dpgaspar"

from datetime import date, datetime
import operator
import os

from ..._compat import with_metaclass

# --------------------------------------
#        Exceptions
# --------------------------------------


class PKMissingException(Exception):
    def __init__(self, model_name=""):
        message = "Please set one primary key on: {0}".format(model_name)
        super(PKMissingException, self).__init__(self, message)


[docs]class GenericColumn(object): col_type = None primary_key = None unique = None nullable = None def __init__(self, col_type, primary_key=False, unique=False, nullable=False): self.col_type = col_type self.primary_key = primary_key self.unique = unique self.nullable = nullable def check_type(self, value): return isinstance(value, self.col_type)
class MetaGenericModel(type): """ Meta class for GenericModel will change default properties: - instantiates internal '_col_defs' dict with all the defined columns. - Define pk property with the name of the primary key column - Define properties with a list of all column's properties - Define columns with a list of all column's name """ pk = None properties = None columns = None def __new__(meta, name, bases, dct): obj = super(MetaGenericModel, meta).__new__(meta, name, bases, dct) obj._col_defs = dict() obj._name = name for prop in dct: if isinstance(dct[prop], GenericColumn): vol_col = dct[prop] obj._col_defs[prop] = vol_col obj.properties = obj._col_defs obj.columns = list(obj._col_defs.keys()) for col in obj.columns: if obj._col_defs[col].primary_key: obj.pk = col break return obj
[docs]class GenericModel(with_metaclass(MetaGenericModel, object)): """ Generic Model class to define generic purpose models to use with the framework. Use GenericSession much like SQLAlchemy's Session Class. Extend GenericSession to implement specific engine features. Define your models like:: class MyGenericModel(GenericModel): id = GenericColumn(int, primary_key=True) age = GenericColumn(int) name = GenericColumn(str) """ def __init__(self, **kwargs): if not self.pk: # if only one column, set it as pk if len(self.columns) == 1: self._col_defs[self.columns[0]].primary_key = True else: raise PKMissingException(self._name) for arg in kwargs: if arg in self._col_defs: value = kwargs.get(arg) setattr(self, arg, value) def get_col_type(self, col_name): return self._col_defs[col_name].col_type def __repr__(self): return str(self) def __str__(self): str = self.__class__.__name__ + "=(" for col in self.columns: str += "{0}:{1};".format(col, getattr(self, col)) str += ")\n" return str
[docs]class GenericSession(object): """ This class is a base, you should subclass it to implement your own generic data source. Override at least the **all** method. **GenericSession** will implement filter and orders based on your data generation on the **all** method. """ def __init__(self): self._order_by_cmd = None self._filters_cmd = list() self.store = dict() self.query_filters = list() self.query_class = "" self._offset = 0 self._limit = 0
[docs] def clear(self): """ Deletes the entire store """ self.store = dict()
[docs] def delete_all(self, model_cls): """ Deletes all objects of type model_cls """ self.store[model_cls._name] = []
[docs] def get(self, pk): """ Returns the object for the key Override it for efficiency. """ for item in self.store.get(self.query_class): # coverts pk value to correct type pk = item.properties[item.pk].col_type(pk) if getattr(item, item.pk) == pk: return item
[docs] def query(self, model_cls): """ SQLAlchemy query like method """ self._filters_cmd = list() self.query_filters = list() self._order_by_cmd = None self._offset = 0 self._limit = 0 self.query_class = model_cls._name return self
def order_by(self, order_cmd): self._order_by_cmd = order_cmd return self def _order_by(self, data, order_cmd): col_name, direction = order_cmd.split() reverse_flag = direction == "desc" # patched as suggested by: # http://stackoverflow.com/questions/18411560/python-sort-list-with-none-at-the-end # and # http://stackoverflow.com/questions/5055942/sort-python-list-of-objects-by-date-when-some-are-none def col_name_if_not_none(data): """ - sqlite sets to null unfilled fields. - sqlalchemy cast this to None - this is a killer if the datum is of type datetime.date: - it breaks a simple key=operator.attrgetter(col_name) approach. this function tries to patch the issue """ op = operator.attrgetter(col_name) # noqa missing = getattr(data, col_name) is not None return missing, getattr(data, col_name) return sorted(data, key=col_name_if_not_none, reverse=reverse_flag) def scalar(self): return 0 # ----------------------------------------- # FUNCTIONS for FILTERS # ----------------------------------------- def starts_with(self, col_name, value): self._filters_cmd.append((self._starts_with, col_name, value)) return self def _starts_with(self, item, col_name, value): lw_col = getattr(item, col_name) try: lw_col = lw_col.lower() except Exception: return None lw_value = value.lower() lw_value_list = lw_value.split(" ") for lw_item in lw_value_list: if not lw_col.startswith(lw_item): return None return col_name def greater(self, col_name, value): self._filters_cmd.append((self._greater, col_name, value)) return self def _greater(self, item, col_name, value): source_value = getattr(item, col_name) try: # whatever we have to copare it will never match if source_value is None: return False # date has special constructor, tested only on sqlite elif isinstance(source_value, date): value = datetime.strptime(value, "%Y-%m-%d").date() # fallback to native python types else: value = type(source_value)(value) return source_value > value except Exception: # when everything fails silently report False return False def smaller(self, col_name, value): self._filters_cmd.append((self._smaller, col_name, value)) return self def _smaller(self, item, col_name, value): source_value = getattr(item, col_name) try: # whatever we have to copare it will never match if source_value is None: return False # date has special constructor, tested only on sqlite elif isinstance(source_value, date): value = datetime.strptime(value, "%Y-%m-%d").date() # fallback to native python types else: value = type(source_value)(value) return source_value < value except Exception: # when everything fails silently report False return False def ilike(self, col_name, value): self._filters_cmd.append((self._ilike, col_name, value)) return self def _ilike(self, item, col_name, value): lw_col = getattr(item, col_name) try: lw_col = lw_col.lower() except Exception: return None lw_value = value.lower() lw_value_list = lw_value.split(" ") for lw_item in lw_value_list: if lw_item not in lw_col: return None return col_name def like(self, col_name, value): self._filters_cmd.append((self._like, col_name, value)) return self def _like(self, item, col_name, value): lw_col = getattr(item, col_name) lw_value_list = value.split(" ") for lw_item in lw_value_list: if lw_item not in lw_col: return None return col_name def not_like(self, col_name, value): self._filters_cmd.append((self._not_like, col_name, value)) return self def _not_like(self, item, col_name, value): return value not in getattr(item, col_name) def equal(self, col_name, value): self._filters_cmd.append((self._equal, col_name, value)) return self def _equal(self, item, col_name, value): source_value = getattr(item, col_name) try: # whatever we have to copare it will never match if source_value is None: return False # date has special constructor, tested only on sqlite elif isinstance(source_value, date): value = datetime.strptime(value, "%Y-%m-%d").date() # fallback to native python types else: value = type(source_value)(value) return source_value == value except Exception: # when everything fails silently report False return False def not_equal(self, col_name, value): self._filters_cmd.append((self._not_equal, col_name, value)) return self def _not_equal(self, item, col_name, value): return not self._equal(item, col_name, value) def offset(self, offset=0): self._offset = offset return self def limit(self, limit=0): self._limit = limit return self
[docs] def all(self): """ SQLA like 'all' method, will populate all rows and apply all filters and orders to it. """ items = list() if not self._filters_cmd: items = self.store.get(self.query_class) else: for item in self.store.get(self.query_class): tmp_flag = True for filter_cmd in self._filters_cmd: if not filter_cmd[0](item, filter_cmd[1], filter_cmd[2]): tmp_flag = False break if tmp_flag: items.append(item) if self._order_by_cmd: items = self._order_by(items, self._order_by_cmd) total_length = len(items) if self._limit != 0: items = items[self._offset: self._offset + self._limit] return total_length, items
def add(self, model): model_cls_name = model._name cls_list = self.store.get(model_cls_name) if not cls_list: self.store[model_cls_name] = [] self.store[model_cls_name].append(model)
# ------------------------------------- # Example of an Generic Data Source # ------------------------------------- class PSModel(GenericModel): UID = GenericColumn(str) PID = GenericColumn(int, primary_key=True) PPID = GenericColumn(int) C = GenericColumn(int) STIME = GenericColumn(str) TTY = GenericColumn(str) TIME = GenericColumn(str) CMD = GenericColumn(str) class PSSession(GenericSession): regexp = ( "(\w+) +(\w+) +(\w+) +(\w+) +(\w+:\w+|\w+) (\?|tty\w+) +(\w+:\w+:\w+) +(.+)\n" ) def add_object(self, line): import re group = re.findall(self.regexp, line) if group: model = PSModel() model.UID = group[0][0] model.PID = int(group[0][1]) model.PPID = int(group[0][2]) model.C = int(group[0][3]) model.STIME = group[0][4] model.TTY = group[0][5] model.TIME = group[0][6] model.CMD = group[0][7] self.add(model) def get(self, pk): self.delete_all(PSModel()) out = os.popen("ps -p {0} -f".format(pk)) for line in out.readlines(): self.add_object(line) return super(PSSession, self).get(pk) def all(self): self.delete_all(PSModel()) out = os.popen("ps -ef") for line in out.readlines(): self.add_object(line) return super(PSSession, self).all()