I am trying to send events from Azure Event Hubs to a SQL database using Python, and I am hitting the error shown below while the consumer runs. My code:
def on_event(partition_context, event):
    # event received code and sending to SQL server code
    ...

EH_CONN_STR = f"Endpoint=sb://eventhubnamespace.servicebus.windows.net:9093/;SharedAccessKeyName=SASKeyName;SharedAccessKey=[REDACTED]"
TOPIC = "EventHubName"

print(EH_CONN_STR)
client = EventHubConsumerClient.from_connection_string(EH_CONN_STR, eventhub_name=TOPIC, consumer_group=consumergroupname)
receiver = client.receive(on_event=on_event, offset="-1")
# Offset can be set to -1 to receive messages from the beginning
EventProcessor instance 'XXXXXXXX-XXX-XXXX-XXXX-XXXXXXXXX' of eventhub 'eventhubname', consumer group 'consumergroupname'. An error occurred while load-balancing and claiming ownership. The exception is ConnectError('Failed to initiate the connection due to exception: [Errno 104] Connection reset by peer Error condition: ErrorCondition.SocketError Error Description: Failed to initiate the connection due to exception: [Errno 104] Connection reset by peer'). Retrying after XX.XXXXXXX seconds.
I tried connecting to the Event Hub, but it does not work. Can anyone help with the above error?
The error

ConnectError('Failed to initiate the connection due to exception: [Errno 104]')

occurs because of an incorrect connection configuration between Azure Databricks and the Azure Event Hubs service when using Python.
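One thing worth double-checking first (based on the connection string shown in the question, so treat all names as placeholders): the endpoint sb://eventhubnamespace.servicebus.windows.net:9093/ pins port 9093, which is the Kafka endpoint of Event Hubs. The azure-eventhub Python SDK speaks AMQP, not Kafka, so the connection string should reference the plain namespace host. A minimal sketch of a well-formed consumer:

from azure.eventhub import EventHubConsumerClient

# Placeholder values; note there is no :9093 Kafka port in the Endpoint.
EH_CONN_STR = (
    "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;"
    "SharedAccessKeyName=SASKeyName;SharedAccessKey=[REDACTED]"
)
client = EventHubConsumerClient.from_connection_string(
    EH_CONN_STR,
    eventhub_name="EventHubName",
    consumer_group="$Default",
)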
Below is sample Python code that integrates Azure Event Hubs with Spark in Databricks to process telemetry data and store it in an Azure SQL database. I followed the MSDOC sample for reading Azure Event Hubs data with Apache PySpark.
from pyspark.sql import SparkSession
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import asyncio
import json

# JDBC connection for the target Azure SQL database (placeholder values).
jdbcUrl = "jdbc:sqlserver://AzureSqlSvername.database.windows.net:1433;database=DatabaseName;user=UserName;password=Password;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
connectionProperties = {
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

spark = SparkSession.builder \
    .appName("Read from Data") \
    .config("spark.jars", "/path/to/sqljdbc42.jar") \
    .getOrCreate()
async def on_event(partition_context, event):
    print(f"Received event: {event.body_as_str(encoding='UTF-8')} from partition: {partition_context.partition_id}")
    # Example of processing the event: parse the JSON body and write it to Azure SQL Database.
    event_data = event.body_as_str(encoding='UTF-8')
    json_data = json.loads(event_data)
    data = [(json_data["id"], json_data["timestamp"], json_data["uv"], json_data["temperature"], json_data["humidity"])]
    columns = ["id", "timestamp", "uv", "temperature", "humidity"]
    df = spark.createDataFrame(data, columns)
    df.write.jdbc(url=jdbcUrl, table="telemetry_data", mode="append", properties=connectionProperties)
    # Checkpoint so this event is not re-processed after a restart.
    await partition_context.update_checkpoint(event)
async def receive_events():
    # Placeholder values for the checkpoint blob store and the Event Hub.
    BLOB_STORAGE_CONNECTION_STRING = "AzureStorageConnectionString"
    BLOB_CONTAINER_NAME = "AzureStorageCONTAINER_NAME"
    EVENT_HUB_CONNECTION_STR = "AzureEVENT_HUB_CONNECTION_STR"
    EVENT_HUB_NAME = "AzureEVENT_HUB_NAME"
    CONSUMER_GROUP = "$Default"
    # Blob-backed checkpoint store handles load balancing and offset tracking.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # Read from the beginning of the partition.
        )
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_events())
    # Stop the Spark session (optional, depending on your environment).
    spark.stop()
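The on_event handler above assumes each event body is a JSON object with id, timestamp, uv, temperature, and humidity fields. For testing, a small producer sketch like the following (reusing the same placeholder connection string and hub name; all values hypothetical) can publish a matching event:

from azure.eventhub import EventHubProducerClient, EventData
import json

producer = EventHubProducerClient.from_connection_string(
    "AzureEVENT_HUB_CONNECTION_STR", eventhub_name="AzureEVENT_HUB_NAME"
)
# Sample telemetry payload matching the schema expected by on_event.
payload = {"id": 1, "timestamp": "2024-01-01T00:00:00Z", "uv": 0.5, "temperature": 22.3, "humidity": 41.0}
with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(payload)))
    producer.send_batch(batch)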
Azure Databricks output:

Azure SQL output:
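Also make sure the blob container used by the checkpoint store exists before starting the consumer; the SDK does not create it for you. A quick way to create it up front (placeholder names again):

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient

# Create the checkpoint container once, if it does not already exist.
service = BlobServiceClient.from_connection_string("AzureStorageConnectionString")
try:
    service.create_container("AzureStorageCONTAINER_NAME")
except ResourceExistsError:
    pass  # Container already exists.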