使用异步重试装饰器限制数据库连接的执行时间和尝试次数

问题描述 投票:0回答:0
import asyncio
import random
import time
from typing import Callable, Awaitable

class TooManyRetries(Exception):
    """Signal that a retried call gave up without producing a result."""

# Decorator code

def retry_on_timeout(max_retries: int, timeout: float, retry_interval: float):
    """Create a decorator that bounds a coroutine function by time and attempts.

    Each call of the decorated coroutine function is awaited through
    ``asyncio.wait_for`` with ``timeout`` seconds. When an attempt times out,
    it is retried after ``retry_interval`` seconds, up to ``max_retries``
    retries (so at most ``max_retries + 1`` attempts in total).

    NOTE: the timeout can only interrupt the wrapped coroutine at an ``await``
    point. A coroutine that performs a blocking synchronous call (for example
    a blocking database driver) holds the event loop, so the timeout cannot
    fire until that call returns. Such work has to be moved off the loop
    (e.g. to a thread/executor) before this decorator can limit it.

    Args:
        max_retries: Number of retries allowed after the first attempt.
        timeout: Per-attempt time limit in seconds.
        retry_interval: Seconds to sleep between a timed-out attempt and the
            next one.

    Returns:
        A decorator for coroutine functions. The decorated function returns
        the wrapped coroutine's result.

    Raises:
        TooManyRetries: Raised by the decorated function when every attempt
            timed out, or when an attempt raised any other exception (no
            retry in that case). The triggering exception is attached as
            ``__cause__``.
    """
    # Imported locally so this block does not depend on a module-level import.
    import functools

    def decorator(coro: Callable[..., Awaitable]):
        # Callables such as functools.partial objects have no __name__.
        name = getattr(coro, "__name__", repr(coro))

        @functools.wraps(coro)  # keep the wrapped function's name/docstring
        async def wrapper(*args, **kwargs):
            last_error = None

            for retry_num in range(max_retries + 1):
                try:
                    # wait_for already enforces the limit: if it returns, the
                    # result is valid. Re-checking wall-clock time afterwards
                    # could throw away a good result and silently re-run the
                    # coroutine, so the result is returned directly.
                    return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)

                except asyncio.TimeoutError as exc:
                    last_error = exc
                    # Retry only while attempts remain; the final timeout
                    # falls through to the raise below.
                    if retry_num < max_retries:
                        print(f"Function '{name}' exceeded the timeout of {timeout} seconds. Retrying... (Retry {retry_num + 1})")
                        await asyncio.sleep(retry_interval)

                except Exception as exc:
                    # Non-timeout failures are not retried.
                    last_error = exc
                    print(f"Function '{name}' raised an exception: {exc}")
                    break

            # Chain the underlying failure so the real cause is not hidden.
            raise TooManyRetries(
                f"Function '{name}' exceeded the maximum number of retries ({max_retries})."
            ) from last_error

        return wrapper
    return decorator

我写这个装饰器是希望限制执行时间和尝试次数，并用它来限制与数据库的连接。对于简单的查询，它可以正常工作；但对于耗时的查询，它会失败并显示错误消息“raise RuntimeError”。

这是我连接数据库的代码。

class MySQLQuery:
    """Run SQL queries against MySQL servers described in a JSON config file.

    The config file must contain a top-level ``"mysql"`` object that maps a
    database name to its connection settings. ``_execute_query`` reads the
    ``host``, ``user``, ``pass`` and ``port`` keys of those settings.
    """

    def __init__(self, config_path: str = "config.json"):
        """Load connection settings and set up file logging.

        Args:
            config_path: Path of the JSON configuration file. Defaults to
                ``"config.json"`` in the current working directory.
        """
        # Read the configuration file and get database connection information
        with open(config_path, 'r') as file:
            config = json.load(file)

        self.mysql_db = config['mysql']

        # Create and configure a logger
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)

        # getLogger returns the same logger object for the same name, so only
        # attach a file handler once; otherwise every new instance would add
        # another handler and each message would be written multiple times.
        has_file_handler = any(
            isinstance(handler, logging.FileHandler)
            for handler in self.logger.handlers
        )
        if not has_file_handler:
            file_handler = logging.FileHandler("mysql_query.log")
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def print_db_name(self):
        """Print the list of configured database names."""
        print(list(self.mysql_db.keys()))

    def get_db_info(self, database: str) -> dict:
        """Return the connection settings stored for ``database``.

        Returns ``None`` when the name is not present in the configuration.
        """
        return self.mysql_db.get(database)

    def get_dataframe(self, database: str, queries: Union[str, List[str]]) -> Union[pd.DataFrame, dict]:
        """Execute one query or a list of queries.

        Args:
            database: Name of a configured database.
            queries: A single SQL string, or a list of SQL strings.

        Returns:
            A DataFrame for a single query. For a list, a dict mapping
            ``"result1"``, ``"result2"``, ... (in input order) to the
            DataFrame of each query.

        Raises:
            ValueError: If ``queries`` is neither a ``str`` nor a ``list``.
        """
        if isinstance(queries, str):
            return self._execute_query(database, queries)

        if isinstance(queries, list):
            return {
                f"result{position}": self._execute_query(database, query)
                for position, query in enumerate(queries, start=1)
            }

        raise ValueError("Invalid input for 'queries'. It should be either a single SQL query (str) or a list of SQL queries (List[str]).")

    def _execute_query(self, database: str, query: str) -> pd.DataFrame:
        """Execute a single SQL query and return its rows as a DataFrame.

        Any error raised while connecting, executing or fetching is logged,
        and an empty DataFrame is returned instead.
        """
        info = self.get_db_info(database)
        host = info['host']
        user = info['user']
        password = info['pass']
        port = info['port']

        # Pre-bind both handles so the cleanup below is safe even when the
        # connection (or cursor creation) fails before they are assigned.
        cnx = None
        cursor = None

        try:
            # Open a database connection
            cnx = mysql.connector.connect(host=host, user=user, password=password, port=port)

            # Create a cursor, execute the SQL query and fetch the result
            cursor = cnx.cursor()
            cursor.execute(query)
            result = cursor.fetchall()

            # The first item of each description entry is used as the column name
            column_names = [column[0] for column in cursor.description]

            # Convert the result to a DataFrame
            result_df = pd.DataFrame(result, columns=column_names)

        except Exception as e:
            self.logger.error(f"Connection failed: {e}")
            result_df = pd.DataFrame()  # Return an empty DataFrame if an error occurs

        finally:
            # Close only what was actually opened
            if cursor is not None:
                cursor.close()
            if cnx is not None:
                cnx.close()
            self.logger.info("Connection ended")

        return result_df

我想知道如何正确地编写代码，使其既能返回准确的数据，又能遵守装饰器施加的限制。此外，我还想了解如何将此装饰器应用于任何可能耗时较长的代码。

python asynchronous decorator
© www.soinside.com 2019 - 2024. All rights reserved.