Copilot commented on code in PR #48659:
URL: https://github.com/apache/airflow/pull/48659#discussion_r3066499401


##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -666,7 +666,7 @@ def sync_to_db(self, bundle_name: str, bundle_version: str 
| None, session: Sess
 
 def generate_md5_hash(context):
     bundle_name = context.get_current_parameters()["bundle_name"]
-    relative_fileloc = context.get_current_parameters()["relative_fileloc"]
+    relative_fileloc = 
context.get_current_parameters().get("relative_fileloc", "")

Review Comment:
   `relative_fileloc` can now be NULL, but `generate_md5_hash()` will 
incorporate the Python `None` value into the hash (resulting in IDs based on 
the literal string "None") whenever the key exists with a NULL value. This 
makes IDs depend on how callers represent “whole-bundle” requests (NULL vs "") 
and can lead to duplicate semantic requests with different IDs. Normalize 
`relative_fileloc` to an empty string (or other single canonical value) before 
hashing so NULL/"" produce the same ID.
   ```suggestion
       relative_fileloc = 
context.get_current_parameters().get("relative_fileloc") or ""
   ```



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -685,12 +685,22 @@ class DagPriorityParsingRequest(Base):
     # Note: Do not depend on fileloc pointing to a file; in the case of a
     # packaged DAG, it will point to the subpath of the DAG within the
     # associated zip.
-    relative_fileloc = Column(String(2000), nullable=False)
+    relative_fileloc = Column(String(2000), nullable=True)
 
-    def __init__(self, bundle_name: str, relative_fileloc: str) -> None:
+    def __init__(self, bundle_name: str, relative_fileloc: str = "") -> None:
         super().__init__()
         self.bundle_name = bundle_name
         self.relative_fileloc = relative_fileloc
 
+    def parse_whole_folder(self) -> bool:
+        """
+        Check if this request should parse the whole folder based on 
relative_fileloc.
+
+        Returns:
+            bool: True if relative_fileloc is None, indicating the whole 
folder should be parsed,
+                 False otherwise.
+        """
+        return self.relative_fileloc == ""

Review Comment:
   `parse_whole_folder()` docstring says it returns True when 
`relative_fileloc` is None, but the implementation checks for an empty string. 
Since the column is nullable, a NULL value will currently be treated as 
“specific file” and will later cause `Path(None)` errors in the manager. Update 
the predicate to treat NULL (and ideally "" for backward-compat) as “whole 
bundle”, and align the docstring accordingly.
   ```suggestion
               bool: True if relative_fileloc is None or an empty string, 
indicating the whole folder
                   should be parsed, False otherwise.
           """
           return self.relative_fileloc in (None, "")
   ```



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -413,20 +413,35 @@ def _queue_requested_files_for_parsing(self) -> None:
 
     @provide_session
     def _get_priority_files(self, session: Session = NEW_SESSION) -> 
list[DagFileInfo]:
-        files: list[DagFileInfo] = []
+        files: set[DagFileInfo] = set()
         bundles = {b.name: b for b in self._dag_bundles}
         requests = session.scalars(
             
select(DagPriorityParsingRequest).where(DagPriorityParsingRequest.bundle_name.in_(bundles.keys()))
         )
         for request in requests:
             bundle = bundles[request.bundle_name]
-            files.append(
-                DagFileInfo(
-                    rel_path=Path(request.relative_fileloc), 
bundle_name=bundle.name, bundle_path=bundle.path
+            if request.parse_whole_folder():
+                # If relative_fileloc is null, get all files from DagModel
+                dag_files = session.scalars(
+                    
select(DagModel.relative_fileloc).where(DagModel.bundle_name == 
bundle.name).distinct()
+                )
+                for relative_fileloc in dag_files:
+                    files.add(
+                        DagFileInfo(
+                            rel_path=Path(relative_fileloc), 
bundle_name=bundle.name, bundle_path=bundle.path
+                        )
+                    )
+            else:
+                # If relative_fileloc has a value, just add that specific file
+                files.add(
+                    DagFileInfo(
+                        rel_path=Path(request.relative_fileloc),
+                        bundle_name=bundle.name,
+                        bundle_path=bundle.path,
+                    )

Review Comment:
   If a `DagPriorityParsingRequest.relative_fileloc` row is NULL (now allowed 
by the migration), the `else` branch will execute and call 
`Path(request.relative_fileloc)`, which raises `TypeError`. Ensure NULL 
requests are handled in the “whole folder” path (e.g., via `if not 
request.relative_fileloc` / updated `parse_whole_folder()`) before constructing 
`Path(...)`.



##########
airflow-core/src/airflow/migrations/versions/0069_3_0_0_make_relative_fileloc_nullable_for_.py:
##########
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+make relative_fileloc nullable for reserialized all bundles.
+
+Revision ID: 62e9fc3d00a4

Review Comment:
   Spelling: "reserialized" should be "reserialize" (e.g., “reserialize all 
bundles”).



##########
airflow-core/docs/migrations-ref.rst:
##########
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``29ce7909c52b`` (head) | ``959e216a3abb`` | ``3.0.0``         | Change TI 
table to have unique UUID id/pk per attempt.       |
+| ``62e9fc3d00a4`` (head) | ``29ce7909c52b`` | ``3.0.0``         | make 
relative_fileloc nullable for reserialized all bundles. |

Review Comment:
   Spelling: "reserialized" should be "reserialize" in the migration 
description.
   ```suggestion
   | ``62e9fc3d00a4`` (head) | ``29ce7909c52b`` | ``3.0.0``         | make 
relative_fileloc nullable for reserialize all bundles.  |
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to