我有一个 Post(帖子)模型,其中有一个字段 publish_date。如果帖子的状态为 planned,那么我需要在到达 publish_date 指定的时间时执行发布功能。我怎样才能做到这一点?
models.py:
class Post(models.Model):
    """A post with a draft/published/planned state and a publish date."""

    # (stored value, human-readable label) pairs accepted by ``state``.
    STATES = (
        ('draft', 'Draft'),
        ('published', 'Published'),
        ('planned', 'Planned'),
    )

    # CharField needs an explicit max_length (Django system check fields.E120
    # on most database backends); 16 leaves room beyond the longest stored
    # value, 'published'. New posts start as the first state, 'draft'.
    state = models.CharField(max_length=16, choices=STATES, default=STATES[0][0])
    channels = models.ManyToManyField('channel.Channel')
    # The post is kept (creator set to NULL) when its creator is deleted.
    creator = models.ForeignKey('authentication.User', on_delete=models.SET_NULL, null=True)
    publish_date = models.DateTimeField()
    # Set once on creation / refreshed on every save, respectively.
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
@receiver(post_save, sender=Post)
def reschedule_publish_task(sender, instance, **kwargs):
    # TODO: schedule a task here that runs at a specific time (the author's
    # understanding of what is needed).
    # NOTE(review): this handler is a placeholder from the question and has
    # no body yet.
我尝试了下面的做法,但没有成功,任务没有被执行。
models.py:
@receiver(post_save, sender=Post)
def reschedule_publish_task(sender, instance, **kwargs):
    """Queue ``publish_post`` for a planned post whenever it is saved.

    Nothing is queued unless the saved instance is in the 'planned' state
    and has a truthy ``publish_date``. The task is queued with the publish
    date as its ETA and a task id derived from the post id.
    """
    is_planned = instance.state == 'planned'
    if not (is_planned and instance.publish_date):
        return

    publish_post.apply_async(
        (instance.id,),
        eta=instance.publish_date,
        task_id=f"publish_post_{instance.id}",
    )
tasks.py:
@shared_task
def publish_post(post_id: int) -> None:
    """Mark the post with the given id as published if it is due.

    The post is only changed when it exists, is still in the 'planned'
    state, and its ``publish_date`` is not later than the current time;
    otherwise the task does nothing.
    """
    # Imported inside the function, as in the original snippet.
    from .models import Post

    post = Post.objects.filter(id=post_id).first()
    if not post:
        return
    if post.state != 'planned':
        return

    is_due = post.publish_date <= now()
    if is_due:
        post.state = 'published'
        post.save()
您可以尝试安装 django-celery-results 模块,它可以帮助您在管理后台(admin)中查看 Celery 任务的执行结果和状态。
@shared_task
def publish_post(post_id: int) -> dict:
    """Publish a planned post that is due and report what happened.

    The result is a plain dict rather than a ``JsonResponse``: a task's
    return value is stored through Celery's result serializer (JSON by
    default), and an HTTP response object cannot be serialized that way.

    Args:
        post_id: Primary key of the post to publish.

    Returns:
        ``{"success": True}`` when the post was switched to 'published';
        otherwise a dict with ``"success": False``, an ``"error"`` message
        and diagnostic ``"data"``.
    """
    # Imported inside the function, as in the original snippet.
    from .models import Post

    post = Post.objects.filter(id=post_id).first()
    if not post:
        return {"success": False, "error": "Post not found", "data": post_id}

    if post.state == 'planned' and post.publish_date <= now():
        post.state = 'published'
        post.save()
        return {"success": True}

    # Reached when the post is not in the 'planned' state or its publish
    # date is still in the future; the payload carries both for debugging.
    return {
        "success": False,
        "error": "date of post Error",
        "data": f"Post ID: {post_id} \n Post date: {post.publish_date} \n Post state {post.state} \n Date now: {now()}",
    }
此外,请确保 Celery 的 worker 和 beat 进程都在运行。