如何在Apache Beam Transform中创建循环

问题描述 投票:0回答:1

我有一个使用多个参数运行的批处理作业。这些参数之一代表清单日期,例如2020-05-01,2020-05-02,2020-05-03,对于每个日期,我尝试从GCP存储中读取数据。

为了运行作业,我使用下面的代码:

public class StoreJob {
    public static void main(String[] args) {
        StoreJobOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StoreJobOptions.class);
        options.setStreaming(false);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("Execute", new ProcessTransform(options.getDates()));
        pipeline.run().waitUntilFinish();
    }
}

我想在每个日期从存储并行加载数据。现在,我使用for-loop进行此操作,但我相信可以通过更有效的方式完成此操作

List<String> days = Arrays.stream(dates.get().split(",")).collect(Collectors.toList());
for (String day : days) {
    PCollection<KV<Key, User>> users = input.apply("Read users from GCP Storage", AvroIO.read(User.class)
            .from(dayToUserPathInGCPStorage(day, bucket, User.class))
            .withEmptyMatchTreatment(ALLOW_IF_WILDCARD))
            .apply("Group users", ParDo.of(new DoFn<User, KV<Key, User>>() {
                @ProcessElement
                public void process(ProcessContext context) {
                    User element = context.element();
                    context.output(KV.of(new Key(element.getUserId(), element.getFullName()), element));
                }
            }));
    PCollection<KV<Key, Purchase>> purchases = input.apply("Read purchases from GCP Storage", AvroIO.read(Purchase.class)
            .from(dayToPurchasePathInGCPStorage(day, bucket, Purchase.class))
            .withEmptyMatchTreatment(ALLOW_IF_WILDCARD))
            .apply("Group purchases", ParDo.of(new DoFn<Purchase, KV<Key, Purchase>>() {
                @ProcessElement
                public void process(ProcessContext context) {
                    Purchase element = context.element();
                    context.output(KV.of(new Key(element.getId(), element.getName()), element));
                }
            }));
    KeyedPCollectionTuple.of(USER_KEY, users).and(PURCHASE_KEY, purchases)
            .apply("transform", new CustomJoinAndCreateMutationsTransform())
            .apply("Write back to storage", SpannerIO.write()
                    .withInstanceId(instance)
                    .withDatabaseId(database)
                    .grouped());
}
return PDone.in(input.getPipeline());

如何用Apache Beam方法替换for-loop?

google-cloud-dataflow apache-beam
1个回答
0
投票
根据documentation for AvroIO,有两种方法可从AvroIO转换读取多个文件。首先是将文件模式传递给.from。但是,无法创建仅匹配列表中几天的文件模式。文档中对第二个内容的描述如下:

如果要读取的文件模式本身在PCollection中,则可以使用FileIO进行匹配,而使用readFiles(java.lang.Class)进行读取。

此方法适合您的用例。您可以使用天数列表创建文件模式的PCollection,然后在该PCollection上应用FileIO.matchAll()以获取文件,然后对其应用AvroIO.readFiles()以从匹配的文件中读取。
© www.soinside.com 2019 - 2024. All rights reserved.