1- from datetime import datetime
1+ from datetime import datetime , date
22from typing import Any
33from dateutil import rrule
44
55from airflow .decorators import dag , task
66from airflow .operators .python import get_current_context
77from airflow .operators .trigger_dagrun import TriggerDagRunOperator
8+ from airflow .models import Param
89
910from dags import DEFAULT_DAG_ARGUMENTS
1011from dags .dags_utils import get_dag_param
2021END_DATE_KEY = "end_date"
2122
2223
23- def generate_wildcards_foreach_day_in_range (start_date : str , end_date : str ) -> list :
24+ def generate_list_of_dates_from_date_range (start_date : str , end_date : str ) -> list :
2425 """
2526 Given a date range returns all daily dates in that range
2627 :param start_date:
2728 :param end_date:
2829 :return:
2930 """
30- return [dt . strftime ( '%Y%m%d*' )
31+ return [dt
3132 for dt in rrule .rrule (rrule .DAILY ,
32- dtstart = datetime .strptime (start_date , '%Y%m %d' ),
33- until = datetime .strptime (end_date , '%Y%m %d' ))]
33+ dtstart = datetime .strptime (start_date , '%Y-%m- %d' ),
34+ until = datetime .strptime (end_date , '%Y-%m- %d' ))]
3435
3536
36- @dag (default_args = DEFAULT_DAG_ARGUMENTS , schedule_interval = None , tags = ['master' ])
37+ @dag (default_args = DEFAULT_DAG_ARGUMENTS , schedule_interval = None , tags = ['master' ],
38+ params = {
39+ START_DATE_KEY : Param (
40+ default = f"{ date .today ()} " ,
41+ type = "string" ,
42+ format = "date" ,
43+ title = "Start Date" ,
44+ description = """This field is required.
45+ Start date of the date range to fetch notices from TED."""
46+ ),
47+ END_DATE_KEY : Param (
48+ default = f"{ date .today ()} " ,
49+ type = "string" ,
50+ format = "date" ,
51+ title = "End Date" ,
52+ description = """This field is required.
53+ End date of the date range to fetch notices from TED."""
54+ ),
55+ TRIGGER_COMPLETE_WORKFLOW_DAG_KEY : Param (
56+ default = False ,
57+ type = "boolean" ,
58+ title = "Trigger Complete Workflow" ,
59+ description = """This field is required.
60+ If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
61+ )
62+ }
63+ )
3764def fetch_notices_by_date_range ():
3865 @task
3966 @event_log (TechnicalEventMessage (
@@ -47,12 +74,12 @@ def trigger_notice_by_date_for_each_date_in_range():
4774 start_date = get_dag_param (key = START_DATE_KEY , raise_error = True )
4875 end_date = get_dag_param (key = END_DATE_KEY , raise_error = True )
4976 trigger_complete_workflow = get_dag_param (key = TRIGGER_COMPLETE_WORKFLOW_DAG_KEY , default_value = False )
50- date_wildcards = generate_wildcards_foreach_day_in_range (start_date , end_date )
51- for date_wildcard in date_wildcards :
77+ fetch_dates = generate_list_of_dates_from_date_range (start_date , end_date )
78+ for fetch_date in fetch_dates :
5279 TriggerDagRunOperator (
53- task_id = f'trigger_notice_fetch_by_date_workflow_dag_{ date_wildcard [: - 1 ] } ' ,
80+ task_id = f'trigger_notice_fetch_by_date_workflow_dag_{ fetch_date . strftime ( "%Y_%m_%d" ) } ' ,
5481 trigger_dag_id = FETCH_NOTICES_BY_DATE_DAG_NAME ,
55- conf = {WILD_CARD_DAG_KEY : date_wildcard ,
82+ conf = {WILD_CARD_DAG_KEY : fetch_date . strftime ( '%Y-%m-%d' ) ,
5683 TRIGGER_COMPLETE_WORKFLOW_DAG_KEY : trigger_complete_workflow ,
5784 }
5885 ).execute (context = context )
0 commit comments