我仅通过使用属性来使用Spring Cloud Streams和默认的Spring Retry机制。它运行良好,可以重试消息,然后转到DLQ。到目前为止一切都很顺利。现在出现了问题...
我需要在消息中添加一些自定义信息,然后再将其从我的服务发送到DLQ。它们足够简单,可以帮助我识别失败的消息,而无需接触通用的有效负载。
可能我可以添加自定义标头或将其包装在已知模型中,在该模型中我可以检索所需的信息-无论是哪种方式,我都需要截取/修改消息。
最简单的方法是什么,而又不花多少钱?我的意思是,我们只使用简单的配置来进行重试,所以“成本”是指将配置与其他内容交换。还是谢谢!
使用Kafka活页夹,您可以将ProducerInterceptor
添加到kafka生产者配置interceptor.classes
。
/**
* This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
* get serialized and partition is assigned (if partition is not specified in ProducerRecord).
* <p>
* This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
* key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
* not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
* same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
* as expected.
* <p>
* Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
* Most often, it should be the same topic/partition from 'record'.
* <p>
* Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
* <p>
* Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
* specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
* in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
* previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
* the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
* of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
* modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
* is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
* or otherwise the client.
*
* @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
* @return producer record to send to topic/partition
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
生产者记录包含目标主题名称;您可以在此处添加/删除标题。
目前对于RabbitMQ活页夹没有类似的钩子。如果您使用的是活页夹,请根据活页夹在GitHub上打开新功能。