Skip to content

Commit 726ac53

Browse files
authored
Merge pull request #583 from OP-TED/feature/SWS2-18
Feature/sws2 18
2 parents 366b5d4 + efeb8aa commit 726ac53

13 files changed

Lines changed: 152 additions & 65 deletions

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,5 @@ package.json
120120
.DS_Store
121121
.scannerwork/*
122122
/infra/alpine/libraries/
123+
/infra/airflow
124+
/libraries

Makefile

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ ENV_FILE := .env
1212

1313
PROJECT_PATH = $(shell pwd)
1414
AIRFLOW_INFRA_FOLDER ?= ${PROJECT_PATH}/.airflow
15-
RML_MAPPER_PATH = ${PROJECT_PATH}/.rmlmapper/rmlmapper.jar
16-
XML_PROCESSOR_PATH = ${PROJECT_PATH}/.saxon/saxon-he-10.9.jar
17-
LIMES_ALIGNMENT_PATH = $(PROJECT_PATH)/.limes/limes.jar
15+
LIBRARIES_PATH = ${PROJECT_PATH}/libraries
16+
RML_MAPPER_PATH = ${LIBRARIES_PATH}/.rmlmapper/rmlmapper.jar
17+
XML_PROCESSOR_PATH = ${LIBRARIES_PATH}/.saxon/saxon-he-10.9.jar
18+
LIMES_ALIGNMENT_PATH = $(LIBRARIES_PATH)/.limes/limes.jar
1819
HOSTNAME = $(shell hostname)
1920
CAROOT = $(shell pwd)/infra/traefik/certs
2021

@@ -215,18 +216,19 @@ stop-metabase:
215216

216217
init-rml-mapper:
217218
@ echo -e "RMLMapper folder initialisation!"
218-
@ mkdir -p ./.rmlmapper
219-
@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar -O ./.rmlmapper/rmlmapper.jar
219+
@ mkdir -p ./libraries/.rmlmapper
220+
@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.2.2/rmlmapper-6.2.2-r371-all.jar -O ./libraries/.rmlmapper/rmlmapper.jar
220221

221222
init-limes:
222223
@ echo -e "Limes folder initialisation!"
223-
@ mkdir -p ./.limes
224-
@ wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.jar -P ./.limes
224+
@ mkdir -p ./libraries/.limes
225+
@ wget -c https://github.com/dice-group/LIMES/releases/download/1.7.9/limes.jar -P ./libraries/.limes/
225226

226227
init-saxon:
227228
@ echo -e "$(BUILD_PRINT)Saxon folder initialization $(END_BUILD_PRINT)"
228-
@ wget -c https://github.com/Saxonica/Saxon-HE/releases/download/SaxonHE10-9/SaxonHE10-9J.zip -P .saxon/
229-
@ cd .saxon && unzip SaxonHE10-9J.zip && rm -rf SaxonHE10-9J.zip
229+
@ mkdir -p ./libraries/.saxon
230+
@ wget -c https://github.com/Saxonica/Saxon-HE/releases/download/SaxonHE10-9/SaxonHE10-9J.zip -P ./libraries/.saxon/
231+
@ cd ./libraries/.saxon/ && unzip SaxonHE10-9J.zip && rm -rf SaxonHE10-9J.zip
230232

231233
start-project-services: | start-airflow start-mongo init-rml-mapper init-limes start-allegro-graph start-metabase
232234
stop-project-services: | stop-airflow stop-mongo stop-allegro-graph stop-metabase

dags/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from datetime import datetime, timedelta
22

3+
from airflow import Dataset
4+
35
DEFAULT_DAG_ARGUMENTS = {
46
"owner": "airflow",
57
"depends_on_past": False,
@@ -13,3 +15,27 @@
1315
"concurrency": 15,
1416
"execution_timeout": timedelta(days=10),
1517
}
18+
19+
NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
20+
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
21+
NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
22+
NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline"
23+
NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline"
24+
NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline"
25+
STOP_PROCESSING_TASK_ID = "stop_processing"
26+
BRANCH_SELECTOR_TASK_ID = 'branch_selector'
27+
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation"
28+
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
29+
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
30+
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
31+
32+
BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
33+
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
34+
NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
35+
NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
36+
NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
37+
}
38+
39+
# The URIs below are formal dataset identifiers, not actual connection strings.
40+
NOTICES_COLLECTION_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection")
41+
MATERIALISED_VIEW_DATASET: Dataset = Dataset("db://aggregates_db/notices_collection_materialised_view")

dags/daily_materialized_views_update.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,37 @@
11
from airflow.decorators import dag, task
2-
from airflow.timetables.trigger import CronTriggerTimetable
32
from pymongo import MongoClient
43

5-
from dags import DEFAULT_DAG_ARGUMENTS
6-
from ted_sws import config, DAG_DEFAULT_TIMEZONE
4+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICES_COLLECTION_DATASET, MATERIALISED_VIEW_DATASET
5+
from ted_sws import config
76
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
87
create_batch_collection_materialised_view
98
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
109
create_notice_collection_materialised_view, create_notice_kpi_collection
1110

12-
DAG_ID = "daily_materialized_views_update"
11+
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
12+
DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS: int = 1
1313
DAG_NAME = "Materialized views update"
1414

1515

1616
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
17-
dag_id=DAG_ID,
17+
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
1818
dag_display_name=DAG_NAME,
1919
catchup=False,
20-
timetable=CronTriggerTimetable(
21-
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
22-
timezone=DAG_DEFAULT_TIMEZONE),
20+
schedule=NOTICES_COLLECTION_DATASET,
21+
max_active_runs=DAILY_MATERIALISED_VIEWS_MAX_ACTIVE_RUNS,
2322
tags=['mongodb', 'daily-views-update'])
2423
def daily_materialized_views_update():
25-
@task
24+
@task(inlets=[NOTICES_COLLECTION_DATASET])
2625
def create_materialised_view():
2726
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
2827
create_notice_collection_materialised_view(mongo_client=mongo_client)
2928

30-
@task
29+
@task(inlets=[NOTICES_COLLECTION_DATASET])
3130
def create_kpi_collection_for_notices():
3231
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
3332
create_notice_kpi_collection(mongo_client=mongo_client)
3433

35-
@task
34+
@task(inlets=[NOTICES_COLLECTION_DATASET], outlets=[MATERIALISED_VIEW_DATASET])
3635
def aggregate_batch_logs():
3736
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
3837
create_batch_collection_materialised_view(mongo_client=mongo_client)

dags/fetch_notices_by_date.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ted_sws.event_manager.services.log import log_error
1919

2020
DAG_ID = "fetch_notices_by_date"
21-
DAG_NAME = "Fetch notices by date"
21+
FETCHER_DAG_NAME = "Fetch notices by date"
2222
BATCH_SIZE = 2000
2323
WILD_CARD_DAG_KEY = "wild_card"
2424
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -31,9 +31,9 @@
3131

3232
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3333
dag_id=DAG_ID,
34-
dag_display_name=DAG_NAME,
34+
dag_display_name=FETCHER_DAG_NAME,
3535
catchup=False,
36-
timetable=CronTriggerTimetable(
36+
schedule=CronTriggerTimetable(
3737
cron=config.SCHEDULE_DAG_FETCH,
3838
timezone=DAG_DEFAULT_TIMEZONE),
3939
tags=['selector', 'daily-fetch'],

dags/notice_processing_pipeline.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,24 @@
11
from typing import List
22

3-
from airflow.operators.python import BranchPythonOperator, PythonOperator
43
from airflow.decorators import dag
4+
from airflow.operators.python import BranchPythonOperator, PythonOperator
55
from airflow.utils.trigger_rule import TriggerRule
66

7-
from dags import DEFAULT_DAG_ARGUMENTS
7+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID, STOP_PROCESSING_TASK_ID, \
8+
BRANCH_SELECTOR_MAP, NOTICE_TRANSFORMATION_PIPELINE_TASK_ID, NOTICE_VALIDATION_PIPELINE_TASK_ID, \
9+
NOTICE_PACKAGE_PIPELINE_TASK_ID, NOTICE_PUBLISH_PIPELINE_TASK_ID, BRANCH_SELECTOR_TASK_ID, \
10+
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID, SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID, \
11+
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID, SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID, \
12+
NOTICE_DISTILLATION_PIPELINE_TASK_ID, NOTICES_COLLECTION_DATASET
813
from dags.dags_utils import get_dag_param, smart_xcom_push, smart_xcom_forward, smart_xcom_pull
914
from dags.operators.DagBatchPipelineOperator import NoticeBatchPipelineOperator, NOTICE_IDS_KEY, \
1015
EXECUTE_ONLY_ONE_STEP_KEY, START_WITH_STEP_NAME_KEY
1116
from dags.pipelines.notice_batch_processor_pipelines import notices_batch_distillation_pipeline
1217
from dags.pipelines.notice_processor_pipelines import notice_normalisation_pipeline, notice_transformation_pipeline, \
1318
notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline
1419

15-
NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
16-
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
17-
NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
18-
NOTICE_VALIDATION_PIPELINE_TASK_ID = "notice_validation_pipeline"
19-
NOTICE_PACKAGE_PIPELINE_TASK_ID = "notice_package_pipeline"
20-
NOTICE_PUBLISH_PIPELINE_TASK_ID = "notice_publish_pipeline"
21-
STOP_PROCESSING_TASK_ID = "stop_processing"
22-
BRANCH_SELECTOR_TASK_ID = 'branch_selector'
23-
SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID = "switch_to_transformation"
24-
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
25-
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
26-
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
20+
DAG_NAME = "notice_processing_pipeline"
2721
DAG_ID = "notice_processing_pipeline"
28-
DAG_NAME = "Notice processing pipeline"
29-
30-
BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
31-
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
32-
NOTICE_VALIDATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
33-
NOTICE_PACKAGE_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
34-
NOTICE_PUBLISH_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID
35-
}
36-
3722

3823
def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_IDS_KEY]) -> str:
3924
start_with_step_name = get_dag_param(key=START_WITH_STEP_NAME_KEY,
@@ -114,7 +99,8 @@ def _stop_processing():
11499
stop_processing = PythonOperator(
115100
task_id=STOP_PROCESSING_TASK_ID,
116101
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
117-
python_callable=_stop_processing
102+
python_callable=_stop_processing,
103+
outlets=NOTICES_COLLECTION_DATASET
118104
)
119105

120106
notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline,

dags/reprocess_notices_from_backlog_by_id.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
from airflow.decorators import dag, task
22
from airflow.models import Param
33

4-
from dags import DEFAULT_DAG_ARGUMENTS
4+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID
55
from dags.dags_utils import push_dag_downstream, get_dag_param
66
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
7-
from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID
87
from ted_sws.event_manager.adapters.event_log_decorator import event_log
98
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType
109

dags/reprocess_notices_from_backlog_by_status.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
from airflow.decorators import dag, task
22
from airflow.models import Param
33

4-
from dags import DEFAULT_DAG_ARGUMENTS
4+
from dags import DEFAULT_DAG_ARGUMENTS, NOTICE_NORMALISATION_PIPELINE_TASK_ID
55
from dags.dags_utils import push_dag_downstream, get_dag_param
6-
from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID
76
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
87
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
98
from ted_sws.core.model.notice import NoticeStatus

tests/unit/dags/conftest.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,40 @@
1+
import os
2+
import tempfile
3+
from typing import Generator
4+
15
import pytest
6+
from airflow.models import DagBag, Variable
27
from airflow.timetables.trigger import CronTriggerTimetable
8+
from airflow.utils.db import resetdb, initdb
9+
10+
from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
11+
from dags.fetch_notices_by_date import FETCHER_DAG_NAME
12+
from tests import AIRFLOW_DAG_FOLDER
313

4-
from dags.daily_materialized_views_update import DAG_ID as DAILY_MATERIALISED_VIEWS_DAG_NAME
5-
from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME
614

15+
@pytest.fixture(scope="session", autouse=True)
16+
def setup_airflow_for_all_tests() -> Generator[None, None, None]:
17+
temp_db_file = tempfile.NamedTemporaryFile(mode="w+", suffix=".db")
18+
os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = f"sqlite:///{temp_db_file.name}"
19+
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
20+
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
21+
resetdb()
22+
initdb()
23+
yield
24+
if os.path.exists(temp_db_file.name):
25+
temp_db_file.close()
726

8-
# @pytest.fixture
9-
# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag:
10-
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
11-
# Variable.delete(key=dag_fetch_schedule_variable_name)
12-
# return DagBag(
13-
# dag_folder=AIRFLOW_DAG_FOLDER,
14-
# include_examples=False,
15-
# read_dags_from_db=False,
16-
# collect_dags=True)
27+
28+
@pytest.fixture
29+
def dag_bag(dag_materialised_view_update_schedule_variable_name: str,
30+
dag_fetch_schedule_variable_name: str) -> DagBag:
31+
Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
32+
Variable.delete(key=dag_fetch_schedule_variable_name)
33+
return DagBag(
34+
dag_folder=AIRFLOW_DAG_FOLDER,
35+
include_examples=False, # Same as: os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
36+
read_dags_from_db=False,
37+
collect_dags=True)
1738

1839

1940
@pytest.fixture

tests/unit/dags/test_dags.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from airflow.models import DagBag
2+
3+
4+
def test_dags_are_loaded_successfully(dag_bag: DagBag):
5+
assert dag_bag.import_errors == {}
6+
for dag in dag_bag.dags.values():
7+
assert dag is not None
8+
assert len(dag.tasks) > 0

0 commit comments

Comments
 (0)