apache beam 中的键值

问题描述 投票:0回答:2

我在 apache beam 中的输出是 (['key'],{'id':name})

预计 ('key',{'id':名称})

如何使用apache beam中的Map进行转换以获得预期的输出

apache-beam
2个回答
1
投票

这是一个带有 lambda 函数的测试管道,可以使用 Map 重新格式化您的元组:

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with TestPipeline(options=options) as p:
    
    bad_record = (['key'],{'id':'name'})

    records = (
        p 
        | beam.Create([bad_record])
        | beam.Map(lambda e: (e[0][0], e[1]))
        | beam.Map(print)
    )

输出为:

('key', {'id': 'name'})

我猜测您的 [key] 可能正在生成,而不是在管道早期的 Map 中返回。如果您在那里修复它,则不需要此步骤。


0
投票

您可以使用 MapElements 将输入地图转换为所需的输出。例如,下面的示例代码执行此转换(有关详细信息,请参阅内联代码注释)

        // Create the pipeline
        Pipeline p = Pipeline.create();

        // Example input data
        List<KV<List<String>, Map<String, String>>> inputData = Arrays.asList(
                KV.of(Arrays.asList("key1"), new HashMap<String, String>() {{
                    put("id", "name1");
                }}),
                KV.of(Arrays.asList("key2"), new HashMap<String, String>() {{
                    put("id", "name2");
                }})
        );

        // Create a PCollection from the input data
        p.apply("CreateInput", Create.of(inputData))
         // Apply the transformation using MapElements
         .apply("TransformElements", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings())))
                 .via((KV<List<String>, Map<String, String>> element) -> {
                     String key = element.getKey().get(0); // Extract the single key from the list
                     Map<String, String> value = element.getValue();
                     return KV.of(key, value);
                 }))
         // Print the output (for demonstration purposes)
         .apply("PrintOutput", ParDo.of(new PrintElements()));

        // Run the pipeline
        p.run().waitUntilFinish();
© www.soinside.com 2019 - 2024. All rights reserved.