Copilot commented on code in PR #64538:
URL: https://github.com/apache/airflow/pull/64538#discussion_r3025327445


##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py:
##########
@@ -81,6 +81,13 @@ class _CreateDatabricksWorkflowOperator(BaseOperator):
 
     :param task_id: The task_id of the operator
     :param databricks_conn_id: The connection ID to use when connecting to Databricks.
+    :param access_control_list: List of permissions to set on the job. Array of object
+        (AccessControlRequestForUser) or object (AccessControlRequestForGroup) or object
+        (AccessControlRequestForServicePrincipal).
+
+        .. seealso::
+            This will only be used on create. In order to reset ACL consider using the Databricks UI.

Review Comment:
   The docstring note says ACL “will only be used on create”, but 
`_CreateDatabricksWorkflowOperator._create_or_reset_job` calls `reset_job()` 
for existing jobs, and `DatabricksHook.reset_job()` applies 
`access_control_list` (via the permissions endpoint) when provided. Please 
update the documentation to reflect that ACL can also be applied on job 
updates/resets (and clarify whether it overwrites existing permissions).
   ```suggestion
            The access control list is applied both when creating a new job and when resetting
            an existing job. When provided for an existing job, the supplied ACL replaces the
            job's current permissions. You can also manage job permissions directly in the
            Databricks UI.
   ```
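   To illustrate the behavior the comment describes, here is a hedged sketch of the create-or-reset flow (the helper and hook method names are illustrative stand-ins, not the actual `DatabricksHook` API): the ACL carried in the job spec is forwarded on both the create path and the reset path, which is why the "only be used on create" wording is misleading.

   ```python
   from unittest import mock

   # Illustrative sketch only: `create_job`/`reset_job` here are simplified
   # stand-ins for the real hook methods referenced in the review comment.
   def create_or_reset_job(hook, job_spec, existing_job_id=None):
       if existing_job_id is None:
           # New job: the ACL embedded in job_spec is applied at creation time.
           return hook.create_job(job_spec)
       # Existing job: reset also applies job_spec["access_control_list"],
       # replacing the job's current permissions.
       hook.reset_job(existing_job_id, job_spec)
       return existing_job_id
   ```

   A mock hook confirms that both branches receive the spec, ACL included, unchanged.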



##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py:
##########
@@ -137,6 +146,7 @@ def __init__(
         **kwargs,
     ):
         self.databricks_conn_id = databricks_conn_id
+        self.access_control_list = access_control_list or []

Review Comment:
   `access_control_list` is normalized to `[]` and always included in 
`default_json`. This differs from other Databricks operators (e.g. 
`DatabricksCreateJobsOperator` only sets the field when the parameter is not 
`None`) and changes behavior by sending an explicit empty ACL when the user 
didn’t set one. Consider keeping `self.access_control_list` as `None` by 
default and only adding the `access_control_list` key to the job spec when the 
user explicitly provided a value (including allowing an explicit empty list if 
you want to support clearing).
   ```suggestion
           self.access_control_list = access_control_list
   ```
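   A minimal sketch of the suggested pattern (a hypothetical helper, not the operator's real method): keep the attribute `None` by default and only emit the key when the user supplied a value, so an explicit empty list remains available for clearing permissions while unset stays omitted.

   ```python
   # Hypothetical helper illustrating "omit unless explicitly provided".
   def build_job_spec(access_control_list=None):
       spec = {"max_concurrent_runs": 1, "timeout_seconds": 0}
       # `is not None` (rather than truthiness) lets an explicit [] through,
       # supporting the "clearing" use case the comment mentions.
       if access_control_list is not None:
           spec["access_control_list"] = access_control_list
       return spec
   ```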



##########
providers/databricks/tests/unit/databricks/operators/test_databricks_workflow.py:
##########
@@ -97,6 +104,26 @@ def test_create_workflow_json(mock_databricks_hook, context, mock_task_group):
     assert workflow_json["job_clusters"] == []
     assert workflow_json["max_concurrent_runs"] == 1
     assert workflow_json["timeout_seconds"] == 0
+    assert workflow_json["access_control_list"] == []

Review Comment:
   This test asserts that `create_workflow_json()` returns an 
`access_control_list` key with an empty list by default. If the intent is to 
match other Databricks operators (omit ACL unless the user provided it), update 
the test expectations accordingly; otherwise, please justify why sending an 
explicit empty ACL is required/desired.
   ```suggestion
       assert "access_control_list" not in workflow_json
   ```



##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py:
##########
@@ -356,6 +376,7 @@ def __exit__(
             task_group=self,
             task_id="launch",
             databricks_conn_id=self.databricks_conn_id,
+            access_control_list=self.access_control_list,
             existing_clusters=self.existing_clusters,
             extra_job_params=self.extra_job_params,

Review Comment:
   Test coverage currently verifies the default `access_control_list=[]` being 
passed from `DatabricksWorkflowTaskGroup.__exit__` into 
`_CreateDatabricksWorkflowOperator`, but doesn’t cover the new behavior with a 
non-empty ACL provided on the TaskGroup. Add a unit test that creates a 
TaskGroup with a non-empty `access_control_list` and asserts the operator is 
instantiated with that value.

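   A hedged sketch of the missing test, using a stand-in class instead of the real `DatabricksWorkflowTaskGroup` (whose `__exit__` builds the launch operator; the real test would use the module's existing DAG fixtures): it checks that a non-empty ACL given to the group reaches the operator constructor.

   ```python
   from unittest import mock

   # Stand-in for DatabricksWorkflowTaskGroup, kept self-contained for
   # illustration; the forwarding mirrors __exit__ passing kwargs to
   # _CreateDatabricksWorkflowOperator.
   class TaskGroupStandIn:
       def __init__(self, operator_cls, access_control_list=None):
           self._operator_cls = operator_cls
           self.access_control_list = access_control_list

       def exit(self):
           self._operator_cls(
               task_id="launch",
               access_control_list=self.access_control_list,
           )

   acl = [{"user_name": "someone@example.com", "permission_level": "CAN_MANAGE"}]
   operator_cls = mock.MagicMock()
   TaskGroupStandIn(operator_cls, access_control_list=acl).exit()
   operator_cls.assert_called_once_with(task_id="launch", access_control_list=acl)
   ```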


##########
providers/databricks/src/airflow/providers/databricks/operators/databricks_workflow.py:
##########
@@ -321,6 +339,7 @@ class DatabricksWorkflowTaskGroup(TaskGroup):
     def __init__(
         self,
         databricks_conn_id: str,
+        access_control_list: list[dict] | None = None,
         existing_clusters: list[str] | None = None,
         extra_job_params: dict[str, Any] | None = None,
         jar_params: list[str] | None = None,

Review Comment:
   Adding `access_control_list` as the second positional parameter changes the 
positional-argument order for `DatabricksWorkflowTaskGroup.__init__`, which can 
break downstream code that instantiated the TaskGroup with positional args for 
`existing_clusters`/`extra_job_params` etc. To preserve backward compatibility, 
consider appending the new optional parameter at the end of the signature (or 
otherwise ensuring it can’t be passed positionally in a way that reinterprets 
existing arguments).

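   One way to get the "can't be passed positionally" guarantee is a keyword-only parameter. A simplified stand-in class (not the real TaskGroup signature) showing the bare `*` pattern:

   ```python
   # Sketch: parameters after a bare `*` are keyword-only, so adding
   # access_control_list this way cannot shift existing positional calls.
   class WorkflowTaskGroupSketch:
       def __init__(
           self,
           databricks_conn_id,
           existing_clusters=None,
           *,
           access_control_list=None,  # new, keyword-only
       ):
           self.databricks_conn_id = databricks_conn_id
           self.existing_clusters = existing_clusters or []
           self.access_control_list = access_control_list
   ```

   Existing callers like `WorkflowTaskGroupSketch("conn", ["cluster-1"])` keep their meaning, while the new parameter must be spelled out as `access_control_list=...`.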


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
