flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。

如何在 Flink streaming 中缓存算子级别的并发 hashmap?

可能类似于此处发布的问题,但我的目标是在单个平面映射运算符中跨所有并行性共享并发哈希图。 我有一个包含 的哈希图 可能类似于 here 发布的问题,但我的目标是在单个平面映射运算符中跨所有并行性共享并发哈希图。 我有一个包含 映射的 hashmap,我希望这个映射在运行我的 flatmap 函数的所有任务槽之间共享。 我想要它共享的原因是因为我的平面图是通过 Tuple2 中的 2 值键控的,第一个值是我关心的键,第二个值是辅助键,因为我希望每个事件都有两个键组合最终成为一名工人并聚合。因此,第一个键可能会出现在多个工作线程中,所以我想要一种方法来像某种缓存一样在所有工作线程之间共享一个巨大的哈希图。 Flink 可以吗?这是个好主意吗?谢谢。

回答 0 投票 0

Pyflink->Elastic 将 Varchar 转换为 Long?

我上周开始使用 Pyflink,发现自己陷入了困境。 基本上我尝试从源 A 导入数据并将其下沉到 Elastic,效果很好,但有一个特殊的 ...

回答 1 投票 0

Flink Operator 设计建议

我有一些数据需要处理和聚合,最好避免数据倾斜,但我不确定如何设计能够做到这一点的拓扑。 例如,我的数据如下所示: 结构对象...

回答 0 投票 0

Flink直接消费过滤

我有一个 Flink 消费者,它使用一个 protobuf 结构,只需要几个字段就可以发送给其他运营商。目前,我正在使用一个简单的自定义 deser 模式,它只返回 pb 和...

回答 0 投票 0

Flink RocksDB 自定义选项工厂配置错误禁用块缓存

我正在运行 Flink 1.15.2 并尝试在 RocksDB 中定义一个自定义选项工厂以禁用块缓存。 按照此博客文章中的示例:https://shopify.engineering/optimizing-a...

回答 1 投票 0

Flink 不要用 EventTimeWindows 关闭窗口

为什么这段代码不给出任何东西? 如果我更改为 TumblingProcessingTimeWindows - 一切正常。 我没有在文档中找到我必须添加的其他内容?触发器?驱逐者? 允许延迟...

回答 2 投票 0

微型集群上的 Flink 指标

我希望将石墨指标添加到我的 flink 微型集群中,但我很困惑如何开始这个,因为我能找到的大部分文档都是关于通过配置向集群添加指标的......

回答 0 投票 0

EventTimeWindows 的 Flink 最低要求

使用 EventTimeWindows 的最低要求是什么? 为什么这段代码不给出任何东西? 如果我更改为 TumblingProcessingTimeWindows - 一切正常。 我没有在文档中找到什么

回答 0 投票 0

Flink KafkaConsumer / KafkaSource with AZ Awareness

我想创建一个具有 KafkaSource 的 Flink 流应用程序。 KafkaSource 应该连接到与创建连接的任务管理器位于同一 AZ 中的代理。 我怎样才能达到...

回答 0 投票 0

如何从程序中停止 flink 流作业

我正在尝试为 Flink 流作业创建 JUnit 测试,该作业将数据写入 kafka 主题并分别使用 FlinkKafkaProducer09 和 FlinkKafkaConsumer09 从同一 kafka 主题读取数据...

回答 5 投票 0

Flink StreamingFileSink - ParquetAvroWriters

我正在使用 Flink - 流式文件接收器来写入传入数据 S3 存储桶。我的代码与 forRowFormat 选项完美配合。 现在我正在尝试设置 forBulkFormat 选项以在 parquet 中写入数据 ...

回答 2 投票 0

Flink,如何避免聚合窗口末尾的尖峰

我有一个 flink 作业 (1.14),每隔几个小时计算一次窗口聚合。我发现所有事件都有相同的窗口范围,因此数百万个聚合结果都在 ...

回答 0 投票 0

PyFlink 模块 java.base 不会对未命名模块“打开 java.lang”

我想运行 Flink 文档中的简单示例。 开始后我得到异常: 无法使字段 private final byte[] java.lang.String.value 可访问: 模块 java.base 没有“

回答 0 投票 0

在 apache flink 中,我们应该更新每个收集还是每个输入的状态?

想象一个案例,输入是一个文件名,我们想使用 flink RichFlatMapFunction 更新文件的状态和输出行(每个文件包含 10k 行)。我想知道在哪里...

回答 1 投票 0

如何使用 big keyed 在 Flink Apache 上工作?

在我的流应用程序中,它每秒将接收 70k 条数据记录。每条记录都有一个密钥 (FQDN)。 示例记录即将到来的数据: { “unixTime”:1680064946, "FQDN" : "...

回答 0 投票 0

在作业中动态更改 Flink 作业主题

用例是——一个从不断变化的主题列表中读取、处理它们并写入不同的 Kafka 主题的管道。 所以目前有 150 个 Kafka 主题,但这个列表可以改变。 有

回答 1 投票 0

Flink Kafka:期望类型是 PojoTypeInfo

我的客户类已经使用 maven-avro 插件创建。当我尝试运行这个程序时,我收到错误作为线程“主”java.lang.IllegalStateException 中的异常:期待类型...

回答 3 投票 0

Flink 表获取类型信息

我有一个 flink 表,假设 CREATE TABLE source(id int, name string) with (...) 和目标表,假设 CREATE TABLE destination(id int, unique_name string) with (...)。 unique_name 是

回答 2 投票 0

Flink Streaming - 如何安排数据流在 X 分钟后重新处理?

我有事件的输入数据流, 在处理它们时,我想参考一些要再次重新处理的事件 几分钟后。 有没有办法实现它? 这是一个简化的

回答 1 投票 0

我们可以使用 Flink REST API 和 Flink“应用程序”部署模式吗?

我首先声明了一个使用 Flink“应用程序”模式的工作,然后尝试使用 Flink REST API 在该集群上上传一个 jar。上传 jar API 返回 404 Not Found 错误。 F是真的吗...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.