我正在尝试使用 Mongo CDC 连接器作为 Flink 作业中 DataStream 源的源。我使用与[根据文档][1]相同的示例代码。
这是我的代码:
MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts("cluster0-shard-...:<port>,cluster0-shard-...:<port>,cluster0-shard-...:<port>")
.username("myUsername")
.password("myPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();
env.execute("mongo-cdc");
由于我使用的是副本集,因此我将所有三个节点的连接 URI 与端口放在一起,并用逗号分隔。我尝试使用 Compass 的正常连接 URI,但由于一开始的
mongodb+srv
,产生了另一个错误,我不得不更改 URI。
我的本地 Flink 集群正在运行,我通过终端提交:
./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar
大约需要 30 秒,作业才会出现在端口 8081 上的 Flink UI 中,并且不断在
RUNNING
和 RESTARTING
之间切换状态。
作业产生的错误是这样的:
Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
我确信问题不在 Mongo 中,因为我通过指南针连接没有问题,并且我还尝试了 [Mongo sink/source without cdc][2] 的另一个示例,即使使用标准,一切都正常
mongodb+srv
URI。
我正在使用 Flink 1.20.0,这些是我的依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>3.2.1</version>
</dependency>
```
What could I be missing?
[1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
[2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/
1.更正hosts参数hosts参数必须列出所有副本集成员及其IP或主机名和端口。将 和 <...> 替换为实际值。例如:
MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts("cluster0-shard-00-00.mongodb.net:27017,cluster0-shard-00-01.mongodb.net:27017,cluster0-shard-00-02.mongodb.net:27017")
.username("yourUsername")
.password("yourPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
2.确保副本集配置验证您的 MongoDB 集群是否作为副本集运行:
登录 Mongo shell 或 Compass 并运行:
rs.status()
该命令应返回副本集的状态。如果它 如果没有,请配置您的 MongoDB 服务器以启用复制。
3.启用检查点 MongoDB CDC 需要检查点。将其添加到您的代码中:
env.enableCheckpointing(5000);
4.增加超时设置添加连接超时参数,以留出更多时间建立连接:
.connectionTimeout(60000) // Timeout in milliseconds
5.检查MongoDB权限确保用户具有数据库和oplog的读权限。在 MongoDB Compass 中,为目标数据库的用户分配读取或读写角色。
6.依赖项 在 Flink 1.20 中使用正确的依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>3.2.1</version>
</dependency>
7.测试网络连接使用 telnet 或 ping 确保您的机器可以到达副本集节点:
telnet cluster0-shard-00-00.mongodb.net 27017
8.简化并测试运行这个最小的工作:
MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts("cluster0-shard-00-00.mongodb.net:27017,cluster0-shard-00-01.mongodb.net:27017,cluster0-shard-00-02.mongodb.net:27017")
.username("yourUsername")
.password("yourPassword")
.databaseList("exercises")
.collectionList("exercises.movies")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(1)
.print();
env.execute("mongo-cdc");
9.检查 Flink 日志运行作业后,如果问题仍然存在,请通过 Flink 的 Web UI (http://localhost:8081) 或终端检查日志以获取更具体的详细信息。
10.使用 SRV 连接重试(如果需要) 如果必须使用 mongodb+srv,请考虑使用自定义连接解决方案,因为 Flink CDC 连接器本身不支持它。