How to filter and transform streaming data with Apache Beam

You are given a stream of JSON objects representing transaction data. Each transaction has the following fields:

transaction_id
user_id
amount
timestamp
You need to create an Apache Beam pipeline that processes this transaction stream and meets the following requirements:

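1. Filter out transactions with an amount below $10.
2. Transform the remaining transactions into a new format:

   {"user": user_id, "total_amount": amount}

3. Group the transactions by user_id and sum the total amount for each user.
4. Write the results to a BigQuery table named UserTransactions with the columns user and total_amount.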

Example input:

[
    {"transaction_id": "1", "user_id": "user_123", "amount": 5.0, "timestamp": "2024-07-29T10:00:00Z"},
    {"transaction_id": "2", "user_id": "user_123", "amount": 15.0, "timestamp": "2024-07-29T10:01:00Z"},
    {"transaction_id": "3", "user_id": "user_456", "amount": 20.0, "timestamp": "2024-07-29T10:02:00Z"}
]

Example output:

UserTransactions Table in BigQuery:
+---------+--------------+
|  user   | total_amount |
+---------+--------------+
| user_123| 15.0         |
| user_456| 20.0         |
+---------+--------------+
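Where the example output comes from: transaction 1 (5.0) is dropped by the $10 filter, leaving 15.0 for user_123 and 20.0 for user_456. A plain-Python reference computation of the same result (no Beam involved; `expected_user_totals` is a hypothetical helper name) can be handy for checking what a pipeline should produce:

```python
from collections import defaultdict

# The example input from the question, as plain Python dicts.
transactions = [
    {"transaction_id": "1", "user_id": "user_123", "amount": 5.0, "timestamp": "2024-07-29T10:00:00Z"},
    {"transaction_id": "2", "user_id": "user_123", "amount": 15.0, "timestamp": "2024-07-29T10:01:00Z"},
    {"transaction_id": "3", "user_id": "user_456", "amount": 20.0, "timestamp": "2024-07-29T10:02:00Z"},
]


def expected_user_totals(transactions, min_amount=10.0):
    """Plain-Python reference: keep amount >= min_amount, then sum per user."""
    totals = defaultdict(float)
    for t in transactions:
        if t['amount'] >= min_amount:  # transactions below $10 are dropped
            totals[t['user_id']] += t['amount']
    # Sort by user so the result order is deterministic.
    return [{'user': u, 'total_amount': a} for u, a in sorted(totals.items())]


print(expected_user_totals(transactions))
# [{'user': 'user_123', 'total_amount': 15.0}, {'user': 'user_456', 'total_amount': 20.0}]
```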

Here is my code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class FilterTransactions(beam.DoFn):
    def process(self, element):
        if element['amount'] >= 10: 
            yield element

class TransformTransactions(beam.DoFn):
    def process(self, element):
        yield {'user': element['user_id'], 'total_amount': element['amount']}

class SumAmounts(beam.CombineFn):
    def create_accumulator(self):
        return 0.0

    def add_input(self, accumulator, input):
        return accumulator + input['total_amount'] 

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator

def run():
    options = PipelineOptions()
    p = beam.Pipeline(options=options)

    transactions = p | 'ReadTransactions' >> beam.Create([
        {"transaction_id": "1", "user_id": "user_123", "amount": 5.0, "timestamp": "2024-07-29T10:00:00Z"},
        {"transaction_id": "2", "user_id": "user_123", "amount": 15.0, "timestamp": "2024-07-29T10:01:00Z"},
        {"transaction_id": "3", "user_id": "user_456", "amount": 20.0, "timestamp": "2024-07-29T10:02:00Z"}
    ])

    filtered_transactions = transactions | 'FilterTransactions' >> beam.ParDo(FilterTransactions())
    transformed_transactions = filtered_transactions | 'TransformTransactions' >> beam.ParDo(TransformTransactions())

    summed_transactions = (transformed_transactions
        | 'GroupByUser' >> beam.GroupByKey()
        | 'SumAmounts' >> beam.CombinePerKey(SumAmounts())  
    )

    def format_for_bigquery(element):
        return {
            'user': element[0],
            'total_amount': element[1]
        }

    formatted_results = summed_transactions | 'FormatForBigQuery' >> beam.Map(format_for_bigquery)

    formatted_results | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        'UserTransactions', 
        schema='user:STRING, total_amount:FLOAT',
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )

    p.run().wait_until_finish()

if __name__ == '__main__':
    run()
Answer

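I think the error is in your transform function, TransformTransactions. GroupByKey and CombinePerKey operate on (key, value) tuples, but TransformTransactions yields a plain dict, so Beam has no key to group on. Yield a tuple keyed by user_id instead: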

yield (element['user_id'], {'total_amount': element['amount']})

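Changing this line fixes the missing key. You should also drop the separate GroupByKey step: CombinePerKey already groups by key, and after a GroupByKey each value is a list of dicts, so SumAmounts would fail on input['total_amount'].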
