如何使用子查询编写Django查询作为WHERE子句的一部分?

问题描述 投票:2回答:3

我正在使用Django和Python 3.7。我无法弄清楚如何编写一个Django查询,其中有一个子查询作为where子句的一部分。这是模特......

class Article(models.Model):
    objects = ArticleManager()
    title = models.TextField(default='', null=False)
    created_on = models.DateTimeField(auto_now_add=True)


class ArticleStat(models.Model):
    objects = ArticleStatManager()
    article = models.ForeignKey(Article, on_delete=models.CASCADE, related_name='articlestats')
    elapsed_time_in_seconds = models.IntegerField(default=0, null=False)
    votes = models.FloatField(default=0, null=False)


class StatByHour(models.Model):
    index = models.FloatField(default=0)
    # this tracks the hour when the article came out
    hour_of_day = IntegerField(
        null=False,
        validators=[
            MaxValueValidator(23),
            MinValueValidator(0)
        ]
    )

在PostGres中,查询看起来类似于

SELECT *
FROM article a,
     articlestat ast
WHERE a.id = ast.article_id
  AND ast.votes > 100 * (
    SELECT "index" 
    FROM statbyhour 
    WHERE hour_of_day = extract(hour from (a.created_on + 1000 * interval '1 second')))

请注意子查询是WHERE子句的一部分

ast.votes > 100 * (select index from statbyhour where hour_of_day = extract(hour from (a.created_on + 1000 * interval '1 second'))) 

所以我以为我可以这样做......

hour_filter = Func(
    Func(
        (F("article__created_on") + avg_fp_time_in_seconds * "interval '1 second'"),
        function='HOUR FROM'),
    function='EXTRACT')
...
votes_criterion2 = Q(votes__gte=F("article__website__stats__total_score") / F(
    "article__website__stats__num_articles") * settings.TRENDING_PCT_FLOOR *
                                StatByHour.objects.get(hour_of_day=hour_filter) * day_of_week_index)
qset = ArticleStat.objects.filter(votes_criterion1 & votes_criterion2,
                                  comments__lte=25)

但这会导致“无法解析关键字'文章'到字段中。选项包括:hour_of_day,id,index,num_articles,total_score”错误。我认为这是因为Django在运行其中的较大查询之前正在调查我的“StatByHour.objects”查询,但我不知道如何重写事物以使子查询同时运行。

编辑:K,将我的子查询移动到实际的“子查询”函数中,并引用我使用OuterRef创建的过滤器...

hour_filter = Func(
    Func(
        (F("article__created_on") + avg_fp_time_in_seconds * "interval '1 second'"),
        function='HOUR FROM'),
    function='EXTRACT')
query = StatByHour.objects.get(hour_of_day=OuterRef(hour_filter))


...
votes_criterion2 = Q(votes__gte=F("article__website__stats__total_score") / F(
    "article__website__stats__num_articles") * settings.TRENDING_PCT_FLOOR *
                                Subquery(query) * 
                 day_of_week_index)
qset = ArticleStat.objects.filter(votes_criterion1 & votes_criterion2,
                                  comments__lte=25)

这会导致一个

This queryset contains a reference to an outer query and may only be used in a subquery.

这很奇怪,因为我在子查询中使用它。

编辑#2:即使根据给出的答案更改查询...

hour_filter = Func(
    Func(
        (F("article__created_on") + avg_fp_time_in_seconds * "interval '1 second'"),
        function='HOUR FROM'),
    function='EXTRACT')
query = StatByHour.objects.filter(hour_of_day=OuterRef(hour_filter))[:1]

...
votes_criterion2 = Q(votes__gte=F("article__website__stats__total_score") / F(
    "article__website__stats__num_articles") * settings.TRENDING_PCT_FLOOR *
                                Subquery(query) *
                                day_of_week_index)
qset = ArticleStat.objects.filter(et_criterion1 & et_criterion2 & et_criterion3,
                                  votes_criterion1 & votes_criterion2,
                                  article__front_page_first_appeared_date__isnull=True,
                                  comments__lte=25)

我仍然得到错误

'Func' object has no attribute 'split'
python django python-3.x postgresql subquery
3个回答
0
投票

Subqueries需要是未立即评估的查询,以便可以推迟评估,直到运行外部查询。 get()不适合账单,因为它立即执行并返回一个对象实例而不是Queryset

然而,用filter代替get,然后取一个[:1]切片应该有效:

StatByHour.objects.filter(hour_of_day=OuterRef('hour_filter')).values('hour_of_day')[:1]

注意OuterRef中的字段引用是字符串文字而不是变量。

此外,子查询需要返回单个列和单个行(因为它们被分配给单个字段),因此values()和上面的切片。

另外,我还没有在Q对象中使用子查询;我不确定它会起作用。您可能必须先在注释中保存子查询输出,然后将其用于过滤器计算。


0
投票

我发现尽可能多地移动到注释中以澄清正在发生的事情是有帮助的。

您可以使用Extract函数来获取小时数。如果你想要加入更复杂的avg_fp_time_in_seconds东西,你需要定义你自己的Func,我没有尝试复制,因为它应该得到自己的帖子(这可能是'Func' object has no attribute 'split'错误来自的地方)。

# First, add a field for the hour 
articles_with_hour = Article.objects.annotate(created_on_hour=ExtractHour('created_on'))

# Set up the subquery, referencing the annotated field
for_this_hour = StatByHour.objects.filter(hour_of_day=OuterRef('created_on_hour'))

# Add the subquery, making sure to slice down to one value
articles_with_hour_index = articles_with_hour.annotate(
    index_for_this_hour=Subquery(for_this_hour.values('index')[:1]),
)

# Add the website averages for later calculations 
#  (note if total_score and num_articles are different field types
#  you may need an ExpressionWrapper)
articles_with_avg_website_score = articles_with_hour_index.annotate(
    average_article_score_for_website=(
        F("website__stats__total_score") / F("website__stats__num_articles")
    )
)

# Use the averages to calculate the trending floor for each article
articles_with_trending_floor = articles_with_avg_website_score.annotate(
    trending_floor=F('average_article_score_for_website') * settings.TRENDING_PCT_FLOOR,
)

# Set up the criteria, referencing fields that are already annotated on the qs
# ...
votes_gte_trending_floor_for_this_hour_criterion = Q(articlestats__votes__gte=(
    F('trending_floor')
    * F('index_for_this_hour')
    * day_of_week_index  # not sure where this comes from?
))
# ...

# Then just filter down (note this is an Article QuerySet, not ArticleStat)
qset = articles_with_trending_floor.filter(
    votes_gte_trending_floor_for_this_hour_criterion,
    # other criteria
    front_page_first_appeared_date__isnull=True,
    articlestats__comments__lte=25,
)

这些计算中的许多都可以被浓缩,甚至可以使用多个kwargs在单个annotate调用中完成所有操作,但我认为将其全部放在一起会使其更容易理解。


0
投票

这肯定像是一个Subquery解决方案。

Django> = 1.11

作为一个警告,我测试了代码,但只是模型,我没有任何数据,所以,这个答案只是努力指出你在正确的方向

# Query that references an outer field from another model, in this case created_on.
# On wich we are performing a lookup in order to "extract" the hour (assuming here)
# a DateTimeField or a TimeField.
stat_by_hour = StatByHour.objects.filter(hour_of_day=OuterRef('created_on__hour'))


# Then filter articles, that have articlestats.votes 
# greater than 100 * stat_by_hour.index
result = Article.objects.filter(
    articlestats__votes__gt=100 * Subquery(stat_by_hour.values('index')[:1], output_field=FloatField())
)

乍一看,它看起来你可能需要在子查询中执行order_by('index')order_by('-index'),这样切片[:1]将获得最小值或最大值(取决于您的需要。)

我相信你可以使用它(或非常相似的东西)来实现你想要的。


0
投票

看看Django queries。我认为您可以通过将SQL基本查询更改为Django提供的内容来解决问题。

如果它不起作用,你可以perform raw SQL queries


0
投票

使用通过hour_of_day=ExtractHour(OuterRef('article__created_on') + timedelta(seconds=avg_fp_time_in_seconds))过滤的子查询进行过滤。真正的代码将需要一个额外的ExpressionWrapper,只适用于Django >= 2.1.0

import datetime

from django.db import models
from django.db.models import F, OuterRef, Subquery, Value
from django.db.models.functions import ExtractHour, Coalesce
from django.db.models.expressions import ExpressionWrapper


relevant_hour_stats = (
    StatByHour.objects
    .filter(
        hour_of_day=ExtractHour(ExpressionWrapper(
            OuterRef('article__created_on')  # NOTE: `OuterRef()+Expression` works only on Django >= 2.1.0
            +
            datetime.timedelta(seconds=avg_fp_time_in_seconds),
            output_field=models.DateTimeField()
        )),
    )
    .annotate(
        votes_threshold=Coalesce(
            100.0 * F('index'),
            0.0,
            output_field=models.FloatField(),
        ),
    )
    .order_by('-votes_threshold')
    # NOTE: your StatByHour model does not have unique=True on hour_of_day
    # field, so there may be several stat for same hour.
    # And from your SQL example it's unclear how should they be handled. So I
    # assume that "greatest" threshold is needed.
)

article_stats = (
    ArticleStat.objects
    .all()
    .filter(
        votes__gt=Coalesce(
            Subquery(relevant_hour_stats.values('votes_threshold')[:1]),
            Value(0.0),
            output_field=models.FloatField(),
        ),
    )
)

附:如果你在github上设置一些“演示项目”会更容易,这样任何人都可以克隆它并在本地检查他们的想法。

P.P.S.此代码经过测试可用,但在不同的型号/字段上:

In [15]: relevant_something = (ModelOne.objects.filter(index=ExtractHour(ExpressionWrapper(OuterRef('due_date') + datetime.timedelta(seconds=1000), output_field=models.DateTimeField()))).annotate(votes_threshold=100*F('indent')).order_by('-votes_threshold'))

In [16]: ts = ModelTwo.objects.all().filter(votes__gt=Subquery(relevant_notes.values('votes_threshold')[:1], output_field=models.IntegerField()))

In [17]: print(ts.query)
SELECT 
    ...
FROM 
    "some_app_model_two" 
WHERE 
    "some_app_model_two"."votes" > (
        SELECT 
            (100 * U0."indent") AS "votes_threshold" 
        FROM 
            "some_app_model_one" U0 
        WHERE 
            U0."index" = (
                EXTRACT(
                    'hour' 
                    FROM ("some_app_model_two"."due_date" + 0:16:40) 
                    AT TIME ZONE 'America/Los_Angeles'
                )
            ) 
        ORDER BY "votes_threshold" DESC 
        LIMIT 1
    )
ORDER BY 
    "some_app_model_two"."due_date" ASC, 
    "some_app_model_two"."priority" ASC, 
    "some_app_model_two"."updated_at" DESC

因此,如果您收到任何错误,请显示您正在运行的ACTUAL代码

© www.soinside.com 2019 - 2024. All rights reserved.