I'm trying to change an existing job's settings using the CLI, but when I call the
reset_job
method I get this error:
Traceback (most recent call last):
File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 78, in <module>
dr.experiment(host,token)
File "/home/vsts/work/1/s/S1.DataPlatform.DR/main.py", line 58, in experiment
jobs.reset_job(job_json)
File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/jobs/api.py", line 49, in reset_job
return self.client.client.perform_query('POST', '/jobs/reset', data=json, headers=headers,
File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/sdk/api_client.py", line 174, in perform_query
raise requests.exceptions.HTTPError(message, response=e.response)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://spg-sustainable1-qa.cloud.databricks.com/api/2.0/jobs/reset
Response from server:
{ 'error_code': 'MALFORMED_REQUEST',
'message': 'Invalid JSON given in the body of the request - expected a map'}
Here is the sample Python code I'm using:
...
api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)
job_list = jobs.list_jobs()["jobs"]
job_name = "DP DataSync Job"
result_list = list(
filter(
lambda job: job['settings']['name'] == job_name, job_list)
)
job = result_list[0]
job_id = job["job_id"]
job["settings"]["schedule"]["pause_status"] = "UNPAUSED"
print(f"Resetting job with id: {job_id}")
job_json = json.dumps(job)
jobs.reset_job(job_json)
And here is the JSON passed to
reset_job
:
{
"job_id": 217841321277199,
"creator_user_name": "...",
"settings": {
"name": "DP DataSync Job",
"new_cluster": {
"cluster_name": "",
"spark_version": "10.4.x-scala2.12",
"aws_attributes": {
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"zone_id": "us-east-1a",
"spot_bid_price_percent": 100,
"ebs_volume_count": 0
},
"node_type_id": "d3.4xlarge",
"custom_tags": {
"Owner": "[email protected]",
"AppID": "appidhere",
"Environment": ""
},
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
},
"enable_elastic_disk": false,
"runtime_engine": "STANDARD",
"autoscale": {
"min_workers": 2,
"max_workers": 16
}
},
"libraries": [
{
"jar": "DataSync-1.0-all.jar"
}
],
"email_notifications": {
"on_start": [
"[email protected]"
],
"on_success": [
"[email protected]"
],
"on_failure": [
"[email protected]"
],
"no_alert_for_skipped_runs": false
},
"timeout_seconds": 0,
"schedule": {
"quartz_cron_expression": "35 0 21 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"spark_jar_task": {
"jar_uri": "",
"main_class_name": "com.company.s.dp.datasync",
"parameters": [
"Config.json"
],
"run_as_repl": true
},
"max_concurrent_runs": 1,
"format": "SINGLE_TASK"
},
"created_time": 1678272261985
}
Databricks CLI version: 17.4
The payload you're using is just a Job Get response - you can't use it as-is to reset a job. If you look at the Jobs Reset API, you'll see that its payload contains only two fields:
job_id
- the ID of the job to reset
new_settings
- the settings to apply to the job, with the same structure as
settings
:
{
"job_id": 11223344,
"new_settings": {
"name": "A multitask job",
...
}
}
You also don't need to call
json.dumps
yourself - the API client does that for you (see the source code).
So your code should be modified as follows:
orig_job = result_list[0]
job_id = orig_job["job_id"]
job = {"job_id": job_id, "new_settings": orig_job["settings"]}
job["new_settings"]["schedule"]["pause_status"] = "UNPAUSED"
jobs.reset_job(job)
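The reshaping from a Get response into a Reset payload can be checked without calling the API at all. A minimal sketch, using a trimmed copy of the job dict from the question (most settings fields omitted for brevity):

```python
# Trimmed copy of the Job Get response shown in the question.
job = {
    "job_id": 217841321277199,
    "creator_user_name": "...",
    "settings": {
        "name": "DP DataSync Job",
        "schedule": {
            "quartz_cron_expression": "35 0 21 * * ?",
            "timezone_id": "America/New_York",
            "pause_status": "PAUSED",
        },
    },
    "created_time": 1678272261985,
}

# Keep only the two fields the Reset API accepts: job_id and new_settings.
# Extra fields such as creator_user_name and created_time must not be sent.
payload = {"job_id": job["job_id"], "new_settings": job["settings"]}
payload["new_settings"]["schedule"]["pause_status"] = "UNPAUSED"

assert set(payload) == {"job_id", "new_settings"}
assert payload["new_settings"]["schedule"]["pause_status"] == "UNPAUSED"
```

Note that new_settings here is a reference to the original settings dict, not a copy; that is fine for a one-shot reset, but use copy.deepcopy if you still need the unmodified Get response afterwards.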