Skip to content

Commit a276070

Browse files
Merge pull request #476 from OP-TED/feature/TED-1365
Feature/ted 1365
2 parents 91e7b17 + 865c859 commit a276070

2 files changed

Lines changed: 13 additions & 5 deletions

File tree

dags/fetch_notices_by_date.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from ted_sws.event_manager.adapters.event_log_decorator import event_log
1414
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
1515
EventMessageProcessType
16+
from ted_sws.event_manager.services.log import log_error
1617

1718
DAG_NAME = "fetch_notices_by_date"
1819
BATCH_SIZE = 2000
@@ -40,8 +41,9 @@ def fetch_notices_by_date():
4041
def fetch_by_date_notice_from_ted():
4142
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=get_dag_param(key=WILD_CARD_DAG_KEY))
4243
if not notice_ids:
43-
raise Exception("No notices has been fetched!")
44-
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
44+
log_error("No notices has been fetched!")
45+
else:
46+
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
4547

4648
trigger_complete_workflow = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
4749
execute_only_one_step=False
@@ -67,9 +69,13 @@ def validate_fetched_notices():
6769
validate_and_update_daily_supra_notice(ted_publication_date=publication_date, mongodb_client=mongodb_client)
6870

6971
def _branch_selector():
72+
notice_ids = pull_dag_upstream(key=NOTICE_IDS_KEY)
73+
if not notice_ids:
74+
return [FINISH_FETCH_BY_DATE_TASK_ID]
75+
7076
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
7177
default_value=True)
72-
push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY))
78+
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
7379
if trigger_complete_workflow:
7480
return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID]
7581
return [TRIGGER_PARTIAL_WORKFLOW_TASK_ID]
@@ -91,5 +97,7 @@ def _branch_selector():
9197
fetch_by_date_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,
9298
trigger_complete_workflow] >> validate_fetched_notices_step >> finish_step
9399

100+
branch_task >> finish_step
101+
94102

95103
dag = fetch_notices_by_date()

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Project dependecies
22
pydantic~=1.9.0
3-
requests~=2.28.1
3+
requests~=2.28.2
44
deepdiff~=5.7.0
55
jinja2~=3.1.2
66
python-dotenv~=0.19.2
@@ -25,4 +25,4 @@ ordered-set~=4.0.2
2525
json2html~=1.3.0
2626
minio~=7.1.1
2727
certifi~=2022.12.7
28-
shortuuid~=1.0.11
28+
shortuuid~=1.0.11

0 commit comments

Comments
 (0)