How do I run executeSql queries in Apache Flink when they are triggered from a ProcessFunction, and how are the resulting SQL jobs managed?


Context:
I'm trying to build a Flink application that runs rules dynamically. I have a rule stream from which SQL rules are read and executed. I have connected the rule stream to a ProcessFunction, inside which I can somehow obtain the table environment and call executeSql.

ruleStream.process(new ProcessFunction<Rule,String>() {
            @Override
            public void processElement(Rule rule, ProcessFunction<Rule, String>.Context context, Collector<String> collector) throws Exception {
                synchronized (this) { // Ensure sequential execution
                    System.out.println(rule);
                    String ruleId = rule.getId();
                    String operation = rule.getOperation();
                    String ruleStatement = rule.getRuleExpression();
                    StreamTableEnvironment tableEnv = FlinkTableEnvSingleton.getTableEnv(); // I save the TableEnv created in the main function in a separate class and access it here

                    try {
                        switch (operation) {
                            case "CREATE":
                                LOG.info("Creating rule: {}", rule);
                                String createTable = "rule" + ruleId;
                                tableEnv.executeSql(ruleStatement);

                                tableEnv.executeSql("CREATE TABLE rule_sink"+ ruleId+ " ("
                                        + "message STRING "
                                        + ") WITH ("
                                        + "'connector' = 'kafka', "
                                        + "'topic' = 'outputTopic', "
                                        + "'properties.bootstrap.servers' = 'localhost:9092', "
                                        + "'format' = 'raw'"
                                        + ")");

                                tableEnv.executeSql("INSERT INTO rule_sink" + ruleId + " SELECT * FROM " + createTable);
                                collector.collect("Rule data inserted into Kafka topic.");
                                break;
                            default:
                                collector.collect("Unknown operation: " + operation);
                        }

                     
                    } catch (Exception e) {
                        collector.collect("Error executing SQL: " + e.getMessage());
                    }
                }
            }
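            // Note: the tableEnv field and open() below build a second, separate
            // StreamTableEnvironment; processElement above uses the singleton instead.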

            private transient StreamTableEnvironment tableEnv;

            @Override
            public void open(Configuration parameters) {
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .build();

                tableEnv = StreamTableEnvironment.create(
                        StreamExecutionEnvironment.getExecutionEnvironment(),
                        settings
                );

                tableEnv.executeSql("CREATE CATALOG my_catalog WITH ('type'='generic_in_memory')");
                tableEnv.executeSql("USE CATALOG my_catalog");
            }

        }).setParallelism(10); // Note: with parallelism 10, the synchronized block only serializes calls within each subtask
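The FlinkTableEnvSingleton class used above is not shown in the question. A minimal sketch of what such a holder might look like (assuming main() stores the environment in a static field; the class body below is an assumption, not taken from the post):

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical sketch of the singleton referenced above; not part of the original post.
// A static field like this is only visible when the job runs in the same JVM as main()
// (e.g. inside IntelliJ); on a real cluster it is never shipped to the TaskManagers.
public class FlinkTableEnvSingleton {
    private static StreamTableEnvironment tableEnv;

    public static void setTableEnv(StreamTableEnvironment env) {
        tableEnv = env;
    }

    public static StreamTableEnvironment getTableEnv() {
        return tableEnv;
    }
}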
The example rule I pass in above is:

{"id": "1", "name": "HighTemperatureRule", "equipmentName": "SensorA", "operation": "CREATE", "ruleExpression": "CREATE TEMPORARY VIEW rule1 AS SELECT * FROM inputTable;"}
My question:
After adding the catalog and related setup, I somehow got this running. I can submit multiple SQL statements and verify that they execute correctly, but I'm not entirely sure how it works. If I look at the DAG, it always looks the same no matter how many rules I add. What I mean is: how are these statements actually run, which runtime is managing and processing them, and how much resource is allocated to each individual SQL query? If possible, can anyone explain the difference between this and regular Flink SQL (with Zeppelin etc.), where queries are submitted through the SQL client?

[Flink DAG screenshot]
It is working, but I need an explanation of how it works and of the underlying architecture.

For anyone looking for the answer: the approach above does not work when running on a proper cluster. I was testing it locally with IntelliJ, without a Flink cluster, so IntelliJ runs everything in a single local JVM and everything is accessible from the process function. However, if we run the above on a cluster, FlinkTableEnvSingleton cannot be accessed from the process function and returns null.
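For context, the more conventional pattern is to execute the rule SQL from main() on the client side, where the TableEnvironment actually lives. A minimal, self-contained sketch of that pattern; the class name, table names, and datagen/print connectors are placeholders standing in for the question's Kafka tables:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RuleJobSubmitter {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical source; the question's inputTable would be defined similarly.
        tableEnv.executeSql("CREATE TABLE inputTable (message STRING) WITH ('connector' = 'datagen')");

        // Hypothetical sink; the question uses a Kafka sink instead.
        tableEnv.executeSql("CREATE TABLE rule_sink1 (message STRING) WITH ('connector' = 'print')");

        // The rule statement from the rule stream, executed on the client side.
        tableEnv.executeSql("CREATE TEMPORARY VIEW rule1 AS SELECT * FROM inputTable");

        // executeSql() with an INSERT submits a separate Flink job for this rule.
        tableEnv.executeSql("INSERT INTO rule_sink1 SELECT * FROM rule1");
    }
}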

Tags: apache-flink, flink-streaming, flink-sql, amazon-kinesis-analytics, flink-table-api