Skip to content

Commit ca80647

Browse files
committed
prepare the tests for fetcher scheduling
1 parent 3b5d081 commit ca80647

3 files changed

Lines changed: 123 additions & 20 deletions

File tree

dags/fetch_notices_by_date.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
EventMessageProcessType
1616
from ted_sws.event_manager.services.log import log_error
1717

18-
DAG_NAME = "fetch_notices_by_date"
18+
FETCHER_DAG_NAME = "fetch_notices_by_date"
1919
BATCH_SIZE = 2000
2020
WILD_CARD_DAG_KEY = "wild_card"
2121
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -25,6 +25,9 @@
2525
FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_date"
2626
VALIDATE_FETCHED_NOTICES_TASK_ID = "validate_fetched_notices"
2727

28+
DAG_FETCH_DEFAULT_TIMEZONE = "UTC"
29+
DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *"
30+
SCHEDULE_DAG_FETCH_VAR_NAME = "SCHEDULE_DAG_FETCH"
2831

2932
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3033
catchup=False,
@@ -35,7 +38,7 @@ def fetch_notices_by_date():
3538
@event_log(TechnicalEventMessage(
3639
message="fetch_notice_from_ted",
3740
metadata=EventMessageMetadata(
38-
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
41+
process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME
3942
))
4043
)
4144
def fetch_by_date_notice_from_ted():

tests/unit/dags/conftest.py

Lines changed: 67 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,74 @@
1-
# import os
2-
#
3-
# import pytest
4-
#
5-
# from airflow.models import DagBag
6-
# from airflow.utils import db
7-
# import logging
1+
import logging
import os

# pytest must come from the pytest package itself; importing it through
# psutil's private test package only works by accident.
import pytest
from airflow.exceptions import AirflowTimetableInvalid
from airflow.models import DagBag
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils import db

from dags.fetch_notices_by_date import FETCHER_DAG_NAME
from tests import TESTS_PATH

# "dags" directory next to the tests path; used below both as AIRFLOW_HOME
# and as the folder the DagBag is loaded from.
AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags"
1214

1315

14-
# @pytest.fixture(scope="session")
16+
@pytest.fixture()
def dag_bag():
    """Build a DagBag from AIRFLOW_DAG_FOLDER on a freshly reset Airflow DB.

    Side effects:
        Sets the ``AIRFLOW_HOME`` and ``AIRFLOW__CORE__LOAD_EXAMPLES``
        environment variables and resets/initialises the Airflow database.

    Returns:
        DagBag: loaded from AIRFLOW_DAG_FOLDER with example DAGs excluded and
        ``read_dags_from_db`` disabled.
    """
    os.environ["AIRFLOW_HOME"] = str(AIRFLOW_DAG_FOLDER)
    os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
    # Initialising the Airflow DB so that it works properly with the new AIRFLOW_HOME
    logging.disable(logging.CRITICAL)
    try:
        db.resetdb()
        db.initdb()
    finally:
        # Always restore logging: if the DB setup raises, leaving logging
        # disabled would silence every test that runs afterwards.
        logging.disable(logging.NOTSET)
    return DagBag(dag_folder=AIRFLOW_DAG_FOLDER, include_examples=False,
                  read_dags_from_db=False)
28+
29+
30+
# @pytest.fixture
1531
# def dag_bag():
16-
# os.environ["AIRFLOW_HOME"] = str(AIRFLOW_DAG_FOLDER)
17-
# os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
18-
# # Initialising the Airflow DB so that it works properly with the new AIRFLOW_HOME
19-
# logging.disable(logging.CRITICAL)
20-
# db.resetdb()
21-
# db.initdb()
22-
# logging.disable(logging.NOTSET)
23-
# dag_bag = DagBag(dag_folder=AIRFLOW_DAG_FOLDER, include_examples=False,
24-
# read_dags_from_db=False)
25-
# return dag_bag
32+
# return DagBag(dag_folder=AIRFLOW_DAG_FOLDER, include_examples=False, read_dags_from_db=False)
33+
34+
35+
@pytest.fixture
def fetcher_dag_id():
    """Provide the id of the fetcher DAG, as exported by the DAG module."""
    dag_id = FETCHER_DAG_NAME
    return dag_id
38+
39+
40+
@pytest.fixture
def example_cron_table() -> str:
    """Provide a five-field cron expression used as the replacement schedule."""
    cron_expression = "15 14 1 * *"
    return cron_expression
43+
44+
45+
@pytest.fixture
def example_wrong_cron_table() -> str:
    """Provide a string that is not a cron expression, for the failure-path test."""
    malformed_cron_expression = "1234"
    return malformed_cron_expression
48+
49+
50+
@pytest.fixture
def example_dag_cron_table(example_cron_table) -> CronTriggerTimetable:
    """Wrap the example cron expression in a UTC ``CronTriggerTimetable``."""
    timetable = CronTriggerTimetable(
        cron=example_cron_table,
        timezone="UTC",
    )
    return timetable
53+
54+
55+
@pytest.fixture
def airflow_timetable_import_error_name() -> str:
    """Provide the class name of ``AirflowTimetableInvalid`` as a plain string."""
    error_class_name = AirflowTimetableInvalid.__name__
    return error_class_name
58+
59+
60+
61+
@pytest.fixture
def dag_fetch_schedule_variable_name() -> str:
    """
    Provide the expected name of the variable holding the fetcher DAG schedule.

    The name follows the MM of the meeting with OP from 2024.12.28.
    """
    variable_name = "SCHEDULE_DAG_FETCH"
    return variable_name
67+
68+
69+
@pytest.fixture
def dag_materialised_view_update_schedule_variable_name() -> str:
    """
    Provide the expected variable name for the materialised-view-update schedule.

    The name follows the MM of the meeting with OP from 2024.12.28.
    """
    variable_name = "SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE"
    return variable_name
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from airflow import DAG
2+
from airflow.models import DagBag, Variable
3+
from airflow.timetables.trigger import CronTriggerTimetable
4+
5+
from dags.fetch_notices_by_date import DAG_FETCH_DEFAULT_TIMETABLE, DAG_FETCH_DEFAULT_TIMEZONE, \
6+
SCHEDULE_DAG_FETCH_VAR_NAME
7+
8+
9+
def test_schedule_dag_fetch_ver_name_is_correct(dag_fetch_schedule_variable_name):
    """The variable name exported by the DAG module matches the agreed name."""
    exported_name = SCHEDULE_DAG_FETCH_VAR_NAME
    assert exported_name == dag_fetch_schedule_variable_name
11+
12+
13+
def test_fetcher_has_default_timetable_at_the_beginning(dag_bag: DagBag, fetcher_dag_id: str):
    """The fetcher DAG loaded from the bag carries the default cron schedule."""
    loaded_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_id)
    expected_timetable = CronTriggerTimetable(
        cron=DAG_FETCH_DEFAULT_TIMETABLE,
        timezone=DAG_FETCH_DEFAULT_TIMEZONE,
    )

    assert loaded_dag is not None
    # Compare against the cron expression held by the timetable built from the defaults.
    expected_expression = expected_timetable._expression
    assert loaded_dag.schedule_interval == expected_expression
20+
21+
22+
def test_fetcher_gets_correct_timetable_after_reparse(dag_bag: DagBag, fetcher_dag_id: str,
                                                      example_dag_cron_table: CronTriggerTimetable,
                                                      airflow_timetable_import_error_name: str):
    """Setting the schedule Variable and re-collecting DAGs applies the new cron.

    Also checks that no recorded import error mentions the timetable error name.
    """
    dag_before: DAG = dag_bag.get_dag(dag_id=fetcher_dag_id)
    assert dag_before is not None
    new_expression = example_dag_cron_table._expression
    assert dag_before.schedule_interval != new_expression

    # Store the new schedule, then force every DAG file to be parsed again.
    Variable.set(key=SCHEDULE_DAG_FETCH_VAR_NAME, value=new_expression)
    dag_bag.collect_dags(only_if_updated=False)

    dag_after: DAG = dag_bag.get_dag(dag_id=fetcher_dag_id)
    assert dag_after is not None
    assert dag_after.schedule_interval == new_expression

    import_error_messages = dag_bag.import_errors.values()
    assert not any(airflow_timetable_import_error_name in message for message in import_error_messages)
37+
38+
39+
def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag, fetcher_dag_id: str,
                                                        example_wrong_cron_table: str,
                                                        airflow_timetable_import_error_name: str):
    """An invalid cron value in the schedule Variable yields a timetable import error.

    The DAG starts on the default schedule; after the invalid value is stored
    and the DAGs are re-collected, at least one import error must mention the
    timetable error name.
    """
    loaded_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_id)
    assert loaded_dag is not None
    expected_timetable = CronTriggerTimetable(
        cron=DAG_FETCH_DEFAULT_TIMETABLE,
        timezone=DAG_FETCH_DEFAULT_TIMEZONE,
    )
    assert loaded_dag.schedule_interval == expected_timetable._expression

    # Store the malformed schedule, then force every DAG file to be parsed again.
    Variable.set(key=SCHEDULE_DAG_FETCH_VAR_NAME, value=example_wrong_cron_table)
    dag_bag.collect_dags(only_if_updated=False)

    import_error_messages = dag_bag.import_errors.values()
    assert any(airflow_timetable_import_error_name in message for message in import_error_messages)

0 commit comments

Comments
 (0)