Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
Apache Flink - 高 promethues 指标基数
在我们的组织中,我们有许多系统在 flink 1.16 上运行。 我们使用 PrometheusReporterFactory。 将我们的指标暴露给 promethues scrape。 由于 flink 的动态标签定义
出现以下错误 引起:java.lang.NoClassDefFoundError:org/apache/kafka/clients/admin/AdminClient 将 flink 连接到 kafka 时 我正在使用 flink 1.17 并使用 flink-sql-connector-kafka...
Apache Flink 中自定义 Partitioner 实现的分区方法是否需要线程安全?
我正在通过扩展 Apache Flink 中的 org.apache.flink.api.common.functions.Partitioner 接口来实现自定义分区器。这需要我重写分区方法。我的问题是...
我遇到了与java.util.List和java.util.Map的Flink序列化所描述的相同的问题,但对于java.util.Set。 我有一个 POJO 类,其中包含列表字段和设置字段。我已经能够...
Flink Kafka GroupId 在使用 KafkaSource 时似乎被忽略了
我是 Apache Flink 的新手。我尝试使用 Flink 的 KafkaSource 从 Apache Kafka 获取事件。到目前为止一切顺利,看起来效果很好。重新启动 flink 任务后,我得到了相同的信息...
Flink:无法在类路径中找到实现“org.apache.flink.table.factories.CatalogFactory”的标识符“kafka”的任何工厂
我正在尝试将Kafka连接到Flink并通过sql-client.sh运行。但是,无论我如何处理 .yaml 和库,我都会不断收到错误: 线程“main”org.apache.flink 中出现异常。
我使用以下简单代码来说明文件系统连接器的行为。 我有两个观察结果想要询问并确认。 如果我没有启用检查点,那么所有
在flink 1.14中,我怎样才能拥有一个同时写入kafka和其他数据源的接收器。 我尝试创建一个扩展 RichSinkFunction 的自定义接收器。在开放方法中,我也
我的数据源发出具有以下结构的物联网数据 - io_id、值、时间戳 232,1223,1718191205 321,671,1718191254 54,2313,1718191275 232,432,1718191315 321,983,1718191394 ………… 有...
我正在 Apache Flink 中编写数据流处理代码,但我在 Apache Flink 文档中找不到任何提及基于计数的窗口的内容。 我检查了下面的链接,但是...
java.util.HashMap 和 java.time.Duration 不是 Flink 的有效 POJO 类型
我的 Flink 1.15 应用程序的日志中有“java.time.Duration”和“java.util.HashMap”的提示: 类 class <*> 不能用作 POJO 类型,因为并非所有字段都是有效的 POJO fie...
使用 Table API 将作业名称设置为 Flink 作业
我想为使用 Table API 编写的 Flink 应用程序设置一个作业名称,就像我使用 Streaming API env.execute(jobName) 所做的那样。 我想更换: 我在文档中找不到方法,除了......
Flink Operator 卡在 100% 繁忙状态,如何解决?
我已将 Flink 集群部署为纱线应用程序。作为纱线配置的一部分,我将 32 个 vCore 关联到每个任务管理器。我还为每个任务管理器分配了 2 个插槽。 工作管道:Kafka
我有一个用 scala 编写的 flink 作业,我正在创建一个自定义指标来计算流中的事件数量。该作业部署在 kubernetes 上,我看到了作业管理器和任务的系统指标-
我正在尝试使用 Flink 的状态 API 引导我的操作员广播状态和键控状态,以便创建一个保存点来初始化我的作业。 据我所知,我可以创建一个转换...
AWS Apache flink,应用程序不在aws中运行,但在本地运行。可能是什么问题?
我正在构建一个应用程序,该应用程序从 kinesis 数据流获取数据并使用端点将该数据发送到外部数据库。我使用 python table API 来实现这一点,当我运行下面的...
团队, 我正在研究 Flink。这是我的要求 从 Kafka 读取事件 对事件应用转换 沉入 MongoDB 向 Kafka 生成原子事件 在这里,我面临问题
这是我们的用例: 我们计划在 flink 之上构建一个具有大量规则(1000 条)的基于规则的引擎。规则可以是无状态的或有状态的。 无状态规则示例为:A.id = 3 &...
当 flink 中以批处理模式执行作业时,fileSink 按并行数生成多个文件,但我只想在一个文件中输出而不更改并行数 我该怎么办...
我正在使用一个从 Kafka 读取数据流的 Apache Flink 应用程序。应用程序处理流,从数据创建对象并通过标记计算处理时间...