File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 99 "email_on_retry" : False ,
1010 "retries" : 0 ,
1111 "retry_delay" : timedelta (minutes = 3600 ),
12- "max_active_runs" : 128 ,
13- "concurrency" : 128 ,
12+ "max_active_runs" : 15 ,
13+ "concurrency" : 15 ,
1414 "execution_timeout" : timedelta (days = 10 ),
1515}
Original file line number Diff line number Diff line change 11import datetime
2+ import time
3+ from random import randint
24
35from airflow .operators .trigger_dagrun import TriggerDagRunOperator
46
@@ -38,16 +40,18 @@ def fetch_notices_and_trigger_index_and_normalise_notice_worker():
3840 while restart_dag_operator :
3941 restart_dag_operator = False
4042 try :
43+ time .sleep (randint (10 , 500 ) / 1000 )
4144 TriggerDagRunOperator (
4245 task_id = f'trigger_index_and_normalise_notice_worker_dag_{ notice_id } ' ,
4346 trigger_dag_id = "index_and_normalise_notice_worker" ,
4447 trigger_run_id = notice_id ,
45- execution_date = datetime .datetime .now ().replace (tzinfo = datetime .timezone .utc ),
4648 conf = {NOTICE_ID : notice_id }
4749 ).execute (context = context )
48- except :
50+ except Exception as e :
51+
4952 restart_dag_operator = True
5053 print ("trigger dag operator restarted !!!" )
54+ print ("EXCEPTION message: " , e )
5155
5256 fetch_notices_and_trigger_index_and_normalise_notice_worker ()
5357
You can’t perform that action at this time.
0 commit comments