Async and multiprocessing ETL flow - database lock

Problem description

At https://www.vinta.com.br/blog/etl-with-asyncio-asyncpg I found how to create a flow that fetches data from a database (using async code), processes that data in batches using multiprocessing, and then inserts it back into the database (also asynchronously).

My code looks like this:

import asyncio
import aiosqlite

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from lingua import Language, LanguageDetectorBuilder
import ro_diacritics

import spacy
import json

import argostranslate.package
import argostranslate.translate

import functools
import datetime
import time
import re
import logging
import os


def initialize():

    settings = dict()

    settings['new_table_name'] = 'news2'

    settings['old_table_name'] = 'news'

    settings['remove_columns'] = ['resultTitleLemma', 'articleContentLemma']

    settings['add_columns'] = [
        'newErrorStatus', 'newErrorException', 'articleTextLenght', 
        'resultTitleToken', 'resultTitleLemma', 'resultTitlePos', 
        'resultExtractedTitleToken', 'resultExtractedTitleLemma', 'resultExtractedTitlePos', 
        'articleTitleToken', 'articleTitleLemma', 'articleTitlePos',
        'articleTextToken', 'articleTextLemma', 'articleTextPos', 
        'resultTitleTranslated', 'resultExtractedTitleTranslated', 'articleTitleTranslated', 'articleTextTranslated',
        'resultTitleTranslatedToken', 'resultTitleTranslatedLemma', 'resultTitleTranslatedPos', 
        'resultExtractedTitleTranslatedToken', 'resultExtractedTitleTranslatedLemma', 'resultExtractedTitleTranslatedPos', 
        'articleTitleTranslatedToken', 'articleTitleTranslatedLemma', 'articleTitleTranslatedPos', 
        'articleTextTranslatedToken', 'articleTextTranslatedLemma', 'articleTextTranslatedPos'
        ]
    
    # check for title null or in blacklist
    settings['title_blacklist'] = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']

    # lingua-language detector
    #from lingua import Language, LanguageDetectorBuilder
    settings['wanted_language'] = "ROMANIAN"
    settings['language_detector'] = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()

    # restore diacritics
    #import ro_diacritics
    settings['columns_diacritics'] = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']

    # Spacy models for lemmatizing
    #import spacy
    #import json
    # python -m spacy download ro_core_news_md
    settings['spacy_model_ro'] = spacy.load("ro_core_news_md", disable=['ner', 'parser'])

    settings['lemmatizer_columns_ro'] = {
        'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
        'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
        'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
        'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
    }

    # python -m spacy download en_core_web_md
    settings['spacy_model_en'] = spacy.load("en_core_web_md", disable=['ner', 'parser'])

    settings['lemmatizer_columns_en'] = {
        'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
        'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
        'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
        'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
    }

    # Argostranslate from ro -> en
    #import argostranslate.package
    #import argostranslate.translate

    settings['from_lang'] = 'ro'
    settings['to_lang'] = 'en'

    argostranslate.package.update_package_index()
    available_packages = argostranslate.package.get_available_packages()
    package_to_install = next(
        filter(
            lambda x: x.from_code == settings['from_lang'] and x.to_code == settings['to_lang'], available_packages
        )
    )
    argostranslate.package.install_from_path(package_to_install.download())

    settings['translate_columns'] = {'resultTitle':'resultTitleTranslated', 
                        'resultExtractedTitle':'resultExtractedTitleTranslated', 
                        'articleTitle':'articleTitleTranslated', 
                        'articleText':'articleTextTranslated'}
    
    # Last check for matching 
    settings['check_match_columns'] = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']

    return settings



### Data functions for main_data()

def convert_row_dict(row):
    return dict(row)


def remove_dict_columns(datadict, remove_columns):
    if not isinstance(remove_columns, list) and not isinstance(remove_columns, tuple): remove_columns = [remove_columns]
    for column in remove_columns:
        datadict.pop(column, None)
    return datadict


def fix_accents_dict(datadict):
    def fix_accents(s):
        char_dict = { "º": "ș", "ª": "Ș", "ş": "ș", "Ş": "Ș", "ţ": "ț", "Ţ": "Ț", "þ": "ț", "Þ": "Ț", "ã": "ă"  }
        for k,v in char_dict.items():
            s = s.replace(k, v)
        return s
    for item in datadict.keys():
        if isinstance(datadict[item], str):
            datadict[item] = fix_accents(datadict[item])
    return datadict


def add_dict_columns(datadict, add_columns, default_value=None):
    if not isinstance(add_columns, list) and not isinstance(add_columns, tuple): add_columns = [add_columns]
    for column in add_columns:
        datadict[column] = default_value
    return datadict


def check_null_article_text(datadict):
    if not datadict['articleText']:
        datadict['newErrorStatus'] = str(3)
        datadict['newErrorException'] = 'Content null'
        raise Exception(datadict['newErrorException'])
    return datadict


def calc_content_lenght(datadict):
    datadict['articleTextLenght'] = str(len(datadict['articleText']))
    return datadict


def check_extracted_title(datadict, titleBlackList = None):
    if not datadict['resultExtractedTitle']:
        datadict['newErrorStatus'] = str(4)
        datadict['newErrorException'] = 'Page title null'
        raise Exception(datadict['newErrorException'])
    
    if not titleBlackList:
        titleBlackList = ['nainte de a continua', 'One moment, please', 'Before you continue', 'tiri Google']
    if any([x in datadict['resultExtractedTitle'] for x in titleBlackList]):
        datadict['newErrorStatus'] = str(4)
        datadict['newErrorException'] = 'Title in blocked list'
        raise Exception(datadict['newErrorException'])
    return datadict


def check_md_website(datadict):
    location_url = re.findall(r"(?:[a-zA-Z]*\.)+([a-zA-Z]+)(?:/.*)?", str(datadict['articleUrl']))
    if any(x == "md" for x in location_url):
        datadict['newErrorStatus'] = str(5)
        datadict['newErrorException'] = 'Website is from md'
        raise Exception(datadict['newErrorException'])
    return datadict


def check_language(datadict, wanted_language = 'ROMANIAN', language_detector = None):
    if not language_detector:
        language_detector = LanguageDetectorBuilder.from_all_languages_without(Language.TAGALOG, Language.TSONGA, Language.YORUBA, Language.LATIN).with_preloaded_language_models().build()
    try:
        articleLanguage = language_detector.detect_language_of(datadict['articleText']).name
    except Exception:
        datadict['newErrorStatus'] = str(6)
        datadict['newErrorException'] = 'Language could not be extracted'
        raise Exception(datadict['newErrorException'])
    else:
        if articleLanguage != wanted_language:
            datadict['newErrorStatus'] = str(6)
            datadict['newErrorException'] = f'Language of article is probably not Romanian, it is {articleLanguage}'
            raise Exception(datadict['newErrorException'])
    return datadict
        

def restore_diacritics(datadict, columns_diacritics = None):
    if not columns_diacritics:
        columns_diacritics = ['resultTitle', 'resultExtractedTitle', 'articleTitle', 'articleText']
    diacritice = ["ă", "Ă", "î", "Î", "â", "Â", "ș", "Ș", "ț", "Ț"]
    for column in columns_diacritics:
        if not any(e in datadict[column] for e in diacritice):
            datadict[column] = ro_diacritics.restore_diacritics(datadict[column])
    return datadict


def lemmatizer(datadict, columns, spacy_model):
    '''
    lemmatizer_columns_ro = {
        'resultTitle': {'text':'resultTitleToken', 'lemma_':'resultTitleLemma', 'pos_':'resultTitlePos'},
        'resultExtractedTitle': {'text':'resultExtractedTitleToken', 'lemma_':'resultExtractedTitleLemma', 'pos_':'resultExtractedTitlePos'},
        'articleTitle': {'text':'articleTitleToken', 'lemma_':'articleTitleLemma', 'pos_':'articleTitlePos'},
        'articleText': {'text':'articleTextToken', 'lemma_':'articleTextLemma', 'pos_':'articleTextPos'},
    }
    spacy_model_ro = spacy.load("ro_core_news_md", disable=['ner', 'parser'])
    lemmatizer_columns_en = {
        'resultTitleTranslated': {'text':'resultTitleTranslatedToken', 'lemma_':'resultTitleTranslatedLemma', 'pos_':'resultTitleTranslatedPos'},
        'resultExtractedTitleTranslated': {'text':'resultExtractedTitleTranslatedToken', 'lemma_':'resultExtractedTitleTranslatedLemma', 'pos_':'resultExtractedTitleTranslatedPos'},
        'articleTitleTranslated': {'text':'articleTitleTranslatedToken', 'lemma_':'articleTitleTranslatedLemma', 'pos_':'articleTitleTranslatedPos'},
        'articleTextTranslated': {'text':'articleTextTranslatedToken', 'lemma_':'articleTextTranslatedLemma', 'pos_':'articleTextTranslatedPos'},
    }
    spacy_model_en = spacy.load("en_core_web_md", disable=['ner', 'parser'])
    '''
    for column in columns:
        result_spacy =  spacy_model(datadict[column])
        for method in columns[column]:
            datadict[columns[column][method]] = json.dumps([getattr(token, method) for token in result_spacy])
    return datadict


def translate_lang_check(from_lang = None, to_lang = None):
    if not from_lang: from_lang = 'ro'
    if not to_lang: to_lang = 'en'
    argostranslate.package.update_package_index()
    available_packages = argostranslate.package.get_available_packages()
    package_to_install = next(
        filter(
            lambda x: x.from_code == from_lang and x.to_code == to_lang, available_packages
        )
    )
    argostranslate.package.install_from_path(package_to_install.download())


def translate(datadict, columns_dict_from_to = None, from_lang = None, to_lang = None, i = None, j = None):
    if not i: i = '?'
    if not j: j = '?'
    if not columns_dict_from_to:
        columns_dict_from_to = {'resultTitle':'resultTitleTranslated', 
                                'resultExtractedTitle':'resultExtractedTitleTranslated', 
                                'articleTitle':'articleTitleTranslated', 
                                'articleText':'articleTextTranslated'}
    if not from_lang: from_lang = 'ro'
    if not to_lang: to_lang = 'en'
    for column in columns_dict_from_to:
        datadict[columns_dict_from_to[column]] = argostranslate.translate.translate(datadict[column], from_lang, to_lang)
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished translating for {i}/{j}")
    return datadict


def removeFromListSmallerNth(listStrings, noCharacters=3):
    listStringsLower = [x.lower() for x in listStrings]
    return set(filter(lambda i: len(i) >= noCharacters, listStringsLower))


def check_match_titles(datadict, columns_match = None):
    if not columns_match:
        columns_match = ['resultTitleLemma', 'resultExtractedTitleLemma', 'articleTitleLemma', 'articleTextLemma']
    results = dict()
    for column in columns_match:
        results[column] = removeFromListSmallerNth(json.loads(datadict[column])) 
    matches = dict()
    for column in results:
        temp_results = results.copy()
        temp_results.pop(column, None)
        matches[column] = dict()
        for temp_column in temp_results:
            matches[column][temp_column] = len((results[column] & results[temp_column]))
    if all(matches[column][value] < 1 for column in matches for value in matches[column]):
        datadict['newErrorStatus'] = str(8)
        datadict['newErrorException'] = 'Article might not match with google title - no match exists'
        raise Exception(datadict['newErrorException'])
    if sum([sum(matches[column].values()) for column in matches])/2 < 5:
        datadict['newErrorStatus'] = str(8)
        datadict['newErrorException'] = 'Article might not match with google title - sum of all matches is smaller than 5'
        raise Exception(datadict['newErrorException'])
    if any(sum(matches[column].values()) < 1 for column in matches):
        datadict['newErrorStatus'] = str(8)
        datadict['newErrorException'] = 'Article might not match with google title - sum of a group match is 0'
        raise Exception(datadict['newErrorException'])
    return datadict


### Data function for main()

def main_data(row_data, settings, i = None, j = None): #, table_connection, i, j
    if not i: i = '?'
    if not j: j = '?'
    ii = '?'
    ij = '?'
    try:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {i}/{j}")
        #row_data_dict = convert_row_dict(row_data)
        row_data_dict = row_data
        row_data_dict = remove_dict_columns(row_data_dict, settings['remove_columns'])
        row_data_dict = fix_accents_dict(row_data_dict)
        row_data_dict = add_dict_columns(row_data_dict, settings['add_columns'], default_value=None)
        row_data_dict = check_null_article_text(row_data_dict)
        #check error
        row_data_dict = calc_content_lenght(row_data_dict)
        row_data_dict = check_extracted_title(row_data_dict, settings['title_blacklist'])
        #check error
        row_data_dict = check_md_website(row_data_dict)
        #check error
        row_data_dict = check_language(row_data_dict, settings['wanted_language'], settings['language_detector'])
        #check error
        row_data_dict = restore_diacritics(row_data_dict, settings['columns_diacritics'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting ro lemmatizer for {i}/{j}")
        row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_ro'], settings['spacy_model_ro'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting translating for {i}/{j}")
        row_data_dict = translate(row_data_dict, settings['translate_columns'], settings['from_lang'], settings['to_lang'], i, j)
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting en lemmatizer for {i}/{j}")
        row_data_dict = lemmatizer(row_data_dict, settings['lemmatizer_columns_en'], settings['spacy_model_en'])
        row_data_dict = check_match_titles(row_data_dict, settings['check_match_columns'])
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finisted tasks for {i}/{j}")
    except Exception as e:
        if not row_data_dict['newErrorStatus']:
            row_data_dict['newErrorStatus'] = str(0)
        if not row_data_dict['newErrorException']:
            row_data_dict['newErrorException'] = str(e)
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Error task for {i}/{j} - {row_data_dict['newErrorStatus']} / {row_data_dict['newErrorException']}")
    finally:
        #await insert_db(settings['new_table_name'], table_connection, row_data_dict)
        return row_data_dict
    

def transform(batch, settings, len_db):
    logging.info(f"{datetime.datetime.now()} - Started transform")
    transformed_batch = []
    logging.info(f'Len batch = {len(batch)}')
    for irecord, record in enumerate(batch, start=1): 
        transformed_batch.append(list(main_data(record, settings, irecord, len_db).values()))
    logging.info(f"{datetime.datetime.now()} - Ended transform")
    return transformed_batch


async def load(batch, connection, settings):
    logging.info(f"{datetime.datetime.now()} - Started load")
    sqlstring = f'INSERT INTO {settings["new_table_name"]} VALUES (' + ','.join(['?'] * len(batch[0])) + ')'
    await connection.executemany(sqlstring, batch)
    await connection.commit()
    logging.info(f"{datetime.datetime.now()} - Ended load")


async def task_set_load_helper(task_set, connection, settings):
    logging.info(f"{datetime.datetime.now()} - Started task_set_load_helper")
    for future in task_set:
        await load( await future, connection, settings)
    logging.info(f"{datetime.datetime.now()} - Ended task_set_load_helper")
    

async def get_len_db(connection, table_name):
    async with connection.execute(f'SELECT COUNT(*) FROM {table_name}') as cursor:
        row = await cursor.fetchone()
        return row[0]


async def consumer(loop, pool, queue, db_filename, settings):
    logging.info(f"{datetime.datetime.now()} - Started consumer")
    connection = await get_aiosqlite_connection(db_filename)
    len_db = await get_len_db(connection, settings["old_table_name"])
    
    task_set = set()
    batch = []
    while True:
        record = await queue.get()
        if record is not None:
            record = dict(record)
            batch.append(record)
        if queue.empty() and batch:
            task = loop.run_in_executor(pool, transform, batch, settings, len_db)
            task_set.add(task)
            batch = []
        if len(task_set) >= pool._max_workers:
            done_set, task_set = await asyncio.wait(
                task_set, return_when=asyncio.FIRST_COMPLETED
            )
            logging.info(f"{datetime.datetime.now()} - Start task_set_load_helper #1")
            #for f in done_set:
            #    await load(f, connection, settings)
            await task_set_load_helper(done_set, connection, settings)
        if record is None:
            break
    if task_set:
        logging.info(f"{datetime.datetime.now()} - Start task_set_load_helper #2")
        await task_set_load_helper(
            asyncio.as_completed(task_set), connection, settings
        )
    await connection.close()
    logging.info(f"{datetime.datetime.now()} - Ended consumer")


async def get_aiosqlite_connection(db_filename):
    logging.info(f"{datetime.datetime.now()} - Started get_aiosqlite_connection")
    connection = await aiosqlite.connect(db_filename)
    logging.info(f"{datetime.datetime.now()} - Ended get_aiosqlite_connection")
    return connection


async def extract(connection, settings):
    logging.info(f"{datetime.datetime.now()} - Started extract")
    connection.row_factory = aiosqlite.Row
    async with connection.execute(f'SELECT * FROM {settings["old_table_name"]}') as cursor:
        async for record in cursor:
            yield record
    logging.info(f"{datetime.datetime.now()} - Ended extract")

    
async def producer(queue, db_filename, settings):
    logging.info(f"{datetime.datetime.now()} - Started producer")
    connection = await get_aiosqlite_connection(db_filename)
    async for record in extract(connection, settings):
        await queue.put(record)
    await queue.put(None)
    await connection.close()
    logging.info(f"{datetime.datetime.now()} - Ended producer")


async def etl(db_filename, settings):
    logging.info(f"{datetime.datetime.now()} - Started etl")
    with ProcessPoolExecutor(
        max_workers=multiprocessing.cpu_count(),
    ) as pool:
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue(maxsize=8)
        await asyncio.gather(
            asyncio.create_task(producer(queue, db_filename, settings)),
            asyncio.create_task(consumer(loop, pool, queue, db_filename, settings)),
            return_exceptions=False,
        )
        logging.info(f"{datetime.datetime.now()} - Ended etl")



### Database insert for main()    

async def insert_db(table_name, table_connection, row_data_dict):
    sqlstring = f'INSERT INTO {table_name} (' + ','.join(row_data_dict.keys()) +') values(' + ','.join(['?'] * len(list(row_data_dict.values()))) + ')'
    await table_connection.execute(sqlstring, list(row_data_dict.values()))
    await table_connection.commit()


### Database functions for main_db()

async def create_db_table_from_table(new_table_name, table_cursor, table_connection, old_table_name = None):
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Create new table {new_table_name}")
    await table_cursor.execute(f"DROP TABLE IF EXISTS {new_table_name}")
    await table_connection.commit()
    if not old_table_name:
        await table_cursor.execute(f"CREATE TABLE {new_table_name}")
        await table_connection.commit()
    else:
        await table_cursor.execute(f"CREATE TABLE {new_table_name} AS SELECT * FROM {old_table_name} WHERE 0")
        await table_connection.commit()


async def remove_db_columns(table_name, table_cursor, table_connection, remove_columns):
    if not isinstance(remove_columns, list) and not isinstance(remove_columns, tuple): remove_columns = [remove_columns]
    for remove_column in remove_columns:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Remove column {remove_column}")
        await table_cursor.execute(f"ALTER TABLE {table_name} DROP COLUMN {remove_column}")
        await table_connection.commit()


async def add_db_columns(table_name, table_cursor, table_connection, add_columns):
    if not isinstance(add_columns, list) and not isinstance(add_columns, tuple): add_columns = [add_columns]
    for new_column in add_columns:
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Add column {new_column}")
        await table_cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {new_column} TEXT")
        await table_connection.commit()


async def get_no_rows_db(db_table_connection, table_name):
    async with db_table_connection.execute(f'SELECT COUNT(*) FROM {table_name}') as cursor:
        row = await cursor.fetchone()
        return row[0]
    

async def get_columns_table(db_table_connection, table_name):
    async with db_table_connection.execute(f'SELECT * FROM {table_name}') as cursor:    
        return [description[0] for description in cursor.description]
    

### Database initialization for main() 

async def main_db(db_table_cursor, db_table_connection, settings):

    await create_db_table_from_table(settings['new_table_name'], db_table_cursor, db_table_connection, settings['old_table_name'])

    await remove_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['remove_columns'])

    await add_db_columns(settings['new_table_name'], db_table_cursor, db_table_connection, settings['add_columns'])

    no_rows_old_table = await get_no_rows_db(db_table_connection, settings['old_table_name'])

    #new_table_columns = await get_columns_table(db_table_connection, settings['new_table_name'])

    return no_rows_old_table


### main loop

async def main(db_filename, ii, ij):

    start = time.monotonic()

    # Initialize parameters
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize parameters for {db_filename}")
    settings = initialize()
    logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize parameters for {db_filename}")

    # Connect to db and modify db (asyncio IO)
    async with aiosqlite.connect(db_filename) as db_table_connection:   
        db_table_cursor = await db_table_connection.cursor()
        
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting initialize database for {db_filename}")
        #no_rows_old_table, new_table_columns = await main_db(db_table_cursor, db_table_connection, settings)
        j = await main_db(db_table_cursor, db_table_connection, settings)
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished initialize database for {db_filename}")

        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Starting tasks for {db_filename}")
        await etl(db_filename, settings)
        #await asyncio.run(etl(db_filename, settings))
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - Finished tasks for {db_filename}")
   

### startup 

if __name__ == "__main__": 
    program_start_time = datetime.datetime.now()
    logging.basicConfig(level=logging.INFO,
                handlers=[
                    logging.FileHandler(f'debug_correct_translate_lemma_{program_start_time.strftime("%Y_%m_%d_%H_%M")}.txt',
                                        'a', 'utf-8'), logging.StreamHandler()])
    logging.info(f"{datetime.datetime.now()} - {__file__} started at {program_start_time}")

    #linux mypath = '/media/vlad/HDD1000/disertatie/extracted_original_1/test1'
    #mypath = 'C:/Users/Vlad/PycharmProjects/newsScrapping/database'
    mypath = 'D:/disertatie/cod/newsScrapping/database'
    filenames = next(os.walk(mypath), (None, None, []))[2]  # [] if no file
    filepaths = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(mypath) for f in filenames] 
    dbfiles = filepaths

    #linux dbfiles = ["/home/vlad/Documents/newsScrapping/news_database_2010-01-01-2023-06-01_creditbancar.db"]
    
    ii = 0
    ij = len(dbfiles)
    for dbfile in dbfiles:
        ii += 1
        keyword_start_time = datetime.datetime.now()
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")

        asyncio.run(main(dbfile, ii, ij))
        
        keyword_end_time = datetime.datetime.now()
        keyword_elapsed_time = keyword_end_time - keyword_start_time
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} started at {keyword_start_time}")
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} ended at {keyword_end_time}")
        logging.info(f"{datetime.datetime.now()} - {ii}/{ij} - {dbfile} executed in {keyword_elapsed_time}")
    
    program_end_time = datetime.datetime.now()
    program_elapsed_time = program_end_time - program_start_time
    logging.info(f"{datetime.datetime.now()} - program started at {program_start_time}")
    logging.info(f"{datetime.datetime.now()} - program ended at {program_end_time}")
    logging.info(f"{datetime.datetime.now()} - program executed in {program_elapsed_time}")

The problem is that it throws a database locked error at me, and I don't know how to handle it.

INFO:root:2023-11-10 12:23:09.780805 - Started load
Traceback (most recent call last):
  File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 560, in <module>
    asyncio.run(main(dbfile, ii, ij))
  File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 529, in main
    await etl(db_filename, settings)
  File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 434, in etl
    await asyncio.gather(
  File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 388, in consumer
    await task_set_load_helper(done_set, connection, settings)
  File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 357, in task_set_load_helper
    await load( await future, connection, settings)
  File "d:\disertatie\cod\newsScrapping\new_test_corect_db copy.py", line 350, in load
    await connection.commit()
  File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\site-packages\aiosqlite\core.py", line 166, in commit
    await self._execute(self._conn.commit)
  File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\site-packages\aiosqlite\core.py", line 133, in _execute
    return await future
           ^^^^^^^^^^^^
  File "C:\Users\vladi\AppData\Local\Programs\Python\Python311\Lib\site-packages\aiosqlite\core.py", line 106, in run
    result = function()
             ^^^^^^^^^^
sqlite3.OperationalError: database is locked
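
For illustration, a minimal sketch of the underlying conflict (my own repro with a hypothetical test.db file, not code from the post): two aiosqlite connections to the same database file, one holding the write lock while the other tries to write.

import asyncio
import aiosqlite

async def main():
    # Two separate connections to the same SQLite database file.
    conn_a = await aiosqlite.connect("test.db")
    conn_b = await aiosqlite.connect("test.db", timeout=1)  # short busy timeout
    await conn_a.execute("CREATE TABLE IF NOT EXISTS t (x)")
    await conn_a.commit()
    await conn_a.execute("BEGIN EXCLUSIVE")  # conn_a takes the write lock
    try:
        # conn_b cannot acquire the lock; after the 1 s busy timeout this
        # raises sqlite3.OperationalError: database is locked
        await conn_b.execute("INSERT INTO t VALUES (1)")
        await conn_b.commit()
    finally:
        await conn_a.rollback()
        await conn_a.close()
        await conn_b.close()

asyncio.run(main())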

If you want to test it, the database is on Drive.

I tried modifying the code to implement a second task pool for the inserts, but I couldn't get it to work.
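
For reference, a sketch of what such a dedicated insert task could look like (my illustration, not the attempt from the post; it reuses the load() function above and assumes a separate insert_queue): a single coroutine owns all writes, so no two tasks commit at the same time.

async def writer(insert_queue, connection, settings):
    # The only coroutine that writes to the database.
    while True:
        batch = await insert_queue.get()
        if batch is None:  # sentinel: no more batches are coming
            break
        await load(batch, connection, settings)

The consumer would then put each transformed batch on insert_queue instead of calling load() itself, and put None when it is done.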

python sqlite python-asyncio python-multiprocessing
1 Answer
0 votes

The problem pointed out is reflected in your code: you are opening the database twice, once for producer and once for consumer. Both Windows file locking and SQLite want a single connection, and one is enough for your program.

I just saw that this is a large code base, and there is actually more than one call to aiosqlite.connect.

Rearrange the code so that the db_table_connection created in main is passed directly to the consumer and producer tasks, instead of passing db_filename and reconnecting in the get_aiosqlite_connection function. (You could simply change that function to return the connection as a global resource instead of calling .connect again.) Also use "cursors" (check https://docs.python.org/3/library/sqlite3.html, though aiosqlite should handle this) to make sure you don't fetch results from queries in unrelated places.
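
A sketch of that rearrangement (my illustration; it assumes the extract, transform, load, task_set_load_helper and get_len_db functions from the question stay unchanged, and that main() passes its already-open db_table_connection instead of db_filename):

async def producer(queue, connection, settings):
    # Reuse the single connection opened in main() instead of reconnecting.
    async for record in extract(connection, settings):
        await queue.put(record)
    await queue.put(None)  # sentinel for the consumer

async def consumer(loop, pool, queue, connection, settings):
    len_db = await get_len_db(connection, settings["old_table_name"])
    task_set = set()
    batch = []
    while True:
        record = await queue.get()
        if record is not None:
            batch.append(dict(record))
        if queue.empty() and batch:
            task_set.add(loop.run_in_executor(pool, transform, batch, settings, len_db))
            batch = []
        if len(task_set) >= pool._max_workers:
            done_set, task_set = await asyncio.wait(task_set, return_when=asyncio.FIRST_COMPLETED)
            await task_set_load_helper(done_set, connection, settings)
        if record is None:
            break
    if task_set:
        await task_set_load_helper(asyncio.as_completed(task_set), connection, settings)

async def etl(connection, settings):
    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue(maxsize=8)
        await asyncio.gather(
            producer(queue, connection, settings),
            consumer(loop, pool, queue, connection, settings),
        )

# in main(), inside the async with aiosqlite.connect(...) block:
#     await etl(db_table_connection, settings)

With one connection, every commit goes through the same SQLite handle, so the writer never competes with a second connection for the file lock.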
