Skip to content

Commit 1b10821

Browse files
author
Kolea Plesco
committed
Merge remote-tracking branch 'origin/main' into feature/TED-677
2 parents 11df641 + 2ac149e commit 1b10821

61 files changed

Lines changed: 1343 additions & 433 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -213,7 +213,7 @@ stop-metabase:
213213
init-rml-mapper:
214214
@ echo -e "RMLMapper folder initialisation!"
215215
@ mkdir -p ./.rmlmapper
216-
@ wget -c https://api.bitbucket.org/2.0/repositories/Dragos0000/rml-mapper/src/master/rmlmapper.jar -P ./.rmlmapper
216+
@ wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.0.0/rmlmapper-6.0.0-r363-all.jar -O ./.rmlmapper/rmlmapper.jar
217217

218218
init-limes:
219219
@ echo -e "Limes folder initialisation!"

dags/daily_materialized_view_update.py

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
77
create_batch_collection_materialised_view
88
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
9-
create_notice_collection_materialised_view, update_notice_collection_materialised_view
9+
create_notice_collection_materialised_view, create_notice_kpi_collection
1010

1111
DAG_NAME = "daily_materialized_view_update"
1212

@@ -22,12 +22,16 @@ def create_materialised_view():
2222
create_notice_collection_materialised_view(mongo_client=mongo_client)
2323

2424
@task
25-
def update_materialised_view_with_logs():
25+
def create_kpi_collection_for_notices():
26+
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
27+
create_notice_kpi_collection(mongo_client=mongo_client)
28+
29+
@task
30+
def aggregate_batch_logs():
2631
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
27-
update_notice_collection_materialised_view(mongo_client=mongo_client)
2832
create_batch_collection_materialised_view(mongo_client=mongo_client)
2933

30-
create_materialised_view() >> update_materialised_view_with_logs()
34+
create_materialised_view() >> create_kpi_collection_for_notices() >> aggregate_batch_logs()
3135

3236

3337
dag = daily_materialized_view_update()

dags/operators/DagBatchPipelineOperator.py

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,15 +8,16 @@
88
smart_xcom_push
99
from dags.pipelines.pipeline_protocols import NoticePipelineCallable
1010
from ted_sws import config
11+
from ted_sws.core.model.notice import NoticeStatus
1112
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
1213
from ted_sws.event_manager.model.event_message import EventMessage, NoticeEventMessage
13-
from ted_sws.event_manager.services.log import log_error
14+
from ted_sws.event_manager.services.log import log_notice_error
1415
from ted_sws.event_manager.services.logger_from_context import get_logger, handle_event_message_metadata_dag_context
1516

1617
NOTICE_IDS_KEY = "notice_ids"
1718
START_WITH_STEP_NAME_KEY = "start_with_step_name"
1819
EXECUTE_ONLY_ONE_STEP_KEY = "execute_only_one_step"
19-
DEFAULT_NUBER_OF_CELERY_WORKERS = 144
20+
DEFAULT_NUBER_OF_CELERY_WORKERS = 144 #TODO: revise this config
2021
NOTICE_PROCESS_WORKFLOW_DAG_NAME = "notice_process_workflow"
2122
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
2223
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
@@ -88,9 +89,14 @@ def execute(self, context: Any):
8889
if result_notice_pipeline.processed:
8990
processed_notice_ids.append(notice_id)
9091
notice_event.end_record()
92+
if notice.normalised_metadata:
93+
notice_event.notice_form_number = notice.normalised_metadata.form_number
94+
notice_event.notice_eforms_subtype = notice.normalised_metadata.eforms_subtype
95+
notice_event.notice_status = str(notice.status)
9196
logger.info(event_message=notice_event)
9297
except Exception as e:
93-
log_error(message=str(e))
98+
log_notice_error(message=str(e), notice_id=notice_id, domain_action=pipeline_name)
99+
94100
batch_event_message.end_record()
95101
logger.info(event_message=batch_event_message)
96102
if not processed_notice_ids:

dags/pipelines/notice_batch_processor_pipelines.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,10 @@
11
from typing import List
22
from pymongo import MongoClient
33

4+
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_procedure_entities
5+
46
CET_URIS = ["http://www.w3.org/ns/org#Organization"]
7+
PROCEDURE_CET_URI = "http://data.europa.eu/a4g/ontology#Procedure"
58

69

710
def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
@@ -23,6 +26,7 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: M
2326
notices.append(notice)
2427
for cet_uri in CET_URIS:
2528
deduplicate_entities_by_cet_uri(notices=notices, cet_uri=cet_uri)
29+
deduplicate_procedure_entities(notices=notices, procedure_cet_uri=PROCEDURE_CET_URI, mongodb_client=mongodb_client)
2630
for notice in notices:
2731
notice_repository.update(notice=notice)
2832
return notice_ids

dags/pipelines/notice_processor_pipelines.py

Lines changed: 15 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
from pymongo import MongoClient
22
from dags.pipelines.pipeline_protocols import NoticePipelineOutput
33
from ted_sws.core.model.notice import Notice, NoticeStatus
4+
from ted_sws.event_manager.services.log import log_notice_error
45

56

67
def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient) -> NoticePipelineOutput:
@@ -9,7 +10,7 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient) -
910
"""
1011
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
1112
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
12-
13+
notice.update_status_to(new_status=NoticeStatus.RAW)
1314
indexed_notice = index_notice(notice=notice)
1415
normalised_notice = normalise_notice(notice=indexed_notice)
1516

@@ -26,10 +27,16 @@ def notice_transformation_pipeline(notice: Notice, mongodb_client: MongoClient)
2627
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
2728
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
2829
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
29-
30+
notice.update_status_to(new_status=NoticeStatus.NORMALISED_METADATA)
3031
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
3132
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
3233
if not result:
34+
log_notice_error(
35+
message=f"This notice {notice.ted_id} is not eligible for transformation. Notice info: "
36+
f"form_number=[{notice.normalised_metadata.form_number}],"
37+
f" eform_subtype=[{notice.normalised_metadata.eforms_subtype}], "
38+
f"xsd_version=[{notice.normalised_metadata.xsd_version}]. Check mapping suites!",
39+
notice_id=notice.ted_id, domain_action=notice_transformation_pipeline.__name__)
3340
return NoticePipelineOutput(notice=notice, processed=False)
3441
notice_id, mapping_suite_id = result
3542
# TODO: Implement XML preprocessing
@@ -50,18 +57,18 @@ def notice_validation_pipeline(notice: Notice, mongodb_client: MongoClient) -> N
5057
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice
5158
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
5259
from ted_sws.event_manager.services.log import log_notice_info
53-
60+
notice.update_status_to(new_status=NoticeStatus.DISTILLED)
5461
mapping_suite_id = notice.distilled_rdf_manifestation.mapping_suite_id
5562
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
5663
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
5764
log_notice_info(message="Validation :: XPATH coverage :: START", notice_id=notice.ted_id)
5865
validate_xpath_coverage_notice(notice=notice, mapping_suite=mapping_suite, mongodb_client=mongodb_client)
5966
log_notice_info(message="Validation :: XPATH coverage :: END", notice_id=notice.ted_id)
6067
log_notice_info(message="Validation :: SPARQL :: START", notice_id=notice.ted_id)
61-
validate_notice_with_sparql_suite(notice=notice, mapping_suite_package=mapping_suite)
68+
validate_notice_with_sparql_suite(notice=notice, mapping_suite_package=mapping_suite, execute_full_validation=False)
6269
log_notice_info(message="Validation :: SPARQL :: END", notice_id=notice.ted_id)
6370
log_notice_info(message="Validation :: SHACL :: START", notice_id=notice.ted_id)
64-
validate_notice_with_shacl_suite(notice=notice, mapping_suite_package=mapping_suite)
71+
validate_notice_with_shacl_suite(notice=notice, mapping_suite_package=mapping_suite, execute_full_validation=False)
6572
log_notice_info(message="Validation :: SHACL :: END", notice_id=notice.ted_id)
6673
log_notice_info(message="Validation :: Summary :: START", notice_id=notice.ted_id)
6774
validation_summary_report_notice(notice=notice)
@@ -75,6 +82,7 @@ def notice_package_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
7582
"""
7683
from ted_sws.notice_packager.services.notice_packager import package_notice
7784

85+
notice.update_status_to(new_status=NoticeStatus.VALIDATED)
7886
# TODO: Implement notice package eligiblity
7987
notice.set_is_eligible_for_packaging(eligibility=True)
8088
packaged_notice = package_notice(notice=notice)
@@ -88,7 +96,7 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
8896
from ted_sws.notice_publisher.services.notice_publisher import publish_notice, publish_notice_rdf_into_s3
8997
from ted_sws.event_manager.services.log import log_notice_error
9098
from ted_sws import config
91-
99+
notice.update_status_to(new_status=NoticeStatus.PACKAGED)
92100
if config.S3_PUBLISH_ENABLED:
93101
published_into_s3 = publish_notice_rdf_into_s3(notice=notice)
94102
if not published_into_s3:
@@ -99,4 +107,5 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
99107
if result:
100108
return NoticePipelineOutput(notice=notice)
101109
else:
110+
notice.set_is_eligible_for_publishing(eligibility=False)
102111
return NoticePipelineOutput(notice=notice, processed=False)

dags/pipelines/notice_selectors_pipelines.py

Lines changed: 28 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,20 @@
77
PUBLICATION_DATE = "normalised_metadata.publication_date"
88

99

10-
def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
10+
def build_selector_mongodb_filter(notice_statuses: List[str], form_number: str = None,
1111
start_date: str = None, end_date: str = None,
1212
xsd_version: str = None) -> dict:
13+
"""
14+
15+
:param notice_statuses:
16+
:param form_number:
17+
:param start_date:
18+
:param end_date:
19+
:param xsd_version:
20+
:return:
21+
"""
1322
from datetime import datetime
14-
mongodb_filter = {NOTICE_STATUS: notice_status}
23+
mongodb_filter = {NOTICE_STATUS: {"$in": notice_statuses}}
1524
if form_number:
1625
mongodb_filter[FORM_NUMBER] = form_number
1726
if start_date and end_date:
@@ -26,21 +35,26 @@ def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
2635
def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_number: str = None,
2736
start_date: str = None, end_date: str = None,
2837
xsd_version: str = None) -> List[str]:
38+
"""
39+
40+
:param notice_statuses:
41+
:param form_number:
42+
:param start_date:
43+
:param end_date:
44+
:param xsd_version:
45+
:return:
46+
"""
2947
from pymongo import MongoClient
3048
from ted_sws import config
3149
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository, NOTICE_TED_ID
3250

3351
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
3452
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
35-
notice_ids = []
36-
for notice_status in notice_statuses:
37-
mongodb_filter = build_selector_mongodb_filter(notice_status=str(notice_status),
38-
form_number=form_number,
39-
start_date=start_date,
40-
end_date=end_date,
41-
xsd_version=xsd_version
42-
)
43-
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
44-
notice_ids.extend([result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator])
45-
46-
return notice_ids
53+
notice_statuses = [str(notice_status) for notice_status in notice_statuses]
54+
mongodb_filter = build_selector_mongodb_filter(notice_statuses=notice_statuses,
55+
form_number=form_number,
56+
start_date=start_date,
57+
end_date=end_date,
58+
xsd_version=xsd_version)
59+
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
60+
return [result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator]

dags/selector_repackage_process_orchestrator.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
DAG_NAME = "selector_re_package_process_orchestrator"
1414

1515
RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
16+
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
1617
NoticeStatus.INELIGIBLE_FOR_PUBLISHING]
1718
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1819
FORM_NUMBER_DAG_PARAM = "form_number"

dags/selector_republish_process_orchestrator.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,9 @@
1313

1414
DAG_NAME = "selector_re_publish_process_orchestrator"
1515

16-
RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_UNAVAILABLE, NoticeStatus.ELIGIBLE_FOR_PUBLISHING]
16+
RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
17+
NoticeStatus.PACKAGED
18+
]
1719
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1820
FORM_NUMBER_DAG_PARAM = "form_number"
1921
START_DATE_DAG_PARAM = "start_date"

dags/selector_retransform_process_orchestrator.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,10 @@
1313

1414
DAG_NAME = "selector_re_transform_process_orchestrator"
1515

16-
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION]
16+
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
17+
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
18+
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
19+
]
1720
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
1821
FORM_NUMBER_DAG_PARAM = "form_number"
1922
START_DATE_DAG_PARAM = "start_date"

infra/airflow-cluster/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ COPY requirements.txt /opt/airflow
1414
# working in the /opt/airflow
1515
WORKDIR /opt/airflow
1616
RUN mkdir -p ./.rmlmapper
17-
RUN wget -c https://api.bitbucket.org/2.0/repositories/Dragos0000/rml-mapper/src/master/rmlmapper.jar -P ./.rmlmapper
17+
RUN wget -c https://github.com/RMLio/rmlmapper-java/releases/download/v6.0.0/rmlmapper-6.0.0-r363-all.jar -O ./.rmlmapper/rmlmapper.jar
1818

1919

2020
RUN wget -c https://kumisystems.dl.sourceforge.net/project/saxon/Saxon-HE/10/Java/SaxonHE10-6J.zip -P .saxon/

0 commit comments

Comments
 (0)