无法将数据从 Kafka 写入 Databricks 中的 Delta Live 表

问题描述 投票:0回答:1

我有这个代码:

@dlt.table(  
    name="kafka_bronze",  
    table_properties={"pipelines.autoOptimize.enabled": "true"}  
)  
def kafka_bronze():  
    df = (spark.readStream   
        .format("kafka")   
        .option("kafka.bootstrap.servers", {Masked})  
        .option("subscribe", "topic1")    
        .option("startingOffsets", "earliest")   
        .option("maxOffsetsPerTrigger", 100)   
        .load()  
        .select(col("value").cast(StringType()).alias("json"))  
        .select(from_json("json", jsonSchema).alias("data"))  
        .select("data.*"))  
  
    return df  




但是它失败并且不写入任何数据。我可以成功地将数据流式传输到笔记本中,但使用管道时它不会加载数据。仅供参考,我正在使用统一目录

pyspark databricks azure-databricks apache-kafka-streams
1个回答
0
投票

您收到的错误是 org.apache.spark.sql.streaming.StreamingQueryException,特别提到来自 Kafka 的 TimeoutException, 指示您的 Spark 流作业在尝试与 Kafka 代理通信时超时。

我同意@JayashankarGS 可能由于网络问题等原因而发生 Kafka 代理过载,或者 Spark 作业中的 Kafka 配置不正确。

以下代码可帮助您增加 Kafka 的超时设置:

.option("kafka.consumer.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")

我尝试过以下代码:

def kafka_bronze():  
    df = (spark.readStream   
        .format("kafka")   
        .option("kafka.bootstrap.servers", "your_kafka_bootstrap_servers")
        .option("subscribe", "topic1")    
        .option("startingOffsets", "earliest")   
        .option("maxOffsetsPerTrigger", 100)  
        .option("request.timeout.ms", "60000")
        .option("session.timeout.ms", "30000") 
        .load()  
        .select(col("value").cast(StringType()).alias("json"))  
        .select(from_json("json", jsonSchema).alias("data"))  
        .select("data.*"))  
  
    return df

结果:


Name    Type
field1  string
field2  string
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.