这是Spark 3.5.0的,没有尝试过其他版本。
我编写了一个简单的 Spark 流应用程序,使用
rate-micro-batch
格式,用于生成测试数据。
根据本指南,它有一个选项
startTimestamp
,这是生成时间的起始值。但更改这个选项似乎没有任何作用,我尝试将其设置为不同的值,并且开始时间始终在1970-01-01
左右。
我不明白什么,还是这是一个错误?
package org.example;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class StreamingSparkPartitioned {
public static void main(String[] args) throws TimeoutException, StreamingQueryException {
SparkSession spark = SparkSession.builder()
.master("local[*]")
.getOrCreate();
Column expression = when(expr("value % 3 = 1"), "stupid_event").otherwise(
when(expr("value % 3 = 2"), "smart_event")
.otherwise("neutral_event"));
DataStreamWriter streamingDF = spark.readStream()
.format("rate-micro-batch")
.option("rowsPerBatch", "100")
.option("startTimestamp", "10000")
.load()
.withColumn("event_type", expression)
.writeStream()
.option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");
streamingDF.outputMode(OutputMode.Append())
.format("console")
.trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
.start()
.awaitTermination();
}
}
2024 年,这在 Spark 中不起作用
我认为
startTimestamp
的单位是毫秒,因此将其设置为 10000 毫秒会让您在 1970 纪元之后 10 秒,即 1970-01-01。
如果您将其设置为更高的值,例如 1234567890123,它将为您提供 2009 年的日期:
DataStreamWriter streamingDF = spark.readStream()
.format("rate-micro-batch")
.option("rowsPerBatch", "100")
.option("startTimestamp", "1234567890123")
.load()
.withColumn("event_type", expression)
.writeStream()
.option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");
我还没有测试来确认,但当我有时间的时候会做。