kaxil commented on code in PR #65313:
URL: https://github.com/apache/airflow/pull/65313#discussion_r3088306450
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -982,6 +982,84 @@ def terminate_orphan_processes(self, present: set[DagFileInfo]):
processor.logger_filehandle.close()
self._file_stats.pop(file, None)
+    def handle_parsing_result(
+        self, file: DagFileInfo, proc: DagFileProcessorProcess, *, session: Session
+    ) -> None:
+        """
+        Post-process a single finished parse result.
+
+        Detects callback-only processing, updates file stats, emits metrics,
+        and persists DAGs/import-errors via :meth:`persist_parsing_result`.
+        Extracted from ``_collect_results`` to keep result handling and
+        persistence separate.
+        """
+        is_callback_only = proc.had_callbacks and proc.parsing_result is None
+        if is_callback_only:
+            self.log.debug("Detected callback-only processing for %s", file)
+
+        run_duration = time.monotonic() - proc.start_time
+        next_stat = process_parse_results(
+            run_duration=run_duration,
+            finish_time=timezone.utcnow(),
+            run_count=self._file_stats[file].run_count,
+            bundle_name=file.bundle_name,
+            bundle_version=self._bundle_versions[file.bundle_name],
+            parsing_result=proc.parsing_result,
+            is_callback_only=is_callback_only,
+            relative_fileloc=str(file.rel_path),
+        )
+
+        if proc.parsing_result is not None:
+            self.persist_parsing_result(
Review Comment:
If `persist_parsing_result` raises here, the exception propagates out of
`_collect_results` and skips all remaining ready processors for that cycle.
That matches the behavior before the refactor, but once the override swaps DB
writes for API calls (which I assume is the next step), network errors will
likely be more frequent than DB errors are today. Might be worth wrapping this
call (or the `handle_parsing_result` call in `_collect_results`) in a
try/except so one file's persist failure doesn't stall the entire batch.
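
A rough, standalone sketch of the isolation pattern described above, not the
actual manager code. `collect_results`, `flaky_persist`, and the file names are
hypothetical stand-ins for the loop in `_collect_results` and for
`handle_parsing_result`; the real loop's shape and signatures may differ.

```python
import logging

log = logging.getLogger("dag_processing.sketch")


def collect_results(ready, handle_result):
    """Handle every ready (file, proc) pair and return the files that failed.

    Hypothetical stand-in for the per-cycle loop: a failure while handling
    one file is logged and recorded instead of aborting the whole batch.
    """
    failed = []
    for file, proc in ready:
        try:
            handle_result(file, proc)
        except Exception:
            # Log with traceback and move on to the next ready processor.
            log.exception("Failed to handle parsing result for %s", file)
            failed.append(file)
    return failed


handled = []


def flaky_persist(file, proc):
    # Simulates a persist step backed by an API call that fails for one file.
    if file == "b.py":
        raise ConnectionError("simulated API failure")
    handled.append(file)


failed = collect_results(
    [("a.py", None), ("b.py", None), ("c.py", None)], flaky_persist
)
```

With this shape, `a.py` and `c.py` are still handled even though `b.py`
fails; whether the failed file should be retried next cycle or recorded as an
import error is a separate decision.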
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.