EventProcessor instance "XXXXXXXX-XXX-XXXX-XXXX-XXXXXXXXX" of eventhub "eventhubname" consumer group "consumergroupname"

Problem description

I am trying to send events from Azure Event Hubs to a SQL database using Python code. I am running into the following error:

EventProcessor instance "XXXXXXXX-XXX-XXXX-XXXX-XXXXXXXXX" of eventhub "eventhubname" consumer group "consumergroupname".

def on_event(partition_context, event):
    # event-received code and sending-to-SQL-Server code (elided)
    ...

EH_CONN_STR = "Endpoint=sb://eventhubnamespace.servicebus.windows.net:9093/;SharedAccessKeyName=SASKeyName;SharedAccessKey=[REDACTED]"
TOPIC = "EventHubName"
print(EH_CONN_STR)

client = EventHubConsumerClient.from_connection_string(
    EH_CONN_STR,
    eventhub_name=TOPIC,
    consumer_group="consumergroupname",
)
receiver = client.receive(
    on_event=on_event,
    starting_position="-1",  # "-1" receives messages from the beginning of the partition
)

An error occurred while load balancing and claiming partition ownership. The exception is ConnectError('Failed to initiate the connection due to exception: [Errno 104] Connection reset by peer. Error condition: ErrorCondition.SocketError. Error description: Failed to initiate the connection due to exception: [Errno 104] Connection reset by peer'). Retrying after XX.XXXXXXX seconds.

Can anyone help resolve the above error?

I tried connecting to the Event Hub, but it does not work.

python azure-eventhub
1 Answer

The error

ConnectError('Failed to initiate the connection due to exception: [Errno 104]')

occurs when the connection between Azure Databricks (using Python) and the Azure Event Hubs service is configured incorrectly.
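One detail worth checking, assuming the connection string shown in the question is the one actually used: port 9093 is the Event Hubs Kafka endpoint, while the azure-eventhub Python SDK speaks AMQP (port 5671 by default). A minimal sketch of a consumer that leaves the port to the SDK, and tunnels AMQP over WebSockets on port 443 in case outbound 5671 is blocked (namespace, key, and entity names below are placeholders):

from azure.eventhub import EventHubConsumerClient, TransportType

# Placeholder connection string: no explicit port, so the SDK picks the
# right one for the transport (5671 for AMQP, 443 for AMQP-over-WebSocket).
EH_CONN_STR = (
    "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;"
    "SharedAccessKeyName=SASKeyName;SharedAccessKey=[REDACTED]"
)

def on_event(partition_context, event):
    print(partition_context.partition_id, event.body_as_str(encoding="UTF-8"))

client = EventHubConsumerClient.from_connection_string(
    EH_CONN_STR,
    consumer_group="$Default",
    eventhub_name="EventHubName",
    # WebSockets ride on port 443, which firewalls rarely reset.
    transport_type=TransportType.AmqpOverWebsocket,
)

with client:
    client.receive(on_event=on_event, starting_position="-1")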

The sample Python code below integrates Azure Event Hubs with Spark to process telemetry data and store it in an Azure SQL Database from Databricks. I followed the MSDOC example for reading Azure Event Hubs sample data with Apache PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import asyncio
import json


jdbcUrl = "jdbc:sqlserver://AzureSqlServerName.database.windows.net:1433;database=DatabaseName;user=UserName;password=Password;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

connectionProperties = {
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


spark = SparkSession.builder \
    .appName("Read from Data") \
    .config("spark.jars", "/path/to/sqljdbc42.jar") \
    .getOrCreate()


async def on_event(partition_context, event):
    print(f"Received event: {event.body_as_str(encoding='UTF-8')} from partition: {partition_context.partition_id}")

    # Decode the event body and parse it as JSON telemetry.
    event_data = event.body_as_str(encoding='UTF-8')
    json_data = json.loads(event_data)

    # Build a one-row Spark DataFrame from the telemetry fields.
    data = [(json_data["id"], json_data["timestamp"], json_data["uv"], json_data["temperature"], json_data["humidity"])]
    columns = ["id", "timestamp", "uv", "temperature", "humidity"]
    df = spark.createDataFrame(data, columns)

    # Append the row to the Azure SQL Database table over JDBC.
    df.write.jdbc(url=jdbcUrl, table="telemetry_data", mode="append", properties=connectionProperties)

    # Checkpoint so that a restart resumes after this event.
    await partition_context.update_checkpoint(event)


async def receive_events():
    BLOB_STORAGE_CONNECTION_STRING = "AzureStorageConnectionString"
    BLOB_CONTAINER_NAME = "AzureStorageCONTAINER_NAME"
    EVENT_HUB_CONNECTION_STR = "AzureEVENT_HUB_CONNECTION_STR"
    EVENT_HUB_NAME = "AzureEVENT_HUB_NAME"
    CONSUMER_GROUP = "$Default"
    
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )
    
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # Read from the beginning of the partition.
        )


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_events())

# Stop the Spark session (optional, depending on your environment)
spark.stop()
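A note on the entry point: in a Databricks notebook the cell may already be running inside IPython's event loop, in which case loop.run_until_complete raises "this event loop is already running" and top-level await works instead. A quick sketch of both forms (receive_events is the coroutine defined above):

# In a standalone Python script: let asyncio manage the loop's lifetime.
import asyncio
asyncio.run(receive_events())

# In a Databricks / IPython notebook cell, where a loop is already running,
# use top-level await instead:
#     await receive_events()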


Azure Databricks output: (screenshot)

Azure SQL output: (screenshot)
