Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dags/daily_check_notices_availability_in_cellar.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
validate_notices_availability_in_cellar

DAG_NAME = "daily_check_notices_availability_in_cellar"
DAG_ID = "daily_check_notices_availability_in_cellar"
DAG_NAME = "Daily check notices availability in Cellar"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
catchup=False,
dag_display_name=DAG_NAME,
dag_id=DAG_ID,
schedule_interval="0 0 * * *",
tags=['daily', 'validation'])
def daily_check_notices_availability_in_cellar():
Expand Down
6 changes: 4 additions & 2 deletions dags/daily_materialized_views_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, create_notice_kpi_collection

DAILY_MATERIALISED_VIEWS_DAG_NAME = "daily_materialized_views_update"
DAG_ID = "daily_materialized_views_update"
DAG_NAME = "Materialized views update"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
dag_id=DAILY_MATERIALISED_VIEWS_DAG_NAME,
dag_id=DAG_ID,
dag_display_name=DAG_NAME,
catchup=False,
timetable=CronTriggerTimetable(
cron=config.SCHEDULE_DAG_MATERIALIZED_VIEW_UPDATE,
Expand Down
8 changes: 5 additions & 3 deletions dags/fetch_notices_by_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
EventMessageProcessType
from ted_sws.event_manager.services.log import log_error

FETCHER_DAG_NAME = "fetch_notices_by_date"
DAG_ID = "fetch_notices_by_date"
DAG_NAME = "Fetch notices by date"
BATCH_SIZE = 2000
WILD_CARD_DAG_KEY = "wild_card"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
Expand All @@ -29,7 +30,8 @@


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
dag_id=FETCHER_DAG_NAME,
dag_id=DAG_ID,
dag_display_name=DAG_NAME,
catchup=False,
timetable=CronTriggerTimetable(
cron=config.SCHEDULE_DAG_FETCH,
Expand Down Expand Up @@ -58,7 +60,7 @@ def fetch_notices_by_date():
@event_log(TechnicalEventMessage(
message="fetch_notice_from_ted",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=FETCHER_DAG_NAME
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
))
)
def fetch_by_date_notice_from_ted():
Expand Down
10 changes: 6 additions & 4 deletions dags/fetch_notices_by_date_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from dags.fetch_notices_by_date import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, \
FETCHER_DAG_NAME as FETCH_NOTICES_BY_DATE_DAG_NAME
DAG_ID as FETCH_NOTICES_BY_DATE_DAG_NAME
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "fetch_notices_by_date_range"
DAG_ID = "fetch_notices_by_date_range"
DAG_NAME = "Fetch notices by date range"

START_DATE_KEY = "start_date"
END_DATE_KEY = "end_date"
Expand All @@ -34,7 +35,8 @@ def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> li
until=datetime.strptime(end_date, '%Y-%m-%d'))]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, dag_id=DAG_ID, dag_display_name=DAG_NAME,
tags=['master'],
params={
START_DATE_KEY: Param(
default=f"{date.today()}",
Expand Down Expand Up @@ -66,7 +68,7 @@ def fetch_notices_by_date_range():
@event_log(TechnicalEventMessage(
message="trigger_fetch_notices_workers_for_date_range",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
))
)
def trigger_notice_by_date_for_each_date_in_range():
Expand Down
7 changes: 5 additions & 2 deletions dags/fetch_notices_by_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType

DAG_NAME = "fetch_notices_by_query"
DAG_ID = "fetch_notices_by_query"
DAG_NAME = "Fetch notices by query"
BATCH_SIZE = 2000
QUERY_DAG_KEY = "query"
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY = "trigger_complete_workflow"
Expand All @@ -23,6 +24,8 @@

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
dag_display_name=DAG_NAME,
dag_id=DAG_ID,
tags=['fetch'],
params={
QUERY_DAG_KEY: Param(
Expand All @@ -46,7 +49,7 @@ def fetch_notices_by_query():
@event_log(TechnicalEventMessage(
message="fetch_by_query_notice_from_ted",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
process_type=EventMessageProcessType.DAG, process_name=DAG_ID
))
)
def fetch_by_query_notice_from_ted():
Expand Down
5 changes: 4 additions & 1 deletion dags/load_mapping_suite_in_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
FINISH_LOADING_MAPPING_SUITE_TASK_ID = "finish_loading_mapping_suite"
TRIGGER_DOCUMENT_PROC_PIPELINE_TASK_ID = "trigger_document_proc_pipeline"
CHECK_IF_LOAD_TEST_DATA_TASK_ID = "check_if_load_test_data"

DAG_ID = "load_mapping_suite_in_database"
DAG_NAME = "Load mapping suite"

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
dag_id=DAG_ID,
dag_display_name=DAG_NAME,
tags=['fetch', 'mapping-suite', 'github'],
params={
GITHUB_REPOSITORY_URL_DAG_PARAM_KEY: Param(
Expand Down
5 changes: 4 additions & 1 deletion dags/load_notices_in_fuseki.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
FUSEKI_DATASET_NAME_DAG_PARAM_KEY = "fuseki_dataset_name"
NOTICE_STATUS_DAG_PARAM_KEY = "notice_status"
DEFAULT_FUSEKI_DATASET_NAME = "mdr_dataset"

DAG_ID = "load_notices_in_fuseki"
DAG_NAME = "Load notices in Fuseki"

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
dag_display_name=DAG_NAME,
dag_id=DAG_ID,
tags=['load', 'notices', 'fuseki'])
def load_notices_in_fuseki():
@task
Expand Down
5 changes: 4 additions & 1 deletion dags/notice_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
SELECTOR_BRANCH_BEFORE_VALIDATION_TASK_ID = "switch_to_validation"
SELECTOR_BRANCH_BEFORE_PACKAGE_TASK_ID = "switch_to_package"
SELECTOR_BRANCH_BEFORE_PUBLISH_TASK_ID = "switch_to_publish"
DAG_NAME = "notice_processing_pipeline"
DAG_ID = "notice_processing_pipeline"
DAG_NAME = "Notice processing pipeline"

BRANCH_SELECTOR_MAP = {NOTICE_NORMALISATION_PIPELINE_TASK_ID: NOTICE_NORMALISATION_PIPELINE_TASK_ID,
NOTICE_TRANSFORMATION_PIPELINE_TASK_ID: SELECTOR_BRANCH_BEFORE_TRANSFORMATION_TASK_ID,
Expand All @@ -46,6 +47,8 @@ def branch_selector(result_branch: str, xcom_forward_keys: List[str] = [NOTICE_I

@dag(default_args=DEFAULT_DAG_ARGUMENTS,
schedule_interval=None,
dag_display_name=DAG_NAME,
dag_id=DAG_ID,
max_active_runs=256,
max_active_tasks=256,
tags=['worker', 'pipeline'])
Expand Down
56 changes: 56 additions & 0 deletions dags/reprocess_notices_from_backlog_by_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType

DAG_ID = "reprocess_notices_by_id_from_backlog"
DAG_NAME = "Reprocess notices from backlog by ID"

NOTICE_IDS_DAG_PARAM = "notice_ids"
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"

@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    dag_id=DAG_ID,
    dag_display_name=DAG_NAME,
    schedule_interval=None,  # manually triggered only, never scheduled
    tags=["selector", "re-transform"],
    params={
        NOTICE_IDS_DAG_PARAM: Param(
            type="array",
            title="Notice IDs",
            description="Required. List of TED Notice IDs to reprocess. Each value should be entered on a new line. Example: [\"123456-2022\", \"456789-2023\"]. Every ID value should be entered on a newline"
        )
    },
    description=DAG_NAME
)
def reprocess_notices_by_id_from_backlog():
    """Manually triggered DAG that reprocesses an explicit list of notice IDs.

    Reads the ``notice_ids`` DAG param, forwards the IDs downstream via
    ``push_dag_downstream`` and triggers the notice processing pipeline
    starting from the normalisation step.
    """

    @task
    @event_log(TechnicalEventMessage(
        message="select_notices_for_reprocess_by_id",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG,
            process_name=DAG_ID
        ))
    )
    def select_notice_ids():
        """Read the notice IDs from the DAG params and push them downstream.

        Fails the run when no IDs are supplied, so an empty batch is never
        triggered.
        """
        # raise_error=True: a missing param aborts the task immediately.
        notice_ids = get_dag_param(key=NOTICE_IDS_DAG_PARAM, raise_error=True)

        # An explicitly provided empty list still passes the raise_error
        # check above - reject it here.
        if not notice_ids:
            raise Exception("No notice IDs provided.")

        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    # Starts the notice processing pipeline from the normalisation step for
    # the batch of notice IDs pushed by select_notice_ids.
    trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID
    )

    select_notice_ids() >> trigger_notice_process_workflow


dag = reprocess_notices_by_id_from_backlog()
72 changes: 72 additions & 0 deletions dags/reprocess_notices_from_backlog_by_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from airflow.decorators import dag, task
from airflow.models import Param

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import push_dag_downstream, get_dag_param
from dags.notice_processing_pipeline import NOTICE_NORMALISATION_PIPELINE_TASK_ID
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, EventMessageProcessType

DAG_ID = "reprocess_notices_from_backlog_by_status"
DAG_NAME = "Reprocess notices from backlog by status"

TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
START_DATE_DAG_PARAM = "start_date"
END_DATE_DAG_PARAM = "end_date"
NOTICE_STATUSES_DAG_PARAM = "notice_statuses"


@dag(
    default_args=DEFAULT_DAG_ARGUMENTS,
    dag_id=DAG_ID,
    dag_display_name=DAG_NAME,
    schedule_interval=None,  # manually triggered only, never scheduled
    tags=['selector', 're-transform'],
    params={
        NOTICE_STATUSES_DAG_PARAM: Param(
            default=[],
            type="array",
            title="Notice Statuses",
            description="Required. Select one or more notice statuses to reprocess.",
            examples=[status.name for status in NoticeStatus]
        ),
        START_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date",
                                    description="Start publication date (YYYY-MM-DD)"),
        END_DATE_DAG_PARAM: Param(default="", type=["null", "string"], format="date",
                                  description="End publication date (YYYY-MM-DD)")
    }
)
def reprocess_notices_from_backlog_by_status():
    """Manually triggered DAG that reprocesses backlog notices selected by status.

    Resolves the requested notice statuses (optionally restricted to a
    publication-date range) into a list of notice IDs, pushes them
    downstream and triggers the notice processing pipeline starting from
    the normalisation step.
    """

    @task
    @event_log(TechnicalEventMessage(
        message="select_notices_for_re_transform",
        metadata=EventMessageMetadata(
            process_type=EventMessageProcessType.DAG,
            process_name=DAG_ID
        ))
    )
    def select_notices_for_re_transform():
        """Resolve the status/date params into notice IDs and push them downstream.

        Raises:
            Exception: when no notice status is provided (the param is
                documented as required but defaults to an empty list).
            KeyError: when a provided status name is not a valid
                ``NoticeStatus`` member.
        """
        start_date = get_dag_param(key=START_DATE_DAG_PARAM, default_value="")
        end_date = get_dag_param(key=END_DATE_DAG_PARAM, default_value="")
        statuses_param = get_dag_param(key=NOTICE_STATUSES_DAG_PARAM)

        # Fail fast on an empty selection, mirroring the by-id reprocess DAG;
        # otherwise the selector would be invoked with no statuses at all.
        if not statuses_param:
            raise Exception("No notice statuses provided.")

        # Enum lookup by name; invalid names raise KeyError and fail the task.
        notice_statuses = [NoticeStatus[status_str] for status_str in statuses_param]

        notice_ids = notice_ids_selector_by_status(
            notice_statuses=notice_statuses,
            start_date=start_date,
            end_date=end_date
        )

        push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)

    # Starts the notice processing pipeline from the normalisation step for
    # the batch of notice IDs pushed by select_notices_for_re_transform.
    trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
        task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
        start_with_step_name=NOTICE_NORMALISATION_PIPELINE_TASK_ID
    )

    select_notices_for_re_transform() >> trigger_notice_process_workflow


dag = reprocess_notices_from_backlog_by_status()
52 changes: 0 additions & 52 deletions dags/reprocess_published_in_cellar_notices.py

This file was deleted.

Loading
Loading