Skip to content

Commit 27a6897

Browse files
add fetch_notices_by_query DAG
1 parent e01b55c commit 27a6897

2 files changed

Lines changed: 84 additions & 0 deletions

File tree

dags/fetch_notices_by_query.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from airflow.decorators import dag, task
2+
from airflow.operators.dummy import DummyOperator
3+
from airflow.operators.python import BranchPythonOperator
4+
from airflow.utils.trigger_rule import TriggerRule
5+
from dags import DEFAULT_DAG_ARGUMENTS
6+
from dags.dags_utils import get_dag_param, push_dag_downstream, pull_dag_upstream
7+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
8+
from dags.pipelines.notice_fetcher_pipelines import notice_fetcher_by_query_pipeline
9+
from ted_sws.event_manager.adapters.event_log_decorator import event_log
10+
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
11+
EventMessageProcessType
12+
13+
DAG_NAME = "fetch_notices_by_query"
14+
BATCH_SIZE = 2000
15+
QUERY_DAG_KEY = "query"
16+
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
17+
TRIGGER_PARTIAL_WORKFLOW_TASK_ID = "trigger_partial_notice_proc_workflow"
18+
TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "trigger_complete_notice_proc_workflow"
19+
CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID = "check_if_trigger_complete_workflow"
20+
FINISH_FETCH_BY_DATE_TASK_ID = "finish_fetch_by_query"
21+
22+
23+
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
24+
catchup=False,
25+
tags=['fetch'])
26+
def fetch_notices_by_query():
27+
@task
28+
@event_log(TechnicalEventMessage(
29+
message="fetch_by_query_notice_from_ted",
30+
metadata=EventMessageMetadata(
31+
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
32+
))
33+
)
34+
def fetch_by_query_notice_from_ted():
35+
notice_ids = notice_fetcher_by_query_pipeline(query=get_dag_param(key=QUERY_DAG_KEY, raise_error=True))
36+
if not notice_ids:
37+
raise Exception("No notices has been fetched!")
38+
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
39+
40+
trigger_complete_workflow = TriggerNoticeBatchPipelineOperator(task_id=TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
41+
execute_only_one_step=False
42+
)
43+
trigger_normalisation_workflow = TriggerNoticeBatchPipelineOperator(
44+
task_id=TRIGGER_PARTIAL_WORKFLOW_TASK_ID,
45+
batch_size=BATCH_SIZE,
46+
execute_only_one_step=True)
47+
48+
def _branch_selector():
49+
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
50+
default_value=True)
51+
push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY))
52+
if trigger_complete_workflow:
53+
return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID]
54+
return [TRIGGER_PARTIAL_WORKFLOW_TASK_ID]
55+
56+
branch_task = BranchPythonOperator(
57+
task_id=CHECK_IF_TRIGGER_COMPLETE_WORKFLOW_TASK_ID,
58+
python_callable=_branch_selector,
59+
)
60+
61+
finish_step = DummyOperator(task_id=FINISH_FETCH_BY_DATE_TASK_ID,
62+
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
63+
64+
fetch_by_query_notice_from_ted() >> branch_task >> [trigger_normalisation_workflow,
65+
trigger_complete_workflow] >> finish_step
66+
67+
68+
dag = fetch_notices_by_query()

dags/pipelines/notice_fetcher_pipelines.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from datetime import datetime, timedelta
22
from typing import List
33

4+
45
def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
56
from pymongo import MongoClient
67
from ted_sws import config
@@ -21,3 +22,18 @@ def notice_fetcher_by_date_pipeline(date_wild_card: str = None) -> List[str]:
2122
notice_fetched_date=notice_publication_date)
2223

2324
return notice_ids
25+
26+
27+
def notice_fetcher_by_query_pipeline(query: str = None) -> List[str]:
28+
from pymongo import MongoClient
29+
from ted_sws import config
30+
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
31+
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
32+
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
33+
34+
ted_api_query = {"q": query}
35+
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
36+
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
37+
ted_api_adapter=TedAPIAdapter(
38+
request_api=TedRequestAPI())).fetch_notices_by_query(query=ted_api_query)
39+
return notice_ids

0 commit comments

Comments
 (0)