我尝试使用 kafka-python 库和 SASL_SSL 身份验证将 AWS Lambda 函数连接到 Amazon MSK(Apache Kafka 的托管流)集群。我正在关注官方文档link。但是,我遇到了与身份验证过程相关的连接错误。
这是我正在使用的代码:
import os
import socket
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
KAFKA_BROKERS = [
"b-1.xxx:9098",
"b-2.xxx:9098",
"b-3.xxx:9098"
]
class MSKTokenProvider():
def token(self):
token, _ = MSKAuthTokenProvider.generate_auth_token('us-west-2')
return token
tp = MSKTokenProvider()
print("Generated Token:", tp.token())
print("Client:", socket.gethostname())
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKERS,
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=tp,
client_id=socket.gethostname(),
api_version=(3, 2, 0)
)
当我尝试运行上述代码时,出现以下错误:
[ERROR] 2024-10-30T23:54:49.698Z e7d86aa6-cd0c-40c4-a374-42b25946d291 <BrokerConnection node_id=bootstrap-2 host=b-3.xxx:9098 <authenticating> [IPv4 ('10.29.34.48', 9098)]>: Error receiving reply from server
Traceback (most recent call last):
File "/opt/python/kafka/conn.py", line 803, in _try_authenticate_oauth
data = self._recv_bytes_blocking(4)
File "/opt/python/kafka/conn.py", line 616, in _recv_bytes_blocking
raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
msk集群的apache kafka版本是3.2.0
lambda运行时版本和层版本是Python 3.9
身份验证过程中可能导致连接重置的原因是什么?
是否还有我应该考虑的文档链接中未提及的其他配置或故障排除步骤?
我附上了 msk 集群的安全详细信息、网络设置屏幕截图和 lambda vpc 详细信息
以下是我要检查的内容(按顺序):
2.您没有向我们展示您的入站安全组设置。您的安全组设置可能不允许 IAM 使用的端口入站到集群。要使用的端口将“取决于”您的配置。看起来您的 lambda 与集群位于同一账户和 vpc 中,在这种情况下,您需要一个入站安全组规则,允许 9098 上的自定义 TCP 流量访问。如果您启用了公共访问,那么它将是端口 9198。
如果 lambda 位于不同的账户中,并且您正在使用多 VPC 专用连接设置,在这种情况下,请参阅