flink-streaming 相关问题

Apache Flink是一个用于可扩展批处理和流数据处理的开源平台。 Flink在一个系统中支持批量和流分析。分析程序可以用Java和Scala中简洁优雅的API编写。



通过proccessFunction触发时,如何在Apache Flink中运行SQLEXECUTE查询?如何管理SQL任务?

Context: 因此,我正在尝试构建一个动态运行规则的Flink应用程序。我有一个规则流,该规则流是从撰写SQL规则的位置,该规则是从读取和执行的。我已经连接了r ...

回答 1 投票 0

java.io.io.ioexception:无需方案文件系统:flink

我在我当地的环境中在Docker中弯曲。我尝试编写flink作业,以使用CDC将MySQL数据同步到S3(以Apache Hudi格式存储)。我的弗林克工作看起来像: t_env = streamTableEnvironment.c ...

回答 1 投票 0

如何使用Flink SQL来汇总DataStream<List<Row>>?

我如何使用Flink SQL来汇总数据串数据中的每个列表? // 来源 DataSource>源; // ListStream singleOutputStreamOperator>

回答 0 投票 0


用卡夫卡水印策略

我想汇总我的数据流以每5秒返回值的总和(最终目标是平均5秒)。使用以下类总和()我能够返回继续...

回答 1 投票 0

可能没有触发ProcessWindowFunction的原因

我无法理解为什么我的Flink流媒体代码基于以下AWS示例不起作用...首先,我发现没有结果下沉。我开始调试代码,在我看来,IS ...

回答 1 投票 0

使用广播流与Flink进行链条处理正在提供不一致的结果

//定义来自Kafka主题的传入原始数据流采购 datastream主流= env.AddSource(...); //定义来自Kafka主题的参考数据流采购 datastr ...

回答 0 投票 0


Fllink连续文件源,具有定期相同的文件

我有一个外部系统,每小时都会写一个具有相同名称的文件。我在读取后使用Flink 1.18.1读取此目录,但在第一次处理后,未读取文件。我...

回答 1 投票 0

如何读取flink sql code生成状态 我有这张桌子 /**模式('流')*/ 创建或替换表eoj_table( `tenantid`字符串, `id`字符串, `name`字符串, `标题'map

/** mode('streaming')*/ CREATE OR REPLACE TABLE eoj_table ( `tenantId` string, `id` string, `name` string, `headers` MAP<STRING, BYTES> METADATA , `hard_deleted` boolean, `kafka_key` STRING, `ts` timestamp(3) METADATA FROM 'timestamp'VIRTUAL, `procTime` AS PROCTIME(), WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'kafka:29092', 'properties.group.id' = 'group_id_1', 'topic-pattern' = '^topic(_backfill)?$', 'value.format' = 'json', 'format' = 'json', 'key.format' = 'raw', 'key.fields' = 'kafka_key', 'value.fields-include' = 'EXCEPT_KEY', 'scan.startup.mode' = 'earliest-offset', 'json.timestamp-format.standard' = 'ISO-8601', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' );

回答 1 投票 0

当Apache Flink从1.16升级到1.20时失败的单元测试 - classNotFoundException:org.apache.flink.api.common.common.executionconfig

我在Scala上写了一个Apache Flink作业。我将Flink从1.16.1升级到1.20.0,Java从11升至17。我在执行单位测试后会遇到以下错误

回答 0 投票 0

fllink sql lag witch_frame不起作用

/**模式('流')*/ 创建或替换表eoj_table( `pk`字符串, `id`字符串, `name`字符串, `标题'map元数据, `hard_deleted`布尔值, `kafka_k ...

回答 1 投票 0



取消设计建议,以生成来自Cassandra

我最初考虑运行单个查询并使用 分页并保留文件的本地副本,然后上传到FTP服务器。但 我的应用程序正在kubernetes群集上运行,因此不能创建本地文件,因为不建议这样做。

回答 0 投票 0




最新问题
© www.soinside.com 2019 - 2025. All rights reserved.