Skip to content

Commit 092a72b

Browse files
committed
Merge branch 'develop' into feature/SWS2-18
2 parents ffb8e5c + 366b5d4 commit 092a72b

16 files changed

Lines changed: 161 additions & 316 deletions

dags/daily_check_notices_availability_in_cellar.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
88
validate_notices_availability_in_cellar
99

10-
DAG_NAME = "daily_check_notices_availability_in_cellar"
10+
DAG_ID = "daily_check_notices_availability_in_cellar"
11+
DAG_NAME = "Daily check notices availability in Cellar"
1112

1213

1314
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1415
catchup=False,
16+
dag_display_name=DAG_NAME,
17+
dag_id=DAG_ID,
1518
schedule_interval="0 0 * * *",
1619
tags=['daily', 'validation'])
1720
def daily_check_notices_availability_in_cellar():

dags/daily_materialized_views_update.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010

1111
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
1212
DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1
13+
DAG_NAME = "Materialized views update"
1314

1415

1516
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1617
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
18+
dag_display_name=DAG_NAME,
1719
catchup=False,
1820
schedule=NOTICES_COLLECTION_DATASET,
1921
max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS,

dags/fetch_notices_by_date.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
EventMessageProcessType
1818
from ted_sws.event_manager.services.log import log_error
1919

20-
FETCHER_DAG_NAME = "fetch_notices_by_date"
20+
DAG_ID = "fetch_notices_by_date"
21+
DAG_NAME = "Fetch notices by date"
2122
BATCH_SIZE = 2000
2223
WILD_CARD_DAG_KEY = "wild_card"
2324
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -29,7 +30,8 @@
2930

3031

3132
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
32-
dag_id=FETCHER_DAG_NAME,
33+
dag_id=DAG_ID,
34+
dag_display_name=DAG_NAME,
3335
catchup=False,
3436
schedule=CronTriggerTimetable(
3537
cron=config.SCHEDULE_DAG_FETCH,
@@ -58,7 +60,7 @@ def fetch_notices_by_date():
5860
@event_log(TechnicalEventMessage(
5961
message="fetch_notice_from_ted",
6062
metadata=EventMessageMetadata(
61-
process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME
63+
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
6264
))
6365
)
6466
def fetch_by_date_notice_from_ted():

dags/fetch_notices_by_date_range.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
from dags import DEFAULT_DAG_ARGUMENTS
1111
from dags.dags_utils import get_dag_param
1212
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
13-
FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
13+
DAG_ID as FETCH_NOTICES_BY_DATE_DAG_NAME
1414
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1515
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1616
EventMessageProcessType
1717

18-
DAG_NAME = "fetch_notices_by_date_range"
18+
DAG_ID = "fetch_notices_by_date_range"
19+
DAG_NAME = "Fetch notices by date range"
1920

2021
START_DATE_KEY = "start_date"
2122
END_DATE_KEY = "end_date"
@@ -34,7 +35,8 @@ def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> li
3435
until=datetime.strptime(end_date, '%Y-%m-%d'))]
3536

3637

37-
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
38+
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, dag_id=DAG_ID, dag_display_name=DAG_NAME,
39+
tags=['master'],
3840
params={
3941
START_DATE_KEY: Param(
4042
default=f"{date.today()}",
@@ -66,7 +68,7 @@ def fetch_notices_by_date_range():
6668
@event_log(TechnicalEventMessage(
6769
message="trigger_fetch_notices_workers_for_date_range",
6870
metadata=EventMessageMetadata(
69-
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
71+
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
7072
))
7173
)
7274
def trigger_notice_by_date_for_each_date_in_range():

dags/fetch_notices_by_query.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1212
EventMessageProcessType
1313

14-
DAG_NAME = "fetch_notices_by_query"
14+
DAG_ID = "fetch_notices_by_query"
15+
DAG_NAME = "Fetch notices by query"
1516
BATCH_SIZE = 2000
1617
QUERY_DAG_KEY = "query"
1718
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -23,6 +24,8 @@
2324

2425
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2526
schedule_interval=None,
27+
dag_display_name=DAG_NAME,
28+
dag_id=DAG_ID,
2629
tags=['fetch'],
2730
params={
2831
QUERY_DAG_KEY: Param(
@@ -46,7 +49,7 @@ def fetch_notices_by_query():
4649
@event_log(TechnicalEventMessage(
4750
message="fetch_by_query_notice_from_ted",
4851
metadata=EventMessageMetadata(
49-
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
52+
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
5053
))
5154
)
5255
def fetch_by_query_notice_from_ted():

dags/load_mapping_suite_in_database.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
FINISH_LOADING_MAPPING_SUITE_TASK_ID = "finish_loading_mapping_suite"
2828
TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline"
2929
CHECK_IF_LOAD_TEST_DATA_TASK_ID = "check_if_load_test_data"
30-
30+
DAG_ID = "load_mapping_suite_in_database"
31+
DAG_NAME = "Load mapping suite"
3132

3233
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3334
schedule_interval=None,
35+
dag_id=DAG_ID,
36+
dag_display_name=DAG_NAME,
3437
tags=['fetch', 'mapping-suite', 'github'],
3538
params={
3639
GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(

dags/load_notices_in_fuseki.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@
1313
FUSEKI_DATASET_NAME_DAG_PARAM_KEY = "fuseki_dataset_name"
1414
NOTICE_STATUS_DAG_PARAM_KEY = "notice_status"
1515
DEFAULT_FUSEKI_DATASET_NAME = "mdr_dataset"
16-
16+
DAG_ID = "load_notices_in_fuseki"
17+
DAG_NAME = "Load notices in Fuseki"
1718

1819
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1920
schedule_interval=None,
21+
dag_display_name=DAG_NAME,
22+
dag_id=DAG_ID,
2023
tags=['load', 'notices', 'fuseki'])
2124
def load_notices_in_fuseki():
2225
@task

dags/notice_processing_pipeline.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline
1919

2020
DAG_NAME = "notice_processing_pipeline"
21-
21+
DAG_ID = "notice_processing_pipeline"
2222

2323
def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str:
2424
start_with_step_name = get_dag_param(key=START_WITH_STEP_NAME_KEY,
@@ -32,6 +32,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I
3232

3333
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3434
schedule_interval=None,
35+
dag_display_name=DAG_NAME,
36+
dag_id=DAG_ID,
3537
max_active_runs=256,
3638
max_active_tasks=256,
3739
tags=['worker', 'pipeline'])
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
"""DAG: reprocess a user-supplied list of notices from the backlog.

Takes a list of TED notice IDs as a DAG param, pushes them downstream and
triggers the notice processing pipeline starting from the normalisation step.
"""
from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType

DAG_ID = "reprocess_notices_by_id_from_backlog"
DAG_NAME = "Reprocess notices from backlog by ID"

NOTICE_IDS_DAG_PARAM = "notice_ids"
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"


@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    dag_id=DAG_ID,
    dag_display_name=DAG_NAME,
    schedule_interval=None,  # manually triggered only
    tags=["selector", "re-transform"],
    params={
        NOTICE_IDS_DAG_PARAM: Param(
            type="array",
            title="Notice IDs",
            # NOTE: original description repeated the "one per line" instruction twice;
            # de-duplicated here.
            description="Required. List of TED Notice IDs to reprocess, one per line. "
                        "Example: [\"123456-2022\", \"456789-2023\"]"
        )
    },
    description=DAG_NAME
)
def reprocess_notices_by_id_from_backlog():
    @task
    @event_log(TechnicalEventMessage(
        message="select_notices_for_reprocess_by_id",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG,
            process_name=DAG_ID
        ))
    )
    def select_notice_ids():
        """Read the notice-IDs DAG param, validate it and push it downstream."""
        notice_ids = get_dag_param(key=NOTICE_IDS_DAG_PARAM, raise_error=True)

        # Guard against an empty list: raise_error only covers a missing param.
        if not notice_ids:
            raise Exception("No notice IDs provided.")

        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    # Restart the processing pipeline from the normalisation step for the
    # selected notices.
    trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID
    )

    select_notice_ids() >> trigger_notice_process_workflow


dag = reprocess_notices_by_id_from_backlog()
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
"""DAG: reprocess notices from the backlog selected by status (and optional date range).

Selects notice IDs matching the chosen statuses / publication-date window and
triggers the notice processing pipeline starting from the normalisation step.
"""
from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType

DAG_ID = "reprocess_notices_from_backlog_by_status"
DAG_NAME = "Reprocess notices from backlog by status"

TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
NOTICE_STATUSES_DAG_PARAM = "notice_statuses"


@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    dag_id=DAG_ID,
    dag_display_name=DAG_NAME,
    schedule_interval=None,  # manually triggered only
    tags=['selector', 're-transform'],
    params={
        NOTICE_STATUSES_DAG_PARAM: Param(
            default=[],
            type="array",
            title="Notice Statuses",
            description="Required. Select one or more notice statuses to reprocess.",
            examples=[status.name for status in NoticeStatus]
        ),
        START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start publication date (YYYY-MM-DD)"),
        END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End publication date (YYYY-MM-DD)")
    }
)
def reprocess_notices_from_backlog_by_status():
    @task
    @event_log(TechnicalEventMessage(
        message="select_notices_for_re_transform",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG,
            process_name=DAG_ID
        ))
    )
    def select_notices_for_re_transform():
        """Resolve DAG params into a notice-ID selection and push it downstream."""
        start_date = get_dag_param(key=START_DATE_DAG_PARAM, default_value="")
        end_date = get_dag_param(key=END_DATE_DAG_PARAM, default_value="")
        statuses_param = get_dag_param(key=NOTICE_STATUSES_DAG_PARAM)

        # The param is documented as required but defaults to []; fail fast on
        # an empty (or missing) selection instead of silently selecting nothing.
        if not statuses_param:
            raise Exception("No notice statuses provided.")

        # May raise KeyError for a status name not in NoticeStatus.
        notice_statuses = [NoticeStatus[status_str] for status_str in statuses_param]

        notice_ids = notice_ids_selector_by_status(
            notice_statuses=notice_statuses,
            start_date=start_date,
            end_date=end_date
        )

        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    # Restart the processing pipeline from the normalisation step for the
    # selected notices.
    trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID
    )

    select_notices_for_re_transform() >> trigger_notice_process_workflow


dag = reprocess_notices_from_backlog_by_status()

0 commit comments

Comments
 (0)