使用 SASL_SSL 通过 Kafka-Python 连接到 MSK 集群时出错

问题描述 投票:0回答:1

我尝试使用 kafka-python 库和 SASL_SSL 身份验证将 AWS Lambda 函数连接到 Amazon MSK(Apache Kafka 的托管流)集群。我正在关注官方文档link。但是,我遇到了与身份验证过程相关的连接错误。

这是我正在使用的代码:

import os
import socket
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

KAFKA_BROKERS = [
    "b-1.xxx:9098",
    "b-2.xxx:9098",
    "b-3.xxx:9098"
]
                
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-west-2')
        return token

tp = MSKTokenProvider()
print("Generated Token:", tp.token())
print("Client:", socket.gethostname())

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKERS,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
    api_version=(3, 2, 0)
)

当我尝试运行上述代码时,出现以下错误:

[ERROR] 2024-10-30T23:54:49.698Z e7d86aa6-cd0c-40c4-a374-42b25946d291 <BrokerConnection node_id=bootstrap-2 host=b-3.xxx:9098 <authenticating> [IPv4 ('10.29.34.48', 9098)]>: Error receiving reply from server
Traceback (most recent call last):
  File "/opt/python/kafka/conn.py", line 803, in _try_authenticate_oauth
    data = self._recv_bytes_blocking(4)
  File "/opt/python/kafka/conn.py", line 616, in _recv_bytes_blocking
    raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv

msk集群的apache kafka版本是3.2.0

lambda运行时版本和层版本是Python 3.9

身份验证过程中可能导致连接重置的原因是什么?

是否还有我应该考虑的文档链接中未提及的其他配置或故障排除步骤?

我附上了 msk 集群的安全详细信息、网络设置屏幕截图和 lambda vpc 详细信息

lambda vpc details

security details of the msk cluster

network details of the msk cluster

apache-kafka aws-lambda kafka-python aws-msk sasl
1个回答
0
投票

以下是我要检查的内容(按顺序):

  1. 按照此屏幕截图中的链接操作 - 确保附加到 lambda 的权限与策略示例中的权限匹配。Screenshot of AWS MSK Cluster page

2.您没有向我们展示您的入站安全组设置。您的安全组设置可能不允许 IAM 使用的端口入站到集群。要使用的端口将“取决于”您的配置。看起来您的 lambda 与集群位于同一账户和 vpc 中,在这种情况下,您需要一个入站安全组规则,允许 9098 上的自定义 TCP 流量访问。如果您启用了公共访问,那么它将是端口 9198。

如果 lambda 位于不同的账户中,并且您正在使用多 VPC 专用连接设置,在这种情况下,请参阅
    this
  1. 。基本上,您需要确保有一个附加到集群的集群 iam 策略和一个允许端口范围 14001-14100 上的入站自定义 TCP 流量的安全组规则。这个博客也可能有帮助。
  2. 如果这些都不起作用,您需要与我们分享附加到 lambda 的 IAM 策略、附加到 lambda 和集群的所有安全组的入站和出站安全组规则,以供进一步审查。

© www.soinside.com 2019 - 2024. All rights reserved.