通过 PySpark 将 2 GBPS 数据推送到 Kafka 主题的最佳方式

问题描述 投票:0回答:1

我正在尝试对

load
Kafka Producer
进行
Kafka Consumer
测试。因此,我生成了一个文件的
2 GBPS
数据,每个文件的有效负载为
4KB
。我已经使用
PySpark
加载此数据并将其推送到 Kafka 主题,因为我想实现 2 GBPS
throughput

from pyspark.sql.session import SparkSession

spark = SparkSession.\
    builder. \
    appName("Batch JSON read"). \
    config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"). \
    getOrCreate()

df = spark.read.option("multiline","true").json("/Users/Downloads/sample_data.json")

df.selectExpr("to_json(struct(*)) AS value") \
    .write.format('kafka') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "test01") \
    .save()

我可以推送数据,但如果这是在

"batches"
中推送数据的正确方法,如果我想继续使用 PySpark,或者如果我可以通过 CLI 利用 Kafka 的
batch.size
linder.ms
属性,我需要一些建议。

一旦解决了该方法,我会将其与

JMeter
集成以进行负载测试。

apache-spark pyspark apache-kafka
1个回答
0
投票

首先,Kafka 有内置的生产者性能测试脚本...但是如果你想要 Spark 的速度,请使用 Scala/Java 来跳过字节码转换而不是 Python,并且不要使用 JSON,只需将数据帧保持为二进制...另外,如果你有流数据集,Flink 会更好

但除此之外,您可以在 Spark 中的生产者配置上调整的参数只有这么多,但就批次而言,是的,

.write
已经是一批了。使用 for 循环写入多个数据,或改用流数据帧

© www.soinside.com 2019 - 2024. All rights reserved.