I'm using ClickHouseHook to insert data into the database.
import pandas as pd
from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

ch_hook = ClickHouseHook(clickhouse_conn_id=connections_name)

def update_replacingmergetree(ch_hook, table_name: str, df: pd.DataFrame):
    # turn the DataFrame rows into a tuple of records and insert them
    values = tuple(df.to_records(index=False))
    ch_hook.execute(f'INSERT INTO do_you.{table_name} VALUES', (v for v in values))
My function works fine when the DataFrame does not contain dates. But when I pass data like this (part of the tuple of values):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   id          18 non-null     string
 1   s_id        18 non-null     int32
 2   month_date  18 non-null     datetime64[ns]
 3   sum         18 non-null     int32
| | id | s_id | month_date | sum |
|---|---|---|---|---|
| 0 | 265609333301876169530781520669667823585 | 84 | 2024-01-01 00:00:00 | 100 |
I get this error:
TypeError: unsupported operand type(s) for &: 'str' and 'int'
To avoid this error I need to convert id to int128, but I can't, because neither pandas nor NumPy has an int128 type.
How can I insert an int128 value into ClickHouse?
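The TypeError above can be reproduced without ClickHouse. The sketch below assumes the driver splits a 128-bit integer into two 64-bit halves with bitwise operators before writing it; that is an assumption about its internals, and `split_int128` and `MASK_64` are hypothetical names used only for illustration. The same operation fails on a string and works on a plain Python int, which has no size limit.

```python
MASK_64 = 0xFFFFFFFFFFFFFFFF  # selects the low 64 bits


def split_int128(value):
    """Split an integer into (low, high) 64-bit halves (hypothetical helper)."""
    return value & MASK_64, (value >> 64) & MASK_64


id_as_str = '265609333301876169530781520669667823585'

# A string cannot take part in a bitwise AND, so this raises TypeError
try:
    split_int128(id_as_str)
except TypeError as exc:
    error_message = str(exc)

# A Python int of any size works
low, high = split_int128(int(id_as_str))
```

Note that this sample value is larger than 2**127 - 1, so it fits in 128 bits only as an unsigned value.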
I solved the problem with this code, applying it to every column that holds large numbers:
# the column must have the 'string' dtype, not 'object'
df = df.astype({'column': 'string'})
# convert the values to integers (in df.info() I still see the 'string' dtype)
df['column'] = df['column'].apply(lambda x: int(x, 10))
# but the tuple of values will hold integers that fit int128
values = tuple(df.to_records(index=False))
Full code:
def update_replacingmergetree(ch_hook, table_name: str, df: pd.DataFrame):
    # parse the big-number column into Python ints before building the records
    df = df.astype({'column': 'string'})
    df['column'] = df['column'].apply(lambda x: int(x, 10))
    values = tuple(df.to_records(index=False))
    ch_hook.execute(f'INSERT INTO do_you.{table_name} VALUES', (v for v in values))
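As a variation on the same idea, the conversion can be done row by row instead of on the DataFrame, which avoids depending on how pandas reports the column dtype. This is only a minimal sketch: `rows_with_python_ints` and `int_positions` are hypothetical names, and the sample row is taken from the table in the question.

```python
from datetime import datetime


def rows_with_python_ints(rows, int_positions):
    """Yield each row as a tuple, parsing the values at int_positions as int."""
    positions = set(int_positions)
    for row in rows:
        yield tuple(
            int(value) if i in positions else value
            for i, value in enumerate(row)
        )


sample_rows = [
    ('265609333301876169530781520669667823585', 84, datetime(2024, 1, 1), 100),
]

# Only position 0 (the id column) is converted; the other values pass through
converted = list(rows_with_python_ints(sample_rows, int_positions=[0]))
```

With a DataFrame, the rows could come from `df.itertuples(index=False, name=None)`, and the generator could be passed to `ch_hook.execute` in place of `(v for v in values)`.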