Copilot commented on code in PR #1473:
URL: https://github.com/apache/datafusion-python/pull/1473#discussion_r3034074273


##########
crates/core/src/context.rs:
##########
@@ -439,6 +439,20 @@ impl PySessionContext {
         Ok(())
     }
 
+    /// Deregister an object store with the given url
+    #[pyo3(signature = (scheme, host=None))]
+    pub fn deregister_object_store(
+        &self,
+        scheme: &str,
+        host: Option<&str>,
+    ) -> PyDataFusionResult<()> {
+        let host = host.unwrap_or("");
+        let url_string = format!("{scheme}{host}");
+        let url = Url::parse(&url_string).unwrap();
+        self.ctx.runtime_env().deregister_object_store(&url)?;

Review Comment:
   `Url::parse(...).unwrap()` will panic inside the Python extension on invalid
input (e.g., when `scheme="s3://"` is passed with `host=None` and the result is
not a parsable URL, or when callers omit the `://` separator). Please convert
parse failures into a Python exception (e.g., map the `Url::parse` error into
`PyDataFusionError`) instead of unwrapping. Also consider validating or
normalizing the `scheme` + `host` combination so that `host=None` is either
rejected for schemes that require a host or yields a valid URL.



##########
python/tests/test_context.py:
##########
@@ -351,6 +351,126 @@ def test_deregister_table(ctx, database):
     assert public.names() == {"csv1", "csv2"}
 
 
+def test_deregister_udf():
+    ctx = SessionContext()
+    from datafusion import udf
+
+    is_null = udf(
+        lambda x: x.is_null(),
+        [pa.float64()],
+        pa.bool_(),
+        volatility="immutable",
+        name="my_is_null",
+    )
+    ctx.register_udf(is_null)
+
+    # Verify it works
+    df = ctx.from_pydict({"a": [1.0, None]})
+    ctx.register_table("t", df.into_view())
+    result = ctx.sql("SELECT my_is_null(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([False, True])
+
+    # Deregister and verify it's gone
+    ctx.deregister_udf("my_is_null")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_is_null(a) FROM t").collect()

Review Comment:
   These code paths raise DataFusion planning/execution errors that are 
converted to `ValueError`/`Exception` (see `crates/util/src/errors.rs:100-107` 
and `sql_with_options` mapping), not `RuntimeError`. Using 
`pytest.raises(RuntimeError)` is likely to fail; assert `ValueError` 
(optionally with a message match) instead.



##########
python/tests/test_context.py:
##########
@@ -351,6 +351,126 @@ def test_deregister_table(ctx, database):
     assert public.names() == {"csv1", "csv2"}
 
 
+def test_deregister_udf():
+    ctx = SessionContext()
+    from datafusion import udf
+
+    is_null = udf(
+        lambda x: x.is_null(),
+        [pa.float64()],
+        pa.bool_(),
+        volatility="immutable",
+        name="my_is_null",
+    )
+    ctx.register_udf(is_null)
+
+    # Verify it works
+    df = ctx.from_pydict({"a": [1.0, None]})
+    ctx.register_table("t", df.into_view())
+    result = ctx.sql("SELECT my_is_null(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([False, True])
+
+    # Deregister and verify it's gone
+    ctx.deregister_udf("my_is_null")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_is_null(a) FROM t").collect()
+
+
+def test_deregister_udaf():
+    import pyarrow.compute as pc
+
+    ctx = SessionContext()
+    from datafusion import Accumulator, udaf
+
+    class MySum(Accumulator):
+        def __init__(self):
+            self._sum = 0.0
+
+        def update(self, values: pa.Array) -> None:
+            self._sum += pc.sum(values).as_py()
+
+        def merge(self, states: list[pa.Array]) -> None:
+            self._sum += pc.sum(states[0]).as_py()
+
+        def state(self) -> list:
+            return [self._sum]
+
+        def evaluate(self) -> pa.Scalar:
+            return self._sum
+
+    my_sum = udaf(
+        MySum,
+        [pa.float64()],
+        pa.float64(),
+        [pa.float64()],
+        volatility="immutable",
+        name="my_sum",
+    )
+    ctx.register_udaf(my_sum)
+    df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
+    ctx.register_table("t", df.into_view())
+
+    result = ctx.sql("SELECT my_sum(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([6.0])
+
+    ctx.deregister_udaf("my_sum")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_sum(a) FROM t").collect()
+
+
+def test_deregister_udwf():
+    ctx = SessionContext()
+    from datafusion import udwf
+    from datafusion.user_defined import WindowEvaluator
+
+    class MyRowNumber(WindowEvaluator):
+        def __init__(self):
+            self._row = 0
+
+        def evaluate_all(self, values, num_rows):
+            return pa.array(list(range(1, num_rows + 1)), type=pa.uint64())
+
+    my_row_number = udwf(
+        MyRowNumber,
+        [pa.float64()],
+        pa.uint64(),
+        volatility="immutable",
+        name="my_row_number",
+    )
+    ctx.register_udwf(my_row_number)
+    df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
+    ctx.register_table("t", df.into_view())
+
+    result = ctx.sql("SELECT my_row_number(a) OVER () FROM t").collect()
+    assert result[0].column(0) == pa.array([1, 2, 3], type=pa.uint64())
+
+    ctx.deregister_udwf("my_row_number")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_row_number(a) OVER () FROM t").collect()
+
+
+def test_deregister_udtf():
+    import pyarrow.dataset as ds
+
+    ctx = SessionContext()
+    from datafusion import Table, udtf
+
+    class MyTable:
+        def __call__(self):
+            batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3]})
+            return Table(ds.dataset([batch]))
+
+    my_table = udtf(MyTable(), "my_table")
+    ctx.register_udtf(my_table)
+
+    result = ctx.sql("SELECT * FROM my_table()").collect()
+    assert result[0].column(0) == pa.array([1, 2, 3])
+
+    ctx.deregister_udtf("my_table")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT * FROM my_table()").collect()

Review Comment:
   These code paths raise DataFusion planning/execution errors that are 
converted to `ValueError`/`Exception` (see `crates/util/src/errors.rs:100-107` 
and `sql_with_options` mapping), not `RuntimeError`. Using 
`pytest.raises(RuntimeError)` is likely to fail; assert `ValueError` 
(optionally with a message match) instead.



##########
python/tests/test_context.py:
##########
@@ -351,6 +351,126 @@ def test_deregister_table(ctx, database):
     assert public.names() == {"csv1", "csv2"}
 
 
+def test_deregister_udf():
+    ctx = SessionContext()
+    from datafusion import udf
+
+    is_null = udf(
+        lambda x: x.is_null(),
+        [pa.float64()],
+        pa.bool_(),
+        volatility="immutable",
+        name="my_is_null",
+    )
+    ctx.register_udf(is_null)
+
+    # Verify it works
+    df = ctx.from_pydict({"a": [1.0, None]})
+    ctx.register_table("t", df.into_view())
+    result = ctx.sql("SELECT my_is_null(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([False, True])
+
+    # Deregister and verify it's gone
+    ctx.deregister_udf("my_is_null")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_is_null(a) FROM t").collect()
+
+
+def test_deregister_udaf():
+    import pyarrow.compute as pc
+
+    ctx = SessionContext()
+    from datafusion import Accumulator, udaf
+
+    class MySum(Accumulator):
+        def __init__(self):
+            self._sum = 0.0
+
+        def update(self, values: pa.Array) -> None:
+            self._sum += pc.sum(values).as_py()
+
+        def merge(self, states: list[pa.Array]) -> None:
+            self._sum += pc.sum(states[0]).as_py()
+
+        def state(self) -> list:
+            return [self._sum]
+
+        def evaluate(self) -> pa.Scalar:
+            return self._sum
+
+    my_sum = udaf(
+        MySum,
+        [pa.float64()],
+        pa.float64(),
+        [pa.float64()],
+        volatility="immutable",
+        name="my_sum",
+    )
+    ctx.register_udaf(my_sum)
+    df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
+    ctx.register_table("t", df.into_view())
+
+    result = ctx.sql("SELECT my_sum(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([6.0])
+
+    ctx.deregister_udaf("my_sum")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_sum(a) FROM t").collect()
+
+
+def test_deregister_udwf():
+    ctx = SessionContext()
+    from datafusion import udwf
+    from datafusion.user_defined import WindowEvaluator
+
+    class MyRowNumber(WindowEvaluator):
+        def __init__(self):
+            self._row = 0
+
+        def evaluate_all(self, values, num_rows):
+            return pa.array(list(range(1, num_rows + 1)), type=pa.uint64())
+
+    my_row_number = udwf(
+        MyRowNumber,
+        [pa.float64()],
+        pa.uint64(),
+        volatility="immutable",
+        name="my_row_number",
+    )
+    ctx.register_udwf(my_row_number)
+    df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
+    ctx.register_table("t", df.into_view())
+
+    result = ctx.sql("SELECT my_row_number(a) OVER () FROM t").collect()
+    assert result[0].column(0) == pa.array([1, 2, 3], type=pa.uint64())
+
+    ctx.deregister_udwf("my_row_number")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_row_number(a) OVER () FROM t").collect()

Review Comment:
   These code paths raise DataFusion planning/execution errors that are 
converted to `ValueError`/`Exception` (see `crates/util/src/errors.rs:100-107` 
and `sql_with_options` mapping), not `RuntimeError`. Using 
`pytest.raises(RuntimeError)` is likely to fail; assert `ValueError` 
(optionally with a message match) instead.



##########
python/datafusion/context.py:
##########
@@ -568,6 +568,15 @@ def register_object_store(
         """
         self.ctx.register_object_store(schema, store, host)
 
+    def deregister_object_store(self, schema: str, host: str | None = None) -> None:
+        """Remove an object store from the session.
+
+        Args:
+            schema: The data source schema (e.g. ``"s3://"``).
+            host: URL for the host (e.g. bucket name).
+        """
+        self.ctx.deregister_object_store(schema, host)

Review Comment:
   A new public API (`deregister_object_store`) is added here but there is no 
corresponding unit test, while `register_object_store` already has coverage 
(e.g. `python/tests/test_sql.py::test_register_http_csv`). Please add a test 
that registers an object store, deregisters it, and verifies access fails or 
that re-registration behaves as expected.



##########
python/tests/test_context.py:
##########
@@ -351,6 +351,126 @@ def test_deregister_table(ctx, database):
     assert public.names() == {"csv1", "csv2"}
 
 
+def test_deregister_udf():
+    ctx = SessionContext()
+    from datafusion import udf
+
+    is_null = udf(
+        lambda x: x.is_null(),
+        [pa.float64()],
+        pa.bool_(),
+        volatility="immutable",
+        name="my_is_null",
+    )
+    ctx.register_udf(is_null)
+
+    # Verify it works
+    df = ctx.from_pydict({"a": [1.0, None]})
+    ctx.register_table("t", df.into_view())
+    result = ctx.sql("SELECT my_is_null(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([False, True])
+
+    # Deregister and verify it's gone
+    ctx.deregister_udf("my_is_null")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_is_null(a) FROM t").collect()
+
+
+def test_deregister_udaf():
+    import pyarrow.compute as pc
+
+    ctx = SessionContext()
+    from datafusion import Accumulator, udaf
+
+    class MySum(Accumulator):
+        def __init__(self):
+            self._sum = 0.0
+
+        def update(self, values: pa.Array) -> None:
+            self._sum += pc.sum(values).as_py()
+
+        def merge(self, states: list[pa.Array]) -> None:
+            self._sum += pc.sum(states[0]).as_py()
+
+        def state(self) -> list:
+            return [self._sum]
+
+        def evaluate(self) -> pa.Scalar:
+            return self._sum
+
+    my_sum = udaf(
+        MySum,
+        [pa.float64()],
+        pa.float64(),
+        [pa.float64()],
+        volatility="immutable",
+        name="my_sum",
+    )
+    ctx.register_udaf(my_sum)
+    df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
+    ctx.register_table("t", df.into_view())
+
+    result = ctx.sql("SELECT my_sum(a) FROM t").collect()
+    assert result[0].column(0) == pa.array([6.0])
+
+    ctx.deregister_udaf("my_sum")
+    with pytest.raises(RuntimeError):
+        ctx.sql("SELECT my_sum(a) FROM t").collect()

Review Comment:
   These code paths raise DataFusion planning/execution errors that are 
converted to `ValueError`/`Exception` (see `crates/util/src/errors.rs:100-107` 
and `sql_with_options` mapping), not `RuntimeError`. Using 
`pytest.raises(RuntimeError)` is likely to fail; assert `ValueError` 
(optionally with a message match) instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to