我有以下 DAG,它应该生成变量中指定的尽可能多的任务,但映射的任务选项卡显示“未找到数据”:
from datetime import datetime
from time import sleep
from airflow.decorators import dag, task
from airflow.models import Variable
@dag(schedule=None, start_date=datetime(2024, 11, 15), catchup=False)
def test_autoscaling():
@task()
def consume_mem(mem_to_consume: int, index: int):
print(f'consuming {mem_to_consume} bytes in task {index}')
with open('/dev/urandom', 'rb') as f:
rand_data = f.read(mem_to_consume)
sleep(600)
num_tasks = int(Variable.get('num_tasks'))
mem_to_consume = int(Variable.get('mem_to_consume'))
consume_mem.partial(mem_to_consume=mem_to_consume).expand(index=list(range(num_tasks)))
test_autoscaling()
为什么任务不动态创建?
天哪......我可怕的新手错误。 DAG 没有任何问题。 start_date 是未来的日期...