diff --git a/dags/daily_check_notices_availability_in_cellar.py b/dags/daily_check_notices_availability_in_cellar.py index 03a4c6eae..a430cd885 100644 --- a/dags/daily_check_notices_availability_in_cellar.py +++ b/dags/daily_check_notices_availability_in_cellar.py @@ -7,11 +7,14 @@ from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \ validate_notices_availability_in_cellar -DAG_NAME = "daily_check_notices_availability_in_cellar" +DAG_ID = "daily_check_notices_availability_in_cellar" +DAG_NAME = "Daily check notices availability in Cellar" @dag(default_args=DEFAULT_DAG_ARGUMENTS, catchup=False, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, schedule_interval="0 0 * * *", tags=['daily', 'validation']) def daily_check_notices_availability_in_cellar(): diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index ff10297de..176741aff 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -9,11 +9,13 @@ from ted_sws.data_manager.services.create_notice_collection_materialised_view import \ create_notice_collection_materialised_view, create_notice_kpi_collection -DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update" +DAG_ID = "daily_materialized_views_update" +DAG_NAME = "Materialized views update" @dag(default_args=DEFAULT_DAG_ARGUMENTS, - dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, catchup=False, timetable=CronTriggerTimetable( cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE, diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 85295d8bf..2a8857456 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -17,7 +17,8 @@ EventMessageProcessType from ted_sws.event_manager.services.log import log_error -FETCHER_DAG_NAME = "fetch_notices_by_date" +DAG_ID = "fetch_notices_by_date" +DAG_NAME = "Fetch notices by date" BATCH_SIZE = 2000 
WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -29,7 +30,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, - dag_id=FETCHER_DAG_NAME, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, catchup=False, timetable=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, @@ -58,7 +60,7 @@ def fetch_notices_by_date(): @event_log(TechnicalEventMessage( message="fetch_notice_from_ted", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=DAG_ID )) ) def fetch_by_date_notice_from_ted(): diff --git a/dags/fetch_notices_by_date_range.py b/dags/fetch_notices_by_date_range.py index c73eb8489..f6ad041e6 100644 --- a/dags/fetch_notices_by_date_range.py +++ b/dags/fetch_notices_by_date_range.py @@ -10,12 +10,13 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \ - FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME + DAG_ID as FETCH_NOTICES_BY_DATE_DAG_NAME from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "fetch_notices_by_date_range" +DAG_ID = "fetch_notices_by_date_range" +DAG_NAME = "Fetch notices by date range" START_DATE_KEY = "start_date" END_DATE_KEY = "end_date" @@ -34,7 +35,8 @@ def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> li until=datetime.strptime(end_date, '%Y-%m-%d'))] -@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'], +@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, dag_id=DAG_ID, dag_display_name=DAG_NAME, + tags=['master'], params={ START_DATE_KEY: Param( default=f"{date.today()}", @@ -66,7 +68,7 @@ def fetch_notices_by_date_range(): 
@event_log(TechnicalEventMessage( message="trigger_fetch_notices_workers_for_date_range", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=DAG_ID )) ) def trigger_notice_by_date_for_each_date_in_range(): diff --git a/dags/fetch_notices_by_query.py b/dags/fetch_notices_by_query.py index d5f796994..4c747d7a8 100644 --- a/dags/fetch_notices_by_query.py +++ b/dags/fetch_notices_by_query.py @@ -11,7 +11,8 @@ from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType -DAG_NAME = "fetch_notices_by_query" +DAG_ID = "fetch_notices_by_query" +DAG_NAME = "Fetch notices by query" BATCH_SIZE = 2000 QUERY_DAG_KEY = "query" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -23,6 +24,8 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, tags=['fetch'], params={ QUERY_DAG_KEY: Param( @@ -46,7 +49,7 @@ def fetch_notices_by_query(): @event_log(TechnicalEventMessage( message="fetch_by_query_notice_from_ted", metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME + process_type=EventMessageProcessType.DAG, process_name=DAG_ID )) ) def fetch_by_query_notice_from_ted(): diff --git a/dags/load_mapping_suite_in_database.py b/dags/load_mapping_suite_in_database.py index cd4efc005..3a3351999 100644 --- a/dags/load_mapping_suite_in_database.py +++ b/dags/load_mapping_suite_in_database.py @@ -27,10 +27,13 @@ FINISH_LOADING_MAPPING_SUITE_TASK_ID = "finish_loading_mapping_suite" TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline" CHECK_IF_LOAD_TEST_DATA_TASK_ID = "check_if_load_test_data" - +DAG_ID = "load_mapping_suite_in_database" +DAG_NAME = "Load mapping suite" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, 
tags=['fetch', 'mapping-suite', 'github'], params={ GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param( diff --git a/dags/load_notices_in_fuseki.py b/dags/load_notices_in_fuseki.py index 0209df869..2e9184ed7 100644 --- a/dags/load_notices_in_fuseki.py +++ b/dags/load_notices_in_fuseki.py @@ -13,10 +13,13 @@ FUSEKI_DATASET_NAME_DAG_PARAM_KEY = "fuseki_dataset_name" NOTICE_STATUS_DAG_PARAM_KEY = "notice_status" DEFAULT_FUSEKI_DATASET_NAME = "mdr_dataset" - +DAG_ID = "load_notices_in_fuseki" +DAG_NAME = "Load notices in Fuseki" @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, tags=['load', 'notices', 'fuseki']) def load_notices_in_fuseki(): @task diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index ceb66d0e2..8d9022c14 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -24,7 +24,8 @@ SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation" SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package" SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish" -DAG_NAME = "notice_processing_pipeline" +DAG_ID = "notice_processing_pipeline" +DAG_NAME = "Notice processing pipeline" BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, @@ -46,6 +47,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I @dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, + dag_display_name=DAG_NAME, + dag_id=DAG_ID, max_active_runs=256, max_active_tasks=256, tags=['worker', 'pipeline']) diff --git a/dags/reprocess_notices_from_backlog_by_id.py b/dags/reprocess_notices_from_backlog_by_id.py new file mode 100644 index 000000000..0c9d66c87 --- /dev/null +++ b/dags/reprocess_notices_from_backlog_by_id.py @@ -0,0 +1,56 @@ +from airflow.decorators import dag, task +from 
airflow.models import Param + +from dags import DEFAULT_DAG_ARGUMENTS +from dags.dags_utils import push_dag_downstream, get_dag_param +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator +from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID +from ted_sws.event_manager.adapters.event_log_decorator import event_log +from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType + +DAG_ID = "reprocess_notices_by_id_from_backlog" +DAG_NAME = "Reprocess notices from backlog by ID" + +NOTICE_IDS_DAG_PARAM = "notice_ids" +TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" + +@dag( + default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, + schedule_interval=None, + tags=["selector", "re-transform"], + params={ + NOTICE_IDS_DAG_PARAM: Param( + type="array", + title="Notice IDs", + description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]. 
" + ) + }, + description=DAG_NAME +) +def reprocess_notices_by_id_from_backlog(): + @task + @event_log(TechnicalEventMessage( + message="select_notices_for_reprocess_by_id", + metadata=EventMessageMetadata( + process_type=EventMessageProcessType.DAG, + process_name=DAG_ID + )) + ) + def select_notice_ids(): + notice_ids = get_dag_param(key=NOTICE_IDS_DAG_PARAM, raise_error=True) + + if not notice_ids: + raise Exception("No notice IDs provided.") + + push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) + + trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( + task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, + start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID + ) + + select_notice_ids() >> trigger_notice_process_workflow + +dag = reprocess_notices_by_id_from_backlog() \ No newline at end of file diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py new file mode 100644 index 000000000..c3cc8aabc --- /dev/null +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -0,0 +1,72 @@ +from airflow.decorators import dag, task +from airflow.models import Param + +from dags import DEFAULT_DAG_ARGUMENTS +from dags.dags_utils import push_dag_downstream, get_dag_param +from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID +from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator +from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status +from ted_sws.core.model.notice import NoticeStatus +from ted_sws.event_manager.adapters.event_log_decorator import event_log +from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType + +DAG_ID = "reprocess_notices_from_backlog_by_status" +DAG_NAME = "Reprocess notices from backlog by status" + +TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = 
"trigger_notice_process_workflow" +START_DATE_DAG_PARAM = "start_date" +END_DATE_DAG_PARAM = "end_date" +NOTICE_STATUSES_DAG_PARAM = "notice_statuses" + + +@dag( + default_args=DEFAULT_DAG_ARGUMENTS, + dag_id=DAG_ID, + dag_display_name=DAG_NAME, + schedule_interval=None, + tags=['selector', 're-transform'], + params={ + + NOTICE_STATUSES_DAG_PARAM: Param( + default=[], + type="array", + title="Notice Statuses", + description="Required. Select one or more notice statuses to reprocess.", + examples=[status.name for status in NoticeStatus] + ), + START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start publication date (YYYY-MM-DD)"), + END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End publication date (YYYY-MM-DD)") + } + ) +def reprocess_notices_from_backlog_by_status(): + @task + @event_log(TechnicalEventMessage( + message="select_notices_for_re_transform", + metadata=EventMessageMetadata( + process_type=EventMessageProcessType.DAG, + process_name=DAG_ID + )) + ) + def select_notices_for_re_transform(): + start_date = get_dag_param(key=START_DATE_DAG_PARAM, default_value="") + end_date = get_dag_param(key=END_DATE_DAG_PARAM,default_value="") + statuses_param = get_dag_param(key=NOTICE_STATUSES_DAG_PARAM) + + notice_statuses = [NoticeStatus[status_str] for status_str in statuses_param] + + notice_ids = notice_ids_selector_by_status( + notice_statuses=notice_statuses, + start_date=start_date, + end_date=end_date + ) + + push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) + + trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( + task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, + start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID + ) + + select_notices_for_re_transform() >> trigger_notice_process_workflow + +dag = reprocess_notices_from_backlog_by_status() \ No newline at end of file diff --git a/dags/reprocess_published_in_cellar_notices.py 
b/dags/reprocess_published_in_cellar_notices.py deleted file mode 100644 index 37a7e59d8..000000000 --- a/dags/reprocess_published_in_cellar_notices.py +++ /dev/null @@ -1,52 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_published_in_cellar_notices" - -RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-transform-publicly-available']) -def reprocess_published_in_cellar_notices(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_transform", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_transform(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES, - form_number=form_number, 
start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID - ) - select_notices_for_re_transform() >> trigger_notice_process_workflow - - -dag = reprocess_published_in_cellar_notices() diff --git a/dags/reprocess_unnormalised_notices_from_backlog.py b/dags/reprocess_unnormalised_notices_from_backlog.py deleted file mode 100644 index 51005e1ee..000000000 --- a/dags/reprocess_unnormalised_notices_from_backlog.py +++ /dev/null @@ -1,44 +0,0 @@ -from airflow.decorators import dag, task -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.operators.DagBatchPipelineOperator import TriggerNoticeBatchPipelineOperator, NOTICE_IDS_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unnormalised_notices_from_backlog" - -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 'raw-notices']) -def reprocess_unnormalised_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_all_raw_notices", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_all_raw_notices(): - start_date = 
get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=[NoticeStatus.RAW], start_date=start_date, - end_date=end_date) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID - ) - select_all_raw_notices() >> trigger_notice_process_workflow - - -dag = reprocess_unnormalised_notices_from_backlog() diff --git a/dags/reprocess_unpackaged_notices_from_backlog.py b/dags/reprocess_unpackaged_notices_from_backlog.py deleted file mode 100644 index 75d37f4c6..000000000 --- a/dags/reprocess_unpackaged_notices_from_backlog.py +++ /dev/null @@ -1,53 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unpackaged_notices_from_backlog" - -RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING, - NoticeStatus.ELIGIBLE_FOR_PACKAGING, - NoticeStatus.INELIGIBLE_FOR_PUBLISHING] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - 
tags=['selector', 're-package']) -def reprocess_unpackaged_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_package", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_package(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_PACKAGE_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_PACKAGE_PIPELINE_TASK_ID - ) - select_notices_for_re_package() >> trigger_notice_process_workflow - - -dag = reprocess_unpackaged_notices_from_backlog() diff --git a/dags/reprocess_unpublished_notices_from_backlog.py b/dags/reprocess_unpublished_notices_from_backlog.py deleted file mode 100644 index fa0001435..000000000 --- a/dags/reprocess_unpublished_notices_from_backlog.py +++ /dev/null @@ -1,54 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import 
TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unpublished_notices_from_backlog" - -RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING, - NoticeStatus.PACKAGED, NoticeStatus.PUBLICLY_UNAVAILABLE - ] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-publish']) -def reprocess_unpublished_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_publish", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_publish(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_PUBLISH_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_PUBLISH_PIPELINE_TASK_ID - ) - select_notices_for_re_publish() >> trigger_notice_process_workflow - - -etl_dag = reprocess_unpublished_notices_from_backlog() diff --git a/dags/reprocess_untransformed_notices_from_backlog.py b/dags/reprocess_untransformed_notices_from_backlog.py deleted file mode 100644 index b93033e61..000000000 --- a/dags/reprocess_untransformed_notices_from_backlog.py +++ /dev/null @@ -1,55 +0,0 @@ -from airflow.decorators 
import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_untransformed_notices_from_backlog" - -RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION, - NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION, - NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED - ] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-transform']) -def reprocess_untransformed_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_transform", - metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_transform(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - 
end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID - ) - select_notices_for_re_transform() >> trigger_notice_process_workflow - - -dag = reprocess_untransformed_notices_from_backlog() diff --git a/dags/reprocess_unvalidated_notices_from_backlog.py b/dags/reprocess_unvalidated_notices_from_backlog.py deleted file mode 100644 index 9f53f3f58..000000000 --- a/dags/reprocess_unvalidated_notices_from_backlog.py +++ /dev/null @@ -1,52 +0,0 @@ -from airflow.decorators import dag, task - -from dags import DEFAULT_DAG_ARGUMENTS -from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID -from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \ - EXECUTE_ONLY_ONE_STEP_KEY -from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status -from ted_sws.core.model.notice import NoticeStatus -from ted_sws.event_manager.adapters.event_log_decorator import event_log -from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ - EventMessageProcessType - -DAG_NAME = "reprocess_unvalidated_notices_from_backlog" - -RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED] -TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow" -FORM_NUMBER_DAG_PARAM = "form_number" -START_DATE_DAG_PARAM = "start_date" -END_DATE_DAG_PARAM = "end_date" -XSD_VERSION_DAG_PARAM = "xsd_version" - - -@dag(default_args=DEFAULT_DAG_ARGUMENTS, - schedule_interval=None, - tags=['selector', 're-validate']) -def reprocess_unvalidated_notices_from_backlog(): - @task - @event_log(TechnicalEventMessage( - message="select_notices_for_re_validate", - 
metadata=EventMessageMetadata( - process_type=EventMessageProcessType.DAG, process_name=DAG_NAME - )) - ) - def select_notices_for_re_validate(): - form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM) - start_date = get_dag_param(key=START_DATE_DAG_PARAM) - end_date = get_dag_param(key=END_DATE_DAG_PARAM) - xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM) - notice_ids = notice_ids_selector_by_status(notice_statuses=RE_VALIDATE_TARGET_NOTICE_STATES, - form_number=form_number, start_date=start_date, - end_date=end_date, xsd_version=xsd_version) - push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids) - - trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator( - task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID, - start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID - ) - select_notices_for_re_validate() >> trigger_notice_process_workflow - - -dag = reprocess_unvalidated_notices_from_backlog() diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index bbe3825ac..85f8bae95 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -1,8 +1,8 @@ import pytest from airflow.timetables.trigger import CronTriggerTimetable -from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME -from dags.fetch_notices_by_date import FETCHER_DAG_NAME +from dags.daily_materialized_views_update import DAG_ID as DAILY_MATERIALISED_VIEWS_DAG_NAME +from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME # @pytest.fixture