[从alpakka kafka消费时我收到了死信

问题描述 投票:0回答:1

[INFO] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka:// default)] [7347a]正在启动。 StageActor Actor [akka:// default / system / Materializers / StreamSupervisor-0 / $$ a#-993709322]SLF4J:找不到SLF4J提供程序。SLF4J:默认为无操作(NOP)记录器实现SLF4J:有关更多详细信息,请参见http://www.slf4j.org/codes.html#noProviders。[INFO] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka:// default)] [7347a]正在完成[INFO] [akkaDeadLetter] [06/01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka:// default / system / kafka-consumer-1]消息[akka。 kafka.internal.KafkaConsumerActor $ Internal $ StopFromStage]从Actor [akka:// default / system / Materializers / StreamSupervisor-0 / $$ a#-993709322]到Actor [akka:// default / system / kafka-consumer-1# 76265671]未交付。 [1]遇到死信。如果这不是预期的行为,则Actor [akka:// default / system / kafka-consumer-1#76265671]可能已意外终止。可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'来关闭或调整此日志记录。

这是我面临的问题,但仅当我使用带有Kafka源的流时。

这是我的代码

val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
  val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
    Json.parse(message.value).as[Book]
  }
  val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))

  def toInstant(book: Book): Future[Book] = Future{
    val array = book.publicationDate.split("-")
    val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
    val timeTillNow = Time.timeSince(instant)
    val royalty = if (timeTillNow.toDays > 1000) {
      .10 * book.price * book.copiesSold
    } else {
      .15 * book.price * book.copiesSold
    }
    book.copy(royalty = Some(royalty))
  }
 val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))

在运行时,我收到了死信bookData.via(parseBook).via(flow).runWith(print)但运行此程序时没有死信:bookData.via(parseBook).runWith(print)。即使在运行后面的一个,我也得到了预期的输出。

有人可以帮忙吗?

scala apache-kafka akka
1个回答
0
投票

这很可能发生,因为流程引发了异常。通常,我希望看到包含更多详细信息的日志记录,但是由于未配置SLF4J实现,因此可能会丢失这些详细信息。有关如何配置日志记录的详细信息,请参见Akka文档中的"SLF4J backend"

查看您的代码,我认为最有可能在此行出现错误:

val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)

请注意,它在日期和月份中都使用array(2).toInt。我的猜测是该月份应为Month.of(array(1).toInt)。如果array(2)确实是月份中的某天,则任何大于12的值都将导致Month.of抛出Month.of,这将终止流。

© www.soinside.com 2019 - 2024. All rights reserved.