2 changes: 2 additions & 0 deletions dags/__init__.py
@@ -16,6 +16,8 @@
    "execution_timeout": timedelta(days=10),
}

+BATCH_SIZE = 2000
+
NOTICE_NORMALISATION_PIPELINE_TASK_ID = "notice_normalisation_pipeline"
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID = "notice_transformation_pipeline"
NOTICE_DISTILLATION_PIPELINE_TASK_ID = "notice_distillation_pipeline"
3 changes: 2 additions & 1 deletion dags/fetch_notices_by_date.py
@@ -7,7 +7,7 @@
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils.trigger_rule import TriggerRule

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_date_pipeline
@@ -36,6 +36,7 @@
     schedule=CronTriggerTimetable(
         cron=config.SCHEDULE_DAG_FETCH,
         timezone=DAG_DEFAULT_TIMEZONE),
+    start_date=datetime.today(),
     tags=['selector', 'daily-fetch'],
     params={
         WILD_CARD_DAG_KEY: Param(
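The DAG now gets an explicit start_date next to its CronTriggerTimetable schedule. A rough, self-contained sketch of the same wiring (cron string, timezone, tags and task body are placeholders, not the project's DAG):

```python
# Illustrative sketch only; names and values are placeholders.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=CronTriggerTimetable(cron="0 3 * * *", timezone="UTC"),
    start_date=datetime(2023, 1, 1),  # the PR uses a dynamic datetime.today() instead
    catchup=False,
    tags=["example"],
)
def example_daily_fetch():
    @task
    def fetch_notice_ids() -> list:
        return []  # placeholder for the real fetch pipeline

    fetch_notice_ids()


example_daily_fetch()
```

Airflow only schedules runs after start_date, so a dynamic datetime.today() effectively suppresses any backfill; the Airflow documentation generally recommends a static date, which is why the sketch above uses one.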
5 changes: 3 additions & 2 deletions dags/load_mapping_suite_in_database.py
@@ -5,7 +5,7 @@
from airflow.utils.trigger_rule import TriggerRule
from pymongo import MongoClient

-from dags import DEFAULT_DAG_ARGUMENTS
+from dags import DEFAULT_DAG_ARGUMENTS, BATCH_SIZE
from dags.dags_utils import push_dag_downstream, pull_dag_upstream, get_dag_param
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from ted_sws import config
@@ -114,7 +114,8 @@ def _branch_selector():
    finish_step = EmptyOperator(task_id=FINISH_LOADING_MAPPING_SUITE_TASK_ID,
                                trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)

-    trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID)
+    trigger_document_proc_pipeline = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID,
+                                                                        batch_size=BATCH_SIZE)
    fetch_mapping_suite_package_from_github_into_mongodb() >> branch_task
    trigger_document_proc_pipeline >> finish_step
    branch_task >> [trigger_document_proc_pipeline, finish_step]
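The batch_size handed to TriggerNoticeBatchPipelineOperator presumably controls how the operator splits the list of notice IDs into chunks before triggering downstream processing runs (the operator itself is only partly visible in this diff). A rough sketch of that kind of chunking, using a hand-rolled helper rather than the project's chunks utility:

```python
# Sketch only: how a 2000-element batch size splits a list of notice IDs.
# split_into_batches is a stand-in for ted_sws.core.service.batch_processing.chunks,
# whose exact signature is not shown in this diff.
from typing import List

BATCH_SIZE = 2000


def split_into_batches(notice_ids: List[str], batch_size: int = BATCH_SIZE) -> List[List[str]]:
    """Return consecutive slices of at most batch_size notice IDs."""
    return [notice_ids[i:i + batch_size] for i in range(0, len(notice_ids), batch_size)]


if __name__ == "__main__":
    ids = [f"notice-{n}" for n in range(4500)]
    print([len(batch) for batch in split_into_batches(ids)])  # -> [2000, 2000, 500]
```

Each batch would then presumably drive one run of the notice processing DAG, keeping individual DAG runs bounded in size.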
26 changes: 13 additions & 13 deletions dags/notice_processing_pipeline.py
@@ -1,6 +1,7 @@
from typing import List

from airflow.decorators import dag
+from airflow.exceptions import AirflowSkipException
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule

@@ -63,37 +64,36 @@ def _selector_branch_before_publish():
        return branch_selector(NOTICE_PUBLISH_PIPELINE_TASK_ID)

    def _stop_processing():
-        notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY)
-        if not notice_ids:
-            raise Exception(f"No notice has been processed!")
+        pass

    start_processing = BranchPythonOperator(
        task_id=BRANCH_SELECTOR_TASK_ID,
        python_callable=_start_processing,
        trigger_rule=TriggerRule.ALWAYS
    )

    selector_branch_before_transformation = BranchPythonOperator(
        task_id=SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
        python_callable=_selector_branch_before_transformation,
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    selector_branch_before_validation = BranchPythonOperator(
        task_id=SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID,
        python_callable=_selector_branch_before_validation,
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    selector_branch_before_package = BranchPythonOperator(
        task_id=SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID,
        python_callable=_selector_branch_before_package,
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    selector_branch_before_publish = BranchPythonOperator(
        task_id=SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID,
        python_callable=_selector_branch_before_publish,
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    stop_processing = PythonOperator(
@@ -105,27 +105,27 @@ def _stop_processing():

    notice_normalisation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_normalisation_pipeline,
            task_id=NOTICE_NORMALISATION_PIPELINE_TASK_ID,
-            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+            trigger_rule=TriggerRule.ALL_SUCCESS)

    notice_transformation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_transformation_pipeline,
            task_id=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID,
-            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+            trigger_rule=TriggerRule.ALL_SUCCESS)

    notice_distillation_step = NoticeBatchPipelineOperator(batch_pipeline_callable=notices_batch_distillation_pipeline,
            task_id=NOTICE_DISTILLATION_PIPELINE_TASK_ID,
-            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
+            trigger_rule=TriggerRule.ALL_SUCCESS
            )

    notice_validation_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_validation_pipeline,
            task_id=NOTICE_VALIDATION_PIPELINE_TASK_ID,
-            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+            trigger_rule=TriggerRule.ALL_SUCCESS)
    notice_package_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_package_pipeline,
            task_id=NOTICE_PACKAGE_PIPELINE_TASK_ID,
-            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+            trigger_rule=TriggerRule.ALL_SUCCESS)

    notice_publish_step = NoticeBatchPipelineOperator(notice_pipeline_callable=notice_publish_pipeline,
            task_id=NOTICE_PUBLISH_PIPELINE_TASK_ID,
-            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+            trigger_rule=TriggerRule.ALL_SUCCESS)

    start_processing >> [notice_normalisation_step, selector_branch_before_transformation,
                         selector_branch_before_validation,
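The trigger rules switch from NONE_FAILED_MIN_ONE_SUCCESS to ALL_SUCCESS, and AirflowSkipException is now imported; together with the operator change further down, an empty batch presumably skips a step and lets the skip cascade down the chain instead of failing the run. A minimal, self-contained sketch of that skip behaviour (placeholder DAG and task names):

```python
# Minimal sketch (not the project's DAG): raising AirflowSkipException marks a
# task as skipped; with the default ALL_SUCCESS trigger rule the downstream
# task is then skipped too instead of running on an empty batch.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def skip_propagation_example():
    @task
    def maybe_skip() -> list:
        notice_ids: list = []  # imagine an empty batch pulled from XCom
        if not notice_ids:
            raise AirflowSkipException("No notices to process!")
        return notice_ids

    @task  # trigger_rule defaults to ALL_SUCCESS, so this task is skipped as well
    def process(notice_ids: list) -> None:
        print(f"processing {len(notice_ids)} notices")

    process(maybe_skip())


skip_propagation_example()
```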
25 changes: 15 additions & 10 deletions dags/operators/DagBatchPipelineOperator.py
@@ -1,14 +1,16 @@
from typing import Any, Protocol, List
from uuid import uuid4

+from airflow.exceptions import AirflowSkipException
from airflow.models import BaseOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient

from dags.dags_utils import pull_dag_upstream, push_dag_downstream, get_dag_param, smart_xcom_pull, \
    smart_xcom_push
-from ted_sws.core.service.batch_processing import chunks
-from dags.pipelines.pipeline_protocols import NoticePipelineCallable
+from dags.pipelines.pipeline_protocols import NoticePipelineCallable, NoticePipelineOutput
from ted_sws import config
+from ted_sws.core.service.batch_processing import chunks
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.event_manager.model.event_message import EventMessage, NoticeEventMessage
from ted_sws.event_manager.services.log import log_notice_error
@@ -26,7 +28,7 @@

class BatchPipelineCallable(Protocol):

-    def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
+    def __call__(self, notice_ids: List[str], mongodb_client: MongoClient) -> List[NoticePipelineOutput]:
        """
        :param notice_ids:
        :param mongodb_client:
@@ -57,11 +59,14 @@ def execute(self, context: Any):
        """
        logger = get_logger()
        notice_ids = smart_xcom_pull(key=NOTICE_IDS_KEY)
-        if not notice_ids:
+        if notice_ids is None:
            raise Exception(f"XCOM key [{NOTICE_IDS_KEY}] is not present in context!")
+        if len(notice_ids) == 0:
+            smart_xcom_push(key=NOTICE_IDS_KEY, value=[])
+            raise AirflowSkipException("No notices to process!")
        mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
        notice_repository = NoticeRepository(mongodb_client=mongodb_client)
-        processed_notice_ids = []
+        processed_notices_pipeline_output: List[NoticePipelineOutput] = []
        pipeline_name = DEFAULT_PIPELINE_NAME_FOR_LOGS
        if self.notice_pipeline_callable:
            pipeline_name = self.notice_pipeline_callable.__name__
@@ -76,7 +81,7 @@
            handle_event_message_metadata_dag_context(batch_event_message, context)
            batch_event_message.start_record()
            if self.batch_pipeline_callable is not None:
-                processed_notice_ids.extend(
+                processed_notices_pipeline_output.extend(
                    self.batch_pipeline_callable(notice_ids=notice_ids, mongodb_client=mongodb_client))
            elif self.notice_pipeline_callable is not None:
                for notice_id in notice_ids:
@@ -89,7 +94,7 @@
                        if result_notice_pipeline.store_result:
                            notice_repository.update(notice=result_notice_pipeline.notice)
                        if result_notice_pipeline.processed:
-                            processed_notice_ids.append(notice_id)
+                            processed_notices_pipeline_output.append(result_notice_pipeline)
                        notice_event.end_record()
                        if notice.normalised_metadata:
                            notice_event.notice_form_number = notice.normalised_metadata.form_number
@@ -102,12 +107,12 @@
                                notice_form_number=notice_normalised_metadata.form_number if notice_normalised_metadata else None,
                                notice_status=notice.status if notice else None,
                                notice_eforms_subtype=notice_normalised_metadata.eforms_subtype if notice_normalised_metadata else None)
                        raise e

        batch_event_message.end_record()
        logger.info(event_message=batch_event_message)
-        if not processed_notice_ids:
-            raise Exception(f"No notice has been processed!")
-        smart_xcom_push(key=NOTICE_IDS_KEY, value=processed_notice_ids)
+        smart_xcom_push(key=NOTICE_IDS_KEY, value=[notice_pipeline_output.notice.ted_id for notice_pipeline_output in
+                                                   processed_notices_pipeline_output])


class TriggerNoticeBatchPipelineOperator(BaseOperator):
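NoticePipelineOutput is imported from dags.pipelines.pipeline_protocols, but its definition is not part of this diff. Judging only from the attributes used above (notice, processed, store_result, notice.ted_id), it is presumably a small result container along these lines; this is a hypothetical reconstruction, not the actual class:

```python
# Hypothetical sketch for illustration; the real class lives in
# dags/pipelines/pipeline_protocols.py and may differ.
from dataclasses import dataclass


@dataclass
class Notice:  # stand-in for the project's Notice model
    ted_id: str


@dataclass
class NoticePipelineOutput:
    notice: Notice             # the (possibly updated) notice produced by the pipeline step
    processed: bool = True     # whether the step succeeded for this notice
    store_result: bool = True  # whether the operator should persist the notice


# The operator above then pushes [output.notice.ted_id for output in outputs] to XCom,
# so downstream tasks keep receiving a plain list of notice IDs.
```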
8 changes: 6 additions & 2 deletions dags/pipelines/notice_batch_processor_pipelines.py
@@ -1,13 +1,17 @@
from typing import List

from pymongo import MongoClient

+from dags.pipelines.pipeline_protocols import NoticePipelineOutput
from ted_sws.master_data_registry.services.entity_deduplication import deduplicate_procedure_entities

CET_URIS = ["http://www.w3.org/ns/org#Organization"]
PROCEDURE_CET_URI = "http://data.europa.eu/a4g/ontology#Procedure"


-def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
+def notices_batch_distillation_pipeline(notice_ids: List[str],
+                                        mongodb_client: MongoClient
+                                        ) -> List[NoticePipelineOutput]:
    """

    :param notice_ids:
@@ -29,4 +33,4 @@ def notices_batch_distillation_pipeline(notice_ids: List[str], mongodb_client: MongoClient) -> List[str]:
    deduplicate_procedure_entities(notices=notices, procedure_cet_uri=PROCEDURE_CET_URI, mongodb_client=mongodb_client)
    for notice in notices:
        notice_repository.update(notice=notice)
-    return notice_ids
+    return [NoticePipelineOutput(notice=notice) for notice in notices]
4 changes: 2 additions & 2 deletions dags/pipelines/notice_fetcher_pipelines.py
@@ -11,7 +11,6 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
    from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \
        create_and_store_in_mongo_db_daily_supra_notice
    from ted_sws.event_manager.services.log import log_error
-    notice_ids = None
    try:
        date_wild_card = date_wild_card if date_wild_card else (datetime.now() - timedelta(days=1)).strftime("%Y%m%d*")
        notice_publication_date = datetime.strptime(date_wild_card, "%Y%m%d*").date()
@@ -24,6 +23,7 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
            ted_publication_date=notice_publication_date)
    except Exception as error:
        log_error(message=str(error))
+        raise error

    return notice_ids
@@ -35,7 +35,6 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]:
    from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
    from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
    from ted_sws.event_manager.services.log import log_error
-    notice_ids = None
    try:
        ted_api_query = {"query": query}
        mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
@@ -44,4 +43,5 @@ def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]:
            request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query)
    except Exception as error:
        log_error(message=str(error))
+        raise error
    return notice_ids
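Both fetcher pipelines now re-raise after logging instead of initialising notice_ids to None and returning it, so a failed fetch surfaces as a failed task rather than an empty result. The general shape of the pattern, with placeholder names rather than the project's log_error helper:

```python
# Generic log-and-re-raise sketch, not the project's code.
import logging
from typing import List

logger = logging.getLogger(__name__)


def fetch_notice_ids() -> List[str]:
    try:
        notice_ids = ["notice-1", "notice-2"]  # imagine a TED API call here
        return notice_ids
    except Exception as error:
        logger.error("Fetching notices failed: %s", error)
        raise  # re-raise so the calling task fails visibly instead of returning None
```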
2 changes: 2 additions & 0 deletions ted_sws/mapping_suite_processor/services/__init__.py
@@ -0,0 +1,2 @@
+class MappingSuiteProcessorServiceError(Exception):
+    pass
@@ -12,6 +12,7 @@
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.event_manager.services.log import log_mapping_suite_info, log_mapping_suite_error
from ted_sws.mapping_suite_processor.adapters.github_package_downloader import GitHubMappingSuitePackageDownloader
+from ted_sws.mapping_suite_processor.services import MappingSuiteProcessorServiceError
from ted_sws.mapping_suite_processor.services.mapping_suite_digest_service import \
    update_digest_api_address_for_mapping_suite
from ted_sws.mapping_suite_processor.services.mapping_suite_validation_service import validate_mapping_suite, \
@@ -98,9 +99,11 @@ def mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(mong
    validation_result = validate_mapping_suite(mapping_suite_path=mapping_suite_package_path)
    mapping_suite_id = get_mapping_suite_id_from_file_system(mapping_suite_path=mapping_suite_package_path)
    if mapping_suite_id is None:
+        error_msg = "Invalid mapping suite metadata, can't read mapping suite identifier!"
        log_mapping_suite_error(
-            message="Invalid mapping suite metadata, can't read mapping suite identifier!",
+            message=error_msg,
            mapping_suite_id=MAPPING_SUITE_UNKNOWN_ID)
+        raise MappingSuiteProcessorServiceError(error_msg)
    elif validation_result:
        log_mapping_suite_info(
            message=f"Mapping suite with id={mapping_suite_id} is valid for loading in MongoDB!",
@@ -115,8 +118,10 @@ def mapping_suite_processor_from_github_expand_and_load_package_in_mongo_db(mong
            message=f"Mapping suite with id={mapping_suite_id} loaded with success in MongoDB!",
            mapping_suite_id=mapping_suite_id)
    else:
+        error_msg = f"Mapping suite with id={mapping_suite_id} is invalid for loading in MongoDB!"
        log_mapping_suite_error(
-            message=f"Mapping suite with id={mapping_suite_id} is invalid for loading in MongoDB!",
+            message=error_msg,
            mapping_suite_id=mapping_suite_id)
+        raise MappingSuiteProcessorServiceError(error_msg)

    return result_notice_ids
9 changes: 4 additions & 5 deletions ted_sws/notice_fetcher/adapters/ted_api.py
@@ -72,11 +72,10 @@ def __call__(self, api_url: str, api_query: dict) -> dict:
        headers = get_configured_custom_headers(CUSTOM_HEADER)
        response = execute_request_with_retries(
            request_lambda=lambda: requests.post(api_url, json=api_query, headers=headers))
-        if response.ok:
-            response_content = json.loads(response.text)
-            return response_content
-        else:
-            raise Exception(f"The TED-API call failed with: {response}")
+        response.raise_for_status()
+        response_content = json.loads(response.text)
+        return response_content


class TedAPIAdapter(TedAPIAdapterABC):
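response.raise_for_status() raises requests.exceptions.HTTPError for 4xx/5xx responses, and the status code is part of the exception message; that is presumably why the tests below now only assert that "400" appears in str(e.value). A quick standalone illustration (httpbin.org is just a public echo service used for the example, not part of the project):

```python
# Illustration of the behaviour relied on by the updated tests.
import requests

response = requests.get("https://httpbin.org/status/400")
try:
    response.raise_for_status()
except requests.exceptions.HTTPError as error:
    print(error)                 # e.g. "400 Client Error: BAD REQUEST for url: https://httpbin.org/status/400"
    print("400" in str(error))   # True
```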
@@ -33,7 +33,7 @@ def test_notice_processor_pipelines(fake_mongodb_client):
        notice_repository.update(notice=notice)
    result_list = notices_batch_distillation_pipeline(notice_ids=[notice_id], mongodb_client=fake_mongodb_client)
    assert len(result_list) == 1
-    notice_id = result_list[0]
+    notice_id = result_list[0].notice.ted_id
    notice = notice_repository.get(reference=notice_id)
    pipelines = [notice_validation_pipeline, notice_package_pipeline, notice_publish_pipeline]
    notice_states = [NoticeStatus.DISTILLED, NoticeStatus.VALIDATED, NoticeStatus.PACKAGED, NoticeStatus.PUBLISHED]
3 changes: 2 additions & 1 deletion tests/e2e/notice_fetcher/test_ted_api.py
@@ -27,6 +27,7 @@ def test_ted_api():

def test_ted_api_error():
    ted = TedAPIAdapter(request_api=TedRequestAPI())
+    response_status = "400"
    with pytest.raises(Exception) as e:
        ted.get_by_query(query={"query": "NDE=67623-2022"})
-    assert str(e.value) == "The TED-API call failed with: <Response [400]>"
+    assert response_status in str(e.value)
3 changes: 2 additions & 1 deletion tests/e2e/notice_fetcher/test_ted_request_api.py
@@ -15,6 +15,7 @@ def test_ted_request_api():
    notice_by_query = ted_api_request(api_url=config.TED_API_URL, api_query=api_query)
    assert notice_by_query
    assert isinstance(notice_by_query, dict)
+    response_code = "400"
    with pytest.raises(Exception) as e:
        ted_api_request(api_url=config.TED_API_URL, api_query={"query": "INCORRECT PARAMS"})
-    assert str(e.value) == "The TED-API call failed with: <Response [400]>"
+    assert response_code in str(e.value)
2 changes: 1 addition & 1 deletion tests/unit/dags/conftest.py
@@ -8,7 +8,7 @@
from airflow.utils.db import resetdb, initdb

from dags.daily_materialized_views_update import DAILY_MATERIALISED_VIEWS_DAG_NAME
-from dags.fetch_notices_by_date import FETCHER_DAG_NAME
+from dags.fetch_notices_by_date import DAG_ID as FETCHER_DAG_NAME
from tests import AIRFLOW_DAG_FOLDER
