我对Python不是很熟练,我有以下问题。 我正在尝试制作一个 python 脚本,在其中从 yhaoo Finance 下载一些数据,如果没有可用的数据库,则创建它,如果有,则上传尚不可用的行。 (增量过程)。目前,我正在尝试使用查询从数据库检索数据。 代码如下:
import pandas as pd
import yfinance as yf
import logging
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Table, MetaData
class DataDownloader():
def __init__(self, ASSET='TSLA'):
self.ASSET = ASSET
def download_and_save_data(self,interval='1d'):
period = '10y'
if interval[-1] == 'h':
period = '719d'
logging.info("Data download in progress")
file_dir = os.path.dirname(os.path.abspath(__file__))
data_folder = 'staging1_download'
file_path = os.path.join(file_dir, data_folder, f'data_ASSET_{self.ASSET}_interval_{interval}.txt')
data = yf.download(tickers=self.ASSET, period=period, interval=interval)
data.sort_index(ascending=True)
if interval[-1] == 'h':
data = data.reset_index()
data['Date'] = data['Datetime']
data['Date'] = data['Date'].astype(str)
data['Date'] = data['Date'].str[:-6]
data = data.set_index('Date')
cols_to_delete = [
'Adj Close',
'Datetime',
]
else:
cols_to_delete = [
'Adj Close',
]
data = data.drop(columns=cols_to_delete)
#managing last available date. We want candles when markets has generated them.
last_date_available = data.index[-1]
current_date = datetime.now()
if interval[-1] == 'd':
current_date = current_date.replace(hour=0, minute=0, second=0, microsecond=0)
if isinstance(last_date_available, str):
last_date_available = datetime.strptime(last_date_available, '%Y-%m-%d')
elif interval[-1] == 'h':
current_date = current_date.replace(minute=0, second=0, microsecond=0)
if isinstance(last_date_available, str):
last_date_available = datetime.strptime(last_date_available, '%Y-%m-%d %H:%M:%S')
#logging.info(f"data downloder last date available {last_date_available} and current {current_date}")
#logging.info(f"Cond verifiaction {str(current_date) == last_date_available}")
if current_date == last_date_available:
data = data[:-1]
elif current_date < last_date_available:
raise Exception("Something is wrong with dates during download. Please verify with data provider.")
data.to_csv(file_path, header=True, index=True, sep=";")
#when we switch to db
file_dir = os.path.dirname(os.path.abspath(__file__))
data_folder = 'staging1_download\\'
database_path = os.path.join(file_dir, data_folder )
databasePathName = database_path + 'staging1_download_DailyData.sqlite'
print(databasePathName)
#data["interval"] = interval
data["Asset"] = self.ASSET
data["INTERVAL"] = interval
if not os.path.isfile(databasePathName):
if interval[-1] == 'd':
engine = create_engine(f'sqlite:///{databasePathName}', echo=True)
data.to_sql("staging1_download_DailyData", con=engine, index=True)
else:
engine = create_engine(f'sqlite:///{databasePathName}', echo=True)
query = f"""
SELECT *
FROM staging1_download_DailyData
WHERE ASSET={self.ASSET} AND INTERVAL={interval}
"""
df = pd.read_sql(query, engine)
print(df.shape)
logging.info("Data download executed")
脚本应该运行两次。 AT 首先将正确创建数据库。第二次应该打印 df.但没有打印 df 。相反,它打印了以下内容:
2024-02-16 23:48:27,522 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("
SELECT *
FROM staging1_download_DailyData
WHERE ASSET=TSLA AND INTERVAL=1d
")
2024-02-16 23:48:27,524 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:48:27,526 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("
SELECT *
FROM staging1_download_DailyData
WHERE ASSET=TSLA AND INTERVAL=1d
")
2024-02-16 23:48:27,528 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:48:27,531 INFO sqlalchemy.engine.Engine
SELECT *
FROM staging1_download_DailyData
WHERE ASSET=TSLA AND INTERVAL=1d
2024-02-16 23:48:27,532 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:48:27,533 INFO sqlalchemy.engine.Engine ROLLBACK
2024-02-16 23:26:44,201 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:26:44,208 INFO sqlalchemy.engine.Engine ROLLBACK
这意味着什么?控制台中没有打印任何错误
我进行了以下修改以确保选择查询正常工作:
engine
直接传递给 pd.read_sql()
,而是传递了使用 con
创建的连接对象 (engine.connect()
)。text
对象,通过使用 from sqlalchemy import text
导入它来创建 SQL select 语句。self.ASSET
和 interval
合并到查询中,我将它们用单引号引起来,因为它们需要被视为字符串。这是解决问题的修改后的代码:
from sqlalchemy import create_engine, text
import pandas as pd
engine = create_engine(f'sqlite:///{databasePathName}', echo=True)
con = engine.connect()
query = text(f"""
SELECT *
FROM staging1_download_DailyData
WHERE ASSET='{self.ASSET}' AND INTERVAL='{interval}'
""")
df = pd.read_sql(query, con)
print(df.shape)