Context:
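I am trying to build a Flink application that runs rules dynamically. I have a rule stream: SQL rules are written to it elsewhere, and the application reads them from the stream and executes them. I have connected the rule stream to a process function, and inside that function I somehow obtain the tableEnv and call executeSql.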
    ruleStream.process(new ProcessFunction<Rule, String>() {
        @Override
        public void processElement(Rule rule, ProcessFunction<Rule, String>.Context context, Collector<String> collector) throws Exception {
            synchronized (this) { // Ensure sequential execution within this function instance
                System.out.println(rule);
                String ruleId = rule.getId();
                String operation = rule.getOperation();
                String ruleStatement = rule.getRuleExpression();
                // I am saving the TableEnv generated in the main function in a separate class and accessing it here.
                // Note: this local variable shadows the tableEnv field created in open() below.
                StreamTableEnvironment tableEnv = FlinkTableEnvSingleton.getTableEnv();
                try {
                    switch (operation) {
                        case "CREATE":
                            LOG.info("Creating rule: {}", rule);
                            // Name of the view that the rule statement is expected to create
                            String createTable = "rule" + ruleId;
                            tableEnv.executeSql(ruleStatement);
                            tableEnv.executeSql("CREATE TABLE rule_sink" + ruleId + " ("
                                    + "message STRING "
                                    + ") WITH ("
                                    + "'connector' = 'kafka', "
                                    + "'topic' = 'outputTopic', "
                                    + "'properties.bootstrap.servers' = 'localhost:9092', "
                                    + "'format' = 'raw'"
                                    + ")");
                            tableEnv.executeSql("INSERT INTO rule_sink" + ruleId + " SELECT * FROM " + createTable);
                            collector.collect("Rule data inserted into Kafka topic.");
                            break;
                        default:
                            collector.collect("Unknown operation: " + operation);
                    }
                } catch (Exception e) {
                    collector.collect("Error executing SQL: " + e.getMessage());
                }
            }
        }

        private transient StreamTableEnvironment tableEnv;

        @Override
        public void open(Configuration parameters) {
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .build();
            tableEnv = StreamTableEnvironment.create(
                    StreamExecutionEnvironment.getExecutionEnvironment(),
                    settings
            );
            tableEnv.executeSql("CREATE CATALOG my_catalog WITH ('type'='generic_in_memory')");
            tableEnv.executeSql("USE CATALOG my_catalog");
        }
    }).setParallelism(10); // Parallelism is 10, so the synchronized block only serializes calls within one subtask
The sample rule I passed to the code above is:
{"id": "1", "name": "HighTemperatureRule", "equipmentName": "SensorA", "operation": "CREATE", "ruleExpression": "CREATE TEMPORARY VIEW rule1 AS SELECT * FROM inputTable;"}
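The Rule type used in processElement is not shown in the post. As a minimal sketch, assuming it is a plain serializable class whose fields mirror the JSON keys above, it might look like the following. The class layout, the constructor, and the viewName() helper are hypothetical; only getId(), getOperation(), and getRuleExpression() appear in the posted code.

```java
import java.io.Serializable;

// Hypothetical sketch of the Rule type; field names are taken from the sample JSON.
class Rule implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String id;
    private final String name;
    private final String equipmentName;
    private final String operation;
    private final String ruleExpression;

    Rule(String id, String name, String equipmentName, String operation, String ruleExpression) {
        this.id = id;
        this.name = name;
        this.equipmentName = equipmentName;
        this.operation = operation;
        this.ruleExpression = ruleExpression;
    }

    // These three getters are the ones the posted processElement calls.
    String getId() { return id; }
    String getOperation() { return operation; }
    String getRuleExpression() { return ruleExpression; }

    String getName() { return name; }
    String getEquipmentName() { return equipmentName; }

    // Hypothetical helper: the view name the job code derives as "rule" + ruleId.
    String viewName() { return "rule" + id; }

    @Override
    public String toString() {
        return "Rule{id=" + id + ", name=" + name + ", equipmentName=" + equipmentName
                + ", operation=" + operation + ", ruleExpression=" + ruleExpression + "}";
    }
}
```

With the sample rule, viewName() yields rule1, which is the view that the CREATE TEMPORARY VIEW statement in ruleExpression has to define for the later INSERT to find it.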
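My question: after using catalogs and so on, I was able to get this running somehow. I can submit multiple SQL statements and have verified that they execute correctly, but I am not entirely sure how it works. If I look at the DAG, it always looks the same no matter how many rules I add. What I mean is: how does this run, and which part of the runtime manages it? How much is allocated to a single SQL query? If possible, can anyone explain the difference between this and normal Flink SQL (with Zeppelin, etc.), where queries are submitted through the SQL Client?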
[Image: Flink DAG]
It is working, but I need an explanation of how it works and of its underlying architecture.
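I found the answer: the code above will not work when run on a proper cluster. I was testing it locally from IntelliJ without a Flink cluster, so IntelliJ ran everything in a single local JVM, and everything was accessible from inside the process function. If we run the same code on a cluster, however, FlinkTableEnvSingleton cannot be accessed from the process function and returns null.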
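The null result can be reproduced without Flink. The sketch below uses plain Java serialization and assumes the usual deployment model: main() runs in one JVM, and the function object is serialized and shipped to worker JVMs that never ran main(). EnvHolder and RuleFunction are hypothetical stand-ins for FlinkTableEnvSingleton and the ProcessFunction, a String stands in for the table environment, and the "worker JVM" is only simulated by clearing the static field before deserializing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for FlinkTableEnvSingleton: a static holder filled in by main().
class EnvHolder {
    private static String env; // stands in for the StreamTableEnvironment reference
    static void set(String value) { env = value; }
    static String get() { return env; }
}

// Hypothetical stand-in for the ProcessFunction: serialized and shipped to workers.
class RuleFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    final String label; // instance state travels with the serialized object

    RuleFunction(String label) { this.label = label; }

    // Reads static state of whichever JVM this object currently lives in.
    String lookupEnv() { return EnvHolder.get(); }
}

class StaticSingletonDemo {
    static byte[] serialize(Object o) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static RuleFunction deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (RuleFunction) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Local run: driver and function share one JVM, so the static is still set.
    static String lookupInSameJvm() {
        EnvHolder.set("tableEnv created in main()");
        return deserialize(serialize(new RuleFunction("rules"))).lookupEnv();
    }

    // Simulated cluster run: the static is not part of the serialized bytes.
    static String lookupOnSimulatedWorker() {
        EnvHolder.set("tableEnv created in main()");
        byte[] shipped = serialize(new RuleFunction("rules"));
        EnvHolder.set(null); // a fresh worker JVM never ran main(), so the static is unset
        return deserialize(shipped).lookupEnv();
    }
}
```

Static fields belong to the class in a particular JVM, not to the serialized object, so only the instance field label survives the round trip. That matches the behaviour described above: the lookup succeeds in the single local JVM and returns null once the function runs somewhere main() never executed.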