Skip to content

Commit a6c0bce

Browse files
authored
Merge pull request #213 from OP-TED/feature/TED-563
Feature/ted 563
2 parents da2ebe4 + a7c87d1 commit a6c0bce

11 files changed

Lines changed: 140 additions & 205 deletions

dags/old_worker_single_notice_process_orchestrator.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,30 @@
1+
from airflow.decorators import dag
2+
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator
13
from airflow.utils.trigger_rule import TriggerRule
24
from pymongo import MongoClient
35

6+
from dags import DEFAULT_DAG_ARGUMENTS
47
from dags.dags_utils import pull_dag_upstream, push_dag_downstream
58
from ted_sws import config
69
from ted_sws.core.model.manifestation import METSManifestation
710
from ted_sws.core.model.notice import NoticeStatus
811
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
912
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
1013
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice_by_id
14+
from ted_sws.event_manager.adapters.event_log_decorator import event_log
15+
from ted_sws.event_manager.adapters.event_logger import EventLogger
16+
from ted_sws.event_manager.model.event_message import NoticeEventMessage, EventMessageProcessType, EventMessageMetadata, \
17+
TechnicalEventMessage
18+
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
19+
handle_event_message_metadata_dag_context, get_task_id_from_dag_context
1120
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice_by_id
12-
13-
from airflow.decorators import dag
14-
from airflow.operators.python import get_current_context, BranchPythonOperator, PythonOperator
15-
16-
from dags import DEFAULT_DAG_ARGUMENTS
1721
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker_by_id
1822
from ted_sws.notice_packager.services.notice_packager import create_notice_package
23+
from ted_sws.notice_publisher.services.notice_publisher import publish_notice_by_id
1924
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
2025
from ted_sws.notice_transformer.services.notice_transformer import transform_notice_by_id
2126
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_by_id_with_shacl_suite
2227
from ted_sws.notice_validator.services.sparql_test_suite_runner import validate_notice_by_id_with_sparql_suite
23-
from ted_sws.event_manager.adapters.event_logger import EventLogger
24-
from ted_sws.event_manager.adapters.event_log_decorator import event_log
25-
from ted_sws.event_manager.model.event_message import NoticeEventMessage, EventMessageProcessType, EventMessageMetadata, \
26-
TechnicalEventMessage
27-
from ted_sws.event_manager.services.logger_from_context import get_logger_from_dag_context, \
28-
handle_event_message_metadata_dag_context, get_task_id_from_dag_context
2928
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice_by_id
3029

3130
NOTICE_ID = "notice_id"
@@ -285,10 +284,18 @@ def _generate_mets_package(**context_args):
285284

286285
def _check_package_integrity_by_package_structure():
    """
    Flag the notice received from the upstream task as eligible for publishing.

    The notice id is pulled from the upstream task, the matching notice is loaded from the
    MongoDB-backed repository, its status is moved to ELIGIBLE_FOR_PUBLISHING and it is stored
    back. The same notice id is then pushed to the downstream task.

    NOTE(review): despite the name, no inspection of the package structure happens here.
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    repository = NoticeRepository(mongodb_client=MongoClient(config.MONGO_DB_AUTH_URL))
    stored_notice = repository.get(reference=notice_id)
    stored_notice.update_status_to(NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
    repository.update(notice=stored_notice)
    push_dag_downstream(NOTICE_ID, notice_id)
289293

290294
def _publish_notice_in_cellar():
    """
    Publish the notice received from the upstream task and hand its id downstream.

    The notice id is pulled from the upstream task and passed, together with a MongoDB-backed
    notice repository, to publish_notice_by_id. The id is pushed downstream afterwards.

    NOTE(review): the boolean returned by publish_notice_by_id is not inspected here.
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    repository = NoticeRepository(mongodb_client=MongoClient(config.MONGO_DB_AUTH_URL))
    publish_notice_by_id(notice_id=notice_id, notice_repository=repository)
    push_dag_downstream(NOTICE_ID, notice_id)
293300

294301
def _check_notice_public_availability_in_cellar():

dags/worker_single_notice_process_orchestrator.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
2020
from ted_sws.notice_metadata_processor.services.notice_eligibility import notice_eligibility_checker
2121
from ted_sws.notice_packager.services.notice_packager import create_notice_package
22+
from ted_sws.notice_publisher.services.notice_publisher import publish_notice_by_id
2223
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
2324
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
2425
from ted_sws.notice_validator.services.shacl_test_suite_runner import validate_notice_with_shacl_suite
@@ -200,7 +201,6 @@ def _validate_transformed_rdf_manifestation(**context_args):
200201
validate_xpath_coverage_notice(notice=notice, mapping_suite=mapping_suite, mongodb_client=mongodb_client)
201202
push_dag_downstream(NOTICE_OBJECT, notice)
202203
context = get_current_context()
203-
204204
handle_event_message_metadata_dag_context(event_message, context)
205205
event_message.notice_id = notice_id
206206
event_message.domain_action = get_task_id_from_dag_context(context)
@@ -261,10 +261,18 @@ def _generate_mets_package(**context_args):
261261

262262
def _check_package_integrity_by_package_structure():
    """
    Promote the upstream notice to ELIGIBLE_FOR_PUBLISHING and persist the change.

    Steps: read the notice id from the upstream task, fetch the notice through a
    MongoDB-backed NoticeRepository, update its status, write it back, and forward the
    notice id to the downstream task.

    NOTE(review): the package structure itself is not examined in this function.
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    client = MongoClient(config.MONGO_DB_AUTH_URL)
    repository = NoticeRepository(mongodb_client=client)
    current_notice = repository.get(reference=notice_id)
    current_notice.update_status_to(NoticeStatus.ELIGIBLE_FOR_PUBLISHING)
    repository.update(notice=current_notice)
    push_dag_downstream(NOTICE_ID, notice_id)
265270

266271
def _publish_notice_in_cellar():
    """
    Run the publishing service for the upstream notice, then forward its id.

    A MongoDB-backed NoticeRepository is created and given to publish_notice_by_id along
    with the notice id pulled from the upstream task; the id is then pushed downstream.

    NOTE(review): the outcome returned by publish_notice_by_id is discarded.
    """
    notice_id = pull_dag_upstream(NOTICE_ID)
    client = MongoClient(config.MONGO_DB_AUTH_URL)
    repository = NoticeRepository(mongodb_client=client)
    publish_notice_by_id(notice_id=notice_id, notice_repository=repository)
    push_dag_downstream(NOTICE_ID, notice_id)
269277

270278
def _check_notice_public_availability_in_cellar():

infra/sftp/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ services:
1111
- SFTP_PORT=${SFTP_PORT}
1212
command: ${SFTP_USER}:${SFTP_PASSWORD}:::upload
1313
ports:
14-
- ${SFTP_PORT:-2235}:${SFTP_PORT}
14+
- ${SFTP_PORT:-2235}:22
1515
networks:
1616
- sftp-net
1717
- proxy-net

ted_sws/notice_publisher/adapters/notice_publisher.py

Lines changed: 0 additions & 29 deletions
This file was deleted.

ted_sws/notice_publisher/adapters/notice_publisher_abc.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

ted_sws/notice_publisher/adapters/sftp_notice_publisher.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
11
import pysftp
22

3-
from ted_sws.notice_publisher.adapters.notice_publisher_abc import NoticePublisherABC
43
from ted_sws import config
4+
from ted_sws.notice_publisher.adapters.sftp_publisher_abc import SFTPPublisherABC
55

66

7-
class SFTPNoticePublisher(NoticePublisherABC):
7+
class SFTPPublisher(SFTPPublisherABC):
8+
"""
9+
10+
"""
11+
812
connection: pysftp.Connection = None
913
default_host = config.SFTP_HOST
1014
default_user = config.SFTP_USER
1115
default_pass = config.SFTP_PASSWORD
1216
default_port = config.SFTP_PORT
1317

14-
def __init__(self, hostname=default_host, username=default_user, password=default_pass, port=default_port,
15-
remote_path=None):
18+
def __init__(self, hostname=default_host, username=default_user, password=default_pass, port=default_port):
1619
"""Constructor Method"""
1720
# Set connection object to None (initial value)
1821
self.hostname = hostname
1922
self.username = username
2023
self.password = password
21-
self.port = port or self.default_port
22-
self.remote_path = remote_path
24+
self.port = port
25+
self.is_connected = False
2326

2427
def connect(self):
2528
"""Connects to the sftp server and returns the sftp connection object"""
@@ -39,30 +42,31 @@ def connect(self):
3942
except Exception as err:
4043
raise Exception(err)
4144

45+
self.is_connected = True
46+
4247
def disconnect(self):
4348
"""Closes the sftp connection"""
44-
self.connection.close()
49+
if self.is_connected:
50+
self.connection.close()
51+
self.is_connected = False
4552

46-
def publish(self, source_path, remote_path=None) -> bool:
47-
return self.upload(source_path, remote_path)
53+
def __del__(self):
54+
self.disconnect()
4855

49-
def upload(self, source_path, remote_path=None) -> bool:
56+
def publish(self, source_path, remote_path) -> bool:
5057
"""
51-
Uploads the notice's METS manifestation to the sftp server remote path.
58+
Publish file_content to the sftp server remote path.
5259
"""
53-
if remote_path is None:
54-
remote_path = self.remote_path
55-
56-
if remote_path is None:
57-
raise ValueError("No remote path specified.")
58-
5960
try:
6061
self.connection.put(source_path, remote_path)
6162
return True
6263
except Exception as err:
6364
raise Exception(err)
6465

6566
def remove(self, remote_path) -> bool:
67+
"""
68+
69+
"""
6670
try:
6771
self.connection.unlink(remote_path)
6872
return True
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import abc
2+
3+
4+
class SFTPPublisherABC(abc.ABC):
    """
    Interface of a publisher that uploads files to a remote location over SFTP.

    Concrete publishers must provide connect() and publish(); disconnect() is an optional
    hook whose default implementation does nothing.
    """

    @abc.abstractmethod
    def connect(self):
        """
        Open the connection that subsequent publish() calls will use.
        """

    @abc.abstractmethod
    def publish(self, source_path, remote_path):
        """
        Upload the local file at source_path to remote_path on the server.
        """

    def disconnect(self):
        """
        Close the connection; the base implementation is a no-op.
        """
"""
Lines changed: 42 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,47 @@
11
import base64
2+
import pathlib
23
import tempfile
3-
from pathlib import Path
4-
from typing import Union
54

65
from ted_sws import config
7-
from ted_sws.core.model.notice import Notice, NoticeStatus, UnsupportedStatusTransition
6+
from ted_sws.core.model.notice import Notice, NoticeStatus
87
from ted_sws.data_manager.adapters.notice_repository import NoticeRepositoryABC
9-
from ted_sws.notice_packager.adapters.archiver import ARCHIVE_DEFAULT_FORMAT
10-
from ted_sws.notice_publisher.adapters.notice_publisher import NoticePublisherFactory
11-
from ted_sws.notice_publisher.adapters.notice_publisher_abc import NoticePublisherABC
12-
13-
14-
class NoticePublishBuilder:
15-
def __init__(self, notice: Union[Notice, str], notice_repository: NoticeRepositoryABC,
16-
notice_publisher: NoticePublisherABC, remote_path: str = None):
17-
self.notice_repository = notice_repository
18-
if isinstance(notice, str):
19-
notice_id = notice
20-
notice: Notice = self.notice_repository.get(reference=notice_id)
21-
if notice is None:
22-
raise Exception(f"Notice {notice_id} could not be found.")
23-
24-
self.notice = notice
25-
self.check_publish_eligibility()
26-
27-
self.notice_publisher = notice_publisher
28-
self.remote_path = remote_path
29-
30-
def check_publish_eligibility(self):
31-
if self.notice.status != NoticeStatus.ELIGIBLE_FOR_PUBLISHING:
32-
raise UnsupportedStatusTransition(
33-
f"Notice {self.notice.ted_id} is not Eligible for Publishing. Status: {self.notice.status}")
34-
35-
def package_content(self) -> bytes:
36-
mets_manifestation = self.notice.mets_manifestation
37-
if not mets_manifestation or not mets_manifestation.object_data:
38-
raise ValueError("Notice does not have a METS manifestation to be published.")
39-
40-
package_content = base64.b64decode(bytes(mets_manifestation.object_data, encoding='utf-8'), validate=True)
41-
return package_content
42-
43-
def build_remote_path(self) -> str:
44-
remote_path = self.remote_path
45-
if remote_path is None:
46-
remote_path = f"{config.SFTP_PATH}/{self.notice.ted_id}"
47-
if ARCHIVE_DEFAULT_FORMAT == "zip":
48-
remote_path += '.zip'
49-
return remote_path
50-
51-
def publish(self):
52-
source_file = tempfile.NamedTemporaryFile()
53-
source_file.write(self.package_content())
54-
55-
try:
56-
if self.notice_publisher.publish(source_path=Path(source_file.name), remote_path=self.build_remote_path()):
57-
self.notice.update_status_to(NoticeStatus.PUBLISHED)
58-
self.notice_repository.update(self.notice)
59-
except Exception as e:
60-
raise Exception(f"Notice {self.notice.ted_id} could not be published: " + str(e))
61-
62-
return self.notice.status == NoticeStatus.PUBLISHED
63-
64-
65-
def publish_notice(notice: Union[Notice, str], notice_publisher: NoticePublisherABC,
66-
notice_repository: NoticeRepositoryABC, remote_path=None) -> bool:
67-
publish_builder = NoticePublishBuilder(notice, notice_repository, notice_publisher, remote_path)
68-
return publish_builder.publish()
69-
70-
71-
def publish_single_notice(notice: Union[Notice, str], notice_repository: NoticeRepositoryABC, hostname, username,
72-
password, port=None, remote_path=None) -> bool:
73-
notice_publisher: NoticePublisherABC = NoticePublisherFactory.get_publisher(hostname=hostname, username=username,
74-
password=password, port=port,
75-
remote_path=remote_path)
76-
notice_publisher.connect()
77-
notice_published: bool = publish_notice(notice, notice_publisher, notice_repository, remote_path)
78-
notice_publisher.disconnect()
79-
80-
return notice_published
8+
from ted_sws.notice_publisher.adapters.sftp_notice_publisher import SFTPPublisher
9+
from ted_sws.notice_publisher.adapters.sftp_publisher_abc import SFTPPublisherABC
10+
11+
12+
def publish_notice(notice: Notice, publisher: SFTPPublisherABC = None,
                   remote_folder_path=config.SFTP_PATH) -> bool:
    """
    This function publishes the METS manifestation for a Notice in Cellar.

    The base64-encoded METS package of the notice is decoded, written to a temporary file and
    uploaded through the publisher to "<remote_folder_path>/<notice.ted_id>.zip". When the
    publisher reports success the notice status is moved to PUBLISHED (the notice is not
    persisted here; that is left to the caller).

    :param notice: the notice whose METS manifestation is published
    :param publisher: publisher used for the upload; a new SFTPPublisher is created per call
                      when omitted, so no connection state is shared between calls
    :param remote_folder_path: remote folder in which the package is stored
    :return: True if the notice ended up in PUBLISHED status, False otherwise
    :raises ValueError: if the notice has no METS manifestation content
    :raises Exception: if connecting, uploading, disconnecting or the status update fails
    """
    if publisher is None:
        publisher = SFTPPublisher()

    mets_manifestation = notice.mets_manifestation
    if not mets_manifestation or not mets_manifestation.object_data:
        raise ValueError("Notice does not have a METS manifestation to be published.")

    package_content = base64.b64decode(bytes(mets_manifestation.object_data, encoding='utf-8'), validate=True)
    remote_notice_path = f"{remote_folder_path}/{notice.ted_id}.zip"

    # The context manager guarantees the temporary file is closed (and removed) on every path.
    with tempfile.NamedTemporaryFile() as source_file:
        source_file.write(package_content)
        # The publisher reads the file by path, so buffered bytes must reach the disk first;
        # without the flush the uploaded package can be truncated or empty.
        source_file.flush()
        try:
            publisher.connect()
            try:
                if publisher.publish(source_path=pathlib.Path(source_file.name),
                                     remote_path=remote_notice_path):
                    notice.update_status_to(NoticeStatus.PUBLISHED)
            finally:
                # Release the connection even when the upload or the status update failed.
                publisher.disconnect()
        except Exception as e:
            raise Exception(f"Notice {notice.ted_id} could not be published: " + str(e)) from e

    return notice.status == NoticeStatus.PUBLISHED
36+
37+
38+
def publish_notice_by_id(notice_id: str, notice_repository: NoticeRepositoryABC,
                         publisher: SFTPPublisherABC = None, remote_folder_path=config.SFTP_PATH) -> bool:
    """
    This function publishes the METS manifestation of a Notice, based on notice_id, in Cellar.

    The notice is loaded from the repository, published with publish_notice and, when
    publishing succeeded, stored back in the repository.

    :param notice_id: identifier of the notice to publish
    :param notice_repository: repository the notice is read from and written back to
    :param publisher: publisher used for the upload; a new SFTPPublisher is created per call
                      when omitted, so no connection state is shared between calls
    :param remote_folder_path: remote folder in which the package is stored
    :return: the result of publish_notice
    :raises ValueError: if no notice is returned by the repository for notice_id
    """
    if publisher is None:
        publisher = SFTPPublisher()

    notice = notice_repository.get(reference=notice_id)
    if notice is None:
        # Fail with an explicit message instead of an AttributeError further down.
        raise ValueError(f"Notice {notice_id} could not be found.")

    result = publish_notice(notice=notice, publisher=publisher, remote_folder_path=remote_folder_path)
    if result:
        notice_repository.update(notice=notice)
    return result

0 commit comments

Comments
 (0)