姜戈芹菜。在publish_date发布帖子

问题描述 投票:0回答:1

我有一个邮政模型。有一个字段

publish_date
。如果帖子的状态为
planned
,那么我需要在publish_date 上执行发布功能。我怎样才能做到这一点?

models.py:

class Post(models.Model):
    STATES = (
        ('draft', 'Draft'),
        ('published', 'Published'),
        ('planned', 'Planned')
    )
    state = models.CharField(choices=STATES, default=STATES[0][0])
    channels = models.ManyToManyField('channel.Channel')
    creator = models.ForeignKey('authentication.User', on_delete=models.SET_NULL, null=True)
    publish_date = models.DateTimeField()
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)


@receiver(post_save, sender=Post)
def reschedule_publish_task(sender, instance, **kwargs):
    # There should be a task setting for a specific time here, as I understand it

我尝试这样做,但没有成功,任务没有完成。

模型.py

@receiver(post_save, sender=Post)
def reschedule_publish_task(sender, instance, **kwargs):
    task_id = f"publish_post_{instance.id}"

    if instance.state == 'planned' and instance.publish_date:
        publish_post.apply_async((instance.id,), eta=instance.publish_date, task_id=task_id)

任务.py

@shared_task
def publish_post(post_id: int) -> None:
    from .models import Post

    post = Post.objects.filter(id=post_id).first()
    if post:
        if post.state == 'planned' and post.publish_date <= now():
            post.state = 'published'
            post.save()
python django celery django-celery celerybeat
1个回答
0
投票

您可以尝试安装

django-celery-results
模块,它将帮助您从管理面板查看 Celery 日志。

@shared_task
def publish_post(post_id: int) -> None:
   from .models import Post

   post = Post.objects.filter(id=post_id).first()
   if post:
       if post.state == 'planned' and post.publish_date <= now():
           post.state = 'published'
           post.save()
           return JsonResponse({"success": True})
      return JsonResponse({"success": False, "error": "date of post Error", "data": f"Post ID: {post_id} \n Post date: {post.publish_date} \n Post state {post.state} \n Date now: {now()}"})
   return JsonResponse({"success": False, "error": "Post not found", "data": post_id})

此外,请确保worker和beat都在运行。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.