-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathload_mapping_suite_in_database.py
More file actions
124 lines (111 loc) · 5.6 KB
/
load_mapping_suite_in_database.py
File metadata and controls
124 lines (111 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from airflow.decorators import dag, task
from airflow.models import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from pymongo import MongoClient
from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE
from dags.dags_utils import push_dag_downstream, pull_dag_upstream, get_dag_param
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from ted_sws import config
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.adapters.event_logger import EventLogger
from ted_sws.event_manager.model.event_message import MappingSuiteEventMessage
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
handle_event_message_metadata_dag_context
from ted_sws.mapping_suite_processor.services.conceptual_mapping_processor import \
mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db
# --- DAG identity -----------------------------------------------------------
DAG_ID = "load_mapping_suite_in_database"
DAG_NAME = "Load mapping suite"

# --- Task identifiers -------------------------------------------------------
# Same text as the name of the fetch task function defined in the DAG below.
FETCH_MAPPING_SUITE_PACKAGE_FROM_GITHUB_INTO_MONGODB = (
    "fetch_mapping_suite_package_from_github_into_mongodb"
)
CHECK_IF_LOAD_TEST_DATA_TASK_ID = "check_if_load_test_data"
TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline"
FINISH_LOADING_MAPPING_SUITE_TASK_ID = "finish_loading_mapping_suite"

# --- Keys of the DAG run parameters (see ``params`` on the DAG) -------------
GITHUB_REPOSITORY_URL_DAG_PARAM_KEY = "github_repository_url"
BRANCH_OR_TAG_NAME_DAG_PARAM_KEY = "branch_or_tag_name"
MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY = "mapping_suite_package_name"
LOAD_TEST_DATA_DAG_PARAM_KEY = "load_test_data"
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
     # ``schedule`` replaces the deprecated ``schedule_interval`` argument;
     # None means the DAG only runs when triggered explicitly.
     schedule=None,
     dag_id=DAG_ID,
     dag_display_name=DAG_NAME,
     tags=['fetch', 'mapping-suite', 'github'],
     params={
         GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(
             default=None,
             type=["null", "string"],
             title="Github repository url",
             description="This is optional field.\n"
                         "Github repository url to fetch mapping suite package from."
         ),
         BRANCH_OR_TAG_NAME_DAG_PARAM_KEY: Param(
             default=None,
             type=["null", "string"],
             title="Branch or tag name",
             description="This is optional field.\n"
                         "Branch or tag name to fetch mapping suite package from."
         ),
         MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY: Param(
             default=None,
             type=["null", "string"],
             title="Mapping suite package name",
             description="This is optional field.\n"
                         "Mapping suite package name to fetch from github repository."
         ),
         LOAD_TEST_DATA_DAG_PARAM_KEY: Param(
             default=False,
             type="boolean",
             title="Load test data",
             description="""This field is used to load test data."""
         )
     }
     )
def load_mapping_suite_in_database():
    """
    Build the DAG that loads a mapping suite package into MongoDB.

    Task flow:
        fetch_mapping_suite_package_from_github_into_mongodb
            -> check_if_load_test_data (branch)
                -> trigger_document_proc_pipeline -> finish_loading_mapping_suite
                -> finish_loading_mapping_suite

    The branch follows the trigger path only when the ``load_test_data``
    DAG parameter is truthy.
    """

    @task
    @event_log(is_loggable=False)
    def fetch_mapping_suite_package_from_github_into_mongodb(**context_args):
        """
        Load the requested mapping suite package and record an event for it.

        Reads the DAG parameters (package name, branch/tag, repository URL,
        load-test-data flag), passes them to
        ``mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db``
        and, when ``load_test_data`` is set, pushes the de-duplicated notice ids
        it returned downstream under ``NOTICE_IDS_KEY``.

        :param context_args: task context keyword arguments, used to obtain the event logger
        :return: None
        """
        event_logger: EventLogger = get_logger_from_dag_context(context_args)
        event_message = MappingSuiteEventMessage()
        event_message.start_record()
        context = get_current_context()

        load_test_data = get_dag_param(key=LOAD_TEST_DATA_DAG_PARAM_KEY, default_value=False)
        mapping_suite_package_name = get_dag_param(key=MAPPING_SUITE_PACKAGE_NAME_DAG_PARAM_KEY)
        branch_or_tag_name = get_dag_param(key=BRANCH_OR_TAG_NAME_DAG_PARAM_KEY)
        github_repository_url = get_dag_param(key=GITHUB_REPOSITORY_URL_DAG_PARAM_KEY)

        # Use the client as a context manager so its connections are closed
        # once loading finishes, including when the loader raises.
        with MongoClient(config.MONGO_DB_AUTH_URL) as mongodb_client:
            notice_ids = mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(
                mongodb_client=mongodb_client,
                mapping_suite_package_name=mapping_suite_package_name,
                load_test_data=load_test_data,
                branch_or_tag_name=branch_or_tag_name,
                github_repository_url=github_repository_url
            )

        # Drop duplicate ids before handing them to the next task.
        notice_ids = list(set(notice_ids))
        if load_test_data:
            push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

        handle_event_message_metadata_dag_context(event_message, context)
        if mapping_suite_package_name:
            event_message.mapping_suite_id = mapping_suite_package_name
        event_message.end_record()
        event_logger.info(event_message)

    def _branch_selector():
        """
        Choose the task(s) to run after the fetch step.

        :return: the trigger task id when ``load_test_data`` is truthy (after
                 re-publishing the upstream notice ids for it), otherwise the
                 finish task id
        """
        load_test_data = get_dag_param(key=LOAD_TEST_DATA_DAG_PARAM_KEY, default_value=False)
        if load_test_data:
            # Forward the notice ids pulled from upstream so the trigger task
            # can read them under the same key.
            push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY))
            return [TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID]
        return [FINISH_LOADING_MAPPING_SUITE_TASK_ID]

    branch_task = BranchPythonOperator(
        task_id=CHECK_IF_LOAD_TEST_DATA_TASK_ID,
        python_callable=_branch_selector,
    )

    # The finish step has two upstream paths and one of them is always skipped
    # by the branch, hence the NONE_FAILED_MIN_ONE_SUCCESS trigger rule.
    finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
                                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

    trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID,
        batch_size=BATCH_SIZE)

    fetch_mapping_suite_package_from_github_into_mongodb() >> branch_task
    trigger_document_proc_pipeline >> finish_step
    branch_task >> [trigger_document_proc_pipeline, finish_step]
# Call the @dag-decorated factory at import time and keep the result in a
# module-level name.
dag = load_mapping_suite_in_database()