This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new acd95a5ef1 feat: Add parent_run_id for COMPLETE and FAIL events 
(#36067)
acd95a5ef1 is described below

commit acd95a5ef19e8b98404a1eccd11a2d862f21d519
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Dec 5 16:33:48 2023 +0100

    feat: Add parent_run_id for COMPLETE and FAIL events (#36067)
---
 airflow/providers/openlineage/plugins/adapter.py   |  58 ++++--
 airflow/providers/openlineage/plugins/listener.py  |  11 +-
 .../providers/openlineage/plugins/test_listener.py |   9 +-
 .../plugins/test_openlineage_adapter.py            | 209 ++++++++++++++++++++-
 4 files changed, 273 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py 
b/airflow/providers/openlineage/plugins/adapter.py
index a69ae36bcf..ad648f8828 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -164,17 +164,19 @@ class OpenLineageAdapter(LoggingMixin):
 
         if not run_facets:
             run_facets = {}
+        if task:
+            run_facets = {**task.run_facets, **run_facets}
         run_facets["processing_engine"] = processing_engine_version_facet  # 
type: ignore
         event = RunEvent(
             eventType=RunState.START,
             eventTime=event_time,
             run=self._build_run(
-                run_id,
-                job_name,
-                parent_job_name,
-                parent_run_id,
-                nominal_start_time,
-                nominal_end_time,
+                run_id=run_id,
+                job_name=job_name,
+                parent_job_name=parent_job_name,
+                parent_run_id=parent_run_id,
+                nominal_start_time=nominal_start_time,
+                nominal_end_time=nominal_end_time,
                 run_facets=run_facets,
             ),
             job=self._build_job(
@@ -190,19 +192,36 @@ class OpenLineageAdapter(LoggingMixin):
         )
         self.emit(event)
 
-    def complete_task(self, run_id: str, job_name: str, end_time: str, task: 
OperatorLineage):
+    def complete_task(
+        self,
+        run_id: str,
+        job_name: str,
+        parent_job_name: str | None,
+        parent_run_id: str | None,
+        end_time: str,
+        task: OperatorLineage,
+    ):
         """
         Emits openlineage event of type COMPLETE.
 
         :param run_id: globally unique identifier of task in dag run
         :param job_name: globally unique identifier of task between dags
+        :param parent_job_name: the name of the parent job (typically the DAG,
+                but possibly a task group)
+        :param parent_run_id: identifier of job spawning this task
         :param end_time: time of task completion
         :param task: metadata container with information extracted from 
operator
         """
         event = RunEvent(
             eventType=RunState.COMPLETE,
             eventTime=end_time,
-            run=self._build_run(run_id, job_name=job_name, 
run_facets=task.run_facets),
+            run=self._build_run(
+                run_id=run_id,
+                job_name=job_name,
+                parent_job_name=parent_job_name,
+                parent_run_id=parent_run_id,
+                run_facets=task.run_facets,
+            ),
             job=self._build_job(job_name, job_facets=task.job_facets),
             inputs=task.inputs,
             outputs=task.outputs,
@@ -210,20 +229,37 @@ class OpenLineageAdapter(LoggingMixin):
         )
         self.emit(event)
 
-    def fail_task(self, run_id: str, job_name: str, end_time: str, task: 
OperatorLineage):
+    def fail_task(
+        self,
+        run_id: str,
+        job_name: str,
+        parent_job_name: str | None,
+        parent_run_id: str | None,
+        end_time: str,
+        task: OperatorLineage,
+    ):
         """
         Emits openlineage event of type FAIL.
 
         :param run_id: globally unique identifier of task in dag run
         :param job_name: globally unique identifier of task between dags
+        :param parent_job_name: the name of the parent job (typically the DAG,
+                but possibly a task group)
+        :param parent_run_id: identifier of job spawning this task
         :param end_time: time of task completion
         :param task: metadata container with information extracted from 
operator
         """
         event = RunEvent(
             eventType=RunState.FAIL,
             eventTime=end_time,
-            run=self._build_run(run_id, job_name=job_name, 
run_facets=task.run_facets),
-            job=self._build_job(job_name),
+            run=self._build_run(
+                run_id=run_id,
+                job_name=job_name,
+                parent_job_name=parent_job_name,
+                parent_run_id=parent_run_id,
+                run_facets=task.run_facets,
+            ),
+            job=self._build_job(job_name, job_facets=task.job_facets),
             inputs=task.inputs,
             outputs=task.outputs,
             producer=_PRODUCER,
diff --git a/airflow/providers/openlineage/plugins/listener.py 
b/airflow/providers/openlineage/plugins/listener.py
index 5ab6dc1d19..6fa02b9fc1 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -101,7 +101,6 @@ class OpenLineageListener:
                 owners=dag.owner.split(", "),
                 task=task_metadata,
                 run_facets={
-                    **task_metadata.run_facets,
                     **get_custom_facets(task_instance),
                     **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
                 },
@@ -115,6 +114,7 @@ class OpenLineageListener:
 
         dagrun = task_instance.dag_run
         task = task_instance.task
+        dag = task.dag
 
         task_uuid = OpenLineageAdapter.build_task_instance_run_id(
             task.task_id, task_instance.execution_date, 
task_instance.try_number - 1
@@ -122,6 +122,8 @@ class OpenLineageListener:
 
         @print_warning(self.log)
         def on_success():
+            parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, 
dagrun.run_id)
+
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
@@ -131,6 +133,8 @@ class OpenLineageListener:
             self.adapter.complete_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
+                parent_job_name=dag.dag_id,
+                parent_run_id=parent_run_id,
                 end_time=end_date.isoformat(),
                 task=task_metadata,
             )
@@ -143,6 +147,7 @@ class OpenLineageListener:
 
         dagrun = task_instance.dag_run
         task = task_instance.task
+        dag = task.dag
 
         task_uuid = OpenLineageAdapter.build_task_instance_run_id(
             task.task_id, task_instance.execution_date, 
task_instance.try_number
@@ -150,6 +155,8 @@ class OpenLineageListener:
 
         @print_warning(self.log)
         def on_failure():
+            parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, 
dagrun.run_id)
+
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
@@ -159,6 +166,8 @@ class OpenLineageListener:
             self.adapter.fail_task(
                 run_id=task_uuid,
                 job_name=get_job_name(task),
+                parent_job_name=dag.dag_id,
+                parent_run_id=parent_run_id,
                 end_time=end_date.isoformat(),
                 task=task_metadata,
             )
diff --git a/tests/providers/openlineage/plugins/test_listener.py 
b/tests/providers/openlineage/plugins/test_listener.py
index 360459eea8..c616f77f97 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -223,7 +223,6 @@ def test_adapter_start_task_is_called_with_proper_arguments(
         owners=["Test Owner"],
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
-            "run_facet": 1,
             "custom_facet": 2,
             "airflow_run_facet": 3,
         },
@@ -243,11 +242,14 @@ def 
test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
     mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: 
f"{x}.{y}.{z}"
+    mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
 
     listener.on_task_instance_failed(None, task_instance, None)
     listener.adapter.fail_task.assert_called_once_with(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
+        parent_job_name="dag_id",
+        parent_run_id="dag_id.dag_run_run_id",
         run_id="task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
@@ -267,6 +269,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
     mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: 
f"{x}.{y}.{z}"
+    mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
 
     listener.on_task_instance_success(None, task_instance, None)
     # This run_id will be different as we did NOT simulate increase of the 
try_number attribute,
@@ -274,6 +277,8 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
     listener.adapter.complete_task.assert_called_once_with(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
+        parent_job_name="dag_id",
+        parent_run_id="dag_id.dag_run_run_id",
         run_id="task_id.execution_date.0",
         task=listener.extractor_manager.extract_metadata(),
     )
@@ -285,6 +290,8 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
     listener.adapter.complete_task.assert_called_once_with(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
+        parent_job_name="dag_id",
+        parent_run_id="dag_id.dag_run_run_id",
         run_id="task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py 
b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
index 93e7b928c2..70d374b07e 100644
--- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
+++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
@@ -27,10 +27,15 @@ import pytest
 from openlineage.client.facet import (
     DocumentationJobFacet,
     ErrorMessageRunFacet,
+    ExternalQueryRunFacet,
     NominalTimeRunFacet,
+    OwnershipJobFacet,
+    OwnershipJobFacetOwners,
+    ParentRunFacet,
     ProcessingEngineRunFacet,
+    SqlJobFacet,
 )
-from openlineage.client.run import Job, Run, RunEvent, RunState
+from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState
 
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import _DAG_NAMESPACE, 
_PRODUCER, OpenLineageAdapter
@@ -172,6 +177,90 @@ def test_emit_start_event(mock_stats_incr, 
mock_stats_timer):
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_start_event_with_additional_information(mock_stats_incr, 
mock_stats_timer):
+    client = MagicMock()
+    adapter = OpenLineageAdapter(client)
+
+    run_id = str(uuid.uuid4())
+    event_time = datetime.datetime.now().isoformat()
+    adapter.start_task(
+        run_id=run_id,
+        job_name="job",
+        job_description="description",
+        event_time=event_time,
+        parent_job_name="parent_job_name",
+        parent_run_id="parent_run_id",
+        code_location=None,
+        nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(),
+        nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(),
+        owners=["owner1", "owner2"],
+        task=OperatorLineage(
+            inputs=[Dataset(namespace="bigquery", name="a.b.c"), 
Dataset(namespace="bigquery", name="x.y.z")],
+            outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
+            job_facets={"sql": SqlJobFacet(query="SELECT 1;")},
+            run_facets={"externalQuery1": 
ExternalQueryRunFacet(externalQueryId="123", source="source")},
+        ),
+        run_facets={"externalQuery2": 
ExternalQueryRunFacet(externalQueryId="999", source="source")},
+    )
+
+    assert (
+        call(
+            RunEvent(
+                eventType=RunState.START,
+                eventTime=event_time,
+                run=Run(
+                    runId=run_id,
+                    facets={
+                        "nominalTime": NominalTimeRunFacet(
+                            nominalStartTime="2022-01-01T00:00:00",
+                            nominalEndTime="2022-01-01T00:00:00",
+                        ),
+                        "processing_engine": ProcessingEngineRunFacet(
+                            version=ANY, name="Airflow", 
openlineageAdapterVersion=ANY
+                        ),
+                        "parent": ParentRunFacet(
+                            run={"runId": "parent_run_id"},
+                            job={"namespace": "default", "name": 
"parent_job_name"},
+                        ),
+                        "parentRun": ParentRunFacet(
+                            run={"runId": "parent_run_id"},
+                            job={"namespace": "default", "name": 
"parent_job_name"},
+                        ),
+                        "externalQuery1": 
ExternalQueryRunFacet(externalQueryId="123", source="source"),
+                        "externalQuery2": 
ExternalQueryRunFacet(externalQueryId="999", source="source"),
+                    },
+                ),
+                job=Job(
+                    namespace="default",
+                    name="job",
+                    facets={
+                        "documentation": 
DocumentationJobFacet(description="description"),
+                        "ownership": OwnershipJobFacet(
+                            owners=[
+                                OwnershipJobFacetOwners(name="owner1", 
type=None),
+                                OwnershipJobFacetOwners(name="owner2", 
type=None),
+                            ]
+                        ),
+                        "sql": SqlJobFacet(query="SELECT 1;"),
+                    },
+                ),
+                producer=_PRODUCER,
+                inputs=[
+                    Dataset(namespace="bigquery", name="a.b.c"),
+                    Dataset(namespace="bigquery", name="x.y.z"),
+                ],
+                outputs=[Dataset(namespace="gs://bucket", 
name="exported_folder")],
+            )
+        )
+        in client.emit.mock_calls
+    )
+
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
 def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
@@ -183,6 +272,8 @@ def test_emit_complete_event(mock_stats_incr, 
mock_stats_timer):
     adapter.complete_task(
         run_id=run_id,
         end_time=event_time,
+        parent_job_name=None,
+        parent_run_id=None,
         job_name="job",
         task=OperatorLineage(),
     )
@@ -206,6 +297,63 @@ def test_emit_complete_event(mock_stats_incr, 
mock_stats_timer):
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_complete_event_with_additional_information(mock_stats_incr, 
mock_stats_timer):
+    client = MagicMock()
+    adapter = OpenLineageAdapter(client)
+
+    run_id = str(uuid.uuid4())
+    event_time = datetime.datetime.now().isoformat()
+    adapter.complete_task(
+        run_id=run_id,
+        end_time=event_time,
+        parent_job_name="parent_job_name",
+        parent_run_id="parent_run_id",
+        job_name="job",
+        task=OperatorLineage(
+            inputs=[Dataset(namespace="bigquery", name="a.b.c"), 
Dataset(namespace="bigquery", name="x.y.z")],
+            outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
+            job_facets={"sql": SqlJobFacet(query="SELECT 1;")},
+            run_facets={"externalQuery": 
ExternalQueryRunFacet(externalQueryId="123", source="source")},
+        ),
+    )
+
+    assert (
+        call(
+            RunEvent(
+                eventType=RunState.COMPLETE,
+                eventTime=event_time,
+                run=Run(
+                    runId=run_id,
+                    facets={
+                        "parent": ParentRunFacet(
+                            run={"runId": "parent_run_id"},
+                            job={"namespace": "default", "name": 
"parent_job_name"},
+                        ),
+                        "parentRun": ParentRunFacet(
+                            run={"runId": "parent_run_id"},
+                            job={"namespace": "default", "name": 
"parent_job_name"},
+                        ),
+                        "externalQuery": 
ExternalQueryRunFacet(externalQueryId="123", source="source"),
+                    },
+                ),
+                job=Job(namespace="default", name="job", facets={"sql": 
SqlJobFacet(query="SELECT 1;")}),
+                producer=_PRODUCER,
+                inputs=[
+                    Dataset(namespace="bigquery", name="a.b.c"),
+                    Dataset(namespace="bigquery", name="x.y.z"),
+                ],
+                outputs=[Dataset(namespace="gs://bucket", 
name="exported_folder")],
+            )
+        )
+        in client.emit.mock_calls
+    )
+
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
 def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
@@ -217,6 +365,8 @@ def test_emit_failed_event(mock_stats_incr, 
mock_stats_timer):
     adapter.fail_task(
         run_id=run_id,
         end_time=event_time,
+        parent_job_name=None,
+        parent_run_id=None,
         job_name="job",
         task=OperatorLineage(),
     )
@@ -240,6 +390,63 @@ def test_emit_failed_event(mock_stats_incr, 
mock_stats_timer):
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_failed_event_with_additional_information(mock_stats_incr, 
mock_stats_timer):
+    client = MagicMock()
+    adapter = OpenLineageAdapter(client)
+
+    run_id = str(uuid.uuid4())
+    event_time = datetime.datetime.now().isoformat()
+    adapter.fail_task(
+        run_id=run_id,
+        end_time=event_time,
+        parent_job_name="parent_job_name",
+        parent_run_id="parent_run_id",
+        job_name="job",
+        task=OperatorLineage(
+            inputs=[Dataset(namespace="bigquery", name="a.b.c"), 
Dataset(namespace="bigquery", name="x.y.z")],
+            outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
+            run_facets={"externalQuery": 
ExternalQueryRunFacet(externalQueryId="123", source="source")},
+            job_facets={"sql": SqlJobFacet(query="SELECT 1;")},
+        ),
+    )
+
+    assert (
+        call(
+            RunEvent(
+                eventType=RunState.FAIL,
+                eventTime=event_time,
+                run=Run(
+                    runId=run_id,
+                    facets={
+                        "parent": ParentRunFacet(
+                            run={"runId": "parent_run_id"},
+                            job={"namespace": "default", "name": 
"parent_job_name"},
+                        ),
+                        "parentRun": ParentRunFacet(
+                            run={"runId": "parent_run_id"},
+                            job={"namespace": "default", "name": 
"parent_job_name"},
+                        ),
+                        "externalQuery": 
ExternalQueryRunFacet(externalQueryId="123", source="source"),
+                    },
+                ),
+                job=Job(namespace="default", name="job", facets={"sql": 
SqlJobFacet(query="SELECT 1;")}),
+                producer=_PRODUCER,
+                inputs=[
+                    Dataset(namespace="bigquery", name="a.b.c"),
+                    Dataset(namespace="bigquery", name="x.y.z"),
+                ],
+                outputs=[Dataset(namespace="gs://bucket", 
name="exported_folder")],
+            )
+        )
+        in client.emit.mock_calls
+    )
+
+    mock_stats_incr.assert_not_called()
+    mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
 @mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")

Reply via email to the mailing list.