Skip to content

Commit 6c03617

Browse files
authored
Merge pull request #562 from OP-TED/feature/SWS1-14
Fix issue #557
2 parents 17d8e24 + 065e3b9 commit 6c03617

10 files changed

Lines changed: 298 additions & 35 deletions

dags/daily_materialized_views_update.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
from airflow.decorators import dag, task
2+
from airflow.timetables.trigger import CronTriggerTimetable
23
from pymongo import MongoClient
34

45
from dags import DEFAULT_DAG_ARGUMENTS
5-
from ted_sws import config
6+
from ted_sws import config, DAG_DEFAULT_TIMEZONE
67
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
78
create_batch_collection_materialised_view
89
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
910
create_notice_collection_materialised_view, create_notice_kpi_collection
1011

11-
DAG_NAME = "daily_materialized_views_update"
12+
DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
1213

1314

1415
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
16+
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
1517
catchup=False,
16-
schedule_interval="0 6 * * *",
18+
timetable=CronTriggerTimetable(
19+
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
20+
timezone=DAG_DEFAULT_TIMEZONE),
1721
tags=['mongodb', 'daily-views-update'])
1822
def daily_materialized_views_update():
1923
@task

dags/fetch_notices_by_date.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,20 @@
33
from airflow.decorators import dag, task
44
from airflow.operators.dummy import DummyOperator
55
from airflow.operators.python import BranchPythonOperator, PythonOperator
6-
from airflow.utils.trigger_rule import TriggerRule
76
from airflow.timetables.trigger import CronTriggerTimetable
7+
from airflow.utils.trigger_rule import TriggerRule
88

99
from dags import DEFAULT_DAG_ARGUMENTS
1010
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
1111
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
1212
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline
13+
from ted_sws import config, DAG_DEFAULT_TIMEZONE
1314
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1415
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1516
EventMessageProcessType
1617
from ted_sws.event_manager.services.log import log_error
1718

18-
DAG_NAME = "fetch_notices_by_date"
19+
FETCHER_DAG_NAME = "fetch_notices_by_date"
1920
BATCH_SIZE = 2000
2021
WILD_CARD_DAG_KEY = "wild_card"
2122
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
@@ -27,15 +28,18 @@
2728

2829

2930
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
31+
dag_id=FETCHER_DAG_NAME,
3032
catchup=False,
31-
timetable=CronTriggerTimetable('0 1 * * *', timezone='UTC'),
33+
timetable=CronTriggerTimetable(
34+
cron=config.SCHEDULE_DAG_FETCH,
35+
timezone=DAG_DEFAULT_TIMEZONE),
3236
tags=['selector', 'daily-fetch'])
3337
def fetch_notices_by_date():
3438
@task
3539
@event_log(TechnicalEventMessage(
3640
message="fetch_notice_from_ted",
3741
metadata=EventMessageMetadata(
38-
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
42+
process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME
3943
))
4044
)
4145
def fetch_by_date_notice_from_ted():

dags/fetch_notices_by_date_range.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from dags import DEFAULT_DAG_ARGUMENTS
1010
from dags.dags_utils import get_dag_param
1111
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
12-
DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
12+
FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
1313
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1414
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1515
EventMessageProcessType

sonar-project.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ sonar.projectVersion=0.1.0
66

77
# Comma-separated paths to directories with sources (required)
88
# Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows.
9-
sonar.sources=ted_sws, dags, notebooks, infra
9+
sonar.sources=ted_sws, notebooks, infra
1010

1111
# Language
1212
sonar.language=py

ted_sws/__init__.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
PROJECT_PATH = pathlib.Path(__file__).parent.resolve()
3636
SPARQL_PREFIXES_PATH = PROJECT_PATH / "resources" / "prefixes" / "prefixes.json"
3737

38+
DAG_FETCH_DEFAULT_TIMETABLE = "0 1 * * *"
39+
DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE = "0 6 * * *"
40+
DAG_DEFAULT_TIMEZONE = "UTC"
3841

3942
class MongoDBConfig:
4043

@@ -263,9 +266,19 @@ def S3_PUBLISH_ENABLED(self, config_value: str) -> bool:
263266
return config_value.lower() in ["1", "true"]
264267

265268

269+
class DagSchedulingConfig:
    """Configuration entries holding the cron schedules of the scheduled DAGs.

    Every entry is declared through ``env_property`` with
    ``AirflowAndEnvConfigResolver`` as resolver and a module-level default
    cron expression as ``default_value``; the resolved value is handed back
    unchanged.
    NOTE(review): judging by the resolver's name the value presumably comes
    from an Airflow variable or an environment variable - confirm against
    the resolver implementation.
    """

    @env_property(
        config_resolver_class=AirflowAndEnvConfigResolver,
        default_value=DAG_FETCH_DEFAULT_TIMETABLE,
    )
    def SCHEDULE_DAG_FETCH(self, config_value: str) -> str:
        """Return the cron expression used to schedule the notice fetch DAG.

        :param config_value: value supplied by the ``env_property`` decorator
        :return: ``config_value`` as received, without any transformation
        """
        return config_value

    @env_property(
        config_resolver_class=AirflowAndEnvConfigResolver,
        default_value=DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE,
    )
    def SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE(self, config_value: str) -> str:
        """Return the cron expression used to schedule the materialized views update DAG.

        :param config_value: value supplied by the ``env_property`` decorator
        :return: ``config_value`` as received, without any transformation
        """
        return config_value
278+
266279
class TedConfigResolver(MongoDBConfig, RMLMapperConfig, XMLProcessorConfig, ELKConfig, LoggingConfig,
267280
GitHubArtefacts, API, AllegroConfig, TedAPIConfig, SFTPConfig, FusekiConfig,
268-
SPARQLConfig, LimesAlignmentConfig, S3PublishConfig):
281+
SPARQLConfig, LimesAlignmentConfig, S3PublishConfig, DagSchedulingConfig):
269282
"""
270283
This class resolves the secrets of the ted-sws project.
271284
"""

tests/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
TEST_DATA_PATH = TESTS_PATH / 'test_data'
2121

22+
AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags"
23+
2224

2325
class temporary_copy(object):
2426
"""
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# import os
2+
#
3+
# from airflow import DAG
4+
# from airflow.models import DagBag, Variable
5+
# from airflow.timetables.trigger import CronTriggerTimetable
6+
#
7+
# from ted_sws import DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE
8+
#
9+
#
10+
# def test_daily_materialised_view_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag,
11+
# dag_materialised_view_update_schedule_variable_name: str,
12+
# daily_materialised_views_dag_id: str,
13+
# example_dag_cron_table: CronTriggerTimetable,
14+
# airflow_timetable_import_error_message: str):
15+
# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
16+
#
17+
# assert daily_materialised_view_dag is not None
18+
# assert daily_materialised_view_dag.schedule_interval != example_dag_cron_table._expression
19+
#
20+
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_dag_cron_table._expression)
21+
# dag_bag.collect_dags(only_if_updated=False)
22+
# daily_materialised_view_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
23+
#
24+
# assert daily_materialised_view_dag is not None
25+
# assert daily_materialised_view_dag.schedule_interval == example_dag_cron_table._expression
26+
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
27+
#
28+
#
29+
# def test_daily_materialised_view_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag,
30+
# dag_materialised_view_update_schedule_variable_name: str,
31+
# daily_materialised_views_dag_id: str,
32+
# example_dag_cron_table: CronTriggerTimetable,
33+
# airflow_timetable_import_error_message: str):
34+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
35+
#
36+
# assert fetcher_dag is not None
37+
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression
38+
#
39+
# os.environ[dag_materialised_view_update_schedule_variable_name] = example_dag_cron_table._expression
40+
# dag_bag.collect_dags(only_if_updated=False)
41+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
42+
#
43+
# assert fetcher_dag is not None
44+
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression
45+
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
46+
#
47+
#
48+
# def test_daily_materialised_view_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag,
49+
# dag_materialised_view_update_schedule_variable_name: str,
50+
# daily_materialised_views_dag_id: str,
51+
# airflow_timetable_import_error_message: str):
52+
# env_var_value = os.getenv(dag_materialised_view_update_schedule_variable_name)
53+
# is_env_var_set: bool = True if env_var_value is not None else False
54+
# if is_env_var_set:
55+
# del os.environ[dag_materialised_view_update_schedule_variable_name]
56+
# airflow_var_value = Variable.get(key=dag_materialised_view_update_schedule_variable_name, default_var=None)
57+
# is_airflow_var_set: bool = True if airflow_var_value is not None else False
58+
# if is_airflow_var_set:
59+
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
60+
#
61+
# dag_bag.collect_dags(only_if_updated=False)
62+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
63+
#
64+
# assert fetcher_dag is not None
65+
# assert fetcher_dag.schedule_interval == DAG_MATERIALIZED_VIEW_UPDATE_DEFAULT_TIMETABLE
66+
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
67+
#
68+
# if is_env_var_set:
69+
# os.environ[dag_materialised_view_update_schedule_variable_name] = env_var_value
70+
# if is_airflow_var_set:
71+
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=airflow_var_value)
72+
#
73+
#
74+
# def test_daily_materialised_view_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
75+
# dag_materialised_view_update_schedule_variable_name: str,
76+
# daily_materialised_views_dag_id: str,
77+
# example_wrong_cron_table: str,
78+
# airflow_timetable_import_error_message: str):
79+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=daily_materialised_views_dag_id)
80+
#
81+
# assert fetcher_dag is not None
82+
#
83+
# Variable.set(key=dag_materialised_view_update_schedule_variable_name, value=example_wrong_cron_table)
84+
#
85+
# dag_bag.collect_dags(only_if_updated=False)
86+
#
87+
# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values())
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# import os
2+
#
3+
# from airflow import DAG
4+
# from airflow.models import DagBag, Variable
5+
# from airflow.timetables.trigger import CronTriggerTimetable
6+
#
7+
# from ted_sws import DAG_FETCH_DEFAULT_TIMETABLE
8+
#
9+
#
10+
# def test_fetcher_change_timetable_from_airflow_variable_after_reparse(dag_bag: DagBag,
11+
# dag_fetch_schedule_variable_name: str,
12+
# fetcher_dag_name: str,
13+
# example_dag_cron_table: CronTriggerTimetable,
14+
# airflow_timetable_import_error_message: str):
15+
# dag_bag.collect_dags(only_if_updated=False)
16+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
17+
#
18+
# assert fetcher_dag is not None
19+
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression
20+
#
21+
# Variable.set(key=dag_fetch_schedule_variable_name, value=example_dag_cron_table._expression)
22+
# dag_bag.collect_dags(only_if_updated=False)
23+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
24+
#
25+
# assert fetcher_dag is not None
26+
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression
27+
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
28+
#
29+
#
30+
# def test_fetcher_change_timetable_from_env_variable_after_reparse(dag_bag: DagBag,
31+
# dag_fetch_schedule_variable_name: str,
32+
# fetcher_dag_name: str,
33+
# example_dag_cron_table: CronTriggerTimetable,
34+
# airflow_timetable_import_error_message: str):
35+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
36+
#
37+
# assert fetcher_dag is not None
38+
# assert fetcher_dag.schedule_interval != example_dag_cron_table._expression
39+
#
40+
# os.environ[dag_fetch_schedule_variable_name] = example_dag_cron_table._expression
41+
# dag_bag.collect_dags(only_if_updated=False)
42+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
43+
#
44+
# assert fetcher_dag is not None
45+
# assert fetcher_dag.schedule_interval == example_dag_cron_table._expression
46+
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
47+
#
48+
# del os.environ[dag_fetch_schedule_variable_name]
49+
#
50+
#
51+
# def test_fetcher_has_default_timetable_if_no_variable_is_set_after_reparse(dag_bag: DagBag,
52+
# dag_fetch_schedule_variable_name: str,
53+
# fetcher_dag_name: str,
54+
# airflow_timetable_import_error_message: str):
55+
# env_var_value = os.getenv(dag_fetch_schedule_variable_name)
56+
# is_env_var_set: bool = True if env_var_value is not None else False
57+
# if is_env_var_set:
58+
# del os.environ[dag_fetch_schedule_variable_name]
59+
# airflow_var_value = Variable.get(key=dag_fetch_schedule_variable_name, default_var=None)
60+
# is_airflow_var_set: bool = True if airflow_var_value is not None else False
61+
# if is_airflow_var_set:
62+
# Variable.delete(key=dag_fetch_schedule_variable_name)
63+
#
64+
# dag_bag.collect_dags(only_if_updated=False)
65+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
66+
#
67+
# assert fetcher_dag is not None
68+
# assert fetcher_dag.schedule_interval == DAG_FETCH_DEFAULT_TIMETABLE
69+
# assert all(airflow_timetable_import_error_message not in error for error in dag_bag.import_errors.values())
70+
#
71+
# if is_env_var_set:
72+
# os.environ[dag_fetch_schedule_variable_name] = env_var_value
73+
# if is_airflow_var_set:
74+
# Variable.set(key=dag_fetch_schedule_variable_name, value=airflow_var_value)
75+
#
76+
#
77+
# def test_fetcher_gets_incorrect_timetable_after_reparse(dag_bag: DagBag,
78+
# dag_fetch_schedule_variable_name: str,
79+
# fetcher_dag_name: str,
80+
# example_wrong_cron_table: str,
81+
# airflow_timetable_import_error_message: str):
82+
# fetcher_dag: DAG = dag_bag.get_dag(dag_id=fetcher_dag_name)
83+
#
84+
# assert fetcher_dag is not None
85+
#
86+
# Variable.set(key=dag_fetch_schedule_variable_name, value=example_wrong_cron_table)
87+
# dag_bag.collect_dags(only_if_updated=False)
88+
#
89+
# assert any(airflow_timetable_import_error_message in error for error in dag_bag.import_errors.values())

tests/unit/dags/conftest.py

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,62 @@
1-
# import os
2-
#
3-
# import pytest
4-
#
5-
# from airflow.models import DagBag
6-
# from airflow.utils import db
7-
# import logging
8-
9-
from tests import TESTS_PATH
10-
11-
AIRFLOW_DAG_FOLDER = TESTS_PATH.parent.resolve() / "dags"
12-
13-
14-
# @pytest.fixture(scope="session")
15-
# def dag_bag():
16-
# os.environ["AIRFLOW_HOME"] = str(AIRFLOW_DAG_FOLDER)
17-
# os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
18-
# # Initialising the Airflow DB so that it works properly with the new AIRFLOW_HOME
19-
# logging.disable(logging.CRITICAL)
20-
# db.resetdb()
21-
# db.initdb()
22-
# logging.disable(logging.NOTSET)
23-
# dag_bag = DagBag(dag_folder=AIRFLOW_DAG_FOLDER, include_examples=False,
24-
# read_dags_from_db=False)
25-
# return dag_bag
1+
import pytest
2+
from airflow.timetables.trigger import CronTriggerTimetable
3+
4+
from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
5+
from dags.fetch_notices_by_date import FETCHER_DAG_NAME
6+
7+
8+
# @pytest.fixture
9+
# def dag_bag(dag_materialised_view_update_schedule_variable_name, dag_fetch_schedule_variable_name) -> DagBag:
10+
# Variable.delete(key=dag_materialised_view_update_schedule_variable_name)
11+
# Variable.delete(key=dag_fetch_schedule_variable_name)
12+
# return DagBag(
13+
# dag_folder=AIRFLOW_DAG_FOLDER,
14+
# include_examples=False,
15+
# read_dags_from_db=False,
16+
# collect_dags=True)
17+
18+
19+
@pytest.fixture
def fetcher_dag_name() -> str:
    """Provide the DAG id constant imported from ``dags.fetch_notices_by_date``."""
    dag_id: str = FETCHER_DAG_NAME
    return dag_id
22+
23+
24+
@pytest.fixture
def daily_materialised_views_dag_id() -> str:
    """Provide the DAG id constant imported from ``dags.daily_materialized_views_update``."""
    dag_id: str = DAILY_MATERIALISED_VIEWS_DAG_NAME
    return dag_id
27+
28+
29+
@pytest.fixture
def example_cron_table() -> str:
    """Provide a sample cron expression (14:15 on the first day of every month)."""
    cron_expression: str = "15 14 1 * *"
    return cron_expression
32+
33+
34+
@pytest.fixture
def example_wrong_cron_table() -> str:
    """Provide a string that is deliberately not a valid cron expression."""
    invalid_cron_expression: str = "wrong_cron"
    return invalid_cron_expression
37+
38+
39+
@pytest.fixture
def example_dag_cron_table(example_cron_table) -> CronTriggerTimetable:
    """Build a ``CronTriggerTimetable`` in UTC from the ``example_cron_table`` fixture.

    :param example_cron_table: cron expression supplied by the fixture of the same name
    :return: timetable constructed from that expression with the "UTC" timezone
    """
    timetable = CronTriggerTimetable(
        cron=example_cron_table,
        timezone="UTC",
    )
    return timetable
42+
43+
44+
@pytest.fixture
def airflow_timetable_import_error_message() -> str:
    """Provide the text looked for inside DAG import error messages.

    NOTE(review): presumably the name of the exception raised for an invalid
    cron expression - confirm against the Airflow version in use.
    """
    error_marker: str = "FormatException"
    return error_marker
47+
48+
49+
@pytest.fixture
def dag_fetch_schedule_variable_name() -> str:
    """
    Provide the name of the variable holding the fetch DAG schedule.

    The name follows the MM (presumably the meeting minutes) of the meeting
    with OP from 2024.12.28.
    """
    variable_name: str = "SCHEDULE_DAG_FETCH"
    return variable_name
55+
56+
57+
@pytest.fixture
def dag_materialised_view_update_schedule_variable_name() -> str:
    """
    Provide the name of the variable holding the materialized views update DAG schedule.

    The name follows the MM (presumably the meeting minutes) of the meeting
    with OP from 2024.12.28.
    """
    variable_name: str = "SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE"
    return variable_name

0 commit comments

Comments
 (0)