Skip to content

Commit 8c0d1e4

Browse files
Dragos0000 authored and duprijil committed
reprocess by id
1 parent 1db884f commit 8c0d1e4

2 files changed

Lines changed: 59 additions & 3 deletions

File tree

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,56 @@
1+
from airflow.decorators import dag, task
2+
from airflow.models import Param
3+
4+
from dags import DEFAULT_DAG_ARGUMENTS
5+
from dags.dags_utils import push_dag_downstream, get_dag_param
6+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
7+
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
8+
from ted_sws.event_manager.adapters.event_log_decorator import event_log
9+
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType
10+
11+
# Identifier of this DAG; passed as `dag_id` and reused as the event-log process name.
DAG_ID = "reprocess_notices_by_id_from_backlog"
12+
# Human-readable label; used both as the DAG display name and as its description.
DAG_NAME = "Reprocess Notices From Backlog By ID"
13+
14+
# Key of the DAG run parameter that carries the notice IDs to reprocess.
NOTICE_IDS_DAG_PARAM = "notice_ids"
15+
# Task id assigned to the operator that triggers the notice processing workflow.
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
16+
17+
@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    dag_id=DAG_ID,
    dag_display_name=DAG_NAME,
    schedule_interval=None,
    tags=["selector", "re-transform"],
    params={
        NOTICE_IDS_DAG_PARAM: Param(
            type="array",
            title="Notice IDs",
            description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]"
        )
    },
    description=DAG_NAME
)
def reprocess_notices_by_id_from_backlog():
    """Build the manually triggered DAG that reprocesses explicitly listed notices.

    The DAG has no schedule. It consists of two tasks executed in order:

    1. ``select_notice_ids`` reads the ``notice_ids`` run parameter, validates
       it and pushes the resulting list downstream under ``NOTICE_IDS_KEY``.
    2. ``trigger_notice_process_workflow`` is a
       ``TriggerNoticeBatchPipelineOperator`` configured to start with the
       ``NOTICE_TRANSFORMATION_PIPELINE_TASK_ID`` step.
       NOTE(review): presumably the operator consumes the IDs pushed by the
       first task -- confirm against the operator implementation.
    """

    @task
    @event_log(TechnicalEventMessage(
        message="select_notices_for_reprocess_by_id",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG,
            process_name=DAG_ID
        ))
    )
    def select_notice_ids():
        """Validate the requested notice IDs and hand them to the next task.

        Raises:
            ValueError: if the parameter contains no usable notice ID, i.e. it
                is empty or holds only blank / whitespace-only entries.
        """
        raw_notice_ids = get_dag_param(key=NOTICE_IDS_DAG_PARAM, raise_error=True)

        if isinstance(raw_notice_ids, (list, tuple)):
            # The IDs are typed one per line in the trigger form, so stray
            # whitespace and blank lines are easy to introduce. Without this
            # clean-up a value such as [""] is truthy and would slip past the
            # emptiness check below.
            stripped_ids = [
                notice_id.strip() if isinstance(notice_id, str) else notice_id
                for notice_id in raw_notice_ids
            ]
            notice_ids = [notice_id for notice_id in stripped_ids if notice_id]
        else:
            # Unexpected shape: leave the value untouched, as before.
            notice_ids = raw_notice_ids

        if not notice_ids:
            # ValueError is still an Exception subclass, so existing broad
            # handlers keep working while the failure cause is more specific.
            raise ValueError("No notice IDs provided.")

        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
    )

    select_notice_ids() >> trigger_notice_process_workflow


# Instantiate the DAG at module level; presumably required for Airflow's DAG
# discovery -- confirm against the project's DAG loading setup.
dag = reprocess_notices_by_id_from_backlog()

dags/reprocess_notices_from_backlog_by_status.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,10 @@
2828
NOTICE_STATUSES_DAG_PARAM: Param(
2929
type="array",
3030
title="Notice Statuses",
31-
description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]"
31+
description="Required. List of notice statuses to reprocess. Example: [\"NORMALISED_METADATA\", \"DISTILLED\"]. Every status value should be entered on a newline"
3232
),
33-
START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start date (YYYY-MM-DD)"),
34-
END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End date (YYYY-MM-DD)")
33+
START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="Start publication date (YYYY-MM-DD)"),
34+
END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date", description="End publication date (YYYY-MM-DD)")
3535
}
3636
)
3737
def reprocess_notices_from_backlog_by_status():

0 commit comments

Comments
 (0)