"Invalid JSON given in the body of the request - expected a map" when using the reset_job method


I am trying to change the settings of an existing job using the Databricks CLI, but when I call the reset_job method I get this error:

Traceback (most recent call last):
  File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 78, in <module>
    dr.experiment(host,token)
  File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 58, in experiment
    jobs.reset_job(job_json)
  File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/jobs/api.py", line 49, in reset_job
    return self.client.client.perform_query('POST', '/jobs/reset', data=json, headers=headers,
  File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/sdk/api_client.py", line 174, in perform_query
    raise requests.exceptions.HTTPError(message, response=e.response)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://spg-sustainable1-qa.cloud.databricks.com/api/2.0/jobs/reset
 Response from server: 
 { 'error_code': 'MALFORMED_REQUEST',
  'message': 'Invalid JSON given in the body of the request - expected a map'}

Here is the sample Python code I am using:

...
api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)

job_list = jobs.list_jobs()["jobs"]

job_name = "DP DataSync Job"
result_list = list(
    filter(
    lambda job: job['settings']['name'] == job_name, job_list)
    )

job = result_list[0]
job_id = job["job_id"]
job["settings"]["schedule"]["pause_status"] = "UNPAUSED"

print(f"Resetting job with id: {job_id}")

job_json = json.dumps(job)

jobs.reset_job(job_json)

This is the JSON passed to reset_job:

{
    "job_id": 217841321277199,
    "creator_user_name": "...",
    "settings": {
        "name": "DP DataSync Job",
        "new_cluster": {
            "cluster_name": "",
            "spark_version": "10.4.x-scala2.12",
            "aws_attributes": {
                "first_on_demand": 1,
                "availability": "SPOT_WITH_FALLBACK",
                "zone_id": "us-east-1a",
                "spot_bid_price_percent": 100,
                "ebs_volume_count": 0
            },
            "node_type_id": "d3.4xlarge",
            "custom_tags": {
                "Owner": "[email protected]",
                "AppID": "appidhere",
                "Environment": ""
            },
            "spark_env_vars": {
                "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
            },
            "enable_elastic_disk": false,
            "runtime_engine": "STANDARD",
            "autoscale": {
                "min_workers": 2,
                "max_workers": 16
            }
        },
        "libraries": [
            {
                "jar": "DataSync-1.0-all.jar"
            }
        ],
        "email_notifications": {
            "on_start": [
                "[email protected]"
            ],
            "on_success": [
                "[email protected]"
            ],
            "on_failure": [
                "[email protected]"
            ],
            "no_alert_for_skipped_runs": false
        },
        "timeout_seconds": 0,
        "schedule": {
            "quartz_cron_expression": "35 0 21 * * ?",
            "timezone_id": "America/New_York",
            "pause_status": "UNPAUSED"
        },
        "spark_jar_task": {
            "jar_uri": "",
            "main_class_name": "com.company.s.dp.datasync",
            "parameters": [
                "Config.json"
            ],
            "run_as_repl": true
        },
        "max_concurrent_runs": 1,
        "format": "SINGLE_TASK"
    },
    "created_time": 1678272261985
}

Databricks CLI version: 17.4

python json databricks databricks-cli databricks-rest-api
1 Answer

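The payload you are sending is the one returned by the Jobs Get response; you cannot use it as-is to reset a job. If you look at the Jobs Reset API, you will see that its payload consists of only two fields: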

  • job_id - the ID of the job to reset
  • new_settings - the settings to apply to the job, whereas your payload uses settings
{
    "job_id": 11223344,
    "new_settings": {
        "name": "A multitask job",
        ...
    }
}

You also do not need to call json.dumps yourself; the API client does it for you (see the source code).
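
The "expected a map" message can be illustrated with the standard library alone. The sketch below assumes, as the answer indicates, that the client serializes whatever it receives once more; the payload is a trimmed-down, made-up example. A dict that was already passed through json.dumps then reaches the server as a JSON string literal rather than a JSON object.

```python
import json

# Hypothetical, trimmed-down payload used only for illustration.
payload = {"job_id": 11223344, "new_settings": {"name": "A multitask job"}}

# Serializing once yields the text of a JSON object (a "map").
encoded_once = json.dumps(payload)

# Serializing the already-serialized text yields a JSON string literal.
encoded_twice = json.dumps(encoded_once)

# Decode both to see what a receiver would get back.
decoded_once = json.loads(encoded_once)
decoded_twice = json.loads(encoded_twice)

print(type(decoded_once).__name__)
print(type(decoded_twice).__name__)
```

The first decoded value is a dict and the second is a plain string, which is consistent with the server rejecting the request body.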

So your code should be changed to the following:

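orig_job = result_list[0]
job_id = orig_job["job_id"]
# Build the Reset payload: only job_id and new_settings are sent
job = {"job_id": job_id, "new_settings": orig_job["settings"]}
job["new_settings"]["schedule"]["pause_status"] = "UNPAUSED"

# Pass the dict itself; the client serializes it
jobs.reset_job(job)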
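
The same transformation can be wrapped in a small function so that it can be checked without calling the API. This is only a sketch: build_reset_payload is a hypothetical helper name, not part of databricks-cli, and the sample job dict is a shortened version of the one in the question.

```python
import copy


def build_reset_payload(job, pause_status=None):
    """Turn a Jobs Get/List style entry into a Jobs Reset style payload.

    Hypothetical helper for illustration; not part of databricks-cli.
    """
    if "job_id" not in job or "settings" not in job:
        raise ValueError("expected a dict with 'job_id' and 'settings' keys")

    # Copy the settings so the original entry is left untouched.
    new_settings = copy.deepcopy(job["settings"])

    if pause_status is not None:
        if "schedule" not in new_settings:
            raise ValueError("job has no schedule to pause or unpause")
        new_settings["schedule"]["pause_status"] = pause_status

    # Only these two top-level fields belong in the Reset payload.
    return {"job_id": job["job_id"], "new_settings": new_settings}


# Shortened version of the job entry shown in the question.
sample_job = {
    "job_id": 217841321277199,
    "creator_user_name": "...",
    "settings": {
        "name": "DP DataSync Job",
        "schedule": {
            "quartz_cron_expression": "35 0 21 * * ?",
            "timezone_id": "America/New_York",
            "pause_status": "PAUSED",
        },
    },
    "created_time": 1678272261985,
}

payload = build_reset_payload(sample_job, pause_status="UNPAUSED")
print(sorted(payload))
```

The resulting dict would then be passed directly to jobs.reset_job(payload), without calling json.dumps on it first.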