11import pathlib
22import tempfile
33from io import StringIO
4- from typing import List , Set , Tuple , Dict
4+ from typing import List , Tuple , Dict
55import rdflib
66from pymongo import MongoClient
77from rdflib import RDF , URIRef , OWL
1212from ted_sws .core .model .notice import Notice
1313from ted_sws .data_manager .adapters .notice_repository import NoticeRepository
1414from ted_sws .data_manager .adapters .sparql_endpoint import SPARQLStringEndpoint
15- from ted_sws .data_manager .adapters .triple_store import FusekiAdapter , TripleStoreABC , FusekiException , \
15+ from ted_sws .data_manager .adapters .triple_store import FusekiAdapter , TripleStoreABC , \
1616 FUSEKI_REPOSITORY_ALREADY_EXIST_ERROR_MSG
17- from ted_sws .event_manager .services .log import log_error
17+ from ted_sws .event_manager .services .log import log_error , log_notice_error
1818from ted_sws .master_data_registry .services .rdf_fragment_processor import get_rdf_fragments_by_cet_uri_from_notices , \
1919 merge_rdf_fragments_into_graph , write_rdf_fragments_in_triple_store , RDF_FRAGMENT_FROM_NOTICE_PROPERTY , \
20- get_subjects_by_cet_uri
20+ get_subjects_by_cet_uri , get_rdf_fragment_by_cet_uri_from_notice
2121
2222MDR_TEMPORARY_FUSEKI_DATASET_NAME = "tmp_mdr_dataset"
2323MDR_FUSEKI_DATASET_NAME = "mdr_dataset"
2424MDR_CANONICAL_CET_PROPERTY = rdflib .term .URIRef ("http://www.meaningfy.ws/mdr#isCanonicalEntity" )
25+ DEDUPLICATE_PROCEDURE_ENTITIES_DOMAIN_ACTION = "deduplicate_procedure_entities"
2526
2627
2728def generate_mdr_alignment_links (merged_rdf_fragments : rdflib .Graph , cet_uri : str ,
@@ -225,12 +226,14 @@ def deduplicate_entities_by_cet_uri(notices: List[Notice], cet_uri: str,
225226 alignment_graph = cet_alignment_links , inject_reflexive_links = True )
226227
227228
228- def deduplicate_procedure_entities (notices : List [Notice ], procedure_cet_uri : str , mongodb_client : MongoClient ):
229+ def deduplicate_procedure_entities (notices : List [Notice ], procedure_cet_uri : str , mongodb_client : MongoClient ,
230+ mdr_dataset_name : str = MDR_FUSEKI_DATASET_NAME ):
229231 """
230232 This function deduplicate procedure entities for each notice from batch of notices.
231233 :param notices:
232234 :param procedure_cet_uri:
233235 :param mongodb_client:
236+ :param mdr_dataset_name:
234237 :return:
235238 """
236239 notice_families = defaultdict (list )
@@ -242,26 +245,57 @@ def deduplicate_procedure_entities(notices: List[Notice], procedure_cet_uri: str
242245
243246 parent_uries = {}
244247 notice_repository = NoticeRepository (mongodb_client = mongodb_client )
248+ triple_store = FusekiAdapter ()
249+ if mdr_dataset_name not in triple_store .list_repositories ():
250+ try :
251+ triple_store .create_repository (repository_name = mdr_dataset_name )
252+ except Exception as exception :
253+ if str (exception ) != FUSEKI_REPOSITORY_ALREADY_EXIST_ERROR_MSG :
254+ log_error (message = str (exception ))
255+
245256 for parent_notice_id in notice_families .keys ():
246257 parent_notice = notice_repository .get (reference = parent_notice_id )
247258 if parent_notice and parent_notice .rdf_manifestation and parent_notice .rdf_manifestation .object_data :
248259 rdf_content = parent_notice .rdf_manifestation .object_data
249260 sparql_endpoint = SPARQLStringEndpoint (rdf_content = rdf_content )
250261 result_uris = get_subjects_by_cet_uri (sparql_endpoint = sparql_endpoint , cet_uri = procedure_cet_uri )
251- assert len (result_uris ) == 1
252- parent_procedure_uri = rdflib .URIRef (result_uris [0 ])
253- parent_uries [parent_notice_id ] = parent_procedure_uri
262+ result_uris_len = len (result_uris )
263+ if result_uris_len != 1 :
264+ notice_normalised_metadata = parent_notice .normalised_metadata
265+ log_notice_error (
266+ message = f"Parent notice with notice_id=[{ parent_notice .ted_id } ] have { result_uris_len } Procedure CETs!" ,
267+ notice_id = parent_notice .ted_id , domain_action = DEDUPLICATE_PROCEDURE_ENTITIES_DOMAIN_ACTION ,
268+ notice_form_number = notice_normalised_metadata .form_number if notice_normalised_metadata else None ,
269+ notice_status = parent_notice .status ,
270+ notice_eforms_subtype = notice_normalised_metadata .eforms_subtype if notice_normalised_metadata else None )
271+ else :
272+ parent_procedure_uri = rdflib .URIRef (result_uris [0 ])
273+ parent_uries [parent_notice_id ] = parent_procedure_uri
274+ parent_procedure_rdf_fragments = get_rdf_fragment_by_cet_uri_from_notice (notice = parent_notice ,
275+ cet_uri = procedure_cet_uri )
276+ parent_new_cet = {parent_procedure_uri : parent_procedure_rdf_fragments [0 ]}
277+ register_new_cets_in_mdr (new_canonical_entities = parent_new_cet , triple_store = triple_store ,
278+ mdr_dataset_name = mdr_dataset_name )
254279
255280 for parent_uri_key in parent_uries .keys ():
256281 parent_uri = parent_uries [parent_uri_key ]
257282 for child_notice in notice_families [parent_uri_key ]:
258283 rdf_content = child_notice .rdf_manifestation .object_data
259284 sparql_endpoint = SPARQLStringEndpoint (rdf_content = rdf_content )
260285 result_uris = get_subjects_by_cet_uri (sparql_endpoint = sparql_endpoint , cet_uri = procedure_cet_uri )
261- assert len (result_uris ) == 1
262- child_procedure_uri = rdflib .URIRef (result_uris [0 ])
263- inject_links = rdflib .Graph ()
264- inject_links .add ((child_procedure_uri , OWL .sameAs , parent_uri ))
265- child_notice .distilled_rdf_manifestation .object_data = '\n ' .join (
266- [child_notice .distilled_rdf_manifestation .object_data ,
267- str (inject_links .serialize (format = "nt" ))])
286+ result_uris_len = len (result_uris )
287+ if result_uris_len != 1 :
288+ notice_normalised_metadata = child_notice .normalised_metadata
289+ log_notice_error (
290+ message = f"Child notice with notice_id=[{ child_notice .ted_id } ] have { result_uris_len } Procedure CETs!" ,
291+ notice_id = child_notice .ted_id , domain_action = DEDUPLICATE_PROCEDURE_ENTITIES_DOMAIN_ACTION ,
292+ notice_form_number = notice_normalised_metadata .form_number if notice_normalised_metadata else None ,
293+ notice_status = child_notice .status ,
294+ notice_eforms_subtype = notice_normalised_metadata .eforms_subtype if notice_normalised_metadata else None )
295+ else :
296+ child_procedure_uri = rdflib .URIRef (result_uris [0 ])
297+ inject_links = rdflib .Graph ()
298+ inject_links .add ((child_procedure_uri , OWL .sameAs , parent_uri ))
299+ child_notice .distilled_rdf_manifestation .object_data = '\n ' .join (
300+ [child_notice .distilled_rdf_manifestation .object_data ,
301+ str (inject_links .serialize (format = "nt" ))])
0 commit comments