I am using dask to connect to AWS Redshift and query the database. I get an error when I try to pass the connection string to the read_sql_query method.
# connect to aws redshift cluster
import os

import redshift_connector

conn = redshift_connector.connect(
    host=os.environ['host'],
    database=os.environ['database'],
    port=int(os.environ['port']),
    user=os.environ['user'],
    password=os.environ['password']
)
import sqlalchemy as sa
conn_str = f'redshift+redshift_connector://{user}:{password}@{host}:{port}/{database}'
# dask
import dask.dataframe as dd

# at this point conn_str evaluates to:
"redshift+redshift_connector://('user',):pwd@hostname,):('5439',)/('tracking',)"
# Query table using dask dataframe
query = '''
SELECT *
FROM tbl
WHERE type = 'xxx'
and created_at >= '2023-01-01 00:00:00'
and created_at <= '2023-12-01 00:00:00'
'''
df = dd.read_sql_query(query, conn_str, index_col='id')
ValueError: invalid literal for int() with base 10: "('5439',)"
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File <command-2539550446659032>:10
1 # Query table using dask dataframe
2 query = '''
3 SELECT *
4 FROM pmf
(...)
7 and created_at <= '2023-12-01 00:00:00'
8 '''
---> 10 df = dd.read_sql_query(query, conn_str, index_col = 'id')
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/dask/dataframe/io/sql.py:107, in read_sql_query(sql, con, index_col, divisions, npartitions, limits, bytes_per_chunk, head_rows, meta, engine_kwargs, **kwargs)
104 raise TypeError("Must supply either 'divisions' or 'npartitions', not both")
106 engine_kwargs = {} if engine_kwargs is None else engine_kwargs
--> 107 engine = sa.create_engine(con, **engine_kwargs)
I have tried passing
port
both as an int
and as a str
. How can I connect to AWS Redshift and run a query with dask?
That connection string
"redshift+redshift_connector://('user',):pwd@hostname,):('5439',)/('tracking',)"
certainly looks wrong! It should presumably be
"redshift+redshift_connector://user:pwd@hostname:5439/tracking"
It looks like every variable passed into the f-string is a tuple rather than a plain value (str/int). Since you don't show how those values are assigned, it is hard to say for certain, but the cause could be something as simple as a stray trailing comma in your code.
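To illustrate the suspected cause: in Python, a trailing comma after an assignment creates a one-element tuple, which then leaks into the f-string. A minimal sketch with hypothetical placeholder values (your real values presumably come from os.environ or similar):

```python
# Suspected bug: a stray trailing comma turns each assignment into a tuple
user = 'user',        # trailing comma -> user is now ('user',)
port = '5439',        # same here: port is ('5439',)

conn_str = f'redshift+redshift_connector://{user}:pwd@hostname:{port}/tracking'
print(conn_str)
# -> redshift+redshift_connector://('user',):pwd@hostname:('5439',)/tracking
# The tuple reprs leak into the URL, and sa.create_engine later fails
# with: invalid literal for int() with base 10: "('5439',)"

# Fix: drop the trailing commas so each variable is a plain str
user = 'user'
port = '5439'
conn_str = f'redshift+redshift_connector://{user}:pwd@hostname:{port}/tracking'
print(conn_str)
# -> redshift+redshift_connector://user:pwd@hostname:5439/tracking
```

With the commas removed, the URL has the expected shape and dd.read_sql_query should be able to hand it to sqlalchemy.create_engine without the ValueError.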