The correct format for sinking to BigQuery with Dataflow / Apache Beam

Question · Votes: 0 · Answers: 2

bcdate seems to be causing an error. I convert it to what I think is the correct format, but the error persists. My code is as follows:

def transform_pandas(data):
    import pandas as pd
    import json
    import datetime as dt

    df = pd.DataFrame([data])
    
    # Fill all columns with null if there is no data    
    columns = ['peakid', 'route1', 'bcdate', 'pkname', 'heightm']
    df = df.reindex(columns, fill_value='null', axis=1)
    
    df['bcdate'] = pd.to_datetime(df['bcdate'], errors='coerce').dt.strftime('%Y-%m-%d')
    
    return json.loads(df.to_json(orient = 'records'))
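Note that reindex with fill_value='null' inserts the literal string 'null' into missing columns, not a JSON null, which matches the results shown further down. A minimal reproduction of that behavior:

```python
import pandas as pd
import json

columns = ['peakid', 'route1', 'bcdate', 'pkname', 'heightm']
df = pd.DataFrame([{'peakid': 'AGLE', 'pkname': 'Agole East', 'heightm': '6675'}])

# Missing columns are filled with the *string* 'null', not a real null.
df = df.reindex(columns, fill_value='null', axis=1)
print(json.loads(df.to_json(orient='records')))
# [{'peakid': 'AGLE', 'route1': 'null', 'bcdate': 'null', 'pkname': 'Agole East', 'heightm': '6675'}]
```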

And the code that writes to BigQuery:

output = (
        (input_p1, input_p2)
        | 'Join' >> beam.CoGroupByKey()
        | 'Final Dict' >> beam.Map(lambda el: to_final_dict(el[1]))  # it gets a result here
        | 'Transformation' >> beam.Map(transform_pandas)  # the error happens here
        | beam.Map(print)
        | 'Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
           table='project:dataset.expeditions',
           # schema='peakid:STRING,route1:STRING,bcdate:DATETIME,pkname:STRING,heightm:INTEGER',
           method='FILE_LOADS',
           custom_gcs_temp_location='gs://bucket/folder/temp',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)    
    )

Here are my results:

  • [{'peakid': 'ACHN', 'route1': '', 'bcdate': None, 'pkname': 'Aichyn', 'heightm': '6055'}]
  • [{'peakid': 'AGLE', 'route1': 'null', 'bcdate': None, 'pkname': 'Agole East', 'heightm': '6675'}]
  • [{'peakid': 'KCHS', 'route1': 'NW Ridge', 'bcdate': '2019-10-24', 'pkname': 'Kangchung Shar', 'heightm': '6063'}]
  • [{'peakid': 'LNAK', 'route1': 'SSE Ridge', 'bcdate': '2015-09-17', 'pkname': 'Lhonak', 'heightm': '6070'}]
  • [{'peakid': 'SPH1', 'route1': 'S Face', 'bcdate': '2017-04-14', 'pkname': 'Sharphu I', 'heightm': '6433'}]
  • ...

But when I try to sink to BigQuery I get an error: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_329_215864ba592a2e01f0c4e2157cc60c47_3a904aab56c3444bb56bda650a7404b3 failed. Error Result: [while running 'Write To BigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs']

pandas google-bigquery google-cloud-dataflow apache-beam
2 Answers
0 votes

Your bcdate in the result dict has the date format YYYY-MM-DD, but in your BigQuery schema and table the column has the DATETIME type.

You can check the documentation for the correct DATETIME format: https://cloud.google.com/bigquery/docs/reference/standard-sql/datetime_functions?hl=en

For example:

2017-05-26T00:00:00
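The question's transform can be adjusted to emit this format (a sketch, assuming the bcdate column is DATETIME; unparseable values are coerced to NaT and then emitted as None so they serialize to JSON null):

```python
import pandas as pd

df = pd.DataFrame({'bcdate': ['2019-10-24', 'not a date']})

# Coerce unparseable values to NaT, then render in BigQuery DATETIME format.
df['bcdate'] = pd.to_datetime(df['bcdate'], errors='coerce').dt.strftime('%Y-%m-%dT%H:%M:%S')

# strftime leaves NaN for NaT; replace with None so JSON emits null.
df['bcdate'] = df['bcdate'].where(df['bcdate'].notna(), None)
print(df['bcdate'].tolist())  # ['2019-10-24T00:00:00', None]
```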

You can also find this information in the Beam documentation:

https://beam.apache.org/documentation/io/built-in/google-bigquery/

Example:

import base64
from decimal import Decimal

bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]

0 votes

The problem was unknown columns: the PCollection contains columns that are not in the table schema. With FILE_LOADS we cannot set ignore_unknown_columns=True in WriteToBigQuery.
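Since FILE_LOADS cannot ignore unknown columns, one workaround is to project each record down to the schema's columns before the sink. A sketch, with the column list and step name taken from the question (the helper name is hypothetical):

```python
SCHEMA_COLUMNS = ['peakid', 'route1', 'bcdate', 'pkname', 'heightm']

def keep_known_columns(record, columns=SCHEMA_COLUMNS):
    """Drop any keys that are not in the BigQuery table schema."""
    return {k: record.get(k) for k in columns}

# Usage inside the pipeline, just before the sink:
# ... | 'DropUnknown' >> beam.Map(keep_known_columns)
#     | 'Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(...)

print(keep_known_columns({'peakid': 'ACHN', 'unexpected': 1}))
# {'peakid': 'ACHN', 'route1': None, 'bcdate': None, 'pkname': None, 'heightm': None}
```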
