[INFO] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka:// default)] [7347a]正在启动。 StageActor Actor [akka:// default / system / Materializers / StreamSupervisor-0 / $$ a#-993709322]SLF4J:找不到SLF4J提供程序。SLF4J:默认为无操作(NOP)记录器实现SLF4J:有关更多详细信息,请参见http://www.slf4j.org/codes.html#noProviders。[INFO] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka:// default)] [7347a]正在完成[INFO] [akkaDeadLetter] [06/01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka:// default / system / kafka-consumer-1]消息[akka。 kafka.internal.KafkaConsumerActor $ Internal $ StopFromStage]从Actor [akka:// default / system / Materializers / StreamSupervisor-0 / $$ a#-993709322]到Actor [akka:// default / system / kafka-consumer-1# 76265671]未交付。 [1]遇到死信。如果这不是预期的行为,则Actor [akka:// default / system / kafka-consumer-1#76265671]可能已意外终止。可以使用配置设置'akka.log-dead-letters'和'akka.log-dead-letters-during-shutdown'来关闭或调整此日志记录。
这是我面临的问题,但仅当我使用带有Kafka源的流时。
这是我的代码
val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
Json.parse(message.value).as[Book]
}
val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))
def toInstant(book: Book): Future[Book] = Future{
val array = book.publicationDate.split("-")
val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
val timeTillNow = Time.timeSince(instant)
val royalty = if (timeTillNow.toDays > 1000) {
.10 * book.price * book.copiesSold
} else {
.15 * book.price * book.copiesSold
}
book.copy(royalty = Some(royalty))
}
val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))
在运行时,我收到了死信bookData.via(parseBook).via(flow).runWith(print)
但运行此程序时没有死信:bookData.via(parseBook).runWith(print)
。即使在运行后面的一个,我也得到了预期的输出。
有人可以帮忙吗?
这很可能发生,因为流程引发了异常。通常,我希望看到包含更多详细信息的日志记录,但是由于未配置SLF4J实现,因此可能会丢失这些详细信息。有关如何配置日志记录的详细信息,请参见Akka文档中的"SLF4J backend"。
查看您的代码,我认为最有可能在此行出现错误:
val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
请注意,它在日期和月份中都使用
array(2).toInt
。我的猜测是该月份应为Month.of(array(1).toInt)
。如果array(2)
确实是月份中的某天,则任何大于12的值都将导致Month.of
抛出Month.of
,这将终止流。