无法使用 Pyspark 从 Azure 事件中心检索数据

问题描述 投票:0回答:1

我正在尝试使用 pysprak 从 Azure 事件中心检索数据。代码只是继续运行,但不显示任何数据

EH_CONN_STR = 'Endpoint=sb://event-hub-18-jul.servicebus.windows.net/;SharedAccessKeyName=eh_policy_18_july;SharedAccessKey=TOc+O/+U+QuuZ5R33HsiwUjsc1C8qRhCy+AEhFxkLRE=;EntityPath=ehub-18'
EH_NAMESPACE = 'event-hub-18-jul'

EH_NAME ='ehub-18'

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms": "60000",
  "kafka.session.timeout.ms": "30000",
  "kafka.metadata.max.age.ms": "10000"
}
df = spark.read.format("kafka").options(**KAFKA_OPTIONS).load()
display(df)

下面是我使用黄色出租车和连接字符串生成的事件中心数据的快照 “端点=sb://event-hub-18-jul.servicebus.windows.net/;SharedAccessKeyName=eh_policy_18_july;SharedAccessKey=TOc+O/+U+QuuZ5R33HsiwUjsc1C8qRhCy+AEhFxkLRE=;EntityPath=ehub-18”

enter image description here

如果我读取数据的方式有任何问题,请告诉我吗?

azure pyspark spark-streaming azure-eventhub
1个回答
0
投票

首先您需要检查您的事件中心是否启用了kafka源 enter image description here

如果未启用,请启用它。 仅在部署事件中心资源时启用,方法是选择 定价等级为溢价。

enter image description here

接下来,启用后使用您的代码。

此外,您还可以使用事件中心库从事件中心读取数据。

azure-eventhubs-spark_2.12

从maven坐标导入并使用下面的代码。

connectionString = "YOUR.CONNECTION.STRING"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

有关更多信息,请关注此文档以获取更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.