在Python中使用Class.objects.filter(...)模式

2024-09-24 22:18:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我希望借鉴django模型中Model.objects.filter(...)的模式,来为自己的数据构建过滤器。这也许是pandas的一个很好的用例,但在尝试pandas之前,我更想先提高自己的Python水平。

如果我有以下数据:

DATA = [
    {'id': 1, 'name': 'brad', 'color':'red'},
    {'id': 2, 'name': 'sylvia', 'color':'blue'},
]

我想构建类似于以下内容的内容:

(原文此处的代码块在页面抓取时丢失。)

并将objects等效于“ModelManager”,然后从那里进行筛选,以便调用:

MyData.objects.filter(id>1)

然后得到:

[
    {'id': 2, 'name': 'sylvia', 'color':'blue'}
]

当然,我可以做一些简单的事情:

res = [_ for _ in DATA if _['id'] > 1]

但我更感兴趣的是这种设计模式本身——这个例子之所以这么简单,只是为了说明我想要实现的目标。

要正确地实现这一点,有什么好的、基础的方法?这是django中与之相关的类:https://github.com/django/django/blob/master/django/db/models/query.py#L185。


Tags: 数据djangoname模型id过滤器datamodel
3条回答

提问者(OP)想要这样调用:MyData.objects.filter(id>1)。

让我们面对现实吧。在

问题在于Python是急切求值的(会立即计算表达式),而不像Haskell那样惰性求值。
想看更烧脑的内容,可以观看David Beazley在PyCon 2019上的演讲“Lambda Calculus from the Ground Up”。

Python会在调用filter之前先计算id > 1。如果我们能暂时阻止这次求值,就可以把尚未求值的表达式传给filter函数。

不过,如果把表达式包在一个函数里,就可以把表达式的求值推迟到真正需要的时候。这就是我的思路。

如果我们可以实现函数接口,它将是filter(lambda: id > 1)。 这个接口将是超级通用的,因为任何Python表达式都可以被传递和滥用。在

实施情况

当这个lambda被调用时,Python会按照名字查找规则(先局部、再全局、最后builtins)去解析其中的id。

如果我们能在Python从builtins中找到id之前,在名字查找路径上的某处插入一个名为id的对象,就可以重新定义这个表达式的语义。

我将用eval来实现它:eval会在给定的上下文(命名空间)中计算表达式。

DATA = [
    {'id': 1, 'name': 'brad', 'color':'red'},
    {'id': 2, 'name': 'sylvia', 'color':'blue'},
]

def myfilter(a_lambda):
    """Lazily select the records of ``DATA`` for which *a_lambda* is truthy.

    The lambda is never called. Its compiled body is evaluated with a copy
    of each record as the global namespace, so a bare name such as ``id``
    inside the lambda resolves to that record's ``'id'`` value.

    Returns the ``filter`` iterator over the matching records.
    """

    def record_matches(record):
        # eval() may add entries to the globals dict it receives, so hand it
        # a throw-away copy and keep the stored record untouched.
        namespace = record.copy()
        return eval(a_lambda.__code__, namespace)

    return filter(record_matches, DATA)

我传给eval的是字典的副本(dict.copy()),因为eval会修改传入的globals字典(例如向其中插入__builtins__键)。

下面在Model类的上下文中看看它的实际效果:

(原文此处的代码块在页面抓取时丢失;完整的可运行示例见下文。)

Data类继承自Model。Model为Data提供了一个名为objects的类属性,它是MetaManager(一个描述符类)的实例。

当通过子类访问objects属性时,MetaManager会向该Model子类返回一个Manager实例。MetaManager识别出发起访问的类,并把它传给Manager实例。Manager负责对象的创建、持久化和获取。

为简单起见,数据库(db)被实现为Manager的一个类属性。

为了防止有人通过普通函数滥用全局对象,filter在传入的不是lambda时会抛出异常。

from collections import defaultdict
from collections.abc import Callable


class MetaManager:
    """Descriptor that exposes a ``Manager`` on the class, never on instances.

    Accessing the attribute through a class returns a new ``Manager`` bound
    to that class; accessing it through an instance raises ``AttributeError``.
    """

    def __get__(self, obj, objtype=None):
        """Return a ``Manager`` for *objtype* when accessed via the class.

        Args:
            obj: The instance the attribute was looked up on, or ``None``
                for class-level access.
            objtype: The class the attribute was looked up through. It is
                optional, as in the standard descriptor protocol, and
                defaults to ``type(obj)``.

        Raises:
            AttributeError: If the attribute is accessed through an instance.
        """
        if objtype is None:
            objtype = type(obj)
        if obj is None:
            return Manager(objtype)
        raise AttributeError(
            "Manager isn't accessible via {} instances".format(objtype)
        )


class Manager:
    """Creates, stores and queries the objects of one client class.

    All managers share a single class-level store; each client class gets
    its own list in it, keyed by ``"<module>.<qualname>"``.
    """

    # Shared by every Manager instance: client name -> list of created objects.
    _store = defaultdict(list)

    def __init__(self, client):
        """Bind the manager to *client*, the class whose objects it handles."""
        self._client = client
        self._client_name = "{0}.{1}".format(
            client.__module__, client.__qualname__
        )

    def create(self, **kwargs):
        """Instantiate the client class with *kwargs* and store the result."""
        bucket = self._store[self._client_name]
        bucket.append(self._client(**kwargs))

    def all(self):
        """Return a generator over every stored object of the client class."""
        records = self._store[self._client_name]
        return (record for record in records)

    def filter(self, a_lambda):
        """Return a generator over the stored objects matching *a_lambda*.

        The lambda is not called. Its code object is evaluated with a copy
        of each object's attribute dict as the global namespace, so bare
        names in the lambda body resolve to that object's attributes.

        Raises:
            ValueError: If *a_lambda* is not a lambda expression.
        """
        is_lambda = a_lambda.__code__.co_name == "<lambda>"
        if not is_lambda:
            raise ValueError("a lambda required")

        records = self._store[self._client_name]
        # vars(record) is copied because eval() may add entries to the
        # globals dict it is given.
        return (
            record
            for record in records
            if eval(a_lambda.__code__, vars(record).copy())
        )


class Model:
    """Base class for records whose fields are declared as class attributes.

    A subclass lists its fields as class-level defaults (for example
    ``id = int()``). The type of each default is the type accepted for the
    matching keyword argument. ``Model`` itself cannot be instantiated.
    """

    objects = MetaManager()

    def __init__(self, **kwargs):
        """Validate *kwargs* against the declared fields and store them.

        Raises:
            NotImplementedError: If ``Model`` is instantiated directly.
            AttributeError: If the subclass defines its own ``objects``.
            TypeError: If a keyword is undeclared or has the wrong type.
        """
        if type(self) is Model:
            raise NotImplementedError

        declared = self.__get_class_attributes(type(self))
        self.__init_instance(declared, kwargs)

    def __get_class_attributes(self, cls):
        """Return ``{name: default}`` for the public, non-callable attributes
        defined directly on *cls*."""
        namespace = vars(cls)
        # A subclass must not shadow the manager attribute.
        if "objects" in namespace:
            raise AttributeError(
                'class {} has an attribute named "objects" of type "{}"'.format(
                    type(self), type(namespace["objects"])
                )
            )

        declared = {}
        for attr, obj in vars(cls).items():
            if attr.startswith("_") or isinstance(obj, Callable):
                continue
            declared[attr] = obj
        return declared

    def __init_instance(self, attrs, kwargs_dict):
        """Set each keyword on the instance after checking its name and type."""
        for key, item in kwargs_dict.items():
            if key not in attrs:
                raise TypeError('Got an unexpected key word argument "{}"'.format(key))

            expected_type = type(attrs[key])
            if not isinstance(item, expected_type):
                raise TypeError(
                    "Expected type {}, got {}".format(expected_type, type(item))
                )
            setattr(self, key, item)


if __name__ == "__main__":
    # Demo: declare a model, store a few records, then query them with lambdas.
    from pprint import pprint

    class Data(Model):
        # The class-level defaults declare the fields and their types.
        name = str()
        id = int()
        color = str()

    Data.objects.create(**{"id": 1, "name": "brad", "color": "red"})
    Data.objects.create(**{"id": 2, "name": "sylvia", "color": "blue"})
    Data.objects.create(**{"id": 3, "name": "paul", "color": "red"})
    Data.objects.create(**{"id": 4, "name": "brandon", "color": "yello"})
    Data.objects.create(**{"id": 5, "name": "martin", "color": "green"})
    Data.objects.create(**{"id": 6, "name": "annie", "color": "gray"})

    # Bare names inside each lambda (id, name, color) are resolved against
    # the attributes of the object being tested, not against module globals.
    pprint([vars(obj) for obj in Data.objects.filter(lambda: id == 1)])
    pprint([vars(obj) for obj in Data.objects.filter(lambda: 1 <= id <= 2)])
    pprint([vars(obj) for obj in Data.objects.filter(lambda: color == "blue")])
    pprint(
        [
            vars(obj)
            for obj in Data.objects.filter(
                # Compare strings with ==, not `is`: identity of equal
                # strings depends on interning and is not guaranteed.
                lambda: "e" in color and (name == "brad" or name == "sylvia")
            )
        ]
    )
    pprint([vars(obj) for obj in Data.objects.filter(lambda: id % 2 == 1)])

如果您想要完整的django Model体验,即:

  • datapoint = MyData(name='johndoe', color='green', ...)创建一个新的特征向量或数据条目,就像django中一样:例如new_user=User(username='johndoe', email='jd@jd.com')
  • 使用MyData.objects进行对象管理,如MyData.objects.filter(color__eq='yellow')

下面是实现这一逻辑的一种方法。

首先,您需要一个简单的ObjectManager类:

import collections
import collections.abc
import inspect
import operator

class ObjectManager(collections.abc.MutableSet):
    """Set-like registry of objects with Django-style keyword filtering.

    The manager stores objects in an internal ``set`` and implements the
    ``MutableSet`` interface on top of it. ``set_attributes`` must be called
    once with a representative object before ``filter`` is used with
    conditions, so that the manager knows which attribute names are valid.
    """

    # Comparison suffixes accepted after "__" in a filter keyword. Each one
    # is the name of a function in the ``operator`` module.
    _VALID_OPERATORS = ('lt', 'le', 'eq', 'ne', 'ge', 'gt')

    def __init__(self):
        # Names of the filterable attributes; filled in by set_attributes().
        self._object_attributes = None
        self._theset = set()

    def add(self, item):
        """Register *item* with the manager."""
        self._theset.add(item)

    def discard(self, item):
        """Remove *item* if it is registered; do nothing otherwise."""
        self._theset.discard(item)

    def __iter__(self):
        return iter(self._theset)

    def __len__(self):
        return len(self._theset)

    def __contains__(self, item):
        try:
            return item in self._theset
        except AttributeError:
            return False

    def set_attributes(self, an_object):
        """Record the filterable attribute names of *an_object*.

        These are the names of its non-routine members, excluding
        ``__dunder__`` names.
        """
        self._object_attributes = [
            name
            for name, _ in inspect.getmembers(
                an_object, lambda member: not inspect.isroutine(member)
            )
            if not (name.startswith('__') and name.endswith('__'))
        ]

    def filter(self, **kwargs):
        """Filters your objects according to one or several conditions

        Each condition is a keyword ``attr=value`` (equality) or
        ``attr__op=value`` where ``op`` is one of ``lt``, ``le``, ``eq``,
        ``ne``, ``ge``, ``gt``.

        If several filtering conditions are present you can set the
        combination mode to either 'and' or 'or' via the ``mode`` keyword
        (default ``'or'``; any value other than ``'and'`` behaves as 'or').

        Returns:
            A ``set`` of the matching objects. It is empty when no
            condition is given.

        Raises:
            AssertionError: If an operator or attribute name is not allowed.
        """
        mode = kwargs.pop('mode', 'or')

        # Validate every condition before evaluating any of them.
        conditions = []
        for kw, expected in kwargs.items():
            # Split on the LAST "__" so attribute names may contain "__".
            attr, sep, op = kw.rpartition('__')
            if not sep:
                attr, op = kw, 'eq'
            # only allow valid operators
            assert op in self._VALID_OPERATORS
            # only allow access to valid object attributes
            assert attr in self._object_attributes
            conditions.append((attr, getattr(operator, op), expected))

        matched = None  # None means "no condition applied yet"
        for attr, compare, expected in conditions:
            hits = {
                obj for obj in self
                if compare(getattr(obj, attr), expected)
            }
            if matched is None:
                matched = hits
            elif mode == 'and':
                matched &= hits
            else:
                matched |= hits
            # In 'and' mode an empty intersection can never grow again.
            if mode == 'and' and not matched:
                return set()
        return set() if matched is None else matched

    # feel free to add a `get_or_create`, `create`, etc.

现在,将该类的一个实例作为属性附加到MyData类,并确保所有新对象都已添加到该类中:

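(原文此处的代码块在页面抓取时丢失;以下是根据上下文还原的示例。)

class MyData:
    # the object manager shared by all instances
    objects = ObjectManager()

    def __init__(self, uid, name, color):
        self.uid = uid
        self.name = name
        self.color = color
        # record the filterable attributes when the first instance is created
        if not len(self.objects):
            self.objects.set_attributes(self)
        # register every new instance with the object manager
        self.objects.add(self)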

现在可以导入向量:

# Sample records; the keys of each dict are passed to MyData as keyword
# arguments.
DATA = [
    {'uid': 1, 'name': 'brad', 'color':'red'},
    {'uid': 2, 'name': 'sylvia', 'color':'blue'},
]
# The return value is discarded, so this relies on the class keeping track
# of its own instances. The class is named MyData; Python names are
# case-sensitive, so `myData` would raise NameError.
for dat in DATA:
    MyData(**dat)

或创建新实例:

d1 = MyData(uid=10, name='john', color='yellow')

并利用管理器过滤对象:

# NOTE(review): the "# >" lines show sample output. If MyData.objects is the
# ObjectManager defined above, filter() returns a set, so the order of the
# printed names is not guaranteed.
# Single condition: uid >= 10.
print([md.name for md in MyData.objects.filter(uid__ge=10)])
# > ['john']
# mode='and': every condition must hold.
print([md.name for md in MyData.objects.filter(mode='and',uid__ge=1,name__eq='john')])
# > ['john']
# mode='or': any one condition is enough.
print([md.name for md in MyData.objects.filter(mode='or',uid__le=4,name__eq='john')])
# > ['john', 'brad', 'sylvia']


如果你不想(或无法)修改这个类,甚至可以创建一个ObjectManager,在目标类已经定义、甚至已经创建了一些实例之后,再把它挂接到任意类上(不过对内置类型不起作用)。

其思路是:在ObjectManager实例初始化时,对目标类的__init__打猴子补丁(monkey patch),并给目标类添加objects属性:

import gc
import inspect
import collections
import operator
import wrapt  # not standard lib > pip install wrapt

# NOTE(review): `collections.MutableSet` is the pre-3.10 alias; on Python
# 3.10+ the ABC is only available as `collections.abc.MutableSet`.
class ObjectManager(collections.MutableSet):
    """Object manager that attaches itself to an existing class.

    Creating ``ObjectManager(SomeClass)`` stores the manager as
    ``SomeClass.objects``, wraps ``SomeClass.__init__`` so that new
    instances are added to the manager, and collects the instances that
    already exist. Only ``__init__`` is shown here; the remaining methods
    are the same as in the earlier version.
    """
    def __init__(self, attach_to):
        # Names of the filterable attributes; set via set_attributes().
        self._object_attributes = None
        # add self as class attribute
        attach_to.objects = self
        # monkey patch __init__ of your target class
        @wrapt.patch_function_wrapper(attach_to, '__init__')
        def n_init(wrapped, instance, args, kwargs):
            # Run the original __init__ first so the instance attributes exist.
            wrapped(*args, **kwargs)
            c_objects = instance.__class__.objects
            # An empty manager is falsy (via __len__), so the attribute
            # names are taken from the first instance that gets added.
            if not c_objects:
                c_objects.set_attributes(instance)
            c_objects.add(instance)
        # make sure to be up to date with the existing instances
        self._theset = set(obj for obj in gc.get_objects() if isinstance(obj, attach_to))
        # already fetch the attributes if instances exist
        if self._theset:
            self.set_attributes(next(iter(self._theset)))
        ...
        # the rest is identical to the version above

现在可以这样使用它:

class MyData:
    """Plain record with a ``uid``, a ``name`` and a ``color``."""

    def __init__(self, uid, name, color):
        """Store the three constructor arguments as instance attributes."""
        self.uid, self.name, self.color = uid, name, color

# create some instances
DATA = [
    {'uid': 1, 'name': 'brad', 'color':'red'},
    {'uid': 2, 'name': 'sylvia', 'color':'blue'},
]
my_datas = []
for dat in DATA:
    # The class is named MyData; `myData` would raise NameError.
    my_datas.append(MyData(**dat))  # appending them just to have a reference
# say that ONLY NOW you decide you want to use an object manager
# Simply do:
ObjectManager(MyData)
# and you are done:
# (the "# >" lines show sample output)
print([md.name for md in MyData.objects.filter(mode='or',uid__le=4,name__eq='john')])
# > ['brad', 'sylvia']
# also any object you create from now on is included:
d1 = MyData(uid=10, name='john', color='yellow')
print([md.name for md in MyData.objects.filter(mode='or',uid__le=4,name__eq='john')])
# > ['brad', 'sylvia', 'john']

你想要的是这个意思吗?

此解决方案不依赖外部库,只用到了**kwargs、生成器/yield和@property装饰器。所以从学习的角度来看,它可能很有意思。

如果你能设法让Django直接读取列表中的数据,那么在与Django的兼容性方面,那样做可能比我的代码更好。这取决于你的目标是什么:是完美地模仿django的过滤器,还是学习如何做一个不那么完美、但拥有完整源代码且没有任何依赖的模仿。

DATA = [
    {'id': 1, 'name': 'brad',    'color':'red'},
    {'id': 2, 'name': 'sylvia',  'color':'blue'},
    {'id': 3, 'name': 'paul',    'color':'red'},
    {'id': 4, 'name': 'brandon', 'color':'yello'},
    {'id': 5, 'name': 'martin',  'color':'green'},
    {'id': 6, 'name': 'annie',  'color':'gray'},
]

class UnknownOperator(Exception):
    """Raised when a ``field__operator`` lookup uses an unsupported operator."""

class FilterData:
    """Minimal imitation of a Django queryset over an iterable of dicts.

    Conditions are keyword arguments: ``key=value`` tests equality and
    ``key__op=value`` applies one of the operators ``gt``, ``lt``,
    ``startswith`` or ``in``. ``filter`` and ``exclude`` return a new
    ``FilterData`` holding a list, so the result can be iterated, counted
    and filtered again any number of times.

    If the object is built from a one-shot iterator, iterating ``self`` or
    calling ``count()`` directly still consumes it; pass a list for
    repeatable results.
    """

    # Tests are called as test(entry_value, lookup_value). The ``None`` key
    # is the plain ``key=value`` lookup without an operator suffix.
    _FILTER_TESTS = {
        None: lambda field, value: field == value,
        "gt": lambda field, value: field > value,  # greater than
        "lt": lambda field, value: field < value,  # less than
        "startswith": lambda field, value: field.startswith(value),
        "in": lambda field, value: field in value,  # membership
    }
    # The same lookups, negated, for exclude().
    _EXCLUDE_TESTS = {
        None: lambda field, value: field != value,
        "gt": lambda field, value: field <= value,
        "lt": lambda field, value: field >= value,
        "startswith": lambda field, value: not field.startswith(value),
        "in": lambda field, value: field not in value,
    }

    def __init__(self, data):
        self.data = data

    def _select(self, key, value, data, tests):
        """Return a generator over the entries of *data* that pass the test
        named by *key*, looked up in *tests*.

        Raises:
            UnknownOperator: Immediately, if the operator suffix is unknown.
        """
        if "__" in key:
            # Split on the LAST "__" so field names may contain "__".
            field, _, op = key.rpartition("__")
        else:
            field, op = key, None
        try:
            test = tests[op]
        except KeyError:
            raise UnknownOperator("operator %s is unknown" % op) from None
        return (entry for entry in data if test(entry[field], value))

    def _filter_step(self, key, value, data):
        """Lazily keep the entries of *data* that match ``key=value``."""
        return self._select(key, value, data, self._FILTER_TESTS)

    def _exclude_step(self, key, value, data):
        """Lazily drop the entries of *data* that match ``key=value``."""
        return self._select(key, value, data, self._EXCLUDE_TESTS)

    def filter(self, **kwargs):
        """Return a ``FilterData`` with the entries matching ALL conditions."""
        data = self.data
        for key, value in kwargs.items():
            data = self._filter_step(key, value, data)
        # Materialize the chain: a bare generator would be exhausted by the
        # first count() or iteration and then silently look empty.
        return FilterData(list(data))

    def exclude(self, **kwargs):
        """Return a ``FilterData`` without the entries matching ANY condition.

        Each keyword is applied as a separate exclusion step.
        """
        data = self.data
        for key, value in kwargs.items():
            data = self._exclude_step(key, value, data)
        return FilterData(list(data))

    def all(self):
        """Return a new ``FilterData`` over the same underlying data."""
        return FilterData(self.data)

    def count(self):
        """Return the number of entries."""
        return sum(1 for _ in self.data)

    def __iter__(self):
        yield from self.data

# Thin wrapper that makes the API read like django managers / filters:
# ``manager.objects.filter(...)``.
class DataManager:
    """Holds a data set and exposes it through an ``objects`` property."""

    def __init__(self, data):
        self.data = data

    def _fresh_queryset(self):
        """Build a new ``FilterData`` over the stored data."""
        return FilterData(self.data)

    # Read-only property: every access returns a fresh FilterData.
    objects = property(_fresh_queryset)


# --- Direct use of FilterData on the raw list -------------------------------
fdata = FilterData(DATA)

# Plain keyword: equality lookup.
assert [v["id"] for v in fdata.filter(name="paul")] == [3]
assert [v["id"] for v in fdata.filter(color="red")] == [1, 3]
# "__gt" and "__startswith" operator suffixes.
assert [v["id"] for v in fdata.filter(id__gt=2)] == [3, 4, 5, 6]
assert [v["id"] for v in fdata.filter(color__startswith="gr")] == [5, 6]

# --- The same checks through the manager-style wrapper ----------------------
fmgr = DataManager(DATA)

assert [v["id"] for v in fmgr.objects.filter(name="paul")] == [3]
assert [v["id"] for v in fmgr.objects.filter(color="red")] == [1, 3]
assert [v["id"] for v in fmgr.objects.filter(id__gt=2)] == [3, 4, 5, 6]
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr")] == [5, 6]
# Several keywords in one call: all conditions must hold.
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr", id__lt=6)] == [5]
# NOTE(review): the next assertion repeats the previous one verbatim.
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr", id__lt=6)] == [5]

# Chained filter() calls give the same result as one combined call.
assert [v["id"] for v in fmgr.objects.filter(color__startswith="gr").filter(id__lt=6)] == [5]

# count() on filtered and unfiltered data, and all().
assert fmgr.objects.filter(color__startswith="gr").filter(id__lt=6).count() == 1
assert fmgr.objects.filter(id__gt=2).count() == 4
assert fmgr.objects.count() == 6
assert [v["id"] for v in fmgr.objects.all()] == list(range(1, 7))

相关问题 更多 >