Pyspark:通过服务主体向事件中心进行身份验证

问题描述 投票:0回答:1

我有一个服务主体,该主体有权读取 Eventhub 上的主题。我想阅读其中的主题并尝试以下操作:

# Event Hub connection string using Service Principal (SAS)
eventhub_conn_str = f"Endpoint=sb://{namespace}/;SharedAccessKeyName={client_id};SharedAccessKey={client_secret}"


# Setting up the Azure Event Hubs Spark connector
event_hub_config = {
    'eventhubs.connectionString':eventhub_conn_str,
    'eventhubs.consumerGroup': consumer_group,
    'eventhubs.maxEventsPerTrigger': '1000'
}


# Read from Event Hub using Spark Structured Streaming
raw_events = (
    spark.readStream
    .format("eventhubs")
    .option("startingPosition", "@latest")
    .options(**event_hub_config)
    .load()
)

但是,我总是收到以下错误:

java.lang.IllegalArgumentException: Input byte array has wrong 4-byte ending unit

我的身份验证似乎失败(使用任何密码时都会出现相同的错误)。如何使用 client_id 和 client_secret 进行身份验证?

spark-streaming azure-eventhub
1个回答
0
投票

您用于事件中心连接的连接字符串用于共享密钥身份验证,并且您在该连接字符串中传递服务主体客户端 ID 和密钥,这是不兼容的,并且无法正确检测到它可能是错误的原因.

您需要创建一个从

org.apache.spark.eventhubs.utils.AadAuthenticationCallback
扩展的回调类,以使用具有 Secret 的服务主体来从 PySpark 授权 EventHub。

要连接到 Azure EventHubs 连接,您仍然必须使用基于 Java 的代码,如连接器文档中所述。

有关更多详细信息,请参阅此堆栈解决方案

© www.soinside.com 2019 - 2024. All rights reserved.