1+ import os
2+
13from airflow import DAG
24from airflow .models import DagBag , Variable
35from airflow .timetables .trigger import CronTriggerTimetable
46
5- from dags .fetch_notices_by_date import DAG_FETCH_DEFAULT_TIMETABLE , DAG_FETCH_DEFAULT_TIMEZONE , \
6- SCHEDULE_DAG_FETCH_VAR_NAME
7+ from dags .fetch_notices_by_date import DAG_FETCH_DEFAULT_TIMEZONE
8+ from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE
79
810
9- def test_schedule_dag_fetch_ver_name_is_correct (dag_fetch_schedule_variable_name ):
10- assert SCHEDULE_DAG_FETCH_VAR_NAME == dag_fetch_schedule_variable_name
11+ def test_fetcher_change_timetable_from_airflow_variable_after_reparse (dag_bag : DagBag ,
12+ dag_fetch_schedule_variable_name ,
13+ fetcher_dag_id : str ,
14+ example_dag_cron_table : CronTriggerTimetable ,
15+ airflow_timetable_import_error_name : str ):
16+ fetcher_dag : DAG = dag_bag .get_dag (dag_id = fetcher_dag_id )
17+ assert fetcher_dag is not None
18+ assert fetcher_dag .schedule_interval != example_dag_cron_table ._expression
1119
20+ Variable .set (key = dag_fetch_schedule_variable_name , value = example_dag_cron_table ._expression )
21+ dag_bag .collect_dags (only_if_updated = False )
1222
13- def test_fetcher_has_default_timetable_at_the_beginning (dag_bag : DagBag , fetcher_dag_id : str ):
1423 fetcher_dag : DAG = dag_bag .get_dag (dag_id = fetcher_dag_id )
15- default_dag_timetable = CronTriggerTimetable (cron = DAG_FETCH_DEFAULT_TIMETABLE ,
16- timezone = DAG_FETCH_DEFAULT_TIMEZONE )
17-
1824 assert fetcher_dag is not None
19- assert fetcher_dag .schedule_interval == default_dag_timetable ._expression
25+ assert fetcher_dag .schedule_interval == example_dag_cron_table ._expression
26+
27+ assert all (airflow_timetable_import_error_name not in error for error in dag_bag .import_errors .values ())
2028
2129
22- def test_fetcher_gets_correct_timetable_after_reparse (dag_bag : DagBag , fetcher_dag_id : str ,
23- example_dag_cron_table : CronTriggerTimetable ,
24- airflow_timetable_import_error_name : str ):
30+ def test_fetcher_change_timetable_from_env_variable_after_reparse (dag_bag : DagBag ,
31+ dag_fetch_schedule_variable_name : str ,
32+ fetcher_dag_id : str ,
33+ example_dag_cron_table : CronTriggerTimetable ,
34+ airflow_timetable_import_error_name : str ):
2535 fetcher_dag : DAG = dag_bag .get_dag (dag_id = fetcher_dag_id )
2636 assert fetcher_dag is not None
2737 assert fetcher_dag .schedule_interval != example_dag_cron_table ._expression
2838
29- Variable . set ( key = SCHEDULE_DAG_FETCH_VAR_NAME , value = example_dag_cron_table ._expression )
39+ os . environ [ dag_fetch_schedule_variable_name ] = example_dag_cron_table ._expression
3040 dag_bag .collect_dags (only_if_updated = False )
3141
3242 fetcher_dag : DAG = dag_bag .get_dag (dag_id = fetcher_dag_id )
@@ -36,7 +46,9 @@ def test_fetcher_gets_correct_timetable_after_reparse(dag_bag: DagBag, fetcher_d
3646 assert all (airflow_timetable_import_error_name not in error for error in dag_bag .import_errors .values ())
3747
3848
39- def test_fetcher_gets_incorrect_timetable_after_reparse (dag_bag : DagBag , fetcher_dag_id : str ,
49+ def test_fetcher_gets_incorrect_timetable_after_reparse (dag_bag : DagBag ,
50+ dag_fetch_schedule_variable_name : str ,
51+ fetcher_dag_id : str ,
4052 example_wrong_cron_table : str ,
4153 airflow_timetable_import_error_name : str ):
4254 fetcher_dag : DAG = dag_bag .get_dag (dag_id = fetcher_dag_id )
@@ -45,7 +57,7 @@ def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, fetcher
4557 timezone = DAG_FETCH_DEFAULT_TIMEZONE )
4658 assert fetcher_dag .schedule_interval == default_dag_timetable ._expression
4759
48- Variable .set (key = SCHEDULE_DAG_FETCH_VAR_NAME , value = example_wrong_cron_table )
60+ Variable .set (key = dag_fetch_schedule_variable_name , value = example_wrong_cron_table )
4961 dag_bag .collect_dags (only_if_updated = False )
5062
5163 assert any (airflow_timetable_import_error_name in error for error in dag_bag .import_errors .values ())
0 commit comments