This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e30f8102b2 Ability to add custom facet in OpenLineage events (#38982)
e30f8102b2 is described below

commit e30f8102b2dfc2c99454c99c286138754e4a1f1c
Author: anandhimurali <[email protected]>
AuthorDate: Mon Jul 22 04:17:20 2024 -0700

    Ability to add custom facet in OpenLineage events (#38982)
    
    * Ability to add custom facet in OpenLineage events
    
    * Update airflow/providers/openlineage/provider.yaml
    
    Co-authored-by: Elad Kalif <[email protected]>
    
    * Update airflow/providers/openlineage/provider.yaml
    
    Co-authored-by: Kacper Muda <[email protected]>
    
    * Adding None type hint for the custom facet function
    
    * Fix a test after rebase
    
    * Removed the legacy OPENLINEAGE_ configs format for 
OPENLINEAGE_CUSTOM_FACET_FUNCTIONS
    
    * Duplicate facet key check
    
    * Update airflow/providers/openlineage/utils/utils.py
    
    Co-authored-by: Kacper Muda <[email protected]>
    
    * Update airflow/providers/openlineage/utils/utils.py
    
    Co-authored-by: Kacper Muda <[email protected]>
    
    * Fixes after rebase
    
    * Adding user docs for custom_facet_functions
    
    * Rename custom_facet_functions as custom_run_facets
    
    * Increment version for custom_run_facets feature
    
    * Enrich example with access to operator and return value as None.
    
    * Add try-except for custom facet function execution
    
    * Fix the typing for the custom facet function return type
    
    * Documentation: funcs are executed only for START events
    
    * Fix the typing for the custom facet function return type
    
    * Fixes after pre-commit hook checks
    
    * Adding start_date to test DAGs for 2.7 compatibility tests
    
    * Removing an out-of-scope __init__ file added by pre-commit check
    
    ---------
    
    Co-authored-by: Anandhi <[email protected]>
    Co-authored-by: Elad Kalif <[email protected]>
    Co-authored-by: Kacper Muda <[email protected]>
---
 airflow/providers/openlineage/conf.py              |  11 ++
 airflow/providers/openlineage/provider.yaml        |   7 +
 airflow/providers/openlineage/utils/utils.py       |  38 ++++-
 .../guides/developer.rst                           |  89 ++++++++++-
 .../guides/user.rst                                |  20 ++-
 tests/providers/openlineage/test_conf.py           |  26 ++++
 .../openlineage/utils/custom_facet_fixture.py      |  87 +++++++++++
 tests/providers/openlineage/utils/test_utils.py    | 170 ++++++++++++++++++++-
 8 files changed, 443 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/openlineage/conf.py 
b/airflow/providers/openlineage/conf.py
index 8b35056e47..562b673ed5 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -86,6 +86,17 @@ def custom_extractors() -> set[str]:
     return set(extractor.strip() for extractor in option.split(";") if 
extractor.strip())
 
 
+@cache
+def custom_run_facets() -> set[str]:
+    """[openlineage] custom_run_facets."""
+    option = conf.get(_CONFIG_SECTION, "custom_run_facets", fallback="")
+    return set(
+        custom_facet_function.strip()
+        for custom_facet_function in option.split(";")
+        if custom_facet_function.strip()
+    )
+
+
 @cache
 def namespace() -> str:
     """[openlineage] namespace."""
diff --git a/airflow/providers/openlineage/provider.yaml 
b/airflow/providers/openlineage/provider.yaml
index cfa001fdba..733b047418 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -105,6 +105,13 @@ config:
         example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
         default: ~
         version_added: ~
+      custom_run_facets:
+        description: |
+          Register custom run facet functions by passing a string of semicolon 
separated full import paths.
+        type: string
+        example: 
full.path.to.custom_facet_function;full.path.to.another_custom_facet_function
+        default: ''
+        version_added: 1.10.0
       config_path:
         description: |
           Specify the path to the YAML configuration file.
diff --git a/airflow/providers/openlineage/utils/utils.py 
b/airflow/providers/openlineage/utils/utils.py
index 0484d11f53..0689ea3977 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -24,7 +24,7 @@ import re
 from contextlib import redirect_stdout, suppress
 from functools import wraps
 from io import StringIO
-from typing import TYPE_CHECKING, Any, Iterable
+from typing import TYPE_CHECKING, Any, Callable, Iterable
 
 import attrs
 from deprecated import deprecated
@@ -79,11 +79,47 @@ def get_job_name(task: TaskInstance) -> str:
 
 
 def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, 
Any]:
+    from airflow.providers.openlineage.extractors.manager import 
try_import_from_string
+
     custom_facets = {}
     # check for -1 comes from SmartSensor compatibility with dynamic task 
mapping
     # this comes from Airflow code
     if hasattr(task_instance, "map_index") and getattr(task_instance, 
"map_index") != -1:
         custom_facets["airflow_mappedTask"] = 
AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+    # Append custom run facets by executing the custom_run_facet functions.
+    for custom_facet_func in conf.custom_run_facets():
+        try:
+            func: Callable[[Any], dict] | None = 
try_import_from_string(custom_facet_func)
+            if not func:
+                log.warning(
+                    "OpenLineage is unable to import custom facet function 
`%s`; will ignore it.",
+                    custom_facet_func,
+                )
+                continue
+            facet: dict[str, dict[Any, Any]] | None = func(task_instance)
+            if facet and isinstance(facet, dict):
+                duplicate_facet_keys = [facet_key for facet_key in 
facet.keys() if facet_key in custom_facets]
+                if duplicate_facet_keys:
+                    log.warning(
+                        "Duplicate OpenLineage custom facets key(s) found: 
`%s` from function `%s`; "
+                        "this will overwrite the previous value.",
+                        ", ".join(duplicate_facet_keys),
+                        custom_facet_func,
+                    )
+                log.debug(
+                    "Adding OpenLineage custom facet with key(s): `%s` from 
function `%s`.",
+                    tuple(facet),
+                    custom_facet_func,
+                )
+                custom_facets.update(facet)
+        except Exception as exc:
+            log.warning(
+                "Error processing custom facet function `%s`; will ignore it. 
Error was: %s: %s",
+                custom_facet_func,
+                type(exc).__name__,
+                exc,
+            )
     return custom_facets
 
 
diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst 
b/docs/apache-airflow-providers-openlineage/guides/developer.rst
index 2ce4ef7493..86f57ac3e1 100644
--- a/docs/apache-airflow-providers-openlineage/guides/developer.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -446,15 +446,100 @@ Conversion from Airflow Table entity to OpenLineage 
Dataset is made in the follo
 
 .. _custom_facets:openlineage:
 
-Custom facets
+Custom Facets
 =============
 To learn more about facets in OpenLineage, please refer to `facet 
documentation <https://openlineage.io/docs/spec/facets/>`_.
-Also check out `available Facets 
<https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
+Also check out `available facets 
<https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
 
 The OpenLineage spec might not contain all the facets you need to write your 
extractor,
 in which case you will have to make your own `custom facets 
<https://openlineage.io/docs/spec/facets/custom-facets>`_.
 More on creating custom facets can be found `here 
<https://openlineage.io/blog/extending-with-facets/>`_.
 
+Custom Run Facets
+=================
+
+You can inject your own custom facets in the lineage event's run facet using 
the ``custom_run_facets`` Airflow configuration.
+
+Steps to be taken,
+
+1. Write a function that returns the custom facet. You can write as many 
custom facet functions as needed.
+2. Register the functions using the ``custom_run_facets`` Airflow 
configuration.
+
+Once done, Airflow OpenLineage listener will automatically execute these 
functions during the lineage event generation
+and append their return values to the run facet in the lineage event.
+
+Writing a custom facet function
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- **Input arguments:** The function should accept the ``TaskInstance`` as an 
input argument.
+- **Function body:** Perform the logic needed to generate the custom facet. 
The custom facet should inherit from the ``BaseFacet`` for the ``_producer`` 
and ``_schemaURL`` to be automatically added for the facet.
+- **Return value:** The custom facet to be added to the lineage event. Return 
type should be ``dict[str, dict]`` or ``None``. You may choose to return 
``None``, if you do not want to add custom facets for certain criteria.
+
+**Example custom facet function**
+
+.. code-block:: python
+
+    import attrs
+    from airflow.models import TaskInstance
+    from openlineage.client.facet import BaseFacet
+
+
+    @attrs.define(slots=False)
+    class MyCustomRunFacet(BaseFacet):
+        """Define a custom facet."""
+
+        name: str
+        jobState: str
+        uniqueName: str
+        displayName: str
+        dagId: str
+        taskId: str
+        cluster: str
+
+
+    def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, dict] | 
None:
+        operator_name = task_instance.task.operator_name
+        if operator_name == "BashOperator":
+            return
+        job_unique_name = 
f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
+        return {
+            "additional_run_facet": attrs.asdict(
+                MyCustomRunFacet(
+                    name="test-lineage-namespace",
+                    jobState=task_instance.state,
+                    uniqueName=job_unique_name,
+                    
displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
+                    dagId=task_instance.dag_id,
+                    taskId=task_instance.task_id,
+                    cluster="TEST",
+                )
+            )
+        }
+
+Register the custom facet functions
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the ``custom_run_facets`` Airflow configuration to register the custom run 
facet functions by passing
+a string of semicolon-separated full import paths to the functions.
+
+.. code-block:: ini
+
+    [openlineage]
+    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": 
"api/v1/lineage"}
+    custom_run_facets = 
full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
+
+``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an 
equivalent.
+
+.. code-block:: ini
+
+  
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
+
+.. note::
+
+    - The custom facet functions are only executed at the start of the 
TaskInstance and added to the OpenLineage START event.
+    - Duplicate functions, if registered, will be executed only once.
+    - When duplicate custom facet keys are returned by different functions, 
the value from the last processed function will be added to the lineage event.
+
 .. _job_hierarchy:openlineage:
 
 Job Hierarchy
diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst 
b/docs/apache-airflow-providers-openlineage/guides/user.rst
index 437da6d0fa..19b8ef9d78 100644
--- a/docs/apache-airflow-providers-openlineage/guides/user.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -271,7 +271,7 @@ serializing only a few known attributes, we exclude certain 
non-serializable ele
 Custom Extractors
 ^^^^^^^^^^^^^^^^^
 
-If you use :ref:`custom Extractors <custom_extractors:openlineage>` feature, 
register the extractors by passing
+To use :ref:`custom Extractors <custom_extractors:openlineage>` feature, 
register the extractors by passing
 a string of semicolon separated Airflow Operators full import paths to 
``extractors`` option in Airflow configuration.
 
 .. code-block:: ini
@@ -286,6 +286,24 @@ a string of semicolon separated Airflow Operators full 
import paths to ``extract
 
   
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
 
+Custom Run Facets
+^^^^^^^^^^^^^^^^^
+
+To inject :ref:`custom run facets <custom_facets:openlineage>`, register the 
custom run facet functions by passing
+a string of semicolon separated full import paths to ``custom_run_facets`` 
option in Airflow configuration.
+
+.. code-block:: ini
+
+    [openlineage]
+    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": 
"api/v1/lineage"}
+    custom_run_facets = 
full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
+
+``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an 
equivalent.
+
+.. code-block:: ini
+
+  
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
+
 Enabling OpenLineage on DAG/task level
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/tests/providers/openlineage/test_conf.py 
b/tests/providers/openlineage/test_conf.py
index 3ee606dbde..7eeea35db7 100644
--- a/tests/providers/openlineage/test_conf.py
+++ b/tests/providers/openlineage/test_conf.py
@@ -26,6 +26,7 @@ from airflow.providers.openlineage.conf import (
     _is_true,
     config_path,
     custom_extractors,
+    custom_run_facets,
     dag_state_change_process_pool_size,
     disabled_operators,
     execution_timeout,
@@ -41,6 +42,7 @@ from tests.test_utils.config import conf_vars, env_vars
 _CONFIG_SECTION = "openlineage"
 _VAR_CONFIG_PATH = "OPENLINEAGE_CONFIG"
 _CONFIG_OPTION_CONFIG_PATH = "config_path"
+_CONFIG_OPTION_CUSTOM_RUN_FACETS = "custom_run_facets"
 _VAR_DISABLE_SOURCE_CODE = "OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE"
 _CONFIG_OPTION_DISABLE_SOURCE_CODE = "disable_source_code"
 _CONFIG_OPTION_DISABLED_FOR_OPERATORS = "disabled_for_operators"
@@ -255,6 +257,30 @@ def test_extractors_do_not_fail_if_conf_option_missing():
     assert custom_extractors() == set()
 
 
+@conf_vars(dict())
+def test_custom_run_facets_not_set():
+    assert custom_run_facets() == set()
+
+
+def test_custom_run_facets_with_no_values():
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS): 
None}):
+        assert custom_run_facets() == set()
+    with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_CUSTOM_RUN_FACETS): ""}):
+        assert custom_run_facets() == set()
+
+
+@conf_vars(
+    {
+        (
+            _CONFIG_SECTION,
+            _CONFIG_OPTION_CUSTOM_RUN_FACETS,
+        ): " tests.my_function;; tests.my_function ; my_function_2; ",
+    }
+)
+def test_custom_run_facets():
+    assert custom_run_facets() == {"tests.my_function", "my_function_2"}
+
+
 @env_vars({_VAR_NAMESPACE: "my_custom_namespace"})
 @conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_NAMESPACE): None})
 def test_namespace_legacy_env_var_is_used_when_no_conf_option_set():
diff --git a/tests/providers/openlineage/utils/custom_facet_fixture.py 
b/tests/providers/openlineage/utils/custom_facet_fixture.py
new file mode 100644
index 0000000000..5a051218e2
--- /dev/null
+++ b/tests/providers/openlineage/utils/custom_facet_fixture.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import attrs
+from openlineage.client.facet import BaseFacet
+
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
+
+@attrs.define(slots=False)
+class MyCustomRunFacet(BaseFacet):
+    """Define a custom run facet."""
+
+    name: str
+    jobState: str
+    uniqueName: str
+    displayName: str
+    dagId: str
+    taskId: str
+    cluster: str
+
+
+def get_additional_test_facet(task_instance: TaskInstance) -> dict[str, dict] 
| None:
+    operator_name = task_instance.task.operator_name if task_instance.task 
else None
+    if operator_name == "BashOperator":
+        return None
+    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
+    return {
+        "additional_run_facet": attrs.asdict(
+            MyCustomRunFacet(
+                name="test-lineage-namespace",
+                jobState=task_instance.state,
+                uniqueName=job_unique_name,
+                displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
+                dagId=task_instance.dag_id,
+                taskId=task_instance.task_id,
+                cluster="TEST",
+            )
+        )
+    }
+
+
+def get_duplicate_test_facet_key(task_instance: TaskInstance):
+    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
+    return {
+        "additional_run_facet": attrs.asdict(
+            MyCustomRunFacet(
+                name="test-lineage-namespace",
+                jobState=task_instance.state,
+                uniqueName=job_unique_name,
+                displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
+                dagId=task_instance.dag_id,
+                taskId=task_instance.task_id,
+                cluster="TEST",
+            )
+        )
+    }
+
+
+def get_another_test_facet(task_instance: TaskInstance):
+    return {"another_run_facet": {"name": "another-lineage-namespace"}}
+
+
+def return_type_is_not_dict(task_instance: TaskInstance):
+    return "return type is not dict"
+
+
+def get_custom_facet_throws_exception(task_instance: TaskInstance):
+    raise Exception("fake exception from custom fcet function")
diff --git a/tests/providers/openlineage/utils/test_utils.py 
b/tests/providers/openlineage/utils/test_utils.py
index 381743141a..d3a9d89445 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -18,12 +18,13 @@
 from __future__ import annotations
 
 import datetime
-from unittest.mock import MagicMock
+from unittest.mock import ANY, MagicMock, patch
 
 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.mappedoperator import MappedOperator
+from airflow.models.taskinstance import TaskInstance
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -34,6 +35,7 @@ from airflow.providers.openlineage.utils.utils import (
     _get_tasks_details,
     _safe_get_dag_tree_view,
     get_airflow_job_facet,
+    get_custom_facets,
     get_fully_qualified_class_name,
     get_job_name,
     get_operator_class,
@@ -482,3 +484,169 @@ def test_get_task_groups_details_nested():
 
 def test_get_task_groups_details_no_task_groups():
     assert _get_task_groups_details(DAG("test_dag", 
start_date=datetime.datetime(2024, 6, 1))) == {}
+
+
+@patch("airflow.providers.openlineage.conf.custom_run_facets", 
return_value=set())
+def 
test_get_custom_facets_with_no_function_definition(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {}
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    
return_value={"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet"},
+)
+def test_get_custom_facets_with_function_definition(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {
+        "additional_run_facet": {
+            "_producer": ANY,
+            "_schemaURL": ANY,
+            "name": "test-lineage-namespace",
+            "jobState": "running",
+            "uniqueName": "TEST.test-dag.test-task",
+            "displayName": "test-dag.test-task",
+            "dagId": "test-dag",
+            "taskId": "test-task",
+            "cluster": "TEST",
+        }
+    }
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    return_value={
+        
"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet",
+    },
+)
+def test_get_custom_facets_with_return_value_as_none(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=BashOperator(
+            task_id="test-task",
+            bash_command="exit 0;",
+            dag=DAG("test-dag", start_date=datetime.datetime(2024, 7, 1)),
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {}
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    return_value={
+        "invalid_function",
+        
"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet",
+        
"tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict",
+        
"tests.providers.openlineage.utils.custom_facet_fixture.get_another_test_facet",
+    },
+)
+def 
test_get_custom_facets_with_multiple_function_definition(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {
+        "additional_run_facet": {
+            "_producer": ANY,
+            "_schemaURL": ANY,
+            "name": "test-lineage-namespace",
+            "jobState": "running",
+            "uniqueName": "TEST.test-dag.test-task",
+            "displayName": "test-dag.test-task",
+            "dagId": "test-dag",
+            "taskId": "test-task",
+            "cluster": "TEST",
+        },
+        "another_run_facet": {"name": "another-lineage-namespace"},
+    }
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    return_value={
+        
"tests.providers.openlineage.utils.custom_facet_fixture.get_additional_test_facet",
+        
"tests.providers.openlineage.utils.custom_facet_fixture.get_duplicate_test_facet_key",
+    },
+)
+def test_get_custom_facets_with_duplicate_facet_keys(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {
+        "additional_run_facet": {
+            "_producer": ANY,
+            "_schemaURL": ANY,
+            "name": "test-lineage-namespace",
+            "jobState": "running",
+            "uniqueName": "TEST.test-dag.test-task",
+            "displayName": "test-dag.test-task",
+            "dagId": "test-dag",
+            "taskId": "test-task",
+            "cluster": "TEST",
+        }
+    }
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    return_value={"invalid_function"},
+)
+def 
test_get_custom_facets_with_invalid_function_definition(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {}
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    
return_value={"tests.providers.openlineage.utils.custom_facet_fixture.return_type_is_not_dict"},
+)
+def 
test_get_custom_facets_with_wrong_return_type_function(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {}
+
+
+@patch(
+    "airflow.providers.openlineage.conf.custom_run_facets",
+    
return_value={"tests.providers.openlineage.utils.custom_facet_fixture.get_custom_facet_throws_exception"},
+)
+def test_get_custom_facets_with_exception(mock_custom_facet_funcs):
+    sample_ti = TaskInstance(
+        task=EmptyOperator(
+            task_id="test-task", dag=DAG("test-dag", 
start_date=datetime.datetime(2024, 7, 1))
+        ),
+        state="running",
+    )
+    result = get_custom_facets(sample_ti)
+    assert result == {}

Reply via email to