Temporal table join error: "Currently the join key in Temporal Table Join can not be empty"

Problem description (0 votes, 2 answers)

My Flink version is flink-1.12.2-bin-scala_2.12.

Here is my SQL:

SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id AND o.customer_id IS NOT NULL AND c.id IS NOT NULL;

SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

Both queries fail with the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
    at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
apache-flink flink-sql
2 Answers
0 votes

Here is a simple example I wrote; I hope it helps.

  1. Create the stream table
CREATE TABLE TEST_FLOW 
(  
    col1 STRING,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'topic' = 'TEST_FLOW'
    ...
);
  2. Create the dimension table
CREATE TABLE TEST_DIM
(
    col1 STRING,
    col2 STRING,
    PRIMARY KEY(col1) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'TEST_DIM'
    ...
);
  3. Lookup join (a sketch applying the same pattern to the question's query follows below):
SELECT
    A.col1,
    B.col2
FROM TEST_FLOW A
JOIN TEST_DIM FOR SYSTEM_TIME AS OF A.proc_time AS B ON A.col1 = B.col1;
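
Mapping the same pattern back onto the question's Orders/Customers query, a minimal sketch might look like the DDL below. The column types and connector options are assumptions, since the question does not show its table definitions; the points that matter are that the probe table declares a processing-time attribute with PROCTIME(), that the lookup table declares PRIMARY KEY ... NOT ENFORCED on the lookup column, and that o.customer_id and c.id are declared with exactly the same type.

-- Hypothetical DDL; column types and connector options are assumed, not taken from the question.
CREATE TABLE Orders (
    order_id    STRING,
    customer_id STRING,              -- must have exactly the same type as Customers.id
    total       DECIMAL(10, 2),
    proc_time AS PROCTIME()          -- processing-time attribute used in FOR SYSTEM_TIME AS OF
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'topic' = 'orders',
    'format' = 'json'
);

CREATE TABLE Customers (
    id      STRING,                  -- same type as Orders.customer_id
    country STRING,
    zip     STRING,
    PRIMARY KEY (id) NOT ENFORCED    -- lookup key for the temporal join
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'customers'
    -- 'username' / 'password' omitted here; add them as your database requires
);

SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;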

0 votes

If anyone runs into the same error, I would suggest comparing the schemas of the two tables. In my case the problem was a mismatch between STRING and VARCHAR(), and between TIMESTAMP_LTZ(3) and TIMESTAMP(3). After fixing the types, my join worked perfectly.
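
For concreteness, a minimal sketch of the kind of mismatch described above; the table and column names are made up, not the answerer's actual schema.

-- Hypothetical illustration; names are illustrative only.
-- According to this answer, mismatches like these between the two sides caused the error:
--     stream side:     user_id STRING,       ts TIMESTAMP_LTZ(3)
--     dimension side:  user_id VARCHAR(64),  ts TIMESTAMP(3)
-- Declaring the join key with the same type on both sides, for example:
CREATE TABLE dim_table (
    user_id STRING,                  -- matches the stream side's STRING, rather than VARCHAR(64)
    col2    STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'dim_table'
);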
