mandeepzemo commented on issue #54332:
URL: https://github.com/apache/airflow/issues/54332#issuecomment-3199407809
I tried to reproduce this issue in my local Airflow setup using the plugin
code below:
**plugins/debug_listener.py**
```
import logging
from typing import Optional
from airflow.listeners import hookimpl
from airflow.utils.state import TaskInstanceState
logger = logging.getLogger(__name__)
def _task_identity(task_instance):
    """Return ``(task_id, dag_id)`` for *task_instance*.

    Either value is the string ``'unknown'`` when it cannot be read.
    """
    task_id = getattr(task_instance, 'task_id', 'unknown')
    # Prefer the task instance's own ``dag_id`` attribute; keep the original
    # lookup through ``task_instance.dag`` as a fallback so objects that only
    # expose ``dag`` still resolve.
    dag_id = getattr(task_instance, 'dag_id', None)
    if dag_id is None:
        dag_id = getattr(getattr(task_instance, 'dag', None), 'dag_id', 'unknown')
    return task_id, dag_id


class DebugTaskListener:
    """Listener that reports task-instance state changes for debugging.

    Each hook prints a banner to stdout and logs the same event through the
    module-level ``logger``. Any exception raised while reporting is caught,
    printed and logged, so the hooks themselves never raise.
    """

    @hookimpl
    def on_task_instance_running(
        self,
        previous_state: Optional[TaskInstanceState],
        task_instance
    ) -> None:
        """Report that *task_instance* entered the running state.

        Args:
            previous_state: State before the transition; shown as ``N/A``
                when falsy.
            task_instance: Object describing the task; its ``task_id`` and
                DAG id are read defensively with ``getattr``.
        """
        try:
            task_id, dag_id = _task_identity(task_instance)
            print(
                f"\n\n[DEBUG LISTENER] Task {task_id} in DAG {dag_id} is "
                f"now RUNNING (was: {previous_state or 'N/A'})\n"
            )
            logger.info(
                "[DEBUG LISTENER] Task %s in DAG %s is now RUNNING (was: %s)",
                task_id,
                dag_id,
                previous_state or "N/A"
            )
        except Exception as e:
            print(f"\n\n[DEBUG LISTENER ERROR] {str(e)}\n")
            logger.exception("Error in debug listener (running): %s", str(e))

    @hookimpl
    def on_task_instance_success(
        self,
        previous_state: Optional[TaskInstanceState],
        task_instance
    ) -> None:
        """Report that *task_instance* finished successfully.

        Args:
            previous_state: State before the transition; shown as ``N/A``
                when falsy.
            task_instance: Object describing the task; its ``task_id`` and
                DAG id are read defensively with ``getattr``.
        """
        try:
            task_id, dag_id = _task_identity(task_instance)
            print(
                f"\n\n[DEBUG LISTENER] Task {task_id} in DAG {dag_id} "
                f"SUCCEEDED (was: {previous_state or 'N/A'})\n"
            )
            logger.info(
                "[DEBUG LISTENER] MAIN Task %s in DAG %s SUCCEEDED (was: %s)",
                task_id,
                dag_id,
                previous_state or "N/A"
            )
        except Exception as e:
            print(f"\n\n[DEBUG LISTENER ERROR] {str(e)}\n")
            logger.exception("Error in debug listener (success): %s", str(e))


# Module-level instance; this is the object the plugin module imports.
debug_task_listener = DebugTaskListener()
```
**plugins/debug_plugin.py**
```
from __future__ import annotations
import logging
from airflow.plugins_manager import AirflowPlugin
from debug_listener import debug_task_listener
logger = logging.getLogger(__name__)
class DebugPlugin(AirflowPlugin):
    """Plugin that registers ``debug_task_listener`` when it is instantiated."""

    name = "debug_plugin"

    def __init__(self):
        """Add the debug listener to the listener manager.

        Progress is logged step by step. If any step fails, the error is
        logged with its traceback and re-raised.
        """
        logger.info("Initializing DebugPlugin")
        try:
            # Imported inside the constructor rather than at module import.
            from airflow.listeners.listener import get_listener_manager

            logger.info("Getting listener manager...")
            manager = get_listener_manager()

            logger.info("Registering debug listener...")
            manager.add_listener(debug_task_listener)

            logger.info("Debug listener registered successfully")
        except Exception as exc:
            logger.exception("Failed to initialize DebugPlugin: %s", str(exc))
            raise
        super().__init__()
print("\n\n[DEBUG PLUGIN] DebugPlugin has been loaded!\n\n")
```
**dags/debug_listener_test.py**
```
import logging
import sys
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.listeners.listener import get_listener_manager
# Configure root logging: INFO level, timestamped format, written to stdout.
# NOTE(review): logging.basicConfig does nothing when the root logger already
# has handlers; whether that is the case when this file is parsed is not
# visible here — confirm.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
# Module-level logger used by the helpers and the parse-time message below.
logger = logging.getLogger(__name__)
def log_listeners():
    """Log what the listener manager currently has registered.

    Logs whether the manager reports any listeners and, when it exposes a
    ``pm`` object with ``get_plugins``, the type names of the registered
    plugins and which of them define ``on_task_instance_running``. Any
    exception is logged with its traceback and not propagated.
    """
    try:
        manager = get_listener_manager()
        logger.info("=== LISTENER MANAGER INFO ===")
        logger.info("Has listeners: %s", manager.has_listeners)

        # Nothing more to report if the plugin registry is not reachable.
        if not hasattr(manager, 'pm'):
            return
        if not hasattr(manager.pm, 'get_plugins'):
            return

        registered = manager.pm.get_plugins()
        type_names = [type(entry).__name__ for entry in registered]
        logger.info("Registered plugins: %s", type_names)

        for entry in registered:
            if not hasattr(entry, 'on_task_instance_running'):
                continue
            logger.info(
                "Found task instance running listener: %s",
                type(entry).__name__,
            )
    except Exception as exc:
        logger.exception("Error checking listeners: %s", str(exc))
def test_task():
    """Log an execution banner, dump the listener state, and return a message.

    Returns:
        The fixed string ``"Task completed successfully"``.
    """
    logger.info("\n=== TASK IS EXECUTING ===\n")
    log_listeners()
    outcome = "Task completed successfully"
    return outcome
# These two statements run at module import, i.e. whenever this file is
# parsed, not when the task executes.
logger.info("\n=== DAG IS BEING PARSED ===\n")
log_listeners()

# DAG with a single Python task used to exercise the task-state listeners.
with DAG(
    'debug_listener_test',
    default_args={
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
    },
    description='Test DAG for debugging task state change listeners',
    catchup=False,
    tags=['debug'],
) as dag:
    # The operator gets its own name so it no longer rebinds the
    # ``test_task`` callable it wraps. The trailing bare-name expression was
    # a no-op and is dropped.
    debug_test_task = PythonOperator(
        task_id='debug_test_task',
        python_callable=test_task,
    )
```
Output:
<img width="1465" height="621" alt="Image"
src="https://github.com/user-attachments/assets/95aa4642-4648-4232-807b-2db2fae4746c"
/>
The listener worked properly, and I could see the success logs being
printed.
Based on my testing, it looks like this issue has already been resolved in
the latest Airflow release. @plovegro, please confirm.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]