55from airflow .models import DagBag , Variable
66from airflow .timetables .trigger import CronTriggerTimetable
77from airflow .utils .db import resetdb , initdb
8+
89from dags .daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
910from dags .fetch_notices_by_date import FETCHER_DAG_NAME
1011from tests import AIRFLOW_DAG_FOLDER
1112
12- @pytest .fixture (scope = "function" )
13- def setup_airflow ():
13+
14+ @pytest .fixture (scope = "session" , autouse = True )
15+ def setup_airflow_for_all_tests ():
16+ """Setup Airflow DB once at the beginning of the test session"""
1417 temp_db_file = tempfile .NamedTemporaryFile (mode = "w+" , suffix = ".db" )
1518 os .environ ["AIRFLOW__CORE__SQL_ALCHEMY_CONN" ] = f"sqlite:///{ temp_db_file .name } "
1619 os .environ ["AIRFLOW__CORE__LOAD_EXAMPLES" ] = "False"
1720 os .environ ["AIRFLOW__CORE__UNIT_TEST_MODE" ] = "True"
21+ resetdb () # Reset the database first to ensure clean state
22+ initdb ()
23+ yield
24+ # Cleanup after all tests complete
25+ if os .path .exists (temp_db_file .name ):
26+ temp_db_file .close ()
27+
28+
29+ @pytest .fixture (scope = "function" )
30+ def setup_airflow ():
31+ """This fixture now just makes sure the DB is clean for each test,
32+ but doesn't reinitialize the connection"""
33+ resetdb ()
1834 initdb ()
1935
2036
@@ -26,7 +42,7 @@ def dag_bag(dag_materialised_view_update_schedule_variable_name,
2642 Variable .delete (key = dag_fetch_schedule_variable_name )
2743 return DagBag (
2844 dag_folder = AIRFLOW_DAG_FOLDER ,
29- include_examples = False , # Same as: os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
45+ include_examples = False , # Same as: os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
3046 read_dags_from_db = False ,
3147 collect_dags = True )
3248
0 commit comments