Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
Flink 文档说“在结果元素上设置的唯一相关信息是元素时间戳 [...],它[设置为]结束时间戳 - 1 [...]” 万一我必须
具有窗口 CoGroup 和未对齐检查点的 Flink 状态处理器 API
如何使用 Flink State Processor API 处理窗口 CoGroup(或 Join)函数的状态?文档没有给出这样的例子。 有没有办法使用 Flink State Pro...
包含元组列表的 POJO 的 Apache Flink 类型信息错误
[已编辑:这次添加了正确的错误日志] 我无法创建包含元组列表数据类型的 POJO 类型类。也许我没有正确提供它的类型信息?
从流媒体源消费时, 我们使用 Groupby 窗口聚合: .group_by(col('name'))) (参见此处) 并有一个包含所有可能的名称值的表(在下面的示例中 ['Alice', 'Bob...
我正在尝试将站状态数据与 Flink SQL 中的天气更新结合起来。目标是: 计算 1 分钟窗口内每个站的平均状态指标 加入基于
我正在努力在 scala flink 应用程序上编写单元测试。 例如,我有一个如下所示的异步映射器。它需要一个带有 id 的 User 对象,并随着年龄的增长而丰富: 案例类用户(id:字符串)...
如何修复 MobaXterm 上不正确指定的虚拟机选项“maxmetaspacesize”
我正在尝试使用 mobaXTerm 启动本地集群。我正在使用: 弗林克 1.15.3 MobaXterm 23.6 jdk 11或jdk 8(是相同的错误) Windows 10 我正确注册了所有环境变量,但是...
我有3个Kafka主题:optionsTopic、stocksTopic和referencesTopic。所有内容均以 Kraft 模式本地部署在单个 Kafka (v7.7.1) 实例上。为了降低复杂性,我设置了一个分区...
使用 kafka 连接器运行 flink 时出现 NoClassDefFoundError
我正在尝试使用flink从kafka流式传输数据。我的代码编译没有错误,但在运行时出现以下错误: 错误:发生 JNI 错误,请检查您的安装并尝试
使用 Python 在 AWS EMR 上执行 Flink 作业失败并出现“NoClassDefFoundError”
我正在尝试使用 Python 3.9 和 Apache Flink 以及 PyFlink 在 AWS EMR 集群 (v7.3.0) 上运行 Flink 作业。我的作业从 AWS Kinesis 流中读取数据并将流数据打印到控制台。然而,...
我在 Scala/flink 中为以下方法编写了自己的 processwindow 函数,但由于某种原因,我在 IDE 中收到此错误: 类型不匹配。 必需:ProcessWindowFunction[(String, String, I...
我正在编写一个 Flink 流程序,我需要用一些每 24 小时更新一次的数据(~10GB)来丰富 DataStream 如果它是静态数据,我可以在开始之前将其加载为键值状态...
我正在处理三个 kafka 主题,它们作为数据流加载到 Flink 中:optionTradingData、referenceData 和 stockTradingData。它们的定义方式如下: 期权交易数据 = [{ 打勾...
我正在使用 Apache Flink 编写一个应用程序来替换旧的应用程序。旧应用程序接收来自不同来源的事件,并用于监视来源的事件是否满足
如何使用 PyFlink/Flink 使用 Table API 写入 amazon s3 上的 Apache Iceberg?
settings = EnvironmentSettings.new_instance().in_streaming_mode().build() t_env = TableEnvironment.create(environment_settings=settings) 目录名称=“胶水目录” staging_database_na...
使用 Protobuf 进行 Flink List 字段序列化
我有一个 Foo 类,由 Flink 在 DataStream 中处理: 公共类 Foo { 私有 int id; 公开列表数据; // getter、setter 和构造函数 公共...
我有独立的 Flink 集群。当我在任务管理器上停止该进程时,作为 ChildFirst 加载的类不会被删除。经过多次启动/停止重复后,元空间超出了最大...
Flink 1.15 中的 cleanupInRocksdbCompactFilter 方法
我无法理解 Apache Flink 1.15 中有关 TTL 设置的“cleanupInRocksdbCompactFilter”方法的“queryTimeAfterNumEntries”参数。 医生说: Rocksdb 时清理过期状态
我有一个 flink 应用程序,我使用 TumblingEventTimeWindows 和 process 函数 数据流>processedEvents = rawEvents .keyBy(eventMap -> { 返回
Apache Flink AsyncRetryStrategy 与 RichAsyncFunction
AsyncRetryStrategy asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3,fixedDelay=100ms .ifResult(RetryPredicates.EMPTY_RESULT_PREDIC...