diff --git a/.gitignore b/.gitignore index 6ad409a82..df90c8b43 100644 --- a/.gitignore +++ b/.gitignore @@ -120,3 +120,5 @@ package.json .DS_Store .scannerwork/* /infra/alpine/libraries/ +/infra/airflow +/libraries \ No newline at end of file diff --git a/Makefile b/Makefile index 54a07383c..229d6936a 100644 --- a/Makefile +++ b/Makefile @@ -12,9 +12,10 @@ ENV_FILE := .env PROJECT_PATH = $(shell pwd) AIRFLOW_INFRA_FOLDER ?= ${PROJECT_PATH}/.airflow -RML_MAPPER_PATH = ${PROJECT_PATH}/.rmlmapper/rmlmapper.jar -XML_PROCESSOR_PATH = ${PROJECT_PATH}/.saxon/saxon-he-10.9.jar -LIMES_ALIGNMENT_PATH = $(PROJECT_PATH)/.limes/limes.jar +LIBRARIES_PATH = ${PROJECT_PATH}/libraries +RML_MAPPER_PATH = ${LIBRARIES_PATH}/.rmlmapper/rmlmapper.jar +XML_PROCESSOR_PATH = ${LIBRARIES_PATH}/.saxon/saxon-he-10.9.jar +LIMES_ALIGNMENT_PATH = $(LIBRARIES_PATH)/.limes/limes.jar HOSTNAME = $(shell hostname) CAROOT = $(shell pwd)/infra/traefik/certs @@ -215,18 +216,19 @@ stop-metabase: init-rml-mapper: @ echo -e "RMLMapper folder initialisation!" - @ mkdir -p ./.rmlmapper - @ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar -O ./.rmlmapper/rmlmapper.jar + @ mkdir -p ./libraries/.rmlmapper + @ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar -O ./libraries/.rmlmapper/rmlmapper.jar init-limes: @ echo -e "Limes folder initialisation!" 
- @ mkdir -p ./.limes - @ wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.jar -P ./.limes + @ mkdir -p ./libraries/.limes + @ wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.jar -P ./libraries/.limes/ init-saxon: @ echo -e "$(BUILD_PRINT)Saxon folder initialization $(END_BUILD_PRINT)" - @ wget -c https://github.com/Saxonica/Saxon-HE/releases/download/SaxonHE10-9/SaxonHE10-9J.zip -P .saxon/ - @ cd .saxon && unzip SaxonHE10-9J.zip && rm -rf SaxonHE10-9J.zip + @ mkdir -p ./libraries/.saxon + @ wget -c https://github.com/Saxonica/Saxon-HE/releases/download/SaxonHE10-9/SaxonHE10-9J.zip -P ./libraries/.saxon/ + @ cd ./libraries/.saxon/ && unzip SaxonHE10-9J.zip && rm -rf SaxonHE10-9J.zip start-project-services: | start-airflow start-mongo init-rml-mapper init-limes start-allegro-graph start-metabase stop-project-services: | stop-airflow stop-mongo stop-allegro-graph stop-metabase diff --git a/dags/__init__.py b/dags/__init__.py index e384e43ab..ddc8f130b 100644 --- a/dags/__init__.py +++ b/dags/__init__.py @@ -1,5 +1,7 @@ from datetime import datetime, timedelta +from airflow import Dataset + DEFAULT_DAG_ARGUMENTS = { "owner": "airflow", "depends_on_past": False, @@ -13,3 +15,27 @@ "concurrency": 15, "execution_timeout": timedelta(days=10), } + +NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline" +NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline" +NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline" +NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline" +NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline" +NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline" +STOP_PROCESSING_TASK_ID = "stop_processing" +BRANCH_SELECTOR_TASK_ID = 'branch_selector' +SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation" +SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation" +SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = 
"switch_to_package" +SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish" + +BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID, + NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, + NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, + NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, + NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID + } + +# This is a formal name, not an actual connection string. +NOTICES_COLLECTION_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection") +MATERIALISED_VIEW_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection_materialised_view") diff --git a/dags/daily_materialized_views_update.py b/dags/daily_materialized_views_update.py index 176741aff..35e07ab96 100644 --- a/dags/daily_materialized_views_update.py +++ b/dags/daily_materialized_views_update.py @@ -1,38 +1,37 @@ from airflow.decorators import dag, task -from airflow.timetables.trigger import CronTriggerTimetable from pymongo import MongoClient -from dags import DEFAULT_DAG_ARGUMENTS -from ted_sws import config, DAG_DEFAULT_TIMEZONE +from dags import DEFAULT_DAG_ARGUMENTS, NOTICES_COLLECTION_DATASET, MATERIALISED_VIEW_DATASET +from ted_sws import config from ted_sws.data_manager.services.create_batch_collection_materialised_view import \ create_batch_collection_materialised_view from ted_sws.data_manager.services.create_notice_collection_materialised_view import \ create_notice_collection_materialised_view, create_notice_kpi_collection -DAG_ID = "daily_materialized_views_update" +DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update" +DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1 DAG_NAME = "Materialized views update" @dag(default_args=DEFAULT_DAG_ARGUMENTS, - dag_id=DAG_ID, + dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME, dag_display_name=DAG_NAME, catchup=False, - 
timetable=CronTriggerTimetable( - cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE, - timezone=DAG_DEFAULT_TIMEZONE), + schedule=NOTICES_COLLECTION_DATASET, + max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS, tags=['mongodb', 'daily-views-update']) def daily_materialized_views_update(): - @task + @task(inlets=[NOTICES_COLLECTION_DATASET]) def create_materialised_view(): mongo_client = MongoClient(config.MONGO_DB_AUTH_URL) create_notice_collection_materialised_view(mongo_client=mongo_client) - @task + @task(inlets=[NOTICES_COLLECTION_DATASET]) def create_kpi_collection_for_notices(): mongo_client = MongoClient(config.MONGO_DB_AUTH_URL) create_notice_kpi_collection(mongo_client=mongo_client) - @task + @task(inlets=[NOTICES_COLLECTION_DATASET], outlets=[MATERIALISED_VIEW_DATASET]) def aggregate_batch_logs(): mongo_client = MongoClient(config.MONGO_DB_AUTH_URL) create_batch_collection_materialised_view(mongo_client=mongo_client) diff --git a/dags/fetch_notices_by_date.py b/dags/fetch_notices_by_date.py index 2a8857456..162c6936d 100644 --- a/dags/fetch_notices_by_date.py +++ b/dags/fetch_notices_by_date.py @@ -18,7 +18,7 @@ from ted_sws.event_manager.services.log import log_error -DAG_ID = "fetch_notices_by_date" +FETCHER_DAG_NAME = "fetch_notices_by_date" DAG_NAME = "Fetch notices by date" BATCH_SIZE = 2000 WILD_CARD_DAG_KEY = "wild_card" TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow" @@ -31,9 +31,9 @@ @dag(default_args=DEFAULT_DAG_ARGUMENTS, - dag_id=DAG_ID, + dag_id=FETCHER_DAG_NAME, dag_display_name=DAG_NAME, catchup=False, - timetable=CronTriggerTimetable( + schedule=CronTriggerTimetable( cron=config.SCHEDULE_DAG_FETCH, timezone=DAG_DEFAULT_TIMEZONE), tags=['selector', 'daily-fetch'], diff --git a/dags/notice_processing_pipeline.py b/dags/notice_processing_pipeline.py index 8d9022c14..114845202 100644 --- a/dags/notice_processing_pipeline.py +++ b/dags/notice_processing_pipeline.py @@ -1,10 +1,15 @@ from typing import List -from
airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.decorators import dag +from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.utils.trigger_rule import TriggerRule -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID, STOP_PROCESSING_TASK_ID, \ + BRANCH_SELECTOR_MAP, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, NOTICE_VALIDATION_PIPELINE_TASK_ID, \ + NOTICE_PACKAGE_PIPELINE_TASK_ID, NOTICE_PUBLISH_PIPELINE_TASK_ID, BRANCH_SELECTOR_TASK_ID, \ + SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, \ + SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, \ + NOTICE_DISTILLATION_PIPELINE_TASK_ID, NOTICES_COLLECTION_DATASET from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \ EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY @@ -12,28 +17,8 @@ from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_transformation_pipeline, \ notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline -NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline" -NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline" -NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline" -NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline" -NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline" -NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline" -STOP_PROCESSING_TASK_ID = "stop_processing" -BRANCH_SELECTOR_TASK_ID = 'branch_selector' -SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation" -SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation" -SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package" 
-SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish" +DAG_NAME = "Notice processing pipeline" DAG_ID = "notice_processing_pipeline" -DAG_NAME = "Notice processing pipeline" - -BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID, - NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, - NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, - NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, - NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID - } - def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str: start_with_step_name = get_dag_param(key=START_WITH_STEP_NAME_KEY, @@ -114,7 +99,8 @@ def _stop_processing(): stop_processing = PythonOperator( task_id=STOP_PROCESSING_TASK_ID, trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, - python_callable=_stop_processing + python_callable=_stop_processing, + outlets=[NOTICES_COLLECTION_DATASET] ) notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline, diff --git a/dags/reprocess_notices_from_backlog_by_id.py b/dags/reprocess_notices_from_backlog_by_id.py index 0c9d66c87..4b05077e3 100644 --- a/dags/reprocess_notices_from_backlog_by_id.py +++ b/dags/reprocess_notices_from_backlog_by_id.py @@ -1,10 +1,9 @@ from airflow.decorators import dag, task from airflow.models import Param -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.dags_utils import push_dag_downstream, get_dag_param from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator -from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import
TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType diff --git a/dags/reprocess_notices_from_backlog_by_status.py b/dags/reprocess_notices_from_backlog_by_status.py index c3cc8aabc..8554303db 100644 --- a/dags/reprocess_notices_from_backlog_by_status.py +++ b/dags/reprocess_notices_from_backlog_by_status.py @@ -1,9 +1,8 @@ from airflow.decorators import dag, task from airflow.models import Param -from dags import DEFAULT_DAG_ARGUMENTS +from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.dags_utils import push_dag_downstream, get_dag_param -from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status from ted_sws.core.model.notice import NoticeStatus diff --git a/tests/unit/dags/conftest.py b/tests/unit/dags/conftest.py index 85f8bae95..5d5857768 100644 --- a/tests/unit/dags/conftest.py +++ b/tests/unit/dags/conftest.py @@ -1,19 +1,40 @@ +import os +import tempfile +from typing import Generator + import pytest +from airflow.models import DagBag, Variable from airflow.timetables.trigger import CronTriggerTimetable +from airflow.utils.db import resetdb, initdb + +from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME +from dags.fetch_notices_by_date import FETCHER_DAG_NAME +from tests import AIRFLOW_DAG_FOLDER -from dags.daily_materialized_views_update import DAG_ID as DAILY_MATERIALISED_VIEWS_DAG_NAME -from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME +@pytest.fixture(scope="session", autouse=True) +def setup_airflow_for_all_tests() -> Generator[None, None, None]: + temp_db_file = tempfile.NamedTemporaryFile(mode="w+", suffix=".db") + os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = f"sqlite:///{temp_db_file.name}" + os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False" 
+ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True" + resetdb() + initdb() + yield + if os.path.exists(temp_db_file.name): + temp_db_file.close() -# @pytest.fixture -# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag: -# Variable.delete(key=dag_materialised_view_update_schedule_variable_name) -# Variable.delete(key=dag_fetch_schedule_variable_name) -# return DagBag( -# dag_folder=AIRFLOW_DAG_FOLDER, -# include_examples=False, -# read_dags_from_db=False, -# collect_dags=True) + +@pytest.fixture +def dag_bag(dag_materialised_view_update_schedule_variable_name: str, + dag_fetch_schedule_variable_name: str) -> DagBag: + Variable.delete(key=dag_materialised_view_update_schedule_variable_name) + Variable.delete(key=dag_fetch_schedule_variable_name) + return DagBag( + dag_folder=AIRFLOW_DAG_FOLDER, + include_examples=False, # Same as: os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False" + read_dags_from_db=False, + collect_dags=True) @pytest.fixture diff --git a/tests/unit/dags/test_dags.py b/tests/unit/dags/test_dags.py new file mode 100644 index 000000000..9ab2e142e --- /dev/null +++ b/tests/unit/dags/test_dags.py @@ -0,0 +1,8 @@ +from airflow.models import DagBag + + +def test_dags_are_loaded_successfully(dag_bag: DagBag): + assert dag_bag.import_errors == {} + for dag in dag_bag.dags.values(): + assert dag is not None + assert len(dag.tasks) > 0 \ No newline at end of file diff --git a/tests/unit/dags/test_daily_materialized_views_update_dag.py b/tests/unit/dags/test_daily_materialized_views_update_dag.py new file mode 100644 index 000000000..67ca0bac5 --- /dev/null +++ b/tests/unit/dags/test_daily_materialized_views_update_dag.py @@ -0,0 +1,42 @@ +from airflow.models import DagBag + + +def test_daily_materialized_views_update_dag_loaded(dag_bag: DagBag, daily_materialised_views_dag_id: str): + assert daily_materialised_views_dag_id in dag_bag.dags + dag = dag_bag.dags[daily_materialised_views_dag_id] + 
assert dag is not None + + +def test_daily_materialized_views_update_dag_structure(dag_bag: DagBag, daily_materialised_views_dag_id: str): + dag = dag_bag.dags[daily_materialised_views_dag_id] + + task_ids = [task.task_id for task in dag.tasks] + expected_tasks = [ + "create_materialised_view", + "create_kpi_collection_for_notices", + "aggregate_batch_logs" + ] + for task_id in expected_tasks: + assert task_id in task_ids + + assert len(dag.tasks) == 3 + + create_view_task = dag.get_task("create_materialised_view") + kpi_collection_task = dag.get_task("create_kpi_collection_for_notices") + aggregate_logs_task = dag.get_task("aggregate_batch_logs") + + assert kpi_collection_task.task_id in [task.task_id for task in create_view_task.downstream_list] + assert aggregate_logs_task.task_id in [task.task_id for task in kpi_collection_task.downstream_list] + + assert create_view_task.task_id in [task.task_id for task in kpi_collection_task.upstream_list] + assert kpi_collection_task.task_id in [task.task_id for task in aggregate_logs_task.upstream_list] + + +def test_daily_materialized_views_update_dag_default_args(dag_bag: DagBag, daily_materialised_views_dag_id: str): + assert daily_materialised_views_dag_id in dag_bag.dags + dag = dag_bag.dags[daily_materialised_views_dag_id] + + assert dag.max_active_runs == 1 + assert not dag.catchup + assert "mongodb" in dag.tags + assert "daily-views-update" in dag.tags diff --git a/tests/unit/event_manager/services/test_logger_from_context.py b/tests/unit/event_manager/services/test_logger_from_context.py index 17d2fbbfc..96785a52a 100644 --- a/tests/unit/event_manager/services/test_logger_from_context.py +++ b/tests/unit/event_manager/services/test_logger_from_context.py @@ -56,7 +56,7 @@ def test_get_dag_run_id_from_dag_context(fake_dag_context): def test_handle_event_message_metadata_dag_context(fake_dag_context): - process_name = "DAG_NAME" + process_name = "FETCHER_DAG_NAME" process_id = "DAG_RUN_ID" metadata = 
handle_event_message_metadata_dag_context(ps_name=process_name, ps_id=process_id, ps_context={}) @@ -70,7 +70,7 @@ def test_handle_event_message_metadata_dag_context(fake_dag_context): def test_handle_event_message_metadata_context(): - process_name = "DAG_NAME" + process_name = "FETCHER_DAG_NAME" process_id = "DAG_RUN_ID" process_type = EventMessageProcessType.CLI diff --git a/tox.ini b/tox.ini index 68ab7ec11..9f07119a9 100644 --- a/tox.ini +++ b/tox.ini @@ -45,7 +45,9 @@ commands = [coverage:run] relative_files = True -source = ted_sws/ +source = + ted_sws/ + dags/ branch = False @@ -57,6 +59,7 @@ log_cli_date_format = %Y-%m-%d %H:%M:%S addopts = --cov=ted_sws + --cov=dags --cov-report=html --cov-report=term --cov-report=xml