Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
将flink从1.10升级到1.11,遇到错误“No ExecutorFactory found to run the application”
java.lang.IllegalStateException:找不到执行应用程序的 ExecutorFactory。 在 org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLo...
我的Spring Boot项目无法启动,因为集成了Flink
我的Spring Boot项目无法启动,因为集成了Flink!提示找不到类,但是这个类确实存在,打包也可以 日志 2024-05-29 10:02:19,750 信息 (
Flink 中使用 JDBC Sink 时 AbstractRichFunction 的实现不可序列化
我试图从事件发件箱表中读取数据,然后有两个接收器,一个将事件推送到 Kafka 主题,另一个使用 Flink 更新同一数据库中的表。因为我想要这两个
在这个问题中写道:“CEP 总是按时间戳对其输入进行排序”。 你能告诉我排序在 cep 库代码中的具体位置吗? 有没有办法控制这种排序,
具有多个 Kafka 源的 Apache Flink。确保在使用另一个主题的数据之前完全阅读一个主题
通过创建 GlobalKTable 使用 Kafka Streams 我知道根据定义,该表将在其他源的流式传输开始之前完全填充。 我正在寻找类似的
无法将 org.bson.BsonDocument 的实例分配给字段 org.apache.flink.connector.mongo..source.MongoSource.filter
我是Flink的新手,现在我正在使用flink 1.7用pyflink构建一个项目来从MongoDB查询数据并接收到Mysql。但是,我不断收到错误 Caused by: java.lang.ClassCastException: c...
Flink 用户定义的Sink Connector 无法将数据序列化为json 格式
我正在开发用户定义的 Flink MQTT 连接器。 https://github.com/yinjilong/StoneForests-flink-mqtt-connector 但是,当我尝试用 json 写入消息时遇到序列化问题
我有一个数据流[GenericRecord]: val Consumer = new FlinkKafkaConsumer[String]("input_csv_topic", new SimpleStringSchema(), 属性) val 流 = senv. 添加源(消费者)。 ...
Flink Kubernetes Operator Pod 资源请求和限制
我在使用 Flink Kubernetes Operator v1.8 时一直无法找到一种方法来限制 pod 的资源请求和限制。 我还尝试在 podTem 的容器部分中配置它......
是否有机会使用 apache flink 的 jdbc 接收器来限制数据库会话?
我们正在使用jdbc接收器(apache flink),通过它我们可以达到数据库最大会话数,特别是当我们增加并行度时。 我们的测试表明,如果我们增加默认并行...
当我尝试在 flink 集群上运行 Beam Pipeline 时,为什么会出现 ERROR:root:java.lang.NullPointerException?
我正在尝试在本地托管的 Flink 集群上运行一个简单的 Beam 管道,但在执行此操作时遇到错误。我已经尝试了在互联网上可以找到的所有内容。 导入 apache_beam 作为光束 来自
我有一个 Flink 作业,它从 Kafka 主题(有 6 个分区)读取数据,在一个窗口(当前是 24 小时窗口)上处理每个事件,然后将窗口事件接收到 Blob 存储容器中......
Amazon S3 作为使用 CDC 的 Apache Flink 的源
每当将文件添加到 S3、Apache Flink 时,我都需要使用 CDC 发布事件,必须从 Flink 读取并处理新添加的文件。
Flink GlobalWindow Trigger 只处理触发事件
我通过事件属性通过数据流键,然后将其传递到全局窗口,在特定事件进入时触发,问题是当触发窗口来处理事件时,它会...
为什么带有 Kafka Connector 的 Flink Table 无法返回基于窗口的聚合操作的结果?
我创建了一个表 创建表出价( 拍卖BIGINT, 投标人 BIGINT, 价格大整数, 通道 VARCHAR, 网址 VARCHAR, 日期时间时间戳(...
我们正在评估 apache flink 用于部署流式机器学习应用程序。 apache flink 尤其是执行环境中如何处理依赖管理? 想象一下具有 diff 的 python 任务...
我定义一个POJO如下: @数据 @AllArgsConstructor @NoArgs构造函数 公共类 IdCount { 私有整数 ID; 私有字符串名称; } 我使用以下命令测试它是否是有效的 POJO...
Flink KeyedProcessFunction 创建计数
我是 Flink 新手,试图了解创建的 KeyedProcessFunction 实例的数量是否会根据我创建函数的位置而变化。 MyProcessFunction myFunction = new MyProcessFunc...
flink中group by时如何将表流下沉到postgres?
我正在使用 Apache Flink 1.19 和 java 17。 当我们对数据流进行分组时,它会不断更改和更新。因此,当我想将结果发送到 postgres 时,我首先尝试将其转换为
Apache Flink Python Datastream API 接收到 Parquet
我有一个包含 json 消息的 Kafka 主题。我尝试使用 Flink Python API 处理此消息并将其存储在 GCS 中的 parquet 文件中。 这是清理过的代码片段: 类提取(MapFunction): ...