88from ted_sws .core .model .notice import NoticeStatus
99from ted_sws .data_manager .adapters .notice_repository import NoticeRepository
1010from ted_sws .event_manager .adapters .event_log_decorator import event_log
11+ from ted_sws .event_manager .adapters .event_logger import EventLogger
1112from ted_sws .event_manager .model .event_message import TechnicalEventMessage , EventMessageMetadata , \
12- EventMessageProcessType
13+ EventMessageProcessType , EventMessage
14+ from ted_sws .event_manager .services .logger_from_context import get_logger_from_dag_context
1315
1416DAG_NAME = "selector_re_transform_process_orchestrator"
1517
16- RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus .ELIGIBLE_FOR_TRANSFORMATION , NoticeStatus .NORMALISED_METADATA ,
17- NoticeStatus .ELIGIBLE_FOR_TRANSFORMATION ,
18+ RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus .ELIGIBLE_FOR_TRANSFORMATION ,
1819 NoticeStatus .PREPROCESSED_FOR_TRANSFORMATION ,
1920 NoticeStatus .INELIGIBLE_FOR_TRANSFORMATION , NoticeStatus .TRANSFORMED ,
20- NoticeStatus .DISTILLED ,
21- NoticeStatus .VALIDATED , NoticeStatus . INELIGIBLE_FOR_PACKAGING
21+ NoticeStatus .DISTILLED , NoticeStatus . VALIDATED ,
22+ NoticeStatus .INELIGIBLE_FOR_PACKAGING
2223 ]
2324
2425
@@ -33,13 +34,15 @@ def selector_re_transform_process_orchestrator():
3334 process_type = EventMessageProcessType .DAG , process_name = DAG_NAME
3435 ))
3536 )
36- def select_notices_for_re_transform_and_reset_status ():
37+ def select_notices_for_re_transform_and_reset_status (** context_args ):
38+ event_logger : EventLogger = get_logger_from_dag_context (context_args )
3739 mongodb_client = MongoClient (config .MONGO_DB_AUTH_URL )
3840 notice_repository = NoticeRepository (mongodb_client = mongodb_client )
3941 for target_notice_state in RE_TRANSFORM_TARGET_NOTICE_STATES :
42+ event_logger .info (event_message = EventMessage (message = f"select notices with status : { target_notice_state } " ))
4043 notices = notice_repository .get_notice_by_status (notice_status = target_notice_state )
4144 for notice in notices :
42- notice .update_status_to (new_status = NoticeStatus .ELIGIBLE_FOR_TRANSFORMATION )
45+ notice .update_status_to (new_status = NoticeStatus .NORMALISED_METADATA )
4346 notice_repository .update (notice = notice )
4447
4548 @task
@@ -53,7 +56,7 @@ def trigger_worker_for_transform_branch():
5356 context = get_current_context ()
5457 mongodb_client = MongoClient (config .MONGO_DB_AUTH_URL )
5558 notice_repository = NoticeRepository (mongodb_client = mongodb_client )
56- notices = notice_repository .get_notice_by_status (notice_status = NoticeStatus .ELIGIBLE_FOR_TRANSFORMATION )
59+ notices = notice_repository .get_notice_by_status (notice_status = NoticeStatus .NORMALISED_METADATA )
5760 for notice in notices :
5861 TriggerDagRunOperator (
5962 task_id = f'trigger_worker_dag_{ notice .ted_id } ' ,
0 commit comments