I want to use Dataflow to map a function over each file in a GCS bucket. I think I'm close, but for some reason the result is just an empty file. I've included my code below, which reproduces the problem with dummy data.
Am I missing a step in my pipeline? In this toy example, the output file should be the first 10 rows of dummy_data.csv.
The requirements.txt file is just pandas and apache-beam.
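For reference, that requirements.txt is nothing more than the two package names, one per line:
pandas
apache-beam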
import apache_beam as beam
import pandas as pd
from apache_beam.io.fileio import MatchFiles
# Create a dummy csv file
dummy_data = pd.DataFrame({'col1': [5] * 100, 'col2': [6] * 100})
dummy_data.to_csv('dummy_data.csv', index=False)
def data_cleaning_function(filepath):
df = pd.read_csv(filepath)
df = df.head(10)
return df
options = beam.options.pipeline_options.PipelineOptions()
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 8
worker_options.machine_type = 'e2-small'
setup_options = options.view_as(beam.options.pipeline_options.SetupOptions)
setup_options.requirements_file = 'requirements.txt'
p1 = beam.Pipeline(options=options)
(p1
| 'read' >> MatchFiles('dummy_data.csv')
| 'map' >> beam.Map(lambda file: (file.path, data_cleaning_function))
| 'write' >> beam.io.WriteToText('gs://my-bucket/data_out/result.csv')
)
p1.run()
Call data_cleaning_function inside the lambda fn. This should work.
p1 = beam.Pipeline(options=options)
(p1
| 'read' >> MatchFiles('dummy_data.csv')
| 'map' >> beam.Map(lambda file: data_cleaning_function(file.path))
| 'write' >> beam.io.WriteToText('gs://my-bucket/data_out/result.csv')
)
p1.run()
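One thing to keep in mind: beam.io.WriteToText writes str(element) for each element, so with the code above every output record is the printed form of a whole DataFrame rather than clean CSV rows. If you want the first 10 rows of dummy_data.csv written as CSV lines, a rough sketch is to flatten each DataFrame into strings before the write (df_to_csv_lines is a hypothetical helper, and options is the same PipelineOptions object defined above):

def df_to_csv_lines(filepath):
    # Read the file, keep the first 10 rows, and yield one CSV-formatted
    # string per row (header omitted so output shards concatenate cleanly).
    df = pd.read_csv(filepath).head(10)
    for line in df.to_csv(index=False, header=False).splitlines():
        yield line

p2 = beam.Pipeline(options=options)
(p2
| 'read' >> MatchFiles('dummy_data.csv')
| 'to_lines' >> beam.FlatMap(lambda file: df_to_csv_lines(file.path))
| 'write' >> beam.io.WriteToText('gs://my-bucket/data_out/result.csv')
)
p2.run()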