How do I retrieve data from a SQLite database into pandas using SQLAlchemy?


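I am not very experienced with Python, and I have the following problem. I am trying to write a Python script that downloads some data from Yahoo Finance, creates a database if none exists yet, and, if one does exist, uploads only the rows that are not already stored (an incremental process). At the moment, I am trying to retrieve data from the database with a query. The code is as follows: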

import os
import pandas as pd
import yfinance as yf
import logging
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Table, MetaData

class DataDownloader():
        
    def __init__(self, ASSET='TSLA'):
        self.ASSET = ASSET

    def download_and_save_data(self,interval='1d'):

        period = '10y'
        if interval[-1] == 'h':
            period = '719d'

        logging.info("Data download in progress")

        file_dir = os.path.dirname(os.path.abspath(__file__))
        data_folder = 'staging1_download'
        file_path = os.path.join(file_dir, data_folder, f'data_ASSET_{self.ASSET}_interval_{interval}.txt')
        
        
        data = yf.download(tickers=self.ASSET, period=period, interval=interval)
        # sort_index returns a new DataFrame, so the result must be assigned
        data = data.sort_index(ascending=True)
        


        if interval[-1] == 'h':
            data = data.reset_index()
            data['Date'] = data['Datetime']
            data['Date'] = data['Date'].astype(str)
            data['Date'] = data['Date'].str[:-6]
            data = data.set_index('Date')
            cols_to_delete = [
                'Adj Close', 
                'Datetime',
                ]
        else:
            cols_to_delete = [
            'Adj Close', 
            ]
        data = data.drop(columns=cols_to_delete)

        # Manage the last available date: we only want candles the market has already generated.
        last_date_available = data.index[-1]
        current_date = datetime.now() 

        if interval[-1] == 'd':
            current_date = current_date.replace(hour=0, minute=0, second=0, microsecond=0)
            if isinstance(last_date_available, str):
                last_date_available = datetime.strptime(last_date_available, '%Y-%m-%d')


        elif interval[-1] == 'h':
            current_date = current_date.replace(minute=0, second=0, microsecond=0)
            if isinstance(last_date_available, str):
                last_date_available = datetime.strptime(last_date_available, '%Y-%m-%d %H:%M:%S')

        
        #logging.info(f"data downloader last date available {last_date_available} and current {current_date}")
        #logging.info(f"Cond verification {str(current_date) == last_date_available}")

        if current_date == last_date_available:
            data = data[:-1]
        elif current_date < last_date_available:
            raise Exception("Something is wrong with dates during download. Please verify with data provider.")


        data.to_csv(file_path, header=True, index=True, sep=";")


        #when we switch to db
        
        file_dir = os.path.dirname(os.path.abspath(__file__))
        data_folder = 'staging1_download\\'
        database_path = os.path.join(file_dir, data_folder )
        databasePathName = database_path + 'staging1_download_DailyData.sqlite'
        print(databasePathName)

        #data["interval"] = interval
        data["Asset"] = self.ASSET
        data["INTERVAL"] = interval

        if not os.path.isfile(databasePathName):

            if interval[-1] == 'd':
                engine = create_engine(f'sqlite:///{databasePathName}', echo=True)
                data.to_sql("staging1_download_DailyData", con=engine, index=True)


        else:
            engine = create_engine(f'sqlite:///{databasePathName}', echo=True)
            query = f""" 
            SELECT * 
            FROM staging1_download_DailyData
            WHERE ASSET={self.ASSET} AND INTERVAL={interval}
            """
            
            df = pd.read_sql(query, engine)
            print(df.shape)       
        

        
        logging.info("Data download executed")
        

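The script is meant to be run twice. The first run creates the database correctly. The second run should print df.shape, but nothing is printed. Instead, it prints the following: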

2024-02-16 23:48:27,522 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("
            SELECT *
            FROM staging1_download_DailyData
            WHERE ASSET=TSLA AND INTERVAL=1d
            ")
2024-02-16 23:48:27,524 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:48:27,526 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("
            SELECT *
            FROM staging1_download_DailyData
            WHERE ASSET=TSLA AND INTERVAL=1d
            ")
2024-02-16 23:48:27,528 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:48:27,531 INFO sqlalchemy.engine.Engine
            SELECT *
            FROM staging1_download_DailyData
            WHERE ASSET=TSLA AND INTERVAL=1d

2024-02-16 23:48:27,532 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:48:27,533 INFO sqlalchemy.engine.Engine ROLLBACK
2024-02-16 23:26:44,201 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-02-16 23:26:44,208 INFO sqlalchemy.engine.Engine ROLLBACK

What does this mean? No error is printed in the console.

python pandas sqlalchemy etl
1 Answer

I made the following changes to get the SELECT query working:

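  • Instead of passing engine directly to pd.read_sql(), I passed a connection object, con, created with engine.connect().
  • I built the SQL SELECT statement with SQLAlchemy's text object, imported via from sqlalchemy import text.
  • To embed the variables self.ASSET and interval in the query correctly, I wrapped them in single quotes, because they must be treated as string literals. Unquoted, SQLite does not read them as strings (a bare TSLA is parsed as a column name).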
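
As a rough illustration of why the quoting matters, here is a minimal sketch that uses only Python's built-in sqlite3 module. The table name prices and its single row are made up for the example and are not part of the question's database.

```python
import sqlite3

# Hypothetical in-memory table standing in for the real one (assumption).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE prices (Asset TEXT, Close REAL)")
con.execute("INSERT INTO prices VALUES ('TSLA', 1.0)")

# Unquoted: SQLite parses TSLA as an identifier (a column name), not a string,
# so the statement fails instead of returning rows.
try:
    con.execute("SELECT * FROM prices WHERE Asset = TSLA").fetchall()
    unquoted_error = None
except sqlite3.OperationalError as exc:
    unquoted_error = str(exc)

# Quoted: 'TSLA' is a string literal and the comparison works as intended.
quoted_rows = con.execute("SELECT * FROM prices WHERE Asset = 'TSLA'").fetchall()

print("unquoted:", unquoted_error)
print("quoted:", quoted_rows)
con.close()
```

With the quotes in place the value is compared as text, which is what the WHERE clause in the question intends.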

Here is the modified code that solved the problem:

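from sqlalchemy import create_engine, text
import pandas as pd

# databasePathName, self.ASSET and interval come from the method in the question.
engine = create_engine(f'sqlite:///{databasePathName}', echo=True)

# Wrap the raw SQL in text(); the values are quoted so SQLite reads them as strings.
query = text(f"""
SELECT *
FROM staging1_download_DailyData
WHERE ASSET='{self.ASSET}' AND INTERVAL='{interval}'
""")

# Pass an explicit connection instead of the engine; the with-block closes it.
with engine.connect() as con:
    df = pd.read_sql(query, con)
print(df.shape)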
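
As an alternative to formatting the values into the SQL string, the same query can be written with bound parameters, which leaves the quoting to the driver. This is only a sketch under assumptions: it uses an in-memory SQLite database and made-up sample rows in place of the real file, and it is not the approach the code above takes.

```python
import pandas as pd
from sqlalchemy import create_engine, text

# In-memory SQLite database standing in for the real file (assumption).
engine = create_engine("sqlite:///:memory:")

# Hypothetical sample rows shaped roughly like the question's table.
sample = pd.DataFrame(
    {
        "Date": ["2024-02-14", "2024-02-15", "2024-02-15"],
        "Close": [1.0, 2.0, 3.0],
        "Asset": ["TSLA", "TSLA", "AAPL"],
        "INTERVAL": ["1d", "1d", "1d"],
    }
)

# :asset and :interval are named placeholders; values are supplied separately.
query = text(
    """
    SELECT *
    FROM staging1_download_DailyData
    WHERE Asset = :asset AND INTERVAL = :interval
    """
)

# One connection for both the write and the read; begin() commits on exit.
with engine.begin() as con:
    sample.to_sql("staging1_download_DailyData", con=con, index=False)
    df = pd.read_sql(query, con, params={"asset": "TSLA", "interval": "1d"})

print(df.shape)
```

Because the values never become part of the SQL text, a ticker or interval containing a quote character cannot break the statement.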