import asyncio
import functools
import random
import time
from typing import Awaitable, Callable
class TooManyRetries(Exception):
    """Raised by a ``retry_on_timeout`` wrapper when it gives up on a call."""


# Decorator code
def retry_on_timeout(max_retries: int, timeout: float, retry_interval: float):
    """Build a decorator that time-limits and retries a coroutine function.

    Every attempt is awaited through ``asyncio.wait_for`` with ``timeout``
    seconds. An attempt that times out is retried after ``retry_interval``
    seconds; any other exception stops the retry loop immediately.

    The decorated callable must return an awaitable, because each attempt is
    handed to ``asyncio.wait_for``. A plain blocking function returns a
    non-awaitable value and therefore ends in ``TooManyRetries`` (with the
    resulting ``TypeError`` as its cause).

    Args:
        max_retries: Retries allowed after the first attempt; the callable
            runs at most ``max_retries + 1`` times.
        timeout: Time budget of a single attempt, in seconds.
        retry_interval: Seconds to sleep after a timed-out attempt before the
            next one starts.

    Returns:
        A decorator producing an ``async`` wrapper with the same call
        signature and metadata as the decorated callable.

    Raises:
        TooManyRetries: Raised by the wrapper when every attempt timed out or
            when an attempt raised a non-timeout exception. The last
            underlying exception is available as ``__cause__``.
    """
    def decorator(coro: Callable[..., Awaitable]):
        # Callables such as functools.partial objects have no __name__.
        name = getattr(coro, "__name__", repr(coro))

        @functools.wraps(coro)
        async def wrapper(*args, **kwargs):
            failure = None
            for retry_num in range(max_retries + 1):
                # Monotonic clock: immune to wall-clock adjustments.
                started = time.monotonic()
                try:
                    result = await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)
                    # wait_for cannot interrupt an attempt that never yields to
                    # the event loop; treat a late result like a timeout so it
                    # takes the same message/sleep/retry path.
                    if time.monotonic() - started > timeout:
                        raise asyncio.TimeoutError
                    return result
                except asyncio.TimeoutError as exc:
                    failure = exc
                    # Timeout handling, retry if the maximum number of retries is not reached
                    if retry_num < max_retries:
                        print(f"Function '{name}' exceeded the timeout of {timeout} seconds. Retrying... (Retry {retry_num + 1})")
                        await asyncio.sleep(retry_interval)
                except Exception as exc:
                    # Other exception handling, exit the retry loop directly
                    print(f"Function '{name}' raised an exception: {exc}")
                    failure = exc
                    break
            # No attempt produced a result; keep the underlying error attached
            # so callers can see why the call was abandoned.
            raise TooManyRetries(
                f"Function '{name}' exceeded the maximum number of retries ({max_retries})."
            ) from failure
        return wrapper
    return decorator
我编写这个装饰器的目的是限制函数的执行时间和重试次数,并用它来约束对数据库的连接与查询。对于简单的查询,它可以正常工作;但对于耗时较长的查询,它会失败,并报出“raise RuntimeError”错误。
这是我连接数据库的代码。
class MySQLQuery:
    """Run SQL queries against the MySQL servers listed in a JSON config file.

    The ``"mysql"`` entry of the config maps a database name to a dict with
    the ``host``, ``user``, ``pass`` and ``port`` used to connect. Activity is
    logged to ``mysql_query.log`` through the module logger.
    """

    def __init__(self, config_path: str = "config.json"):
        """Load connection settings and set up file logging.

        Args:
            config_path: Path of the JSON configuration file to read.
        """
        # Read the configuration file and get database connection information
        with open(config_path, 'r', encoding="utf-8") as file:
            config = json.load(file)
        self.mysql_db = config['mysql']

        # Create and configure a logger
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)

        # getLogger returns the same logger for every instance, so attach the
        # file handler only once; otherwise each new instance duplicates
        # every log line.
        has_file_handler = any(
            isinstance(handler, logging.FileHandler)
            for handler in self.logger.handlers
        )
        if not has_file_handler:
            # Create a file handler and set the formatter
            file_handler = logging.FileHandler("mysql_query.log")
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(formatter)
            # Add the file handler to the logger
            self.logger.addHandler(file_handler)

    def print_db_name(self):
        """Print the list of configured database names."""
        print(list(self.mysql_db.keys()))

    def get_db_info(self, database: str) -> dict:
        """Return the connection settings for ``database``, or ``None`` if unknown."""
        # Retrieve the database connection information based on the provided database name
        database_info_result = self.mysql_db.get(database)
        return database_info_result

    def get_dataframe(self, database: str, queries: Union[str, List[str]]) -> Union[pd.DataFrame, dict]:
        """Execute one query or a list of queries against ``database``.

        Args:
            database: Name of a database present in the configuration.
            queries: A single SQL string, or a list of SQL strings.

        Returns:
            A DataFrame for a single query; for a list, a dict keyed
            ``"result1"``, ``"result2"``, ... in input order.

        Raises:
            ValueError: If ``queries`` is neither a ``str`` nor a ``list``.
        """
        if isinstance(queries, str):
            # If a single query is provided, execute it and return the DataFrame
            return self._execute_query(database, queries)
        elif isinstance(queries, list):
            # If a list of queries is provided, execute them and return the dictionary of DataFrames
            results = {}
            for i, query in enumerate(queries):
                result_df = self._execute_query(database, query)
                results[f"result{i+1}"] = result_df
            return results
        else:
            raise ValueError("Invalid input for 'queries'. It should be either a single SQL query (str) or a list of SQL queries (List[str]).")

    def _execute_query(self, database: str, query: str) -> pd.DataFrame:
        """Execute a single SQL query and return its rows as a DataFrame.

        Any error while connecting, executing or fetching is logged and an
        empty DataFrame is returned instead of raising.

        Raises:
            TypeError: If ``database`` is not in the configuration, because
                ``get_db_info`` then returns ``None``.
        """
        info = self.get_db_info(database)
        host = info['host']
        user = info['user']
        password = info['pass']
        port = info['port']

        # Bound before the try so cleanup can tell which resources were
        # actually acquired when connect() or cursor() fails.
        cnx = None
        cursor = None
        try:
            # Open a database connection
            cnx = mysql.connector.connect(host=host, user=user, password=password, port=port)
            # Create a cursor to execute the query
            cursor = cnx.cursor()
            # Execute the SQL query and fetch the result
            cursor.execute(query)
            result = cursor.fetchall()
            # Get column names
            column_names = [column[0] for column in cursor.description]
            # Convert the result to a DataFrame
            result_df = pd.DataFrame(result, columns=column_names)
        except Exception as e:
            self.logger.error(f"Connection failed: {e}")
            result_df = pd.DataFrame()  # Return an empty DataFrame if an error occurs
        finally:
            # Close only what was opened
            if cursor is not None:
                cursor.close()
            if cnx is not None:
                cnx.close()
            self.logger.info("Connection ended")
        return result_df
我想知道应该如何修改代码,使其既能返回正确的数据,又能遵守装饰器设定的超时和重试限制。此外,我还想了解如何把这个装饰器应用到任何可能耗时较长的代码上。