gabotechs commented on code in PR #1611:
URL:
https://github.com/apache/datafusion-python/pull/1611#discussion_r3481852636
##########
crates/core/src/context.rs:
##########
@@ -392,13 +403,20 @@ impl PySessionContext {
} else {
RuntimeEnvBuilder::default()
};
+ let distributed = DistributedConfig::from_config_options(config.options()).is_ok();
+
let runtime = Arc::new(runtime_env_builder.build()?);
- let session_state = SessionStateBuilder::new()
+ let mut builder = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime)
.with_default_features()
- .with_analyzer_rule(Arc::new(crate::analyzer::ResolveLambdaVariables::new()))
- .build();
+ .with_analyzer_rule(Arc::new(crate::analyzer::ResolveLambdaVariables::new()));
+
+ if distributed {
+ builder = builder.with_distributed_planner();
+ }
Review Comment:
Rather than letting external systems inject their own `QueryPlanner`s, this
just plumbs in the `datafusion-distributed` query planner from within the
Rust world, which is actually pretty easy to do.
##########
python/datafusion/distributed.py:
##########
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Bindings for datafusion-distributed workers."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from datafusion.context import SessionContext
+
+from ._internal import Worker as WorkerInternal
+from ._internal import WorkerQueryContext as WorkerQueryContextInternal
+
+if TYPE_CHECKING:
+ from collections.abc import Awaitable, Callable
+
+
+class WorkerQueryContext:
+ """Context passed to :class:`WorkerSessionBuilder` callbacks."""
+
+ def __init__(self, context: WorkerQueryContextInternal) -> None:
+ """Wrap the internal worker query context.
+
+ This is created by DataFusion when a worker receives a query; user code
+ normally only sees it as the argument to ``build_session_state``.
+ """
+ self._raw = context
+
+ @property
+ def headers(self) -> dict[str, str]:
+ """Return incoming gRPC request headers for the worker query."""
+ return dict(self._raw.headers)
+
+ def session_context(self) -> SessionContext:
+ """Build a public :class:`SessionContext` from the upstream worker builder.
+
+ The upstream builder is consumed, so this method can only be called once
+ for each ``WorkerQueryContext``.
+
+ Examples:
+ >>> from datafusion import SessionContext
+ >>> from datafusion.distributed import WorkerQueryContext
+ >>> def build_session_state(
+ ... context: WorkerQueryContext,
+ ... ) -> SessionContext:
+ ... return context.session_context()
+ """
+ wrapper = SessionContext.__new__(SessionContext)
+ wrapper.ctx = self._raw.session_context()
+ return wrapper
+
+
+def _wrap_worker_session_builder(
+ callback: Callable[[WorkerQueryContext], SessionContext],
+) -> Callable[[WorkerQueryContextInternal], SessionContext]:
+ def adapter(context: WorkerQueryContextInternal) -> SessionContext:
+ wrapped_context = WorkerQueryContext(context)
+ return callback(wrapped_context)
+
+ return adapter
+
+
+class Worker:
+ """A datafusion-distributed worker service."""
+
+ def __init__(
+ self,
+ session_builder: Callable[[WorkerQueryContext], SessionContext] | None = None,
+ ) -> None:
+ """Create a worker.
+
+ Args:
+ session_builder: Optional custom session builder callback or object.
+
+ Examples:
+ >>> from datafusion import Worker
+ >>> worker = Worker()
+ >>> isinstance(worker, Worker)
+ True
+ """
+ if session_builder is None:
+ self._raw = WorkerInternal()
+ else:
+ if not callable(session_builder):
+ msg = "Expected session_builder to be callable"
+ raise TypeError(msg)
+ adapter = _wrap_worker_session_builder(session_builder)
+ self._raw = WorkerInternal.from_session_builder(adapter)
+
+ @classmethod
+ def from_session_builder(
+ cls,
+ session_builder: Callable[[WorkerQueryContext], SessionContext],
Review Comment:
In `datafusion-distributed`, this callback configures DataFusion at the
`SessionStateBuilder` level, but this project has no analogous structure;
the closest thing is a `SessionContext`. Users are therefore expected to
build a full `SessionContext` here out of another `SessionContext`, even
though the contract in `datafusion-distributed` is `SessionStateBuilder` in
and `SessionState` out.
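
A minimal sketch of that "context in, context out" shape, for illustration
only. The two classes below are hypothetical stand-ins so the snippet runs
without the bindings from this PR; the `settings` field and the `x-tenant`
header are assumptions, not part of the real API.

```python
from __future__ import annotations

from dataclasses import dataclass, field


# Hypothetical stand-in for datafusion.SessionContext.
@dataclass
class SessionContext:
    settings: dict[str, str] = field(default_factory=dict)


# Hypothetical stand-in for the WorkerQueryContext wrapper in the diff.
class WorkerQueryContext:
    def __init__(self, headers: dict[str, str]) -> None:
        self.headers = headers
        self._consumed = False

    def session_context(self) -> SessionContext:
        # Mirrors the docstring in the diff: the upstream builder is
        # consumed, so this can only be called once per query context.
        if self._consumed:
            msg = "session_context() was already called"
            raise RuntimeError(msg)
        self._consumed = True
        return SessionContext()


def build_session_state(context: WorkerQueryContext) -> SessionContext:
    # Start from the context derived from the upstream builder, customize
    # it, and hand a full SessionContext back to the worker.
    ctx = context.session_context()
    tenant = context.headers.get("x-tenant")  # hypothetical header name
    if tenant is not None:
        ctx.settings["tenant"] = tenant
    return ctx
```

With the real bindings, the callback would presumably be passed as
`Worker(session_builder=build_session_state)`, as in the constructor above.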
##########
crates/core/Cargo.toml:
##########
@@ -40,6 +40,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"sync",
] }
+tonic = { workspace = true }
Review Comment:
This is the kind of thing that could easily be hidden behind a flag.
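
One way this could look, as a sketch only and assuming "flag" means a Cargo
feature: the feature name `distributed` is hypothetical and not part of this
PR, and any Rust code that uses `tonic` would also need to be gated with
`#[cfg(feature = "distributed")]`.

```toml
[dependencies]
# Only compiled when the (hypothetical) `distributed` feature is enabled.
tonic = { workspace = true, optional = true }

[features]
distributed = ["dep:tonic"]
```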
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]