bcdate seems to be the problem. I converted it to what I thought was the correct format, but the error persists. My code is as follows:
def transform_pandas(data):
    import pandas as pd
    import json

    df = pd.DataFrame([data])
    # Fill all columns with null if there is no data
    columns = ['peakid', 'route1', 'bcdate', 'pkname', 'heightm']
    df = df.reindex(columns, fill_value='null', axis=1)
    df['bcdate'] = pd.to_datetime(df['bcdate'], errors='coerce').dt.strftime('%Y-%m-%d')
    return json.loads(df.to_json(orient='records'))
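For reference, here is a minimal standalone check (my own sketch, not part of the pipeline) of what the `strftime('%Y-%m-%d')` call above actually emits for bcdate:

```python
import pandas as pd

# Same conversion as in transform_pandas, on a single sample value.
s = pd.to_datetime(pd.Series(['1963-03-21 00:00:00']), errors='coerce').dt.strftime('%Y-%m-%d')
print(s[0])  # 1963-03-21  (date only, no time component)
```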
And the code that writes to BigQuery:
output = (
    (input_p1, input_p2)
    | 'Join' >> beam.CoGroupByKey()
    | 'Final Dict' >> beam.Map(lambda el: to_final_dict(el[1]))  # <- this step produces a result
    | 'Transformation' >> beam.Map(transform_pandas)  # <- the error happens here
    | beam.Map(print)
    | 'Write To BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
        table='project:dataset.expeditions',
        # schema='peakid:STRING,route1:STRING,bcdate:DATETIME,pkname:STRING,heightm:INTEGER',
        method='FILE_LOADS',
        custom_gcs_temp_location='gs://bucket/folder/temp',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
)
This is my result, but when I try to sink it to BigQuery I get an error:
BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_329_215864ba592a2e01f0c4e2157cc60c47_3a904aab56c3444bb56bda650a7404b3 failed. Error Result:
Your bcdate field has the date format YYYY-MM-DD in the resulting dict, but in your BigQuery schema and table it is of type DATETIME.
You can check the documentation for the correct DATETIME format: https://cloud.google.com/bigquery/docs/reference/standard-sql/datetime_functions?hl=en
For example: 2017-05-26T00:00:00
You can also find information in the Beam documentation:
https://beam.apache.org/documentation/io/built-in/google-bigquery/
Example:
import base64
from decimal import Decimal

bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]
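Applied to the `transform_pandas` function from the question, the fix would be to format bcdate with the T-separated DATETIME pattern. A minimal sketch (assuming the same column list; missing fields are left as JSON null here rather than the string 'null'):

```python
import json
import pandas as pd

def transform_pandas(data):
    # Build a one-row frame and force the expected column set.
    df = pd.DataFrame([data])
    columns = ['peakid', 'route1', 'bcdate', 'pkname', 'heightm']
    df = df.reindex(columns=columns)
    # Format bcdate as YYYY-MM-DDTHH:MM:SS, which matches
    # BigQuery's DATETIME type; unparseable values become null.
    df['bcdate'] = pd.to_datetime(df['bcdate'], errors='coerce').dt.strftime('%Y-%m-%dT%H:%M:%S')
    return json.loads(df.to_json(orient='records'))

rows = transform_pandas({'peakid': 'EVER', 'bcdate': '1963-03-21', 'heightm': 8848})
print(rows[0]['bcdate'])  # 1963-03-21T00:00:00
```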
The problem is unknown columns: the PCollection contains columns that are not in the schema, and for file loads we can't set ignore_unknown_columns=True in WriteToBigQuery.
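Since ignore_unknown_columns is not available for FILE_LOADS, one workaround is to drop any keys not in the table schema with an extra Map step before the write. A sketch, assuming the field names from the commented-out schema string:

```python
SCHEMA_FIELDS = {'peakid', 'route1', 'bcdate', 'pkname', 'heightm'}

def keep_known_columns(row):
    # Keep only keys that exist in the BigQuery schema;
    # any extra key would make a FILE_LOADS job fail.
    return {k: v for k, v in row.items() if k in SCHEMA_FIELDS}
```

In the pipeline this would sit just before the sink, e.g. `| 'Drop Unknown Columns' >> beam.Map(keep_known_columns)`.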