Apache Flink 作业尝试通过 cdc 源连接器从 Mongo 读取结果导致 MongoTimeoutException

问题描述 投票:0回答:1

我正在尝试使用 Mongo CDC 连接器作为 Flink 作业中 DataStream 源的源。我使用与[根据文档][1]相同的示例代码。

这是我的代码:

MongoDBSource<String> mongoSource =
        MongoDBSource.<String>builder()
              .hosts("cluster0-shard-...:<port>,cluster0-shard-...:<port>,cluster0-shard-...:<port>")
              .username("myUsername")
              .password("myPassword")
              .databaseList("exercises")
              .collectionList("exercises.movies")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
   .setParallelism(1)
   .print();

env.execute("mongo-cdc");

由于我使用的是副本集,因此我将所有三个节点的连接 URI 与端口放在一起,并用逗号分隔。我尝试使用 Compass 的正常连接 URI,但由于一开始的

mongodb+srv
,产生了另一个错误,我不得不更改 URI。

我的本地 Flink 集群正在运行,我通过终端提交:

./bin/flink run -c org.example.MongoStream D:/Java-Projects/SimpleFlinkJob/target/flink-1.0.jar

大约需要 30 秒,作业才会出现在端口 8081 上的 Flink UI 中,并且不断在

RUNNING
RESTARTING
之间切换状态。

作业产生的错误是这样的:

Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-00-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}, {address=cluster0-shard-...:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]

我确信问题不在 Mongo 中,因为我通过指南针连接没有问题,并且我还尝试了 [Mongo sink/source without cdc][2] 的另一个示例,即使使用标准,一切都正常

 mongodb+srv
URI。

我正在使用 Flink 1.20.0,这些是我的依赖项:

<dependency>
        <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.2.0-1.19</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>3.2.1</version>
        </dependency>
    ```

What could I be missing?


  [1]: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/#datastream-source
  [2]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/mongodb/
java mongodb apache-flink connector mongo-connector
1个回答
0
投票

1.更正hosts参数hosts参数必须列出所有副本集成员及其IP或主机名和端口。将 和 <...> 替换为实际值。例如:

MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
      .hosts("cluster0-shard-00-00.mongodb.net:27017,cluster0-shard-00-01.mongodb.net:27017,cluster0-shard-00-02.mongodb.net:27017")
      .username("yourUsername")
      .password("yourPassword")
      .databaseList("exercises")
      .collectionList("exercises.movies")
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build();
  • 重要提示:避免使用 mongodb+srv,因为它不受 Flink MongoDB CDC 连接器。

2.确保副本集配置验证您的 MongoDB 集群是否作为副本集运行:

  • 登录 Mongo shell 或 Compass 并运行:

    rs.status()

  • 该命令应返回副本集的状态。如果它 如果没有,请配置您的 MongoDB 服务器以启用复制。

3.启用检查点 MongoDB CDC 需要检查点。将其添加到您的代码中:

env.enableCheckpointing(5000); 

4.增加超时设置添加连接超时参数,以留出更多时间建立连接:

.connectionTimeout(60000) // Timeout in milliseconds

5.检查MongoDB权限确保用户具有数据库和oplog的读权限。在 MongoDB Compass 中,为目标数据库的用户分配读取或读写角色。

6.依赖项 在 Flink 1.20 中使用正确的依赖项:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.20.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.2.0-1.19</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>3.2.1</version>
</dependency>

7.测试网络连接使用 telnet 或 ping 确保您的机器可以到达副本集节点:

telnet cluster0-shard-00-00.mongodb.net 27017

8.简化并测试运行这个最小的工作:

MongoDBSource<String> mongoSource =
    MongoDBSource.<String>builder()
          .hosts("cluster0-shard-00-00.mongodb.net:27017,cluster0-shard-00-01.mongodb.net:27017,cluster0-shard-00-02.mongodb.net:27017")
          .username("yourUsername")
          .password("yourPassword")
          .databaseList("exercises")
          .collectionList("exercises.movies")
          .deserializer(new JsonDebeziumDeserializationSchema())
          .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
   .setParallelism(1)
   .print();

env.execute("mongo-cdc");

9.检查 Flink 日志运行作业后,如果问题仍然存在,请通过 Flink 的 Web UI (http://localhost:8081) 或终端检查日志以获取更具体的详细信息。

10.使用 SRV 连接重试(如果需要) 如果必须使用 mongodb+srv,请考虑使用自定义连接解决方案,因为 Flink CDC 连接器本身不支持它。

© www.soinside.com 2019 - 2024. All rights reserved.