|
1 | 1 | from datetime import timezone |
2 | 2 |
|
3 | | -from airflow.models import DagRun, TaskInstance |
4 | | -from airflow.utils.types import DagRunType |
| 3 | +# from airflow.models import DagRun, TaskInstance |
| 4 | +# from airflow.utils.types import DagRunType |
5 | 5 |
|
6 | 6 | INDEX_NOTICE_XML_CONTENT_TASK_ID = "index_notice_xml_content" |
7 | 7 | NORMALISE_NOTICE_METADATA_TASK_ID = "normalise_notice_metadata" |
|
46 | 46 | FULL_BRANCH_TASK_IDS = [START_PROCESSING_NOTICE_TASK_ID, |
47 | 47 | NORMALISE_NOTICE_METADATA_TASK_ID] + TRANSFORM_BRANCH_TASK_IDS + PACKAGE_BRANCH_TASK_IDS + PUBLISH_BRANCH_TASK_IDS |
48 | 48 |
|
49 | | - |
50 | | -def run_task(dag, task, conf: dict, xcom_push_data: dict = None, ignore_first_depends_on_past=True) -> TaskInstance: |
51 | | - start_date = dag.default_args["start_date"] |
52 | | - end_date = dag.default_args["start_date"] |
53 | | - start_date = start_date or task.start_date |
54 | | - end_date = end_date or task.end_date or timezone.utcnow() |
55 | | - |
56 | | - info = list(task.dag.iter_dagrun_infos_between(start_date, end_date, align=False))[0] |
57 | | - ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past |
58 | | - dr = DagRun( |
59 | | - dag_id=task.dag_id, |
60 | | - run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date), |
61 | | - run_type=DagRunType.MANUAL, |
62 | | - execution_date=info.logical_date, |
63 | | - data_interval=info.data_interval, |
64 | | - conf=conf |
65 | | - ) |
66 | | - ti = TaskInstance(task, run_id=None) |
67 | | - ti.dag_run = dr |
68 | | - |
69 | | - if xcom_push_data is not None: |
70 | | - for key, value in xcom_push_data.items(): |
71 | | - ti.xcom_push(key=str(key), value=value) |
72 | | - |
73 | | - ti.run( |
74 | | - mark_success=False, |
75 | | - ignore_task_deps=True, |
76 | | - ignore_depends_on_past=ignore_depends_on_past, |
77 | | - ignore_ti_state=False, |
78 | | - test_mode=True, |
79 | | - ) |
80 | | - return ti |
| 49 | +# |
| 50 | +# def run_task(dag, task, conf: dict, xcom_push_data: dict = None, ignore_first_depends_on_past=True) -> TaskInstance: |
| 51 | +# start_date = dag.default_args["start_date"] |
| 52 | +# end_date = dag.default_args["start_date"] |
| 53 | +# start_date = start_date or task.start_date |
| 54 | +# end_date = end_date or task.end_date or timezone.utcnow() |
| 55 | +# |
| 56 | +# info = list(task.dag.iter_dagrun_infos_between(start_date, end_date, align=False))[0] |
| 57 | +# ignore_depends_on_past = info.logical_date == start_date and ignore_first_depends_on_past |
| 58 | +# dr = DagRun( |
| 59 | +# dag_id=task.dag_id, |
| 60 | +# run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date), |
| 61 | +# run_type=DagRunType.MANUAL, |
| 62 | +# execution_date=info.logical_date, |
| 63 | +# data_interval=info.data_interval, |
| 64 | +# conf=conf |
| 65 | +# ) |
| 66 | +# ti = TaskInstance(task, run_id=None) |
| 67 | +# ti.dag_run = dr |
| 68 | +# |
| 69 | +# if xcom_push_data is not None: |
| 70 | +# for key, value in xcom_push_data.items(): |
| 71 | +# ti.xcom_push(key=str(key), value=value) |
| 72 | +# |
| 73 | +# ti.run( |
| 74 | +# mark_success=False, |
| 75 | +# ignore_task_deps=True, |
| 76 | +# ignore_depends_on_past=ignore_depends_on_past, |
| 77 | +# ignore_ti_state=False, |
| 78 | +# test_mode=True, |
| 79 | +# ) |
| 80 | +# return ti |
0 commit comments