我有一个使用多个参数运行的批处理作业。这些参数之一代表清单日期,例如2020-05-01,2020-05-02,2020-05-03
,对于每个日期,我尝试从GCP存储中读取数据。
为了运行作业,我使用下面的代码:
public class StoreJob {
public static void main(String[] args) {
StoreJobOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StoreJobOptions.class);
options.setStreaming(false);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Execute", new ProcessTransform(options.getDates()));
pipeline.run().waitUntilFinish();
}
}
我想在每个日期从存储并行加载数据。现在,我使用for-loop
进行此操作,但我相信可以通过更有效的方式完成此操作
List<String> days = Arrays.stream(dates.get().split(",")).collect(Collectors.toList());
for (String day : days) {
PCollection<KV<Key, User>> users = input.apply("Read users from GCP Storage", AvroIO.read(User.class)
.from(dayToUserPathInGCPStorage(day, bucket, User.class))
.withEmptyMatchTreatment(ALLOW_IF_WILDCARD))
.apply("Group users", ParDo.of(new DoFn<User, KV<Key, User>>() {
@ProcessElement
public void process(ProcessContext context) {
User element = context.element();
context.output(KV.of(new Key(element.getUserId(), element.getFullName()), element));
}
}));
PCollection<KV<Key, Purchase>> purchases = input.apply("Read purchases from GCP Storage", AvroIO.read(Purchase.class)
.from(dayToPurchasePathInGCPStorage(day, bucket, Purchase.class))
.withEmptyMatchTreatment(ALLOW_IF_WILDCARD))
.apply("Group purchases", ParDo.of(new DoFn<Purchase, KV<Key, Purchase>>() {
@ProcessElement
public void process(ProcessContext context) {
Purchase element = context.element();
context.output(KV.of(new Key(element.getId(), element.getName()), element));
}
}));
KeyedPCollectionTuple.of(USER_KEY, users).and(PURCHASE_KEY, purchases)
.apply("transform", new CustomJoinAndCreateMutationsTransform())
.apply("Write back to storage", SpannerIO.write()
.withInstanceId(instance)
.withDatabaseId(database)
.grouped());
}
return PDone.in(input.getPipeline());
如何用Apache Beam方法替换for-loop?
.from
。但是,无法创建仅匹配列表中几天的文件模式。文档中对第二个内容的描述如下:如果要读取的文件模式本身在PCollection中,则可以使用FileIO进行匹配,而使用readFiles(java.lang.Class)进行读取。
此方法适合您的用例。您可以使用天数列表创建文件模式的PCollection,然后在该PCollection上应用FileIO.matchAll()
以获取文件,然后对其应用AvroIO.readFiles()
以从匹配的文件中读取。