假设我在
table_name_a
中有一些样本数据如下:
code val_a val_b remark date
------------------------------------------
1 00001 500 0.1 111 20191108
2 00001 1000 0.2 222 20191109
3 00002 200 0.1 111 20191110
4 00002 400 0.3 222 20191111
5 00001 200 0.2 333 20191112
6 00001 400 0.1 444 20191113
如果我在Hbase上查询如下(该值来自
select code, val_a from table_name_a where remark = 111
):
get 'test_01', '00001-20191108','n:111_a'
我的预期输出如下:
code 111_a
---------------
1 00001 500
我只知道如何在Python(pandas)中像这样导入所有数据
from db_conn import impala, hbasecon
import numpy as np
import pandas as pd
def main():
conn_impa = impala().getcon()
sql = """ SELECT * FROM table_name_a """
df = pd.read_sql(sql=sql, con=conn_impa)
df = df.fillna("")
num = len(df)
if num > 0:
hfdtable = hbasecon(FEDMTABLE).gettable()
with hfdtable.batch(batch_size=1000) as b:
df.apply(lambda row: b.put(row["code"], {'table_name_a:code': str(row["code"])}), axis=1)
df.apply(lambda row: b.put(row["val_a"], {'table_name_a:val_a': str(row["val_a"])}), axis=1)
df.apply(lambda row: b.put(row["val_b"], {'table_name_a:val_b': str(row["val_b"])}), axis=1)
df.apply(lambda row: b.put(row["date"], {'table_name_a:date': str(row["date"])}), axis=1)
df.apply(lambda row: b.put(row["remark"], {'table_name_a:remark': str(row["remark"])}), axis=1)
return True
if __name__ == '__main__':
main()
但我不知道如何用Python将数据导入到Hbase,因为Hbase需要
111_a
,即带有'111'val_a
的remark
非常感谢您的建议。
这是我的解决方案:
from db_conn import impala, hbasecon
import numpy as np
import pandas as pd
def main():
conn_impa = impala().getcon()
sql = """
SELECT code, date,
CASE WHEN t.remark='111' THEN t.val_a ELSE 0 END 111_a,
CASE WHEN t.remark='111' THEN t.val_b ELSE 0 END 111_b,
CASE WHEN t.remark='222' THEN t.val_a ELSE 0 END 222_a,
CASE WHEN t.remark='222' THEN t.val_b ELSE 0 END 222_b,
CASE WHEN t.remark='333' THEN t.val_a ELSE 0 END 333_a,
CASE WHEN t.remark='333' THEN t.val_b ELSE 0 END 333_b,
CASE WHEN t.remark='444' THEN t.val_a ELSE 0 END 444_a,
CASE WHEN t.remark='444' THEN t.val_b ELSE 0 END 444_b,
FROM table_name_a t
"""
df = pd.read_sql(sql=sql, con=conn_impa)
df = df.fillna("")
num = len(df)
if num > 0:
hfdtable = hbasecon(FEDMTABLE).gettable()
with hfdtable.batch(batch_size=1000) as b:
df.apply(lambda row: b.put(row["code"], {'table_name_a:code': str(row["code"])}), axis=1)
df.apply(lambda row: b.put(row["111_a"], {'table_name_a:111_a': str(row["111_a"])}), axis=1)
df.apply(lambda row: b.put(row["111_b"], {'table_name_a:111_b': str(row["111_b"])}), axis=1)
df.apply(lambda row: b.put(row["222_a"], {'table_name_a:222_a': str(row["222_a"])}), axis=1)
df.apply(lambda row: b.put(row["222_b"], {'table_name_a:222_b': str(row["222_b"])}), axis=1)
df.apply(lambda row: b.put(row["333_a"], {'table_name_a:333_a': str(row["333_a"])}), axis=1)
df.apply(lambda row: b.put(row["333_b"], {'table_name_a:333_b': str(row["333_b"])}), axis=1)
df.apply(lambda row: b.put(row["444_a"], {'table_name_a:444_a': str(row["444_a"])}), axis=1)
df.apply(lambda row: b.put(row["444_b"], {'table_name_a:444_b': str(row["444_b"])}), axis=1)
return True
if __name__ == '__main__':
main()