Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。
上下文您的应用程序最近4个月运行良好。您决定关闭它,重置每个状态并重新启动它。问题...
我需要基于一个密钥加入两个事件源。事件之间的间隔可能长达1年(即,id1的event1可能会在今天到达,而id1的对应event2可能会从第二个事件源发出...
我从.jar文件运行多个作业。我想在工作之间共享状态。但是所有输入在每个作业中都消耗(来自kafka)并生成重复的输出。我看到我的flink面板。所有作业'记录...
我正在运行查询以连接流和表,如下所示。它用完了堆空间。即使它在flink群集中有足够的堆空间(60GB * 3),对于...
我正在尝试编写一个Flink应用程序,该应用程序从Kafka读取事件,从MySQL丰富这些事件,并将此数据写入HBase。我正在RichFlatMapFunction中进行MySQL扩充,我正在...
我有一个Flink应用程序,该应用程序不断出现此错误。 com.org.ads.audience.traffic.MyClass@6eaa21d8无法序列化。该对象可能包含或引用不可序列化的字段。 ...
ECS中的链接无法找到阴影的ContainerCredentialsProvider
我正在尝试使用Fargate在ECS上运行Flink 1.7.2。我已将我的工作的状态后端设置为RocksDB,其路径为= s3:// ...在我的Dockerfile中,我的基本映像为1.7.2-hadoop27-scala_2.11,然后运行...
我正在使用Flink流以替换我的ETL流。我正在处理的任务之一是-计算不同的每日活跃用户。任务本身并不难实施。东西是...
弗林克:处理背压(来源:卡夫卡,水槽:elasticsearch)
我这是从卡夫卡读取数据,进行一定的汇总和结果写入到elasticsearch索引的弗林克工作。我看到在源上高背压。的高背压...
我读的文档有关弗林克一次恰好拥有这里。我不太明白一些句子:经过一个成功的预提交,提交必须最终保证...
我非常新的弗林克和即将载入我们的第一个产品版本。我们有数据流。如果数据是新的状态过滤器检查。这将是更好分裂流...
这个问题已经被问这里,但因为它已经两年了,我不知道什么发生了变化。我有,我想两个弗林克运营商之间共享状态的用例:A组...
如何将EMR流作业日志复制到S3并清除EMR核心节点磁盘上的日志
[好,我正在AWS EMR 5.20上运行Flink(v1.7.1)流作业,我希望在S3中拥有我的作业的所有task_managers和job_manager的日志。按Flink的建议使用Logback ...
从我们知道我们可以通过下面的命令“纱线运行单个弗林克工作”的弗林克正式文件,我的问题是可以通过我们的REST API“运行纱线单弗林克工作”,并得到了...
Apache Flink:如何为动态表启用“ upsert模式”?
我已经在Flink文档和官方Flink博客上基于动态键多次提及动态表的“更新模式”。但是,我没有看到任何示例/文档...
我有一个AggregateFunction,它可以计算WindowedStream中一系列事件的平均值。需要注意的是,需要对事件对计算平均值,而事件对可能超出...
在从卡夫卡摄入消息弗林克流应用程序,1)如何禁用自动提交? 2)如何手动从弗林克成功处理邮件后提交?谢谢。
阿帕奇弗林克:是什么setParallelism()和setMaxParallelism的差异()
我试着设置最大并行的弗林克工作,使用ExecutionConfig.setMaxParallelism()方法,但它似乎没有工作。我还修改了标准字计数例如运行几个...
上注册一个TemporalTableFunction作为函数编译器错误
我下面弗林克的定义时态表功能例如,编译器拒绝接受该代码:TemporalTableFunction率= ratesHistory.createTemporalTableFunction(“r_proctime”,” ...
在窗口处理功能,才有可能知道哪些元素得到了驱逐?使用案例:窗口中出现了很多大事使用逐出Reduce函数是计算昂贵的现在,我...