如何使用Django跨模式查询?

2024-10-01 13:31:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我的postgres数据库中有很多模式。我想在我所有的模式中得到一个查询集(UNION all)。你知道吗

我使用的是Django2.2、Python3.6和Postgres9.7。你知道吗

我尝试定义一个特定的DBManager,比如:

class UnionManager(models.Manager):
    def get_queryset(self):
        return super().get_queryset().using("db1") | super().get_queryset().using("db2")


class Account(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
    label = models.CharField(max_length=100, null=True)   
    objects = models.Manager()
    union_manager = UnionManager()


并使用以下数据库配置:

    config = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',            
            'NAME': 'databasename',
            'USER': bdd_user,
            'PASSWORD': db_pwd,
            'PORT': bdd_port,
            'HOST': host_writer,
        },
        'db1': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'OPTIONS': {
                'options': '-c search_path=db1,extensions'
            },
            'NAME': 'databasename',
            'USER': bdd_user,
            'PASSWORD': db_pwd,
            'PORT': bdd_port,
            'HOST': host_writer,
        },
        'db2': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'OPTIONS': {
                'options': '-c search_path=db2,extensions'
            },
            'NAME': 'databasename',
            'USER': bdd_user,
            'PASSWORD': db_pwd,
            'PORT': bdd_port,
            'HOST': host_writer,
        }
   }

queryset只从UnionManager类(这里是“db1”)中的第一个模式(Account.union_manager.all())返回对象 有什么建议吗?你知道吗


Tags: djangotruedbgetmodelspostgresql模式bdd
1条回答
网友
1楼 · 发布于 2024-10-01 13:31:12

我找到了一个解决方案(未完全实施):

重新定义查询集:

def call_wrapper(fn):
    def wrapper(self, *args, **kwargs):
        return CustomQueryset(querysets=[getattr(qs, fn.__name__)(*args, **kwargs) for qs in self.querysets])

    return wrapper


class CustomQueryset(QuerySet):

    def __init__(self, querysets, model=None, query=None, using=None, hints=None):
        self.querysets = querysets
        super().__init__(model, query, using, hints)

    def __deepcopy__(self, memo):
        raise NotImplementedError("__deepcopy__ not implemented")

    def __getstate__(self):
        raise NotImplementedError("__getstate__ not implemented")

    def __setstate__(self, state):
        raise NotImplementedError("__getstate__ not implemented")

    def __iter__(self):
        return chain(*[qs for qs in self.querysets])

    def __repr__(self):
        return "\n".join([str(qs) for qs in self.querysets])

    def __len__(self):
        return sum([len(qs) for qs in self.querysets])

    def __bool__(self):
        raise NotImplementedError("__bool__ not implemented")    

    @call_wrapper
    def filter(self, *args, **kwargs):
        pass

    def order_by(self, *field_names):
        raise ValueError('order_by not suported for CustomQueryset')

    @call_wrapper
    def exclude(self, *args, **kwargs):
        pass

    @call_wrapper
    def distinct(self, *field_names):
        pass

    def count(self):
        return sum([qs.count() for qs in self.querysets])

    @call_wrapper
    def select_related(self, *fields):
        pass

    @call_wrapper
    def none(self):
        pass

    @call_wrapper
    def values_list(self, *fields, flat=False, named=False):
        pass

    @call_wrapper
    def annotate(self, *args, **kwargs):
        pass

    @call_wrapper
    def raw(self, raw_query, params=None, translations=None, using=None):
        pass

    @call_wrapper
    def values(self, *fields, **expressions):
        pass

    def union(self, *other_qs, all=False):
        querysets = []
        for o_qs in other_qs:
            if len(o_qs.querysets) != len(self.querysets):
                raise ValueError('Incompatible CustomQueryset')
            querysets += [qs.union(o) for qs, o in zip(self.querysets, o_qs.querysets)]
        return CustomQueryset(querysets=querysets)

    def aggregate(self, *args, **kwargs):
        counter = collections.Counter()
        # Sum of values by key if function are Count or Sum
        if all([isinstance(t, (Count, Sum)) for t in kwargs.values()]):
            for d in [qs.aggregate(*args, **kwargs) for qs in self.querysets]:
                counter.update(d)
            return dict(counter)
        else:
            raise TypeError('Only Count ans Sum method in CustomQueryset')

我修改了工会经理:

class UnionManager(models.Manager):

    def get_queryset(self):

        return CustomQueryset(
            querysets=[super().get_queryset().using('db1'), super().get_queryset().using('db2')],
            model=self.model,
        )

现在Account.union_manager.all()从我的2个模式返回对象!你知道吗

相关问题 更多 >