I found https://www.vinta.com.br/blog/etl-with-asyncio-asyncpg, which shows how to build a pipeline that fetches data from a database (with async code), processes it in batches using multiprocessing, and then inserts it back into the database (also async).
My code looks like this:
import asyncio
import aiosqlite
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from lingua import Language, LanguageDetectorBuilder
import ro_diacritics
import spacy
import json
import argostranslate.package
import argostranslate.translate
import functools
import datetime
import time
import re
import logging
import os
def initialize():
settings = dict()
settings['new_table_name'] = 'news2'
settings['old_table_name'] = 'news'
settings['remove_columns'] = ['resultTitleLemma', 'articleContentLemma']
settings['add_columns'] = [
'newErrorStatus', 'newErrorException', 'articleTextLenght',
'resultTitleToken', 'resultTitleLemma', 'resultTitlePos',
'resultExtractedTitleToken', 'resultExtractedTitleLemma', 'resultExtractedTitlePos',
'articleTitleToken', 'articleTitleLemma', 'articleTitlePos',
'articleTextToken', 'articleTextLemma', 'articleTextPos',
'resultTitleTranslated', 'resultExtractedTitleTranslated', 'articleTitleTranslated', 'articleTextTranslated',
'resultTitleTranslatedToken', 'resultTitleTranslatedLemma', 'resultTitleTranslatedPos',
'resultExtractedTitleTranslatedToken', 'resultExtractedTitleTranslatedLemma', 'resultExtractedTitleTranslatedPos',
'articleTitleTranslatedToken', 'articleTitleTranslatedLemma', 'articleTitleTranslatedPos',
'articleTextTranslatedToken', 'articleTextTranslatedLemma', 'articleTextTranslatedPos'
]
# check for title null or in blacklist
settings['title_blacklist'] = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']
# lingua-language detector
#from lingua import Language, LanguageDetectorBuilder
settings['wanted_language'] = "ROMANIAN"
settings['language_detector'] = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()
# restore diacritics
#import ro_diacritics
settings['columns_diacritics'] = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']
# Spacy models for lemmatizing
#import spacy
#import json
# python -m spacy download ro_core_news_md
settings['spacy_model_ro'] = spacy.load("ro_core_news_md", disable=['ner', 'parser'])
settings['lemmatizer_columns_ro'] = {
'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
}
# python -m spacy download en_core_web_md
settings['spacy_model_en'] = spacy.load("en_core_web_md", disable=['ner', 'parser'])
settings['lemmatizer_columns_en'] = {
'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
}
# Argostranslate from ro -> en
#import argostranslate.package
#import argostranslate.translate
settings['from_lang'] = 'ro'
settings['to_lang'] = 'en'
argostranslate.package.update_package_index()
available_packages = argostranslate.package.get_available_packages()
package_to_install = next(
filter(
lambda x: x.from_code == settings['from_lang'] and x.to_code == settings['to_lang'], available_packages
)
)
argostranslate.package.install_from_path(package_to_install.download())
settings['translate_columns'] = {'resultTitle':'resultTitleTranslated',
'resultExtractedTitle':'resultExtractedTitleTranslated',
'articleTitle':'articleTitleTranslated',
'articleText':'articleTextTranslated'}
# Last check for matching
settings['check_match_columns'] = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']
return settings
### Data functions for main_row()
def convert_row_dict(row):
return dict(row)
def remove_dict_columns(datadict, remove_columns):
if not isinstance(remove_columns, list) and not isinstance(remove_columns, tuple): remove_columns = [remove_columns]
    for column in remove_columns:
        datadict.pop(column, None)
return datadict
def fix_accents_dict(datadict):
def fix_accents(s):
char_dict = { "º": "ș", "ª": "Ș", "ş": "ș", "Ş": "Ș", "ţ": "ț", "Ţ": "Ț", "þ": "ț", "Þ": "Ț", "ã": "ă" }
for k,v in char_dict.items():
s = s.replace(k, v)
return s
for item in datadict.keys():
if isinstance(datadict[item], str):
datadict[item] = fix_accents(datadict[item])
return datadict
def add_dict_columns(datadict, add_columns, default_value=None):
if not isinstance(add_columns, list) and not isinstance(add_columns, tuple): add_columns = [add_columns]
    for column in add_columns:
        datadict[column] = default_value
return datadict
def check_null_article_text(datadict):
if not datadict['articleText']:
datadict['newErrorStatus'] = str(3)
datadict['newErrorException'] = 'Content null'
raise Exception(datadict['newErrorException'])
return datadict
def calc_content_lenght(datadict):
datadict['articleTextLenght'] = str(len(datadict['articleText']))
return datadict
def check_extracted_title(datadict, titleBlackList = None):
if not datadict['resultExtractedTitle']:
datadict['newErrorStatus'] = str(4)
datadict['newErrorException'] = 'Page title null'
raise Exception(datadict['newErrorException'])
if not titleBlackList:
titleBlackList = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']
if any([x in datadict['resultExtractedTitle'] for x in titleBlackList]):
datadict['newErrorStatus'] = str(4)
datadict['newErrorException'] = 'Title in blocked list'
raise Exception(datadict['newErrorException'])
return datadict
def check_md_website(datadict):
    location_url = re.findall(r"(?:[a-zA-Z]*\.)+([a-zA-Z]+)(?:\/.*)?", str(datadict['articleUrl']))
if any(x == "md" for x in location_url):
datadict['newErrorStatus'] = str(5)
datadict['newErrorException'] = 'Website is from md'
raise Exception(datadict['newErrorException'])
return datadict
def check_language(datadict, wanted_language = 'ROMANIAN', language_detector = None):
if not language_detector:
language_detector = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()
try:
articleLanguage = language_detector.detect_language_of(datadict['articleText']).name
    except Exception:
        datadict['newErrorStatus'] = str(6)
        datadict['newErrorException'] = 'Language could not be extracted'
raise Exception(datadict['newErrorException'])
else:
if articleLanguage != wanted_language:
datadict['newErrorStatus'] = str(6)
datadict['newErrorException'] = f'Language of article is probably not Romanian, it is {articleLanguage}'
raise Exception(datadict['newErrorException'])
return datadict
def restore_diacritics(datadict, columns_diacritics = None):
if not columns_diacritics:
columns_diacritics = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']
diacticice = ["ă", "Ă", "î", "Î", "â", "Â", "ș", "Ș", "ț", "Ț"]
for column in columns_diacritics:
        if not any(e in datadict[column] for e in diacticice):
datadict[column] = ro_diacritics.restore_diacritics(datadict[column])
return datadict
def lemmatizer(datadict, columns, spacy_model):
'''
lemmatizer_columns_ro = {
'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
}
spacy_model_ro = spacy.load("ro_core_news_md", disable=['ner', 'parser'])
lemmatizer_columns_en = {
'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
}
spacy_model_en = spacy.load("en_core_web_md", disable=['ner', 'parser'])
'''
for column in columns:
result_spacy = spacy_model(datadict[column])
for method in columns[column]:
datadict[columns[column][method]] = json.dumps([getattr(token, method) for token in result_spacy])
return datadict
def translate_lang_check(from_lang = None, to_lang = None):
if not from_lang: from_lang = 'ro'
if not to_lang: to_lang = 'en'
argostranslate.package.update_package_index()
available_packages = argostranslate.package.get_available_packages()
package_to_install = next(
filter(
lambda x: x.from_code == from_lang and x.to_code == to_lang, available_packages
)
)
argostranslate.package.install_from_path(package_to_install.download())
def translate(datadict, columns_dict_from_to = None, from_lang = None, to_lang = None, i = None, j = None):
if not i: i = '?'
if not j: j = '?'
if not columns_dict_from_to:
columns_dict_from_to = {'resultTitle':'resultTitleTranslated',
'resultExtractedTitle':'resultExtractedTitleTranslated',
'articleTitle':'articleTitleTranslated',
'articleText':'articleTextTranslated'}
if not from_lang: from_lang = 'ro'
if not to_lang: to_lang = 'en'
for column in columns_dict_from_to:
datadict[columns_dict_from_to[column]] = argostranslate.translate.translate(datadict[column], from_lang, to_lang)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished translating for {i}/{j}")
return datadict
def removeFromListSmallerNth(listStrings, noCharacters=3):
listStringsLower = [x.lower() for x in listStrings]
return set(filter(lambda i: len(i) >= noCharacters, listStringsLower))
def check_match_titles(datadict, columns_match = None):
if not columns_match:
columns_match = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']
results = dict()
for column in columns_match:
results[column] = removeFromListSmallerNth(json.loads(datadict[column]))
matches = dict()
for column in results:
temp_results = results.copy()
temp_results.pop(column, None)
matches[column] = dict()
for temp_column in temp_results:
matches[column][temp_column] = len((results[column] & results[temp_column]))
    if all(matches[column][value] < 1 for column in matches for value in matches[column]):  # comparison belongs inside; all(...) < 1 compared a bool to an int
datadict['newErrorStatus'] = str(8)
datadict['newErrorException'] = 'Article might not match with google title - no match exists'
raise Exception(datadict['newErrorException'])
if sum([sum(matches[column].values()) for column in matches])/2 < 5:
datadict['newErrorStatus'] = str(8)
datadict['newErrorException'] = 'Article might not match with google title - sum of all matches is smaller than 5'
raise Exception(datadict['newErrorException'])
    if any(sum(matches[column].values()) < 1 for column in matches):  # comparison belongs inside; any(...) < 1 compared a bool to an int
datadict['newErrorStatus'] = str(8)
datadict['newErrorException'] = 'Article might not match with google title - sum of a group match is 0'
raise Exception(datadict['newErrorException'])
return datadict
### Data function for main()
def main_data(row_data, settings, i = None, j = None): #, table_connection, i, j
if not i: i = '?'
if not j: j = '?'
ii = '?'
ij = '?'
try:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {i}/{j}")
#row_data_dict = convert_row_dict(row_data)
row_data_dict = row_data
row_data_dict = remove_dict_columns(row_data_dict, settings['remove_columns'])
row_data_dict = fix_accents_dict(row_data_dict)
row_data_dict = add_dict_columns(row_data_dict, settings['add_columns'], default_value=None)
row_data_dict = check_null_article_text(row_data_dict)
#check error
row_data_dict = calc_content_lenght(row_data_dict)
row_data_dict = check_extracted_title(row_data_dict, settings['title_blacklist'])
#check error
row_data_dict = check_md_website(row_data_dict)
#check error
row_data_dict = check_language(row_data_dict, settings['wanted_language'], settings['language_detector'])
#check error
row_data_dict = restore_diacritics(row_data_dict, settings['columns_diacritics'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting ro lemmatizer for {i}/{j}")
row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_ro'], settings['spacy_model_ro'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting translating for {i}/{j}")
row_data_dict = translate(row_data_dict, settings['translate_columns'], settings['from_lang'], settings['to_lang'], i, j)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting en lemmatizer for {i}/{j}")
row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_en'], settings['spacy_model_en'])
row_data_dict = check_match_titles(row_data_dict, settings['check_match_columns'])
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finisted tasks for {i}/{j}")
except Exception as e:
        if not row_data_dict.get('newErrorStatus'):  # .get(): the columns may not exist yet if the exception came early
            row_data_dict['newErrorStatus'] = str(0)
        if not row_data_dict.get('newErrorException'):
            row_data_dict['newErrorException'] = str(e)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Error task for {i}/{j} - {row_data_dict['newErrorStatus']} / {row_data_dict['newErrorException']}")
finally:
#await insert_db(settings['new_table_name'], table_connection, row_data_dict)
return row_data_dict
def transform(batch, settings, len_db):
logging.info(f"{datetime.datetime.now()} - Started transform")
transformed_batch = []
logging.info(f'Len batch = {len(batch)}')
for irecord, record in enumerate(batch, start=1):
transformed_batch.append(list(main_data(record, settings, irecord, len_db).values()))
logging.info(f"{datetime.datetime.now()} - Ended transform")
return transformed_batch
async def load(batch, connection, settings):
logging.info(f"{datetime.datetime.now()} - Started load")
sqlstring = f'INSERT INTO {settings["new_table_name"]} VALUES (' + ','.join(['?'] * len(batch[0])) + ')'
await connection.executemany(sqlstring, batch)
await connection.commit()
logging.info(f"{datetime.datetime.now()} - Ended load")
async def task_set_load_helper(task_set, connection, settings):
logging.info(f"{datetime.datetime.now()} - Started task_set_load_helper")
for future in task_set:
await load( await future, connection, settings)
logging.info(f"{datetime.datetime.now()} - Ended task_set_load_helper")
async def get_len_db(connection, table_name):
    async with connection.execute(f'SELECT COUNT(*) FROM {table_name}') as cursor:
        return (await cursor.fetchone())[0]
async def consumer(loop, pool, queue, db_filename, settings):
logging.info(f"{datetime.datetime.now()} - Started consumer")
connection = await get_aiosqlite_connection(db_filename)
len_db = await get_len_db(connection, settings["old_table_name"])
task_set = set()
batch = []
while True:
record = await queue.get()
if record is not None:
record = dict(record)
batch.append(record)
if queue.empty():
task = loop.run_in_executor(pool, transform, batch, settings, len_db)
task_set.add(task)
if len(task_set) >= pool._max_workers:
done_set, task_set = await asyncio.wait(
task_set, return_when=asyncio.FIRST_COMPLETED
)
logging.info(f"{datetime.datetime.now()} - Start task_set_load_helper #1")
#for f in done_set:
# await load(f, connection, settings)
await task_set_load_helper(done_set, connection, settings)
batch = []
if record is None:
break
if task_set:
logging.info(f"{datetime.datetime.now()} - Start task_set_load_helper #2")
await task_set_load_helper(
asyncio.as_completed(task_set), connection, settings
)
await connection.close()
logging.info(f"{datetime.datetime.now()} - Ended consumer")
async def get_aiosqlite_connection(db_filename):
logging.info(f"{datetime.datetime.now()} - Started get_aiosqlite_connection")
connection = await aiosqlite.connect(db_filename)
logging.info(f"{datetime.datetime.now()} - Ended get_aiosqlite_connection")
return connection
async def extract(connection, settings):
logging.info(f"{datetime.datetime.now()} - Started extract")
connection.row_factory = aiosqlite.Row
async with connection.execute(f'SELECT * FROM {settings["old_table_name"]}') as cursor:
async for record in cursor:
yield record
logging.info(f"{datetime.datetime.now()} - Ended extract")
async def producer(queue, db_filename, settings):
logging.info(f"{datetime.datetime.now()} - Started producer")
connection = await get_aiosqlite_connection(db_filename)
async for record in extract(connection, settings):
await queue.put(record)
await queue.put(None)
await connection.close()
logging.info(f"{datetime.datetime.now()} - Ended producer")
async def etl(db_filename, settings):
logging.info(f"{datetime.datetime.now()} - Started etl")
with ProcessPoolExecutor(
max_workers=multiprocessing.cpu_count(),
) as pool:
loop = asyncio.get_running_loop()
queue = asyncio.Queue(maxsize=8)
await asyncio.gather(
asyncio.create_task(producer(queue, db_filename, settings)),
asyncio.create_task(consumer(loop, pool, queue, db_filename, settings)),
return_exceptions=False,
)
logging.info(f"{datetime.datetime.now()} - Ended etl")
### Database insert for main()
async def insert_db(table_name, table_connection, row_data_dict):
sqlstring = f'INSERT INTO {table_name} (' + ','.join(row_data_dict.keys()) +') values(' + ','.join(['?'] * len(list(row_data_dict.values()))) + ')'
await table_connection.execute(sqlstring, list(row_data_dict.values()))
await table_connection.commit()
### Database functions for main_db()
async def create_db_table_from_table(new_table_name, table_cursor, table_connection, old_table_name = None):
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Create new table {new_table_name}")
await table_cursor.execute(f"DROP TABLE IF EXISTS {new_table_name}")
await table_connection.commit()
    if not old_table_name:
        # CREATE TABLE without a column list is invalid SQL, so a source table is required here
        raise ValueError('old_table_name is required to copy the table schema from')
else:
await table_cursor.execute(f"CREATE TABLE {new_table_name} AS SELECT * FROM {old_table_name} WHERE 0")
await table_connection.commit()
async def remove_db_columns(table_name, table_cursor, table_connection, remove_columns):
if not isinstance(remove_columns, list) and not isinstance(remove_columns, tuple): remove_columns = [remove_columns]
for remove_column in remove_columns:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Remove column {remove_column}")
await table_cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN {remove_column}")
await table_connection.commit()
async def add_db_columns(table_name, table_cursor, table_connection, add_columns):
if not isinstance(add_columns, list) and not isinstance(add_columns, tuple): add_columns = [add_columns]
for new_column in add_columns:
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Add column {new_column}")
await table_cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {new_column} TEXT")
await table_connection.commit()
async def get_no_rows_db(db_table_connection, table_name):
    async with db_table_connection.execute(f'SELECT COUNT(*) FROM {table_name}') as cursor:
        return (await cursor.fetchone())[0]
async def get_columns_table(db_table_connection, table_name):
async with db_table_connection.execute(f'SELECT * FROM {table_name}') as cursor:
return [description[0] for description in cursor.description]
### Database initialization for main()
async def main_db(db_table_cursor, db_table_connection, settings):
await create_db_table_from_table(settings['new_table_name'], db_table_cursor, db_table_connection, settings['old_table_name'])
await remove_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['remove_columns'])
await add_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['add_columns'])
no_rows_old_table = await get_no_rows_db(db_table_connection, settings['old_table_name'])
#new_table_columns = await get_columns_table(db_table_connection, settings['new_table_name'])
return no_rows_old_table
### main loop
async def main(db_filename, ii, ij):
start = time.monotonic()
# Initialize parameters
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize parameters for {db_filename}")
    settings = initialize()
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize parameters for {db_filename}")
# Connect to db and modify db (asyncio IO)
async with aiosqlite.connect(db_filename) as db_table_connection:
db_table_cursor = await db_table_connection.cursor()
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize database for {db_filename}")
#no_rows_old_table, new_table_columns = await main_db(db_table_cursor, db_table_connection, settings)
j = await main_db(db_table_cursor, db_table_connection, settings)
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize database for {db_filename}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {db_filename}")
await etl(db_filename, settings)
#await asyncio.run(etl(db_filename, settings))
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished tasks for {db_filename}")
### startup
if __name__ == "__main__":
program_start_time = datetime.datetime.now()
logging.basicConfig(level=logging.INFO,
handlers=[
logging.FileHandler(f'debug_correct_translate_lemma_{program_start_time.strftime("%Y_%m_%d_%H_%M")}.txt',
'a', 'utf-8'), logging.StreamHandler()])
logging.info(f"{datetime.datetime.now()} - {__file__} started at {program_start_time}")
#linux mypath = '/media/vlad/HDD1000/disertatie/extracted_original_1/test1'
#mypath = 'C:/Users/Vlad/PycharmProjects/newsScrapping/database'
mypath = 'D:/disertatie/cod/newsScrapping/database'
    # collect every file under mypath (recursively) as a candidate database file
    dbfiles = [os.path.join(dirpath, f) for (dirpath, dirnames, filenames) in os.walk(mypath) for f in filenames]
#linux dbfiles = ["/home/vlad/Documents/newsScrapping/news_database_2010-01-01-2023-06-01_creditbancar.db"]
ii = 0
ij = len(dbfiles)
for dbfile in dbfiles:
ii += 1
keyword_start_time = datetime.datetime.now()
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")
asyncio.run(main(dbfile, ii, ij))
keyword_end_time = datetime.datetime.now()
keyword_elapsed_time = keyword_end_time - keyword_start_time
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} ended at {keyword_end_time}")
logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} executed in {keyword_elapsed_time}")
program_end_time = datetime.datetime.now()
program_elapsed_time = program_end_time - program_start_time
logging.info(f"{datetime.datetime.now()} - program started at {program_start_time}")
logging.info(f"{datetime.datetime.now()} - program ended at {program_end_time}")
logging.info(f"{datetime.datetime.now()} - program executed in {program_elapsed_time}")
The problem is that it throws a database locked error at me, and I don't know how to handle it.
INFO:root:2023-11-10 12:23:09.780805 - Started load
Traceback (most recent call last):
File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 560, in <module>
asyncio.run(main(dbfile, ii, ij))
File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 529, in main
await etl(db_filename, settings)
File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 434, in etl
await asyncio.gather(
File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 388, in consumer
await task_set_load_helper(done_set, connection, settings)
File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 357, in task_set_load_helper
await load( await future, connection, settings)
File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 350, in load
await connection.commit()
File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\site-packages\aiosqlite\core.py", line 166, in commit
await self._execute(self._conn.commit)
File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\site-packages\aiosqlite\core.py", line 133, in _execute
return await future
^^^^^^^^^^^^
File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\site-packages\aiosqlite\core.py", line 106, in run
result = function()
^^^^^^^^^^
sqlite3.OperationalError: database is locked
If you want to test it, the database is available on drive.
I tried modifying the code to add a second task pool for the inserts, but I couldn't get it to work.
The problem pointed out is reflected in your code: you are opening the database twice, once for producer and once for consumer. Windows file locking and SQLite both expect a single connection, and one connection is enough for your program.
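To see why the two connections collide, here is a minimal repro sketch with the synchronous sqlite3 module (the file name and timeout=0 are made up for illustration; aiosqlite drives the same engine underneath, it just waits up to the default 5-second timeout before giving up):

import sqlite3

a = sqlite3.connect("locked_demo.db", timeout=0)  # timeout=0: raise immediately instead of waiting for the lock
b = sqlite3.connect("locked_demo.db", timeout=0)
a.execute("CREATE TABLE IF NOT EXISTS t (x)")
a.commit()
a.execute("INSERT INTO t VALUES (1)")  # `a` starts an implicit transaction and takes the write lock
b.execute("INSERT INTO t VALUES (2)")  # raises sqlite3.OperationalError: database is locked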
I have just seen that this is a huge code base, and there is actually more than one extra call to aiosqlite.connect.
Rearrange the code so that the db_table_connection created in main is passed directly to the consumer and producer tasks, instead of passing db_filename and reconnecting inside the get_aiosqlite_connection function. (You could simply change that function to return the connection as a global resource instead of calling .connect again.) And use cursors (check https://docs.python.org/3/library/sqlite3.html, though aiosqlite should handle this) to make sure you are not fetching results from a query in unrelated places.
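As a rough sketch of that rearrangement (it reuses your extract, transform, task_set_load_helper, get_len_db and main_db unchanged; the `and batch` guard against dispatching an empty batch is an addition of mine, not in your original):

async def producer(queue, connection, settings):
    async for record in extract(connection, settings):
        await queue.put(record)
    await queue.put(None)  # sentinel only; do not close the connection here, main owns it

async def consumer(loop, pool, queue, connection, settings):
    len_db = await get_len_db(connection, settings["old_table_name"])
    task_set = set()
    batch = []
    while True:
        record = await queue.get()
        if record is not None:
            batch.append(dict(record))
        if queue.empty() and batch:  # guard: never dispatch an empty batch
            task = loop.run_in_executor(pool, transform, batch, settings, len_db)
            task_set.add(task)
            if len(task_set) >= pool._max_workers:
                done_set, task_set = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
                await task_set_load_helper(done_set, connection, settings)
            batch = []
        if record is None:
            break
    if task_set:
        await task_set_load_helper(asyncio.as_completed(task_set), connection, settings)
    # no connection.close() here either

async def etl(connection, settings):
    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue(maxsize=8)
        await asyncio.gather(
            asyncio.create_task(producer(queue, connection, settings)),
            asyncio.create_task(consumer(loop, pool, queue, connection, settings)),
        )

async def main(db_filename, ii, ij):
    settings = initialize()
    async with aiosqlite.connect(db_filename) as db_table_connection:
        db_table_cursor = await db_table_connection.cursor()
        await main_db(db_table_cursor, db_table_connection, settings)
        await etl(db_table_connection, settings)  # the one connection is reused for the whole ETL

With everything funneled through the single connection, the second writer that used to collide with the lock simply no longer exists.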