Skip to content

Commit e01b55c

Browse files
rename DAGs
1 parent e52c3d6 commit e01b55c

11 files changed

Lines changed: 35 additions & 35 deletions
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
99
create_notice_collection_materialised_view, create_notice_kpi_collection
1010

11-
DAG_NAME = "daily_materialized_view_update"
11+
DAG_NAME = "daily_materialized_views_update"
1212

1313

1414
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1515
catchup=False,
1616
schedule_interval="0 6 * * *",
1717
tags=['mongodb', 'daily-views-update'])
18-
def daily_materialized_view_update():
18+
def daily_materialized_views_update():
1919
@task
2020
def create_materialised_view():
2121
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
@@ -34,4 +34,4 @@ def aggregate_batch_logs():
3434
create_materialised_view() >> create_kpi_collection_for_notices() >> aggregate_batch_logs()
3535

3636

37-
dag = daily_materialized_view_update()
37+
dag = daily_materialized_views_update()
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1515
EventMessageProcessType
1616

17-
DAG_NAME = "notice_fetch_by_date_workflow"
17+
DAG_NAME = "fetch_notices_by_date"
1818
BATCH_SIZE = 2000
1919
WILD_CARD_DAG_KEY = "wild_card"
2020
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -29,7 +29,7 @@
2929
catchup=False,
3030
timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'),
3131
tags=['selector', 'daily-fetch'])
32-
def notice_fetch_by_date_workflow():
32+
def fetch_notices_by_date():
3333
@task
3434
@event_log(TechnicalEventMessage(
3535
message="fetch_notice_from_ted",
@@ -93,4 +93,4 @@ def _branch_selector():
9393
trigger_complete_workflow] >> validate_fetched_notices_step >> finish_step
9494

9595

96-
dag = notice_fetch_by_date_workflow()
96+
dag = fetch_notices_by_date()

dags/notice_fetch_for_date_range_orchestrator.py renamed to dags/fetch_notices_by_date_range.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88

99
from dags import DEFAULT_DAG_ARGUMENTS
1010
from dags.dags_utils import get_dag_param
11-
from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY
11+
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY
1212
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1313
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1414
EventMessageProcessType
1515

16-
DAG_NAME = "notice_fetch_for_date_range_orchestrator"
16+
DAG_NAME = "fetch_notices_by_date_range"
1717

1818
START_DATE_KEY = "start_date"
1919
END_DATE_KEY = "end_date"
@@ -33,7 +33,7 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l
3333

3434

3535
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
36-
def notice_fetch_for_date_range_orchestrator():
36+
def fetch_notices_by_date_range():
3737
@task
3838
@event_log(TechnicalEventMessage(
3939
message="trigger_fetch_notices_workers_for_date_range",
@@ -59,4 +59,4 @@ def trigger_notice_by_date_for_each_date_in_range():
5959
trigger_notice_by_date_for_each_date_in_range()
6060

6161

62-
dag = notice_fetch_for_date_range_orchestrator()
62+
dag = fetch_notices_by_date_range()
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3232
schedule_interval=None,
3333
tags=['fetch', 'mapping-suite', 'github'])
34-
def load_mapping_suite_in_mongodb():
34+
def load_mapping_suite_in_database():
3535
@task
3636
@event_log(is_loggable=False)
3737
def fetch_mapping_suite_package_from_github_into_mongodb(**context_args):
@@ -86,4 +86,4 @@ def _branch_selector():
8686
branch_task >> [trigger_document_proc_pipeline, finish_step]
8787

8888

89-
dag = load_mapping_suite_in_mongodb()
89+
dag = load_mapping_suite_in_database()
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
2525
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
2626
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
27-
DAG_NAME = "notice_process_workflow"
27+
DAG_NAME = "notice_processing_pipeline"
2828

2929
BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
3030
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
@@ -49,7 +49,7 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I
4949
max_active_runs=256,
5050
max_active_tasks=256,
5151
tags=['worker', 'pipeline'])
52-
def notice_process_workflow():
52+
def notice_processing_pipeline():
5353
"""
5454
5555
"""
@@ -149,4 +149,4 @@ def _stop_processing():
149149
notice_package_step >> selector_branch_before_publish >> notice_publish_step
150150

151151

152-
dag = notice_process_workflow()
152+
dag = notice_processing_pipeline()

dags/selector_raw_notices_process_orchestrator.py renamed to dags/reprocess_unnormalised_notices_from_backlog.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
99
EventMessageProcessType
1010

11-
DAG_NAME = "selector_raw_notices_process_orchestrator"
11+
DAG_NAME = "reprocess_unnormalised_notices_from_backlog"
1212

1313
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1414
FORM_NUMBER_DAG_PARAM = "form_number"
@@ -20,7 +20,7 @@
2020
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2121
schedule_interval=None,
2222
tags=['selector', 'raw-notices'])
23-
def selector_raw_notices_process_orchestrator():
23+
def reprocess_unnormalised_notices_from_backlog():
2424
@task
2525
@event_log(TechnicalEventMessage(
2626
message="select_all_raw_notices",
@@ -41,4 +41,4 @@ def select_all_raw_notices():
4141
select_all_raw_notices() >> trigger_notice_process_workflow
4242

4343

44-
dag = selector_raw_notices_process_orchestrator()
44+
dag = reprocess_unnormalised_notices_from_backlog()

dags/selector_repackage_process_orchestrator.py renamed to dags/reprocess_unpackaged_notices_from_backlog.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22

33
from dags import DEFAULT_DAG_ARGUMENTS
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_process_workflow import NOTICE_PACKAGE_PIPELINE_TASK_ID
5+
from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID
66
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
77
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
88
from ted_sws.core.model.notice import NoticeStatus
99
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1010
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1111
EventMessageProcessType
1212

13-
DAG_NAME = "selector_re_package_process_orchestrator"
13+
DAG_NAME = "reprocess_unpackaged_notices_from_backlog"
1414

1515
RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
1616
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
@@ -25,7 +25,7 @@
2525
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2626
schedule_interval=None,
2727
tags=['selector', 're-package'])
28-
def selector_re_package_process_orchestrator():
28+
def reprocess_unpackaged_notices_from_backlog():
2929
@task
3030
@event_log(TechnicalEventMessage(
3131
message="select_notices_for_re_package",
@@ -50,4 +50,4 @@ def select_notices_for_re_package():
5050
select_notices_for_re_package() >> trigger_notice_process_workflow
5151

5252

53-
dag = selector_re_package_process_orchestrator()
53+
dag = reprocess_unpackaged_notices_from_backlog()

dags/selector_republish_process_orchestrator.py renamed to dags/reprocess_unpublished_notices_from_backlog.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dags import DEFAULT_DAG_ARGUMENTS
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_process_workflow import NOTICE_PUBLISH_PIPELINE_TASK_ID
5+
from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID
66
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
77
EXECUTE_ONLY_ONE_STEP_KEY
88
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
@@ -11,7 +11,7 @@
1111
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1212
EventMessageProcessType
1313

14-
DAG_NAME = "selector_re_publish_process_orchestrator"
14+
DAG_NAME = "reprocess_unpublished_notices_from_backlog"
1515

1616
RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
1717
NoticeStatus.PACKAGED
@@ -26,7 +26,7 @@
2626
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2727
schedule_interval=None,
2828
tags=['selector', 're-publish'])
29-
def selector_re_publish_process_orchestrator():
29+
def reprocess_unpublished_notices_from_backlog():
3030
@task
3131
@event_log(TechnicalEventMessage(
3232
message="select_notices_for_re_publish",
@@ -51,4 +51,4 @@ def select_notices_for_re_publish():
5151
select_notices_for_re_publish() >> trigger_notice_process_workflow
5252

5353

54-
etl_dag = selector_re_publish_process_orchestrator()
54+
etl_dag = reprocess_unpublished_notices_from_backlog()

dags/selector_retransform_process_orchestrator.py renamed to dags/reprocess_untransformed_notices_from_backlog.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dags import DEFAULT_DAG_ARGUMENTS
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_process_workflow import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
5+
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
66
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
77
EXECUTE_ONLY_ONE_STEP_KEY
88
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
@@ -11,7 +11,7 @@
1111
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1212
EventMessageProcessType
1313

14-
DAG_NAME = "selector_re_transform_process_orchestrator"
14+
DAG_NAME = "reprocess_untransformed_notices_from_backlog"
1515

1616
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
1717
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
@@ -27,7 +27,7 @@
2727
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2828
schedule_interval=None,
2929
tags=['selector', 're-transform'])
30-
def selector_re_transform_process_orchestrator():
30+
def reprocess_untransformed_notices_from_backlog():
3131
@task
3232
@event_log(TechnicalEventMessage(
3333
message="select_notices_for_re_transform",
@@ -52,4 +52,4 @@ def select_notices_for_re_transform():
5252
select_notices_for_re_transform() >> trigger_notice_process_workflow
5353

5454

55-
dag = selector_re_transform_process_orchestrator()
55+
dag = reprocess_untransformed_notices_from_backlog()

dags/selector_revalidate_process_orchestrator.py renamed to dags/reprocess_unvalidated_notices_from_backlog.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dags import DEFAULT_DAG_ARGUMENTS
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_process_workflow import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
5+
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
66
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
77
EXECUTE_ONLY_ONE_STEP_KEY
88
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
@@ -11,7 +11,7 @@
1111
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1212
EventMessageProcessType
1313

14-
DAG_NAME = "selector_re_validate_process_orchestrator"
14+
DAG_NAME = "reprocess_unvalidated_notices_from_backlog"
1515

1616
RE_VALIDATE_TARGET_NOTICE_STATES = [NoticeStatus.DISTILLED]
1717
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
@@ -24,7 +24,7 @@
2424
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2525
schedule_interval=None,
2626
tags=['selector', 're-validate'])
27-
def selector_re_validate_process_orchestrator():
27+
def reprocess_unvalidated_notices_from_backlog():
2828
@task
2929
@event_log(TechnicalEventMessage(
3030
message="select_notices_for_re_validate",
@@ -49,4 +49,4 @@ def select_notices_for_re_validate():
4949
select_notices_for_re_validate() >> trigger_notice_process_workflow
5050

5151

52-
dag = selector_re_validate_process_orchestrator()
52+
dag = reprocess_unvalidated_notices_from_backlog()

0 commit comments

Comments
 (0)