我正在尝试使用 pysprak 从 Azure 事件中心检索数据。代码只是继续运行,但不显示任何数据
EH_CONN_STR = 'Endpoint=sb://event-hub-18-jul.servicebus.windows.net/;SharedAccessKeyName=eh_policy_18_july;SharedAccessKey=TOc+O/+U+QuuZ5R33HsiwUjsc1C8qRhCy+AEhFxkLRE=;EntityPath=ehub-18'
EH_NAMESPACE = 'event-hub-18-jul'
EH_NAME ='ehub-18'
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "30000",
"kafka.metadata.max.age.ms": "10000"
}
df = spark.read.format("kafka").options(**KAFKA_OPTIONS).load()
display(df)
下面是我使用黄色出租车和连接字符串生成的事件中心数据的快照 “端点=sb://event-hub-18-jul.servicebus.windows.net/;SharedAccessKeyName=eh_policy_18_july;SharedAccessKey=TOc+O/+U+QuuZ5R33HsiwUjsc1C8qRhCy+AEhFxkLRE=;EntityPath=ehub-18”
如果我读取数据的方式有任何问题,请告诉我吗?
首先您需要检查您的事件中心是否启用了kafka源
如果未启用,请启用它。 仅在部署事件中心资源时启用,方法是选择 定价等级为溢价。
接下来,启用后使用您的代码。
此外,您还可以使用事件中心库从事件中心读取数据。
azure-eventhubs-spark_2.12
从maven坐标导入并使用下面的代码。
connectionString = "YOUR.CONNECTION.STRING"
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
有关更多信息,请关注此文档以获取更多信息。