在我们的项目中,我们有一个使用 Apache Flink 的需求,并使用 MongoDB SQL Connector 从 Flink 写入数据。
根据 Flink 文档,如果 DDL 中未定义主键,Mongo 连接器在向 Mongo 内插入文档时会使用 upsert 语义。
当我们在没有主键的 Flink 表中尝试此操作时,我们得到了这个异常:
org.apache.flink.table.api.TableException:表接收器'default_catalog.default_database.Correlation'不支持使用节点Join(joinType = [InnerJoin],where = [($ f3)产生的更新和删除更改= id)],选择= [图,$ f3,id,类型,startDate,hascontextEntityId,providedBy,harvestedDate],leftInputSpec = [NoUniqueKey],rightInputSpec = [NoUniqueKey])在org.apache.flink.table.planner.plan .optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:405) 在 org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:277) org.apache .flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:366) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChi孩子们$1(FlinkChangelogModeInferenceProgram.scala:355) 在 org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:354) 在 scala.collection.TraversableLike。 $anonfun$map$1(TraversableLike.scala:233) 在 scala.collection.immutable.Range.foreach(Range.scala:155) 在 scala.collection.TraversableLike.map(TraversableLike.scala:233) 在 scala.collection.TraversableLike .map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104)
有什么方法可以跳过 upsert 语义并在 Mongo 中执行普通插入吗?
查询的结果是变更日志流,而不是仅附加流。鉴于您已从接收器定义中删除了主键,它会尝试将更改日志流发送到仅附加接收器,但这是不兼容的。因此,您需要以这样的方式修复您的查询,使其生成仅附加流而不是更改日志流。