Skip to content

Commit 5dca2bc

Browse files
committed
wip trigger forms for dags
1 parent 2fdccf9 commit 5dca2bc

4 files changed

Lines changed: 106 additions & 12 deletions

File tree

dags/fetch_notices_by_date.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from datetime import timedelta
1+
from datetime import timedelta, date
22

33
from airflow.decorators import dag, task
4-
from airflow.operators.dummy import DummyOperator
4+
from airflow.models import Param
5+
from airflow.operators.empty import EmptyOperator
56
from airflow.operators.python import BranchPythonOperator, PythonOperator
67
from airflow.timetables.trigger import CronTriggerTimetable
78
from airflow.utils.trigger_rule import TriggerRule
@@ -33,7 +34,24 @@
3334
timetable=CronTriggerTimetable(
3435
cron=config.SCHEDULE_DAG_FETCH,
3536
timezone=DAG_DEFAULT_TIMEZONE),
36-
tags=['selector', 'daily-fetch'])
37+
tags=['selector', 'daily-fetch'],
38+
params={
39+
WILD_CARD_DAG_KEY: Param(
40+
type="string",
41+
format="date",
42+
title="Date",
43+
description="""This field is required.
44+
Date to fetch notices from TED. Correct format is YYYYMMDD*"""
45+
),
46+
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
47+
default=True,
48+
type="boolean",
49+
title="Trigger Complete Workflow",
50+
description="""This field is required.
51+
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
52+
)
53+
}
54+
)
3755
def fetch_notices_by_date():
3856
@task
3957
@event_log(TechnicalEventMessage(
@@ -95,7 +113,7 @@ def _branch_selector():
95113
python_callable=validate_fetched_notices
96114
)
97115

98-
finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
116+
finish_step = EmptyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
99117
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
100118

101119
fetch_by_date_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,

dags/fetch_notices_by_date_range.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
from datetime import datetime
1+
from datetime import datetime, date
22
from typing import Any
33
from dateutil import rrule
44

55
from airflow.decorators import dag, task
66
from airflow.operators.python import get_current_context
77
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
8+
from airflow.models import Param
89

910
from dags import DEFAULT_DAG_ARGUMENTS
1011
from dags.dags_utils import get_dag_param
@@ -33,7 +34,33 @@ def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> l
3334
until=datetime.strptime(end_date, '%Y%m%d'))]
3435

3536

36-
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'])
37+
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['fetch'],
38+
params={
39+
START_DATE_KEY: Param(
40+
default=f"{date.today()}",
41+
type="string",
42+
format="date",
43+
title="Start Date",
44+
description="""This field is required.
45+
Start date of the date range to fetch notices from TED."""
46+
),
47+
END_DATE_KEY: Param(
48+
default=f"{date.today()}",
49+
type="string",
50+
format="date",
51+
title="End Date",
52+
description="""This field is required.
53+
End date of the date range to fetch notices from TED."""
54+
),
55+
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
56+
default=False,
57+
type="boolean",
58+
title="Trigger Complete Workflow",
59+
description="""This field is required.
60+
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
61+
)
62+
}
63+
)
3764
def fetch_notices_by_date_range():
3865
@task
3966
@event_log(TechnicalEventMessage(

dags/fetch_notices_by_query.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from airflow.decorators import dag, task
2-
from airflow.operators.dummy import DummyOperator
2+
from airflow.operators.empty import EmptyOperator
3+
from airflow.models import Param
34
from airflow.operators.python import BranchPythonOperator
45
from airflow.utils.trigger_rule import TriggerRule
56
from dags import DEFAULT_DAG_ARGUMENTS
@@ -22,7 +23,24 @@
2223

2324
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
2425
schedule_interval=None,
25-
tags=['fetch'])
26+
tags=['fetch'],
27+
params={
28+
QUERY_DAG_KEY: Param(
29+
default=None,
30+
type="string",
31+
title="Query",
32+
description="""This field is required.
33+
Query to fetch notices from TED."""
34+
),
35+
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
36+
default=True,
37+
type="boolean",
38+
title="Trigger Complete Workflow",
39+
description="""This field is required.
40+
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
41+
)
42+
}
43+
)
2644
def fetch_notices_by_query():
2745
@task
2846
@event_log(TechnicalEventMessage(
@@ -58,7 +76,7 @@ def _branch_selector():
5876
python_callable=_branch_selector,
5977
)
6078

61-
finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
79+
finish_step = EmptyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
6280
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
6381

6482
fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,

dags/load_mapping_suite_in_database.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from airflow.decorators import dag, task
2-
from airflow.operators.dummy import DummyOperator
2+
from airflow.models import Param
3+
from airflow.operators.empty import EmptyOperator
34
from airflow.operators.python import get_current_context, BranchPythonOperator
45
from airflow.utils.trigger_rule import TriggerRule
56
from pymongo import MongoClient
@@ -30,7 +31,37 @@
3031

3132
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3233
schedule_interval=None,
33-
tags=['fetch', 'mapping-suite', 'github'])
34+
tags=['fetch', 'mapping-suite', 'github'],
35+
params={
36+
GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(
37+
default=None,
38+
type=["null", "string"],
39+
title="Github repository url",
40+
description="""This is optional field.
41+
Github repository url to fetch mapping suite package from."""
42+
),
43+
BRANCH_OR_TAG_NAME_DAG_PARAM_KEY: Param(
44+
default=None,
45+
type=["null", "string"],
46+
title="Branch or tag name",
47+
description="""This is optional field.
48+
Branch or tag name to fetch mapping suite package from."""
49+
),
50+
MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY: Param(
51+
default=None,
52+
type=["null", "string"],
53+
title="Mapping suite package name",
54+
description="""This is optional field.
55+
Mapping suite package name to fetch from github repository."""
56+
),
57+
LOAD_TEST_DATA_DAG_PARAM_KEY: Param(
58+
default=False,
59+
type="boolean",
60+
title="Load test data",
61+
description="""This field is used to load test data."""
62+
)
63+
}
64+
)
3465
def load_mapping_suite_in_database():
3566
@task
3667
@event_log(is_loggable=False)
@@ -77,7 +108,7 @@ def _branch_selector():
77108
task_id=CHECK_IF_LOAD_TEST_DATA_TASK_ID,
78109
python_callable=_branch_selector,
79110
)
80-
finish_step = DummyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
111+
finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
81112
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
82113

83114
trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)

0 commit comments

Comments
 (0)