Is my integration test well structured, and why does it return an import error?

Problem description · Votes: 0 · Answers: 1

I am learning about mocking and trying it out in a personal project.

Here is my project structure:

project/
│
├── src/
│   └── My_app/
│       ├── __init__.py
│       ├── application/
│       │   ├── __init__.py
│       │   └── main_code.py
│       ├── infrastructure/
│       │   ├── __init__.py
│       │   ├── data_quality.py
│       │   └── s3_utils.py
│       └── settings/
│           ├── __init__.py
│           └── config.py
│
└── tests/
    └── integration_tests/
        └── application/
            └── test_main_code.py
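
With this src layout, the tests can only import My_app if the package is installed (for example with pip install -e .) or src/ is added to sys.path. As a minimal sketch, a conftest.py at the project root could do the path tweak when the tests are collected with pytest (this file is my assumption and not part of the original project; with plain unittest the same two lines can sit at the top of the test module):

import os
import sys

# Make the src/ layout importable as My_app when the tests run from project/.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))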

The goal is to write an integration test for main_code.py.

Simplified version of main_code.py:

import sys
from typing import List

import My_app.settings.config as stg
from awsglue.utils import getResolvedOptions
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
from My_app.infrastructure.s3_utils import csvReader, dataframeWriter
from pyspark.sql import SparkSession


def main(argv: List[str]) -> None:
    args = getResolvedOptions(
        argv,
        [
            'JOB_NAME',
            'S3_BRONZE_BUCKET_NAME',
            'S3_PRE_SILVER_BUCKET_NAME',
            'S3_BRONZE_PATH',
            'S3_PRE_SILVER_PATH',
            'S3_DATA_QUALITY_LOGS_BUCKET_NAME',
        ],
    )

    s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
    s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
    s3_bronze_path = args['S3_BRONZE_PATH']
    s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
    s3_data_quality_logs_bucket_name = args['S3_DATA_QUALITY_LOGS_BUCKET_NAME']

    spark = SparkSession.builder.getOrCreate()  # TODO replace this init with common method (waiting for S3 part)
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')


    for table in list(stg.data_schema.keys()):
        raw_data = stg.data_schema[table].columns.to_dict()

        schema = generateSchema(raw_data)
        df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')


        (quality_df, table_ingestion_status) = evaluateDataQuality(spark, df, table)

        dataframeWriter(
            quality_df,
            s3_data_quality_logs_bucket_name,
            'data-quality/',
            'logs',
            'date',
            'append',
        )


if __name__ == '__main__':
    main(sys.argv)

Simplified version of data_quality.py:

import My_app.settings.config as stg
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, DateType


def typeOf(t: str) -> IntegerType | StringType | FloatType | DateType:
    ...
    return StringType()


def generateSchema(columns_dict: dict) -> StructType:
    ...
    return schema


def evaluateDataQuality(spark: SparkSession, df: DataFrame, table: str) -> tuple[DataFrame, bool]:
    ...
    return (
        EvaluateDataQuality.apply(...)
        .toDF(),
        True,
    )

Simplified version of s3_utils.py:

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType


def csvReader(spark: SparkSession, bucket: str, path: str, table: str, schema: StructType, sep: str) -> DataFrame:
    """Reads a CSV file as a Dataframe from S3 using user parameters for format."""
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('sep', sep)
        .schema(schema)
        .load(f's3a://{bucket}/{path}/{table}.csv')
    )


def dataframeWriter(
    df: DataFrame, bucket: str, path: str, table: str, partition_key: str, mode: str = 'overwrite'
) -> None:
    """Writes a dataframe in S3 in parquet format using user parameters to define path and partition key."""
    df.write.partitionBy(partition_key).mode(mode).parquet(f's3a://{bucket}/{path}/{table}/')

What I want to do

Write an integration test for main_code.py while:

  • mocking the csvReader function and replacing it with local_csvReader
  • mocking the dataframeWriter function and replacing it with local_dataframeWriter
  • mocking the import of awsgluedq so that it does not have to be installed locally (see the sketch after this list)
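
Since awsgluedq is only available inside the AWS Glue runtime, a common way to handle the last point is to plant a stub module in sys.modules before anything imports main_code, so that `from awsgluedq.transforms import EvaluateDataQuality` resolves against the stub instead of failing. A minimal sketch, assuming a MagicMock-based stub placed at the top of test_main_code.py (my suggestion, not the original code):

import sys
from unittest.mock import MagicMock

# Register fake modules so `from awsgluedq.transforms import EvaluateDataQuality`
# succeeds locally; this must run before My_app.application.main_code is imported.
fake_transforms = MagicMock()
sys.modules['awsgluedq'] = MagicMock(transforms=fake_transforms)
sys.modules['awsgluedq.transforms'] = fake_transforms

Because @patch only imports its target module when the decorated test actually runs (as the traceback below shows), having the stub at module level of the test file guarantees it is in place first. Note that with this stub, EvaluateDataQuality.apply(...).toDF() returns a MagicMock rather than a real DataFrame, so a test that checks real Parquet output may additionally need to patch evaluateDataQuality itself.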

What I did:

test_main_code.py

"""Module that contains unit tests for My_app pre silver job."""

import os
from unittest import TestCase
from unittest.mock import patch, Mock

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType


def local_csvReader(spark: SparkSession, bu: str, pa: str, table: str, schema: StructType, sep: str):
    """Mocked function that replaces real csvReader. this one reads from local rather than S3."""
    return (
        spark.read.format('csv')
        .option('header', 'true')
        .option('sep', ';')
        .schema(schema)
        .load(f'./tests/integration_tests/input_mock/{table}.csv')
    )


def local_dataframeWriter(df, bu: str, pa: str, table: str, partition_key: str, mode: str = 'overwrite'):
    """Mock that replaces the real dataframeWriter; it writes locally rather than to S3.

    The mode parameter must exist because main passes 'append' positionally.
    """
    output_dir = f'./tests/integration_tests/output_mock/{table}/'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    df.write.partitionBy(partition_key).mode('overwrite').parquet(output_dir)


class IntegrationTest(TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master('local').appName('TestPerfmarketSilver').getOrCreate()
        cls.spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

    @patch('My_app.application.main_code.getResolvedOptions')
    @patch('My_app.application.main_code.csvReader', side_effect=local_csvReader)
    @patch('My_app.application.main_code.dataframeWriter', side_effect=local_dataframeWriter)
    # Mocks are injected bottom-up: the lowest @patch decorator provides the first argument.
    def test_main(self, mock_datawriter, mock_csvreader, mocked_get_resolved_options: Mock):
        """Test the main function with local CSV and Parquet output."""
        import My_app.application.main_code as main_code

        import My_app.settings.config as stg
        import tests.integration_tests.settings.config as stg_new

        stg.data_schema = stg_new.data_schema_test

        expected_results = {'chemins': {'nbRows': 8}}

        # Mock the resolved options
        mocked_get_resolved_options.return_value = {
            'JOB_NAME': 'test_job',
            'S3_BRONZE_BUCKET_NAME': 'test_bronze',
            'S3_PRE_SILVER_BUCKET_NAME': 'test_pre_silver',
            'S3_BRONZE_PATH': './tests/integration_tests/input_mock',
            'S3_PRE_SILVER_PATH': './tests/integration_tests/output_mock',
            'S3_DATA_QUALITY_LOGS_BUCKET_NAME': 'test_dq',
        }

        main_code.main([])

        for table in stg.data_schema.keys():
            # Verify that the output Parquet file is created
            output_path = f'./tests/integration_tests/output_mock/{table}/'
            self.assertTrue(os.path.exists(output_path))

            # Read the written Parquet file and check the data
            written_df = self.spark.read.parquet(output_path)
            self.assertEqual(written_df.count(), expected_results[table]['nbRows'])  # Check row count
            self.assertSetEqual(
                {column_data['bronze_name'] for column_data in stg.data_schema[table]['columns'].to_dict().values()},
                set(written_df.columns),
            )
            # Clean up
            os.system(f'rm -rf ./tests/integration_tests/output_mock/{table}/')
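
Because the patches hand the mock objects to the test (bottom decorator first), the test could also assert the wiring directly instead of only inspecting files; for example (hypothetical extra assertions, not in the original test):

        mock_csvreader.assert_called()
        self.assertEqual(mock_datawriter.call_count, len(stg.data_schema))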

The problem:

Running the test class returns:

======================================================================
ERROR: test_main (tests.integration_tests.application.test_main_code.IntegrationTest)
Test the main function with local CSV and Parquet output.
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1248, in _dot_lookup
    return getattr(thing, comp)
AttributeError: module 'My_app.application' has no attribute 'main_code'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1376, in patched
    with self.decoration_helper(patched,
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1358, in decoration_helper
    arg = exit_stack.enter_context(patching)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 492, in enter_context
    result = _cm_type.__enter__(cm)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1431, in __enter__
    self.target = self.getter()
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1618, in <lambda>
    getter = lambda: _importer(target)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1261, in _importer
    thing = _dot_lookup(thing, comp, import_path)
  File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1250, in _dot_lookup
    __import__(import_path)
  File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/application/main_code.py", line 10, in <module>
    from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
  File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/infrastructure/data_quality.py", line 4, in <module>
    from awsgluedq.transforms import EvaluateDataQuality
ModuleNotFoundError: No module named 'awsgluedq'

----------------------------------------------------------------------
Ran 1 test in 2.114s

FAILED (errors=1)
  • Is my test class structured correctly? I am importing main_code, right? I don't think so, because of:
    AttributeError: module 'My_app.application' has no attribute 'main_code'

  • How can I use mocking to replace the awsgluedq module with other code?

python unit-testing mocking
1 Answer

0 votes

First, add comments to your code. It is very hard to read and understand.

Second, for the 'awsgluedq' package, you need to import 'runpy' on line 1; it is mainly used for Python.

Third, 'main_code' is not defined because it does not apply to that package. Instead, you need to use the expression between these square brackets: [name=='main']. Note: for name you need to import 'configparser' on an earlier line.

If you need to access the main .py file of a Python package, you need to use this expression, the 'main.py file'.

Part of this content was provided by other users who offered assistance. If it is incorrect, I will consult a real professional.
