Skip to content

Commit b3a060c

Browse files
Update worker_single_notice_process_orchestrator.py
1 parent c94b21e commit b3a060c

1 file changed

Lines changed: 58 additions & 81 deletions

File tree

dags/worker_single_notice_process_orchestrator.py

Lines changed: 58 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,43 @@
1+
from airflow.decorators import dag
2+
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator
13
from airflow.utils.trigger_rule import TriggerRule
24
from pymongo import MongoClient
35

6+
from dags import DEFAULT_DAG_ARGUMENTS
47
from dags.dags_utils import pull_dag_upstream, push_dag_downstream
58
from ted_sws import config
69
from ted_sws.core.model.manifestation import METSManifestation
7-
from ted_sws.core.model.notice import NoticeStatus
10+
from ted_sws.core.model.notice import NoticeStatus, Notice
811
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
912
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
10-
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice_by_id
11-
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice_by_id
12-
13-
from airflow.decorators import dag
14-
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator
15-
16-
from dags import DEFAULT_DAG_ARGUMENTS
17-
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker_by_id
18-
from ted_sws.notice_packager.services.notice_packager import create_notice_package
19-
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
20-
from ted_sws.notice_transformer.services.notice_transformer import transform_notice_by_id
21-
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_by_id_with_shacl_suite
22-
from ted_sws.notice_validator.services.sparql_test_suite_runner import validate_notice_by_id_with_sparql_suite
23-
from ted_sws.event_manager.adapters.event_logger import EventLogger
13+
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
2414
from ted_sws.event_manager.adapters.event_log_decorator import event_log
15+
from ted_sws.event_manager.adapters.event_logger import EventLogger
2516
from ted_sws.event_manager.model.event_message import NoticeEventMessage, EventMessageProcessType, EventMessageMetadata, \
2617
TechnicalEventMessage
2718
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
2819
handle_event_message_metadata_dag_context, get_task_id_from_dag_context
20+
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
21+
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker
22+
from ted_sws.notice_packager.services.notice_packager import create_notice_package
23+
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
24+
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
25+
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_with_shacl_suite
26+
from ted_sws.notice_validator.services.sparql_test_suite_runner import validate_notice_with_sparql_suite
2927

3028
NOTICE_ID = "notice_id"
3129
MAPPING_SUITE_ID = "mapping_suite_id"
32-
DAG_NAME = "old_worker_single_notice_process_orchestrator"
30+
NOTICE_OBJECT = "notice_object"
31+
MAPPING_SUITE_OBJECT = "mapping_suite_object"
32+
DAG_NAME = "worker_single_notice_process_orchestrator"
3333

3434

3535
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
3636
schedule_interval=None,
3737
max_active_runs=128,
3838
concurrency=128,
3939
tags=['worker', 'pipeline'])
40-
def old_worker_single_notice_process_orchestrator():
40+
def worker_single_notice_process_orchestrator():
4141
"""
4242
4343
:return:
@@ -57,8 +57,10 @@ def _index_notice_xml_content():
5757
"""
5858
notice_id = pull_dag_upstream(NOTICE_ID)
5959
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
60-
index_notice_by_id(notice_id=notice_id, mongodb_client=mongodb_client)
61-
push_dag_downstream(NOTICE_ID, notice_id)
60+
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
61+
notice = notice_repository.get(reference=notice_id)
62+
notice = index_notice(notice=notice)
63+
push_dag_downstream(NOTICE_OBJECT, notice)
6264

6365
@event_log(is_loggable=False)
6466
def _normalise_notice_metadata(**context_args):
@@ -70,19 +72,16 @@ def _normalise_notice_metadata(**context_args):
7072
event_message: NoticeEventMessage = NoticeEventMessage()
7173
event_message.start_record()
7274

73-
notice_id = pull_dag_upstream(NOTICE_ID)
74-
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
75-
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
76-
normalised_notice = normalise_notice_by_id(notice_id=notice_id, notice_repository=notice_repository)
77-
notice_repository.update(notice=normalised_notice)
78-
push_dag_downstream(NOTICE_ID, notice_id)
75+
notice: Notice = pull_dag_upstream(NOTICE_OBJECT)
76+
normalised_notice = normalise_notice(notice=notice)
77+
push_dag_downstream(NOTICE_OBJECT, normalised_notice)
7978

8079
context = get_current_context()
8180

8281
handle_event_message_metadata_dag_context(event_message, context)
83-
event_message.notice_id = notice_id
82+
event_message.notice_id = notice.ted_id
8483
event_message.domain_action = get_task_id_from_dag_context(context)
85-
event_message.message = f"Normalising notice({notice_id}) metadata"
84+
event_message.message = f"Normalising notice({notice.ted_id}) metadata"
8685
event_message.end_record()
8786
event_logger.info(event_message)
8887

@@ -92,20 +91,18 @@ def _check_eligibility_for_transformation(**context_args):
9291
event_message: NoticeEventMessage = NoticeEventMessage()
9392
event_message.start_record()
9493

95-
notice_id = pull_dag_upstream(NOTICE_ID)
94+
notice = pull_dag_upstream(NOTICE_OBJECT)
9695
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
9796
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
9897
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
99-
result = notice_eligibility_checker_by_id(notice_id=notice_id,
100-
notice_repository=notice_repository,
101-
mapping_suite_repository=mapping_suite_repository)
98+
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
99+
notice_repository.update(notice=notice)
102100
mapping_suite_id = None
101+
notice_id = notice.ted_id
103102
if result:
104103
notice_id, mapping_suite_id = result
105-
106104
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
107105
push_dag_downstream(NOTICE_ID, notice_id)
108-
109106
context = get_current_context()
110107

111108
handle_event_message_metadata_dag_context(event_message, context)
@@ -123,16 +120,12 @@ def _preprocess_xml_manifestation(**context_args):
123120
event_message: NoticeEventMessage = NoticeEventMessage()
124121
event_message.start_record()
125122

126-
notice_id = pull_dag_upstream(NOTICE_ID)
123+
notice = pull_dag_upstream(NOTICE_OBJECT)
127124
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
128-
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
129-
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
130-
notice = notice_repository.get(reference=notice_id)
131125
notice.update_status_to(new_status=NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION)
132-
notice_repository.update(notice=notice)
133-
push_dag_downstream(NOTICE_ID, notice_id)
126+
push_dag_downstream(NOTICE_OBJECT, notice)
134127
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
135-
128+
notice_id = notice.ted_id
136129
context = get_current_context()
137130

138131
handle_event_message_metadata_dag_context(event_message, context)
@@ -148,19 +141,16 @@ def _transform_notice(**context_args):
148141
event_logger: EventLogger = get_logger_from_dag_context(context_args)
149142
event_message: NoticeEventMessage = NoticeEventMessage()
150143
event_message.start_record()
151-
152-
notice_id = pull_dag_upstream(NOTICE_ID)
144+
notice = pull_dag_upstream(NOTICE_OBJECT)
153145
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
154146
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
155-
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
156147
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
148+
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
157149
rml_mapper = RMLMapper(rml_mapper_path=config.RML_MAPPER_PATH)
158-
transform_notice_by_id(notice_id=notice_id, mapping_suite_id=mapping_suite_id,
159-
notice_repository=notice_repository, mapping_suite_repository=mapping_suite_repository,
160-
rml_mapper=rml_mapper
161-
)
162-
push_dag_downstream(NOTICE_ID, notice_id)
163-
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
150+
result_notice = transform_notice(notice=notice, mapping_suite=mapping_suite, rml_mapper=rml_mapper)
151+
push_dag_downstream(NOTICE_OBJECT, result_notice)
152+
push_dag_downstream(MAPPING_SUITE_OBJECT, mapping_suite)
153+
notice_id = result_notice.ted_id
164154

165155
context = get_current_context()
166156

@@ -178,23 +168,19 @@ def _resolve_entities_in_the_rdf_manifestation(**context_args):
178168
event_message: NoticeEventMessage = NoticeEventMessage()
179169
event_message.start_record()
180170

181-
notice_id = pull_dag_upstream(NOTICE_ID)
182-
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
183-
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
184-
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
185-
notice = notice_repository.get(reference=notice_id)
171+
notice = pull_dag_upstream(NOTICE_OBJECT)
172+
mapping_suite = pull_dag_upstream(MAPPING_SUITE_OBJECT)
186173
notice.set_distilled_rdf_manifestation(distilled_rdf_manifestation=notice.rdf_manifestation.copy())
187-
notice_repository.update(notice=notice)
188-
push_dag_downstream(NOTICE_ID, notice_id)
189-
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
190-
174+
push_dag_downstream(NOTICE_OBJECT, notice)
175+
push_dag_downstream(MAPPING_SUITE_OBJECT, mapping_suite)
176+
notice_id = notice.ted_id
191177
context = get_current_context()
192178

193179
handle_event_message_metadata_dag_context(event_message, context)
194180
event_message.notice_id = notice_id
195181
event_message.domain_action = get_task_id_from_dag_context(context)
196182
event_message.message = f"Resolving notice({notice_id}) entities in the RDF manifestation"
197-
event_message.metadata.process_context["mapping_suite_id"] = mapping_suite_id
183+
event_message.metadata.process_context["mapping_suite_id"] = mapping_suite.identifier
198184
event_message.end_record()
199185
event_logger.info(event_message)
200186

@@ -204,28 +190,19 @@ def _validate_transformed_rdf_manifestation(**context_args):
204190
event_message: NoticeEventMessage = NoticeEventMessage()
205191
event_message.start_record()
206192

207-
notice_id = pull_dag_upstream(NOTICE_ID)
208-
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
209-
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
210-
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
211-
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
212-
validate_notice_by_id_with_sparql_suite(notice_id=notice_id, mapping_suite_identifier=mapping_suite_id,
213-
notice_repository=notice_repository,
214-
mapping_suite_repository=mapping_suite_repository)
215-
validate_notice_by_id_with_shacl_suite(notice_id=notice_id, mapping_suite_identifier=mapping_suite_id,
216-
notice_repository=notice_repository,
217-
mapping_suite_repository=mapping_suite_repository
218-
)
219-
push_dag_downstream(NOTICE_ID, notice_id)
220-
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
221-
193+
notice = pull_dag_upstream(NOTICE_OBJECT)
194+
mapping_suite = pull_dag_upstream(MAPPING_SUITE_OBJECT)
195+
validate_notice_with_sparql_suite(notice=notice, mapping_suite_package=mapping_suite)
196+
validate_notice_with_shacl_suite(notice=notice, mapping_suite_package=mapping_suite)
197+
notice_id = notice.ted_id
198+
push_dag_downstream(NOTICE_OBJECT, notice)
222199
context = get_current_context()
223200

224201
handle_event_message_metadata_dag_context(event_message, context)
225202
event_message.notice_id = notice_id
226203
event_message.domain_action = get_task_id_from_dag_context(context)
227204
event_message.message = f"Validating notice({notice_id}) transformed RDF manifestation"
228-
event_message.metadata.process_context["mapping_suite_id"] = mapping_suite_id
205+
event_message.metadata.process_context["mapping_suite_id"] = mapping_suite.identifier
229206
event_message.end_record()
230207
event_logger.info(event_message)
231208

@@ -235,12 +212,12 @@ def _check_eligibility_for_packing_by_validation_report(**context_args):
235212
event_message: NoticeEventMessage = NoticeEventMessage()
236213
event_message.start_record()
237214

238-
notice_id = pull_dag_upstream(NOTICE_ID)
215+
notice = pull_dag_upstream(NOTICE_OBJECT)
239216
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
240217
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
241-
notice = notice_repository.get(reference=notice_id)
242218
notice.set_is_eligible_for_packaging(eligibility=True)
243219
notice_repository.update(notice=notice)
220+
notice_id = notice.ted_id
244221
push_dag_downstream(NOTICE_ID, notice_id)
245222

246223
context = get_current_context()
@@ -302,7 +279,7 @@ def _check_notice_state_before_transform():
302279
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
303280
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
304281
notice = notice_repository.get(reference=notice_id)
305-
push_dag_downstream(NOTICE_ID, notice_id)
282+
push_dag_downstream(NOTICE_OBJECT, notice)
306283
if notice.status == NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION:
307284
mapping_suite_id = pull_dag_upstream(MAPPING_SUITE_ID)
308285
push_dag_downstream(MAPPING_SUITE_ID, mapping_suite_id)
@@ -360,7 +337,7 @@ def _check_notice_state_before_notice_successfully_processed():
360337
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
361338
)
362339

363-
transform_notice = PythonOperator(
340+
notice_transform = PythonOperator(
364341
task_id="transform_notice",
365342
python_callable=_transform_notice,
366343
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
@@ -442,7 +419,7 @@ def _check_notice_state_before_notice_successfully_processed():
442419

443420
index_notice_xml_content >> normalise_notice_metadata >> check_eligibility_for_transformation >> check_notice_state_before_transform >> [
444421
preprocess_xml_manifestation, fail_on_state]
445-
preprocess_xml_manifestation >> transform_notice >> resolve_entities_in_the_rdf_manifestation >> validate_transformed_rdf_manifestation >> check_eligibility_for_packing_by_validation_report
422+
preprocess_xml_manifestation >> notice_transform >> resolve_entities_in_the_rdf_manifestation >> validate_transformed_rdf_manifestation >> check_eligibility_for_packing_by_validation_report
446423
check_eligibility_for_packing_by_validation_report >> check_notice_state_before_generate_mets_package >> [
447424
generate_mets_package, fail_on_state]
448425
generate_mets_package >> check_package_integrity_by_package_structure >> check_notice_state_before_publish_notice_in_cellar >> [
@@ -473,4 +450,4 @@ def _get_task_run():
473450
check_notice_state_before_generate_mets_package, check_notice_state_before_publish_notice_in_cellar]
474451

475452

476-
dag = old_worker_single_notice_process_orchestrator()
453+
dag = worker_single_notice_process_orchestrator()

0 commit comments

Comments
 (0)