您将获得表示交易数据的 JSON 对象流。每笔交易都有以下字段:`transaction_id`、`user_id`、`amount` 和 `timestamp`。您需要创建一个 Apache Beam 管道来处理此交易流,并满足以下要求:
过滤掉金额低于 10 美元的交易。将剩余交易转换为新格式:`{"user": user_id, "total_amount": amount}`。
按 user_id 对交易进行分组,并对每个用户的总金额求和。
将结果输出到名为 `UserTransactions` 的 BigQuery 表,其中包含 `user` 和 `total_amount` 列。
输入示例
[
{"transaction_id": "1", "user_id": "user_123", "amount": 5.0, "timestamp": "2024-07-29T10:00:00Z"},
{"transaction_id": "2", "user_id": "user_123", "amount": 15.0, "timestamp": "2024-07-29T10:01:00Z"},
{"transaction_id": "3", "user_id": "user_456", "amount": 20.0, "timestamp": "2024-07-29T10:02:00Z"}
]
输出示例:
BigQuery 中的 UserTransactions 表:
+---------+--------------+
| user | total_amount |
+---------+--------------+
| user_123| 15.0 |
| user_456| 20.0 |
+---------+--------------+
这是代码。
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class FilterTransactions(beam.DoFn):
    """DoFn that keeps only transactions whose ``amount`` is at least 10."""

    def process(self, element):
        """Yield ``element`` unchanged when its amount is >= 10, else nothing.

        Raises ``KeyError`` if ``element`` has no ``'amount'`` key.
        """
        amount = element['amount']
        # Written as ``not (>= 10)`` rather than ``< 10`` so that NaN amounts
        # are dropped as well.
        if not (amount >= 10):
            return
        yield element
class TransformTransactions(beam.DoFn):
    """DoFn that reshapes a transaction into ``{'user', 'total_amount'}``."""

    def process(self, element):
        """Yield a new dict with the transaction's user id and amount.

        All other input fields (``transaction_id``, ``timestamp``, ...) are
        dropped. The output is a plain dict, not a ``(key, value)`` pair, so it
        must be keyed before any per-key grouping such as ``GroupByKey``.
        """
        user = element['user_id']
        amount = element['amount']
        yield {'user': user, 'total_amount': amount}
class SumAmounts(beam.CombineFn):
    """CombineFn that sums the ``'total_amount'`` field of its input records."""

    def create_accumulator(self):
        """Start every partial sum at zero."""
        return 0.0

    # The parameter is named ``input`` (shadowing the builtin) to keep the
    # original signature for any keyword callers.
    def add_input(self, accumulator, input):
        """Add one record's ``'total_amount'`` to the running sum."""
        amount = input['total_amount']
        return accumulator + amount

    def merge_accumulators(self, accumulators):
        """Fold several partial sums into one."""
        return sum(accumulators)

    def extract_output(self, accumulator):
        """Return the accumulated sum unchanged."""
        return accumulator
def run():
options = PipelineOptions()
p = beam.Pipeline(options=options)
transactions = p | 'ReadTransactions' >> beam.Create([
{"transaction_id": "1", "user_id": "user_123", "amount": 5.0, "timestamp": "2024-07-29T10:00:00Z"},
{"transaction_id": "2", "user_id": "user_123", "amount": 15.0, "timestamp": "2024-07-29T10:01:00Z"},
{"transaction_id": "3", "user_id": "user_456", "amount": 20.0, "timestamp": "2024-07-29T10:02:00Z"}
])
filtered_transactions = transactions | 'FilterTransactions' >> beam.ParDo(FilterTransactions())
transformed_transactions = filtered_transactions | 'TransformTransactions' >> beam.ParDo(TransformTransactions())
summed_transactions = (transformed_transactions
| 'GroupByUser' >> beam.GroupByKey()
| 'SumAmounts' >> beam.CombinePerKey(SumAmounts())
)
def format_for_bigquery(element):
return {
'user': element[0],
'total_amount': element[1]
}
formatted_results = summed_transactions | 'FormatForBigQuery' >> beam.Map(format_for_bigquery)
formatted_results | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
'UserTransactions',
schema='user:STRING, total_amount:FLOAT',
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
p.run().wait_until_finish()
if __name__ == '__main__':
run()
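我认为你的转换函数 `TransformTransactions` 有错误:`GroupByKey` 要求输入元素是 `(key, value)` 键值对,而它产出的是字典。可以把其中的 yield 语句改为:
`yield (element['user_id'], {'total_amount': element['amount']})`
不过仅更改这一行还不能完全修复该问题:`CombinePerKey` 自身就会按键分组。如果前面再加一个 `GroupByKey`,`SumAmounts.add_input` 收到的将是值的列表而不是字典,`input['total_amount']` 会出错。因此还需要删除 `'GroupByKey' >> beam.GroupByKey()` 这一步。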