diff --git a/dags/__init__.py b/dags/__init__.py index ddc8f130b..741259b78 100644 --- a/dags/__init__.py +++ b/dags/__init__.py @@ -16,6 +16,8 @@ "execution_timeout": timedelta(days=10), } +BATCH_SIZE = 2000 + NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline" NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline" NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline" diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 162c6936d..741a3deb0 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -7,7 +7,7 @@ from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils.trigger_rule import TriggerRule -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline @@ -36,6 +36,7 @@ schedule=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, timezone=DAG_DEFAULT_TIMEZONE), + start_date=datetime.today(), tags=['selector', 'daily-fetch'], params={ WILD_CARD_DAG_KEY: Param( diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py index 3a3351999..b68937458 100644 --- a/dags/load_mapping_suite_in_database.py +++ b/dags/load_mapping_suite_in_database.py @@ -5,7 +5,7 @@ from airflow.utils.trigger_rule import TriggerRule from pymongo import MongoClient -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE from dags.dags_utils import push_dag_downstream, pull_dag_upstream, get_dag_param from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from ted_sws import config @@ -114,7 +114,8 @@ def _branch_selector(): finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID) + trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID, + batch_size=BATCH_SIZE) fetch_mapping_suite_package_from_github_into_mongodb() >> branch_task trigger_document_proc_pipeline >> finish_step branch_task >> [trigger_document_proc_pipeline, finish_step] diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index 114845202..e5edaa845 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -1,6 +1,7 @@ from typing import List from airflow.decorators import dag +from airflow.exceptions import AirflowSkipException from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.utils.trigger_rule import TriggerRule @@ -63,37 +64,36 @@ def _selector_branch_before_publish(): return branch_selector(NOTICE_PUBLISH_PIPELINE_TASK_ID) def _stop_processing(): - notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY) - if not notice_ids: - raise Exception(f"No notice has been processed!") + pass start_processing = BranchPythonOperator( task_id=BRANCH_SELECTOR_TASK_ID, python_callable=_start_processing, + trigger_rule=TriggerRule.ALWAYS ) selector_branch_before_transformation = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, python_callable=_selector_branch_before_transformation, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) selector_branch_before_validation = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, python_callable=_selector_branch_before_validation, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) selector_branch_before_package = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, python_callable=_selector_branch_before_package, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) selector_branch_before_publish = BranchPythonOperator( task_id=SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, python_callable=_selector_branch_before_publish, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + trigger_rule=TriggerRule.ALL_SUCCESS, ) stop_processing = PythonOperator( @@ -105,27 +105,27 @@ def _stop_processing(): notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline, task_id=NOTICE_NORMALISATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_transformation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_transformation_pipeline, task_id=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_distillation_step = NoticeBatchPipelineOperator(batch_pipeline_callable=notices_batch_distillation_pipeline, task_id=NOTICE_DISTILLATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS + trigger_rule=TriggerRule.ALL_SUCCESS ) notice_validation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_validation_pipeline, task_id=NOTICE_VALIDATION_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_package_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_package_pipeline, task_id=NOTICE_PACKAGE_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) notice_publish_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_publish_pipeline, task_id=NOTICE_PUBLISH_PIPELINE_TASK_ID, - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + trigger_rule=TriggerRule.ALL_SUCCESS) start_processing >> [notice_normalisation_step, selector_branch_before_transformation, selector_branch_before_validation, diff --git a/dags/operators/DagBatchPipelineOperator.py b/dags/operators/DagBatchPipelineOperator.py index 510f5d3b6..217b1e9dd 100644 --- a/dags/operators/DagBatchPipelineOperator.py +++ b/dags/operators/DagBatchPipelineOperator.py @@ -1,14 +1,16 @@ from typing import Any, Protocol, List from uuid import uuid4 + +from airflow.exceptions import AirflowSkipException from airflow.models import BaseOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from pymongo import MongoClient from dags.dags_utils import pull_dag_upstream, push_dag_downstream, get_dag_param, smart_xcom_pull, \ smart_xcom_push -from ted_sws.core.service.batch_processing import chunks -from dags.pipelines.pipeline_protocols import NoticePipelineCallable +from dags.pipelines.pipeline_protocols import NoticePipelineCallable, NoticePipelineOutput from ted_sws import config +from ted_sws.core.service.batch_processing import chunks from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.event_manager.model.event_message import EventMessage, NoticeEventMessage from ted_sws.event_manager.services.log import log_notice_error @@ -26,7 +28,7 @@ class BatchPipelineCallable(Protocol): - def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[str]: + def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[NoticePipelineOutput]: """ :param notice_ids: :param mongodb_client: @@ -57,11 +59,14 @@ def execute(self, context: Any): """ logger = get_logger() notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY) - if not notice_ids: + if notice_ids is None: raise Exception(f"XCOM key [{NOTICE_IDS_KEY}] is not present in context!") + if len(notice_ids) == 0: + smart_xcom_push(key=NOTICE_IDS_KEY, value=[]) + raise AirflowSkipException("No notices to process!") mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) notice_repository = NoticeRepository(mongodb_client=mongodb_client) - processed_notice_ids = [] + processed_notices_pipeline_output: List[NoticePipelineOutput] = [] pipeline_name = DEFAULT_PIPELINE_NAME_FOR_LOGS if self.notice_pipeline_callable: pipeline_name = self.notice_pipeline_callable.__name__ @@ -76,7 +81,7 @@ def execute(self, context: Any): handle_event_message_metadata_dag_context(batch_event_message, context) batch_event_message.start_record() if self.batch_pipeline_callable is not None: - processed_notice_ids.extend( + processed_notices_pipeline_output.extend( self.batch_pipeline_callable(notice_ids=notice_ids, mongodb_client=mongodb_client)) elif self.notice_pipeline_callable is not None: for notice_id in notice_ids: @@ -89,7 +94,7 @@ def execute(self, context: Any): if result_notice_pipeline.store_result: notice_repository.update(notice=result_notice_pipeline.notice) if result_notice_pipeline.processed: - processed_notice_ids.append(notice_id) + processed_notices_pipeline_output.append(result_notice_pipeline) notice_event.end_record() if notice.normalised_metadata: notice_event.notice_form_number = notice.normalised_metadata.form_number @@ -102,12 +107,12 @@ def execute(self, context: Any): notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None, notice_status=notice.status if notice else None, notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None) + raise e batch_event_message.end_record() logger.info(event_message=batch_event_message) - if not processed_notice_ids: - raise Exception(f"No notice has been processed!") - smart_xcom_push(key=NOTICE_IDS_KEY, value=processed_notice_ids) + smart_xcom_push(key=NOTICE_IDS_KEY, value=[notice_pipeline_output.notice.ted_id for notice_pipeline_output in + processed_notices_pipeline_output]) class TriggerNoticeBatchPipelineOperator(BaseOperator): diff --git a/dags/pipelines/notice_batch_processor_pipelines.py b/dags/pipelines/notice_batch_processor_pipelines.py index e4c5f165b..12605f05f 100644 --- a/dags/pipelines/notice_batch_processor_pipelines.py +++ b/dags/pipelines/notice_batch_processor_pipelines.py @@ -1,13 +1,17 @@ from typing import List + from pymongo import MongoClient +from dags.pipelines.pipeline_protocols import NoticePipelineOutput from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_procedure_entities CET_URIS = ["http://www.w3.org/ns/org#Organization"] PROCEDURE_CET_URI = "http://data.europa.eu/a4g/ontology#Procedure" -def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]: +def notices_batch_distillation_pipeline(notice_ids: List[str], + mongodb_client: MongoClient + ) -> List[NoticePipelineOutput]: """ :param notice_ids: @@ -29,4 +33,4 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M deduplicate_procedure_entities(notices=notices, procedure_cet_uri=PROCEDURE_CET_URI, mongodb_client=mongodb_client) for notice in notices: notice_repository.update(notice=notice) - return notice_ids + return [NoticePipelineOutput(notice=notice) for notice in notices] diff --git a/dags/pipelines/notice_fetcher_pipelines.py b/dags/pipelines/notice_fetcher_pipelines.py index 880b35d8b..a0a1ff5d3 100644 --- a/dags/pipelines/notice_fetcher_pipelines.py +++ b/dags/pipelines/notice_fetcher_pipelines.py @@ -11,7 +11,6 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]: from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \ create_and_store_in_mongo_db_daily_supra_notice from ted_sws.event_manager.services.log import log_error - notice_ids = None try: date_wild_card = date_wild_card if date_wild_card else (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*") notice_publication_date = datetime.strptime(date_wild_card, "%Y%m%d*").date() @@ -24,6 +23,7 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]: ted_publication_date=notice_publication_date) except Exception as error: log_error(message=str(error)) + raise error return notice_ids @@ -35,7 +35,6 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]: from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher from ted_sws.event_manager.services.log import log_error - notice_ids = None try: ted_api_query = {"query": query} mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) @@ -44,4 +43,5 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]: request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query) except Exception as error: log_error(message=str(error)) + raise error return notice_ids diff --git a/ted_sws/mapping_suite_processor/services/__init__.py b/ted_sws/mapping_suite_processor/services/__init__.py index e69de29bb..86664e409 100644 --- a/ted_sws/mapping_suite_processor/services/__init__.py +++ b/ted_sws/mapping_suite_processor/services/__init__.py @@ -0,0 +1,2 @@ +class MappingSuiteProcessorServiceError(Exception): + pass diff --git a/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py b/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py index 7b17c3a75..0bd09c147 100644 --- a/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py +++ b/ted_sws/mapping_suite_processor/services/conceptual_mapping_processor.py @@ -12,6 +12,7 @@ from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.event_manager.services.log import log_mapping_suite_info, log_mapping_suite_error from ted_sws.mapping_suite_processor.adapters.github_package_downloader import GitHubMappingSuitePackageDownloader +from ted_sws.mapping_suite_processor.services import MappingSuiteProcessorServiceError from ted_sws.mapping_suite_processor.services.mapping_suite_digest_service import \ update_digest_api_address_for_mapping_suite from ted_sws.mapping_suite_processor.services.mapping_suite_validation_service import validate_mapping_suite, \ @@ -98,9 +99,11 @@ def mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(mong validation_result = validate_mapping_suite(mapping_suite_path=mapping_suite_package_path) mapping_suite_id = get_mapping_suite_id_from_file_system(mapping_suite_path=mapping_suite_package_path) if mapping_suite_id is None: + error_msg = "Invalid mapping suite metadata, can't read mapping suite identifier!" log_mapping_suite_error( - message="Invalid mapping suite metadata, can't read mapping suite identifier!", + message=error_msg, mapping_suite_id=MAPPING_SUITE_UNKNOWN_ID) + raise MappingSuiteProcessorServiceError(error_msg) elif validation_result: log_mapping_suite_info( message=f"Mapping suite with id={mapping_suite_id} is valid for loading in MongoDB!", @@ -115,8 +118,10 @@ def mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(mong message=f"Mapping suite with id={mapping_suite_id} loaded with success in MongoDB!", mapping_suite_id=mapping_suite_id) else: + error_msg = f"Mapping suite with id={mapping_suite_id} is invalid for loading in MongoDB!" log_mapping_suite_error( - message=f"Mapping suite with id={mapping_suite_id} is invalid for loading in MongoDB!", + message=error_msg, mapping_suite_id=mapping_suite_id) + raise MappingSuiteProcessorServiceError(error_msg) return result_notice_ids diff --git a/ted_sws/notice_fetcher/adapters/ted_api.py b/ted_sws/notice_fetcher/adapters/ted_api.py index 002b3057a..9161b2ffc 100644 --- a/ted_sws/notice_fetcher/adapters/ted_api.py +++ b/ted_sws/notice_fetcher/adapters/ted_api.py @@ -72,11 +72,10 @@ def __call__(self, api_url: str, api_query: dict) -> dict: headers = get_configured_custom_headers(CUSTOM_HEADER) response = execute_request_with_retries( request_lambda=lambda: requests.post(api_url, json=api_query, headers=headers)) - if response.ok: - response_content = json.loads(response.text) - return response_content - else: - raise Exception(f"The TED-API call failed with: {response}") + response.raise_for_status() + response_content = json.loads(response.text) + return response_content + class TedAPIAdapter(TedAPIAdapterABC): diff --git a/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py b/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py index d6d8c196c..15f293e51 100644 --- a/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py +++ b/tests/e2e/dags/pipelines/test_notice_processor_pipelines.py @@ -33,7 +33,7 @@ def test_notice_processor_pipelines(fake_mongodb_client): notice_repository.update(notice=notice) result_list = notices_batch_distillation_pipeline(notice_ids=[notice_id], mongodb_client=fake_mongodb_client) assert len(result_list) == 1 - notice_id = result_list[0] + notice_id = result_list[0].notice.ted_id notice = notice_repository.get(reference=notice_id) pipelines = [notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline] notice_states = [NoticeStatus.DISTILLED, NoticeStatus.VALIDATED, NoticeStatus.PACKAGED, NoticeStatus.PUBLISHED] diff --git a/tests/e2e/notice_fetcher/test_ted_api.py b/tests/e2e/notice_fetcher/test_ted_api.py index e962286c7..49f4bcb92 100644 --- a/tests/e2e/notice_fetcher/test_ted_api.py +++ b/tests/e2e/notice_fetcher/test_ted_api.py @@ -27,6 +27,7 @@ def test_ted_api(): def test_ted_api_error(): ted = TedAPIAdapter(request_api=TedRequestAPI()) + response_status = "400" with pytest.raises(Exception) as e: ted.get_by_query(query={"query": "NDE=67623-2022"}) - assert str(e.value) == "The TED-API call failed with: " + assert response_status in str(e.value) diff --git a/tests/e2e/notice_fetcher/test_ted_request_api.py b/tests/e2e/notice_fetcher/test_ted_request_api.py index 6bc29e3f8..62ffab512 100644 --- a/tests/e2e/notice_fetcher/test_ted_request_api.py +++ b/tests/e2e/notice_fetcher/test_ted_request_api.py @@ -15,6 +15,7 @@ def test_ted_request_api(): notice_by_query = ted_api_request(api_url=config.TED_API_URL, api_query=api_query) assert notice_by_query assert isinstance(notice_by_query, dict) + response_code = "400" with pytest.raises(Exception) as e: ted_api_request(api_url=config.TED_API_URL, api_query={"query": "INCORRECT PARAMS"}) - assert str(e.value) == "The TED-API call failed with: " + assert response_code in str(e.value) diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index 5d5857768..ed838696e 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -8,7 +8,7 @@ from airflow.utils.db import resetdb, initdb from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME -from dags.fetch_notices_by_date import FETCHER_DAG_NAME +from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME from tests import AIRFLOW_DAG_FOLDER