|
| 1 | +from pathlib import Path |
| 2 | +from typing import List, Union, Set, Dict |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +from jinja2 import Environment, PackageLoader |
| 6 | +from pymongo import MongoClient |
| 7 | + |
| 8 | +from ted_sws.core.model.manifestation import XPATHCoverageValidationReport, XPATHCoverageValidationAssertion, \ |
| 9 | + XPATHCoverageValidationResult |
| 10 | +from ted_sws.core.model.notice import Notice |
| 11 | +from ted_sws.core.model.transform import ConceptualMapping |
| 12 | +from ted_sws.data_sampler.services.notice_xml_indexer import index_notice, get_unique_xpaths_covered_by_notices |
| 13 | +from ted_sws.mapping_suite_processor.services.conceptual_mapping_reader import mapping_suite_read_metadata, \ |
| 14 | + mapping_suite_read_conceptual_mapping |
| 15 | +from ted_sws.notice_validator import BASE_XPATH_FIELD |
| 16 | +from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB |
| 17 | + |
# Jinja2 environment that loads the report templates shipped inside the
# "ted_sws.notice_validator.resources" package ("templates" folder).
TEMPLATES = Environment(
    loader=PackageLoader("ted_sws.notice_validator.resources", "templates"),
)
# File name of the template rendered by CoverageRunner.html_report.
XPATH_COVERAGE_REPORT_TEMPLATE = "xpath_coverage_report.jinja2"

# A filesystem path given either as a plain string or as a Path object.
PATH_TYPE = Union[str, Path]
# Maps a string key to a list of XPath strings (used below as notice id -> XPaths).
XPATH_TYPE = Dict[str, List[str]]
| 23 | + |
| 24 | + |
class CoverageRunner:
    """
    Runs XPath coverage measurement of XML notices against a conceptual mapping.

    The conceptual mapping is taken from the MongoDB mapping suite repository when a
    ``mongodb_client`` is supplied; otherwise it is read from the conceptual mappings file.
    """

    # Declared without defaults on purpose: a mutable default here (``set()`` / ``{}``)
    # would be a single object shared by every instance, so the XPaths of one mapping
    # suite would leak into runners created for another. Both are created in __init__.
    conceptual_xpaths: Set[str]
    conceptual_xpath_names: Dict[str, str]
    mongodb_client: MongoClient
    base_xpath: str
    mapping_suite_id: str

    def __init__(self, mapping_suite_id: str, conceptual_mappings_file_path: PATH_TYPE = None, xslt_transformer=None,
                 mongodb_client: MongoClient = None):
        """
        Load the conceptual mapping and collect its XPaths and their names.

        :param mapping_suite_id: identifier used to look the mapping suite up in the repository
        :param conceptual_mappings_file_path: conceptual mappings file; used only when no
            ``mongodb_client`` is given
        :param xslt_transformer: passed to ``index_notice`` when notices are indexed locally
        :param mongodb_client: when given, the mapping suite and notice XPaths are read from the DB
        :raises ValueError: if the mapping suite is not found in the repository
        :raises TypeError: if neither ``mongodb_client`` nor ``conceptual_mappings_file_path`` is given
        """
        self.mapping_suite_id = mapping_suite_id
        self.mongodb_client = mongodb_client
        self.xslt_transformer = xslt_transformer
        # Per-instance containers (see the note on the class-level declarations).
        self.conceptual_xpaths = set()
        self.conceptual_xpath_names = {}

        conceptual_mapping: ConceptualMapping
        if self._db_readable():
            mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
            mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
            if mapping_suite is None:
                raise ValueError(f'Mapping suite, with {mapping_suite_id} id, was not found')
            conceptual_mapping = mapping_suite.conceptual_mapping
        else:
            if conceptual_mappings_file_path is None:
                # Path(None) would raise an opaque TypeError; keep the type, improve the message.
                raise TypeError(
                    "conceptual_mappings_file_path is required when no mongodb_client is provided")
            conceptual_mapping = mapping_suite_read_conceptual_mapping(Path(conceptual_mappings_file_path))

        for cm_xpath in conceptual_mapping.xpaths:
            self.conceptual_xpaths.add(cm_xpath.xpath)
            self.conceptual_xpath_names[cm_xpath.xpath] = cm_xpath.name

        self.base_xpath = conceptual_mapping.metadata.base_xpath

    def _db_readable(self) -> bool:
        """Return True when a MongoDB client was supplied, i.e. data is read from the DB."""
        return self.mongodb_client is not None

    @staticmethod
    def _is_nan(value) -> bool:
        """Return True for any float NaN (NaN is the only float not equal to itself)."""
        return isinstance(value, float) and value != value

    @classmethod
    def find_notice_by_xpath(cls, notice_xpaths: XPATH_TYPE, xpath: str) -> Dict[str, int]:
        """
        Find the notices that contain the given XPath.

        :param notice_xpaths: XPaths per notice, keyed by notice id
        :param xpath: the XPath to look for
        :return: for every notice containing ``xpath`` (in sorted key order), the number of
            times it occurs in that notice's XPath list
        """
        notice_hit: Dict[str, int] = {}
        for notice_key, xpaths in sorted(notice_xpaths.items()):
            if xpath in xpaths:
                notice_hit[notice_key] = xpaths.count(xpath)
        return notice_hit

    def xpath_assertions(self, notice_xpaths: XPATH_TYPE,
                         xpaths_list: List[str]) -> List[XPATHCoverageValidationAssertion]:
        """
        Build one assertion per conceptual XPath.

        :param notice_xpaths: XPaths per notice, keyed by notice id
        :param xpaths_list: all notice XPaths concatenated (duplicates across notices kept)
        :return: assertions ordered by XPath, so that reports are reproducible
        """
        # Tally occurrences once instead of calling list.count() for every conceptual XPath.
        occurrences: Dict[str, int] = {}
        for found_xpath in xpaths_list:
            occurrences[found_xpath] = occurrences.get(found_xpath, 0) + 1

        xpath_assertions = []
        # Sets have no stable iteration order; sort for a deterministic report.
        for xpath in sorted(self.conceptual_xpaths):
            title = self.conceptual_xpath_names[xpath]
            xpath_assertion = XPATHCoverageValidationAssertion()
            # A missing name may arrive as NaN; an identity check against np.nan would
            # miss NaN objects that are not that exact singleton.
            xpath_assertion.title = '' if self._is_nan(title) else title
            xpath_assertion.xpath = xpath
            xpath_assertion.count = occurrences.get(xpath, 0)
            xpath_assertion.notice_hit = self.find_notice_by_xpath(notice_xpaths, xpath)
            xpath_assertion.query_result = xpath_assertion.count > 0
            xpath_assertions.append(xpath_assertion)
        return xpath_assertions

    def validate_xpath_coverage_report(self, report: XPATHCoverageValidationReport, notice_xpaths: XPATH_TYPE,
                                       xpaths_list: List[str], notice_id: List[str]):
        """
        Compute the coverage result and store it on ``report.validation_result``.

        :param report: the report to fill in (mutated in place)
        :param notice_xpaths: XPaths per notice, keyed by notice id
        :param xpaths_list: all notice XPaths concatenated
        :param notice_id: ids of the notices the report is about
        :return: None

        ``coverage`` is set only when the notices have at least one XPath and
        ``conceptual_coverage`` only when the conceptual mapping has at least one XPath,
        which avoids dividing by zero.
        """
        unique_notice_xpaths: Set[str] = set(xpaths_list)
        covered: Set[str] = self.conceptual_xpaths & unique_notice_xpaths

        validation_result: XPATHCoverageValidationResult = XPATHCoverageValidationResult()
        validation_result.notice_id = notice_id
        validation_result.xpath_assertions = self.xpath_assertions(notice_xpaths, xpaths_list)
        # Sorted so that two runs over the same input produce identical reports.
        validation_result.xpath_covered = sorted(covered)
        validation_result.xpath_not_covered = sorted(unique_notice_xpaths - self.conceptual_xpaths)
        validation_result.xpath_extra = sorted(self.conceptual_xpaths - unique_notice_xpaths)

        if unique_notice_xpaths:
            validation_result.coverage = len(covered) / len(unique_notice_xpaths)
        if self.conceptual_xpaths:
            validation_result.conceptual_coverage = len(covered) / len(self.conceptual_xpaths)

        report.validation_result = validation_result

    @classmethod
    def based_xpaths(cls, xpaths: List[str], base_xpath: str) -> List[str]:
        """
        Keep only the XPaths located under the base XPath.

        :param xpaths: XPaths to filter
        :param base_xpath: base XPath; a "/" is appended before matching, so the base
            XPath itself is not part of the result
        :return: the XPaths that start with ``base_xpath + "/"``, in their original order
        """
        prefix = base_xpath + "/"
        return [xpath for xpath in xpaths if xpath.startswith(prefix)]

    def coverage_notice_xpath(self, notices: List[Notice], mapping_suite_id) -> XPATHCoverageValidationReport:
        """
        Build the XPath coverage report for the given notices.

        :param notices: notices to measure
        :param mapping_suite_id: identifier stored on the report as ``mapping_suite_identifier``
        :return: the filled-in coverage report
        """
        report: XPATHCoverageValidationReport = XPATHCoverageValidationReport(
            object_data="XPATHCoverageValidationReport",
            mapping_suite_identifier=mapping_suite_id)

        notice_ids = []
        notice_xpaths: XPATH_TYPE = {}
        xpaths_list: List[str] = []
        for notice in notices:
            xpaths: List[str] = []
            notice_ids.append(notice.ted_id)
            if self._db_readable():
                xpaths = get_unique_xpaths_covered_by_notices([notice.ted_id], self.mongodb_client)
            else:
                # No DB: index the notice locally and take the XPaths from its XML metadata.
                notice = index_notice(notice, self.xslt_transformer)

                if notice.xml_metadata and notice.xml_metadata.unique_xpaths:
                    xpaths = notice.xml_metadata.unique_xpaths

            notice_xpaths[notice.ted_id] = self.based_xpaths(xpaths, self.base_xpath)
            xpaths_list += notice_xpaths[notice.ted_id]

        self.validate_xpath_coverage_report(report, notice_xpaths, xpaths_list, sorted(notice_ids))

        return report

    @classmethod
    def json_report(cls, report: XPATHCoverageValidationReport) -> dict:
        """Return the report as a dictionary (``report.dict()``)."""
        return report.dict()

    @classmethod
    def html_report(cls, report: XPATHCoverageValidationReport) -> str:
        """Render the report with the XPath coverage Jinja2 template and return the result."""
        data: dict = cls.json_report(report)
        html_report = TEMPLATES.get_template(XPATH_COVERAGE_REPORT_TEMPLATE).render(data)
        return html_report
0 commit comments