我正在学习模拟并在个人项目中尝试它。
这是我的项目结构:
project/
│
├── src/
│ └── My_app/
│ ├── __init__.py
│ ├── application/
│ │ └── main_code.py
│ │ └── __init__.py
│ ├── infrastructure/
│ │ ├── __init__.py
│ │ ├── data_quality.py
│ │ └── s3_utils.py
│ └── settings/
│ ├── __init__.py
│ └── s3_utils.py
│
└── tests/
└── integration_tests/
└── application/
└── test_main_code.py
目标是为
main_code.py
编写集成测试
import My_app.settings.config as stg
from awsglue.utils import getResolvedOptions
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
from My_app.infrastructure.s3_utils import csvReader, dataframeWriter
from pyspark.sql import SparkSession
def main(argv: List[str]) -> None:
args = getResolvedOptions(
argv,
[
'JOB_NAME',
'S3_BRONZE_BUCKET_NAME',
'S3_PRE_SILVER_BUCKET_NAME',
'S3_BRONZE_PATH',
'S3_PRE_SILVER_PATH',
'S3_DATA_QUALITY_LOGS_BUCKET_NAME',
],
)
s3_bronze_bucket_name = args['S3_BRONZE_BUCKET_NAME']
s3_pre_silver_bucket_name = args['S3_PRE_SILVER_BUCKET_NAME']
s3_bronze_path = args['S3_BRONZE_PATH']
s3_pre_silver_path = args['S3_PRE_SILVER_PATH']
s3_data_quality_logs_bucket_name = args['S3_DATA_QUALITY_LOGS_BUCKET_NAME']
spark = SparkSession.builder.getOrCreate() # TODO replace this init with common method (waiting for S3 part)
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
for table in list(stg.data_schema.keys()):
raw_data = stg.data_schema[table].columns.to_dict()
schema = generateSchema(raw_data)
df = csvReader(spark, s3_bronze_bucket_name, s3_bronze_path, table, schema, '\t')
(quality_df, table_ingestion_status) = evaluateDataQuality(spark, df, table)
dataframeWriter(
quality_df,
s3_data_quality_logs_bucket_name,
'data-quality/',
'logs',
'date',
'append',
)
if __name__ == '__main__':
main(sys.argv)
import My_app.settings.config as stg
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsgluedq.transforms import EvaluateDataQuality
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, IntegerType, StringType, FloatType, DateType
def typeOf(t: str) -> IntegerType | StringType | FloatType | DateType:
...
return StringType()
def generateSchema(columns_dict: dict) -> StructType:
...
return schema
def evaluateDataQuality(spark: SparkSession, df: DataFrame, table: str) -> (DataFrame, bool):
...
return (
EvaluateDataQuality.apply(...)
.toDF(),
True,
)
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType
def csvReader(spark: SparkSession, bucket: str, path: str, table: str, schema: StructType, sep: str) -> DataFrame:
"""Reads a CSV file as a Dataframe from S3 using user parameters for format."""
return (
spark.read.format('csv')
.option('header', 'true')
.option('sep', sep)
.schema(schema)
.load(f's3a://{bucket}/{path}/{table}.csv')
)
def dataframeWriter(
df: DataFrame, bucket: str, path: str, table: str, partition_key: str, mode: str = 'overwrite'
) -> None:
"""Writes a dataframe in S3 in parquet format using user parameters to define path and partition key."""
df.write.partitionBy(partition_key).mode(mode).parquet(f's3a://{bucket}/{path}/{table}/')
为
main_code.py
编写集成测试,同时:
csvReader
函数并将其替换为 local_csvReader
。dataframeWriter
函数并将其替换为 local_dataframeWriter
。awsgluedq
的导入以避免在本地安装。"""Module that contains unit tests for My_app pre silver job."""
import os
from unittest import TestCase
from unittest.mock import patch, Mock
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
def local_csvReader(spark: SparkSession, bu: str, pa: str, table: str, schema: StructType, sep: str):
"""Mocked function that replaces real csvReader. this one reads from local rather than S3."""
return (
spark.read.format('csv')
.option('header', 'true')
.option('sep', ';')
.schema(schema)
.load(f'./tests/integration_tests/input_mock/{table}.csv')
)
def local_dataframeWriter(df, bu: str, pa: str, table: str, partition_key: str):
"""Mocked function that replaces real dataframeWriter. this one writes in local rather than S3."""
output_dir = f'./tests/integration_tests/output_mock/{table}/'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
df.write.partitionBy(partition_key).mode('overwrite').parquet(output_dir)
class IntegrationTest(TestCase):
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder.master('local').appName('TestPerfmarketSilver').getOrCreate()
cls.spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
@patch('My_app.application.main_code.getResolvedOptions')
@patch('My_app.application.main_code.csvReader', side_effect=local_csvReader)
@patch('My_app.application.main_code.dataframeWriter', side_effect=local_dataframeWriter)
def test_main(self, mock_csvreader, mock_datawriter, mocked_get_resolved_options: Mock):
"""Test the main function with local CSV and Parquet output."""
import My_app.application.main_code as main_code
import My_app.settings.config as stg
import tests.integration_tests.settings.config as stg_new
stg.data_schema = stg_new.data_schema_test
expected_results = {'chemins': {'nbRows': 8}}
# Mock the resolved options
mocked_get_resolved_options.return_value = {
'JOB_NAME': 'test_job',
'S3_BRONZE_BUCKET_NAME': 'test_bronze',
'S3_PRE_SILVER_BUCKET_NAME': 'test_pre_silver',
'S3_BRONZE_PATH': './tests/integration_tests/input_mock',
'S3_PRE_SILVER_PATH': './tests/integration_tests/output_mock',
'S3_DATA_QUALITY_LOGS_BUCKET_NAME': 'test_dq',
}
main_code.main([])
for table in stg.data_schema.keys():
# Verify that the output Parquet file is created
output_path = f'./tests/integration_tests/output_mock/{table}/'
self.assertTrue(os.path.exists(output_path))
# Read the written Parquet file and check the data
written_df = self.spark.read.parquet(output_path)
self.assertEqual(written_df.count(), expected_results[table]['nbRows']) # Check row count
self.assertTrue(
set(
[column_data['bronze_name'] for column_data in stg.data_schema[table]['columns'].to_dict().values()]
)
== set(written_df.columns)
)
# Clean up
os.system(f'rm -rf ./tests/integration_tests/output_mock/{table}/')
运行测试类正在返回:
======================================================================
ERROR: test_main (tests.integration_tests.application.test_main_code.IntegrationTest)
Test the main function with local CSV and Parquet output.
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1248, in _dot_lookup
return getattr(thing, comp)
AttributeError: module 'My_app.application' has no attribute 'main_code'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1376, in patched
with self.decoration_helper(patched,
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 135, in __enter__
return next(self.gen)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1358, in decoration_helper
arg = exit_stack.enter_context(patching)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/contextlib.py", line 492, in enter_context
result = _cm_type.__enter__(cm)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1431, in __enter__
self.target = self.getter()
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1618, in <lambda>
getter = lambda: _importer(target)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1261, in _importer
thing = _dot_lookup(thing, comp, import_path)
File "/Users/me/.asdf/installs/python/3.10.14/lib/python3.10/unittest/mock.py", line 1250, in _dot_lookup
__import__(import_path)
File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/application/main_code.py", line 10, in <module>
from My_app.infrastructure.data_quality import evaluateDataQuality, generateSchema
File "/Users/me/IdeaProjects/project_root/apps/project/src/My_app/infrastructure/data_quality.py", line 4, in <module>
from awsgluedq.transforms import EvaluateDataQuality
ModuleNotFoundError: No module named 'awsgluedq'
----------------------------------------------------------------------
Ran 1 test in 2.114s
FAILED (errors=1)
我的测试课程结构合理吗?我正在导入
main_code
,对吧?
我不这么认为,因为:AttributeError: module 'My_app.application' has no attribute 'main_code'
如何集成模拟技术来用另一个代码替换
awsgluedq
模块?
首先,在代码中添加注释。这是非常难以阅读和理解的。
其次,对于“awsgluedq”包,您需要在第 1 行导入“runpy”,主要用于 python。
第三,未定义“main_code”,因为它不适用于该包。相反,您需要使用这些方括号之间的表达式; [name=='main']。 注意:对于 name 您需要在早期行导入“configparsner”。
如果您需要访问主 .py 文件。对于 python 包,您需要使用此表达式“main.py 文件”。
其中部分内容是由其他一些提供协助的用户提供的。如果这是不正确的,我会请教真正的专业人士。