공기 흐름에서 동적 워크플로우를 만드는 올바른 방법
문제
작업 A를 완료할 때까지 작업 B.*의 수를 알 수 없도록 작업 흐름에서 워크플로우를 생성할 수 있는 방법이 있습니까?하위 대시를 살펴보았지만 Dag 생성 시 결정해야 하는 정적 작업 집합에서만 작동할 수 있는 것 같습니다.
다그 트리거가 작동합니까?그리고 만약 그렇다면 예를 들어 주시겠습니까?
과제 A가 완료될 때까지 과제 C를 계산하는 데 필요한 과제 B의 수를 알 수 없는 문제가 있습니다.각 작업 B.*은 계산에 몇 시간이 소요되며 결합할 수 없습니다.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
아이디어 #1
차단하는 외부 작업 센서를 만들어야 하고 모든 작업 B*를 완료하는 데 2-24시간이 소요되기 때문에 이 솔루션이 마음에 들지 않습니다.그래서 저는 이것이 실행 가능한 해결책이라고 생각하지 않습니다.확실히 더 쉬운 방법이 있습니까?아니면 공기 흐름은 이것을 위해 설계되지 않았나요?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
편집 1:
현재까지 이 질문은 여전히 좋은 답을 가지고 있지 않습니다.해결책을 찾고 있는 몇몇 사람들로부터 연락을 받았습니다.
하위 태그 없이 유사한 요청을 수행한 방법은 다음과 같습니다.
먼저 원하는 값을 반환하는 메소드를 만듭니다.
def values_function():
return values
작업을 동적으로 생성하는 다음 생성 방법:
def group(number, **kwargs):
#load the values if needed in the command you plan to execute
dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
return BashOperator(
task_id='JOB_NAME_{}'.format(number),
bash_command='script.sh {} {}'.format(dyn_value, number),
dag=dag)
그런 다음 이를 결합합니다.
push_func = PythonOperator(
task_id='push_func',
provide_context=True,
python_callable=values_function,
dag=dag)
complete = DummyOperator(
task_id='All_jobs_completed',
dag=dag)
for i in values_function():
push_func >> group(i) >> complete
예, 이것이 가능합니다. 저는 이것을 보여주는 DAG 예제를 만들었습니다.
import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
main_dag_id = 'DynamicWorkflow2'
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}
dag = DAG(
main_dag_id,
schedule_interval="@once",
default_args=args)
def start(*args, **kwargs):
value = Variable.get("DynamicWorkflow_Group1")
logging.info("Current DynamicWorkflow_Group1 value is " + str(value))
def resetTasksStatus(task_id, execution_date):
logging.info("Resetting: " + task_id + " " + execution_date)
dag_folder = conf.get('core', 'DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[main_dag_id]
session = settings.Session()
my_task = check_dag.get_task(task_id)
ti = TaskInstance(my_task, execution_date)
state = ti.current_state()
logging.info("Current state of " + task_id + " is " + str(state))
ti.set_state(None, session)
state = ti.current_state()
logging.info("Updated state of " + task_id + " is " + str(state))
def bridge1(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 2
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group2")
logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))
def bridge2(*args, **kwargs):
# You can set this value dynamically e.g., from a database or a calculation
dynamicValue = 3
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))
variableValue = Variable.get("DynamicWorkflow_Group3")
logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
# Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
for i in range(dynamicValue):
resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))
def end(*args, **kwargs):
logging.info("Ending")
def doSomeWork(name, index, *args, **kwargs):
# Do whatever work you need to do
# Here I will just create a new file
os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')
starting_task = PythonOperator(
task_id='start',
dag=dag,
provide_context=True,
python_callable=start,
op_args=[])
# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
task_id='bridge1',
dag=dag,
provide_context=True,
python_callable=bridge1,
op_args=[])
DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))
for index in range(int(DynamicWorkflow_Group1)):
dynamicTask = PythonOperator(
task_id='firstGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['firstGroup', index])
starting_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge1_task)
# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
task_id='bridge2',
dag=dag,
provide_context=True,
python_callable=bridge2,
op_args=[])
DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))
for index in range(int(DynamicWorkflow_Group2)):
dynamicTask = PythonOperator(
task_id='secondGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['secondGroup', index])
bridge1_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(bridge2_task)
ending_task = PythonOperator(
task_id='end',
dag=dag,
provide_context=True,
python_callable=end,
op_args=[])
DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))
for index in range(int(DynamicWorkflow_Group3)):
# You can make this logic anything you'd like
# I chose to use the PythonOperator for all tasks
# except the last task will use the BashOperator
if index < (int(DynamicWorkflow_Group3) - 1):
dynamicTask = PythonOperator(
task_id='thirdGroup_' + str(index),
dag=dag,
provide_context=True,
python_callable=doSomeWork,
op_args=['thirdGroup', index])
else:
dynamicTask = BashOperator(
task_id='thirdGroup_' + str(index),
bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
dag=dag)
bridge2_task.set_downstream(dynamicTask)
dynamicTask.set_downstream(ending_task)
# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
DAG를 실행하기 전에 다음 세 가지 기류 변수를 생성합니다.
airflow variables --set DynamicWorkflow_Group1 1
airflow variables --set DynamicWorkflow_Group2 0
airflow variables --set DynamicWorkflow_Group3 0
DAG는 여기서 시작됩니다.
실행된 후 이 작업으로
이 DAG에 대한 자세한 내용은 공기 흐름에 대한 동적 워크플로우 생성에 대한 기사에서 확인할 수 있습니다.
v2.3 이상에만 해당:
이 기능은 공기 흐름 버전 2.3 이상에 대해서만 동적 작업 매핑을 사용하여 달성됩니다.
자세한 설명서 및 예제는 다음과 같습니다.
- 공식 동적 작업 매핑 문서
- 천문학자의 자습서
- Airflow v2.4에는 추가적인 개선 사항이 추가되었습니다.
예:
@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(list(arg))
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
예 2:
from airflow import XComArg
task = MyOperator(task_id="source")
downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))
그래프 보기 및 트리 보기도 업데이트됩니다.
관련 문제:
이전 작업의 결과를 기반으로 워크플로우를 생성하는 방법을 알아냈습니다.
기본적으로 다음과 같은 두 개의 하위 대시가 있습니다.
- Xcom은 먼저 실행되는 하위 태그에서 목록(또는 나중에 동적 워크플로우를 생성하기 위해 필요한 모든 항목)을 푸시합니다(test1.py 참조).
def return_list()) - 기본 Dag 개체를 매개 변수로 두 번째 하위 Dag에 전달
- 이제 주 Dag 개체가 있으면 이 개체를 사용하여 해당 작업 인스턴스 목록을 가져올 수 있습니다.인스턴스 에서 해당태스인크목다사록현음용을하실재여다있중수니습필태할터를 중인 수 .
parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]), 더할 수 있습니다.), 를 참조하십시오. - 인스턴스를 를 첫 중 수 .
dag_id='%s.%s' % (parent_dag_name, 'test1') - 목록/값을 사용하여 작업을 동적으로 생성합니다.
이제 로컬 공기 흐름 설치에서 테스트를 해 보았는데 잘 작동합니다.동시에 두 개 이상의 Dag 인스턴스가 실행되는 경우 xcom 풀 파트에 문제가 있는지는 모르겠지만, 원하는 xcom 값을 고유하게 식별하기 위해 고유한 키 또는 유사한 키를 사용할 수 있습니다.아마도 현재 메인 다그의 특정 작업을 100% 확실히 얻기 위해 3. 단계를 최적화할 수 있지만, 제가 사용하기에 충분히 좋은 성능을 발휘하기 위해서는 xcom_pull을 사용하기 위해 하나의 task_instance 개체만 있으면 된다고 생각합니다.
또한 저는 실수로 잘못된 값을 얻지 않도록 모든 실행 전에 첫 번째 서브다그에 대한 xcoms를 청소합니다.
제가 설명을 잘 못해서 다음 코드가 모든 것을 명확하게 해주길 바랍니다.
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
log = logging.getLogger(__name__)
def test1(parent_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.test1' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date,
)
def return_list():
return ['test1', 'test2']
list_extract_folder = PythonOperator(
task_id='list',
dag=dag,
python_callable=return_list
)
clean_xcoms = PostgresOperator(
task_id='clean_xcoms',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
clean_xcoms >> list_extract_folder
return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator
log = logging.getLogger(__name__)
def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
dag = DAG(
'%s.test2' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date
)
if len(parent_dag.get_active_runs()) > 0:
test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
dag_id='%s.%s' % (parent_dag_name, 'test1'),
task_ids='list')
if test_list:
for i in test_list:
test = DummyOperator(
task_id=i,
dag=dag
)
return dag
및 기본 워크플로우:
test.py
from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2
DAG_NAME = 'test-dag'
dag = DAG(DAG_NAME,
description='Test workflow',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 24))
test1 = SubDagOperator(
subdag=test1(DAG_NAME,
dag.start_date,
dag.schedule_interval),
task_id='test1',
dag=dag
)
test2 = SubDagOperator(
subdag=test2(DAG_NAME,
dag.start_date,
dag.schedule_interval,
parent_dag=dag),
task_id='test2',
dag=dag
)
test1 >> test2
제가 생각하기에 당신이 찾고 있는 것은 DAG를 동적으로 만드는 것입니다. 저는 이 블로그를 검색한 후 며칠 전에 이런 상황을 접했습니다.
동적 작업 생성
start = DummyOperator(
task_id='start',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag)
def createDynamicETL(task_id, callableFunction, args):
task = PythonOperator(
task_id = task_id,
provide_context=True,
#Eval is used since the callableFunction var is of type string
#while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable = eval(callableFunction),
op_kwargs = args,
xcom_push = True,
dag = dag,
)
return task
DAG 워크플로 설정
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
# Use safe_load instead to load the YAML file
configFile = yaml.safe_load(f)
# Extract table names and fields to be processed
tables = configFile['tables']
# In this loop tasks are created for each table defined in the YAML file
for table in tables:
for table, fieldName in table.items():
# In our example, first step in the workflow for each table is to get SQL data from db.
# Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
'getSQLData',
{'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
'dbname': configFile['dbname']})
# Second step is upload data to s3
upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
'uploadDataToS3',
{'previous_task_id': '{}-getSQLData'.format(table),
'bucket_name': configFile['bucket_name'],
'prefix': configFile['prefix']})
# This is where the magic lies. The idea is that
# once tasks are generated they should linked with the
# dummy operators generated in the start and end tasks.
# Then you are done!
start >> get_sql_data_task
get_sql_data_task >> upload_to_s3_task
upload_to_s3_task >> end
코드를 조합한 후 우리 DAG의 모습입니다.
import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(
task_id='start',
dag=dag
)
def createDynamicETL(task_id, callableFunction, args):
task = PythonOperator(
task_id=task_id,
provide_context=True,
# Eval is used since the callableFunction var is of type string
# while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable=eval(callableFunction),
op_kwargs=args,
xcom_push=True,
dag=dag,
)
return task
end = DummyOperator(
task_id='end',
dag=dag)
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
# use safe_load instead to load the YAML file
configFile = yaml.safe_load(f)
# Extract table names and fields to be processed
tables = configFile['tables']
# In this loop tasks are created for each table defined in the YAML file
for table in tables:
for table, fieldName in table.items():
# In our example, first step in the workflow for each table is to get SQL data from db.
# Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
'getSQLData',
{'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
'dbname': configFile['dbname']})
# Second step is upload data to s3
upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
'uploadDataToS3',
{'previous_task_id': '{}-getSQLData'.format(table),
'bucket_name': configFile['bucket_name'],
'prefix': configFile['prefix']})
# This is where the magic lies. The idea is that
# once tasks are generated they should linked with the
# dummy operators generated in the start and end tasks.
# Then you are done!
start >> get_sql_data_task
get_sql_data_task >> upload_to_s3_task
upload_to_s3_task >> end
큰 도움이 되었습니다. 다른 사람에게도 도움이 될 것입니다.
OA: "A 태스크 완료 전까지 태스크 B.*의 수를 알 수 없도록 워크플로우를 생성할 수 있는 방법이 있습니까?"
간단한 대답은 아니오입니다.DAG를 실행하기 전에 공기 흐름이 DAG 흐름을 만듭니다.
그것은 우리가 간단한 결론에 도달했다는 것을 말해줍니다. 즉, 우리는 그런 필요가 없다는 것입니다.일부 작업을 병렬화하려면 처리할 항목 수가 아니라 사용 가능한 리소스를 평가해야 합니다.
우리는 이렇게 했습니다. 우리는 일정한 수의 작업(예: 10개)을 동적으로 생성하여 작업을 분할합니다.예를 들어 100개의 파일을 처리해야 하는 경우 각 작업에서 10개의 파일을 처리합니다.코드는 오늘 늦게 올리겠습니다.
갱신하다
여기 코드가 있습니다, 늦어서 죄송합니다.
from datetime import datetime, timedelta
import airflow
from airflow.operators.dummy_operator import DummyOperator
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 8),
'email': ['myemail@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
dag = airflow.DAG(
'parallel_tasks_v1',
schedule_interval="@daily",
catchup=False,
default_args=args)
# You can read this from variables
parallel_tasks_total_number = 10
start_task = DummyOperator(
task_id='start_task',
dag=dag
)
# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
return DummyOperator(
provide_context=True,
task_id='parallel_task_' + str(current_task_number),
python_callable=parallelTask,
# your task will take as input the total number and the current number to elaborate a chunk of total elements
op_args=[current_task_number, int(parallel_tasks_total_number)],
dag=dag)
end = DummyOperator(
task_id='end',
dag=dag)
for page in range(int(parallel_tasks_total_number)):
created_task = create_dynamic_task(page)
start_task >> created_task
created_task >> end
코드 설명:
여기에는 단일 시작 작업과 단일 종료 작업(둘 다 더미)이 있습니다.
그런 다음 for 루프를 사용한 시작 작업에서 동일한 파이썬 호출 가능한 10개의 작업을 만듭니다.작업은 create_dynamic_task 함수에 생성됩니다.
각 파이썬 호출 가능에 병렬 작업의 총 수와 현재 작업 인덱스를 인수로 전달합니다.
자세히 설명할 항목이 1000개 있다고 가정합니다. 첫 번째 작업은 10개의 청크 중 첫 번째 청크를 자세히 설명해야 한다는 입력을 받습니다.그것은 1000개의 항목을 10개의 덩어리로 나누고 첫 번째 항목을 정교하게 만들 것입니다.
훌륭한 대답
너무 많아요?어쨌든.
다른 많은 답들은 약간 사각형-페그-둥근 구멍입니다.복잡한 새 연산자를 추가하거나, 내장된 변수를 남용하거나, 질문에 답하지 못하는 경우.웹 UI를 통해 볼 때 동작을 숨기거나, 손상되기 쉽거나, 많은 사용자 지정 코드가 필요하기 때문에 특별히 마음에 들지 않았습니다.
이 솔루션은 내장된 기능을 사용하며, 새로운 연산자와 제한된 추가 코드가 필요하지 않으며, DAG는 속임수 없이 UI를 통해 볼 수 있으며, 공기 흐름 모범 사례를 따릅니다(동일성 참조).
이 문제에 대한 해결책은 상당히 복잡해서 여러 부분으로 나누었습니다.다음은 다음과 같습니다.
- 동적 태스크 수를 안전하게 트리거하는 방법
- 이러한 모든 작업이 완료될 때까지 기다린 후 최종 작업을 호출하는 방법
- 이를 태스크 파이프라인에 통합하는 방법
- 한계(완벽한 것은 없음)
작업이 동적인 수의 다른 작업을 트리거할 수 있습니까?
네, 약간.새로운 연산자를 작성할 필요 없이 DAG가 내장된 연산자만 사용하여 동적인 수의 다른 DAG를 트리거하도록 할 수 있습니다.그런 다음 동적인 수의 다른 DAG에 의존하도록 DAG를 확장할 수 있습니다(작업이 완료되기를 기다리는 중 참조).이는 flinz의 솔루션과 비슷하지만 훨씬 더 강력하고 사용자 지정 코드가 적습니다.
이 작업은 다른 트리거DagRunOperator 2개를 선택적으로 트리거하는 BranchPythonOperator를 사용하여 수행됩니다.이 중 하나는 현재 DAG를 재귀적으로 호출하고, 다른 하나는 외부 DAG, 즉 대상 함수를 호출합니다.
recursive_dag.py 맨 위에 dag를 트리거하는 데 사용할 수 있는 구성 예제가 나와 있습니다.
print_conf.py(트리거할 DAG 예제)
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def print_output(dag_run):
dag_conf = dag_run.conf
if 'output' in dag_conf:
output = dag_conf['output']
else:
output = 'no output found'
print(output)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'print_output',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
print_output = PythonOperator(
task_id='print_output_task',
python_callable=print_output
)
recursive_dag.py(마법이 발생하는 곳)
"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
"task_list": ["print_output","print_output"],
"conf_list": [
{
"output": "Hello"
},
{
"output": "world!"
}
]
}
"""
from datetime import timedelta
import json
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'
def choose_branch(task_instance, dag_run):
dag_conf = dag_run.conf
task_list = dag_conf['task_list']
next_task = task_list[0]
later_tasks = task_list[1:]
conf_list = dag_conf['conf_list']
# dump to string because value is stringified into
# template string, is then parsed.
next_conf = json.dumps(conf_list[0])
later_confs = conf_list[1:]
task_instance.xcom_push(key=next_task_id, value=next_task)
task_instance.xcom_push(key=next_task_conf, value=next_conf)
if later_tasks:
repeat_conf = json.dumps({
'task_list': later_tasks,
'conf_list': later_confs
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
return [next_task_id, repeat_task_id]
return next_task_id
def add_braces(in_string):
return '{{' + in_string + '}}'
def make_templated_pull(key):
pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
return add_braces(pull)
with DAG(
dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_branch
)
trigger_next = TriggerDagRunOperator(
task_id=next_task_id,
trigger_dag_id=make_templated_pull(next_task_id),
conf=make_templated_pull(next_task_conf)
)
trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=dag_id,
conf=make_templated_pull(repeat_task_conf)
)
branch >> [trigger_next, trigger_repeat]
이 솔루션은 매우 제한적인 사용자 지정 코드를 사용하는 장점이 있습니다.flinz의 솔루션은 부분적으로 실패하여 일부 스케줄링된 작업과 그렇지 않은 작업이 발생할 수 있습니다.그런 다음 다시 시도할 때 DAGS가 두 번 실행되도록 예약되거나 첫 번째 DAG에서 실패하여 실패한 작업이 부분적으로 완료될 수 있습니다.이 접근 방식은 트리거에 실패한 DAG를 알려주고 트리거에 실패한 DAG만 다시 시도합니다.따라서 이 접근 방식은 동일하지만 다른 접근 방식은 그렇지 않습니다.
DAG는 다른 DAG의 동적 수에 의존할 수 있습니까?
네, 하지만...작업이 병렬로 실행되지 않는 경우 이 작업을 쉽게 수행할 수 있습니다.병렬로 실행하는 것은 더 복잡합니다.
는 순적으실위중해변요같다사다습니음과경항은을 사용하는 것입니다.wait_for_completion=Truetrigger_next하여 " 이전에작업을 한 다음 "trigger_next" "xcom" xcom 연산자를 합니다.
setup_xcom >> trigger_next >> branch >> trigger_repeat
이 있는 로실템사을용여는하러외작센병부업있다유를 사용하는 Task 를 유사하게 할 수 .external_dag_id값과 트리거된 다그 실행과 관련된 타임스탬프가 표시됩니다.트리거된 Dag 타임스탬프를 가져오려면 트리거된 Dag의 타임스탬프를 사용하여 Dag를 트리거할 수 있습니다.그런 다음 이러한 센서가 하나씩 생성된 모든 DAG가 완료될 때까지 기다린 다음 최종 DAG를 트리거합니다.아래 코드는, 이번에는 인쇄 출력 DAG에 임의 절전을 추가하여 대기 대그가 실제로 약간의 대기를 수행하도록 했습니다.
참고: recurse_wait_dag.py는 이제 두 개의 단검을 정의합니다. 이 모든 것이 작동하려면 둘 다 활성화되어야 합니다.
dag를 트리거하는 데 사용할 수 있는 구성 예제는 recurse_wait_dag.py의 맨 위에 나와 있습니다.
print_conf.py(임의 절전 모드를 추가하도록 수정됨)
"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""
from datetime import timedelta
from time import sleep
from random import randint
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def print_output(dag_run):
sleep_time = randint(15,30)
print(f'sleeping for time: {sleep_time}')
sleep(sleep_time)
dag_conf = dag_run.conf
if 'output' in dag_conf:
output = dag_conf['output']
else:
output = 'no output found'
print(output)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'print_output',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
print_output = PythonOperator(
task_id='print_output_task',
python_callable=print_output
)
recurse_wait_magic.py(더 많은 마법이 발생하는 곳)
"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
"final_task": "print_output",
"task_list": ["print_output","print_output"],
"conf_list": [
{
"output": "Hello"
},
{
"output": "world!"
}
]
}
"""
from datetime import timedelta
import json
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone
from common import make_templated_pull
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
def to_conf(id):
return f'{id}_conf'
def to_execution_date(id):
return f'{id}_execution_date'
def to_ts(id):
return f'{id}_ts'
recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)
wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)
def choose_branch(task_instance, dag_run, ts):
dag_conf = dag_run.conf
task_list = dag_conf['task_list']
next_task = task_list[0]
# can't have multiple dag runs of same DAG with same timestamp
assert next_task != recurse_dag_id
later_tasks = task_list[1:]
conf_list = dag_conf['conf_list']
next_conf = json.dumps(conf_list[0])
later_confs = conf_list[1:]
triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]
task_instance.xcom_push(key=next_task_id, value=next_task)
task_instance.xcom_push(key=next_task_conf, value=next_conf)
task_instance.xcom_push(key=next_task_execution_date, value=ts)
if later_tasks:
repeat_conf = json.dumps({
'task_list': later_tasks,
'conf_list': later_confs,
'triggered_tasks': triggered_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
return [next_task_id, repeat_task_id]
end_conf = json.dumps({
'tasks_to_wait': triggered_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=end_task_conf, value=end_conf)
return [next_task_id, end_task_id]
def choose_wait_target(task_instance, dag_run):
dag_conf = dag_run.conf
tasks_to_wait = dag_conf['tasks_to_wait']
next_task, next_ts = tasks_to_wait[0]
later_tasks = tasks_to_wait[1:]
task_instance.xcom_push(key=next_wait_id, value=next_task)
task_instance.xcom_push(key=next_wait_ts, value=next_ts)
if later_tasks:
repeat_conf = json.dumps({
'tasks_to_wait': later_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
def execution_date_fn(_, task_instance):
date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
return timezone.parse(date_str)
def choose_wait_branch(task_instance, dag_run):
dag_conf = dag_run.conf
tasks_to_wait = dag_conf['tasks_to_wait']
if len(tasks_to_wait) == 1:
return end_task_id
return repeat_task_id
with DAG(
recurse_dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as recursive_dag:
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_branch
)
trigger_next = TriggerDagRunOperator(
task_id=next_task_id,
trigger_dag_id=make_templated_pull(next_task_id, branch_id),
execution_date=make_templated_pull(next_task_execution_date, branch_id),
conf=make_templated_pull(next_task_conf, branch_id)
)
trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=recurse_dag_id,
conf=make_templated_pull(repeat_task_conf, branch_id)
)
trigger_end = TriggerDagRunOperator(
task_id=end_task_id,
trigger_dag_id=wait_dag_id,
conf=make_templated_pull(end_task_conf, branch_id)
)
branch >> [trigger_next, trigger_repeat, trigger_end]
with DAG(
wait_dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as wait_dag:
py_operator = PythonOperator(
task_id=choose_wait_id,
python_callable=choose_wait_target
)
sensor = ExternalTaskSensor(
task_id='do_wait',
external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
execution_date_fn=execution_date_fn
)
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_wait_branch
)
trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=wait_dag_id,
conf=make_templated_pull(repeat_task_conf, choose_wait_id)
)
trigger_end = TriggerDagRunOperator(
task_id=end_task_id,
trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
)
py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]
코드와 통합
잘됐네요, 하지만 실제로 이걸 사용하고 싶으실 겁니다.그래서, 당신은 무엇을 해야 합니까?이 질문에는 다음을 수행하려는 예가 포함됩니다.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
질문 목표(아래 구현 예)를 달성하려면 태스크 A, B 및 C를 각각의 DAG로 분리해야 합니다.그런 다음 DAGA에서 위의 DAG 'recurse_then_wait'을 트리거하는 새 연산자를 끝에 추가합니다.각 BDAG에 필요한 구성과 BDAG ID(다른 Dag, Go 너트를 사용하도록 쉽게 변경할 수 있음)가 포함된 이 Daga 구성에 전달합니다.그런 다음 마지막에 실행할 최종 DAG인 DAG C의 이름을 포함합니다.이 구성은 다음과 같아야 합니다.
{
"final_task": "C_DAG",
"task_list": ["B_DAG","B_DAG"],
"conf_list": [
{
"b_number": 1,
"more_stuff": "goes_here"
},
{
"b_number": 2,
"foo": "bar"
}
]
}
구현 시에는 다음과 같이 보여야 합니다.
trigger_transce.py
from datetime import timedelta
import json
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from recurse_wait_dag import recurse_dag_id
def add_braces(in_string):
return '{{' + in_string + '}}'
def make_templated_pull(key, task_id):
pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
return add_braces(pull)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'
def setup_trigger_conf(task_instance):
trigger_conf = {
'final_task': 'print_output',
'task_list': ['print_output','print_output'],
'conf_list': [
{
'output': 'Hello'
},
{
'output': 'world!'
}
]
}
print('Triggering the following tasks')
for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
print(f' task: {task} with config {json.dumps(conf)}')
print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')
task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))
with DAG(
'trigger_recurse_example',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
py_operator = PythonOperator(
task_id=setup_trigger_conf_id,
python_callable=setup_trigger_conf
)
trigger_operator = TriggerDagRunOperator(
task_id='trigger_call_and_wait',
trigger_dag_id=recurse_dag_id,
conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
)
py_operator >> trigger_operator
이 모든 것은 DAG가 다른 DAG를 트리거하는 위치를 보여주는 수직 및 수평 라인과 함께 아래와 같이 표시됩니다.
A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C
한계
작업은 더 이상 단일 그래프에 표시되지 않습니다.이것이 아마도 이 접근법의 가장 큰 문제일 것입니다.연결된 모든 DAG에 태그를 추가하면 DAG를 함께 볼 수 있습니다.그러나 DAGB의 여러 병렬 실행을 DAGA의 실행과 연관시키는 것은 번거롭습니다.그러나 DAG 실행 하나에 입력 conf가 표시되므로 각 DAG 실행은 DAGA에 의존하지 않고 입력 구성에만 의존합니다.따라서 이 관계는 적어도 부분적으로 무시될 수 있습니다.
작업은 더 이상 xcom을 사용하여 통신할 수 없습니다.B 태스크는 DAG 구성을 통해 A 태스크로부터 입력을 받을 수 있지만 C 태스크는 B 태스크로부터 출력을 받을 수 없습니다.모든 B 작업의 결과는 알려진 위치에 저장된 다음 C 작업에 의해 읽혀져야 합니다.
'recurse_and_wait'에 대한 구성 인수는 task_list와 conf_list를 결합하도록 개선될 수 있지만, 이는 언급된 대로 문제를 해결합니다.
최종 DAG에 대한 구성이 없습니다.그것은 해결하기에 사소한 것일 것입니다.
작업 그래프는 실행 시 생성되지 않습니다.대신 그래프는 Dags 폴더에서 공기 흐름에 의해 선택될 때 작성됩니다.따라서 작업이 실행될 때마다 다른 그래프를 사용할 수는 없습니다.로드 시 조회를 기반으로 그래프를 작성하도록 작업을 구성할 수 있습니다.이 그래프는 이후 모든 실행에 대해 동일하게 유지되므로 그다지 유용하지 않을 수 있습니다.
분기 연산자를 사용하여 쿼리 결과를 기반으로 모든 실행에 대해 서로 다른 작업을 실행하는 그래프를 설계할 수 있습니다.
작업 집합을 미리 구성한 다음 쿼리 결과를 가져와 작업 전체에 배포했습니다.쿼리가 많은 결과를 반환하는 경우 스케줄러에 많은 동시 작업을 플러딩하지 않기 때문에 이 방법이 더 나을 수 있습니다.더욱 안전하게 사용하기 위해 풀을 사용하여 예기치 않게 큰 쿼리로 인해 동시성이 제어되지 않도록 했습니다.
"""
- This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask
########################################################################
default_args = {
'owner': 'airflow',
'catchup': False,
'depends_on_past': False,
'start_date': datetime(2019, 7, 2, 19, 50, 00),
'email': ['rotten@stackoverflow'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'max_active_runs': 1
}
dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)
totalBuckets = 5
get_orders_query = """
select
o.id,
o.customer
from
orders o
where
o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
and
o.is_test = false
and
o.is_processed = false
"""
###########################################################################################################
# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
return PythonOperator(
task_id=f'order_processing_task_{bucket_number}',
python_callable=runOrderProcessing,
pool='order_processing_pool',
op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
provide_context=True,
dag=dag
)
# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)
if orderList is not None:
for order in orderList:
logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
doStuff(**op_kwargs)
# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')
# initialize the task list buckets
tasks = {}
for task_number in range(0, totalBuckets):
tasks[f'order_processing_task_{task_number}'] = []
# populate the task list buckets
# distribute them evenly across the set of buckets
resultCounter = 0
for record in myDatabaseHook.get_records(get_orders_query):
resultCounter += 1
bucket = (resultCounter % totalBuckets)
tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})
# push the order lists into xcom
for task in tasks:
if len(tasks[task]) > 0:
logging.info(f'Task {task} has {len(tasks[task])} orders.')
context['ti'].xcom_push(key=task, value=tasks[task])
else:
# if we didn't have enough tasks for every bucket
# don't bother running that task - remove it from the list
logging.info(f"Task {task} doesn't have any orders.")
del(tasks[task])
return list(tasks.keys())
###################################################################################################
# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
task_id='clean_xcoms',
mysql_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
task_id='get_orders',
python_callable=getOpenOrders,
provide_context=True,
dag=dag
)
get_orders_task.set_upstream(clean_xcoms)
# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
taskBucket = createOrderProcessingTask(bucketNumber)
taskBucket.set_upstream(get_orders_task)
###################################################################################################
https://github.com/mastak/airflow_multi_dagrun, 에서 TriggerDagRuns와 유사하게 여러 개의 DagRuns를 트리거하여 DagRuns의 간단한 대기열을 사용하는 더 좋은 솔루션을 찾은 것 같습니다.최신 흐름으로 작동하기 위해 몇 가지 세부 사항을 패치해야 했지만 대부분의 크레딧은 https://github.com/mastak, 에 전달됩니다.
이 솔루션은 여러 DagRun을 트리거하는 사용자 지정 연산자를 사용합니다.
from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
CREATED_DAGRUN_KEY = 'created_dagrun_key'
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None,
*args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
context.update(self.op_kwargs)
session = settings.Session()
created_dr_ids = []
for dro in self.python_callable(*self.op_args, **context):
if not dro:
break
if not isinstance(dro, DagRunOrder):
dro = DagRunOrder(payload=dro)
now = timezone.utcnow()
if dro.run_id is None:
dro.run_id = 'trig__' + now.isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
execution_date=now,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True,
)
created_dr_ids.append(dr.id)
self.log.info("Created DagRun %s, %s", dr, now)
if created_dr_ids:
session.commit()
context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
else:
self.log.info("No DagRun created")
session.close()
그런 다음 PythonOperator의 호출 가능 함수에서 다음과 같은 여러 개의 다그룬을 제출할 수 있습니다.
from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago
def generate_dag_run(**kwargs):
for i in range(10):
order = DagRunOrder(payload={'my_variable': i})
yield order
args = {
'start_date': days_ago(1),
'owner': 'airflow',
}
dag = DAG(
dag_id='simple_trigger',
max_active_runs=1,
schedule_interval='@hourly',
default_args=args,
)
gen_target_dag_run = TriggerMultiDagRunOperator(
task_id='gen_target_dag_run',
dag=dag,
trigger_dag_id='common_target',
python_callable=generate_dag_run
)
https://github.com/flinz/airflow_multi_dagrun 에서 코드로 포크를 만들었습니다.
무엇이 문제인지 이해하지 못합니까?
여기 표준 예가 있습니다.이제 기능이 작동하는 경우 하위 닥을 교체합니다.for i in range(5):와 함께for i in range(random.randint(0, 10)):그러면 모든 것이 작동할 것입니다.이제 연산자 '시작'이 데이터를 파일에 넣고 랜덤 값 대신 함수가 이 데이터를 읽는다고 상상해 보십시오.그러면 연산자 '시작'이 작업 수에 영향을 미칩니다.
하위 태그를 입력할 때 작업 수가 현재 파일/데이터베이스/XCom에서 마지막으로 읽은 것과 같기 때문에 문제는 UI에서만 표시됩니다.한 번에 여러 번의 단검 실행을 자동으로 제한합니다.
패러다임 전환
여기 있는 모든 답변을 바탕으로 볼 때, 동적 "작업 목록" 생성 코드를 초기 작업으로 생각하는 것이 아니라 DAG 이전 정의 계산으로 생각하는 것이 가장 좋은 방법인 것 같습니다.
물론 이는 OP가 설명한 것처럼 각 DAG가 처음에 한 번만 실행되는 단일 초기 계산이 있다고 가정합니다.공기 흐름이 구축되지 않은 것처럼 보이는 패턴인 DAG를 일부 중간 작업에서 다시 정의해야 하는 경우 이 접근 방식은 작동하지 않습니다.그러나 컨트롤러/대상 DAG를 연결하는 것이 좋습니다(아래 참조).
코드 샘플:
from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago
DEFAULT_ARGS = {"owner": "airflow"}
def get_list_of_things(connection_id):
list_all_the_things_sql = """
SELECT * FROM things
"""
pg_hook = PostgresHook(postgres_conn_id=connection_id)
connection = pg_hook.get_conn()
cursor = connection.cursor()
cursor.execute(list_all_the_things_sql) # NOTE: this will execute to build the DAG, so if you grock the code, expect the DAG not to load, unless you have a valid postgres DB with a table named "things" and with things in it.
res = cursor.fetchall()
return res
@dag(default_args=DEFAULT_ARGS, schedule_interval="@once", start_date=days_ago(2), dag_id='test_joey_dag')
def dynamicly_generated_dag():
connection_id = "ProdDB"
@task
def do_a_thing(row):
print(row)
return row
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
data_list = get_list_of_things(connection_id)
for row in data_list:
start >> do_a_thing(row) >> end
dag = dynamicly_generated_dag()
에 약에만.get_list_of_things()계산이 길기 때문에 사전 계산하여 컨트롤러/타겟 패턴을 사용하여 이 DAG를 외부에서 트리거하는 것이 좋습니다.
trigger_target_target_target_target
상황에 따라 비동기 배치 작업자 스타일로 구현할 수 있습니다."동적 작업"은 수행할 작업 항목 목록으로 처리되고 작업자 노드가 픽업할 외부 메시지 브로커 대기열에 게시된 비동기 메시지로 분할될 수 있습니다.
하나의 작업은 "작업"을 동적으로 생성하고 모든 항목(정확히 몇 개, 심지어 어떤 항목이 있는지도 사전에 알 수 없음)을 주제/큐에 게시합니다.
작업자는 대기열에서 "작업 태스크"를 소비합니다.외부 공기 흐름 기술을 사용하여 직접 구현하거나 공기 흐름 센서 작업(별도의 DAG)으로 구현할 수 있습니다.작업 처리가 완료되면 공기량 센서가 트리거되고 실행 흐름이 계속됩니다.
개별 작업 항목의 흐름을 복원하려면 EIP 클레임 확인 패턴을 사용하는 것이 좋습니다.
5년 전에 질문이 있었지만, 오늘날 많은 작업과 관련이 있습니다.저도 같은 요구 사항이 있었지만 이전 작업의 출력을 기반으로 동적 작업을 생성할 수 있는 적절한 방법을 찾을 수 없었습니다. 제 경우 요구 사항은 다음과 같습니다.
- 이전 작업을 기준으로 동적 작업 수 생성
- 이러한 작업은 공기 흐름 서버에서 실행되는 PythonOperator를 사용하지 않고 데이터 프로시저 연산자를 사용하여 원격 GCP 데이터 프로시저 서버에서 실행해야 했습니다.
제가 여기서 간단히 취한 접근법을 적었습니다.같은 요구사항이 있으면 읽어보시고 다른 방법도 있으면 알려주세요.도움이 되길 바랍니다.
Postgres 태스크를 동적으로 생성하기 위해 다음과 같은 작업을 수행할 수 있습니다.
for filename in some_files:
try:
f = open(f'{filename}')
query = f.read()
dynamic_task = PostgresOperator(
task_id=f"run_{filename}",
postgres_conn_id="some_connection",
sql=query
)
task_start >> dynamic_task >> task_end
finally:
f.close()
v2.3 버전의 경우 다음과 같은 순차적 접근 방식을 수행할 수 있습니다.
python_operator를 사용합니다.그 함수 내에서, 당신은 다음을 사용하여 다른 연산자를 호출할 수 있습니다.execute명확하게 문서화되지 않은 명령입니다.
주의:
- 이것은 순차적으로 실행됩니다(OP가 정확히 무엇을 찾고 있는지는 아니지만 많은 경우 여전히 유용합니다).
- 그래프/트리 보기의 UI는 루프에서 처리된 모든 항목을 표시하지 않습니다. 루프에 충분한 로그가 있으면 때때로 큰 문제가 되지 않습니다.
아래 예제에서는 상위/하위 단그 시나리오에 트리거 다그 연산자를 사용하지만 다른 연산자를 사용할 수 있습니다.
def dynamic_executions(
df_data = pd.read_sql('select * from mytable', my_engine)
for row in df_data.itertuples():
trigger = TriggerDagRunOperator(
task_id="test1",
trigger_dag_id="my_child_dag",
conf={"message": row.column1},
execution_date=exec_date
)
-- key piece here, call execute
trigger.execute(kwcontext)
dynamic_executions= PythonOperator(
task_id='dynamic_executions',
python_callable=dynamic_executions,
)
다음은 Ariflow 1.x에 적용할 수 있는 다른 접근 방식으로, 주로 @oleg-yamin에서 영감을 받았으며 동일한 솔루션에 대한 @Ena의 의견과 관련이 있습니다.
태스크 B의 총 개수를 제한할 수 있으므로 BranchPythonOperator를 사용하여 태스크 B의 가변 개수를 트리거할 수 있습니다.이 방법은 변수 개수의 값을 dag 매개 변수로 전달하는 경우 유용할 수 있습니다.
코드 측면에서는 다음과 같습니다.
MAX_EXECS_TO_START = 5
def generate_variable_number_of_values(variable_number_of_values):
# you can do some magic with input parameters here and return the result
variable_number_of_values = [v.strip() for v in variable_number_of_values]
if len(variable_number_of_values) > MAX_EXECS_TO_START:
raise Exception(f"Too many executions requested! The maximum is {MAX_EXECS_TO_START}")
def test_value(*args, **kwargs):
ti: TaskInstance = kwargs["ti"]
# get all values from xcom
all_values_info = ti.xcom_pull(task_ids="generate_variable_number_of_values")
# check current counter
consecutive_run_count = args[0]
try:
current_value = all_values_info[consecutive_run_count]
return f"trigger_{consecutive_run_count}th_exec_operator"
except IndexError as ex:
return "final_operator"
그리고 다그 자체:
with DAG(
dag_id="sample_dag_id",
) as dag:
# use a variable number of values passed as dag param
variable_number_of_values = dag_param("values_to_use")
# define the operators
start_op = DummyOperator(task_id="starting_operator")
finish_op = DummyOperator(task_id="final_operator")
variable_num_of_values_op = PythonOperator(
task_id="generate_variable_number_of_values",
python_callable=generate_variable_number_of_values,
op_args=[variable_number_of_values],
do_xcom_push=True,
dag=dag,
)
def create_branch_op(run_counter):
return BranchPythonOperator(
task_id=f'branching_{run_counter}',
python_callable=test_value,
provide_context=True,
op_args=[run_counter],
do_xcom_push=True,
dag=dag,
)
def trigger_exec(run_counter):
# you can use a different operator and provide context and do x_com_push
# similar to the test_value method
return DummyOperator(task_id=f"trigger_{run_counter}th_exec_operator")
# Chaining
start_op >> variable_num_of_values_op
for run_counter in range(MAX_SESSIONS_TO_START):
branching_op = create_branch_op(run_counter)
trigger_exec_op = trigger_exec(run_counter)
variable_num_of_values_op >> branching_op
branching_op >> trigger_exec_op >> finish_op
branching_op >> finish_op
저는 이 질문과 매우 유사한 중간 게시물을 발견했습니다.하지만 오타가 많아서 구현을 해봐도 작동하지 않습니다.
위 내용에 대한 저의 답변은 다음과 같습니다.
동적으로 태스크를 생성하는 경우 업스트림 태스크에 의해 생성되지 않았거나 해당 태스크와 독립적으로 정의될 수 있는 작업을 반복하여 생성해야 합니다.이전에 많은 다른 사람들이 지적했던 것처럼 실행 날짜 또는 기타 공기 흐름 변수를 템플릿 외부(예: 작업)로 전달할 수 없다는 것을 알게 되었습니다.이 게시물도 참조하십시오.
언급URL : https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow
'programing' 카테고리의 다른 글
| web.config에서 하나의 항목을 암호화하는 방법 (0) | 2023.08.01 |
|---|---|
| Animate scrollFirefox에서 상단이 작동하지 않음 (0) | 2023.08.01 |
| 다중 글꼴 가중치, 하나의 @ 글꼴-면 쿼리 (0) | 2023.08.01 |
| mysql 치명적 오류: 버퍼 풀에 메모리를 할당할 수 없습니다. (0) | 2023.08.01 |
| NSData 또는 UIImage에서 이미지 유형 찾기 (0) | 2023.08.01 |




