-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathdaily_materialized_views_update.py
More file actions
40 lines (31 loc) · 1.7 KB
/
daily_materialized_views_update.py
File metadata and controls
40 lines (31 loc) · 1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from airflow.decorators import dag, task
from pymongo import MongoClient
from dags import DEFAULT_DAG_ARGUMENTS, NOTICES_COLLECTION_DATASET, MATERIALISED_VIEW_DATASET
from ted_sws import config
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, create_notice_kpi_collection
# Identifier passed as ``dag_id`` to the ``@dag`` decorator below.
DAILY_MATERIALISED_VIEWS_DAG_NAME: str = "daily_materialized_views_update"
# Value passed as ``max_active_runs`` to the ``@dag`` decorator below.
DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
     dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
     catchup=False,
     schedule=NOTICES_COLLECTION_DATASET,
     max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS,
     tags=['mongodb', 'daily-views-update'])
def daily_materialized_views_update():
    """Define the DAG that refreshes the MongoDB materialised views.

    The DAG is scheduled on ``NOTICES_COLLECTION_DATASET`` and chains three
    tasks, executed strictly one after another:

    1. ``create_materialised_view`` calls
       ``create_notice_collection_materialised_view``.
    2. ``create_kpi_collection_for_notices`` calls
       ``create_notice_kpi_collection``.
    3. ``aggregate_batch_logs`` calls
       ``create_batch_collection_materialised_view`` and declares
       ``MATERIALISED_VIEW_DATASET`` as its outlet.

    Every task opens its own ``MongoClient`` against
    ``config.MONGO_DB_AUTH_URL`` and closes it when the task body finishes,
    whether it succeeds or raises.
    """

    @task(inlets=[NOTICES_COLLECTION_DATASET])
    def create_materialised_view():
        """Run ``create_notice_collection_materialised_view`` on a fresh client."""
        # The context manager closes the client (and its connection pool)
        # even if the service call raises, so the worker does not leak sockets.
        with MongoClient(config.MONGO_DB_AUTH_URL) as mongo_client:
            create_notice_collection_materialised_view(mongo_client=mongo_client)

    @task(inlets=[NOTICES_COLLECTION_DATASET])
    def create_kpi_collection_for_notices():
        """Run ``create_notice_kpi_collection`` on a fresh client."""
        with MongoClient(config.MONGO_DB_AUTH_URL) as mongo_client:
            create_notice_kpi_collection(mongo_client=mongo_client)

    @task(inlets=[NOTICES_COLLECTION_DATASET], outlets=[MATERIALISED_VIEW_DATASET])
    def aggregate_batch_logs():
        """Run ``create_batch_collection_materialised_view`` on a fresh client."""
        with MongoClient(config.MONGO_DB_AUTH_URL) as mongo_client:
            create_batch_collection_materialised_view(mongo_client=mongo_client)

    # Linear dependency chain: each task starts only after the previous one.
    create_materialised_view() >> create_kpi_collection_for_notices() >> aggregate_batch_logs()
# Call the decorated factory at import time and bind its result to the
# module-level name ``dag``.
dag = daily_materialized_views_update()