我正在尝试对
load
和 Kafka Producer
进行 Kafka Consumer
测试。因此,我生成了一个文件的 2 GBPS
数据,每个文件的有效负载为 4KB
。我已经使用 PySpark
加载此数据并将其推送到 Kafka 主题,因为我想实现 2 GBPS throughput
。
from pyspark.sql.session import SparkSession
spark = SparkSession.\
builder. \
appName("Batch JSON read"). \
config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"). \
getOrCreate()
df = spark.read.option("multiline","true").json("/Users/Downloads/sample_data.json")
df.selectExpr("to_json(struct(*)) AS value") \
.write.format('kafka') \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "test01") \
.save()
我可以推送数据,但如果这是在
"batches"
中推送数据的正确方法,如果我想继续使用 PySpark,或者如果我可以通过 CLI 利用 Kafka 的 batch.size
或 linder.ms
属性,我需要一些建议。
一旦解决了该方法,我会将其与
JMeter
集成以进行负载测试。
首先,Kafka 有内置的生产者性能测试脚本...但是如果你想要 Spark 的速度,请使用 Scala/Java 来跳过字节码转换而不是 Python,并且不要使用 JSON,只需将数据帧保持为二进制...另外,如果你有流数据集,Flink 会更好
但除此之外,您可以在 Spark 中的生产者配置上调整的参数只有这么多,但就批次而言,是的,
.write
已经是一批了。使用 for 循环写入多个数据,或改用流数据帧