我必须在mysql数据库中进行保存,然后通过Rabbitmq向我的客户端发送一个事件。
现在,当推到RMq时,如果由于某种原因失败,我需要回滚保存。应该是全部还是什么都不应该,数据库中不能有没有发生事件的数据。
本质上是,
Begin Transaction
Save to db
Push to queue
If exception rollback else commit
End transaction
另一种方法
现在,我只能在事务中执行保存操作。然后,如果失败了,可以采取某种方法重试该队列,但这会变得过于复杂。
是否有任何最佳做法?有关遵循哪种方法的任何建议。
PS:rmq上的事件包含其某些数据已更改的ID。希望客户端对更改后的ID进行http获取以执行其操作。
好吧,您几乎可以正确地进行所有操作。 RabbitMQ伙计们发表了一篇有关信号量队列的好文章:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/
[我认为这是“最佳实践”,因为它是由参与开发RabbitMQ的人们编写的。
如果我的理解正确,您主要关心的是您是否发布了要成功交换的消息。您可以在RabbitMQ中使用Confirms以确保您的消息被队列接受。
取决于您如何设计数据库中的更新,以及从数据库更新到客户端获得ID所允许的时间差多少,因此请注意以下几点:
您可以在数据库中添加一个新字段,您可以将其用作标记来检查特定更新的消息是否发送到RabbitMQ。当您执行更新时,将标志设置为0,尝试将消息发布到交换,如果成功,则将标志更新为1。如果需要使用旧值,直到将消息发送给客户端,这可能会很复杂。已发送,将需要您隔一段时间浏览数据库,并尝试将所有未将标志设置为1的行再次发送到RabbitMQ消息。
将消息发布到RabbitMQ,如果得到确认,则提交数据库更新。如果由于某种原因提交失败,您的客户端将进行更新,但会获得通常不会出现问题的旧值,但这实际上取决于您的应用程序。这可能是更好/更简便的方法,尤其是因为没有理由期望任何事情都会不断失败。您还应该考虑从客户端接收到要更新的ID并实际进行更新的时间起,在客户端增加某种短暂的延迟(给数据库更新一些时间)。
在这种异构环境中,您可以使用Two Phase Commits。您将“脏”记录添加到数据库,发布消息,然后编辑记录以将其标记为“干净”并准备就绪。
[另外,当您处理消息传递中间件时,您必须做好准备,以防某些消息可能丢失或它们的使用者无法完全处理它,因此可能需要来自使用者的一些反馈,例如您发布消息并在消费者收到消息时,然后它将记录添加到db。