我在 apache beam 中的输出是 (['key'],{'id':name})
预计 ('key',{'id':名称})
如何使用apache beam中的Map进行转换以获得预期的输出
这是一个带有 lambda 函数的测试管道,可以使用 Map 重新格式化您的元组:
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
bad_record = (['key'],{'id':'name'})
records = (
p
| beam.Create([bad_record])
| beam.Map(lambda e: (e[0][0], e[1]))
| beam.Map(print)
)
输出为:
('key', {'id': 'name'})
我猜测您的 [key] 可能正在生成,而不是在管道早期的 Map 中返回。如果您在那里修复它,则不需要此步骤。
您可以使用 MapElements 将输入地图转换为所需的输出。例如,下面的示例代码执行此转换(有关详细信息,请参阅内联代码注释)
// Create the pipeline
Pipeline p = Pipeline.create();
// Example input data
List<KV<List<String>, Map<String, String>>> inputData = Arrays.asList(
KV.of(Arrays.asList("key1"), new HashMap<String, String>() {{
put("id", "name1");
}}),
KV.of(Arrays.asList("key2"), new HashMap<String, String>() {{
put("id", "name2");
}})
);
// Create a PCollection from the input data
p.apply("CreateInput", Create.of(inputData))
// Apply the transformation using MapElements
.apply("TransformElements", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings())))
.via((KV<List<String>, Map<String, String>> element) -> {
String key = element.getKey().get(0); // Extract the single key from the list
Map<String, String> value = element.getValue();
return KV.of(key, value);
}))
// Print the output (for demonstration purposes)
.apply("PrintOutput", ParDo.of(new PrintElements()));
// Run the pipeline
p.run().waitUntilFinish();