Copilot commented on code in PR #63928:
URL: https://github.com/apache/airflow/pull/63928#discussion_r3025334304


##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a 
directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but 
got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this 
file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)

Review Comment:
   `bundle_name` is taken directly from JSON and used with a `set`. If `name` is
present but not a hashable type (e.g., a JSON array/object), both the membership
test `bundle_name in seen_bundle_names` and `seen_bundle_names.add(bundle_name)`
will raise `TypeError: unhashable type`, breaking config loading for the whole
directory. Validate that `name` is a non-empty `str` before using it for
duplicate detection; otherwise log an error and skip that file.
   ```suggestion
                   if bundle_name is None:
                       self.log.error("Invalid config in %s: Missing required 
'name' field", file_path)
                       continue
                   if not isinstance(bundle_name, str):
                       self.log.error(
                           "Invalid config in %s: 'name' field must be a 
string, got %r",
                           file_path,
                           bundle_name,
                       )
                       continue
                   if not bundle_name:
                       self.log.error("Invalid config in %s: 'name' field 
cannot be empty", file_path)
                       continue
                   if bundle_name in seen_bundle_names:
                       self.log.warning(
                           "Duplicate bundle name '%s' found in %s, skipping 
this file",
                           bundle_name,
                           file_path,
                       )
                       continue
                   seen_bundle_names.add(bundle_name)
   ```



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a 
directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but 
got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this 
file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)
+
+                config_list.append(config)
+                # Track file modification time for change detection
+                self._config_path_mtime[str(file_path)] = 
file_path.stat().st_mtime
+            except json.JSONDecodeError as e:
+                self.log.error("Failed to parse JSON from %s: %s", file_path, 
e)
+            except Exception as e:
+                self.log.error("Error reading config file %s: %s", file_path, 
e)

Review Comment:
   `check_config_path_changes()` compares the directory's `*.json` files to 
`self._config_path_mtime`, but `_config_path_mtime` is currently populated only 
for files that successfully parse *and* are not skipped as duplicates. If the 
directory contains any invalid JSON, schema-invalid config, or skipped 
duplicate-name file, the key sets will never match and the manager will report 
changes on every loop, causing repeated reload churn. Consider tracking mtimes 
for *all* `*.json` files (even ones skipped for invalid content/duplicates), 
and keep parsing/validation concerns separate from change-detection bookkeeping.



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a 
directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but 
got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this 
file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)
+
+                config_list.append(config)
+                # Track file modification time for change detection
+                self._config_path_mtime[str(file_path)] = 
file_path.stat().st_mtime
+            except json.JSONDecodeError as e:
+                self.log.error("Failed to parse JSON from %s: %s", file_path, 
e)
+            except Exception as e:
+                self.log.error("Error reading config file %s: %s", file_path, 
e)
+
+        if not config_list:
+            self.log.warning("No valid DAG bundle configs found in %s", 
config_path)
+            return

Review Comment:
   In path mode, `_parse_config_from_path()` returns early when the directory 
is missing or when no valid configs are found, without clearing 
`self._bundle_config`. This can leave stale bundles configured even after all 
JSON files are removed (or the directory disappears), which undermines the PR’s 
“removals” behavior and prevents cleanup from ever triggering. A robust 
approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' 
(clear `_bundle_config` accordingly), and have `check_config_path_changes()` 
report a change when the directory disappears (at least when it previously 
existed / had tracked files).



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a 
directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but 
got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this 
file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)
+
+                config_list.append(config)
+                # Track file modification time for change detection
+                self._config_path_mtime[str(file_path)] = 
file_path.stat().st_mtime
+            except json.JSONDecodeError as e:
+                self.log.error("Failed to parse JSON from %s: %s", file_path, 
e)
+            except Exception as e:
+                self.log.error("Error reading config file %s: %s", file_path, 
e)
+
+        if not config_list:
+            self.log.warning("No valid DAG bundle configs found in %s", 
config_path)
+            return
+
+        bundle_config_list = _parse_bundle_config(config_list)
+        if conf.getboolean("core", "LOAD_EXAMPLES"):
+            _add_example_dag_bundle(bundle_config_list)
+
+        # Clear existing config and reload
+        self._bundle_config.clear()
+        for bundle_config in bundle_config_list:
+            if bundle_config.team_name and not conf.getboolean("core", 
"multi_team"):
+                raise AirflowConfigException(
+                    "DAG bundle configurations from path "
+                    "cannot have a team name when multi-team mode is disabled. 
"
+                    "To enable multi-team, you need to update section `core` 
key `multi_team` in your config."
+                )
+
+            class_ = import_string(bundle_config.classpath)
+            self._bundle_config[bundle_config.name] = _InternalBundleConfig(
+                bundle_class=class_,
+                kwargs=bundle_config.kwargs,
+                team_name=bundle_config.team_name,
+            )
+        self.log.info("DAG bundles loaded from path: %s", ", 
".join(self._bundle_config.keys()))
+
+    def check_config_path_changes(self) -> bool:
+        """
+        Check if any configuration files have been added, removed, or modified.
+
+        :return: True if changes detected, False otherwise
+        """
+        config_path = conf.get("dag_processor", "dag_bundle_config_path", 
fallback="")
+        if not config_path:
+            return False
+
+        path = Path(config_path)
+        if not path.exists() or not path.is_dir():

Review Comment:
   In path mode, `_parse_config_from_path()` returns early when the directory 
is missing or when no valid configs are found, without clearing 
`self._bundle_config`. This can leave stale bundles configured even after all 
JSON files are removed (or the directory disappears), which undermines the PR’s 
“removals” behavior and prevents cleanup from ever triggering. A robust 
approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' 
(clear `_bundle_config` accordingly), and have `check_config_path_changes()` 
report a change when the directory disappears (at least when it previously 
existed / had tracked files).
   ```suggestion
           if not path.exists() or not path.is_dir():
               # If we previously tracked files from this path, treat 
disappearance as a change
               if getattr(self, "_config_path_mtime", None):
                   self.log.info("DAG bundle config path '%s' no longer exists 
or is not a directory", path)
                   return True
   ```



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a 
directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but 
got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this 
file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)
+
+                config_list.append(config)
+                # Track file modification time for change detection
+                self._config_path_mtime[str(file_path)] = 
file_path.stat().st_mtime
+            except json.JSONDecodeError as e:
+                self.log.error("Failed to parse JSON from %s: %s", file_path, 
e)
+            except Exception as e:
+                self.log.error("Error reading config file %s: %s", file_path, 
e)
+
+        if not config_list:
+            self.log.warning("No valid DAG bundle configs found in %s", 
config_path)
+            return
+
+        bundle_config_list = _parse_bundle_config(config_list)
+        if conf.getboolean("core", "LOAD_EXAMPLES"):
+            _add_example_dag_bundle(bundle_config_list)
+
+        # Clear existing config and reload
+        self._bundle_config.clear()
+        for bundle_config in bundle_config_list:
+            if bundle_config.team_name and not conf.getboolean("core", 
"multi_team"):
+                raise AirflowConfigException(
+                    "DAG bundle configurations from path "
+                    "cannot have a team name when multi-team mode is disabled. 
"
+                    "To enable multi-team, you need to update section `core` 
key `multi_team` in your config."
+                )
+
+            class_ = import_string(bundle_config.classpath)
+            self._bundle_config[bundle_config.name] = _InternalBundleConfig(
+                bundle_class=class_,
+                kwargs=bundle_config.kwargs,
+                team_name=bundle_config.team_name,
+            )
+        self.log.info("DAG bundles loaded from path: %s", ", 
".join(self._bundle_config.keys()))
+
+    def check_config_path_changes(self) -> bool:
+        """
+        Check if any configuration files have been added, removed, or modified.
+
+        :return: True if changes detected, False otherwise
+        """
+        config_path = conf.get("dag_processor", "dag_bundle_config_path", 
fallback="")
+        if not config_path:
+            return False
+
+        path = Path(config_path)
+        if not path.exists() or not path.is_dir():
+            return False
+
+        current_files: dict[str, float] = {}
+        for file_path in path.glob("*.json"):
+            current_files[str(file_path)] = file_path.stat().st_mtime
+
+        # Check for added or removed files
+        if set(current_files.keys()) != set(self._config_path_mtime.keys()):
+            self.log.info("DAG bundle config files added or removed")
+            return True
+
+        # Check for modified files
+        for fpath, mtime in current_files.items():
+            if fpath in self._config_path_mtime and 
self._config_path_mtime[fpath] != mtime:
+                self.log.info("DAG bundle config file modified: %s", fpath)
+                return True
+
+        return False

Review Comment:
   `check_config_path_changes()` compares the directory's `*.json` files to 
`self._config_path_mtime`, but `_config_path_mtime` is currently populated only 
for files that successfully parse *and* are not skipped as duplicates. If the 
directory contains any invalid JSON, schema-invalid config, or skipped 
duplicate-name file, the key sets will never match and the manager will report 
changes on every loop, causing repeated reload churn. Consider tracking mtimes 
for *all* `*.json` files (even ones skipped for invalid content/duplicates), 
and keep parsing/validation concerns separate from change-detection bookkeeping.
   ```suggestion
           # Use a dedicated snapshot of all observed JSON files for change 
detection.
           # This is intentionally separate from self._config_path_mtime, which 
only
           # tracks successfully parsed and non-duplicate configs.
           previous_files: dict[str, float] = getattr(self, 
"_config_path_all_mtime", {})
   
           files_changed = False
   
           # Check for added or removed files
           if set(current_files.keys()) != set(previous_files.keys()):
               self.log.info("DAG bundle config files added or removed")
               files_changed = True
           else:
               # Check for modified files
               for fpath, mtime in current_files.items():
                   if previous_files.get(fpath) != mtime:
                       self.log.info("DAG bundle config file modified: %s", 
fpath)
                       files_changed = True
                       break
   
           # Update the snapshot to the current state so that subsequent checks
           # compare against the latest observed directory contents.
           self._config_path_all_mtime = current_files
   
           return files_changed
   ```



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return

Review Comment:
   In path mode, `_parse_config_from_path()` returns early when the directory 
is missing or when no valid configs are found, without clearing 
`self._bundle_config`. This can leave stale bundles configured even after all 
JSON files are removed (or the directory disappears), which undermines the PR’s 
“removals” behavior and prevents cleanup from ever triggering. A robust 
approach is to treat 'missing/empty/no-valid-config' as 'no bundles configured' 
(clear `_bundle_config` accordingly), and have `check_config_path_changes()` 
report a change when the directory disappears (at least when it previously 
existed / had tracked files).



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+            self.log.debug("Bundle '%s' is no longer configured, cannot get 
path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", 
config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a 
directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but 
got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this 
file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)
+
+                config_list.append(config)
+                # Track file modification time for change detection
+                self._config_path_mtime[str(file_path)] = 
file_path.stat().st_mtime
+            except json.JSONDecodeError as e:
+                self.log.error("Failed to parse JSON from %s: %s", file_path, 
e)
+            except Exception as e:
+                self.log.error("Error reading config file %s: %s", file_path, 
e)
+
+        if not config_list:
+            self.log.warning("No valid DAG bundle configs found in %s", 
config_path)
+            return
+
+        bundle_config_list = _parse_bundle_config(config_list)
+        if conf.getboolean("core", "LOAD_EXAMPLES"):
+            _add_example_dag_bundle(bundle_config_list)
+
+        # Clear existing config and reload
+        self._bundle_config.clear()
+        for bundle_config in bundle_config_list:
+            if bundle_config.team_name and not conf.getboolean("core", 
"multi_team"):
+                raise AirflowConfigException(
+                    "DAG bundle configurations from path "
+                    "cannot have a team name when multi-team mode is disabled. 
"
+                    "To enable multi-team, you need to update section `core` 
key `multi_team` in your config."
+                )
+
+            class_ = import_string(bundle_config.classpath)
+            self._bundle_config[bundle_config.name] = _InternalBundleConfig(
+                bundle_class=class_,
+                kwargs=bundle_config.kwargs,
+                team_name=bundle_config.team_name,
+            )
+        self.log.info("DAG bundles loaded from path: %s", ", 
".join(self._bundle_config.keys()))
+
+    def check_config_path_changes(self) -> bool:
+        """
+        Check if any configuration files have been added, removed, or modified.
+
+        :return: True if changes detected, False otherwise
+        """
+        config_path = conf.get("dag_processor", "dag_bundle_config_path", 
fallback="")
+        if not config_path:
+            return False
+
+        path = Path(config_path)
+        if not path.exists() or not path.is_dir():
+            return False
+
+        current_files: dict[str, float] = {}
+        for file_path in path.glob("*.json"):
+            current_files[str(file_path)] = file_path.stat().st_mtime

Review Comment:
   Using `st_mtime` (a float, which loses sub-microsecond precision and whose
underlying resolution can be as coarse as 1–2 seconds on some filesystems) can
miss rapid successive edits to the same file, so config changes may not be
detected reliably. To make reload detection more robust, consider using
`st_mtime_ns` (integer nanoseconds) or combining the mtime with the file size
or a content hash for change detection.
   ```suggestion
           current_files: dict[str, int] = {}
           for file_path in path.glob("*.json"):
               current_files[str(file_path)] = file_path.stat().st_mtime_ns
   ```



##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -368,5 +384,129 @@ def view_url(self, name: str, version: str | None = None) 
-> str | None:
             DeprecationWarning,
             stacklevel=2,
         )
-        bundle = self.get_bundle(name, version)
-        return bundle.view_url(version=version)
+        try:
+            bundle = self.get_bundle(name, version)
+            return bundle.view_url(version=version)
+        except ValueError:
+            # Bundle no longer configured, return None
+            self.log.debug("Bundle '%s' is no longer configured, cannot get view URL", name)
+            return None
+
+    def get_bundle_path_safe(self, name: str) -> Path | None:
+        """
+        Safely get a bundle's path without raising errors if bundle doesn't 
exist.
+
+        :param name: The name of the DAG bundle.
+        :return: The bundle path if configured, None otherwise.
+        """
+        try:
+            bundle = self.get_bundle(name)
+            return bundle.path
+        except ValueError:
+            # Bundle no longer configured
+        self.log.debug("Bundle '%s' is no longer configured, cannot get path", name)
+            return None
+
+    def _parse_config_from_path(self, config_path: str) -> None:
+        """
+        Parse DAG bundle configurations from JSON files in a directory.
+
+        :param config_path: Path to directory containing JSON config files
+        """
+        path = Path(config_path)
+        if not path.exists():
+            self.log.warning("DAG bundle config path does not exist: %s", config_path)
+            return
+
+        if not path.is_dir():
+            raise AirflowConfigException(f"dag_bundle_config_path must be a directory, got: {config_path}")
+
+        # Clear old mtime entries before repopulating
+        self._config_path_mtime.clear()
+
+        config_list = []
+        seen_bundle_names: set[str] = set()
+
+        for file_path in path.glob("*.json"):
+            try:
+                config = json.loads(file_path.read_text())
+                if not isinstance(config, dict):
+                    self.log.error("Invalid config in %s: Expected dict but got %s", file_path, type(config))
+                    continue
+
+                # Check for duplicate bundle names
+                bundle_name = config.get("name")
+                if bundle_name and bundle_name in seen_bundle_names:
+                    self.log.warning(
+                        "Duplicate bundle name '%s' found in %s, skipping this file",
+                        bundle_name,
+                        file_path,
+                    )
+                    continue
+                if bundle_name:
+                    seen_bundle_names.add(bundle_name)
+
+                config_list.append(config)
+                # Track file modification time for change detection
+                self._config_path_mtime[str(file_path)] = 
file_path.stat().st_mtime

Review Comment:
   Using `st_mtime` (float, often with coarse resolution depending on 
filesystem/platform) can miss rapid successive edits, meaning config changes 
may not be detected reliably. To make reload detection more robust, consider 
using `st_mtime_ns` (integer nanoseconds) or combining mtime with file 
size/content hashing for change detection.
   ```suggestion
                self._config_path_mtime[str(file_path)] = file_path.stat().st_mtime_ns
   ```



##########
airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -467,3 +470,259 @@ def test_multiple_bundles_one_fails(clear_db, session):
 
 def test_get_all_bundle_names():
     assert DagBundlesManager().get_all_bundle_names() == ["dags-folder", 
"example_dags"]
+
+
+class TestDagBundleConfigPath:
+    """Tests for dynamic DAG bundle configuration from path."""
+
+    def test_parse_config_from_path(self):
+        """Test parsing DAG bundle configs from JSON files in a directory."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            bundle1 = {
+                "name": "test-bundle-1",
+                "classpath": 
"unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle",
+                "kwargs": {"refresh_interval": 1},
+            }
+            bundle2 = {
+                "name": "test-bundle-2",
+                "classpath": 
"unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle",
+                "kwargs": {"refresh_interval": 2},
+            }
+
+            
Path(tmpdir).joinpath("bundle1.json").write_text(json.dumps(bundle1))
+            
Path(tmpdir).joinpath("bundle2.json").write_text(json.dumps(bundle2))
+
+            with conf_vars(
+                {
+                    ("dag_processor", "dag_bundle_config_path"): tmpdir,
+                    ("core", "LOAD_EXAMPLES"): "False",
+                }
+            ):
+                manager = DagBundlesManager()
+                bundle_names = {b.name for b in manager.get_all_dag_bundles()}
+                assert bundle_names == {"test-bundle-1", "test-bundle-2"}
+
+    def test_config_path_takes_precedence_over_list(self):
+        """Test that config path takes precedence over config list."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            bundle = {
+                "name": "path-bundle",
+                "classpath": 
"unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle",
+                "kwargs": {"refresh_interval": 1},
+            }
+            Path(tmpdir).joinpath("bundle.json").write_text(json.dumps(bundle))
+
+            list_config = [
+                {
+                    "name": "list-bundle",
+                    "classpath": 
"unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle",
+                    "kwargs": {"refresh_interval": 1},
+                }
+            ]
+
+            with conf_vars(
+                {
+                    ("dag_processor", "dag_bundle_config_list"): 
json.dumps(list_config),
+                    ("dag_processor", "dag_bundle_config_path"): tmpdir,
+                    ("core", "LOAD_EXAMPLES"): "False",
+                }
+            ):
+                manager = DagBundlesManager()
+                bundle_names = {b.name for b in manager.get_all_dag_bundles()}
+                # Should only have bundle from path, not from list
+                assert bundle_names == {"path-bundle"}
+
+    def test_check_config_path_changes(self):
+        """Test detection of config file changes."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            bundle_file = Path(tmpdir).joinpath("bundle.json")
+            bundle_config = {
+                "name": "test-bundle",
+                "classpath": 
"unit.dag_processing.bundles.test_dag_bundle_manager.BasicBundle",
+                "kwargs": {"refresh_interval": 1},
+            }
+            bundle_file.write_text(json.dumps(bundle_config))
+
+            with conf_vars(
+                {
+                    ("dag_processor", "dag_bundle_config_path"): tmpdir,
+                    ("core", "LOAD_EXAMPLES"): "False",
+                }
+            ):
+                manager = DagBundlesManager()
+
+                # No changes initially
+                assert not manager.check_config_path_changes()
+
+                # Modify the file
+                time.sleep(0.01)  # Ensure mtime changes
+                bundle_config["kwargs"]["refresh_interval"] = 99
+                bundle_file.write_text(json.dumps(bundle_config))

Review Comment:
   This test relies on `time.sleep(0.01)` to force an mtime change, which can 
be flaky on filesystems with coarse timestamp granularity. Prefer explicitly 
bumping the mtime (e.g., via `os.utime`) or using `st_mtime_ns` in the 
implementation and adjusting the test accordingly, so the test doesn’t depend 
on timing.
   ```suggestion
                # Modify the file and ensure its mtime changes deterministically
                bundle_config["kwargs"]["refresh_interval"] = 99
                bundle_file.write_text(json.dumps(bundle_config))
                stat_result = bundle_file.stat()
                os.utime(bundle_file, (stat_result.st_atime, stat_result.st_mtime + 1))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to