From 2d42541e3629fa6046a3e8d1654173152b25938c Mon Sep 17 00:00:00 2001 From: duprijil Date: Mon, 26 May 2025 09:17:04 +0300 Subject: [PATCH 01/18] feat!: Change logic of failing tasks for notice_processing_pipeline DAG --- dags/__init__.py | 2 ++ dags/fetch_notices_by_date.py | 3 +-- dags/load_mapping_suite_in_database.py | 5 ++-- dags/notice_processing_pipeline.py | 26 +++++++++---------- dags/operators/DagBatchPipelineOperator.py | 25 +++++++++++------- .../notice_batch_processor_pipelines.py | 8 ++++-- 6 files changed, 40 insertions(+), 29 deletions(-) diff --git a/dags/__init__.py b/dags/__init__.py index ddc8f130b..741259b78 100644 --- a/dags/__init__.py +++ b/dags/__init__.py @@ -16,6 +16,8 @@ "execution_timeout": timedelta(days=10), } +BATCH_SIZE = 2000 + NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline" NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline" NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline" diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 5bea63499..63e6c4a5d 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -7,7 +7,7 @@ from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils.trigger_rule import TriggerRule -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline @@ -18,7 +18,6 @@ from ted_sws.event_manager.services.log import log_error FETCHER_DAG_NAME = "fetch_notices_by_date" -BATCH_SIZE = 2000 WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow" diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py index cd4efc005..3f9c35eef 100644 --- a/dags/load_mapping_suite_in_database.py +++ b/dags/load_mapping_suite_in_database.py @@ -5,7 +5,7 @@ from airflow.utils.trigger_rule import TriggerRule from pymongo import MongoClient -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE from dags.dags_utils import push_dag_downstream, pull_dag_upstream, get_dag_param from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from ted_sws import config @@ -111,7 +111,8 @@ def _branch_selector(): finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID) + trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID, + batch_size=BATCH_SIZE) fetch_mapping_suite_package_from_github_into_mongodb() >> branch_task trigger_document_proc_pipeline >> finish_step branch_task >> [trigger_document_proc_pipeline, finish_step] diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index 02507acfd..08268e4e9 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -1,6 +1,7 @@ from typing import List from airflow.decorators import dag +from airflow.exceptions import AirflowSkipException from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.utils.trigger_rule import TriggerRule @@ -61,37 +62,36 @@ def _selector_branch_before_publish(): return branch_selector(NOTICE_PUBLISH_PIPELINE_TASK_ID) def _stop_processing(): - notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY) - if not notice_ids: - raise Exception(f"No notice has been processed!") + pass start_processing = BranchPythonOperator( task_id=BRANCH_SELECTOR_TASK_ID, python_callable=_start_processing, + trigger_rule=TriggerRule.ALWAYS ) selector_branch_before_transformation = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, python_callable=_selector_branch_before_transformation, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) selector_branch_before_validation = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, python_callable=_selector_branch_before_validation, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) selector_branch_before_package = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, python_callable=_selector_branch_before_package, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) selector_branch_before_publish = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, python_callable=_selector_branch_before_publish, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) stop_processing = PythonOperator( @@ -103,27 +103,27 @@ def _stop_processing(): notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline, task_id=NOTICE_NORMALISATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_transformation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_transformation_pipeline, task_id=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_distillation_step = NoticeBatchPipelineOperator(batch_pipeline_callable=notices_batch_distillation_pipeline, task_id=NOTICE_DISTILLATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS + trigger_rule=TriggerRule.ALL_SUCCESS ) notice_validation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_validation_pipeline, task_id=NOTICE_VALIDATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_package_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_package_pipeline, task_id=NOTICE_PACKAGE_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_publish_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_publish_pipeline, task_id=NOTICE_PUBLISH_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) start_processing >> [notice_normalisation_step, selector_branch_before_transformation, selector_branch_before_validation, diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py index 510f5d3b6..217b1e9dd 100644 --- a/dags/operators/DagBatchPipelineOperator.py +++ b/dags/operators/DagBatchPipelineOperator.py @@ -1,14 +1,16 @@ from typing import Any, Protocol, List from uuid import uuid4 + +from airflow.exceptions import AirflowSkipException from airflow.models import BaseOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from pymongo import MongoClient from dags.dags_utils import pull_dag_upstream, push_dag_downstream, get_dag_param, smart_xcom_pull, \ smart_xcom_push -from ted_sws.core.service.batch_processing import chunks -from dags.pipelines.pipeline_protocols import NoticePipelineCallable +from dags.pipelines.pipeline_protocols import NoticePipelineCallable, NoticePipelineOutput from ted_sws import config +from ted_sws.core.service.batch_processing import chunks from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.event_manager.model.event_message import EventMessage, NoticeEventMessage from ted_sws.event_manager.services.log import log_notice_error @@ -26,7 +28,7 @@ class BatchPipelineCallable(Protocol): - def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[str]: + def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[NoticePipelineOutput]: """ :param notice_ids: :param mongodb_client: @@ -57,11 +59,14 @@ def execute(self, context: Any): """ logger = get_logger() notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY) - if not notice_ids: + if notice_ids is None: raise Exception(f"XCOM key [{NOTICE_IDS_KEY}] is not present in context!") + if len(notice_ids) == 0: + smart_xcom_push(key=NOTICE_IDS_KEY, value=[]) + raise AirflowSkipException("No notices to process!") mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) notice_repository = NoticeRepository(mongodb_client=mongodb_client) - processed_notice_ids = [] + processed_notices_pipeline_output: List[NoticePipelineOutput] = [] pipeline_name = DEFAULT_PIPELINE_NAME_FOR_LOGS if self.notice_pipeline_callable: pipeline_name = self.notice_pipeline_callable.__name__ @@ -76,7 +81,7 @@ def execute(self, context: Any): handle_event_message_metadata_dag_context(batch_event_message, context) batch_event_message.start_record() if self.batch_pipeline_callable is not None: - processed_notice_ids.extend( + processed_notices_pipeline_output.extend( self.batch_pipeline_callable(notice_ids=notice_ids, mongodb_client=mongodb_client)) elif self.notice_pipeline_callable is not None: for notice_id in notice_ids: @@ -89,7 +94,7 @@ def execute(self, context: Any): if result_notice_pipeline.store_result: notice_repository.update(notice=result_notice_pipeline.notice) if result_notice_pipeline.processed: - processed_notice_ids.append(notice_id) + processed_notices_pipeline_output.append(result_notice_pipeline) notice_event.end_record() if notice.normalised_metadata: notice_event.notice_form_number = notice.normalised_metadata.form_number @@ -102,12 +107,12 @@ def execute(self, context: Any): notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None, notice_status=notice.status if notice else None, notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None) + raise e batch_event_message.end_record() logger.info(event_message=batch_event_message) - if not processed_notice_ids: - raise Exception(f"No notice has been processed!") - smart_xcom_push(key=NOTICE_IDS_KEY, value=processed_notice_ids) + smart_xcom_push(key=NOTICE_IDS_KEY, value=[notice_pipeline_output.notice.ted_id for notice_pipeline_output in + processed_notices_pipeline_output]) class TriggerNoticeBatchPipelineOperator(BaseOperator): diff --git a/dags/pipelines/notice_batch_processor_pipelines.py b/dags/pipelines/notice_batch_processor_pipelines.py index e4c5f165b..12605f05f 100644 --- a/dags/pipelines/notice_batch_processor_pipelines.py +++ b/dags/pipelines/notice_batch_processor_pipelines.py @@ -1,13 +1,17 @@ from typing import List + from pymongo import MongoClient +from dags.pipelines.pipeline_protocols import NoticePipelineOutput from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_procedure_entities CET_URIS = ["http://www.w3.org/ns/org#Organization"] PROCEDURE_CET_URI = "http://data.europa.eu/a4g/ontology#Procedure" -def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]: +def notices_batch_distillation_pipeline(notice_ids: List[str], + mongodb_client: MongoClient + ) -> List[NoticePipelineOutput]: """ :param notice_ids: @@ -29,4 +33,4 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M deduplicate_procedure_entities(notices=notices, procedure_cet_uri=PROCEDURE_CET_URI, mongodb_client=mongodb_client) for notice in notices: notice_repository.update(notice=notice) - return notice_ids + return [NoticePipelineOutput(notice=notice) for notice in notices] From 74268e20206b8165538e9cea0812317c482ded33 Mon Sep 17 00:00:00 2001 From: duprijil Date: Mon, 26 May 2025 09:49:55 +0300 Subject: [PATCH 02/18] fix: Fix test after changing distillation pipeline return type --- tests/e2e/dags/pipelines/test_notice_processor_pipelines.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py b/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py index d6d8c196c..15f293e51 100644 --- a/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py +++ b/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py @@ -33,7 +33,7 @@ def test_notice_processor_pipelines(fake_mongodb_client): notice_repository.update(notice=notice) result_list = notices_batch_distillation_pipeline(notice_ids=[notice_id], mongodb_client=fake_mongodb_client) assert len(result_list) == 1 - notice_id = result_list[0] + notice_id = result_list[0].notice.ted_id notice = notice_repository.get(reference=notice_id) pipelines = [notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline] notice_states = [NoticeStatus.DISTILLED, NoticeStatus.VALIDATED, NoticeStatus.PACKAGED, NoticeStatus.PUBLISHED] From f576e8d7a308a722141c171c19273ad6e06a98be Mon Sep 17 00:00:00 2001 From: duprijil Date: Tue, 27 May 2025 23:06:26 +0300 Subject: [PATCH 03/18] fix: undo copy libs to opt --- infra/airflow/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/airflow/Dockerfile b/infra/airflow/Dockerfile index 138abb798..0d80955ee 100644 --- a/infra/airflow/Dockerfile +++ b/infra/airflow/Dockerfile @@ -16,7 +16,7 @@ RUN apt-get update && apt-get install -y \ # back to normal user USER airflow -COPY libraries /opt/airflow +COPY libraries /home/airflow # requirements.txt shall be made availble from the **ted-sws** GitHub repository COPY requirements.txt /opt/airflow From 9d85f1cf7a97ada1d0fec446e701d9d046977201 Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Thu, 22 May 2025 12:01:27 +0100 Subject: [PATCH 04/18] WIP reprocess dags --- ...eprocess_notices_from_backlog_by_status.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 dags/reprocess_notices_from_backlog_by_status.py diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py new file mode 100644 index 000000000..c15d397e8 --- /dev/null +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -0,0 +1,66 @@ +from airflow.decorators import dag, task +from airflow.models import Param + +from dags import DEFAULT_DAG_ARGUMENTS +from dags.dags_utils import push_dag_downstream, get_dag_param +from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator +from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from ted_sws.core.model.notice import NoticeStatus +from ted_sws.event_manager.adapters.event_log_decorator import event_log +from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType + +DAG_ID = "reprocess_notices_from_backlog_by_status" +DAG_NAME = "Reprocess Notices From Backlog By Status" + +TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" +START_DATE_DAG_PARAM = "start_date" +END_DATE_DAG_PARAM = "end_date" +NOTICE_STATUSES_DAG_PARAM = "notice_statuses" + +@dag( + default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=DAG_ID, + schedule_interval=None, + tags=['selector', 're-transform'], + params={ + START_DATE_DAG_PARAM: Param(default="", type="string", description="Start date (YYYY-MM-DD)"), + END_DATE_DAG_PARAM: Param(default="", type="string", description="End date (YYYY-MM-DD)"), + NOTICE_STATUSES_DAG_PARAM: Param( + type="array", + title="Notice Statuses", + description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]" + )} + ) +def reprocess_notices_from_backlog_by_status(): + @task + @event_log(TechnicalEventMessage( + message="select_notices_for_re_transform", + metadata=EventMessageMetadata( + process_type=EventMessageProcessType.DAG, + process_name=DAG_ID + )) + ) + def select_notices_for_re_transform(): + start_date = get_dag_param(key=START_DATE_DAG_PARAM) + end_date = get_dag_param(key=END_DATE_DAG_PARAM) + statuses_param = get_dag_param(key=NOTICE_STATUSES_DAG_PARAM) + + notice_statuses = [NoticeStatus[status_str] for status_str in statuses_param] + + notice_ids = notice_ids_selector_by_status( + notice_statuses=notice_statuses, + start_date=start_date, + end_date=end_date + ) + + push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) + + trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( + task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, + start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID + ) + + select_notices_for_re_transform() >> trigger_notice_process_workflow + +dag = reprocess_notices_from_backlog_by_status() \ No newline at end of file From 37da1fb2bcbd4033625780bad3b89746a935a9d8 Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Thu, 22 May 2025 13:37:53 +0100 Subject: [PATCH 05/18] WIP reprocess dags --- dags/reprocess_notices_from_backlog_by_status.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index c15d397e8..a40c8cec5 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -24,13 +24,15 @@ schedule_interval=None, tags=['selector', 're-transform'], params={ - START_DATE_DAG_PARAM: Param(default="", type="string", description="Start date (YYYY-MM-DD)"), - END_DATE_DAG_PARAM: Param(default="", type="string", description="End date (YYYY-MM-DD)"), + NOTICE_STATUSES_DAG_PARAM: Param( type="array", title="Notice Statuses", description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]" - )} + ), + START_DATE_DAG_PARAM: Param(default="", type="string", description="Start date (YYYY-MM-DD)"), + END_DATE_DAG_PARAM: Param(default="", type="string", description="End date (YYYY-MM-DD)") + } ) def reprocess_notices_from_backlog_by_status(): @task @@ -42,8 +44,8 @@ def reprocess_notices_from_backlog_by_status(): )) ) def select_notices_for_re_transform(): - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) + start_date = get_dag_param(key=START_DATE_DAG_PARAM, default_value="") + end_date = get_dag_param(key=END_DATE_DAG_PARAM,default_value="") statuses_param = get_dag_param(key=NOTICE_STATUSES_DAG_PARAM) notice_statuses = [NoticeStatus[status_str] for status_str in statuses_param] From 1db884f329c794d2edd53306e2dabb5beb2a0ec1 Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Thu, 22 May 2025 13:46:34 +0100 Subject: [PATCH 06/18] WIP reprocess dags --- dags/reprocess_notices_from_backlog_by_status.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index a40c8cec5..69242abd4 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -30,8 +30,8 @@ title="Notice Statuses", description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]" ), - START_DATE_DAG_PARAM: Param(default="", type="string", description="Start date (YYYY-MM-DD)"), - END_DATE_DAG_PARAM: Param(default="", type="string", description="End date (YYYY-MM-DD)") + START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start date (YYYY-MM-DD)"), + END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End date (YYYY-MM-DD)") } ) def reprocess_notices_from_backlog_by_status(): From 8c0d1e419e7a61939032d0748907f8b9740c2c6b Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Thu, 22 May 2025 15:17:59 +0100 Subject: [PATCH 07/18] reprocess by id --- dags/reprocess_notices_from_backlog_by_id.py | 56 +++++++++++++++++++ ...eprocess_notices_from_backlog_by_status.py | 6 +- 2 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 dags/reprocess_notices_from_backlog_by_id.py diff --git a/dags/reprocess_notices_from_backlog_by_id.py b/dags/reprocess_notices_from_backlog_by_id.py new file mode 100644 index 000000000..f4f6b4f70 --- /dev/null +++ b/dags/reprocess_notices_from_backlog_by_id.py @@ -0,0 +1,56 @@ +from airflow.decorators import dag, task +from airflow.models import Param + +from dags import DEFAULT_DAG_ARGUMENTS +from dags.dags_utils import push_dag_downstream, get_dag_param +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator +from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID +from ted_sws.event_manager.adapters.event_log_decorator import event_log +from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType + +DAG_ID = "reprocess_notices_by_id_from_backlog" +DAG_NAME = "Reprocess Notices From Backlog By ID" + +NOTICE_IDS_DAG_PARAM = "notice_ids" +TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" + +@dag( + default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, + schedule_interval=None, + tags=["selector", "re-transform"], + params={ + NOTICE_IDS_DAG_PARAM: Param( + type="array", + title="Notice IDs", + description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]" + ) + }, + description=DAG_NAME +) +def reprocess_notices_by_id_from_backlog(): + @task + @event_log(TechnicalEventMessage( + message="select_notices_for_reprocess_by_id", + metadata=EventMessageMetadata( + process_type=EventMessageProcessType.DAG, + process_name=DAG_ID + )) + ) + def select_notice_ids(): + notice_ids = get_dag_param(key=NOTICE_IDS_DAG_PARAM, raise_error=True) + + if not notice_ids: + raise Exception("No notice IDs provided.") + + push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) + + trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( + task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, + start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID + ) + + select_notice_ids() >> trigger_notice_process_workflow + +dag = reprocess_notices_by_id_from_backlog() \ No newline at end of file diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index 69242abd4..5ef796323 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -28,10 +28,10 @@ NOTICE_STATUSES_DAG_PARAM: Param( type="array", title="Notice Statuses", - description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]" + description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]. Every status value should be entered on a newline" ), - START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start date (YYYY-MM-DD)"), - END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End date (YYYY-MM-DD)") + START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start publication date (YYYY-MM-DD)"), + END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End publication date (YYYY-MM-DD)") } ) def reprocess_notices_from_backlog_by_status(): From cf65e6c2e1d61efe6cd23d58f0680772722c9b32 Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Fri, 23 May 2025 12:51:01 +0100 Subject: [PATCH 08/18] reprocess dags and label change for dags --- ...ly_check_notices_availability_in_cellar.py | 5 +- dags/daily_materialized_views_update.py | 2 + dags/fetch_notices_by_date.py | 9 ++-- dags/fetch_notices_by_date_range.py | 10 ++-- dags/fetch_notices_by_query.py | 7 ++- dags/load_mapping_suite_in_database.py | 5 +- dags/load_notices_in_fuseki.py | 5 +- dags/notice_processing_pipeline.py | 3 ++ dags/reprocess_notices_from_backlog_by_id.py | 4 +- ...eprocess_notices_from_backlog_by_status.py | 3 +- dags/reprocess_published_in_cellar_notices.py | 50 ----------------- ...ocess_unnormalised_notices_from_backlog.py | 44 --------------- ...process_unpackaged_notices_from_backlog.py | 52 ------------------ ...rocess_unpublished_notices_from_backlog.py | 53 ------------------ ...cess_untransformed_notices_from_backlog.py | 54 ------------------- ...rocess_unvalidated_notices_from_backlog.py | 50 ----------------- tests/unit/dags/conftest.py | 4 +- 17 files changed, 40 insertions(+), 320 deletions(-) delete mode 100644 dags/reprocess_published_in_cellar_notices.py delete mode 100644 dags/reprocess_unnormalised_notices_from_backlog.py delete mode 100644 dags/reprocess_unpackaged_notices_from_backlog.py delete mode 100644 dags/reprocess_unpublished_notices_from_backlog.py delete mode 100644 dags/reprocess_untransformed_notices_from_backlog.py delete mode 100644 dags/reprocess_unvalidated_notices_from_backlog.py diff --git a/dags/daily_check_notices_availability_in_cellar.py b/dags/daily_check_notices_availability_in_cellar.py index 03a4c6eae..a430cd885 100644 --- a/dags/daily_check_notices_availability_in_cellar.py +++ b/dags/daily_check_notices_availability_in_cellar.py @@ -7,11 +7,14 @@ from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \ validate_notices_availability_in_cellar -DAG_NAME = "daily_check_notices_availability_in_cellar" +DAG_ID = "daily_check_notices_availability_in_cellar" +DAG_NAME = "Daily check notices availability in Cellar" @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, schedule_interval="0 0 * * *", tags=['daily', 'validation']) def daily_check_notices_availability_in_cellar(): diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index 2b0c56522..35e07ab96 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -10,10 +10,12 @@ DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update" DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1 +DAG_NAME = "Materialized views update" @dag(default_args=DEFAULT_DAG_ARGUMENTS, dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME, + dag_display_name=DAG_NAME, catchup=False, schedule=NOTICES_COLLECTION_DATASET, max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS, diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 63e6c4a5d..92b02ed4d 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -17,7 +17,9 @@ EventMessageProcessType from ted_sws.event_manager.services.log import log_error -FETCHER_DAG_NAME = "fetch_notices_by_date" +DAG_ID = "fetch_notices_by_date" +DAG_NAME = "Fetch notices by date" +BATCH_SIZE = 2000 WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow" @@ -28,7 +30,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, - dag_id=FETCHER_DAG_NAME, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, catchup=False, schedule=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, @@ -57,7 +60,7 @@ def fetch_notices_by_date(): @event_log(TechnicalEventMessage( message="fetch_notice_from_ted", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=DAG_ID )) ) def fetch_by_date_notice_from_ted(): diff --git a/dags/fetch_notices_by_date_range.py b/dags/fetch_notices_by_date_range.py index c73eb8489..f6ad041e6 100644 --- a/dags/fetch_notices_by_date_range.py +++ b/dags/fetch_notices_by_date_range.py @@ -10,12 +10,13 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \ - FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME + DAG_ID as FETCH_NOTICES_BY_DATE_DAG_NAME from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "fetch_notices_by_date_range" +DAG_ID = "fetch_notices_by_date_range" +DAG_NAME = "Fetch notices by date range" START_DATE_KEY = "start_date" END_DATE_KEY = "end_date" @@ -34,7 +35,8 @@ def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> li until=datetime.strptime(end_date, '%Y-%m-%d'))] -@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'], +@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, dag_id=DAG_ID, dag_display_name=DAG_NAME, + tags=['master'], params={ START_DATE_KEY: Param( default=f"{date.today()}", @@ -66,7 +68,7 @@ def fetch_notices_by_date_range(): @event_log(TechnicalEventMessage( message="trigger_fetch_notices_workers_for_date_range", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=DAG_ID )) ) def trigger_notice_by_date_for_each_date_in_range(): diff --git a/dags/fetch_notices_by_query.py b/dags/fetch_notices_by_query.py index d5f796994..4c747d7a8 100644 --- a/dags/fetch_notices_by_query.py +++ b/dags/fetch_notices_by_query.py @@ -11,7 +11,8 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "fetch_notices_by_query" +DAG_ID = "fetch_notices_by_query" +DAG_NAME = "Fetch notices by query" BATCH_SIZE = 2000 QUERY_DAG_KEY = "query" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -23,6 +24,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, tags=['fetch'], params={ QUERY_DAG_KEY: Param( @@ -46,7 +49,7 @@ def fetch_notices_by_query(): @event_log(TechnicalEventMessage( message="fetch_by_query_notice_from_ted", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=DAG_ID )) ) def fetch_by_query_notice_from_ted(): diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py index 3f9c35eef..b68937458 100644 --- a/dags/load_mapping_suite_in_database.py +++ b/dags/load_mapping_suite_in_database.py @@ -27,10 +27,13 @@ FINISH_LOADING_MAPPING_SUITE_TASK_ID = "finish_loading_mapping_suite" TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline" CHECK_IF_LOAD_TEST_DATA_TASK_ID = "check_if_load_test_data" - +DAG_ID = "load_mapping_suite_in_database" +DAG_NAME = "Load mapping suite" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, tags=['fetch', 'mapping-suite', 'github'], params={ GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param( diff --git a/dags/load_notices_in_fuseki.py b/dags/load_notices_in_fuseki.py index 0209df869..2e9184ed7 100644 --- a/dags/load_notices_in_fuseki.py +++ b/dags/load_notices_in_fuseki.py @@ -13,10 +13,13 @@ FUSEKI_DATASET_NAME_DAG_PARAM_KEY = "fuseki_dataset_name" NOTICE_STATUS_DAG_PARAM_KEY = "notice_status" DEFAULT_FUSEKI_DATASET_NAME = "mdr_dataset" - +DAG_ID = "load_notices_in_fuseki" +DAG_NAME = "Load notices in Fuseki" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, tags=['load', 'notices', 'fuseki']) def load_notices_in_fuseki(): @task diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index 08268e4e9..841641d5e 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -19,6 +19,7 @@ notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline DAG_NAME = "notice_processing_pipeline" +DAG_ID = "notice_processing_pipeline" def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str: @@ -33,6 +34,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, max_active_runs=256, max_active_tasks=256, tags=['worker', 'pipeline']) diff --git a/dags/reprocess_notices_from_backlog_by_id.py b/dags/reprocess_notices_from_backlog_by_id.py index f4f6b4f70..1804dfe7d 100644 --- a/dags/reprocess_notices_from_backlog_by_id.py +++ b/dags/reprocess_notices_from_backlog_by_id.py @@ -9,7 +9,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType DAG_ID = "reprocess_notices_by_id_from_backlog" -DAG_NAME = "Reprocess Notices From Backlog By ID" +DAG_NAME = "Reprocess notices from backlog by ID" NOTICE_IDS_DAG_PARAM = "notice_ids" TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" @@ -24,7 +24,7 @@ NOTICE_IDS_DAG_PARAM: Param( type="array", title="Notice IDs", - description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]" + description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]. Every ID value should be entered on a newline" ) }, description=DAG_NAME diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index 5ef796323..d102601f7 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -11,7 +11,7 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType DAG_ID = "reprocess_notices_from_backlog_by_status" -DAG_NAME = "Reprocess Notices From Backlog By Status" +DAG_NAME = "Reprocess notices from backlog by status" TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" START_DATE_DAG_PARAM = "start_date" @@ -21,6 +21,7 @@ @dag( default_args=DEFAULT_DAG_ARGUMENTS, dag_id=DAG_ID, + dag_display_name=DAG_NAME, schedule_interval=None, tags=['selector', 're-transform'], params={ diff --git a/dags/reprocess_published_in_cellar_notices.py b/dags/reprocess_published_in_cellar_notices.py deleted file mode 100644 index 687625d03..000000000 --- a/dags/reprocess_published_in_cellar_notices.py +++ /dev/null @@ -1,50 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_published_in_cellar_notices" - -RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-transform-publicly-available']) -def reprocess_published_in_cellar_notices(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_transform", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_transform(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID - ) - select_notices_for_re_transform() >> trigger_notice_process_workflow - - -dag = reprocess_published_in_cellar_notices() diff --git a/dags/reprocess_unnormalised_notices_from_backlog.py b/dags/reprocess_unnormalised_notices_from_backlog.py deleted file mode 100644 index 51005e1ee..000000000 --- a/dags/reprocess_unnormalised_notices_from_backlog.py +++ /dev/null @@ -1,44 +0,0 @@ -from airflow.decorators import dag, task -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import TriggerNoticeBatchPipelineOperator, NOTICE_IDS_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unnormalised_notices_from_backlog" - -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 'raw-notices']) -def reprocess_unnormalised_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_all_raw_notices", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_all_raw_notices(): - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=[NoticeStatus.RAW], start_date=start_date, - end_date=end_date) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID - ) - select_all_raw_notices() >> trigger_notice_process_workflow - - -dag = reprocess_unnormalised_notices_from_backlog() diff --git a/dags/reprocess_unpackaged_notices_from_backlog.py b/dags/reprocess_unpackaged_notices_from_backlog.py deleted file mode 100644 index c3db996ad..000000000 --- a/dags/reprocess_unpackaged_notices_from_backlog.py +++ /dev/null @@ -1,52 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_PACKAGE_PIPELINE_TASK_ID -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unpackaged_notices_from_backlog" - -RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING, - NoticeStatus.ELIGIBLE_FOR_PACKAGING, - NoticeStatus.INELIGIBLE_FOR_PUBLISHING] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-package']) -def reprocess_unpackaged_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_package", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_package(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_PACKAGE_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_PACKAGE_PIPELINE_TASK_ID - ) - select_notices_for_re_package() >> trigger_notice_process_workflow - - -dag = reprocess_unpackaged_notices_from_backlog() diff --git a/dags/reprocess_unpublished_notices_from_backlog.py b/dags/reprocess_unpublished_notices_from_backlog.py deleted file mode 100644 index f6c06697f..000000000 --- a/dags/reprocess_unpublished_notices_from_backlog.py +++ /dev/null @@ -1,53 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_PUBLISH_PIPELINE_TASK_ID -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unpublished_notices_from_backlog" - -RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING, - NoticeStatus.PACKAGED, NoticeStatus.PUBLICLY_UNAVAILABLE - ] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-publish']) -def reprocess_unpublished_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_publish", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_publish(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_PUBLISH_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_PUBLISH_PIPELINE_TASK_ID - ) - select_notices_for_re_publish() >> trigger_notice_process_workflow - - -etl_dag = reprocess_unpublished_notices_from_backlog() diff --git a/dags/reprocess_untransformed_notices_from_backlog.py b/dags/reprocess_untransformed_notices_from_backlog.py deleted file mode 100644 index 80e8e74ef..000000000 --- a/dags/reprocess_untransformed_notices_from_backlog.py +++ /dev/null @@ -1,54 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_untransformed_notices_from_backlog" - -RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION, - NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, - NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION, - NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED - ] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-transform']) -def reprocess_untransformed_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_transform", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_transform(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID - ) - select_notices_for_re_transform() >> trigger_notice_process_workflow - - -dag = reprocess_untransformed_notices_from_backlog() diff --git a/dags/reprocess_unvalidated_notices_from_backlog.py b/dags/reprocess_unvalidated_notices_from_backlog.py deleted file mode 100644 index 16e654727..000000000 --- a/dags/reprocess_unvalidated_notices_from_backlog.py +++ /dev/null @@ -1,50 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unvalidated_notices_from_backlog" - -RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-validate']) -def reprocess_unvalidated_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_validate", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_validate(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_VALIDATE_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID - ) - select_notices_for_re_validate() >> trigger_notice_process_workflow - - -dag = reprocess_unvalidated_notices_from_backlog() diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index 5d5857768..33034db0b 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -7,8 +7,8 @@ from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils.db import resetdb, initdb -from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME -from dags.fetch_notices_by_date import FETCHER_DAG_NAME +from dags.daily_materialized_views_update import DAG_ID as DAILY_MATERIALISED_VIEWS_DAG_NAME +from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME from tests import AIRFLOW_DAG_FOLDER From c54c423b1f2c8453d030317c0478e98ea27f2634 Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Mon, 26 May 2025 15:36:45 +0100 Subject: [PATCH 09/18] adding examples to reprocess by status --- dags/reprocess_notices_from_backlog_by_status.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index d102601f7..75195befd 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -18,6 +18,7 @@ END_DATE_DAG_PARAM = "end_date" NOTICE_STATUSES_DAG_PARAM = "notice_statuses" + @dag( default_args=DEFAULT_DAG_ARGUMENTS, dag_id=DAG_ID, @@ -29,7 +30,8 @@ NOTICE_STATUSES_DAG_PARAM: Param( type="array", title="Notice Statuses", - description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]. Every status value should be entered on a newline" + description="Required. Select one or more notice statuses to reprocess.", + examples=[status.name for status in NoticeStatus] ), START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start publication date (YYYY-MM-DD)"), END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End publication date (YYYY-MM-DD)") From b8a187e3dc87220b9f7935eb76b4723e644916bd Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Mon, 26 May 2025 16:00:39 +0100 Subject: [PATCH 10/18] adding examples to reprocess by status --- dags/reprocess_notices_from_backlog_by_status.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index 75195befd..d519fc3ce 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -28,6 +28,7 @@ params={ NOTICE_STATUSES_DAG_PARAM: Param( + default=[], type="array", title="Notice Statuses", description="Required. Select one or more notice statuses to reprocess.", From e543554859760c519846aab3e627a21b7e731953 Mon Sep 17 00:00:00 2001 From: Dragos0000 Date: Tue, 27 May 2025 13:08:19 +0100 Subject: [PATCH 11/18] change step for reprocessing --- dags/reprocess_notices_from_backlog_by_id.py | 4 ++-- dags/reprocess_notices_from_backlog_by_status.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dags/reprocess_notices_from_backlog_by_id.py b/dags/reprocess_notices_from_backlog_by_id.py index 1804dfe7d..0c9d66c87 100644 --- a/dags/reprocess_notices_from_backlog_by_id.py +++ b/dags/reprocess_notices_from_backlog_by_id.py @@ -4,7 +4,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID +from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType @@ -48,7 +48,7 @@ def select_notice_ids(): trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID + start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID ) select_notice_ids() >> trigger_notice_process_workflow diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index d519fc3ce..c3cc8aabc 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -3,7 +3,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID +from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status from ted_sws.core.model.notice import NoticeStatus @@ -64,7 +64,7 @@ def select_notices_for_re_transform(): trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID + start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID ) select_notices_for_re_transform() >> trigger_notice_process_workflow From b8692f306bac67fe9d9486bdfb574b01d3c036b9 Mon Sep 17 00:00:00 2001 From: duprijil Date: Tue, 27 May 2025 23:19:00 +0300 Subject: [PATCH 12/18] fix: Fix tests after merge --- dags/fetch_notices_by_date.py | 4 ++-- dags/reprocess_notices_from_backlog_by_id.py | 3 +-- dags/reprocess_notices_from_backlog_by_status.py | 3 +-- tests/unit/event_manager/services/test_logger_from_context.py | 4 ++-- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 92b02ed4d..8acb4d64f 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -18,7 +18,7 @@ from ted_sws.event_manager.services.log import log_error DAG_ID = "fetch_notices_by_date" -DAG_NAME = "Fetch notices by date" +FETCHER_DAG_NAME = "Fetch notices by date" BATCH_SIZE = 2000 WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -31,7 +31,7 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, dag_id=DAG_ID, - dag_display_name=DAG_NAME, + dag_display_name=FETCHER_DAG_NAME, catchup=False, schedule=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, diff --git a/dags/reprocess_notices_from_backlog_by_id.py b/dags/reprocess_notices_from_backlog_by_id.py index 0c9d66c87..4b05077e3 100644 --- a/dags/reprocess_notices_from_backlog_by_id.py +++ b/dags/reprocess_notices_from_backlog_by_id.py @@ -1,10 +1,9 @@ from airflow.decorators import dag, task from airflow.models import Param -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.dags_utils import push_dag_downstream, get_dag_param from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index c3cc8aabc..8554303db 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -1,9 +1,8 @@ from airflow.decorators import dag, task from airflow.models import Param -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status from ted_sws.core.model.notice import NoticeStatus diff --git a/tests/unit/event_manager/services/test_logger_from_context.py b/tests/unit/event_manager/services/test_logger_from_context.py index 17d2fbbfc..96785a52a 100644 --- a/tests/unit/event_manager/services/test_logger_from_context.py +++ b/tests/unit/event_manager/services/test_logger_from_context.py @@ -56,7 +56,7 @@ def test_get_dag_run_id_from_dag_context(fake_dag_context): def test_handle_event_message_metadata_dag_context(fake_dag_context): - process_name = "DAG_NAME" + process_name = "FETCHER_DAG_NAME" process_id = "DAG_RUN_ID" metadata = handle_event_message_metadata_dag_context(ps_name=process_name, ps_id=process_id, ps_context={}) @@ -70,7 +70,7 @@ def test_handle_event_message_metadata_dag_context(fake_dag_context): def test_handle_event_message_metadata_context(): - process_name = "DAG_NAME" + process_name = "FETCHER_DAG_NAME" process_id = "DAG_RUN_ID" process_type = EventMessageProcessType.CLI From 90cb7c2f97cc26c45635e55dc199b83eaf849b9a Mon Sep 17 00:00:00 2001 From: duprijil Date: Tue, 27 May 2025 23:34:46 +0300 Subject: [PATCH 13/18] fix: Fix problem after merger --- tests/unit/dags/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index 33034db0b..ed838696e 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -7,7 +7,7 @@ from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils.db import resetdb, initdb -from dags.daily_materialized_views_update import DAG_ID as DAILY_MATERIALISED_VIEWS_DAG_NAME +from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME from tests import AIRFLOW_DAG_FOLDER From fa46d7edccf4c9fae6ab00c90a6bfc4e3b259b06 Mon Sep 17 00:00:00 2001 From: duprijil Date: Wed, 28 May 2025 01:11:11 +0300 Subject: [PATCH 14/18] fix: Fix problem with running unnecessary scheduled dag run on DAG unpause --- dags/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/__init__.py b/dags/__init__.py index 741259b78..b4d12f334 100644 --- a/dags/__init__.py +++ b/dags/__init__.py @@ -5,7 +5,7 @@ DEFAULT_DAG_ARGUMENTS = { "owner": "airflow", "depends_on_past": False, - "start_date": datetime(2022, 1, 1), + "start_date": datetime.today(), "email": ["info@meaningfy.ws"], "email_on_failure": False, "email_on_retry": False, From 0cf76de64b108569890912a82f060ce0c9132961 Mon Sep 17 00:00:00 2001 From: duprijil Date: Wed, 28 May 2025 01:17:14 +0300 Subject: [PATCH 15/18] fix: Make notice daily fetcher being hard failing --- dags/pipelines/notice_fetcher_pipelines.py | 4 ++-- ted_sws/notice_fetcher/adapters/ted_api.py | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dags/pipelines/notice_fetcher_pipelines.py b/dags/pipelines/notice_fetcher_pipelines.py index 880b35d8b..a0a1ff5d3 100644 --- a/dags/pipelines/notice_fetcher_pipelines.py +++ b/dags/pipelines/notice_fetcher_pipelines.py @@ -11,7 +11,6 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]: from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \ create_and_store_in_mongo_db_daily_supra_notice from ted_sws.event_manager.services.log import log_error - notice_ids = None try: date_wild_card = date_wild_card if date_wild_card else (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*") notice_publication_date = datetime.strptime(date_wild_card, "%Y%m%d*").date() @@ -24,6 +23,7 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]: ted_publication_date=notice_publication_date) except Exception as error: log_error(message=str(error)) + raise error return notice_ids @@ -35,7 +35,6 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]: from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher from ted_sws.event_manager.services.log import log_error - notice_ids = None try: ted_api_query = {"query": query} mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) @@ -44,4 +43,5 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]: request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query) except Exception as error: log_error(message=str(error)) + raise error return notice_ids diff --git a/ted_sws/notice_fetcher/adapters/ted_api.py b/ted_sws/notice_fetcher/adapters/ted_api.py index 002b3057a..9161b2ffc 100644 --- a/ted_sws/notice_fetcher/adapters/ted_api.py +++ b/ted_sws/notice_fetcher/adapters/ted_api.py @@ -72,11 +72,10 @@ def __call__(self, api_url: str, api_query: dict) -> dict: headers = get_configured_custom_headers(CUSTOM_HEADER) response = execute_request_with_retries( request_lambda=lambda: requests.post(api_url, json=api_query, headers=headers)) - if response.ok: - response_content = json.loads(response.text) - return response_content - else: - raise Exception(f"The TED-API call failed with: {response}") + response.raise_for_status() + response_content = json.loads(response.text) + return response_content + class TedAPIAdapter(TedAPIAdapterABC): From dec7e7dd09ccaedb13e003076c2b6dbb72336ee8 Mon Sep 17 00:00:00 2001 From: duprijil Date: Wed, 28 May 2025 01:27:46 +0300 Subject: [PATCH 16/18] fix: Fix problem with running unnecessary scheduled dag run --- dags/__init__.py | 2 +- dags/fetch_notices_by_date.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dags/__init__.py b/dags/__init__.py index b4d12f334..741259b78 100644 --- a/dags/__init__.py +++ b/dags/__init__.py @@ -5,7 +5,7 @@ DEFAULT_DAG_ARGUMENTS = { "owner": "airflow", "depends_on_past": False, - "start_date": datetime.today(), + "start_date": datetime(2022, 1, 1), "email": ["info@meaningfy.ws"], "email_on_failure": False, "email_on_retry": False, diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 8acb4d64f..741a3deb0 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -36,6 +36,7 @@ schedule=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, timezone=DAG_DEFAULT_TIMEZONE), + start_date=datetime.today(), tags=['selector', 'daily-fetch'], params={ WILD_CARD_DAG_KEY: Param( From 50c5e49c02ee765dcbc68bdbb0b45b43d9483d6a Mon Sep 17 00:00:00 2001 From: duprijil Date: Wed, 28 May 2025 09:44:36 +0300 Subject: [PATCH 17/18] feat!: Make MP processor be more hard fail --- ted_sws/mapping_suite_processor/services/__init__.py | 2 ++ .../services/conceptual_mapping_processor.py | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/ted_sws/mapping_suite_processor/services/__init__.py b/ted_sws/mapping_suite_processor/services/__init__.py index e69de29bb..86664e409 100644 --- a/ted_sws/mapping_suite_processor/services/__init__.py +++ b/ted_sws/mapping_suite_processor/services/__init__.py @@ -0,0 +1,2 @@ +class MappingSuiteProcessorServiceError(Exception): + pass diff --git a/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py b/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py index 7b17c3a75..0bd09c147 100644 --- a/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py +++ b/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py @@ -12,6 +12,7 @@ from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.event_manager.services.log import log_mapping_suite_info, log_mapping_suite_error from ted_sws.mapping_suite_processor.adapters.github_package_downloader import GitHubMappingSuitePackageDownloader +from ted_sws.mapping_suite_processor.services import MappingSuiteProcessorServiceError from ted_sws.mapping_suite_processor.services.mapping_suite_digest_service import \ update_digest_api_address_for_mapping_suite from ted_sws.mapping_suite_processor.services.mapping_suite_validation_service import validate_mapping_suite, \ @@ -98,9 +99,11 @@ def mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(mong validation_result = validate_mapping_suite(mapping_suite_path=mapping_suite_package_path) mapping_suite_id = get_mapping_suite_id_from_file_system(mapping_suite_path=mapping_suite_package_path) if mapping_suite_id is None: + error_msg = "Invalid mapping suite metadata, can't read mapping suite identifier!" log_mapping_suite_error( - message="Invalid mapping suite metadata, can't read mapping suite identifier!", + message=error_msg, mapping_suite_id=MAPPING_SUITE_UNKNOWN_ID) + raise MappingSuiteProcessorServiceError(error_msg) elif validation_result: log_mapping_suite_info( message=f"Mapping suite with id={mapping_suite_id} is valid for loading in MongoDB!", @@ -115,8 +118,10 @@ def mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(mong message=f"Mapping suite with id={mapping_suite_id} loaded with success in MongoDB!", mapping_suite_id=mapping_suite_id) else: + error_msg = f"Mapping suite with id={mapping_suite_id} is invalid for loading in MongoDB!" log_mapping_suite_error( - message=f"Mapping suite with id={mapping_suite_id} is invalid for loading in MongoDB!", + message=error_msg, mapping_suite_id=mapping_suite_id) + raise MappingSuiteProcessorServiceError(error_msg) return result_notice_ids From dee722de138b870d855d4f77f96023b56e757d16 Mon Sep 17 00:00:00 2001 From: duprijil Date: Wed, 28 May 2025 10:18:09 +0300 Subject: [PATCH 18/18] fix: Solve failing ted api tests with response status --- tests/e2e/notice_fetcher/test_ted_api.py | 3 ++- tests/e2e/notice_fetcher/test_ted_request_api.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/e2e/notice_fetcher/test_ted_api.py b/tests/e2e/notice_fetcher/test_ted_api.py index e962286c7..49f4bcb92 100644 --- a/tests/e2e/notice_fetcher/test_ted_api.py +++ b/tests/e2e/notice_fetcher/test_ted_api.py @@ -27,6 +27,7 @@ def test_ted_api(): def test_ted_api_error(): ted = TedAPIAdapter(request_api=TedRequestAPI()) + response_status = "400" with pytest.raises(Exception) as e: ted.get_by_query(query={"query": "NDE=67623-2022"}) - assert str(e.value) == "The TED-API call failed with: " + assert response_status in str(e.value) diff --git a/tests/e2e/notice_fetcher/test_ted_request_api.py b/tests/e2e/notice_fetcher/test_ted_request_api.py index 6bc29e3f8..62ffab512 100644 --- a/tests/e2e/notice_fetcher/test_ted_request_api.py +++ b/tests/e2e/notice_fetcher/test_ted_request_api.py @@ -15,6 +15,7 @@ def test_ted_request_api(): notice_by_query = ted_api_request(api_url=config.TED_API_URL, api_query=api_query) assert notice_by_query assert isinstance(notice_by_query, dict) + response_code = "400" with pytest.raises(Exception) as e: ted_api_request(api_url=config.TED_API_URL, api_query={"query": "INCORRECT PARAMS"}) - assert str(e.value) == "The TED-API call failed with: " + assert response_code in str(e.value)