我有一个混合应用程序,使用
connectMicroservice
连接 Kafka 微服务
const application = await NestFactory.create(appModule)
application.connectMicroservice(
{ transport: Transport.KAFKA, { client: { clientId: 'id', brokers: ['broker:123'] } } },
{ inheritAppConfig: true }
)
await application.startAllMicroservices()
await application.listen(3000)
在业务逻辑中的某个地方,我抛出了一个仅用于日志记录的无效消息的错误。
throw new RpcException('')
我使用全局异常处理程序进行日志记录。 (但是在没有自定义异常处理程序的情况下会出现此问题)。
@Catch()
export class GlobalExceptionFilter implements RpcExceptionFilter {
catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
const data = host.switchToRpc().getData()
console.log(data)
return throwError(() => exception.getError())
}
}
我想提交此失败的消息来处理以下消息,但无论我传递给微服务的选项如何,该消息都永远不会提交,并且应用程序会重试处理此无效消息,这会阻止所有其他消息。我尝试了不同的
client
配置但无济于事
retry: { retries: 1 }
我看到以下错误;
Error: ERROR [Runner] Error when calling eachMessage
我为run
尝试了不同的
option配置,但似乎没有任何效果。我还尝试为
eachMessage
定义自定义函数,但它没有被触发。
我正在考虑通过定义客户端来手动提交偏移量,但是nestjs文档似乎缺少一点,因为我很难让
ClientsModule.register
与application.connectMicroservice
一起工作。
我遇到有关已在定义的端口上运行的应用程序的问题,或者该应用程序似乎没有侦听该主题...
检查消息负载。
我收到“错误:ERROR [Runner] 调用eachMessage 时出错..”
但是,就我而言,我正在使用 Kafka 桌面客户端作为生产者来调试消费者。
该错误是由我从生产者发送的 JSON 负载中的语法错误引起的(额外的逗号)。修复 JSON 有效负载后,一切都按预期运行。