I'm writing a Flink application that enriches data coming from a Kinesis source with data that already exists in a database (Postgis).
From the documentation, the Flink feature available for this is the SQL Lookup Join.
However, when trying to use it, the application throws a
"Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field"
error.
Here is a snippet of what I'm trying to do:
// Using Sedona Flink SQL
...
DataStream<Row> inputStream = geometryStream
    .map(i -> Row.of(i.f0.customerId(), i.f1))
    .returns(
        Types.ROW_NAMED(
            new String[]{"customer_id", "geometry"},
            Types.STRING, Types.GENERIC(Geometry.class)
        )
    );

Table inputStreamTbl = sedona.fromDataStream(
    inputStream,
    Schema.newBuilder()
        // "proc_time" would be needed to make the lookup using 'FOR SYSTEM_TIME AS OF'; see: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
        .columnByExpression("proc_time", "PROCTIME()")
        // .columnByExpression("proc_time", "CURRENT_TIMESTAMP") -> doesn't work either
        // .columnByExpression("proc_time", "CURRENT_TIME") -> doesn't work either
        .build()
);
inputStreamTbl.printSchema();

DataStream<Row> enrichedInputStream = sedona.toDataStream(inputStreamTbl);
sedona.createTemporaryView("Input", enrichedInputStream);

Table postgisTbl = sedona.from(
    TableDescriptor
        .forConnector("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/sedona-flink-test")
        .option("table-name", "public.vw_customer_area")
        ...
        .schema(
            Schema.newBuilder()
                .column("area_id", DataTypes.STRING().notNull())
                .column("customer_id", DataTypes.STRING().notNull())
                .column("area_geometry", DataTypes.BYTES().notNull())
                .column("created_at", DataTypes.TIMESTAMP())
                .column("updated_at", DataTypes.TIMESTAMP())
                .columnByExpression("rowtime", "CAST(updated_at AS TIMESTAMP_LTZ(3))")
                .build()
        )
        .build()
);
postgisTbl.printSchema();
sedona.createTemporaryView("CustomerArea", postgisTbl);

Table joinedTable = sedona.sqlQuery(
    "SELECT i.proc_time, i.customer_id, ca.area_id " +
    "FROM Input AS i " +
    // FIXME the lookup should be made on Postgres but it's not working using the 'FOR SYSTEM_TIME AS OF' statement
    "INNER JOIN CustomerArea FOR SYSTEM_TIME AS OF i.proc_time AS ca ON ca.customer_id = i.customer_id " +
    "WHERE ST_Intersects(i.geometry, ST_GeomFromWKB(ca.area_geometry))"
);
joinedTable.printSchema();

DataStream<Row> joinedStream = sedona.toDataStream(joinedTable);
joinedStream.print("sedona-sql-join-stream");
This is the output:
(
`customer_id` STRING NOT NULL,
`geometry` RAW('org.locationtech.jts.geom.Geometry', '...'),
`proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
)
(
`area_id` STRING NOT NULL,
`customer_id` STRING NOT NULL,
`area_geometry` BYTES NOT NULL,
`created_at` TIMESTAMP(6),
`updated_at` TIMESTAMP(6),
`rowtime` TIMESTAMP_LTZ(3) AS CAST(updated_at AS TIMESTAMP_LTZ(3))
)
(
`proc_time` TIMESTAMP_LTZ(3) NOT NULL,
`customer_id` STRING NOT NULL,
`area_id` STRING NOT NULL
)
Exception in thread "main" org.apache.flink.table.api.ValidationException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableRule.validateSnapshotInCorrelate(LogicalCorrelateToJoinFromTemporalTableRule.scala:74)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:259)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:420)
at org.apache.calcite.plan.hep.HepPlanner.executeRuleInstance(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance$State.execute(HepInstruction.java:178)
at org.apache.calcite.plan.hep.HepPlanner.lambda$executeProgram$0(HepPlanner.java:211)
at org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
at org.apache.calcite.plan.hep.HepProgram$State.execute(HepProgram.java:118)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:205)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:191)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:324)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:305)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:259)
...
Process finished with exit code 1
The 'FOR SYSTEM_TIME AS OF' clause doesn't seem to be evaluated correctly, or am I missing something? The column it references is a column of the "left table", exactly as documented.
I would also like to know whether this is the best way to enrich incoming stream data with data from an external database... I believe it's a fairly common scenario for people starting to develop streaming applications.
I'm not sure whether this is a bug in Flink's query validator, whether the documentation is unclear, or whether I've missed something.
In any case, I suspect the problem occurred because the tables referenced by the query were not in Flink's catalog. After rewriting my application to make sure both tables are in the catalog, I got the expected behavior (a lookup against Postgis when needed).
Before arriving at the solution, I figured the problem was related to the conversion between DataStream and Table. When that conversion happens (e.g. using tblEnv.fromDataStream(kinesisStream, Schema.newBuilder()...)), the table is not registered in the catalog... and the same applies if you use a temporary table/view (e.g. tblEnv.createTemporaryView("MyTable", kinesisStream)).
I could inspect the catalog and check what was (and wasn't) registered with this code:
tblEnv.getCatalog(tblEnv.getCurrentCatalog()).ifPresent(catalog -> {
    log.info("Databases: {}", catalog.listDatabases());
    catalog.listDatabases().forEach(db -> {
        try {
            log.info("Tables: {}", catalog.listTables(db));
            log.info("Views: {}", catalog.listViews(db));
            log.info("Functions: {}", catalog.listFunctions(db));
        } catch (DatabaseNotExistException e) {
            throw new RuntimeException(e);
        }
    });
});
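One note on the listing above: as far as I understand the API, objects created with createTemporaryView are session-scoped and tracked by the TableEnvironment rather than by the catalog instance itself, so they won't show up in catalog.listTables()/listViews(). They can be listed directly instead (a small sketch, assuming the same tblEnv and java.util.Arrays):

// Temporary tables/views live in the session, not in the catalog instance,
// so they are listed via the TableEnvironment itself:
log.info("Temporary tables: {}", Arrays.toString(tblEnv.listTemporaryTables()));
log.info("Temporary views: {}", Arrays.toString(tblEnv.listTemporaryViews()));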
That said, here is the solution I used:
// Make sure the tables (especially the one with PROCTIME()) are registered in the internal catalog
tblEnv.createTable("Input", TableDescriptor.forConnector("kinesis")...);
tblEnv.createTable("CustomerArea", TableDescriptor.forConnector("jdbc")...);

Table joinedTable = sedona.sqlQuery(
    "SELECT i.proc_time, i.customer_id, ca.area_id " +
    "FROM Input AS i " +
    "INNER JOIN CustomerArea FOR SYSTEM_TIME AS OF i.proc_time AS ca ON ca.customer_id = i.customer_id " +
    "WHERE ST_Intersects(i.geometry, ST_GeomFromWKB(ca.area_geometry))"
);
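For reference, here is a minimal sketch of what the descriptor-based registration might look like in full. The stream name, region, format, and field list below are placeholders standing in for the elided parts of my actual setup, not a drop-in configuration:

tblEnv.createTable("Input",
    TableDescriptor.forConnector("kinesis")
        .schema(Schema.newBuilder()
            .column("customer_id", DataTypes.STRING())
            .column("geometry", DataTypes.BYTES())
            // Declaring the processing-time attribute on the catalog table
            // is what makes 'FOR SYSTEM_TIME AS OF i.proc_time' resolvable
            .columnByExpression("proc_time", "PROCTIME()")
            .build())
        .option("stream", "my-input-stream")   // placeholder
        .option("aws.region", "us-east-1")     // placeholder
        .format("json")                        // placeholder
        .build());

tblEnv.createTable("CustomerArea",
    TableDescriptor.forConnector("jdbc")
        .schema(Schema.newBuilder()
            .column("area_id", DataTypes.STRING().notNull())
            .column("customer_id", DataTypes.STRING().notNull())
            .column("area_geometry", DataTypes.BYTES().notNull())
            .build())
        .option("url", "jdbc:postgresql://localhost:5432/sedona-flink-test")
        .option("table-name", "public.vw_customer_area")
        .build());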
It's a simple solution, but it has its drawbacks: I already had a working FlinkKinesisConsumer<> with some ProcessFunction/KeyedProcessFunction/ReduceFunction operators applied, and I was forced to reimplement that processing flow.
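In case it helps others facing the same trade-off: the joined result can still be bridged back to the DataStream API, so operators that used to run downstream of the enrichment can be reattached after the join (anything that previously ran before it would still have to move). A sketch, where MyKeyedProcessFunction is a placeholder for the logic you already have:

// Bridge the joined table back to the DataStream API and reattach
// existing operators; MyKeyedProcessFunction is a placeholder
DataStream<Row> joinedStream = sedona.toDataStream(joinedTable);
joinedStream
    .keyBy(row -> row.<String>getFieldAs("customer_id"), Types.STRING)
    .process(new MyKeyedProcessFunction())
    .print("enriched");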