我是 Flink 的新手,现在我正在使用 flink 1.7 使用 pyflink 构建一个项目,以从 MongoDB 查询数据并接收到 Mysql。但是,我不断收到错误
Caused by: java.lang.ClassCastException: cannot assign instance of org.bson.BsonDocument to field org.apache.flink.connector.mongo..source.MongoSource.filter of type org.bson.BsonDocument in instance of org.apache.flink.connector.mongodb.source.MongoSource
。而且,我猜错误来自 flink-connector-mongodb module,但我对 Java 不熟悉。有人可以帮我解决这个问题吗?或者我错过了哪些jar文件?
以下是我的代码的一部分,其中包括我使用的罐子。
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
"file:///opt/flink/lib/flink-sql-connector-mongodb-1.1.0-1.17.jar",
"file:///opt/flink/lib/bson-5.1.2.jar",
"file:///opt/flink/lib/mongodb-driver-sync-5.1.2.jar",
"file:///opt/flink/lib/mongodb-driver-core-5.1.2.jar",
"file:///opt/flink/lib/flink-connector-jdbc-3.1.2-1.17.jar",
"file:///opt/flink/lib/mysql-connector-java-8.0.26.jar",
"file:///opt/flink/lib/slf4j-api-2.0.13.jar",
"file:///opt/flink/lib/slf4j-simple-2.0.13.jar",
)
env.add_classpaths(
"file:///opt/flink/lib/bson-5.1.2.jar",
"file:///opt/flink/lib/mongodb-driver-sync-5.1.2.jar",
"file:///opt/flink/lib/mongodb-driver-core-5.1.2.jar",
)
t_env = StreamTableEnvironment.create(env)
t_env.execute_sql(
"""
CREATE TABLE source (
...
) with (
'connector' = 'mongodb',
...
)
)
t_env.execute_sql(
"""
CREATE TABLE sink (
...
) with (
'connector' = 'jdbc',
...
)
)
table_result = t_env.sql_query('...')
table_result.execute_insert("sink")
env.execute("MyMongoToMysql")
我不是100%确定,但试试这个
bson-5.1.2.jar
更改为 bson-4.7.2.jarmongodb-driver-core-5.1.2.jar
更改为 mongodb-driver-core-4.7.2.jarmongodb-driver-sync-5.1.2.jar
更改为 mongodb-driver-sync-4.7.2.jar