在Django REST List API视图中对原始SQL查询进行分页的最佳方法是什么?

问题描述 投票:1回答:3

我有一个原始的SQL查询,我用它来构建Django REST ListAPI视图的查询集。它符合以下几点(请原谅无意义的名称):

class MyView(ListAPIView):
    serializer_class = MySerializer
    paginate_by = 10
    def get_queryset(self):
        params = {
            "uid": str(self.request.user.id),
            "param": str(self.kwargs['param'])
        }
        query = 'SELECT f.id ' \
            'FROM myapp_foo f, myapp_bar b ' \
            'WHERE b.foo_id = f.id AND ' \
            'b.param >= %(param)s AND ' \
            'f.dt_tm >= NOW() AND ' \
            '(SELECT COUNT(*) FROM myapp_baz z ' \
            'WHERE z.user_id = %(uid)s AND ' \
            'z.qux_id = f.qux_id) = 0 ' \
            'ORDER BY f.dt_tm;'
        return Foo.objects.raw(query, params)

这给出了错误:

object of type 'RawQuerySet' has no len()

我想用类似的SQL查询计算一个计数,然后使用LIMIT和OFFSET参数来进行分页。我已经阅读了一些建议,其中列表项目被计算得到len,但这似乎并不令人满意,因为除非查询中有一个小的LIMIT(在任何情况下都会破坏分页的目的),它将是低效的。

更新:我刚注意到paginate_by正在等待弃用。

首先,我将如何向返回的对象添加count方法?

python sql django pagination django-rest-framework
3个回答
3
投票

比其他替代方案更有效的解决方案是编写自己的RawQuerySet替代品。我正在显示下面的代码,但你也可以access it as a gist here。绝对不能保证没有错误;然而,我在Python 3的Django 1.11中使用它(PostgreSQL作为数据库;也应该与MySQL一起使用)。简单地说,这个类将相应的LIMITOFFSET子句添加到原始SQL查询中。没有什么是疯狂的,只是一些简单的字符串连接,所以请确保不要在原始SQL查询中包含这些子句。

The class

from django.db import models
from django.db.models import sql
from django.db.models.query import RawQuerySet


class PaginatedRawQuerySet(RawQuerySet):
    def __init__(self, raw_query, **kwargs):
        super(PaginatedRawQuerySet, self).__init__(raw_query, **kwargs)
        self.original_raw_query = raw_query
        self._result_cache = None

    def __getitem__(self, k):
        """
        Retrieves an item or slice from the set of results.
        """
        if not isinstance(k, (slice, int,)):
            raise TypeError
        assert ((not isinstance(k, slice) and (k >= 0)) or
                (isinstance(k, slice) and (k.start is None or k.start >= 0) and
                 (k.stop is None or k.stop >= 0))), \
            "Negative indexing is not supported."

        if self._result_cache is not None:
            return self._result_cache[k]

        if isinstance(k, slice):
            qs = self._clone()
            if k.start is not None:
                start = int(k.start)
            else:
                start = None
            if k.stop is not None:
                stop = int(k.stop)
            else:
                stop = None
            qs.set_limits(start, stop)
            return qs

        qs = self._clone()
        qs.set_limits(k, k + 1)
        return list(qs)[0]

    def __iter__(self):
        self._fetch_all()
        return iter(self._result_cache)

    def count(self):
        if self._result_cache is not None:
            return len(self._result_cache)

        return self.model.objects.count()

    def set_limits(self, start, stop):
        limit_offset = ''

        new_params = tuple()
        if start is None:
            start = 0
        elif start > 0:
            new_params += (start,)
            limit_offset = ' OFFSET %s'
        if stop is not None:
            new_params = (stop - start,) + new_params
            limit_offset = 'LIMIT %s' + limit_offset

        self.params = self.params + new_params
        self.raw_query = self.original_raw_query + limit_offset
        self.query = sql.RawQuery(sql=self.raw_query, using=self.db, params=self.params)

    def _fetch_all(self):
        if self._result_cache is None:
            self._result_cache = list(super().__iter__())

    def __repr__(self):
        return '<%s: %s>' % (self.__class__.__name__, self.model.__name__)

    def __len__(self):
        self._fetch_all()
        return len(self._result_cache)

    def _clone(self):
        clone = self.__class__(raw_query=self.raw_query, model=self.model, using=self._db, hints=self._hints,
                               query=self.query, params=self.params, translations=self.translations)
        return clone

How to use it

自定义经理

我通过自定义管理器使用上面的查询集:

class MyModelRawManager(models.Manager):
    def raw(self, raw_query, params=None, translations=None, using=None):
        if using is None:
            using = self.db
        return PaginatedRawQuerySet(raw_query, model=self.model, params=params, translations=translations, using=using)

    def my_raw_sql_method(some_arg):
        # set up your query and params
        query = 'your query'
        params = ('your', 'params', 'tuple')
        return self.raw(raw_query=query, params=params)

自定义分页类

为了完成,我还包括一个分页类:

from rest_framework.pagination import PageNumberPagination


class MyModelResultsPagination(PageNumberPagination):
    """Fixed page-size pagination with 10 items."""
    page_size = 10
    max_page_size = 10

你的ListAPIView

class MyModelView(generics.ListAPIView):

    serializer_class = MyModelSerializer
    pagination_class = MyModelResultsPagination

    def get_queryset(self):
        return MyModel.raw_manager.my_raw_sql_method(some_arg)

A word of caution

PaginatedRawQuerySet类虽然对我有用,但尚未经过广泛测试,但我相信它确实提供了一个解决方案,它比为每个调用选择查询集中的所有项目更有效。

你可能会注意到有一个自定义的count方法实现(最初从RawQuerySet中丢失),它通过调用self.model.objects.count()来计算。如果没有这种方法,分页器将评估len(your_raw_queryset),这将对性能产生与其他答案相同的效果。

这个类并不是RawQuerySet的一个通用的替代品,这意味着您应该添加自己的自定义项以使其符合您的需求。

例如,如果你需要更复杂的东西,可以在PaginatedRawQuerySet类中添加另一个属性,称为raw_count_query,然后在count()中调用,而不是按照它现在的方式计算所有对象(这将用于你的情况)需要过滤; raw_count_query将提供SQL根据您的条件计算子集)。


2
投票

如果在返回之前将原始查询集强制转换为列表,则应该阻止'RawQuerySet' has no len()错误。

return list(Foo.objects.raw(query))

正如您所说,这将是低效的,因为它将加载整个查询集。

可以编写一个自定义分页类,使用limit和offset有效地分页,并在视图中使用pagination_class属性。


0
投票

我遇到了同样的问题,我发现使用raw可以使用额外的:

(...)
return Foo.objects.extra(where=query, params=params) 

变量附加

where=['data->>"$.SOMETHING" = %s OR data->>"$.SOMETHING" = %s OR data->>"$.SOMETHING" = %s', 'data->>"$.GROUP" LIKE %s'] 
params=['EX1', 'EX2', 'EX3', '%EXEMPLE4%']

注意:主要问题是使用具有QuerySet相同属性的RawQuerySet,如果可能的话,IMHO使用额外的QuerySet API的最佳方式。

© www.soinside.com 2019 - 2024. All rights reserved.