GCP Dataflow - Beam pipeline returns an empty file

Question

I want to use Dataflow to map a function over every file in a GCS bucket. I think I'm close, but for some reason the result is just an empty file. The code below reproduces the problem with dummy data.

Am I missing a step in my flow? In this toy example, the output file should be the first 10 rows of dummy_data.csv. The requirements.txt file contains just pandas and apache-beam.

work.py

import apache_beam as beam
import pandas as pd
from apache_beam.io.fileio import MatchFiles

# Create a dummy csv file
dummy_data = pd.DataFrame({'col1': [5] * 100, 'col2': [6] * 100})
dummy_data.to_csv('dummy_data.csv', index=False)

def data_cleaning_function(filepath):
    df = pd.read_csv(filepath)
    df = df.head(10)
    return df

options = beam.options.pipeline_options.PipelineOptions()

worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 8
worker_options.machine_type = 'e2-small'

setup_options = options.view_as(beam.options.pipeline_options.SetupOptions)
setup_options.requirements_file = 'requirements.txt'


p1 = beam.Pipeline(options=options)

(p1
        | 'read' >> MatchFiles('dummy_data.csv')
        | 'map' >> beam.Map(lambda file: (file.path, data_cleaning_function))
        | 'write' >> beam.io.WriteToText('gs://my-bucket/data_out/result.csv')
)

p1.run()
Tags: python, google-cloud-platform, google-cloud-dataflow, apache-beam
1 Answer

Call data_cleaning_function inside the lambda. Your 'map' step builds the tuple (file.path, data_cleaning_function), so the function object is emitted but never invoked. This should work:
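The bug can be seen in plain Python, without Beam. Here a trivial hypothetical stand-in plays the role of data_cleaning_function; only the lambda shapes match the question's code:

```python
def data_cleaning_function(path):
    # Hypothetical stand-in for the real cleaning function
    return path.upper()

# Original lambda: returns a tuple containing the function object, never calls it
wrong = (lambda file: (file, data_cleaning_function))('dummy_data.csv')

# Fixed lambda: actually invokes the function on the path
right = (lambda file: data_cleaning_function(file))('dummy_data.csv')

print(wrong)  # ('dummy_data.csv', <function data_cleaning_function at 0x...>)
print(right)  # DUMMY_DATA.CSV
```

The corrected pipeline below makes that call explicit.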

p1 = beam.Pipeline(options=options)

(p1
        | 'read' >> MatchFiles('dummy_data.csv')
        | 'map' >> beam.Map(lambda file: data_cleaning_function(file.path))
        | 'write' >> beam.io.WriteToText('gs://my-bucket/data_out/result.csv')
)

p1.run()
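One caveat with this fix: WriteToText writes str(element) for each element, so emitting a whole DataFrame writes its table repr, not CSV rows. A minimal sketch of the conversion you would want first, using only pandas:

```python
import pandas as pd

# Same dummy data as the question, shortened to 3 rows
df = pd.DataFrame({'col1': [5] * 3, 'col2': [6] * 3})

# Flatten the DataFrame into one CSV string per row (header included)
lines = df.to_csv(index=False).splitlines()
print(lines)  # ['col1,col2', '5,6', '5,6', '5,6']
```

In the pipeline, this flattening could go in a step between 'map' and 'write', e.g. `beam.FlatMap(lambda df: df.to_csv(index=False).splitlines())`, so that each element reaching WriteToText is a single CSV line.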