akka-stream 相关问题

用于处理JVM上的流数据的Akka实现

Scala Akka HTTP Actor 类协议类型不匹配错误

我正在使用 scala 3.3 和 Akka actor 构建 Akka HTTP 应用程序。 当在 StarRegistry 对象中创建恒星、行星或月亮时,我希望将其值临时存储在注册表中...

回答 1 投票 0

如何防止实体在 Pekko/Akka 中多次实体化?

我在 Pekko 代码方面遇到了一些问题,我相信这与默认 Flow 已经具体化数据这一事实有关: def addSha(请求:HttpRequest)(使用 如:ActorSystem[任何], ...

回答 1 投票 0

如何测试偏移量是否已提交到 Kafka

我有一个 Akka Stream Kafka 源,正在从 Kafka 主题中读取数据。 我有一个简单的任务,允许禁用消息偏移量的提交。提交通常是调用 commitScala 完成的...

回答 3 投票 0

Azure 事件中心集成测试

我编写了一些代码来从 Azure 事件中心推送和拉取数据。通过互联网搜索找不到任何有关如何进行集成测试的示例。过去我可以使用

回答 1 投票 0

pekko:将节流阀与 foreachAsync 结合使用

我想对需要异步处理的东西进行速率限制。 但是,当我尝试将throttle 与 Sink.foreachAsync 一起使用时,它似乎不会等待 future 完成才允许...

回答 1 投票 0

通过InputStream管道创建流[ByteString, ByteString, NotUsed]

我需要使用akka不支持的压缩解压缩数据,但其他提供InputStream接口的库支持。 为了使其与 akka 流一起工作,我需要实现函数: ...

回答 1 投票 0

安排 Akka Streams 中的 Kafka Consumer 在一天中的特定时间运行

我使用Akka Streams编写了一个Kafka消费者: RestartSource.withBackoff(consumerResetProps(), () -> Consumer.committablePartitionedSource(consumerProps(), Subscriptions.top...

回答 1 投票 0

Akka 类型的 actor 向另外两个 actor 询问信息的正确方法是什么?

我是 Akka 新手,我正在将传统服务转换为 Actor。 我理解其中大部分内容并没有太大困难,但我对询问另外两个人的正确方法有点困惑

回答 1 投票 0

在 Slick 事务中处理 Akka 流

软件版本: 阿卡2.4.4 光滑3.1.0 我想在 Slick 事务中处理 Akka 流中的元素。 下面是一些简化的代码来说明一种可能的方法: def 插入(d:

回答 1 投票 0

在 Akka Stream BroadcastHub 中使用泛型类

我正在尝试创建 ConsumerRecord 类型的 BroadcastHub 来自相同元素类型的 Source,但不允许泛型类类型。 可运行图 我正在尝试创建一个 BroadcastHub 类型的 ConsumerRecord<String, String> 来自相同元素类型的 Source,但不允许泛型类类型。 RunnableGraph<Source<ConsumerRecord<String, String>, NotUsed>> graph = source.toMat( BroadcastHub.of(ConsumerRecord.class, 256), Keep.right() ); 发现类似的问题已得到解答here,但看起来没有类似的方法BroadcastHub 目前我只能选择将泛型类型包装到包装类中 RunnableGraph<Source<ConsumerRecordWrapper, NotUsed>> graph = source.map(ConsumerRecordWrapper::new) .toMat( BroadcastHub.of(ConsumerRecordWrapper.class, 256), Keep.right() ); 有更好的解决方案吗? 根本问题是擦除。 由于 ConsumerRecord.class 的存在只是为了指导类型推断(在运行时,无论类型如何,类对象都是相同的,并且有效地表现得像它始终 ConsumerRecord<Object, Object> 一样),因此可以安全地使用 Class 对象的未经检查的强制转换来让类型发挥作用。 所以这样的东西适合我编译 @SuppressWarnings("unchecked") final Class<ConsumerRecord<String, String>> consumerRecordStringToStringClass = (Class<ConsumerRecord<String, String>>)(Class<?>)ConsumerRecord.class; RunnableGraph<Source<ConsumerRecord<String, String>, NotUsed>> graph = source.toMat( BroadcastHub.of(consumerRecordStringToStringClass, 256), Keep.right() ); (这是其中一些答案的专业化)

回答 1 投票 0

如何创建具体化为 ActorRef 的 akka 源,其中传入消息知道发送者

val ref = Source.actorRef[String]( 完成匹配器 = PartialFunction.empty, failureMatcher = PartialFunction.empty, 100000, OverflowStrategy.dropNew ).to(Sink.foreachAsync(1){ ...

回答 1 投票 0

将 Akka GraphDSL 与 Zip 阶段结合使用

考虑以下代码: GraphDSL.create() { 隐式构建器 => 导入 GraphDSL.Implicits._ val in = 源(0 到 10) val fanOut = builder.add(广播[Int](2)) val toString = 构建...

回答 1 投票 0

Akka Streams:如何使用 GraphDSL 构建源中源?

这是一个简单的场景。 让我们从单个 Akka 源开始:比方说,从数据库检索的行。基于分区函数,不同的行需要被转移到不同的...

回答 2 投票 0

在失败的 akka 流上跳过流程

我不想在不丢失发生故障时发送的数据的情况下跳过流程。但我找不到办法做到这一点。这是我用来测试的示例代码。 val 决策者:Supervision.Dec...

回答 1 投票 0

通过合并 akka 流中的多个慢速源来保持排序

我需要合并多个慢速源但保持顺序。 如果代码执行两次,顺序必须相同。 非常简单的解决方案如下: 来源 .来自(分区()...

回答 1 投票 0

我们如何使用akka kafka连接器控制从kafka队列轮询消息

我们有一个用例,我们需要根据参与者的可用性使用 akka kafka 连接器控制来自 kafka 队列的消息轮询,以便处理消息。我们有 2 个消费者 5 个

回答 1 投票 0

akka.http.scaladsl.model.ParsingException:使用 akka http 将大文件上传到 S3 时,多部分实体意外结束

我正在尝试使用 Akka HTTP 和 Alpakka S3 连接器将大文件(目前为 90 MB)上传到 S3。它对于小文件(25 MB)工作正常,但是当我尝试上传大文件(90 MB)时,我得到了

回答 2 投票 0

Akka Streaming - 将块重新分配到 max_permissible_chunk_size Scala

我的代码使用 Akka 流将二进制文件上传到 s3,如下所示: 来源 .via(distributeChunks(MAX_BYTES_PER_CHUNK)) .throttle(maxRequestsPerSecond, 1.秒,

回答 1 投票 0

来自具有阻塞操作的迭代器的 Akka 源代码

Source.fromIterator 上的 Akka 文档 (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) 说: 如果迭代器执行阻塞操作,请确保 r...

回答 1 投票 0

Akka Streams 递归流程调用

我正在尝试使用 Akka Streams 实现分页。目前我有 case class SomeObject(id:Long, next_page:Option[Map[String,String]]) def chainRequests(uri: Uri): Future[Option[(Uri, T)]] =...

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.