This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new acd95a5ef1 feat: Add parent_run_id for COMPLETE and FAIL events (#36067)
acd95a5ef1 is described below
commit acd95a5ef19e8b98404a1eccd11a2d862f21d519
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Dec 5 16:33:48 2023 +0100
feat: Add parent_run_id for COMPLETE and FAIL events (#36067)
---
airflow/providers/openlineage/plugins/adapter.py | 58 ++++--
airflow/providers/openlineage/plugins/listener.py | 11 +-
.../providers/openlineage/plugins/test_listener.py | 9 +-
.../plugins/test_openlineage_adapter.py | 209 ++++++++++++++++++++-
4 files changed, 273 insertions(+), 14 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index a69ae36bcf..ad648f8828 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -164,17 +164,19 @@ class OpenLineageAdapter(LoggingMixin):
if not run_facets:
run_facets = {}
+ if task:
+ run_facets = {**task.run_facets, **run_facets}
run_facets["processing_engine"] = processing_engine_version_facet  # type: ignore
event = RunEvent(
eventType=RunState.START,
eventTime=event_time,
run=self._build_run(
- run_id,
- job_name,
- parent_job_name,
- parent_run_id,
- nominal_start_time,
- nominal_end_time,
+ run_id=run_id,
+ job_name=job_name,
+ parent_job_name=parent_job_name,
+ parent_run_id=parent_run_id,
+ nominal_start_time=nominal_start_time,
+ nominal_end_time=nominal_end_time,
run_facets=run_facets,
),
job=self._build_job(
@@ -190,19 +192,36 @@ class OpenLineageAdapter(LoggingMixin):
)
self.emit(event)
- def complete_task(self, run_id: str, job_name: str, end_time: str, task: OperatorLineage):
+ def complete_task(
+ self,
+ run_id: str,
+ job_name: str,
+ parent_job_name: str | None,
+ parent_run_id: str | None,
+ end_time: str,
+ task: OperatorLineage,
+ ):
"""
Emits openlineage event of type COMPLETE.
:param run_id: globally unique identifier of task in dag run
:param job_name: globally unique identifier of task between dags
+ :param parent_job_name: the name of the parent job (typically the DAG, but possibly a task group)
+ :param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from
operator
"""
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_time,
- run=self._build_run(run_id, job_name=job_name, run_facets=task.run_facets),
+ run=self._build_run(
+ run_id=run_id,
+ job_name=job_name,
+ parent_job_name=parent_job_name,
+ parent_run_id=parent_run_id,
+ run_facets=task.run_facets,
+ ),
job=self._build_job(job_name, job_facets=task.job_facets),
inputs=task.inputs,
outputs=task.outputs,
@@ -210,20 +229,37 @@ class OpenLineageAdapter(LoggingMixin):
)
self.emit(event)
- def fail_task(self, run_id: str, job_name: str, end_time: str, task: OperatorLineage):
+ def fail_task(
+ self,
+ run_id: str,
+ job_name: str,
+ parent_job_name: str | None,
+ parent_run_id: str | None,
+ end_time: str,
+ task: OperatorLineage,
+ ):
"""
Emits openlineage event of type FAIL.
:param run_id: globally unique identifier of task in dag run
:param job_name: globally unique identifier of task between dags
+ :param parent_job_name: the name of the parent job (typically the DAG, but possibly a task group)
+ :param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from
operator
"""
event = RunEvent(
eventType=RunState.FAIL,
eventTime=end_time,
- run=self._build_run(run_id, job_name=job_name, run_facets=task.run_facets),
- job=self._build_job(job_name),
+ run=self._build_run(
+ run_id=run_id,
+ job_name=job_name,
+ parent_job_name=parent_job_name,
+ parent_run_id=parent_run_id,
+ run_facets=task.run_facets,
+ ),
+ job=self._build_job(job_name, job_facets=task.job_facets),
inputs=task.inputs,
outputs=task.outputs,
producer=_PRODUCER,
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 5ab6dc1d19..6fa02b9fc1 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -101,7 +101,6 @@ class OpenLineageListener:
owners=dag.owner.split(", "),
task=task_metadata,
run_facets={
- **task_metadata.run_facets,
**get_custom_facets(task_instance),
**get_airflow_run_facet(dagrun, dag, task_instance, task,
task_uuid),
},
@@ -115,6 +114,7 @@ class OpenLineageListener:
dagrun = task_instance.dag_run
task = task_instance.task
+ dag = task.dag
task_uuid = OpenLineageAdapter.build_task_instance_run_id(
task.task_id, task_instance.execution_date,
task_instance.try_number - 1
@@ -122,6 +122,8 @@ class OpenLineageListener:
@print_warning(self.log)
def on_success():
+ parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id,
dagrun.run_id)
+
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)
@@ -131,6 +133,8 @@ class OpenLineageListener:
self.adapter.complete_task(
run_id=task_uuid,
job_name=get_job_name(task),
+ parent_job_name=dag.dag_id,
+ parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
)
@@ -143,6 +147,7 @@ class OpenLineageListener:
dagrun = task_instance.dag_run
task = task_instance.task
+ dag = task.dag
task_uuid = OpenLineageAdapter.build_task_instance_run_id(
task.task_id, task_instance.execution_date,
task_instance.try_number
@@ -150,6 +155,8 @@ class OpenLineageListener:
@print_warning(self.log)
def on_failure():
+ parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id,
dagrun.run_id)
+
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
)
@@ -159,6 +166,8 @@ class OpenLineageListener:
self.adapter.fail_task(
run_id=task_uuid,
job_name=get_job_name(task),
+ parent_job_name=dag.dag_id,
+ parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
)
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index 360459eea8..c616f77f97 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -223,7 +223,6 @@ def test_adapter_start_task_is_called_with_proper_arguments(
owners=["Test Owner"],
task=listener.extractor_manager.extract_metadata(),
run_facets={
- "run_facet": 1,
"custom_facet": 2,
"airflow_run_facet": 3,
},
@@ -243,11 +242,14 @@ def
test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
listener, task_instance = _create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z:
f"{x}.{y}.{z}"
+ mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
listener.on_task_instance_failed(None, task_instance, None)
listener.adapter.fail_task.assert_called_once_with(
end_time="2023-01-03T13:01:01",
job_name="job_name",
+ parent_job_name="dag_id",
+ parent_run_id="dag_id.dag_run_run_id",
run_id="task_id.execution_date.1",
task=listener.extractor_manager.extract_metadata(),
)
@@ -267,6 +269,7 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
listener, task_instance = _create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z:
f"{x}.{y}.{z}"
+ mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
listener.on_task_instance_success(None, task_instance, None)
# This run_id will be different as we did NOT simulate increase of the
try_number attribute,
@@ -274,6 +277,8 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
listener.adapter.complete_task.assert_called_once_with(
end_time="2023-01-03T13:01:01",
job_name="job_name",
+ parent_job_name="dag_id",
+ parent_run_id="dag_id.dag_run_run_id",
run_id="task_id.execution_date.0",
task=listener.extractor_manager.extract_metadata(),
)
@@ -285,6 +290,8 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
listener.adapter.complete_task.assert_called_once_with(
end_time="2023-01-03T13:01:01",
job_name="job_name",
+ parent_job_name="dag_id",
+ parent_run_id="dag_id.dag_run_run_id",
run_id="task_id.execution_date.1",
task=listener.extractor_manager.extract_metadata(),
)
diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
index 93e7b928c2..70d374b07e 100644
--- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
+++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
@@ -27,10 +27,15 @@ import pytest
from openlineage.client.facet import (
DocumentationJobFacet,
ErrorMessageRunFacet,
+ ExternalQueryRunFacet,
NominalTimeRunFacet,
+ OwnershipJobFacet,
+ OwnershipJobFacetOwners,
+ ParentRunFacet,
ProcessingEngineRunFacet,
+ SqlJobFacet,
)
-from openlineage.client.run import Job, Run, RunEvent, RunState
+from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _DAG_NAMESPACE,
_PRODUCER, OpenLineageAdapter
@@ -172,6 +177,90 @@ def test_emit_start_event(mock_stats_incr,
mock_stats_timer):
mock_stats_timer.assert_called_with("ol.emit.attempts")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_start_event_with_additional_information(mock_stats_incr,
mock_stats_timer):
+ client = MagicMock()
+ adapter = OpenLineageAdapter(client)
+
+ run_id = str(uuid.uuid4())
+ event_time = datetime.datetime.now().isoformat()
+ adapter.start_task(
+ run_id=run_id,
+ job_name="job",
+ job_description="description",
+ event_time=event_time,
+ parent_job_name="parent_job_name",
+ parent_run_id="parent_run_id",
+ code_location=None,
+ nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(),
+ nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(),
+ owners=["owner1", "owner2"],
+ task=OperatorLineage(
+ inputs=[Dataset(namespace="bigquery", name="a.b.c"),
Dataset(namespace="bigquery", name="x.y.z")],
+ outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
+ job_facets={"sql": SqlJobFacet(query="SELECT 1;")},
+ run_facets={"externalQuery1":
ExternalQueryRunFacet(externalQueryId="123", source="source")},
+ ),
+ run_facets={"externalQuery2":
ExternalQueryRunFacet(externalQueryId="999", source="source")},
+ )
+
+ assert (
+ call(
+ RunEvent(
+ eventType=RunState.START,
+ eventTime=event_time,
+ run=Run(
+ runId=run_id,
+ facets={
+ "nominalTime": NominalTimeRunFacet(
+ nominalStartTime="2022-01-01T00:00:00",
+ nominalEndTime="2022-01-01T00:00:00",
+ ),
+ "processing_engine": ProcessingEngineRunFacet(
+ version=ANY, name="Airflow",
openlineageAdapterVersion=ANY
+ ),
+ "parent": ParentRunFacet(
+ run={"runId": "parent_run_id"},
+ job={"namespace": "default", "name":
"parent_job_name"},
+ ),
+ "parentRun": ParentRunFacet(
+ run={"runId": "parent_run_id"},
+ job={"namespace": "default", "name":
"parent_job_name"},
+ ),
+ "externalQuery1":
ExternalQueryRunFacet(externalQueryId="123", source="source"),
+ "externalQuery2":
ExternalQueryRunFacet(externalQueryId="999", source="source"),
+ },
+ ),
+ job=Job(
+ namespace="default",
+ name="job",
+ facets={
+ "documentation":
DocumentationJobFacet(description="description"),
+ "ownership": OwnershipJobFacet(
+ owners=[
+ OwnershipJobFacetOwners(name="owner1",
type=None),
+ OwnershipJobFacetOwners(name="owner2",
type=None),
+ ]
+ ),
+ "sql": SqlJobFacet(query="SELECT 1;"),
+ },
+ ),
+ producer=_PRODUCER,
+ inputs=[
+ Dataset(namespace="bigquery", name="a.b.c"),
+ Dataset(namespace="bigquery", name="x.y.z"),
+ ],
+ outputs=[Dataset(namespace="gs://bucket",
name="exported_folder")],
+ )
+ )
+ in client.emit.mock_calls
+ )
+
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
@@ -183,6 +272,8 @@ def test_emit_complete_event(mock_stats_incr,
mock_stats_timer):
adapter.complete_task(
run_id=run_id,
end_time=event_time,
+ parent_job_name=None,
+ parent_run_id=None,
job_name="job",
task=OperatorLineage(),
)
@@ -206,6 +297,63 @@ def test_emit_complete_event(mock_stats_incr,
mock_stats_timer):
mock_stats_timer.assert_called_with("ol.emit.attempts")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_complete_event_with_additional_information(mock_stats_incr,
mock_stats_timer):
+ client = MagicMock()
+ adapter = OpenLineageAdapter(client)
+
+ run_id = str(uuid.uuid4())
+ event_time = datetime.datetime.now().isoformat()
+ adapter.complete_task(
+ run_id=run_id,
+ end_time=event_time,
+ parent_job_name="parent_job_name",
+ parent_run_id="parent_run_id",
+ job_name="job",
+ task=OperatorLineage(
+ inputs=[Dataset(namespace="bigquery", name="a.b.c"),
Dataset(namespace="bigquery", name="x.y.z")],
+ outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
+ job_facets={"sql": SqlJobFacet(query="SELECT 1;")},
+ run_facets={"externalQuery":
ExternalQueryRunFacet(externalQueryId="123", source="source")},
+ ),
+ )
+
+ assert (
+ call(
+ RunEvent(
+ eventType=RunState.COMPLETE,
+ eventTime=event_time,
+ run=Run(
+ runId=run_id,
+ facets={
+ "parent": ParentRunFacet(
+ run={"runId": "parent_run_id"},
+ job={"namespace": "default", "name":
"parent_job_name"},
+ ),
+ "parentRun": ParentRunFacet(
+ run={"runId": "parent_run_id"},
+ job={"namespace": "default", "name":
"parent_job_name"},
+ ),
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId="123", source="source"),
+ },
+ ),
+ job=Job(namespace="default", name="job", facets={"sql":
SqlJobFacet(query="SELECT 1;")}),
+ producer=_PRODUCER,
+ inputs=[
+ Dataset(namespace="bigquery", name="a.b.c"),
+ Dataset(namespace="bigquery", name="x.y.z"),
+ ],
+ outputs=[Dataset(namespace="gs://bucket",
name="exported_folder")],
+ )
+ )
+ in client.emit.mock_calls
+ )
+
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
@@ -217,6 +365,8 @@ def test_emit_failed_event(mock_stats_incr,
mock_stats_timer):
adapter.fail_task(
run_id=run_id,
end_time=event_time,
+ parent_job_name=None,
+ parent_run_id=None,
job_name="job",
task=OperatorLineage(),
)
@@ -240,6 +390,63 @@ def test_emit_failed_event(mock_stats_incr,
mock_stats_timer):
mock_stats_timer.assert_called_with("ol.emit.attempts")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_failed_event_with_additional_information(mock_stats_incr,
mock_stats_timer):
+ client = MagicMock()
+ adapter = OpenLineageAdapter(client)
+
+ run_id = str(uuid.uuid4())
+ event_time = datetime.datetime.now().isoformat()
+ adapter.fail_task(
+ run_id=run_id,
+ end_time=event_time,
+ parent_job_name="parent_job_name",
+ parent_run_id="parent_run_id",
+ job_name="job",
+ task=OperatorLineage(
+ inputs=[Dataset(namespace="bigquery", name="a.b.c"),
Dataset(namespace="bigquery", name="x.y.z")],
+ outputs=[Dataset(namespace="gs://bucket", name="exported_folder")],
+ run_facets={"externalQuery":
ExternalQueryRunFacet(externalQueryId="123", source="source")},
+ job_facets={"sql": SqlJobFacet(query="SELECT 1;")},
+ ),
+ )
+
+ assert (
+ call(
+ RunEvent(
+ eventType=RunState.FAIL,
+ eventTime=event_time,
+ run=Run(
+ runId=run_id,
+ facets={
+ "parent": ParentRunFacet(
+ run={"runId": "parent_run_id"},
+ job={"namespace": "default", "name":
"parent_job_name"},
+ ),
+ "parentRun": ParentRunFacet(
+ run={"runId": "parent_run_id"},
+ job={"namespace": "default", "name":
"parent_job_name"},
+ ),
+ "externalQuery":
ExternalQueryRunFacet(externalQueryId="123", source="source"),
+ },
+ ),
+ job=Job(namespace="default", name="job", facets={"sql":
SqlJobFacet(query="SELECT 1;")}),
+ producer=_PRODUCER,
+ inputs=[
+ Dataset(namespace="bigquery", name="a.b.c"),
+ Dataset(namespace="bigquery", name="x.y.z"),
+ ],
+ outputs=[Dataset(namespace="gs://bucket",
name="exported_folder")],
+ )
+ )
+ in client.emit.mock_calls
+ )
+
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")