Azure ML SDKv2: Scheduled pipeline job does not re-execute the main script

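Problem:

I am migrating an Azure ML project from SDKv1 to SDKv2. My scheduled pipeline job runs successfully, but the output of the main script (main.py) is identical to that of the initial job submission, which indicates that the script is not being re-executed.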

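Code:

  • main.py: generates a random string on every execution and prints it together with the current timestamp.
  • Publish.py:
    • Initializes the MLClient.
    • Sets up the environment from a Dockerfile and conda.yml.
    • Creates and registers a component that executes main.py.
    • Defines a simple pipeline using the registered component.
    • Submits the pipeline as a job.
    • Schedules the pipeline to run every 5 minutes.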

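Expected behavior:

Each scheduled job execution should generate a new random string and print it together with the current timestamp.

Actual behavior:

Every scheduled job execution produces the same random string and timestamp as the initial job submission.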

Code snippets:

  • main.py:
import string
import random
import datetime

# Generate a random 10-letter string with random.choices()
res = ''.join(random.choices(string.ascii_letters, k=10))

message_to_main = f"Hello world from scheduler... {str(res)}"

print(message_to_main)

print(f"Script execution time: {datetime.datetime.utcnow()}")
  • Publish.py:
# ... (relevant code for environment setup, component registration, pipeline definition, job submission, and scheduling)


# Import required libraries
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azure.ai.ml import command
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.constants import TimeZone
from azure.ai.ml.entities import (
    Environment, BuildContext,
    JobSchedule,
    RecurrenceTrigger,
    RecurrencePattern,

)
from azure.core.exceptions import ResourceNotFoundError
from azure.ai.ml.exceptions import ValidationException


def main():
    """
    main function performs following actions:
    - Initialize MLClient handle
    - Creates or update environment for the main component which executes the logic from
        Dockerfile and conda.yml
    - Creates or update component to execute the main.py script (script is simple and does not 
        have any input and outputs)
    - Setup pipeline using main component.
    - Submit pipeline for execution as job
    - Schedule pipeline as recurring job every 5 minutes
    
    """
    
    # Set variables
    COMPUTE = "sdkv2-test-cluster"
    ENV_PATH = "."
    ENV_NAME = "helloapp_sdkv2_env_02"
    MAIN_COMPONENT_NAME = "helloapp_sdkv2_main_02"
    COMPONENT_CODE_PATH = "src"
    APP_PIPELINE_NAME = "helloapp_sdkv2_scheduler_pipeline_02"
    PIPELINE_JOB_EXPERIMENT = "helloapp_sdkv2_scheduler_experiment_02"
    JOB_SCHEDULE_NAME = "helloapp_sdkv2_scheduler_experiment_02"
    
    # get a handle to the workspace
    subscription_id="<SUBSCRIPTION ID>"
    resource_group="<RESOURCE GROUP NAME>"
    workspace="<AML WORKSPACE NAME>"
    
    ml_client = MLClient(
        DefaultAzureCredential(), subscription_id, resource_group, workspace
    )
    
    
    try:
        print(f'Starting to create environment: {ENV_NAME}')
        env_docker_context = Environment(
            build=BuildContext(path=ENV_PATH),
            name=ENV_NAME,
            description=f"Environment for {MAIN_COMPONENT_NAME}"
        )
        job_run_env =  ml_client.environments.create_or_update(env_docker_context)
        print(f'Environment creation job started for: {ENV_NAME}')
    except Exception as e:
        print(e)
        raise e
        
    try:
        component = command(
            name=MAIN_COMPONENT_NAME,
            compute=COMPUTE,
            # The source folder of the component
            code=COMPONENT_CODE_PATH,
            command="python main.py",
            environment=job_run_env,
        )
        # Register component for reusability
        registered_component = ml_client.create_or_update(component.component)
        print(
            f"Component {registered_component.name} with Version {registered_component.version} is registered"
        )
    except Exception as e:
        print(e)
        raise e
        
    # Create simple pipeline
    @pipeline(name=APP_PIPELINE_NAME, compute=COMPUTE)
    def hello_sdkv2_scheduler_pipeline():
        _ = registered_component()

    app_pipeline = hello_sdkv2_scheduler_pipeline()
    
    # submit pipeline as job for execution
    submitted_job = ml_client.jobs.create_or_update(
        app_pipeline, experiment_name=PIPELINE_JOB_EXPERIMENT
    )
    print(submitted_job.id)
    
    # Schedule pipeline job for recurring execution every 5 mins
    recurrence_trigger = RecurrenceTrigger(
        frequency="minute",
        interval=5,
        time_zone=TimeZone.CENTRAL_AMERICA_STANDARD_TIME,
    )

    job_schedule = JobSchedule(
        name=JOB_SCHEDULE_NAME, 
        trigger=recurrence_trigger, 
        create_job=app_pipeline
    )

    job_schedule = ml_client.schedules.begin_create_or_update(
        schedule=job_schedule
    ).result()
    print(job_schedule)
    
if __name__ == "__main__":
    main()
  • Dockerfile:
FROM mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu22.04

ENV AZUREML_CONDA_ENVIRONMENT_PATH /azureml-envs/helloappsdkv2

COPY conda.yml .

# Create the conda environment from the YAML file
RUN conda env create -f conda.yml -p $AZUREML_CONDA_ENVIRONMENT_PATH

# Prepend path to AzureML conda environment
ENV PATH $AZUREML_CONDA_ENVIRONMENT_PATH/bin:$PATH

RUN rm conda.yml

# This is needed for mpi to locate libpython
ENV LD_LIBRARY_PATH $AZUREML_CONDA_ENVIRONMENT_PATH/lib:$LD_LIBRARY_PATH

  • conda.yml:
channels:
  - defaults
dependencies:
  - python=3.12.7
  - pip
  - pip:
    - azure-ai-ml>=1.15.0

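Question:

Why do the scheduled pipeline jobs not re-execute main.py, and instead produce the same output as the initial job submission? What steps can I take to make sure each scheduled job execution runs main.py independently and generates a new random string?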

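Additional information:

  • I have provided screenshots of the initial job submission and the subsequent scheduled executions, along with the main.py output of each execution.
  • I am using Azure ML SDKv2.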

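Output of the initial job and the scheduled executions

  • Pipeline submission job: helloapp_sdkv2_scheduler_pipeline_02, execution time: October 22, 2024, 11:18 AM

Main script output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369

  • First scheduler run: helloapp_sdkv2_scheduler_experiment_02-20241022T141842Z, execution time: October 22, 2024, 11:18 AM

Main script output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369

  • Second scheduler run: helloapp_sdkv2_scheduler_experiment_02-20241022T142342Z, execution time: October 22, 2024, 11:23 AM

Main script output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369

  • Third scheduler run: helloapp_sdkv2_scheduler_experiment_02-20241022T142842Z, execution time: October 22, 2024, 11:28 AM

Main script output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369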
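
The repetition in these outputs can be checked mechanically. The sketch below is an editorial illustration, not part of the original post: `looks_reused` is a hypothetical helper, and the string is copied from the four outputs listed above. A timestamp that matches to the microsecond across separate runs strongly suggests that a stored output was reused rather than regenerated.

```python
def looks_reused(outputs):
    """Return True when several runs all report exactly the same script output."""
    return len(outputs) > 1 and len(set(outputs)) == 1


# Script output reported for the initial job and the three scheduled runs.
reported = "Hello world from scheduler... VUQcztoumM | 2024-10-22 14:25:55.560369"
run_outputs = [reported] * 4

print(looks_reused(run_outputs))
```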

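Please help me understand why the scheduler is not executing main.py correctly and how to fix it.

If it is caching results, what is the workaround for a pipeline that has no inputs or outputs?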

azure-machine-learning-service azureml-python-sdk azure-ml-pipelines azure-ml-component
1 Answer

Solved: the pipeline definition has a setting, force_rerun, which defaults to False. Changing it to True fixed the rerun problem.

app_pipeline.settings.force_rerun = True
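
Where this line goes matters. A minimal sketch, assuming the setting has to be applied to `app_pipeline` before the object is submitted and before it is passed to `JobSchedule` as `create_job`, so that both the initial job and the scheduled runs carry it. `enable_force_rerun` is a hypothetical helper, and the `SimpleNamespace` object is only a stand-in that lets the sketch run without an Azure workspace.

```python
from types import SimpleNamespace


def enable_force_rerun(pipeline_job):
    """Set settings.force_rerun on a pipeline job object and return it."""
    pipeline_job.settings.force_rerun = True
    return pipeline_job


# Stand-in object (assumption): it only mimics the .settings.force_rerun
# attribute of the pipeline job built in Publish.py.
app_pipeline = SimpleNamespace(settings=SimpleNamespace(force_rerun=False))
enable_force_rerun(app_pipeline)
print(app_pipeline.settings.force_rerun)

# In Publish.py the equivalent order would be:
#   app_pipeline = hello_sdkv2_scheduler_pipeline()
#   app_pipeline.settings.force_rerun = True
#   ml_client.jobs.create_or_update(app_pipeline, experiment_name=PIPELINE_JOB_EXPERIMENT)
#   JobSchedule(name=JOB_SCHEDULE_NAME, trigger=recurrence_trigger, create_job=app_pipeline)
```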

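force_rerun (boolean): Whether to force rerun the whole pipeline. The default value is False. This means that by default the pipeline tries to reuse the output of the previous job if it meets the reuse criteria. If set to True, all steps in the pipeline will rerun.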

https://learn.microsoft.com/en-us/azure/machine-learning/reference-yaml-job-pipeline?view=azureml-api-2
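
To see why a step with no inputs is especially prone to reuse, here is a toy model of the behavior described above. It is not Azure ML's implementation: `ToyRunner`, its cache key (command, code contents, inputs), and `main_step` are all assumptions made for illustration. A counter stands in for the random string in main.py so the result is easy to follow.

```python
import hashlib
import itertools


class ToyRunner:
    """Toy model of step-output reuse; not Azure ML's actual implementation."""

    def __init__(self):
        self._cache = {}
        self.executions = 0

    def run_step(self, command, code, inputs, step_fn, force_rerun=False):
        # Assumed reuse key: the command, the code contents, and the inputs.
        raw = repr((command, code, sorted(inputs.items())))
        key = hashlib.sha256(raw.encode()).hexdigest()
        if not force_rerun and key in self._cache:
            return self._cache[key]  # reuse the stored output, skip execution
        self.executions += 1
        self._cache[key] = step_fn()
        return self._cache[key]


counter = itertools.count(1)


def main_step():
    """Stands in for main.py: yields a different value on every real execution."""
    return f"run-{next(counter)}"


runner = ToyRunner()
args = ("python main.py", "<main.py source>", {})
first = runner.run_step(*args, main_step)
second = runner.run_step(*args, main_step)  # same key, so the output is reused
forced = runner.run_step(*args, main_step, force_rerun=True)
print(first, second, forced, runner.executions)
```

Because the command, the code, and the (empty) inputs never change between scheduled runs, the key never changes either, so only a forced rerun executes the step again.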
