kacpermuda commented on code in PR #64843:
URL: https://github.com/apache/airflow/pull/64843#discussion_r3045965439
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -790,8 +790,21 @@ def _fork_execute(self, callable, callable_name: str):
if not AIRFLOW_V_3_0_PLUS:
configure_orm(disable_connection_pool=True)
self.log.debug("Executing OpenLineage process - %s - pid %s", callable_name, os.getpid())
- callable()
- self.log.debug("Process with current pid finishes after %s", callable_name)
+ try:
+ callable()
+ self.log.debug("Process with current pid finishes after %s", callable_name)
+ except Exception:
+ self.log.warning(
+ "OpenLineage %s process failed. This has no impact on actual task execution status.",
+ callable_name,
+ exc_info=True,
+ )
+ finally:
+ # os._exit(0) bypasses Python's atexit/stdio flush. Explicitly shut down
+ # logging so buffered records (including any warnings above) are flushed
+ # before the process exits. Without this, the final log lines are silently
+ # dropped, making failures invisible.
+ logging.shutdown()
Review Comment:
I'm not sure whether calling logging.shutdown() from a provider has any
implications for other logs flowing through Airflow. Can you double-check that
its effect is isolated to this forked process?
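
One way to check the isolation question is a small standalone experiment outside Airflow. The sketch below is a simplified stand-in, not the listener's code: the logger name, the temporary log file, and `run_demo` are all made up for illustration, and it only covers a plain `logging.FileHandler` on a platform that supports `os.fork`. Handlers that own threads, queues, or sockets (as some Airflow handlers may) could behave differently and would need their own check.

```python
import logging
import os
import tempfile


def run_demo() -> str:
    """Fork a child that logs, calls logging.shutdown(), then exits via os._exit(0).

    Returns the contents of the shared log file as seen by the parent afterwards.
    """
    fd, path = tempfile.mkstemp(suffix=".log")
    os.close(fd)

    # Hypothetical logger used only for this experiment.
    logger = logging.getLogger("fork_shutdown_demo")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    handler = logging.FileHandler(path)
    logger.addHandler(handler)

    pid = os.fork()
    if pid == 0:
        # Child: mirrors the try/finally shape from the diff above.
        try:
            logger.warning("child: about to exit")
        finally:
            logging.shutdown()  # flushes and closes handlers in the child only
            os._exit(0)

    # Parent: wait for the child, then confirm its own handler still works.
    os.waitpid(pid, 0)
    logger.warning("parent: still logging after child shutdown")
    logger.removeHandler(handler)
    handler.close()

    with open(path) as f:
        contents = f.read()
    os.remove(path)
    return contents


if __name__ == "__main__":
    print(run_demo())
```

If both messages end up in the file, the child's `logging.shutdown()` only closed the child's copies of the handlers, which is the behaviour expected after a fork for this handler type.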
##########
providers/openlineage/src/airflow/providers/openlineage/utils/sql_hook_lineage.py:
##########
@@ -70,6 +71,27 @@ def emit_lineage_from_sql_extras(task_instance, sql_extras: list, is_successful:
events: list[RunEvent] = []
query_count = 0
+ # Build conn_id -> hook mapping before iterating. Hook instances are not hashable so
+ # conn_id (a plain string) is used as the @cache key throughout.
+ _hook_by_conn_id = {_get_hook_conn_id(e.context): e.context for e in sql_extras}
Review Comment:
Is conn_id alone enough as the key for hooks here? Maybe use id(hook) instead?
I'm wondering whether two hooks could send HLL over the same DB connection
but with different params, resulting in different database info.
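
To make the concern concrete, here is a minimal sketch with a made-up `FakeHook` class (not an Airflow hook) whose `conn_id` and `database` attributes are assumptions for illustration. It compares a mapping keyed by `conn_id` with one keyed by `id(hook)`.

```python
from dataclasses import dataclass


@dataclass(eq=False)
class FakeHook:
    """Hypothetical stand-in for a SQL hook; not a real Airflow class."""

    conn_id: str
    database: str


# Two hooks share a connection id but target different databases.
hooks = [
    FakeHook(conn_id="warehouse", database="sales"),
    FakeHook(conn_id="warehouse", database="marketing"),
]

# Keyed by conn_id: the second hook overwrites the first.
by_conn_id = {h.conn_id: h for h in hooks}

# Keyed by object identity: both hooks are kept.
by_identity = {id(h): h for h in hooks}

print(len(by_conn_id), by_conn_id["warehouse"].database)
print(len(by_identity))
```

Note that `id(hook)` is only unique while the object is alive, so this kind of key is safe only as long as the mapping (or something else) holds a reference to each hook for the whole time the key is in use.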
##########
providers/openlineage/src/airflow/providers/openlineage/extractors/manager.py:
##########
@@ -142,9 +142,26 @@ def extract_metadata(
task_info,
)
self.log.debug("OpenLineage extraction failure details:", exc_info=True)
- elif (hook_lineage := self.get_hook_lineage(task_instance, task_instance_state)) is not None:
- return hook_lineage
else:
+ # No extractor found — fall back to hook lineage. This call must be wrapped in
+ # try/except: it runs emit_lineage_from_sql_extras → _create_ol_event_pair which
+ # is not guarded internally. An uncaught exception here would propagate into
+ # on_success()'s @print_warning, silently suppressing the task-level COMPLETE event.
Review Comment:
This path is not taken only for COMPLETE events, so the comments and logging
should be adjusted to cover the other task states as well.