2 changes: 2 additions & 0 deletions .gitignore
@@ -120,3 +120,5 @@ package.json
.DS_Store
.scannerwork/*
/infra/alpine/libraries/
+/infra/airflow
+/libraries
20 changes: 11 additions & 9 deletions Makefile
@@ -12,9 +12,10 @@ ENV_FILE := .env

PROJECT_PATH = $(shell pwd)
AIRFLOW_INFRA_FOLDER ?= ${PROJECT_PATH}/.airflow
-RML_MAPPER_PATH = ${PROJECT_PATH}/.rmlmapper/rmlmapper.jar
-XML_PROCESSOR_PATH = ${PROJECT_PATH}/.saxon/saxon-he-10.9.jar
-LIMES_ALIGNMENT_PATH = $(PROJECT_PATH)/.limes/limes.jar
+LIBRARIES_PATH = ${PROJECT_PATH}/libraries
+RML_MAPPER_PATH = ${LIBRARIES_PATH}/.rmlmapper/rmlmapper.jar
+XML_PROCESSOR_PATH = ${LIBRARIES_PATH}/.saxon/saxon-he-10.9.jar
+LIMES_ALIGNMENT_PATH = $(LIBRARIES_PATH)/.limes/limes.jar
HOSTNAME = $(shell hostname)
CAROOT = $(shell pwd)/infra/traefik/certs
@@ -215,18 +216,19 @@ stop-metabase:

init-rml-mapper:
	@ echo -e "RMLMapper folder initialisation!"
-	@ mkdir -p ./.rmlmapper
-	@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar -O ./.rmlmapper/rmlmapper.jar
+	@ mkdir -p ./libraries/.rmlmapper
+	@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar -O ./libraries/.rmlmapper/rmlmapper.jar

init-limes:
	@ echo -e "Limes folder initialisation!"
-	@ mkdir -p ./.limes
-	@ wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.jar -P ./.limes
+	@ mkdir -p ./libraries/.limes
+	@ wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.jar -P ./libraries/.limes/

init-saxon:
	@ echo -e "$(BUILD_PRINT)Saxon folder initialization $(END_BUILD_PRINT)"
-	@ wget -c https://github.com/Saxonica/Saxon-HE/releases/download/SaxonHE10-9/SaxonHE10-9J.zip -P .saxon/
-	@ cd .saxon && unzip SaxonHE10-9J.zip && rm -rf SaxonHE10-9J.zip
+	@ mkdir -p ./libraries/.saxon
+	@ wget -c https://github.com/Saxonica/Saxon-HE/releases/download/SaxonHE10-9/SaxonHE10-9J.zip -P ./libraries/.saxon/
+	@ cd ./libraries/.saxon/ && unzip SaxonHE10-9J.zip && rm -rf SaxonHE10-9J.zip

start-project-services: | start-airflow start-mongo init-rml-mapper init-limes start-allegro-graph start-metabase
stop-project-services: | stop-airflow stop-mongo stop-allegro-graph stop-metabase
26 changes: 26 additions & 0 deletions dags/__init__.py
@@ -1,5 +1,7 @@
from datetime import datetime, timedelta

+from airflow import Dataset
+
DEFAULT_DAG_ARGUMENTS = {
    "owner": "airflow",
    "depends_on_past": False,
@@ -13,3 +15,27 @@
    "concurrency": 15,
    "execution_timeout": timedelta(days=10),
}
+
+NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
+NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
+NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
+NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline"
+NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline"
+NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline"
+STOP_PROCESSING_TASK_ID = "stop_processing"
+BRANCH_SELECTOR_TASK_ID = 'branch_selector'
+SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation"
+SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
+SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
+SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
+
+BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
+                       NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
+                       NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
+                       NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
+                       NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
+                       }
+
+# This is a formal name, not an actual connection string.
+NOTICES_COLLECTION_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection")
+MATERIALISED_VIEW_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection_materialised_view")
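
For context on the two Dataset constants above: with Airflow's data-aware scheduling (introduced in Airflow 2.4), a task that lists a Dataset in its outlets emits a dataset event when it succeeds, and any DAG whose schedule references that Dataset is queued to run afterwards. As the comment says, the URI is an opaque identifier, not a real connection string. A minimal producer/consumer sketch of the mechanism, with hypothetical DAG ids and dates that are not part of this PR:

    from datetime import datetime

    from airflow import Dataset
    from airflow.decorators import dag, task

    EXAMPLE_DATASET = Dataset("db://aggregates_db/notices_collection")


    @dag(dag_id="producer_example", start_date=datetime(2023, 1, 1), schedule=None)
    def producer_example():
        @task(outlets=[EXAMPLE_DATASET])
        def update_notices_collection():
            ...  # any successful run of this task emits a dataset event

        update_notices_collection()


    @dag(dag_id="consumer_example", start_date=datetime(2023, 1, 1),
         schedule=[EXAMPLE_DATASET])  # queued whenever the dataset above is updated
    def consumer_example():
        @task
        def rebuild_materialised_view():
            ...

        rebuild_materialised_view()


    producer_example()
    consumer_example()

This is the same producer/consumer wiring the PR sets up between notice_processing_pipeline (outlets) and daily_materialized_views_update (schedule).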
17 changes: 8 additions & 9 deletions dags/daily_materialized_views_update.py
@@ -1,36 +1,35 @@
from airflow.decorators import dag, task
-from airflow.timetables.trigger import CronTriggerTimetable
from pymongo import MongoClient

-from dags import DEFAULT_DAG_ARGUMENTS
-from ted_sws import config, DAG_DEFAULT_TIMEZONE
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICES_COLLECTION_DATASET, MATERIALISED_VIEW_DATASET
+from ted_sws import config
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
    create_batch_collection_materialised_view
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
    create_notice_collection_materialised_view, create_notice_kpi_collection

DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
+DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
     dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
     catchup=False,
-     timetable=CronTriggerTimetable(
-         cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
-         timezone=DAG_DEFAULT_TIMEZONE),
+     schedule=NOTICES_COLLECTION_DATASET,
+     max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS,
     tags=['mongodb', 'daily-views-update'])
def daily_materialized_views_update():
-    @task
+    @task(inlets=[NOTICES_COLLECTION_DATASET])
    def create_materialised_view():
        mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
        create_notice_collection_materialised_view(mongo_client=mongo_client)

-    @task
+    @task(inlets=[NOTICES_COLLECTION_DATASET])
    def create_kpi_collection_for_notices():
        mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
        create_notice_kpi_collection(mongo_client=mongo_client)

-    @task
+    @task(inlets=[NOTICES_COLLECTION_DATASET], outlets=[MATERIALISED_VIEW_DATASET])
    def aggregate_batch_logs():
        mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
        create_batch_collection_materialised_view(mongo_client=mongo_client)
2 changes: 1 addition & 1 deletion dags/fetch_notices_by_date.py
@@ -31,7 +31,7 @@
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
     dag_id=FETCHER_DAG_NAME,
     catchup=False,
-     timetable=CronTriggerTimetable(
+     schedule=CronTriggerTimetable(
         cron=config.SCHEDULE_DAG_FETCH,
         timezone=DAG_DEFAULT_TIMEZONE),
     tags=['selector', 'daily-fetch'],
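
Background for the one-line change above: from Airflow 2.4 the timetable= argument is deprecated in favour of a unified schedule= parameter, which accepts a cron expression, a Timetable instance, or Datasets. A short sketch of the accepted forms (values are illustrative only):

    from airflow import Dataset
    from airflow.timetables.trigger import CronTriggerTimetable

    # Any of the following is a valid value for the unified `schedule=` DAG parameter:
    schedule = "0 6 * * *"                                         # plain cron string
    schedule = CronTriggerTimetable("0 6 * * *", timezone="UTC")   # explicit timetable
    schedule = [Dataset("db://aggregates_db/notices_collection")]  # data-aware scheduling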
31 changes: 9 additions & 22 deletions dags/notice_processing_pipeline.py
@@ -1,38 +1,24 @@
from typing import List

-from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.decorators import dag
+from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID, STOP_PROCESSING_TASK_ID, \
+    BRANCH_SELECTOR_MAP, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, NOTICE_VALIDATION_PIPELINE_TASK_ID, \
+    NOTICE_PACKAGE_PIPELINE_TASK_ID, NOTICE_PUBLISH_PIPELINE_TASK_ID, BRANCH_SELECTOR_TASK_ID, \
+    SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, \
+    SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, \
+    NOTICE_DISTILLATION_PIPELINE_TASK_ID, NOTICES_COLLECTION_DATASET
from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
    EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY
from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline
from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_transformation_pipeline, \
    notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline

-NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
-NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
-NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
-NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline"
-NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline"
-NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline"
-STOP_PROCESSING_TASK_ID = "stop_processing"
-BRANCH_SELECTOR_TASK_ID = 'branch_selector'
-SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation"
-SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
-SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
-SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
DAG_NAME = "notice_processing_pipeline"

-BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
-                       NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
-                       NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
-                       NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
-                       NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
-                       }


def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str:
    start_with_step_name = get_dag_param(key=START_WITH_STEP_NAME_KEY,
@@ -111,7 +97,8 @@ def _stop_processing():
    stop_processing = PythonOperator(
        task_id=STOP_PROCESSING_TASK_ID,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
-        python_callable=_stop_processing
+        python_callable=_stop_processing,
+        outlets=NOTICES_COLLECTION_DATASET
    )

    notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline,
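
An aside on the pattern refactored here: the task-id constants and BRANCH_SELECTOR_MAP now live in dags/__init__.py so that other DAGs can import them without importing this module, and the map feeds a BranchPythonOperator that decides where a run enters the pipeline. A simplified, hypothetical sketch of that dispatch — the conf key name and the callable are illustrative, and the real branch_selector above also forwards XCom values:

    from airflow.operators.python import BranchPythonOperator

    from dags import BRANCH_SELECTOR_MAP, BRANCH_SELECTOR_TASK_ID, \
        NOTICE_NORMALISATION_PIPELINE_TASK_ID

    START_WITH_STEP_NAME_KEY = "start_with_step_name"  # assumed dag-run conf key


    def pick_entry_branch(dag_run=None, **_):
        """Return the task_id of the branch this run should start from."""
        conf = dag_run.conf or {}
        requested_step = conf.get(START_WITH_STEP_NAME_KEY, NOTICE_NORMALISATION_PIPELINE_TASK_ID)
        return BRANCH_SELECTOR_MAP[requested_step]


    branch_selector_task = BranchPythonOperator(
        task_id=BRANCH_SELECTOR_TASK_ID,
        python_callable=pick_entry_branch,  # the returned task_id runs; sibling branches are skipped
    )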
6 changes: 2 additions & 4 deletions dags/reprocess_published_in_cellar_notices.py
@@ -1,10 +1,8 @@
from airflow.decorators import dag, task

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.dags_utils import push_dag_downstream, get_dag_param
-from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
-from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
-    EXECUTE_ONLY_ONE_STEP_KEY
+from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
3 changes: 1 addition & 2 deletions dags/reprocess_unpackaged_notices_from_backlog.py
@@ -1,8 +1,7 @@
from airflow.decorators import dag, task

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_PACKAGE_PIPELINE_TASK_ID
from dags.dags_utils import push_dag_downstream, get_dag_param
-from dags.notice_processing_pipeline import NOTICE_PACKAGE_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
3 changes: 1 addition & 2 deletions dags/reprocess_unpublished_notices_from_backlog.py
@@ -1,8 +1,7 @@
from airflow.decorators import dag, task

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_PUBLISH_PIPELINE_TASK_ID
from dags.dags_utils import push_dag_downstream, get_dag_param
-from dags.notice_processing_pipeline import NOTICE_PUBLISH_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
EXECUTE_ONLY_ONE_STEP_KEY
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
9 changes: 4 additions & 5 deletions dags/reprocess_untransformed_notices_from_backlog.py
@@ -1,10 +1,8 @@
from airflow.decorators import dag, task

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.dags_utils import push_dag_downstream, get_dag_param
-from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
-from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
-    EXECUTE_ONLY_ONE_STEP_KEY
+from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
@@ -14,7 +12,8 @@
DAG_NAME = "reprocess_untransformed_notices_from_backlog"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
-                                    NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
+                                    NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION,
+                                    NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
                                     NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
                                     ]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
6 changes: 2 additions & 4 deletions dags/reprocess_unvalidated_notices_from_backlog.py
@@ -1,10 +1,8 @@
from airflow.decorators import dag, task

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
from dags.dags_utils import push_dag_downstream, get_dag_param
-from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
-from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
-    EXECUTE_ONLY_ONE_STEP_KEY
+from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
2 changes: 1 addition & 1 deletion infra/airflow/Dockerfile
@@ -16,7 +16,7 @@ RUN apt-get update && apt-get install -y \
# back to normal user
USER airflow

-COPY libraries /home/airflow
+COPY libraries /opt/airflow
# requirements.txt shall be made available from the **ted-sws** GitHub repository
COPY requirements.txt /opt/airflow

49 changes: 40 additions & 9 deletions tests/unit/dags/conftest.py
@@ -1,19 +1,50 @@
import os
import tempfile

import pytest
from airflow.models import DagBag, Variable
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils.db import resetdb, initdb

from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
from dags.fetch_notices_by_date import FETCHER_DAG_NAME
from tests import AIRFLOW_DAG_FOLDER


@pytest.fixture(scope="session", autouse=True)
def setup_airflow_for_all_tests():
    """Setup Airflow DB once at the beginning of the test session"""
    temp_db_file = tempfile.NamedTemporaryFile(mode="w+", suffix=".db")
    os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = f"sqlite:///{temp_db_file.name}"
    os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
    os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
    resetdb()  # Reset the database first to ensure clean state
    initdb()
    yield
    # Cleanup after all tests complete
    if os.path.exists(temp_db_file.name):
        temp_db_file.close()


# @pytest.fixture
# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag:
#     Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
#     Variable.delete(key=dag_fetch_schedule_variable_name)
#     return DagBag(
#         dag_folder=AIRFLOW_DAG_FOLDER,
#         include_examples=False,
#         read_dags_from_db=False,
#         collect_dags=True)
@pytest.fixture(scope="function")
def setup_airflow():
    """This fixture now just makes sure the DB is clean for each test,
    but doesn't reinitialize the connection"""
    resetdb()
    initdb()


@pytest.fixture
def dag_bag(dag_materialised_view_update_schedule_variable_name,
            dag_fetch_schedule_variable_name,
            setup_airflow) -> DagBag:
    Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
    Variable.delete(key=dag_fetch_schedule_variable_name)
    return DagBag(
        dag_folder=AIRFLOW_DAG_FOLDER,
        include_examples=False,  # Same as: os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
        read_dags_from_db=False,
        collect_dags=True)


@pytest.fixture
8 changes: 8 additions & 0 deletions tests/unit/dags/test_dags.py
@@ -0,0 +1,8 @@
from airflow.models import DagBag


def test_dags_are_loaded_successfully(dag_bag: DagBag):
    assert dag_bag.import_errors == {}
    for dag in dag_bag.dags.values():
        assert dag is not None
        assert len(dag.tasks) > 0
42 changes: 42 additions & 0 deletions tests/unit/dags/test_daily_materialized_views_update_dag.py
@@ -0,0 +1,42 @@
from airflow.models import DagBag


def test_daily_materialized_views_update_dag_loaded(dag_bag: DagBag, daily_materialised_views_dag_id: str):
    assert daily_materialised_views_dag_id in dag_bag.dags
    dag = dag_bag.dags[daily_materialised_views_dag_id]
    assert dag is not None


def test_daily_materialized_views_update_dag_structure(dag_bag: DagBag, daily_materialised_views_dag_id: str):
    dag = dag_bag.dags[daily_materialised_views_dag_id]

    task_ids = [task.task_id for task in dag.tasks]
    expected_tasks = [
        "create_materialised_view",
        "create_kpi_collection_for_notices",
        "aggregate_batch_logs"
    ]
    for task_id in expected_tasks:
        assert task_id in task_ids

    assert len(dag.tasks) == 3

    create_view_task = dag.get_task("create_materialised_view")
    kpi_collection_task = dag.get_task("create_kpi_collection_for_notices")
    aggregate_logs_task = dag.get_task("aggregate_batch_logs")

    assert kpi_collection_task.task_id in [task.task_id for task in create_view_task.downstream_list]
    assert aggregate_logs_task.task_id in [task.task_id for task in kpi_collection_task.downstream_list]

    assert create_view_task.task_id in [task.task_id for task in kpi_collection_task.upstream_list]
    assert kpi_collection_task.task_id in [task.task_id for task in aggregate_logs_task.upstream_list]


def test_daily_materialized_views_update_dag_default_args(dag_bag: DagBag, daily_materialised_views_dag_id: str):
    assert daily_materialised_views_dag_id in dag_bag.dags
    dag = dag_bag.dags[daily_materialised_views_dag_id]

    assert dag.max_active_runs == 1
    assert not dag.catchup
    assert "mongodb" in dag.tags
    assert "daily-views-update" in dag.tags