我有这个代码:
@dlt.table(
name="kafka_bronze",
table_properties={"pipelines.autoOptimize.enabled": "true"}
)
def kafka_bronze():
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", {Masked})
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 100)
.load()
.select(col("value").cast(StringType()).alias("json"))
.select(from_json("json", jsonSchema).alias("data"))
.select("data.*"))
return df
但是它失败并且不写入任何数据。我可以成功地将数据流式传输到笔记本中,但使用管道时它不会加载数据。仅供参考,我正在使用统一目录
您收到的错误是 org.apache.spark.sql.streaming.StreamingQueryException,特别提到来自 Kafka 的 TimeoutException, 指示您的 Spark 流作业在尝试与 Kafka 代理通信时超时。
我同意@JayashankarGS 可能由于网络问题等原因而发生 Kafka 代理过载,或者 Spark 作业中的 Kafka 配置不正确。
以下代码可帮助您增加 Kafka 的超时设置:
.option("kafka.consumer.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
我尝试过以下代码:
def kafka_bronze():
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 100)
.option("request.timeout.ms", "60000")
.option("session.timeout.ms", "30000")
.load()
.select(col("value").cast(StringType()).alias("json"))
.select(from_json("json", jsonSchema).alias("data"))
.select("data.*"))
return df
结果:
Name Type
field1 string
field2 string