Skip to content

Commit cf65e6c

Browse files
Dragos0000duprijil
authored andcommitted
reprocess dags and label change for dags
1 parent 8c0d1e4 commit cf65e6c

17 files changed

Lines changed: 40 additions & 320 deletions

dags/daily_check_notices_availability_in_cellar.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
88
validate_notices_availability_in_cellar
99

10-
DAG_NAME = "daily_check_notices_availability_in_cellar"
10+
DAG_ID = "daily_check_notices_availability_in_cellar"
11+
DAG_NAME = "Daily check notices availability in Cellar"
1112

1213

1314
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1415
catchup=False,
16+
dag_display_name=DAG_NAME,
17+
dag_id=DAG_ID,
1518
schedule_interval="0 0 * * *",
1619
tags=['daily', 'validation'])
1720
def daily_check_notices_availability_in_cellar():

dags/daily_materialized_views_update.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010

1111
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
1212
DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1
13+
DAG_NAME = "Materialized views update"
1314

1415

1516
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1617
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
18+
dag_display_name=DAG_NAME,
1719
catchup=False,
1820
schedule=NOTICES_COLLECTION_DATASET,
1921
max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS,

dags/fetch_notices_by_date.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
EventMessageProcessType
1818
from ted_sws.event_manager.services.log import log_error
1919

20-
FETCHER_DAG_NAME = "fetch_notices_by_date"
20+
DAG_ID = "fetch_notices_by_date"
21+
DAG_NAME = "Fetch notices by date"
22+
BATCH_SIZE = 2000
2123
WILD_CARD_DAG_KEY = "wild_card"
2224
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
2325
TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"
@@ -28,7 +30,8 @@
2830

2931

3032
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
31-
dag_id=FETCHER_DAG_NAME,
33+
dag_id=DAG_ID,
34+
dag_display_name=DAG_NAME,
3235
catchup=False,
3336
schedule=CronTriggerTimetable(
3437
cron=config.SCHEDULE_DAG_FETCH,
@@ -57,7 +60,7 @@ def fetch_notices_by_date():
5760
@event_log(TechnicalEventMessage(
5861
message="fetch_notice_from_ted",
5962
metadata=EventMessageMetadata(
60-
process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME
63+
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
6164
))
6265
)
6366
def fetch_by_date_notice_from_ted():

dags/fetch_notices_by_date_range.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
from dags import DEFAULT_DAG_ARGUMENTS
1111
from dags.dags_utils import get_dag_param
1212
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
13-
FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
13+
DAG_ID as FETCH_NOTICES_BY_DATE_DAG_NAME
1414
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1515
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1616
EventMessageProcessType
1717

18-
DAG_NAME = "fetch_notices_by_date_range"
18+
DAG_ID = "fetch_notices_by_date_range"
19+
DAG_NAME = "Fetch notices by date range"
1920

2021
START_DATE_KEY = "start_date"
2122
END_DATE_KEY = "end_date"
@@ -34,7 +35,8 @@ def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> li
3435
until=datetime.strptime(end_date, '%Y-%m-%d'))]
3536

3637

37-
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
38+
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, dag_id=DAG_ID, dag_display_name=DAG_NAME,
39+
tags=['master'],
3840
params={
3941
START_DATE_KEY: Param(
4042
default=f"{date.today()}",
@@ -66,7 +68,7 @@ def fetch_notices_by_date_range():
6668
@event_log(TechnicalEventMessage(
6769
message="trigger_fetch_notices_workers_for_date_range",
6870
metadata=EventMessageMetadata(
69-
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
71+
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
7072
))
7173
)
7274
def trigger_notice_by_date_for_each_date_in_range():

dags/fetch_notices_by_query.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1212
EventMessageProcessType
1313

14-
DAG_NAME = "fetch_notices_by_query"
14+
DAG_ID = "fetch_notices_by_query"
15+
DAG_NAME = "Fetch notices by query"
1516
BATCH_SIZE = 2000
1617
QUERY_DAG_KEY = "query"
1718
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -23,6 +24,8 @@
2324

2425
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2526
schedule_interval=None,
27+
dag_display_name=DAG_NAME,
28+
dag_id=DAG_ID,
2629
tags=['fetch'],
2730
params={
2831
QUERY_DAG_KEY: Param(
@@ -46,7 +49,7 @@ def fetch_notices_by_query():
4649
@event_log(TechnicalEventMessage(
4750
message="fetch_by_query_notice_from_ted",
4851
metadata=EventMessageMetadata(
49-
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
52+
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
5053
))
5154
)
5255
def fetch_by_query_notice_from_ted():

dags/load_mapping_suite_in_database.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
FINISH_LOADING_MAPPING_SUITE_TASK_ID = "finish_loading_mapping_suite"
2828
TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline"
2929
CHECK_IF_LOAD_TEST_DATA_TASK_ID = "check_if_load_test_data"
30-
30+
DAG_ID = "load_mapping_suite_in_database"
31+
DAG_NAME = "Load mapping suite"
3132

3233
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3334
schedule_interval=None,
35+
dag_id=DAG_ID,
36+
dag_display_name=DAG_NAME,
3437
tags=['fetch', 'mapping-suite', 'github'],
3538
params={
3639
GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(

dags/load_notices_in_fuseki.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
FUSEKI_DATASET_NAME_DAG_PARAM_KEY = "fuseki_dataset_name"
1414
NOTICE_STATUS_DAG_PARAM_KEY = "notice_status"
1515
DEFAULT_FUSEKI_DATASET_NAME = "mdr_dataset"
16-
16+
DAG_ID = "load_notices_in_fuseki"
17+
DAG_NAME = "Load notices in Fuseki"
1718

1819
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1920
schedule_interval=None,
21+
dag_display_name=DAG_NAME,
22+
dag_id=DAG_ID,
2023
tags=['load', 'notices', 'fuseki'])
2124
def load_notices_in_fuseki():
2225
@task

dags/notice_processing_pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline
2020

2121
DAG_NAME = "notice_processing_pipeline"
22+
DAG_ID = "notice_processing_pipeline"
2223

2324

2425
def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str:
@@ -33,6 +34,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I
3334

3435
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3536
schedule_interval=None,
37+
dag_display_name=DAG_NAME,
38+
dag_id=DAG_ID,
3639
max_active_runs=256,
3740
max_active_tasks=256,
3841
tags=['worker', 'pipeline'])

dags/reprocess_notices_from_backlog_by_id.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType
1010

1111
DAG_ID = "reprocess_notices_by_id_from_backlog"
12-
DAG_NAME = "Reprocess Notices From Backlog By ID"
12+
DAG_NAME = "Reprocess notices from backlog by ID"
1313

1414
NOTICE_IDS_DAG_PARAM = "notice_ids"
1515
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
@@ -24,7 +24,7 @@
2424
NOTICE_IDS_DAG_PARAM: Param(
2525
type="array",
2626
title="Notice IDs",
27-
description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]"
27+
description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]. Every ID value should be entered on a newline"
2828
)
2929
},
3030
description=DAG_NAME

dags/reprocess_notices_from_backlog_by_status.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType
1212

1313
DAG_ID = "reprocess_notices_from_backlog_by_status"
14-
DAG_NAME = "Reprocess Notices From Backlog By Status"
14+
DAG_NAME = "Reprocess notices from backlog by status"
1515

1616
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1717
START_DATE_DAG_PARAM = "start_date"
@@ -21,6 +21,7 @@
2121
@dag(
2222
default_args=DEFAULT_DAG_ARGUMENTS,
2323
dag_id=DAG_ID,
24+
dag_display_name=DAG_NAME,
2425
schedule_interval=None,
2526
tags=['selector', 're-transform'],
2627
params={

0 commit comments

Comments
 (0)