zettelin commented on issue #41822:
URL: https://github.com/apache/airflow/issues/41822#issuecomment-3081932096

   I made a personal workaround that people who have already migrated to OTel 
can use until this is officially fixed.
   
   ### Cause of the issue (as I understand it)
   
   Airflow processes send metrics to the OpenTelemetry collector only on a 
periodic flush (default 60s). Nothing guarantees a forced flush when the 
process exits. 
   
   Scheduler and Celery worker processes are long-lived, so they are not 
affected. However, task instance processes (subprocesses of the worker 
process) are ephemeral and usually live for less than one flush interval, so 
they may exit without ever flushing.
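
To make the timing argument concrete, here is a toy model using only the standard library. It is a sketch under assumptions, not OpenTelemetry's implementation: `PeriodicBuffer` and `short_task` are hypothetical names, and the "process exit" is simulated by stopping the background thread without a final flush.

```py
import threading


class PeriodicBuffer:
    """Toy stand-in for a periodic metric reader (NOT the OpenTelemetry API)."""

    def __init__(self, interval_s: float) -> None:
        self.interval_s = interval_s
        self.pending: list[tuple[str, int]] = []
        self.exported: list[tuple[str, int]] = []
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Event.wait() returns False on timeout, so a flush happens once per
        # interval; it returns True as soon as stop() is called.
        while not self._stop.wait(self.interval_s):
            self.force_flush()

    def record(self, name: str, value: int) -> None:
        with self._lock:
            self.pending.append((name, value))

    def force_flush(self) -> None:
        with self._lock:
            self.exported.extend(self.pending)
            self.pending.clear()

    def stop(self) -> None:
        # Simulates the process ending: no final flush is performed here.
        self._stop.set()
        self._thread.join()


def short_task(flush_at_exit: bool) -> list[tuple[str, int]]:
    """Record one metric in a 'process' that ends long before the interval."""
    buf = PeriodicBuffer(interval_s=60.0)
    buf.record("dummy_gauge", 1)
    if flush_at_exit:
        buf.force_flush()
    buf.stop()
    return buf.exported
```

In this model, `short_task(False)` exports nothing because the 60s timer never fires, while `short_task(True)` exports the recorded metric. That difference is what the workaround further down relies on.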
   
   #### How to verify this is the cause?
   
   I tested with this dummy DAG. If you uncomment `sleep(70)`, you can see the 
metrics in otelcol; if you leave it commented out, you mostly cannot.
   
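   ```py
   from time import sleep  # only needed if the sleep() line is enabled
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator  # Airflow 2.x path
   from airflow.stats import Stats
   
   def dummy():
       # Tags are passed by keyword; the third positional parameter is `rate`.
       Stats.gauge("dummy_gauge", 1, tags={"tag1": "value1", "tag2": "value2"})
       Stats.incr("dummy_incr", 1, tags={"tag3": "value3", "tag4": "value4"})
       # sleep(70)  # keeps the task alive past the 60s periodic flush
   
   with DAG(...):
       _ = PythonOperator(
           python_callable=dummy,
           task_id="dummy",
       )
   ```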
   
   ### Possible Workaround
   
   Add a force-flush **callback** on every task instance.
   
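   ```py
   from typing import Any
   
   from opentelemetry import metrics
   
   def flush_metrics(context: dict[str, Any]) -> None:
       def flush(provider: Any) -> None:
           # Not every provider object exposes force_flush(); skip those.
           if provider and hasattr(provider, "force_flush"):
               provider.force_flush()
   
       provider = metrics.get_meter_provider()
       flush(provider)
       # If the provider wraps a real one, flush that as well.
       flush(getattr(provider, "_real_meter_provider", None))
   
   # Defined after flush_metrics so the name exists when the dict is built.
   # DAG(default_args=default_args, ...)
   default_args = {
       "on_success_callback": flush_metrics,
       "on_failure_callback": flush_metrics,
       "on_retry_callback": flush_metrics,
       "on_skipped_callback": flush_metrics,
   }
   ```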
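
The flush logic in the callback can be checked without a running collector by exercising the same duck-typed pattern against stand-in objects. This is only a sketch: `FakeSdkProvider`, `FakeProxy`, and `flush_all` are hypothetical names made up for illustration, not OpenTelemetry classes, and `flush_all` mirrors the inner `flush` calls of the callback above.

```py
from typing import Any, Optional


class FakeSdkProvider:
    """Hypothetical stand-in for a provider that supports force_flush()."""

    def __init__(self) -> None:
        self.flush_calls = 0

    def force_flush(self) -> bool:
        self.flush_calls += 1
        return True


class FakeProxy:
    """Hypothetical stand-in for a proxy that only wraps a real provider."""

    def __init__(self, real: Optional[FakeSdkProvider]) -> None:
        self._real_meter_provider = real


def flush_all(provider: Any) -> int:
    """Flush the provider and the one it wraps; return how many were flushed."""
    flushed = 0
    for candidate in (provider, getattr(provider, "_real_meter_provider", None)):
        # Same guard as the callback: skip None and objects without force_flush().
        if candidate is not None and hasattr(candidate, "force_flush"):
            candidate.force_flush()
            flushed += 1
    return flushed
```

With these stubs, a proxy wrapping a real provider results in exactly one flush (of the wrapped provider), and a proxy wrapping nothing results in none, so the callback stays safe to attach even when metrics are not configured.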
   
   The same flush should also be possible from a listener plugin, as in the 
**plugins.event_listener** example:
   
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/example_dags/plugins/event_listener/index.html#module-contents

