Skip to content

Commit 49ddaba

Browse files
committed
fixes for fetch by dates
1 parent 5dca2bc commit 49ddaba

3 files changed

Lines changed: 23 additions & 17 deletions

File tree

dags/fetch_notices_by_date.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import timedelta, date
1+
from datetime import timedelta, date, datetime
22

33
from airflow.decorators import dag, task
44
from airflow.models import Param
@@ -37,18 +37,19 @@
3737
tags=['selector', 'daily-fetch'],
3838
params={
3939
WILD_CARD_DAG_KEY: Param(
40+
default=f"{date.today() - timedelta(days=1)}",
4041
type="string",
4142
format="date",
4243
title="Date",
4344
description="""This field is required.
44-
Date to fetch notices from TED. Correct format is YYYYMMDD*"""
45+
Date to fetch notices from TED."""
4546
),
4647
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: Param(
4748
default=True,
4849
type="boolean",
4950
title="Trigger Complete Workflow",
5051
description="""This field is required.
51-
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
52+
If true, the complete workflow will be triggered, otherwise only the partial workflow will be triggered."""
5253
)
5354
}
5455
)
@@ -61,7 +62,11 @@ def fetch_notices_by_date():
6162
))
6263
)
6364
def fetch_by_date_notice_from_ted():
64-
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=get_dag_param(key=WILD_CARD_DAG_KEY))
65+
default_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
66+
selected_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY,
67+
default_value=default_date), "%Y-%m-%d")
68+
date_wild_card = datetime.strftime(selected_date, "%Y%m%d*")
69+
notice_ids = notice_fetcher_by_date_pipeline(date_wild_card=date_wild_card)
6570
if not notice_ids:
6671
log_error("No notices has been fetched!")
6772
else:
@@ -83,10 +88,10 @@ def validate_fetched_notices():
8388
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice
8489
from datetime import datetime
8590
from pymongo import MongoClient
86-
91+
default_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
8792
publication_date = datetime.strptime(get_dag_param(key=WILD_CARD_DAG_KEY,
88-
default_value=(datetime.now() - timedelta(days=1)).strftime(
89-
"%Y%m%d*")), "%Y%m%d*")
93+
default_value=default_date
94+
), "%Y-%m-%d")
9095
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
9196
validate_and_update_daily_supra_notice(ted_publication_date=publication_date, mongodb_client=mongodb_client)
9297

dags/fetch_notices_by_date_range.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@
2121
END_DATE_KEY = "end_date"
2222

2323

24-
def generate_wildcards_foreach_day_in_range(start_date: str, end_date: str) -> list:
24+
def generate_list_of_dates_from_date_range(start_date: str, end_date: str) -> list:
2525
"""
2626
Given a date range returns all daily dates in that range
2727
:param start_date:
2828
:param end_date:
2929
:return:
3030
"""
31-
return [dt.strftime('%Y%m%d*')
31+
return [dt
3232
for dt in rrule.rrule(rrule.DAILY,
33-
dtstart=datetime.strptime(start_date, '%Y%m%d'),
34-
until=datetime.strptime(end_date, '%Y%m%d'))]
33+
dtstart=datetime.strptime(start_date, '%Y-%m-%d'),
34+
until=datetime.strptime(end_date, '%Y-%m-%d'))]
3535

3636

37-
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['fetch'],
37+
@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master'],
3838
params={
3939
START_DATE_KEY: Param(
4040
default=f"{date.today()}",
@@ -74,12 +74,12 @@ def trigger_notice_by_date_for_each_date_in_range():
7474
start_date = get_dag_param(key=START_DATE_KEY, raise_error=True)
7575
end_date = get_dag_param(key=END_DATE_KEY, raise_error=True)
7676
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False)
77-
date_wildcards = generate_wildcards_foreach_day_in_range(start_date, end_date)
78-
for date_wildcard in date_wildcards:
77+
fetch_dates = generate_list_of_dates_from_date_range(start_date, end_date)
78+
for fetch_date in fetch_dates:
7979
TriggerDagRunOperator(
80-
task_id=f'trigger_notice_fetch_by_date_workflow_dag_{date_wildcard[:-1]}',
80+
task_id=f'trigger_notice_fetch_by_date_workflow_dag_{fetch_date.strftime("%Y_%m_%d")}',
8181
trigger_dag_id=FETCH_NOTICES_BY_DATE_DAG_NAME,
82-
conf={WILD_CARD_DAG_KEY: date_wildcard,
82+
conf={WILD_CARD_DAG_KEY: fetch_date.strftime('%Y-%m-%d'),
8383
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: trigger_complete_workflow,
8484
}
8585
).execute(context=context)

infra/airflow/docker-compose.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ x-airflow-common:
6060
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
6161
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "true"
6262
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
63+
# AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS: 'true'
6364
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
6465
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
6566
VAULT_TOKEN: ${VAULT_TOKEN}
@@ -90,7 +91,7 @@ x-airflow-common:
9091

9192
services:
9293
postgres:
93-
image: postgres:13
94+
image: postgres:14.4-alpine
9495
container_name: postgres-airflow-${ENVIRONMENT}
9596
environment:
9697
POSTGRES_USER: ${AIRFLOW_POSTGRES_USER}

0 commit comments

Comments
 (0)