Skip to content

Commit 2d42541

Browse files
committed
feat!: Change logic of failing tasks for notice_processing_pipeline DAG
1 parent dc388f7 commit 2d42541

6 files changed

Lines changed: 40 additions & 29 deletions

dags/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@
1616
"execution_timeout": timedelta(days=10),
1717
}
1818

19+
BATCH_SIZE = 2000
20+
1921
NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
2022
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
2123
NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"

dags/fetch_notices_by_date.py

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
from airflow.timetables.trigger import CronTriggerTimetable
88
from airflow.utils.trigger_rule import TriggerRule
99

10-
from dags import DEFAULT_DAG_ARGUMENTS
10+
from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE
1111
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
1212
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
1313
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline
@@ -18,7 +18,6 @@
1818
from ted_sws.event_manager.services.log import log_error
1919

2020
FETCHER_DAG_NAME = "fetch_notices_by_date"
21-
BATCH_SIZE = 2000
2221
WILD_CARD_DAG_KEY = "wild_card"
2322
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
2423
TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"

dags/load_mapping_suite_in_database.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from airflow.utils.trigger_rule import TriggerRule
66
from pymongo import MongoClient
77

8-
from dags import DEFAULT_DAG_ARGUMENTS
8+
from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE
99
from dags.dags_utils import push_dag_downstream, pull_dag_upstream, get_dag_param
1010
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
1111
from ted_sws import config
@@ -111,7 +111,8 @@ def _branch_selector():
111111
finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
112112
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
113113

114-
trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)
114+
trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID,
115+
batch_size=BATCH_SIZE)
115116
fetch_mapping_suite_package_from_github_into_mongodb() >> branch_task
116117
trigger_document_proc_pipeline >> finish_step
117118
branch_task >> [trigger_document_proc_pipeline, finish_step]

dags/notice_processing_pipeline.py

Lines changed: 13 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from typing import List
22

33
from airflow.decorators import dag
4+
from airflow.exceptions import AirflowSkipException
45
from airflow.operators.python import BranchPythonOperator, PythonOperator
56
from airflow.utils.trigger_rule import TriggerRule
67

@@ -61,37 +62,36 @@ def _selector_branch_before_publish():
6162
return branch_selector(NOTICE_PUBLISH_PIPELINE_TASK_ID)
6263

6364
def _stop_processing():
64-
notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY)
65-
if not notice_ids:
66-
raise Exception(f"No notice has been processed!")
65+
pass
6766

6867
start_processing = BranchPythonOperator(
6968
task_id=BRANCH_SELECTOR_TASK_ID,
7069
python_callable=_start_processing,
70+
trigger_rule=TriggerRule.ALWAYS
7171
)
7272

7373
selector_branch_before_transformation = BranchPythonOperator(
7474
task_id=SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
7575
python_callable=_selector_branch_before_transformation,
76-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
76+
trigger_rule=TriggerRule.ALL_SUCCESS,
7777
)
7878

7979
selector_branch_before_validation = BranchPythonOperator(
8080
task_id=SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
8181
python_callable=_selector_branch_before_validation,
82-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
82+
trigger_rule=TriggerRule.ALL_SUCCESS,
8383
)
8484

8585
selector_branch_before_package = BranchPythonOperator(
8686
task_id=SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
8787
python_callable=_selector_branch_before_package,
88-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
88+
trigger_rule=TriggerRule.ALL_SUCCESS,
8989
)
9090

9191
selector_branch_before_publish = BranchPythonOperator(
9292
task_id=SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID,
9393
python_callable=_selector_branch_before_publish,
94-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
94+
trigger_rule=TriggerRule.ALL_SUCCESS,
9595
)
9696

9797
stop_processing = PythonOperator(
@@ -103,27 +103,27 @@ def _stop_processing():
103103

104104
notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline,
105105
task_id=NOTICE_NORMALISATION_PIPELINE_TASK_ID,
106-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
106+
trigger_rule=TriggerRule.ALL_SUCCESS)
107107

108108
notice_transformation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_transformation_pipeline,
109109
task_id=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID,
110-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
110+
trigger_rule=TriggerRule.ALL_SUCCESS)
111111

112112
notice_distillation_step = NoticeBatchPipelineOperator(batch_pipeline_callable=notices_batch_distillation_pipeline,
113113
task_id=NOTICE_DISTILLATION_PIPELINE_TASK_ID,
114-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
114+
trigger_rule=TriggerRule.ALL_SUCCESS
115115
)
116116

117117
notice_validation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_validation_pipeline,
118118
task_id=NOTICE_VALIDATION_PIPELINE_TASK_ID,
119-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
119+
trigger_rule=TriggerRule.ALL_SUCCESS)
120120
notice_package_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_package_pipeline,
121121
task_id=NOTICE_PACKAGE_PIPELINE_TASK_ID,
122-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
122+
trigger_rule=TriggerRule.ALL_SUCCESS)
123123

124124
notice_publish_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_publish_pipeline,
125125
task_id=NOTICE_PUBLISH_PIPELINE_TASK_ID,
126-
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
126+
trigger_rule=TriggerRule.ALL_SUCCESS)
127127

128128
start_processing >> [notice_normalisation_step, selector_branch_before_transformation,
129129
selector_branch_before_validation,

dags/operators/DagBatchPipelineOperator.py

Lines changed: 15 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,16 @@
11
from typing import Any, Protocol, List
22
from uuid import uuid4
3+
4+
from airflow.exceptions import AirflowSkipException
35
from airflow.models import BaseOperator
46
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
57
from pymongo import MongoClient
68

79
from dags.dags_utils import pull_dag_upstream, push_dag_downstream, get_dag_param, smart_xcom_pull, \
810
smart_xcom_push
9-
from ted_sws.core.service.batch_processing import chunks
10-
from dags.pipelines.pipeline_protocols import NoticePipelineCallable
11+
from dags.pipelines.pipeline_protocols import NoticePipelineCallable, NoticePipelineOutput
1112
from ted_sws import config
13+
from ted_sws.core.service.batch_processing import chunks
1214
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
1315
from ted_sws.event_manager.model.event_message import EventMessage, NoticeEventMessage
1416
from ted_sws.event_manager.services.log import log_notice_error
@@ -26,7 +28,7 @@
2628

2729
class BatchPipelineCallable(Protocol):
2830

29-
def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
31+
def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[NoticePipelineOutput]:
3032
"""
3133
:param notice_ids:
3234
:param mongodb_client:
@@ -57,11 +59,14 @@ def execute(self, context: Any):
5759
"""
5860
logger = get_logger()
5961
notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY)
60-
if not notice_ids:
62+
if notice_ids is None:
6163
raise Exception(f"XCOM key [{NOTICE_IDS_KEY}] is not present in context!")
64+
if len(notice_ids) == 0:
65+
smart_xcom_push(key=NOTICE_IDS_KEY, value=[])
66+
raise AirflowSkipException("No notices to process!")
6267
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
6368
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
64-
processed_notice_ids = []
69+
processed_notices_pipeline_output: List[NoticePipelineOutput] = []
6570
pipeline_name = DEFAULT_PIPELINE_NAME_FOR_LOGS
6671
if self.notice_pipeline_callable:
6772
pipeline_name = self.notice_pipeline_callable.__name__
@@ -76,7 +81,7 @@ def execute(self, context: Any):
7681
handle_event_message_metadata_dag_context(batch_event_message, context)
7782
batch_event_message.start_record()
7883
if self.batch_pipeline_callable is not None:
79-
processed_notice_ids.extend(
84+
processed_notices_pipeline_output.extend(
8085
self.batch_pipeline_callable(notice_ids=notice_ids, mongodb_client=mongodb_client))
8186
elif self.notice_pipeline_callable is not None:
8287
for notice_id in notice_ids:
@@ -89,7 +94,7 @@ def execute(self, context: Any):
8994
if result_notice_pipeline.store_result:
9095
notice_repository.update(notice=result_notice_pipeline.notice)
9196
if result_notice_pipeline.processed:
92-
processed_notice_ids.append(notice_id)
97+
processed_notices_pipeline_output.append(result_notice_pipeline)
9398
notice_event.end_record()
9499
if notice.normalised_metadata:
95100
notice_event.notice_form_number = notice.normalised_metadata.form_number
@@ -102,12 +107,12 @@ def execute(self, context: Any):
102107
notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
103108
notice_status=notice.status if notice else None,
104109
notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)
110+
raise e
105111

106112
batch_event_message.end_record()
107113
logger.info(event_message=batch_event_message)
108-
if not processed_notice_ids:
109-
raise Exception(f"No notice has been processed!")
110-
smart_xcom_push(key=NOTICE_IDS_KEY, value=processed_notice_ids)
114+
smart_xcom_push(key=NOTICE_IDS_KEY, value=[notice_pipeline_output.notice.ted_id for notice_pipeline_output in
115+
processed_notices_pipeline_output])
111116

112117

113118
class TriggerNoticeBatchPipelineOperator(BaseOperator):

dags/pipelines/notice_batch_processor_pipelines.py

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,17 @@
11
from typing import List
2+
23
from pymongo import MongoClient
34

5+
from dags.pipelines.pipeline_protocols import NoticePipelineOutput
46
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_procedure_entities
57

68
CET_URIS = ["http://www.w3.org/ns/org#Organization"]
79
PROCEDURE_CET_URI = "http://data.europa.eu/a4g/ontology#Procedure"
810

911

10-
def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
12+
def notices_batch_distillation_pipeline(notice_ids: List[str],
13+
mongodb_client: MongoClient
14+
) -> List[NoticePipelineOutput]:
1115
"""
1216
1317
:param notice_ids:
@@ -29,4 +33,4 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M
2933
deduplicate_procedure_entities(notices=notices, procedure_cet_uri=PROCEDURE_CET_URI, mongodb_client=mongodb_client)
3034
for notice in notices:
3135
notice_repository.update(notice=notice)
32-
return notice_ids
36+
return [NoticePipelineOutput(notice=notice) for notice in notices]

0 commit comments

Comments
 (0)