This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57fb776408 fix: scheduler crashing with OL provider on airflow standalone (#40459)
57fb776408 is described below

commit 57fb7764088a795ef38c149f2cdf5329aabf649b
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Jun 28 08:42:13 2024 +0200

    fix: scheduler crashing with OL provider on airflow standalone (#40459)
    
    Signed-off-by: Kacper Muda <[email protected]>
---
 airflow/providers/openlineage/plugins/listener.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index b26d976774..71005e37a7 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -62,6 +62,18 @@ def _get_try_number_success(val):
     return val.try_number - 1
 
 
+def _executor_initializer():
+    """
+    Initialize worker processes for the executor used for DagRun listener.
+
+    This function must be picklable, so it cannot be defined as an inner method or local function.
+
+    Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
+    the Airflow database.
+    """
+    settings.configure_orm()
+
+
 class OpenLineageListener:
     """OpenLineage listener sends events on task instance and dag run starts, completes and failures."""
 
@@ -366,16 +378,10 @@ class OpenLineageListener:
 
     @property
     def executor(self) -> ProcessPoolExecutor:
-        # Executor for dag_run listener
-        def initializer():
-            # Re-configure the ORM engine as there are issues with multiple processes
-            # if process calls Airflow DB.
-            settings.configure_orm()
-
         if not self._executor:
             self._executor = ProcessPoolExecutor(
                 max_workers=conf.dag_state_change_process_pool_size(),
-                initializer=initializer,
+                initializer=_executor_initializer,
             )
         return self._executor
 

Reply via email to