|
| 1 | +from airflow.decorators import dag, task |
| 2 | +from airflow.operators.dummy import DummyOperator |
| 3 | +from airflow.operators.python import BranchPythonOperator |
| 4 | +from airflow.utils.trigger_rule import TriggerRule |
| 5 | +from dags import DEFAULT_DAG_ARGUMENTS |
| 6 | +from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream |
| 7 | +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator |
| 8 | +from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_query_pipeline |
| 9 | +from ted_sws.event_manager.adapters.event_log_decorator import event_log |
| 10 | +from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ |
| 11 | + EventMessageProcessType |
| 12 | + |
# Name used both for the DAG id and for event-log attribution.
DAG_NAME = "fetch_notices_by_query"
# Max number of notice ids handed to each partial-workflow batch trigger.
BATCH_SIZE = 2000
# DAG-run conf keys read via get_dag_param().
QUERY_DAG_KEY = "query"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
# Task ids referenced by the branch selector and downstream wiring.
TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"
TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "trigger_complete_notice_proc_workflow"
CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "check_if_trigger_complete_workflow"
# NOTE(review): constant name says "BY_DATE" but the value (and this DAG) is
# "by_query" — looks copied from a fetch-by-date DAG; consider renaming the
# constant (value must stay "finish_fetch_by_query" to keep the task id stable).
FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_query"
| 21 | + |
| 22 | + |
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
     catchup=False,
     tags=['fetch'])
def fetch_notices_by_query():
    """Fetch TED notices matching a user-supplied query, then trigger processing.

    DAG-run conf parameters (read with ``get_dag_param``):
        query (required): TED search query passed to the fetcher pipeline.
        trigger_complete_workflow (optional, default True): when truthy,
            trigger the complete notice-processing pipeline; otherwise run
            only the first step, batched by ``BATCH_SIZE``.

    Structure: fetch -> branch selector -> (complete | partial) trigger -> finish.
    """

    @task
    @event_log(TechnicalEventMessage(
        message="fetch_by_query_notice_from_ted",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
        ))
    )
    def fetch_by_query_notice_from_ted():
        """Run the fetcher pipeline and publish the fetched notice ids downstream.

        Raises:
            Exception: if the query is missing (raise_error=True) or the
                fetcher returns no notices — failing here prevents the
                downstream triggers from running on an empty batch.
        """
        notice_ids = notice_fetcher_by_query_pipeline(
            query=get_dag_param(key=QUERY_DAG_KEY, raise_error=True))
        if not notice_ids:
            raise Exception("No notices have been fetched!")
        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    # Branch A: run the full notice-processing pipeline in one go.
    trigger_complete_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
        execute_only_one_step=False)
    # Branch B: run only the first pipeline step, in batches of BATCH_SIZE.
    trigger_normalisation_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_PARTIAL_WORKFLOW_TASK_ID,
        batch_size=BATCH_SIZE,
        execute_only_one_step=True)

    def _branch_selector():
        """Pick the downstream workflow task id based on DAG-run conf.

        Also re-publishes the notice ids so the selected branch can pull them.
        """
        # Renamed local (was `trigger_complete_workflow`) so it no longer
        # shadows the operator of the same name defined in the DAG scope.
        should_trigger_complete_workflow = get_dag_param(
            key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
            default_value=True)
        push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY))
        if should_trigger_complete_workflow:
            return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID]
        return [TRIGGER_PARTIAL_WORKFLOW_TASK_ID]

    branch_task = BranchPythonOperator(
        task_id=CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
        python_callable=_branch_selector,
    )

    # Join point after the branch: NONE_FAILED_MIN_ONE_SUCCESS lets the DAG
    # finish when exactly one branch ran and the other was skipped.
    finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
                                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

    fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,
                                                        trigger_complete_workflow] >> finish_step
| 66 | + |
| 67 | + |
# Instantiate the DAG at module level so Airflow's DagBag discovers it on import.
dag = fetch_notices_by_query()
0 commit comments