我想在GCP中创建一个流式Apache Beam管道,该管道从Google Pub / Sub读取数据并将其推送到GCS。我可以从Pub / Sub读取数据。我当前的代码如下所示(从GCP Apache Beam模板之一中选取)
pipeline.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"Write File(s)",
AvroIO.write(AdEvent.class)
.to(
new WindowedFilenamePolicy(
options.getOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>) input ->
FileBasedSink.convertToFileResourceIfPossible(input)))
.withWindowedWrites()
.withNumShards(options.getNumShards()));
它可以生成如下所示的文件windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro
我想将数据存储在动态创建的目录中的GCS中。在以下目录2020-04-28/01
,2020-04-28/02
等中-01
和02
是子目录,表示数据流流传输管道处理数据的时刻。
示例:
gs://data/2020-04-28/01/0000000.avro gs://data/2020-04-28/01/0000001.avro gs://data/2020-04-28/01/.... gs://data/2020-04-28/02/0000000.avro gs://data/2020-04-28/02/0000001.avro gs://data/2020-04-28/02/.... gs://data/2020-04-28/03/0000000.avro gs://data/2020-04-28/03/0000001.avro gs://data/2020-04-28/03/.... ...
0000000、0000001等是我用于说明的简单文件名,我不希望这些文件是顺序名称。您认为在GCP数据流流式传输设置中有可能吗?
我想在GCP中创建一个流式Apache Beam管道,该管道从Google Pub / Sub读取数据并将其推送到GCS。我可以从Pub / Sub读取数据。我当前的代码如下所示(...
您可以实现自己的FilenamePolicy(也许以WindowedFilenamePolicy
作为起点)来使用自己的逻辑来定义输出路径。您可以根据需要在文件路径中使用/
字符(顺便说一下,GCS存储桶为"flat",它们实际上没有目录)。要获取日期/时间,windowedFilename
方法将窗口信息作为参数,因此您可以在返回值中使用它,但您认为合适。
您需要使用writeDynamic
而不是Write
。不幸的是,AvroIO不像here那样本机支持writeDynamic,而是需要使用FileIO。
您可以使用Pub/Sub to Cloud Storage Avro template是一种流传输管道,可从发布/订阅主题中读取数据,并将Avro文件写入指定的Cloud Storage存储桶。该管道支持可选的用户提供的窗口持续时间,用于执行窗口写入。