Skip to content

Commit a102ac1

Browse files
committed
WIP reprocess dags
1 parent f012140 commit a102ac1

1 file changed

Lines changed: 66 additions & 0 deletions

File tree

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from airflow.decorators import dag, task
2+
from airflow.models import Param
3+
4+
from dags import DEFAULT_DAG_ARGUMENTS
5+
from dags.dags_utils import push_dag_downstream, get_dag_param
6+
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
7+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
8+
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
9+
from ted_sws.core.model.notice import NoticeStatus
10+
from ted_sws.event_manager.adapters.event_log_decorator import event_log
11+
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType
12+
13+
# Identifier of this DAG: passed as dag_id to @dag and as process_name in the
# event-log metadata of the selector task.
DAG_ID = "reprocess_notices_from_backlog_by_status"
14+
# Human-readable title of the DAG; not referenced anywhere else in the visible code.
DAG_NAME = "Reprocess Notices From Backlog By Status"
15+
16+
# task_id given to the TriggerNoticeBatchPipelineOperator instance.
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
17+
# Keys of the DAG params: declared in the @dag params mapping and read back
# through get_dag_param inside the selector task.
START_DATE_DAG_PARAM = "start_date"
18+
END_DATE_DAG_PARAM = "end_date"
19+
NOTICE_STATUSES_DAG_PARAM = "notice_statuses"
20+
21+
@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    dag_id=DAG_ID,
    schedule_interval=None,
    tags=['selector', 're-transform'],
    params={
        START_DATE_DAG_PARAM: Param(default="", type="string", description="Start date (YYYY-MM-DD)"),
        END_DATE_DAG_PARAM: Param(default="", type="string", description="End date (YYYY-MM-DD)"),
        NOTICE_STATUSES_DAG_PARAM: Param(
            type="array",
            title="Notice Statuses",
            description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]"
        )}
)
def reprocess_notices_from_backlog_by_status():
    """Select notices by status (and date range) and re-run them through the batch pipeline.

    The DAG has two steps: a selector task that resolves the requested status
    names to ``NoticeStatus`` members, selects the matching notice ids and
    pushes them downstream under ``NOTICE_IDS_KEY``; and a
    ``TriggerNoticeBatchPipelineOperator`` configured to start at
    ``NOTICE_TRANSFORMATION_PIPELINE_TASK_ID``.
    """

    @task
    @event_log(TechnicalEventMessage(
        message="select_notices_for_re_transform",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG,
            process_name=DAG_ID
        ))
    )
    def select_notices_for_re_transform():
        """Read the DAG params, select the matching notice ids and push them downstream.

        Raises:
            ValueError: if the notice statuses param is missing, is a bare
                string instead of a list, or contains names that are not
                members of ``NoticeStatus``.
        """
        start_date = get_dag_param(key=START_DATE_DAG_PARAM)
        end_date = get_dag_param(key=END_DATE_DAG_PARAM)
        statuses_param = get_dag_param(key=NOTICE_STATUSES_DAG_PARAM)

        # The param has no default, so guard against it being absent; a bare
        # string would otherwise be iterated character by character.
        if statuses_param is None or isinstance(statuses_param, str):
            raise ValueError(
                f"DAG param '{NOTICE_STATUSES_DAG_PARAM}' must be a list of notice status names, "
                f"got: {statuses_param!r}"
            )

        # Resolve every name before failing, so the error reports all bad
        # values at once instead of an opaque KeyError on the first one.
        notice_statuses = []
        unknown_statuses = []
        for status_name in statuses_param:
            try:
                notice_statuses.append(NoticeStatus[status_name])
            except (KeyError, TypeError):
                unknown_statuses.append(status_name)
        if unknown_statuses:
            raise ValueError(
                f"Unknown notice statuses in DAG param '{NOTICE_STATUSES_DAG_PARAM}': {unknown_statuses!r}"
            )

        # Dates are forwarded exactly as received (empty string when unset).
        notice_ids = notice_ids_selector_by_status(
            notice_statuses=notice_statuses,
            start_date=start_date,
            end_date=end_date
        )

        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
    )

    select_notices_for_re_transform() >> trigger_notice_process_workflow
65+
66+
# Call the @dag-decorated factory at import time and keep the result in a
# module-level name. NOTE(review): presumably this is what makes the DAG
# visible to Airflow's DAG discovery — confirm against the deployment.
dag = reprocess_notices_from_backlog_by_status()

0 commit comments

Comments
 (0)