Skip to content

Commit defed69

Browse files
Merge pull request #361 from OP-TED/feature/TED-981
clean code and add improvments for backlog DAGs
2 parents 3741e44 + 45b3abb commit defed69

22 files changed

Lines changed: 103 additions & 80 deletions

dags/operators/DagBatchPipelineOperator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
NOTICE_IDS_KEY = "notice_ids"
1818
START_WITH_STEP_NAME_KEY = "start_with_step_name"
1919
EXECUTE_ONLY_ONE_STEP_KEY = "execute_only_one_step"
20-
DEFAULT_NUBER_OF_CELERY_WORKERS = 144
20+
DEFAULT_NUBER_OF_CELERY_WORKERS = 144 #TODO: revise this config
2121
NOTICE_PROCESS_WORKFLOW_DAG_NAME = "notice_process_workflow"
2222
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
2323
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"

dags/pipelines/notice_processor_pipelines.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient) -
1010
"""
1111
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
1212
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
13-
13+
notice.update_status_to(new_status=NoticeStatus.RAW)
1414
indexed_notice = index_notice(notice=notice)
1515
normalised_notice = normalise_notice(notice=indexed_notice)
1616

@@ -27,7 +27,7 @@ def notice_transformation_pipeline(notice: Notice, mongodb_client: MongoClient)
2727
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
2828
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
2929
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
30-
30+
notice.update_status_to(new_status=NoticeStatus.NORMALISED_METADATA)
3131
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
3232
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
3333
if not result:
@@ -57,7 +57,7 @@ def notice_validation_pipeline(notice: Notice, mongodb_client: MongoClient) -> N
5757
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice
5858
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
5959
from ted_sws.event_manager.services.log import log_notice_info
60-
60+
notice.update_status_to(new_status=NoticeStatus.DISTILLED)
6161
mapping_suite_id = notice.distilled_rdf_manifestation.mapping_suite_id
6262
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
6363
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
@@ -82,6 +82,7 @@ def notice_package_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
8282
"""
8383
from ted_sws.notice_packager.services.notice_packager import package_notice
8484

85+
notice.update_status_to(new_status=NoticeStatus.VALIDATED)
8586
# TODO: Implement notice package eligiblity
8687
notice.set_is_eligible_for_packaging(eligibility=True)
8788
packaged_notice = package_notice(notice=notice)
@@ -95,7 +96,7 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
9596
from ted_sws.notice_publisher.services.notice_publisher import publish_notice, publish_notice_rdf_into_s3
9697
from ted_sws.event_manager.services.log import log_notice_error
9798
from ted_sws import config
98-
99+
notice.update_status_to(new_status=NoticeStatus.PACKAGED)
99100
if config.S3_PUBLISH_ENABLED:
100101
published_into_s3 = publish_notice_rdf_into_s3(notice=notice)
101102
if not published_into_s3:
@@ -106,4 +107,5 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
106107
if result:
107108
return NoticePipelineOutput(notice=notice)
108109
else:
110+
notice.set_is_eligible_for_publishing(eligibility=False)
109111
return NoticePipelineOutput(notice=notice, processed=False)

dags/pipelines/notice_selectors_pipelines.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,20 @@
77
PUBLICATION_DATE = "normalised_metadata.publication_date"
88

99

10-
def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
10+
def build_selector_mongodb_filter(notice_statuses: List[str], form_number: str = None,
1111
start_date: str = None, end_date: str = None,
1212
xsd_version: str = None) -> dict:
13+
"""
14+
15+
:param notice_statuses:
16+
:param form_number:
17+
:param start_date:
18+
:param end_date:
19+
:param xsd_version:
20+
:return:
21+
"""
1322
from datetime import datetime
14-
mongodb_filter = {NOTICE_STATUS: notice_status}
23+
mongodb_filter = {NOTICE_STATUS: {"$in": notice_statuses}}
1524
if form_number:
1625
mongodb_filter[FORM_NUMBER] = form_number
1726
if start_date and end_date:
@@ -26,21 +35,26 @@ def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
2635
def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_number: str = None,
2736
start_date: str = None, end_date: str = None,
2837
xsd_version: str = None) -> List[str]:
38+
"""
39+
40+
:param notice_statuses:
41+
:param form_number:
42+
:param start_date:
43+
:param end_date:
44+
:param xsd_version:
45+
:return:
46+
"""
2947
from pymongo import MongoClient
3048
from ted_sws import config
3149
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository, NOTICE_TED_ID
3250

3351
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
3452
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
35-
notice_ids = []
36-
for notice_status in notice_statuses:
37-
mongodb_filter = build_selector_mongodb_filter(notice_status=str(notice_status),
38-
form_number=form_number,
39-
start_date=start_date,
40-
end_date=end_date,
41-
xsd_version=xsd_version
42-
)
43-
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
44-
notice_ids.extend([result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator])
45-
46-
return notice_ids
53+
notice_statuses = [str(notice_status) for notice_status in notice_statuses]
54+
mongodb_filter = build_selector_mongodb_filter(notice_statuses=notice_statuses,
55+
form_number=form_number,
56+
start_date=start_date,
57+
end_date=end_date,
58+
xsd_version=xsd_version)
59+
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
60+
return [result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator]

dags/selector_repackage_process_orchestrator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
DAG_NAME = "selector_re_package_process_orchestrator"
1414

1515
RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
16+
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
1617
NoticeStatus.INELIGIBLE_FOR_PUBLISHING]
1718
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1819
FORM_NUMBER_DAG_PARAM = "form_number"

dags/selector_republish_process_orchestrator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
DAG_NAME = "selector_re_publish_process_orchestrator"
1515

16-
RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_UNAVAILABLE, NoticeStatus.ELIGIBLE_FOR_PUBLISHING]
16+
RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
17+
NoticeStatus.PACKAGED
18+
]
1719
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1820
FORM_NUMBER_DAG_PARAM = "form_number"
1921
START_DATE_DAG_PARAM = "start_date"

dags/selector_retransform_process_orchestrator.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313

1414
DAG_NAME = "selector_re_transform_process_orchestrator"
1515

16-
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION]
16+
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
17+
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
18+
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
19+
]
1720
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1821
FORM_NUMBER_DAG_PARAM = "form_number"
1922
START_DATE_DAG_PARAM = "start_date"

setup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ def open_local(paths, mode="r", encoding="utf8"):
8989
"metadata_generator = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_metadata_generator:main",
9090
"conceptual_mapping_differ = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_conceptual_mapping_differ:main",
9191
"rdf_differ = ted_sws.rdf_differ.entrypoints.cli.cmd_rdf_differ:main",
92-
9392
"mapping_suite_processor = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_mapping_suite_processor:main",
9493
"yarrrml2rml_converter = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_yarrrml2rml_converter:main",
9594
"normalisation_resource_generator = ted_sws.data_manager.entrypoints.cli.cmd_generate_mapping_resources:main",

ted_sws/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def AIRFLOW__CORE__EXECUTOR(self, config_value: str) -> str:
6464
def MONGO_DB_PORT(self, config_value: str) -> int:
6565
return int(config_value)
6666

67-
@env_property()
67+
@env_property(default_value="aggregates_db")
6868
def MONGO_DB_AGGREGATES_DATABASE_NAME(self, config_value: str) -> str:
6969
return config_value
7070

@@ -115,7 +115,7 @@ def ELK_VERSION(self, config_value: str) -> int:
115115

116116

117117
class LoggingConfig:
118-
@env_property()
118+
@env_property(default_value="aggregates_db")
119119
def MONGO_DB_LOGS_DATABASE_NAME(self, config_value: str) -> str:
120120
return config_value
121121

@@ -151,7 +151,7 @@ class API:
151151
def ID_MANAGER_PROD_API_HOST(self, config_value: str) -> str:
152152
return config_value
153153

154-
@env_property(default_value="local_host")
154+
@env_property(default_value="localhost")
155155
def ID_MANAGER_DEV_API_HOST(self, config_value: str) -> str:
156156
return config_value
157157

ted_sws/core/adapters/config_resolver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,11 @@ def env_property(config_resolver_class: Type[ConfigResolverABC] = EnvConfigResol
126126
"""
127127
def wrap(func):
128128
@property
129-
def wrapped_f(self, *args, **kwargs):
129+
def wrapped_function(self, *args, **kwargs):
130130
config_value = config_resolver_class().concrete_config_resolve(config_name=func.__name__,
131131
default_value=default_value)
132132
return func(self, config_value, *args, **kwargs)
133133

134-
return wrapped_f
134+
return wrapped_function
135135

136136
return wrap

ted_sws/core/model/notice.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,28 @@ class NoticeStatus(IntEnum):
3838
"""
3939
RAW = 10
4040
INDEXED = 15
41+
# STATES FOR RE-TRANSFORM ---BEGIN---
4142
NORMALISED_METADATA = 20
4243
INELIGIBLE_FOR_TRANSFORMATION = 23 # backlog status
4344
ELIGIBLE_FOR_TRANSFORMATION = 27 # forward status
4445
PREPROCESSED_FOR_TRANSFORMATION = 29
4546
TRANSFORMED = 30
47+
# STATES FOR RE-VALIDATE---BEGIN---
4648
DISTILLED = 35
49+
# STATES FOR RE-TRANSFORM ---END---
50+
# STATES FOR RE-VALIDATE---END---
51+
# STATES FOR RE-PACKAGE ---BEGIN---
4752
VALIDATED = 40
4853
INELIGIBLE_FOR_PACKAGING = 43 # backlog status
4954
ELIGIBLE_FOR_PACKAGING = 47 # forward status
55+
# STATES FOR RE-PACKAGE ---END---
56+
# STATES FOR RE-PUBLISH ---BEGIN---
5057
PACKAGED = 50
5158
INELIGIBLE_FOR_PUBLISHING = 53 # backlog status
5259
ELIGIBLE_FOR_PUBLISHING = 57 # forward status
60+
# STATES FOR RE-PUBLISH ---END---
5361
PUBLISHED = 60
54-
PUBLICLY_UNAVAILABLE = 63 # to be investigated if more fine-grained checks can be adopted
62+
PUBLICLY_UNAVAILABLE = 63 # to be investigated if more fine-grained checks can be adopted #TODO: Revalidate for public availability.
5563
PUBLICLY_AVAILABLE = 67 # forward status
5664

5765
def __lt__(self, other):

0 commit comments

Comments
 (0)