Skip to content

Commit 40e28a7

Browse files
Add feature to reprocess notices published in Cellar
- package the notice with the METS update action if it has already been publicly available - add published_in_cellar_counter to the notice normalised metadata - add a new DAG for reprocessing notices published in Cellar
1 parent 865c859 commit 40e28a7

4 files changed

Lines changed: 61 additions & 1 deletion

File tree

dags/pipelines/notice_processor_pipelines.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from dags.pipelines.pipeline_protocols import NoticePipelineOutput
33
from ted_sws.core.model.notice import Notice, NoticeStatus
44
from ted_sws.event_manager.services.log import log_notice_error
5+
from ted_sws.notice_packager.model.metadata import METS_TYPE_UPDATE
56
from ted_sws.notice_validator.services.entity_deduplication_validation import \
67
generate_rdf_manifestation_entity_deduplication_report
78

@@ -88,11 +89,15 @@ def notice_package_pipeline(notice: Notice, mongodb_client: MongoClient = None)
8889
8990
"""
9091
from ted_sws.notice_packager.services.notice_packager import package_notice
92+
from ted_sws.notice_packager.model.metadata import METS_TYPE_CREATE
9193

9294
notice.update_status_to(new_status=NoticeStatus.VALIDATED)
9395
# TODO: Implement notice package eligibility
9496
notice.set_is_eligible_for_packaging(eligibility=True)
95-
packaged_notice = package_notice(notice=notice)
97+
package_action = METS_TYPE_CREATE
98+
if notice.normalised_metadata.published_in_cellar_counter > 0:
99+
package_action = METS_TYPE_UPDATE
100+
packaged_notice = package_notice(notice=notice, action=package_action)
96101
return NoticePipelineOutput(notice=packaged_notice)
97102

98103

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from airflow.decorators import dag, task
2+
3+
from dags import DEFAULT_DAG_ARGUMENTS
4+
from dags.dags_utils import push_dag_downstream, get_dag_param
5+
from dags.notice_processing_pipeline import NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
6+
from dags.operators.DagBatchPipelineOperator import NOTICE_IDS_KEY, TriggerNoticeBatchPipelineOperator, \
7+
EXECUTE_ONLY_ONE_STEP_KEY
8+
from dags.pipelines.notice_selectors_pipelines import notice_ids_selector_by_status
9+
from ted_sws.core.model.notice import NoticeStatus
10+
from ted_sws.event_manager.adapters.event_log_decorator import event_log
11+
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
12+
EventMessageProcessType
13+
14+
# Name of this DAG; passed as process_name in the event-log metadata of the
# selection task.
DAG_NAME = "reprocess_published_in_cellar_notices"
15+
16+
# Notice statuses handed to the notice selector: only notices that are
# currently PUBLICLY_AVAILABLE are picked up for reprocessing.
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_AVAILABLE]
17+
# task_id given to the TriggerNoticeBatchPipelineOperator instance.
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
18+
# Keys used with get_dag_param to read the selection filters from the DAG run.
FORM_NUMBER_DAG_PARAM = "form_number"
19+
START_DATE_DAG_PARAM = "start_date"
20+
END_DATE_DAG_PARAM = "end_date"
21+
XSD_VERSION_DAG_PARAM = "xsd_version"
22+
23+
24+
# Airflow DAG definition: selects the ids of notices whose status is
# PUBLICLY_AVAILABLE (filtered by the DAG run parameters read below) and then
# runs the notice batch pipeline trigger operator.
@dag(default_args=DEFAULT_DAG_ARGUMENTS,
25+
# No schedule is configured for this DAG.
schedule_interval=None,
26+
tags=['selector', 're-transform-publicly-available'])
27+
def reprocess_published_in_cellar_notices():
28+
# Selection task, wrapped with event_log using a TechnicalEventMessage that
# carries the DAG process type and this DAG's name.
@task
29+
@event_log(TechnicalEventMessage(
30+
message="select_notices_for_re_transform",
31+
metadata=EventMessageMetadata(
32+
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
33+
))
34+
)
35+
def select_notices_for_re_transform():
36+
# Read the four selection filters from the DAG run parameters.
form_number = get_dag_param(key=FORM_NUMBER_DAG_PARAM)
37+
start_date = get_dag_param(key=START_DATE_DAG_PARAM)
38+
end_date = get_dag_param(key=END_DATE_DAG_PARAM)
39+
xsd_version = get_dag_param(key=XSD_VERSION_DAG_PARAM)
40+
# Select notice ids by status (PUBLICLY_AVAILABLE) and the filters above.
notice_ids = notice_ids_selector_by_status(notice_statuses=RE_TRANSFORM_TARGET_NOTICE_STATES,
41+
form_number=form_number, start_date=start_date,
42+
end_date=end_date, xsd_version=xsd_version)
43+
# Publish the selected ids under NOTICE_IDS_KEY.
# NOTE(review): presumably the trigger operator below reads them from
# there - confirm in DagBatchPipelineOperator.
push_dag_downstream(key=NOTICE_IDS_KEY, value=notice_ids)
44+
45+
# NOTE(review): start_with_step_name presumably makes the batch pipeline begin
# at the notice transformation step - confirm against the operator.
trigger_notice_process_workflow = TriggerNoticeBatchPipelineOperator(
46+
task_id=TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID,
47+
start_with_step_name=NOTICE_TRANSFORMATION_PIPELINE_TASK_ID
48+
)
49+
# Ordering: the selection task runs before the trigger operator.
select_notices_for_re_transform() >> trigger_notice_process_workflow
50+
51+
52+
# Call the decorated factory at import time and bind the result to a
# module-level name.
dag = reprocess_published_in_cellar_notices()

ted_sws/core/model/metadata.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class NormalisedMetadata(Metadata):
8080
form_number: str
8181
eforms_subtype: str
8282
xsd_version: str
83+
published_in_cellar_counter: int = Field(default=0)
8384

8485

8586
class NormalisedMetadataView(Metadata):
@@ -104,6 +105,7 @@ class NormalisedMetadataView(Metadata):
104105
form_number: str
105106
eforms_subtype: str
106107
xsd_version: str
108+
published_in_cellar_counter: int = Field(default=0)
107109

108110

109111

ted_sws/notice_validator/services/check_availability_of_notice_in_cellar.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus],
104104
for notice_uri, notice in selected_notices_map.items():
105105
if notice_uri in available_notice_uries_in_cellar:
106106
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_AVAILABLE)
107+
notice.normalised_metadata.published_in_cellar_counter += 1
107108
else:
108109
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_UNAVAILABLE)
109110
notice_repository.update(notice=notice)

0 commit comments

Comments
 (0)