Skip to content

Commit 03cb423

Browse files
committed
test!: Init tests for DAGs; make DagBag import without importing errors
1 parent 2d1a86e commit 03cb423

11 files changed

Lines changed: 74 additions & 50 deletions

dags/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,23 @@
1313
"concurrency": 15,
1414
"execution_timeout": timedelta(days=10),
1515
}
16+
17+
NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
18+
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
19+
NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
20+
NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline"
21+
NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline"
22+
NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline"
23+
STOP_PROCESSING_TASK_ID = "stop_processing"
24+
BRANCH_SELECTOR_TASK_ID = 'branch_selector'
25+
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation"
26+
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
27+
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
28+
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
29+
30+
BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
31+
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
32+
NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
33+
NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
34+
NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
35+
}

dags/daily_materialized_views_update.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1616
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
1717
catchup=False,
18-
timetable=CronTriggerTimetable(
18+
schedule=CronTriggerTimetable(
1919
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
2020
timezone=DAG_DEFAULT_TIMEZONE),
2121
tags=['mongodb', 'daily-views-update'])

dags/fetch_notices_by_date.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3232
dag_id=FETCHER_DAG_NAME,
3333
catchup=False,
34-
timetable=CronTriggerTimetable(
34+
schedule=CronTriggerTimetable(
3535
cron=config.SCHEDULE_DAG_FETCH,
3636
timezone=DAG_DEFAULT_TIMEZONE),
3737
tags=['selector', 'daily-fetch'],

dags/notice_processing_pipeline.py

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,20 @@
44
from airflow.decorators import dag
55
from airflow.utils.trigger_rule import TriggerRule
66

7-
from dags import DEFAULT_DAG_ARGUMENTS
7+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID, STOP_PROCESSING_TASK_ID, \
8+
BRANCH_SELECTOR_MAP, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, NOTICE_VALIDATION_PIPELINE_TASK_ID, \
9+
NOTICE_PACKAGE_PIPELINE_TASK_ID, NOTICE_PUBLISH_PIPELINE_TASK_ID, BRANCH_SELECTOR_TASK_ID, \
10+
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, \
11+
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, NOTICE_DISTILLATION_PIPELINE_TASK_ID
812
from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
913
from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
1014
EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY
1115
from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline
1216
from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_transformation_pipeline, \
1317
notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline
1418

15-
NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
16-
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
17-
NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
18-
NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline"
19-
NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline"
20-
NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline"
21-
STOP_PROCESSING_TASK_ID = "stop_processing"
22-
BRANCH_SELECTOR_TASK_ID = 'branch_selector'
23-
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation"
24-
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
25-
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
26-
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
2719
DAG_NAME = "notice_processing_pipeline"
2820

29-
BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
30-
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
31-
NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
32-
NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
33-
NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
34-
}
35-
36-
3721
def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str:
3822
start_with_step_name = get_dag_param(key=START_WITH_STEP_NAME_KEY,
3923
default_value=NOTICE_NORMALISATION_PIPELINE_TASK_ID)

dags/reprocess_published_in_cellar_notices.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from airflow.decorators import dag, task
22

3-
from dags import DEFAULT_DAG_ARGUMENTS
3+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
6-
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
7-
EXECUTE_ONLY_ONE_STEP_KEY
5+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
86
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
97
from ted_sws.core.model.notice import NoticeStatus
108
from ted_sws.event_manager.adapters.event_log_decorator import event_log

dags/reprocess_unpackaged_notices_from_backlog.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
from airflow.decorators import dag, task
22

3-
from dags import DEFAULT_DAG_ARGUMENTS
3+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_PACKAGE_PIPELINE_TASK_ID
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID
65
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
76
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
87
from ted_sws.core.model.notice import NoticeStatus

dags/reprocess_unpublished_notices_from_backlog.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
from airflow.decorators import dag, task
22

3-
from dags import DEFAULT_DAG_ARGUMENTS
3+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_PUBLISH_PIPELINE_TASK_ID
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID
65
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
76
EXECUTE_ONLY_ONE_STEP_KEY
87
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status

dags/reprocess_untransformed_notices_from_backlog.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from airflow.decorators import dag, task
22

3-
from dags import DEFAULT_DAG_ARGUMENTS
3+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
6-
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
7-
EXECUTE_ONLY_ONE_STEP_KEY
5+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
86
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
97
from ted_sws.core.model.notice import NoticeStatus
108
from ted_sws.event_manager.adapters.event_log_decorator import event_log
@@ -14,7 +12,8 @@
1412
DAG_NAME = "reprocess_untransformed_notices_from_backlog"
1513

1614
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
17-
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
15+
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
16+
NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
1817
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
1918
]
2019
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"

dags/reprocess_unvalidated_notices_from_backlog.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from airflow.decorators import dag, task
22

3-
from dags import DEFAULT_DAG_ARGUMENTS
3+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
44
from dags.dags_utils import push_dag_downstream, get_dag_param
5-
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
6-
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
7-
EXECUTE_ONLY_ONE_STEP_KEY
5+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
86
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
97
from ted_sws.core.model.notice import NoticeStatus
108
from ted_sws.event_manager.adapters.event_log_decorator import event_log

tests/unit/dags/conftest.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,38 @@
1+
import os
2+
import tempfile
3+
14
import pytest
5+
from airflow.models import DagBag, Variable
26
from airflow.timetables.trigger import CronTriggerTimetable
3-
7+
from airflow.utils.db import initdb
48
from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
59
from dags.fetch_notices_by_date import FETCHER_DAG_NAME
10+
from tests import AIRFLOW_DAG_FOLDER
11+
12+
@pytest.fixture(scope="session", autouse=True)
13+
def setup_airflow():
14+
# Setup
15+
temp_db_file = tempfile.NamedTemporaryFile(mode="w+", suffix=".db")
16+
os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = f"sqlite:///{temp_db_file.name}"
17+
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
18+
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
19+
initdb()
620

21+
# Run tests
22+
yield
723

8-
# @pytest.fixture
9-
# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag:
10-
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
11-
# Variable.delete(key=dag_fetch_schedule_variable_name)
12-
# return DagBag(
13-
# dag_folder=AIRFLOW_DAG_FOLDER,
14-
# include_examples=False,
15-
# read_dags_from_db=False,
16-
# collect_dags=True)
24+
# Cleanup
25+
temp_db_file.close()
26+
27+
@pytest.fixture
28+
def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag:
29+
Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
30+
Variable.delete(key=dag_fetch_schedule_variable_name)
31+
return DagBag(
32+
dag_folder=AIRFLOW_DAG_FOLDER,
33+
include_examples=False, #Same as: os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
34+
read_dags_from_db=False,
35+
collect_dags=True)
1736

1837

1938
@pytest.fixture

0 commit comments

Comments
 (0)