kaxil commented on code in PR #65313:
URL: https://github.com/apache/airflow/pull/65313#discussion_r3088501951


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -982,6 +982,84 @@ def terminate_orphan_processes(self, present: 
set[DagFileInfo]):
                 processor.logger_filehandle.close()
                 self._file_stats.pop(file, None)
 
+    def handle_parsing_result(
+        self, file: DagFileInfo, proc: DagFileProcessorProcess, *, session: 
Session
+    ) -> None:
+        """
+        Post-process a single finished parse result.
+
+        Detects callback-only processing, updates file stats, emits metrics,
+        and persists DAGs/import-errors via :meth:`persist_parsing_result`.
+        Extracted from ``_collect_results`` to keep result handling and
+        persistence separate.
+        """
+        is_callback_only = proc.had_callbacks and proc.parsing_result is None
+        if is_callback_only:
+            self.log.debug("Detected callback-only processing for %s", file)
+
+        run_duration = time.monotonic() - proc.start_time
+        next_stat = process_parse_results(
+            run_duration=run_duration,
+            finish_time=timezone.utcnow(),
+            run_count=self._file_stats[file].run_count,
+            bundle_name=file.bundle_name,
+            bundle_version=self._bundle_versions[file.bundle_name],
+            parsing_result=proc.parsing_result,
+            is_callback_only=is_callback_only,
+            relative_fileloc=str(file.rel_path),
+        )
+
+        if proc.parsing_result is not None:
+            self.persist_parsing_result(
+                bundle_name=file.bundle_name,
+                bundle_version=self._bundle_versions[file.bundle_name],
+                parsing_result=proc.parsing_result,
+                run_duration=run_duration,
+                relative_fileloc=str(file.rel_path),
+                session=session,

Review Comment:
   Small behavior change here -- in the old code, the `elif 
parsing_result.import_errors and len(parsing_result.serialized_dags) == 0` 
branch skipped `update_dag_parsing_results_in_db` entirely. Now 
`persist_parsing_result` runs for all non-None `parsing_result`, so files with 
only import errors (no DAGs) will also get persisted to the DB.
   
   Is that intentional? If so, worth calling it out in the PR description since 
the rest reads like a pure extraction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to