Skip to content

Commit 41f02c1

Browse files
committed
feat+tests: implement scheduled materialisation dag run based on datasets update + test materialisation dag
1 parent 23d46b0 commit 41f02c1

5 files changed

Lines changed: 64 additions & 14 deletions

File tree

dags/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from datetime import datetime, timedelta
22

3+
from airflow import Dataset
4+
35
DEFAULT_DAG_ARGUMENTS = {
46
"owner": "airflow",
57
"depends_on_past": False,
@@ -32,4 +34,8 @@
3234
NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
3335
NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
3436
NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
35-
}
37+
}
38+
39+
# This is a formal name, not an actual connection string.
40+
NOTICES_COLLECTION_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection")
41+
MATERIALISED_VIEW_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection_materialised_view")

dags/daily_materialized_views_update.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,35 @@
11
from airflow.decorators import dag, task
2-
from airflow.timetables.trigger import CronTriggerTimetable
32
from pymongo import MongoClient
43

5-
from dags import DEFAULT_DAG_ARGUMENTS
6-
from ted_sws import config, DAG_DEFAULT_TIMEZONE
4+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICES_COLLECTION_DATASET, MATERIALISED_VIEW_DATASET
5+
from ted_sws import config
76
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
87
create_batch_collection_materialised_view
98
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
109
create_notice_collection_materialised_view, create_notice_kpi_collection
1110

1211
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
12+
DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1
1313

1414

1515
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
1616
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
1717
catchup=False,
18-
schedule=CronTriggerTimetable(
19-
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
20-
timezone=DAG_DEFAULT_TIMEZONE),
18+
schedule=NOTICES_COLLECTION_DATASET,
19+
max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS,
2120
tags=['mongodb', 'daily-views-update'])
2221
def daily_materialized_views_update():
23-
@task
22+
@task(inlets=[NOTICES_COLLECTION_DATASET])
2423
def create_materialised_view():
2524
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
2625
create_notice_collection_materialised_view(mongo_client=mongo_client)
2726

28-
@task
27+
@task(inlets=[NOTICES_COLLECTION_DATASET])
2928
def create_kpi_collection_for_notices():
3029
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
3130
create_notice_kpi_collection(mongo_client=mongo_client)
3231

33-
@task
32+
@task(inlets=[NOTICES_COLLECTION_DATASET], outlets=[MATERIALISED_VIEW_DATASET])
3433
def aggregate_batch_logs():
3534
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
3635
create_batch_collection_materialised_view(mongo_client=mongo_client)

dags/notice_processing_pipeline.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from typing import List
22

3-
from airflow.operators.python import BranchPythonOperator, PythonOperator
43
from airflow.decorators import dag
4+
from airflow.operators.python import BranchPythonOperator, PythonOperator
55
from airflow.utils.trigger_rule import TriggerRule
66

77
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID, STOP_PROCESSING_TASK_ID, \
88
BRANCH_SELECTOR_MAP, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, NOTICE_VALIDATION_PIPELINE_TASK_ID, \
99
NOTICE_PACKAGE_PIPELINE_TASK_ID, NOTICE_PUBLISH_PIPELINE_TASK_ID, BRANCH_SELECTOR_TASK_ID, \
1010
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, \
11-
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, NOTICE_DISTILLATION_PIPELINE_TASK_ID
11+
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, \
12+
NOTICE_DISTILLATION_PIPELINE_TASK_ID, NOTICES_COLLECTION_DATASET
1213
from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
1314
from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
1415
EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY
@@ -18,6 +19,7 @@
1819

1920
DAG_NAME = "notice_processing_pipeline"
2021

22+
2123
def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str:
2224
start_with_step_name = get_dag_param(key=START_WITH_STEP_NAME_KEY,
2325
default_value=NOTICE_NORMALISATION_PIPELINE_TASK_ID)
@@ -95,7 +97,8 @@ def _stop_processing():
9597
stop_processing = PythonOperator(
9698
task_id=STOP_PROCESSING_TASK_ID,
9799
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
98-
python_callable=_stop_processing
100+
python_callable=_stop_processing,
101+
outlets=NOTICES_COLLECTION_DATASET
99102
)
100103

101104
notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline,

tests/unit/dags/test_dags.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from airflow.models import DagBag
22

33

4-
def test_dag_loaded(dag_bag: DagBag):
4+
def test_dags_are_loaded_successfully(dag_bag: DagBag):
55
assert dag_bag.import_errors == {}
66
for dag in dag_bag.dags.values():
77
assert dag is not None
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from airflow.models import DagBag
2+
3+
4+
def test_daily_materialized_views_update_dag_loaded(dag_bag: DagBag, daily_materialised_views_dag_id: str):
5+
assert daily_materialised_views_dag_id in dag_bag.dags
6+
dag = dag_bag.dags[daily_materialised_views_dag_id]
7+
assert dag is not None
8+
9+
10+
def test_daily_materialized_views_update_dag_structure(dag_bag: DagBag, daily_materialised_views_dag_id: str):
11+
dag = dag_bag.dags[daily_materialised_views_dag_id]
12+
13+
task_ids = [task.task_id for task in dag.tasks]
14+
expected_tasks = [
15+
"create_materialised_view",
16+
"create_kpi_collection_for_notices",
17+
"aggregate_batch_logs"
18+
]
19+
for task_id in expected_tasks:
20+
assert task_id in task_ids
21+
22+
assert len(dag.tasks) == 3
23+
24+
create_view_task = dag.get_task("create_materialised_view")
25+
kpi_collection_task = dag.get_task("create_kpi_collection_for_notices")
26+
aggregate_logs_task = dag.get_task("aggregate_batch_logs")
27+
28+
assert kpi_collection_task.task_id in [task.task_id for task in create_view_task.downstream_list]
29+
assert aggregate_logs_task.task_id in [task.task_id for task in kpi_collection_task.downstream_list]
30+
31+
assert create_view_task.task_id in [task.task_id for task in kpi_collection_task.upstream_list]
32+
assert kpi_collection_task.task_id in [task.task_id for task in aggregate_logs_task.upstream_list]
33+
34+
35+
def test_daily_materialized_views_update_dag_default_args(dag_bag: DagBag, daily_materialised_views_dag_id: str):
36+
assert daily_materialised_views_dag_id in dag_bag.dags
37+
dag = dag_bag.dags[daily_materialised_views_dag_id]
38+
39+
assert dag.max_active_runs == 1
40+
assert not dag.catchup
41+
assert "mongodb" in dag.tags
42+
assert "daily-views-update" in dag.tags

0 commit comments

Comments
 (0)