Skip to content

Commit a7c87d1

Browse files
integrate notice_pubisher in DAG
1 parent a49be43 commit a7c87d1

4 files changed

Lines changed: 37 additions & 20 deletions

File tree

dags/old_worker_single_notice_process_orchestrator.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,30 @@
1+
from airflow.decorators import dag
2+
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator
13
from airflow.utils.trigger_rule import TriggerRule
24
from pymongo import MongoClient
35

6+
from dags import DEFAULT_DAG_ARGUMENTS
47
from dags.dags_utils import pull_dag_upstream, push_dag_downstream
58
from ted_sws import config
69
from ted_sws.core.model.manifestation import METSManifestation
710
from ted_sws.core.model.notice import NoticeStatus
811
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
912
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
1013
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice_by_id
14+
from ted_sws.event_manager.adapters.event_log_decorator import event_log
15+
from ted_sws.event_manager.adapters.event_logger import EventLogger
16+
from ted_sws.event_manager.model.event_message import NoticeEventMessage, EventMessageProcessType, EventMessageMetadata, \
17+
TechnicalEventMessage
18+
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
19+
handle_event_message_metadata_dag_context, get_task_id_from_dag_context
1120
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice_by_id
12-
13-
from airflow.decorators import dag
14-
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator
15-
16-
from dags import DEFAULT_DAG_ARGUMENTS
1721
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker_by_id
1822
from ted_sws.notice_packager.services.notice_packager import create_notice_package
23+
from ted_sws.notice_publisher.services.notice_publisher import publish_notice_by_id
1924
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
2025
from ted_sws.notice_transformer.services.notice_transformer import transform_notice_by_id
2126
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_by_id_with_shacl_suite
2227
from ted_sws.notice_validator.services.sparql_test_suite_runner import validate_notice_by_id_with_sparql_suite
23-
from ted_sws.event_manager.adapters.event_logger import EventLogger
24-
from ted_sws.event_manager.adapters.event_log_decorator import event_log
25-
from ted_sws.event_manager.model.event_message import NoticeEventMessage, EventMessageProcessType, EventMessageMetadata, \
26-
TechnicalEventMessage
27-
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
28-
handle_event_message_metadata_dag_context, get_task_id_from_dag_context
2928
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice_by_id
3029

3130
NOTICE_ID = "notice_id"
@@ -285,10 +284,18 @@ def _generate_mets_package(**context_args):
285284

286285
def _check_package_integrity_by_package_structure():
287286
notice_id = pull_dag_upstream(NOTICE_ID)
287+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
288+
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
289+
notice = notice_repository.get(reference=notice_id)
290+
notice.update_status_to(NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
291+
notice_repository.update(notice=notice)
288292
push_dag_downstream(NOTICE_ID, notice_id)
289293

290294
def _publish_notice_in_cellar():
291295
notice_id = pull_dag_upstream(NOTICE_ID)
296+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
297+
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
298+
publish_notice_by_id(notice_id=notice_id, notice_repository=notice_repository)
292299
push_dag_downstream(NOTICE_ID, notice_id)
293300

294301
def _check_notice_public_availability_in_cellar():

dags/worker_single_notice_process_orchestrator.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
2020
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker
2121
from ted_sws.notice_packager.services.notice_packager import create_notice_package
22+
from ted_sws.notice_publisher.services.notice_publisher import publish_notice_by_id
2223
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
2324
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
2425
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_with_shacl_suite
@@ -200,7 +201,6 @@ def _validate_transformed_rdf_manifestation(**context_args):
200201
validate_xpath_coverage_notice(notice=notice, mapping_suite=mapping_suite, mongodb_client=mongodb_client)
201202
push_dag_downstream(NOTICE_OBJECT, notice)
202203
context = get_current_context()
203-
204204
handle_event_message_metadata_dag_context(event_message, context)
205205
event_message.notice_id = notice_id
206206
event_message.domain_action = get_task_id_from_dag_context(context)
@@ -261,10 +261,18 @@ def _generate_mets_package(**context_args):
261261

262262
def _check_package_integrity_by_package_structure():
263263
notice_id = pull_dag_upstream(NOTICE_ID)
264+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
265+
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
266+
notice = notice_repository.get(reference=notice_id)
267+
notice.update_status_to(NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
268+
notice_repository.update(notice=notice)
264269
push_dag_downstream(NOTICE_ID, notice_id)
265270

266271
def _publish_notice_in_cellar():
267272
notice_id = pull_dag_upstream(NOTICE_ID)
273+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
274+
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
275+
publish_notice_by_id(notice_id=notice_id, notice_repository=notice_repository)
268276
push_dag_downstream(NOTICE_ID, notice_id)
269277

270278
def _check_notice_public_availability_in_cellar():

ted_sws/notice_publisher/services/notice_publisher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def publish_notice(notice: Notice, publisher: SFTPPublisherABC = SFTPPublisher()
3636

3737

3838
def publish_notice_by_id(notice_id: str, notice_repository: NoticeRepositoryABC,
39-
publisher: SFTPPublisherABC, remote_folder_path=config.SFTP_PATH) -> bool:
39+
publisher: SFTPPublisherABC = SFTPPublisher(), remote_folder_path=config.SFTP_PATH) -> bool:
4040
"""
4141
This function publishes the METS manifestation of a Notice, based on notice_id, in Cellar.
4242
"""

tests/e2e/dags/test_worker_dag_steps.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
GENERATE_METS_PACKAGE_TASK_ID, CHECK_PACKAGE_INTEGRITY_BY_PACKAGE_STRUCTURE_TASK_ID, \
99
PUBLISH_NOTICE_IN_CELLAR_TASK_ID, CHECK_NOTICE_PUBLIC_AVAILABILITY_IN_CELLAR_TASK_ID, \
1010
CHECK_NOTICE_STATE_BEFORE_TRANSFORM_TASK_ID, CHECK_NOTICE_STATE_BEFORE_GENERATE_METS_PACKAGE_TASK_ID, \
11-
INDEX_NOTICE_XML_CONTENT_TASK_ID
11+
INDEX_NOTICE_XML_CONTENT_TASK_ID, CHECK_NOTICE_STATE_BEFORE_PUBLISH_NOTICE_IN_CELLAR_TASK_ID
1212

1313
DAG_ID = "old_worker_single_notice_process_orchestrator"
1414

@@ -95,13 +95,15 @@ def test_worker_dag_steps(dag_bag, notice_repository):
9595

9696
assert notice.mets_manifestation is not None
9797

98+
execute_dag_step(dag, task_id=CHECK_PACKAGE_INTEGRITY_BY_PACKAGE_STRUCTURE_TASK_ID, dag_config=dag_config)
99+
check_notice_status(notice_repository=notice_repository, notice_id=notice_id,
100+
notice_status=NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
101+
102+
execute_dag_step(dag, task_id=CHECK_NOTICE_STATE_BEFORE_PUBLISH_NOTICE_IN_CELLAR_TASK_ID, dag_config=dag_config)
103+
execute_dag_step(dag, task_id=PUBLISH_NOTICE_IN_CELLAR_TASK_ID, dag_config=dag_config)
104+
check_notice_status(notice_repository=notice_repository, notice_id=notice_id, notice_status=NoticeStatus.PUBLISHED)
105+
98106
# TODO: add this steps when publish notice in cellar will work
99-
# execute_dag_step(dag, task_id=CHECK_PACKAGE_INTEGRITY_BY_PACKAGE_STRUCTURE_TASK_ID, xcom_push_data=XCOM_DEFAULT)
100-
# check_notice_status(notice_repository=notice_repository, notice_status=NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
101-
#
102-
# execute_dag_step(dag, task_id=PUBLISH_NOTICE_IN_CELLAR_TASK_ID, xcom_push_data=XCOM_DEFAULT)
103-
# check_notice_status(notice_repository=notice_repository, notice_status=NoticeStatus.PUBLISHED)
104-
#
105107
# execute_dag_step(dag, task_id=CHECK_NOTICE_PUBLIC_AVAILABILITY_IN_CELLAR_TASK_ID, xcom_push_data=XCOM_DEFAULT)
106108
# check_notice_status(notice_repository=notice_repository, notice_status=NoticeStatus.PUBLICLY_AVAILABLE)
107109

0 commit comments

Comments
 (0)