Skip to content

Commit 62f5d0a

Browse files
Merge pull request #406 from OP-TED/feature/TED-1049
Feature/ted 1049
2 parents b07be08 + 597b787 commit 62f5d0a

8 files changed

Lines changed: 77 additions & 27 deletions

File tree

dags/pipelines/notice_fetcher_pipelines.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,20 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
1010
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
1111
from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \
1212
create_and_store_in_mongo_db_daily_supra_notice
13-
14-
date_wild_card = date_wild_card if date_wild_card else (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*")
15-
notice_publication_date = datetime.strptime(date_wild_card, "%Y%m%d*").date()
16-
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
17-
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
18-
ted_api_adapter=TedAPIAdapter(
19-
request_api=TedRequestAPI())).fetch_notices_by_date_wild_card(
20-
wildcard_date=date_wild_card)
21-
create_and_store_in_mongo_db_daily_supra_notice(notice_ids=notice_ids, mongodb_client=mongodb_client,
22-
notice_fetched_date=notice_publication_date)
13+
from ted_sws.event_manager.services.log import log_error
14+
notice_ids = None
15+
try:
16+
date_wild_card = date_wild_card if date_wild_card else (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*")
17+
notice_publication_date = datetime.strptime(date_wild_card, "%Y%m%d*").date()
18+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
19+
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
20+
ted_api_adapter=TedAPIAdapter(
21+
request_api=TedRequestAPI())).fetch_notices_by_date_wild_card(
22+
wildcard_date=date_wild_card)
23+
create_and_store_in_mongo_db_daily_supra_notice(notice_ids=notice_ids, mongodb_client=mongodb_client,
24+
notice_fetched_date=notice_publication_date)
25+
except Exception as error:
26+
log_error(message=str(error))
2327

2428
return notice_ids
2529

@@ -30,10 +34,14 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]:
3034
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
3135
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
3236
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
33-
34-
ted_api_query = {"q": query}
35-
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
36-
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
37-
ted_api_adapter=TedAPIAdapter(
38-
request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query)
37+
from ted_sws.event_manager.services.log import log_error
38+
notice_ids = None
39+
try:
40+
ted_api_query = {"q": query}
41+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
42+
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
43+
ted_api_adapter=TedAPIAdapter(
44+
request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query)
45+
except Exception as error:
46+
log_error(message=str(error))
3947
return notice_ids

ted_sws/master_data_registry/resources/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
MASTER_DATA_REGISTRY_RESOURCES_PATH = pathlib.Path(__file__).parent.resolve()
44

55
TRIPLES_BY_CET_URI_SPARQL_QUERY_TEMPLATE_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates/get_by_cet_uri.rq"
6+
PROCEDURE_SUBJECTS_SPARQL_QUERY_TEMPLATE_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates/get_procedure_uris.rq"
67
RDF_FRAGMENT_BY_URI_SPARQL_QUERY_TEMPLATE_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates/get_2_dependency_levels_for_a_uri_as_root.rq"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
prefix epo:<http://data.europa.eu/a4g/ontology#>
2+
3+
SELECT DISTINCT ?s
4+
WHERE {
5+
?s a epo:Procedure.
6+
?o ?po ?s.
7+
filter not exists {?o epo:refersToPreviousProcedure ?s}
8+
}

ted_sws/master_data_registry/services/entity_deduplication.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from ted_sws.event_manager.services.log import log_error, log_notice_error
1818
from ted_sws.master_data_registry.services.rdf_fragment_processor import get_rdf_fragments_by_cet_uri_from_notices, \
1919
merge_rdf_fragments_into_graph, write_rdf_fragments_in_triple_store, RDF_FRAGMENT_FROM_NOTICE_PROPERTY, \
20-
get_subjects_by_cet_uri, get_rdf_fragment_by_cet_uri_from_notice
20+
get_procedure_subjects, get_rdf_fragment_by_root_uri_from_notice
2121

2222
MDR_TEMPORARY_FUSEKI_DATASET_NAME = "tmp_mdr_dataset"
2323
MDR_FUSEKI_DATASET_NAME = "mdr_dataset"
@@ -258,7 +258,7 @@ def deduplicate_procedure_entities(notices: List[Notice], procedure_cet_uri: str
258258
if parent_notice and parent_notice.rdf_manifestation and parent_notice.rdf_manifestation.object_data:
259259
rdf_content = parent_notice.rdf_manifestation.object_data
260260
sparql_endpoint = SPARQLStringEndpoint(rdf_content=rdf_content)
261-
result_uris = get_subjects_by_cet_uri(sparql_endpoint=sparql_endpoint, cet_uri=procedure_cet_uri)
261+
result_uris = get_procedure_subjects(sparql_endpoint=sparql_endpoint)
262262
result_uris_len = len(result_uris)
263263
if result_uris_len != 1:
264264
notice_normalised_metadata = parent_notice.normalised_metadata
@@ -269,11 +269,12 @@ def deduplicate_procedure_entities(notices: List[Notice], procedure_cet_uri: str
269269
notice_status=parent_notice.status,
270270
notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)
271271
else:
272+
result_uri = result_uris[0]
272273
parent_procedure_uri = rdflib.URIRef(result_uris[0])
273274
parent_uries[parent_notice_id] = parent_procedure_uri
274-
parent_procedure_rdf_fragments = get_rdf_fragment_by_cet_uri_from_notice(notice=parent_notice,
275-
cet_uri=procedure_cet_uri)
276-
parent_new_cet = {parent_procedure_uri: parent_procedure_rdf_fragments[0]}
275+
parent_procedure_rdf_fragment = get_rdf_fragment_by_root_uri_from_notice(notice=parent_notice,
276+
root_uri=result_uri)
277+
parent_new_cet = {parent_procedure_uri: parent_procedure_rdf_fragment}
277278
register_new_cets_in_mdr(new_canonical_entities=parent_new_cet, triple_store=triple_store,
278279
mdr_dataset_name=mdr_dataset_name)
279280

@@ -282,7 +283,7 @@ def deduplicate_procedure_entities(notices: List[Notice], procedure_cet_uri: str
282283
for child_notice in notice_families[parent_uri_key]:
283284
rdf_content = child_notice.rdf_manifestation.object_data
284285
sparql_endpoint = SPARQLStringEndpoint(rdf_content=rdf_content)
285-
result_uris = get_subjects_by_cet_uri(sparql_endpoint=sparql_endpoint, cet_uri=procedure_cet_uri)
286+
result_uris = get_procedure_subjects(sparql_endpoint=sparql_endpoint)
286287
result_uris_len = len(result_uris)
287288
if result_uris_len != 1:
288289
notice_normalised_metadata = child_notice.normalised_metadata

ted_sws/master_data_registry/services/rdf_fragment_processor.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
"""
1111
import pathlib
1212
from string import Template
13-
from typing import List, Tuple
13+
from typing import List, Tuple, Optional
1414

1515
import rdflib
1616

1717
from ted_sws.core.model.notice import Notice
1818
from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLStringEndpoint
1919
from ted_sws.data_manager.adapters.triple_store import TripleStoreABC
2020
from ted_sws.master_data_registry.resources import RDF_FRAGMENT_BY_URI_SPARQL_QUERY_TEMPLATE_PATH, \
21-
TRIPLES_BY_CET_URI_SPARQL_QUERY_TEMPLATE_PATH
21+
TRIPLES_BY_CET_URI_SPARQL_QUERY_TEMPLATE_PATH, PROCEDURE_SUBJECTS_SPARQL_QUERY_TEMPLATE_PATH
2222

2323
RDFTriple = Tuple[rdflib.term.Node, rdflib.term.Node, rdflib.term.Node]
2424

@@ -39,6 +39,17 @@ def get_subjects_by_cet_uri(sparql_endpoint: SPARQLStringEndpoint, cet_uri: str)
3939
return query_table_result["s"].to_list()
4040

4141

42+
def get_procedure_subjects(sparql_endpoint: SPARQLStringEndpoint) -> List[str]:
43+
"""
44+
This function return a list of procedure subjects.
45+
:param sparql_endpoint:
46+
:return:
47+
"""
48+
sparql_query = PROCEDURE_SUBJECTS_SPARQL_QUERY_TEMPLATE_PATH.read_text(encoding="utf-8")
49+
query_table_result = sparql_endpoint.with_query(sparql_query=sparql_query).fetch_tabular()
50+
return query_table_result["s"].to_list()
51+
52+
4253
def get_rdf_fragment_by_root_uri(sparql_endpoint: SPARQLStringEndpoint, root_uri: str,
4354
inject_triples: List[RDFTriple] = None) -> rdflib.Graph:
4455
"""
@@ -92,6 +103,25 @@ def get_rdf_fragments_by_cet_uri_from_file(rdf_file_path: pathlib.Path, cet_uri:
92103
rdf_content_format=rdf_file_content_format)
93104

94105

106+
def get_rdf_fragment_by_root_uri_from_notice(notice: Notice, root_uri: str) -> Optional[rdflib.Graph]:
107+
"""
108+
This function extracts from a Notice RDF content a RDFFragment dependent on a root URI.
109+
:param notice:
110+
:param root_uri:
111+
:return:
112+
"""
113+
sparql_endpoint = SPARQLStringEndpoint(rdf_content=notice.rdf_manifestation.object_data,
114+
rdf_content_format=DEFAULT_RDF_FILE_FORMAT)
115+
rdf_fragment = get_rdf_fragment_by_root_uri(sparql_endpoint=sparql_endpoint, root_uri=root_uri,
116+
inject_triples=[(rdflib.URIRef(root_uri),
117+
RDF_FRAGMENT_FROM_NOTICE_PROPERTY,
118+
rdflib.Literal(notice.ted_id))
119+
]
120+
)
121+
return rdf_fragment
122+
123+
124+
95125
def get_rdf_fragment_by_cet_uri_from_notice(notice: Notice, cet_uri: str) -> List[rdflib.Graph]:
96126
"""
97127
This function extracts from a Notice RDF content a list of RDFFragments dependent on a CET URI.

ted_sws/notice_fetcher/adapters/ted_api.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import requests
77

88
from ted_sws import config
9+
from ted_sws.event_manager.services.log import log_error
910
from ted_sws.notice_fetcher.adapters.ted_api_abc import TedAPIAdapterABC, RequestAPI
1011

1112
DEFAULT_TED_API_QUERY_RESULT_SIZE = {"pageSize": 100,
@@ -45,7 +46,8 @@ def __call__(self, api_url: str, api_query: dict) -> dict:
4546
response_content = json.loads(response.text)
4647
return response_content
4748
else:
48-
raise Exception(f"The API call failed with: {response}")
49+
raise Exception(f"The TED-API call failed with: {response}")
50+
4951

5052

5153
class TedAPIAdapter(TedAPIAdapterABC):

tests/e2e/notice_fetcher/test_ted_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ def test_ted_api_error():
2929
ted = TedAPIAdapter(request_api=TedRequestAPI())
3030
with pytest.raises(Exception) as e:
3131
ted.get_by_query(query={"q": "NDE=67623-2022"})
32-
assert str(e.value) == "The API call failed with: <Response [500]>"
32+
assert str(e.value) == "The TED-API call failed with: <Response [500]>"

tests/e2e/notice_fetcher/test_ted_request_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ def test_ted_request_api():
1111
assert isinstance(notice_by_query, dict)
1212
with pytest.raises(Exception) as e:
1313
ted_api_request(api_url=config.TED_API_URL, api_query={"q": "INCORRECT PARAMS"})
14-
assert str(e.value) == "The API call failed with: <Response [500]>"
14+
assert str(e.value) == "The TED-API call failed with: <Response [500]>"
1515

0 commit comments

Comments
 (0)