“startTimestamp”选项如何适用于速率微批量格式?

问题描述 投票:0回答:2

这是Spark 3.5.0的,没有尝试过其他版本。

我编写了一个简单的 Spark 流应用程序,使用

rate-micro-batch
格式,用于生成测试数据。

根据本指南,它有一个选项

startTimestamp
,这是生成时间的起始值。但更改这个选项似乎没有任何作用,我尝试将其设置为不同的值,并且开始时间始终在
1970-01-01
左右。

我不明白什么,还是这是一个错误?

package org.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StreamingSparkPartitioned {
   public static void main(String[] args) throws TimeoutException, StreamingQueryException {
      SparkSession spark = SparkSession.builder()
         .master("local[*]")
         .getOrCreate();

      Column expression =  when(expr("value % 3 = 1"), "stupid_event").otherwise(
         when(expr("value % 3 = 2"), "smart_event")
            .otherwise("neutral_event"));

      DataStreamWriter streamingDF = spark.readStream()
         .format("rate-micro-batch")
         .option("rowsPerBatch", "100")
         .option("startTimestamp", "10000")
         .load()
         .withColumn("event_type", expression)
         .writeStream()
         .option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");

      streamingDF.outputMode(OutputMode.Append())
         .format("console")
         .trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
         .start()
         .awaitTermination();
   }
}
java apache-spark
2个回答
0
投票

2024 年,这在 Spark 中不起作用


0
投票

我认为

startTimestamp
的单位是毫秒,因此将其设置为 10000 毫秒会让您在 1970 纪元之后 10 秒,即 1970-01-01。

如果您将其设置为更高的值,例如 1234567890123,它将为您提供 2009 年的日期:

  DataStreamWriter streamingDF = spark.readStream()
     .format("rate-micro-batch")
     .option("rowsPerBatch", "100")
     .option("startTimestamp", "1234567890123")
     .load()
     .withColumn("event_type", expression)
     .writeStream()
     .option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");

我还没有测试来确认,但当我有时间的时候会做。

© www.soinside.com 2019 - 2024. All rights reserved.