Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
使用flink sql join 2 source时如何读取rocksdb状态
我的sql定义为 如果 TABLE_1 不存在则创建表( 标头 VARCHAR NOT NULL, id VARCHAR 不为空, `时间戳` TIMESTAMP_LTZ(3) NULL, 类型 VARCHAR NOT ...
在 Flink 中为具有接口字段的类实现 TypeInformation
我有一个相当嵌套的数据类型,它通过 Kafka 主题进入 flink 中。 JSON 被反序列化为使用接口的 Java 类层次结构(与 JsonSubTypes 一起,请参阅 https://www.bae...
Flink 在应用程序重启/错误修复时是否考虑 Kafka 偏移?
我在 Kafka-Flink 应用程序中遇到了一个代码错误,我需要重新部署整个 Flink 应用程序。我知道检查点和保存点,但由于我的应用程序必须重新启动,所以它会...
Apache Flink Azure ABFS 文件接收器错误(流)- UnsupportedFileSystemException:方案“文件”没有文件系统
我们将 Apache Flink 版本 1.17.1 与 Scala 结合使用。 我们正在尝试将流数据写入 ABFS 文件系统。 请参阅 Scala 中的简单示例代码。 对象简单流{ val 环境 =
Apache Flink 通过 Jenkins 和 Spinnaker 提交作业时抛出异常
我们正在使用配置为独立 Kubernetes pod 的 Apache Flink 1.16.1,以便我们的应用程序之一从融合的 Kafka 主题中读取数据以进行事件关联。我们正在使用flink的Table AP...
Flink + RocksDB 需要很长时间才能恢复大型 s3 检查点
我正在运行一个低并行度(4 个插槽)作业,其检查点可能会变得非常大。 在示例中,我将展示检查点为 142 GB,保存在 S3 中,需要 40 分钟才能恢复...
为什么flink作业的maxparallelism不能在不丢失状态的情况下更新?
我刚刚读到,Flink 作业的最大并行度(由 setMaxParallelism 定义)无法在不丢失状态的情况下更改。这让我有点惊讶,不难想象一个场景......
如何将数据发送到 Kafka,其中特定于某个键的数据仅在 Flink 流作业中使用 KafkaSink 发送到同一分区?
我有一个要求,只有当数据具有相同的密钥时,我才希望将数据发送到同一分区。 例如: {“field1”:33,“field2”:44,“field3”:55,“唯一...
使用连接器在 kubernetes 上部署 pyflink(kafka/kinesis)
我正在尝试找到一种使用 k8s 运算符在 k8s 上部署 pyflink 的方法。我已经能够使用 k8s Operator 上传作业,但我找不到如何向其添加连接器(例如 kafka-
org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema 无法转换为 ObjectNode
当我使用以下代码时: KafkaSource源= KafkaSource.builder() .setProperties(kafkaProps) .setProperty("ssl.truststore.
我使用的是 flink 1.17.1 和 java 版本 11 。 我遇到以下错误 引起:java.lang.ClassNotFoundException:org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList 需要解决方案...
由于 state.checkpoints.num-retained 配置导致触发 Flink 检查点(S3 后端)延迟
以下是我的 Flink 检查点配置,我们有 S3 作为后端。我们正在 EMR 集群中运行这个 flink 作业(版本:1.17.0) 检查点间隔:70000 检查点之间的最小暂停时间:15000 最大-
我希望通过一个流读取数据库表中的ID字段,将该字段存储到列表中,然后根据这个ID创建一个新的流来过滤对应的数据并同步到...
我有一个java类,正在向flink集群提交sql文件。 我有 StreamExecutionEnvironment StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
如果在 GoLang 中实现,操作员一可以通过 GoLang 通道在管道中将事件数据传递给操作员二。这有助于避免在运行时失败时出现关注点分离。 flink 操作吗...
为什么 Flink 没有均匀地分配我的工作以及如何解决这个问题?
我有一个从具有 5 个分区的数据源读取数据的 Flink 作业。我在配置文件中将每个任务管理器的并行度设置为 100。 在我的输入操作中,它只使用了5/100任务人...
Apache Flink 是否删除了 StreamOperatorTestHarness 类,或者它们是否转移到了不同的工件?
我正在使用intellij、maven 3和flink 1.15.1来编写有状态的流作业。我正在尝试为我的自定义 KeyedProcessFunction 编写单元测试,并尝试按照此处的文档进行操作...
我有一个需求,需要从flink中的redis缓存中读取数据,但是按照需求,缓存数据平均每两个小时刷新一次。我正在查看文档并且
Flink 重复数据删除 - out.collect() 如何处理无界流?
以下是代码: 公共类验证重复{ 公共静态无效主(字符串[] args)抛出异常{ StreamExecutionEnvironment env = StreamExecutionEnvironment。
在 Flink 1.17 中没有看到 Kinesis 连接器包
我们在 Flink 作业中使用了 flink(1.16) 包的 Flink kinesis 连接器库。现在我们计划迁移到 Flink 1.17,但我看到 flink-kinesis-connector 包已从 Fli 中删除...