This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e40ef8cf0d7c [SPARK-56413] Add gRPC UDF execution protocol
e40ef8cf0d7c is described below

commit e40ef8cf0d7c61f06fc28e27762cc87e79999ea6
Author: Haiyang Sun <[email protected]>
AuthorDate: Mon May 18 17:48:38 2026 -0400

    [SPARK-56413] Add gRPC UDF execution protocol
    
    This PR introduces the gRPC-based UDF execution protocol for the language-agnostic UDF worker framework ([SPIP SPARK-55278](https://issues.apache.org/jira/browse/SPARK-55278)).
    
      **`udf/worker/proto/src/main/protobuf/udf_protocol.proto`** — defines the 
full `UdfWorker` gRPC service:
    
      - **`Execute` RPC** (bidirectional streaming): carries one UDF execution. The wire protocol is:
        Engine -> Worker:  Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)?
                                                                     | Cancel
        Worker -> Engine:          InitResponse  -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse)
      Key design decisions:
      - `Init.protocol_version` (field 1) is the first field on every stream for early version mismatch detection.
      - `Init.is_chunking_payload` explicitly signals that `PayloadChunk` messages will follow, removing ambiguity about when the worker should send `InitResponse`.
      - `DataRequest` and `DataResponse` are **independent streams**: the worker may emit `DataResponse` before the first `DataRequest` (generator-style UDFs), and the engine may pipeline `DataRequest` messages before `InitResponse` arrives.
      - **Cancel-after-Finish**: `Cancel` MAY follow `Finish` on the same stream, allowing the engine to abort processing of already-submitted data without waiting for `FinishResponse`. The worker sends `CancelResponse` if `Cancel` arrives before `FinishResponse` is sent, otherwise `FinishResponse`; the engine must accept either.
      - **`ExecutionError`** carries three subtypes: `UserError` (UDF code exception), `WorkerError` (worker implementation error), `ProtocolError` (protocol violation). After sending `ExecutionError` the worker waits for the engine's `Finish`/`Cancel` before sending the terminator; the engine sends `Cancel` if the stream is still open, or waits for `FinishResponse` if already closed.
      - **Error contract**: gRPC errors are reserved for transport failures only. All application-level errors go through `ExecutionError`, keeping the stream lifecycle intact.
      - **Backpressure**: the current protocol relies on gRPC's HTTP/2 transport-level flow control; application-level backpressure is not yet defined and may be introduced in a future revision.
    
      - **`Manage` RPC** (unary): worker-scoped operations independent of any `Execute` stream, namely `Heartbeat` (application-level liveness probe, distinct from gRPC keepalive) and `ShutdownRequest`/`ShutdownResponse` (the `cancel_sessions` flag controls whether in-flight streams are aborted or drained; `sessions_settled` on the response confirms the worker's state).
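
      As a rough illustration of the engine-side ordering above, the following Scala sketch checks whether a complete engine-to-worker message sequence matches the notation. The case objects are hypothetical tags standing in for the generated proto messages, and `EngineSequence.isValid` is an illustrative helper that is not part of this PR; a real worker would validate incrementally through its state machine.

      ```scala
      // Hypothetical tags standing in for the generated proto messages.
      sealed trait EngineMsg
      case object Init extends EngineMsg
      case object PayloadChunk extends EngineMsg
      case object DataRequest extends EngineMsg
      case object Finish extends EngineMsg
      case object Cancel extends EngineMsg

      object EngineSequence {
        /**
         * True if `msgs` is a complete engine-side stream of the form
         * Init -> PayloadChunk* -> DataRequest* -> (Finish (Cancel)? | Cancel).
         */
        def isValid(msgs: List[EngineMsg]): Boolean = msgs match {
          case Init :: rest =>
            // Skip the optional payload chunks, then the optional data requests.
            val afterChunks = rest.dropWhile(_ == PayloadChunk)
            val afterData = afterChunks.dropWhile(_ == DataRequest)
            // Only a terminator may remain.
            afterData match {
              case List(Finish) | List(Finish, Cancel) | List(Cancel) => true
              case _ => false
            }
          case _ => false
        }
      }
      ```

      For example, `EngineSequence.isValid(List(Init, DataRequest, Finish, Cancel))` holds (Cancel-after-Finish), while a sequence that places `PayloadChunk` after `DataRequest` is rejected.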
    
      **`WorkerSession.scala`** — updated lifecycle contract:
      - `cancel()` is valid even after all data has been submitted (post-Finish); implementations must send `Cancel` if `FinishResponse` has not yet been received and accept either terminator.
      - `close()` (now `final`) automatically sends `Cancel` when `init()` was called but `process()` was not, preventing the worker from seeing a bare gRPC transport error.
      - `doInit()` contract: implementations must not open the Execute stream before this call, so `cancel()` before `init()` remains a transport-level no-op.
      - `doProcess()` contract: on gRPC transport error the worker must not be returned to any reuse pool.
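
      The `close()` guard can be modelled with a small self-contained sketch. `SketchSession` and `RecordingSession` are hypothetical stand-ins (a `String` replaces the `Init` proto, and the exception messages in `process` are assumptions); they only illustrate the cancel-before-close ordering described above, not the real gRPC wiring.

      ```scala
      import java.util.concurrent.atomic.AtomicBoolean
      import scala.collection.mutable.ListBuffer

      // Simplified model of the session lifecycle; not the real WorkerSession.
      abstract class SketchSession extends AutoCloseable {
        private val initialized = new AtomicBoolean(false)
        private val processed = new AtomicBoolean(false)

        final def init(message: String): Unit = {
          if (!initialized.compareAndSet(false, true)) {
            throw new IllegalStateException("init has already been called on this session")
          }
          doInit(message)
        }

        final def process(input: Iterator[Array[Byte]]): Iterator[Array[Byte]] = {
          if (!initialized.get()) {
            throw new IllegalStateException("init must be called before process")
          }
          if (!processed.compareAndSet(false, true)) {
            throw new IllegalStateException("process has already been called")
          }
          doProcess(input)
        }

        def cancel(): Unit

        // Cancel first when init() ran but process() did not, then release resources.
        final override def close(): Unit = {
          if (initialized.get() && !processed.get()) {
            cancel()
          }
          doClose()
        }

        protected def doInit(message: String): Unit
        protected def doProcess(input: Iterator[Array[Byte]]): Iterator[Array[Byte]]
        protected def doClose(): Unit
      }

      // Records the order of lifecycle calls so the guard can be observed.
      final class RecordingSession extends SketchSession {
        val events: ListBuffer[String] = ListBuffer.empty[String]
        protected def doInit(message: String): Unit = { events += "init" }
        protected def doProcess(input: Iterator[Array[Byte]]): Iterator[Array[Byte]] = {
          events += "process"
          input
        }
        def cancel(): Unit = { events += "cancel" }
        protected def doClose(): Unit = { events += "close" }
      }
      ```

      Calling `init` and then `close` on a `RecordingSession` records `init`, `cancel`, `close`; once `process` has been called, the `cancel` step is skipped.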
    
      **`EchoProtocolSuite.scala`** — a protocol validation test that 
implements a minimal echo worker (gRPC server) and engine client against the 
generated stubs:
      - Covers the full worker state machine: `AwaitingInit → (AwaitingChunks) → Data → Draining → Drained → Done`, plus the `PostError → Cancelling → Cancelled → Done` branch.
      - Exercises all protocol paths: normal echo, chunked payload, generator-style (zero DataRequests), mid-stream cancel, Cancel-after-Finish, `ExecutionError` with engine-driven terminator, `ProtocolError` on protocol violations, concurrent producer/consumer interleaving, and `Manage` RPC.
    
      **`README.md`** — updated wire protocol notation and added a backpressure 
note.
    
      ### Why are the changes needed?
    
      The SPIP requires a language-agnostic, versioned, and extensible execution protocol between the Spark engine and UDF workers implemented in any language. This PR defines that protocol in protobuf/gRPC, with precise lifecycle semantics that guide both the Spark engine implementation and external worker authors.
    
      ### Does this PR introduce _any_ user-facing change?
    
      No. The protocol and its Scala abstractions are marked `Experimental`. No existing user-facing APIs are modified.
    
      ### How was this patch tested?
    
      `EchoProtocolSuite` implements a self-contained gRPC server (echo worker) and client (engine) against the protocol, exercising normal paths, error paths, cancellation (including Cancel-after-Finish), chunked payload delivery, the concurrent producer/consumer pattern, and the `Manage` RPC. The suite serves as a live specification check: if the proto comments and the test disagree, the test fails.
    
      build/sbt 'udf-worker-core/testOnly *EchoProtocolSuite'
    
      ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes
    
    Closes #55657 from haiyangsun-db/SPARK-56413.
    
    Authored-by: Haiyang Sun <[email protected]>
    Signed-off-by: Herman van Hövell <[email protected]>
---
 project/SparkBuild.scala                           |  47 +-
 udf/worker/README.md                               |  42 +-
 udf/worker/core/pom.xml                            |   6 +
 .../spark/udf/worker/core/WorkerDispatcher.scala   |   8 +
 .../spark/udf/worker/core/WorkerSession.scala      | 102 ++-
 .../core/direct/DirectWorkerDispatcher.scala       |   3 +-
 .../worker/core/direct/DirectWorkerSession.scala   |   4 +-
 .../worker/core/DirectWorkerDispatcherSuite.scala  |   8 +-
 .../spark/udf/worker/core/EchoProtocolSuite.scala  | 945 +++++++++++++++++++++
 udf/worker/proto/pom.xml                           |  75 +-
 udf/worker/proto/src/main/protobuf/common.proto    |   4 +-
 .../proto/src/main/protobuf/udf_protocol.proto     | 730 ++++++++++++++++
 12 files changed, 1862 insertions(+), 112 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 6b89a0e6ba9f..5d59e1141327 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1092,25 +1092,10 @@ object SparkProtobuf {
 }
 
 object UDFWorkerProto {
-  import BuildCommons.protoVersion
-  lazy val settings = Seq(
-    PB.protocVersion := BuildCommons.protoVersion,
-    libraryDependencies += "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",
-
-    dependencyOverrides += "com.google.protobuf" % "protobuf-java" % protoVersion,
-
-    (Compile / PB.targets) := Seq(
-      PB.gens.java -> target.value / "generated-sources"
-    ),
-
-    (assembly / test) := { },
-
-    (assembly / logLevel) := Level.Info,
-
-    // Exclude `scala-library` from assembly.
-    (assembly / assemblyPackageScala / assembleArtifact) := false,
-
-    // Include only the proto module jar and protobuf-java in the assembly.
+  // Reuses SparkConnectCommon for proto + gRPC codegen wiring; overrides
+  // only the assembly fields that need the UDF-worker namespace.
+  lazy val settings = SparkConnectCommon.settings ++ Seq(
+    // Include only this module's jar and protobuf-java in the assembly.
     (assembly / assemblyExcludedJars) := {
       val cp = (assembly / fullClasspath).value
       val validPrefixes = Set("spark-udf-worker-proto", "protobuf-")
@@ -1121,27 +1106,9 @@ object UDFWorkerProto {
 
     (assembly / assemblyShadeRules) := Seq(
       ShadeRule.rename("com.google.protobuf.**" ->
-        "org.sparkproject.spark_udf_worker.protobuf.@1").inAll,
-    ),
-
-    (assembly / assemblyMergeStrategy) := {
-      case m if m.toLowerCase(Locale.ROOT).endsWith("manifest.mf") => MergeStrategy.discard
-      case m if m.toLowerCase(Locale.ROOT).matches("meta-inf.*\\.sf$") => MergeStrategy.discard
-      case m if m.toLowerCase(Locale.ROOT).startsWith("meta-inf/services/") =>
-        MergeStrategy.filterDistinctLines
-      case m if m.toLowerCase(Locale.ROOT).endsWith(".proto") => MergeStrategy.discard
-      case _ => MergeStrategy.first
-    }
-  ) ++ {
-    val sparkProtocExecPath = sys.props.get("spark.protoc.executable.path")
-    if (sparkProtocExecPath.isDefined) {
-      Seq(
-        PB.protocExecutable := file(sparkProtocExecPath.get)
-      )
-    } else {
-      Seq.empty
-    }
-  }
+        "org.sparkproject.spark_udf_worker.protobuf.@1").inAll
+    )
+  )
 }
 
 object Unsafe {
diff --git a/udf/worker/README.md b/udf/worker/README.md
index b843c430d0e0..21846218d3f9 100644
--- a/udf/worker/README.md
+++ b/udf/worker/README.md
@@ -19,7 +19,7 @@ WorkerDispatcher      -- manages workers, creates sessions
     |
     v
 WorkerSession         -- one UDF execution
-    |  1. session.init(InitMessage(payload, inputSchema, outputSchema))
+    |  1. session.init(Init proto)
     |  2. val results = session.process(inputBatches)
     |  3. session.close()
 ```
@@ -34,12 +34,13 @@ provisioning service or daemon).
 ```
 udf/worker/
 ├── proto/
-│     worker_spec.proto           -- UDFWorkerSpecification protobuf (+ generated Java classes)
+│     worker_spec.proto           -- UDFWorkerSpecification protobuf
+│     udf_protocol.proto          -- UDF execution protocol (Init, UdfPayload, ...)
 │     common.proto                -- shared enums (UDFWorkerDataFormat, etc.)
 │
 └── core/                         -- abstract interfaces
       WorkerDispatcher.scala      -- creates sessions, manages worker lifecycle
-      WorkerSession.scala         -- per-UDF init/process/cancel/close + InitMessage
+      WorkerSession.scala         -- per-UDF init/process/cancel/close
       WorkerConnection.scala      -- transport channel abstraction
       WorkerSecurityScope.scala   -- security boundary for worker pooling
       │
@@ -55,6 +56,19 @@ worker creation where Spark spawns local OS processes. Future packages
 (e.g., `core/indirect/`) can implement alternative creation modes such as
 obtaining workers from a provisioning service or daemon.
 
+## Wire protocol
+
+Each UDF execution uses a single bidirectional `Execute` gRPC stream.
+
+```
+Engine -> Worker:  Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)?
+                                                            | Cancel
+Worker -> Engine:          InitResponse  -> (DataResponse)* -> (ErrorResponse)? -> (FinishResponse | CancelResponse)
+```
+
+See `udf/worker/proto/src/main/protobuf/udf_protocol.proto` for the complete
+protocol definition, ordering invariants, and error contract.
+
 ### Direct worker creation
 
 `DirectWorkerDispatcher` spawns worker processes locally. On the first
@@ -76,10 +90,12 @@ Workers are terminated via SIGTERM/SIGKILL when the dispatcher is closed.
 
 ```scala
 import org.apache.spark.udf.worker.{
-  DirectWorker, ProcessCallable, UDFProtoCommunicationPattern,
-  UDFWorkerDataFormat, UDFWorkerProperties, UDFWorkerSpecification,
-  UnixDomainSocket, WorkerCapabilities, WorkerConnectionSpec, WorkerEnvironment}
+  DirectWorker, Init, ProcessCallable, UdfPayload,
+  UDFProtoCommunicationPattern, UDFWorkerDataFormat, UDFWorkerProperties,
+  UDFWorkerSpecification, UnixDomainSocket, WorkerCapabilities,
+  WorkerConnectionSpec, WorkerEnvironment}
 import org.apache.spark.udf.worker.core._
+import com.google.protobuf.ByteString
 
 // 1. Define a worker spec (direct creation mode).
 val spec = UDFWorkerSpecification.newBuilder()
@@ -112,10 +128,16 @@ val dispatcher: WorkerDispatcher = ...
 val session = dispatcher.createSession(securityScope = None)
 try {
   // 4. Initialize with the serialized function and schemas.
-  session.init(InitMessage(
-    functionPayload = serializedFunction,
-    inputSchema = arrowInputSchema,
-    outputSchema = arrowOutputSchema))
+  session.init(Init.newBuilder()
+    .setProtocolVersion(1)
+    .setUdf(UdfPayload.newBuilder()
+      .setPayload(ByteString.copyFrom(serializedFunction))
+      .setFormat(payloadFormat)   // worker-recognised tag
+      .build())
+    .setDataFormat(UDFWorkerDataFormat.ARROW)
+    .setInputSchema(ByteString.copyFrom(arrowInputSchema))
+    .setOutputSchema(ByteString.copyFrom(arrowOutputSchema))
+    .build())
 
   // 5. Process data -- Iterator in, Iterator out.
   val results: Iterator[Array[Byte]] =
diff --git a/udf/worker/core/pom.xml b/udf/worker/core/pom.xml
index 9c84bac58377..10122687648d 100644
--- a/udf/worker/core/pom.xml
+++ b/udf/worker/core/pom.xml
@@ -51,6 +51,12 @@
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-inprocess</artifactId>
+      <version>${io.grpc.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala
index 008cfc2993a0..e938c3e04be5 100644
--- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala
+++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala
@@ -27,6 +27,14 @@ import org.apache.spark.udf.worker.UDFWorkerSpecification
  * as security scope). It owns the underlying worker processes and connections,
  * handling pooling, reuse, and lifecycle behind the scenes. Spark interacts with
  * workers exclusively through the [[WorkerSession]]s returned by [[createSession]].
+ *
+ * '''Worker invalidation:''' if a session terminates with a transport error the
+ * worker that backed it MUST NOT be returned to any reuse pool. A transport
+ * error leaves the worker in an unknown state; only workers that complete
+ * sessions cleanly are eligible for reuse. Implementations are responsible for
+ * tracking this condition -- typically [[WorkerSession.doProcess]] flags the
+ * worker as invalid before [[WorkerSession.doClose]] releases it, so the
+ * dispatcher can distinguish a clean release from a failed one.
  */
 @Experimental
 trait WorkerDispatcher extends AutoCloseable {
diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala
index f4c4091688c9..ef18ce8d7085 100644
--- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala
+++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala
@@ -19,31 +19,7 @@ package org.apache.spark.udf.worker.core
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.annotation.Experimental
-
-/**
- * :: Experimental ::
- * Carries all information needed to initialize a UDF execution on a worker.
- *
- * This message is passed to [[WorkerSession#init]] and contains the function
- * definition, schemas, and any additional configuration.
- *
- * Placeholder: will be replaced by a generated proto message once the
- * UDF wire protocol lands. Do not rely on case-class equality --
- * `Array[Byte]` fields compare by reference.
- *
- * @param functionPayload serialized function (e.g., pickled Python, JVM bytes)
- * @param inputSchema     serialized input schema (e.g., Arrow schema bytes)
- * @param outputSchema    serialized output schema (e.g., Arrow schema bytes)
- * @param properties      additional key-value configuration. Can carry
- *                        protocol-specific or engine-specific metadata that
- *                        does not yet have a dedicated field.
- */
-@Experimental
-case class InitMessage(
-    functionPayload: Array[Byte],
-    inputSchema: Array[Byte],
-    outputSchema: Array[Byte],
-    properties: Map[String, String] = Map.empty)
+import org.apache.spark.udf.worker.Init
 
 /**
  * :: Experimental ::
@@ -62,7 +38,11 @@ case class InitMessage(
  * {{{
  *   val session = dispatcher.createSession(securityScope = None)
  *   try {
- *     session.init(InitMessage(functionPayload, inputSchema, outputSchema))
+ *     session.init(Init.newBuilder()
+ *       .setProtocolVersion(1)
+ *       .setUdf(UdfPayload.newBuilder().setPayload(callable).setFormat(fmt).build())
+ *       .setDataFormat(UDFWorkerDataFormat.ARROW)
+ *       .build())
  *     val results = session.process(inputBatches)
  *     results.foreach(handleBatch)
  *   } finally {
@@ -74,7 +54,8 @@ case class InitMessage(
  *  - [[init]] must be called exactly once before [[process]].
  *  - [[process]] must be called at most once per session.
  *  - [[close]] must always be called (use try-finally).
- *  - [[cancel]] may be called at any time to abort execution.
+ *  - [[cancel]] may be called at any time from any execution context.
+ *    See [[cancel]] for the full contract.
  *
  * The lifecycle is enforced here: [[init]] and [[process]] are `final`
  * and delegate to [[doInit]] / [[doProcess]] after AtomicBoolean guards.
@@ -93,10 +74,11 @@ abstract class WorkerSession extends AutoCloseable {
    *
    * Throws `IllegalStateException` if called more than once.
    *
-   * @param message the initialization parameters including the serialized
-   *                function, input/output schemas, and configuration.
+   * @param message the [[Init]] message carrying the UDF body, data
+   *                format, optional schemas, and any session context
+   *                the worker needs to start processing.
    */
-  final def init(message: InitMessage): Unit = {
+  final def init(message: Init): Unit = {
     if (!initialized.compareAndSet(false, true)) {
       throw new IllegalStateException("init has already been called on this session")
     }
@@ -108,7 +90,7 @@ abstract class WorkerSession extends AutoCloseable {
    *
    * Follows Spark's Iterator-to-Iterator pattern: input batches are streamed
    * to the worker, and result batches are lazily pulled from the returned
-   * iterator. The session sends a Finish signal to the worker when the input
+   * iterator. The session sends a finish signal to the worker when the input
    * iterator is exhausted.
    *
    * Must be called after [[init]] and at most once per session.
@@ -127,8 +109,12 @@ abstract class WorkerSession extends AutoCloseable {
     doProcess(input)
   }
 
-  /** Subclass hook for [[init]]. Called once, after the guard. */
-  protected def doInit(message: InitMessage): Unit
+  /**
+   * Subclass hook for [[init]]. Called once, after the guard.
+   * Implementations MUST establish the worker connection here, not
+   * earlier. This ensures [[cancel]] before [[init]] is a no-op.
+   */
+  protected def doInit(message: Init): Unit
 
   /** Subclass hook for [[process]]. Called at most once, after the guard. */
   protected def doProcess(input: Iterator[Array[Byte]]): Iterator[Array[Byte]]
@@ -136,13 +122,51 @@ abstract class WorkerSession extends AutoCloseable {
   /**
    * Requests cancellation of the current UDF execution.
    *
-   * '''Thread-safety:''' implementations must allow [[cancel]] to be called
-   * from a thread different from the one driving [[process]] (typically a
-   * task interruption thread). It may be invoked at any point after
-   * [[init]] and should be a no-op if execution has already finished.
+   * '''Thread-safety:''' [[cancel]] may be called concurrently with
+   * [[process]] from any execution context.
+   *
+   * '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in
+   * the session's life:
+   *  - before [[init]] -- a no-op; the session may still be closed
+   *    normally via [[close]].
+   *  - between [[init]] and [[process]] -- signals that the session
+   *    should be terminated; the caller should not invoke [[process]]
+   *    and should call [[close]] to release resources.
+   *    Implementations SHOULD surface this as an error if [[process]]
+   *    is subsequently invoked despite the cancellation.
+   *  - during [[process]] (data flowing or awaiting completion)
+   *    -- requests the worker to abort on a best-effort basis.
+   *  - after [[process]] has returned (session already terminated)
+   *    -- a no-op.
+   *
+   * Implementations are responsible for the lifecycle-aware behavior
+   * described above so that the caller does not need to coordinate
+   * with the execution context driving [[process]].
    */
   def cancel(): Unit
 
-  /** Closes this session and releases resources. */
-  override def close(): Unit
+  /**
+   * Closes this session and releases resources. Idempotent; safe to
+   * call from a `finally` block regardless of whether [[init]],
+   * [[process]], or [[cancel]] have been invoked.
+   *
+   * If [[init]] was called but [[process]] was not (e.g. an exception
+   * was thrown between the two), [[close]] signals cancellation to the
+   * worker before releasing resources so it can clean up
+   * deterministically. Subclasses implement [[doClose]] for resource
+   * teardown; the base class handles the cancel-before-close guarantee
+   * automatically.
+   */
+  final override def close(): Unit = {
+    if (initialized.get() && !processed.get()) {
+      cancel()
+    }
+    doClose()
+  }
+
+  /** Subclass hook for [[close]]. The base class guarantees that
+   *  [[cancel]] has already been called if [[init]] was invoked but
+   *  [[process]] was not.
+   */
+  protected def doClose(): Unit
 }
diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala
index afaf23791d80..14db8da7ac89 100644
--- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala
+++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerDispatcher.scala
@@ -373,7 +373,8 @@ abstract class DirectWorkerDispatcher(
       "DirectWorker.runner must have at least one entry in command or arguments")
     val workerId = UUID.randomUUID().toString
     val address = newEndpointAddress(workerId)
-    // Proto contract: the engine must pass --id and --connection.
+    // The engine injects --connection (the socket address it manages) and
+    // --id (an internal correlation identifier) into the worker command.
     val cmd = baseCmd ++ Seq("--id", workerId, "--connection", address)
     val env = runner.getEnvironmentVariablesMap.asScala.toMap
     val outputFile = Files.createTempFile("udf-worker-", ".log")
diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala
index 7cdc5329350e..de1dc45b5a8d 100644
--- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala
+++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/direct/DirectWorkerSession.scala
@@ -27,7 +27,7 @@ import org.apache.spark.udf.worker.core.{WorkerConnection, WorkerSession}
  *
  * This is the session type returned by [[DirectWorkerDispatcher]]. It ties
  * the session lifecycle to the worker's ref-count: the dispatcher increments
- * the count before construction, and [[close]] decrements it, so the
+ * the count before construction, and [[doClose]] decrements it, so the
  * dispatcher knows when a worker process is idle and can be terminated or
  * reused.
  *
@@ -48,7 +48,7 @@ abstract class DirectWorkerSession(
   /** The connection to the worker for this session. */
   def connection: WorkerConnection = workerProcess.connection
 
-  override def close(): Unit = {
+  override protected def doClose(): Unit = {
     if (released.compareAndSet(false, true)) {
       workerProcess.releaseSession()
     }
diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala
index 60f5e2211b70..7302c697d93c 100644
--- a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala
+++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.spark.udf.worker.{
-  DirectWorker, LocalTcpConnection, ProcessCallable, UDFWorkerProperties,
+  DirectWorker, Init, LocalTcpConnection, ProcessCallable, UDFWorkerProperties,
   UDFWorkerSpecification, UnixDomainSocket, WorkerConnectionSpec,
   WorkerEnvironment}
 import org.apache.spark.udf.worker.core.direct.{DirectUnixSocketWorkerDispatcher,
@@ -51,14 +51,14 @@ class SocketFileConnection(socketPath: String)
  * TODO: [[cancel]] is a no-op here. Once a concrete [[DirectWorkerSession]]
  *   with real data-plane wiring lands, add tests exercising cancel() in
  *   particular: cancel from a different thread than process(), cancel
- *   after process() has returned, and cancel before init (should be a
- *   no-op). Tracking the thread-safety contract in the docstring on
+ *   after process() has returned, and cancel before init (should be a no-op).
+ *   See the thread-safety contract in the docstring on
  *   [[org.apache.spark.udf.worker.core.WorkerSession.cancel]].
  */
 class StubWorkerSession(
     workerProcess: DirectWorkerProcess) extends DirectWorkerSession(workerProcess) {
 
-  override protected def doInit(message: InitMessage): Unit = {}
+  override protected def doInit(message: Init): Unit = {}
 
   override protected def doProcess(
       input: Iterator[Array[Byte]]): Iterator[Array[Byte]] =
diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala
new file mode 100644
index 000000000000..80802dd082f6
--- /dev/null
+++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/EchoProtocolSuite.scala
@@ -0,0 +1,945 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.udf.worker.core
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server, Status}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import io.grpc.stub.StreamObserver
+import org.scalatest.BeforeAndAfterEach
+// scalastyle:off funsuite
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.spark.udf.worker.{
+  Cancel, CancelResponse, DataRequest, DataResponse, ErrorResponse, ExecutionError,
+  Finish, FinishResponse, Heartbeat, HeartbeatResponse, Init, InitResponse,
+  PayloadChunk, ProtocolError, ShutdownRequest, ShutdownResponse,
+  UdfControlRequest, UdfControlResponse, UdfPayload, UdfRequest, UdfResponse,
+  UDFWorkerDataFormat, UdfWorkerGrpc, UserError, WorkerError, WorkerRequest,
+  WorkerResponse
+}
+
+/**
+ * Protocol validation test for the UDF gRPC execution protocol.
+ *
+ * Implements a minimal echo worker (gRPC server) and engine client to verify
+ * the full Execute stream lifecycle: init, data streaming, finish, cancel,
+ * error handling, and the Manage RPC. The worker echoes each DataRequest
+ * batch back as a DataResponse; error paths are triggered by a sentinel
+ * payload value.
+ */
+class EchoProtocolSuite extends AnyFunSuite with BeforeAndAfterEach {
+// scalastyle:on funsuite
+
+  private val SUPPORTED_VERSION: Int = 1
+  // A DataRequest whose payload equals this value triggers an ErrorResponse.
+  private val ERROR_TRIGGER: ByteString = ByteString.copyFromUtf8("ERROR")
+  // An init payload whose value equals this triggers an init failure
+  // (InitResponse with error set).
+  private val INIT_ERROR_TRIGGER: ByteString = ByteString.copyFromUtf8("INIT_ERROR")
+
+  private var server: Server = _
+  private var channel: ManagedChannel = _
+  private var stub: UdfWorkerGrpc.UdfWorkerStub = _
+
+  override def beforeEach(): Unit = {
+    val serverName = InProcessServerBuilder.generateName()
+    server = InProcessServerBuilder.forName(serverName)
+      .directExecutor()
+      .addService(new EchoWorkerService)
+      .build()
+      .start()
+    channel = InProcessChannelBuilder.forName(serverName).directExecutor().build()
+    stub = UdfWorkerGrpc.newStub(channel)
+  }
+
+  override def afterEach(): Unit = {
+    channel.shutdownNow()
+    server.shutdownNow()
+  }
+
+  // ===========================================================================
+  // WORKER SIDE (gRPC server)
+  // ===========================================================================
+
+  /**
+   * Worker state machine for one Execute stream.
+   *
+   *   AwaitingInit --> AwaitingChunks? --> Data --> Draining --> Drained --> Done
+   *                                          |
+   *                                          +--> PostError --> Cancelling --> Cancelled --> Done
+   *
+   * `[process done]` marks an event (not a state): the asynchronous
+   * completion notification of in-flight work.
+   *
+   *   AwaitingInit
+   *     Init(inline)   --> Data            (send InitResponse)
+   *     Init(chunked)  --> AwaitingChunks
+   *     Init(failed)   --> PostError       (send InitResponse with error)
+   *     Cancel         --> Cancelling      --[process done]--> Cancelled --> Done (send CR)
+   *
+   *   AwaitingChunks
+   *     PayloadChunk(last=false) --> AwaitingChunks (accumulate)
+   *     PayloadChunk(last=true)  --> Data           (send InitResponse)
+   *     Cancel                   --> Cancelling     --[process done]--> Cancelled --> Done
+   *     Finish                   --> protocol error (engine must wait for
+   *                                                  InitResponse first)
+   *
+   *   Data
+   *     ErrorResponse sent --> PostError
+   *     Finish             --> Draining     --[process done]--> Drained --> Done (send FR)
+   *     Cancel             --> Cancelling   --[process done]--> Cancelled --> Done (send CR)
+   *
+   *   PostError
+   *     Finish --> PostError (no-op; engine MUST follow up with Cancel)
+   *     Cancel --> Cancelling --[process done]--> Cancelled --> Done
+   *
+   *   Draining (in-flight work running)
+   *     [process done] --> Drained
+   *     Cancel         --> Cancelling (the pending [process done] then sees
+   *                                    Cancelling and routes to CancelResponse)
+   *
+   *   Drained (work done; optional post-work cleanup hook may run here;
+   *            any error from the hook is reported via FinishResponse.error)
+   *     --> Done (send FinishResponse)
+   *
+   *   Cancelling (in-flight work being cancelled)
+   *     [process done] --> Cancelled
+   *
+   *   Cancelled (cleanup done; optional post-work cleanup hook may run here;
+   *              any error from the hook is reported via CancelResponse.error)
+   *     --> Done (send CancelResponse)
+   *
+   * Cross-cutting:
+   *   - Protocol violation in any active state: send ErrorResponse(ProtocolError)
+   *     followed by CancelResponse, transition to Done.
+   *   - gRPC transport error (onError): transition to Done, no response sent.
+   */
+  private sealed trait WorkerState
+  private case object AwaitingInit extends WorkerState
+  // Chunked init handshake in progress; `accumulated` holds the inline
+  // portion of Init.udf.payload plus all chunks received so far.
+  private case class AwaitingChunks(accumulated: ByteString) extends WorkerState
+  private case object Data extends WorkerState
+  private case object PostError extends WorkerState
+  // Finish received; in-flight finish-callback / drain work is running.
+  private case object Draining extends WorkerState
+  // Drain complete; FinishResponse not yet sent. Post-work cleanup hook
+  // (if any) runs in this state before the terminator is emitted.
+  private case object Drained extends WorkerState
+  // Cancel received; cancel-callback / cleanup work is running.
+  private case object Cancelling extends WorkerState
+  // Cleanup complete; CancelResponse not yet sent. Post-work cleanup hook
+  // (if any) runs in this state before the terminator is emitted.
+  private case object Cancelled extends WorkerState
+  private case object Done extends WorkerState
+
+  private class EchoWorkerService extends UdfWorkerGrpc.UdfWorkerImplBase {
+
+    override def execute(
+        responseObserver: StreamObserver[UdfResponse]): StreamObserver[UdfRequest] =
+      new ExecuteStreamHandler(responseObserver)
+
+    override def manage(
+        request: WorkerRequest,
+        responseObserver: StreamObserver[WorkerResponse]): Unit = {
+      request.getManageCase match {
+        case WorkerRequest.ManageCase.HEARTBEAT =>
+          responseObserver.onNext(WorkerResponse.newBuilder()
+            .setHeartbeat(HeartbeatResponse.getDefaultInstance)
+            .build())
+          responseObserver.onCompleted()
+
+        case WorkerRequest.ManageCase.SHUTDOWN =>
+          responseObserver.onNext(WorkerResponse.newBuilder()
+            .setShutdown(ShutdownResponse.newBuilder().setSessionsSettled(true).build())
+            .build())
+          responseObserver.onCompleted()
+
+        case _ =>
+          responseObserver.onError(
+            Status.INVALID_ARGUMENT.withDescription("empty manage request")
+              .asRuntimeException())
+      }
+    }
+  }
+
+  private class ExecuteStreamHandler(
+      responseObserver: StreamObserver[UdfResponse]) extends StreamObserver[UdfRequest] {
+
+    // State mutations go through `matchUpdateThen`: under stateLock, the
+    // caller-supplied function inspects the current state, returns the
+    // next state and a non-blocking follow-up callback; the helper writes
+    // the new state and releases the lock before invoking the callback,
+    // so I/O does not extend the critical section.
+    @volatile private var state: WorkerState = AwaitingInit
+    private val stateLock = new Object
+
+    private def matchUpdateThen(
+        transition: WorkerState => (WorkerState, () => Unit)): Unit = {
+      val followUp = stateLock.synchronized {
+        val (next, callback) = transition(state)
+        state = next
+        callback
+      }
+      followUp()
+    }
+
+    // gRPC does not permit concurrent calls to the response StreamObserver;
+    // all writes are serialized through this lock.
+    private val responseLock = new Object
+
+    override def onNext(request: UdfRequest): Unit = {
+      request.getRequestCase match {
+        case UdfRequest.RequestCase.CONTROL => handleControl(request.getControl)
+        case UdfRequest.RequestCase.DATA => handleDataRequest(request.getData)
+        case _ => closeWithProtocolError("empty request oneof")
+      }
+    }
+
+    private def handleControl(ctrl: UdfControlRequest): Unit = {
+      ctrl.getControlCase match {
+        case UdfControlRequest.ControlCase.INIT => handleInit(ctrl.getInit)
+        case UdfControlRequest.ControlCase.PAYLOAD => handleChunk(ctrl.getPayload)
+        case UdfControlRequest.ControlCase.FINISH => handleFinish()
+        case UdfControlRequest.ControlCase.CANCEL => handleCancel(ctrl.getCancel)
+        case _ => closeWithProtocolError("empty control oneof")
+      }
+    }
+
+    private def handleInit(init: Init): Unit = matchUpdateThen {
+      case AwaitingInit =>
+        if (init.hasProtocolVersion &&
+            init.getProtocolVersion != SUPPORTED_VERSION) {
+          val err = ExecutionError.newBuilder()
+            .setProtocol(ProtocolError.newBuilder()
+              .setMessage(s"unsupported protocol version: ${init.getProtocolVersion}")
+              .build())
+            .build()
+          (PostError, () => sendControl(UdfControlResponse.newBuilder()
+            .setInit(InitResponse.newBuilder().setError(err).build())
+            .build()))
+        } else if (init.getIsChunkingPayload) {
+          // Payload arrives via PayloadChunk messages; defer init
+          // processing until the last chunk has been received.
+          (AwaitingChunks(init.getUdf.getPayload), () => ())
+        } else {
+          // Payload is fully inline; process init outside the lock.
+          // finalizeInit re-checks the state under stateLock on entry.
+          val payload = init.getUdf.getPayload
+          (AwaitingInit, () => finalizeInit(payload))
+        }
+      case other =>
+        (other, () => closeWithProtocolError(s"Init received in state $other"))
+    }
+
+    private def handleChunk(chunk: PayloadChunk): Unit = matchUpdateThen {
+      case AwaitingChunks(existing) =>
+        val updated = existing.concat(chunk.getData)
+        if (chunk.hasLast && chunk.getLast) {
+          // Stay in AwaitingChunks until finalizeInit's locked state check transitions us.
+          (AwaitingChunks(existing), () => finalizeInit(updated))
+        } else {
+          (AwaitingChunks(updated), () => ())
+        }
+      case other =>
+        (other, () => closeWithProtocolError(s"PayloadChunk received in state $other"))
+    }
+
+    // Init processing hook: invoked once with the complete assembled UDF
+    // payload (inline + all chunks, if any). A real worker would deserialize
+    // the UDF, run validation, set up runtime resources here. The echo worker
+    // succeeds for any payload other than INIT_ERROR_TRIGGER, which simulates
+    // an init-time failure (e.g. deserialization error, missing dependency).
+    private def finalizeInit(payload: ByteString): Unit = {
+      val initError: Option[ExecutionError] = if (payload == INIT_ERROR_TRIGGER) {
+        Some(ExecutionError.newBuilder()
+          .setWorker(WorkerError.newBuilder()
+            .setMessage("simulated init failure")
+            .build())
+          .build())
+      } else {
+        None
+      }
+      matchUpdateThen {
+        case AwaitingInit | AwaitingChunks(_) =>
+          initError match {
+            case Some(err) =>
+              (PostError, () => sendControl(UdfControlResponse.newBuilder()
+                .setInit(InitResponse.newBuilder().setError(err).build())
+                .build()))
+            case None =>
+              (Data, () => sendInitResponse())
+          }
+        // Concurrent Cancel / transport error moved state past the init
+        // phase; the cancel path owns the terminator.
+        case other @ (Cancelling | Cancelled | Done) =>
+          (other, () => ())
+        case other =>
+          (other, () => closeWithProtocolError(s"finalizeInit invoked in state $other"))
+      }
+    }
+
+    private def handleDataRequest(data: DataRequest): Unit = state match {
+      case Data => processEcho(data)
+
+      case _ => closeWithProtocolError(s"DataRequest received in state $state")
+    }
+
+    // Echo "processing" runs inline on the gRPC callback thread for test
+    // simplicity. Workers that offload to a thread pool (the typical
+    // approach for non-trivial UDFs) must apply back-pressure via a
+    // bounded queue and serialize state mutations across threads.
+    private def processEcho(data: DataRequest): Unit = {
+      if (data.getData == ERROR_TRIGGER) {
+        // Data-phase error: emit ErrorResponse and enter PostError so the
+        // terminator becomes CancelResponse after the engine's Cancel.
+        // Only transition if we are still in Data: a concurrent Cancel
+        // may have moved us to Cancelling, in which case the cancel path
+        // owns the terminator.
+        val errEnvelope = UdfControlResponse.newBuilder()
+          .setError(ErrorResponse.newBuilder()
+            .setError(ExecutionError.newBuilder()
+              .setUser(UserError.newBuilder()
+                .setMessage("simulated user-code error")
+                .setErrorClass("SimulatedError")
+                .build())
+              .build())
+            .build())
+          .build()
+        matchUpdateThen {
+          case Data => (PostError, () => sendControl(errEnvelope))
+          // Concurrent Cancel / transport error already moved past data
+          // phase; the cancel path owns the terminator.
+          case other @ (Cancelling | Cancelled | Done) => (other, () => ())
+          case other =>
+            (other, () => closeWithProtocolError(s"processEcho invoked in state $other"))
+        }
+      } else {
+        responseLock.synchronized {
+          responseObserver.onNext(UdfResponse.newBuilder()
+            .setData(DataResponse.newBuilder().setData(data.getData).build())
+            .build())
+        }
+      }
+    }
+
+    private def handleFinish(): Unit = matchUpdateThen {
+      case Data =>
+        (Draining, () => onWorkComplete())
+      case PostError =>
+        // ErrorResponse already sent; this Finish was in flight before the
+        // engine learned about the error. The engine MUST follow up with
+        // Cancel; wait for it.
+        (PostError, () => ())
+      // Finish in AwaitingInit or AwaitingChunks is a protocol error:
+      // the engine MUST wait for InitResponse before sending Finish.
+      case other =>
+        (other, () => closeWithProtocolError(s"Finish received in state $other"))
+    }
+
+    // Lazy-cancel: transition to Cancelling and let any in-flight work run
+    // to natural completion; the pending onWorkComplete (or this method's
+    // own follow-up call when no work is in flight) sees Cancelling and
+    // routes to CancelResponse.
+    private def handleCancel(cancel: Cancel): Unit = matchUpdateThen {
+      case AwaitingInit | AwaitingChunks(_) | Data | PostError | Draining | Drained =>
+        (Cancelling, () => onWorkComplete())
+      case other @ (Cancelling | Cancelled | Done) =>
+        // Already cancelling or terminated; ignore duplicate Cancel.
+        (other, () => ())
+    }
+
+    // Called when in-flight work (finish callback, cancel cleanup, or
+    // batch processing) completes. The current state decides the
+    // terminator:
+    //   Draining   -> Drained   -> send FinishResponse
+    //   Cancelling -> Cancelled -> send CancelResponse
+    //
+    // An optional post-work cleanup hook (release file handles, flush
+    // metrics) belongs between the state transition and the terminator
+    // send. Any error from the hook is reported via FinishResponse.error
+    // or CancelResponse.error.
+    private def onWorkComplete(): Unit = matchUpdateThen {
+      case Draining =>
+        (Drained, () => sendFinishResponseAndFinalize())
+      case Cancelling =>
+        (Cancelled, () => sendCancelResponseAndFinalize())
+      // Stream already finalized (e.g. onError fired before this
+      // completion notification arrived) -- nothing to do.
+      case Done => (Done, () => ())
+      case other =>
+        (other, () => closeWithProtocolError(s"onWorkComplete invoked in state $other"))
+    }
+
+    private def sendFinishResponseAndFinalize(): Unit = {
+      sendControl(UdfControlResponse.newBuilder()
+        .setFinish(FinishResponse.newBuilder()
+          .putMetrics("status", "ok")
+          .build())
+        .build())
+      matchUpdateThen { _ =>
+        (Done, () => responseLock.synchronized { responseObserver.onCompleted() })
+      }
+    }
+
+    private def sendCancelResponseAndFinalize(): Unit = {
+      sendControl(UdfControlResponse.newBuilder()
+        .setCancel(CancelResponse.getDefaultInstance)
+        .build())
+      matchUpdateThen { _ =>
+        (Done, () => responseLock.synchronized { responseObserver.onCompleted() })
+      }
+    }
+
+    // gRPC transport error: the connection dropped. The stream is dead,
+    // so no response can be sent. The worker MUST still run the cleanup
+    // it would perform on an explicit Cancel (stop in-progress work,
+    // release resources, free buffers). The echo worker has nothing to
+    // release; only the state is updated.
+    override def onError(t: Throwable): Unit = matchUpdateThen { _ =>
+      (Done, () => ())
+    }
+
+    override def onCompleted(): Unit = state match {
+      case Done => // normal: engine half-closed after session terminated
+      case _ =>
+        closeWithProtocolError(
+          s"request stream closed by engine in unexpected state $state")
+    }
+
+    private def sendInitResponse(): Unit =
+      sendControl(UdfControlResponse.newBuilder()
+        .setInit(InitResponse.getDefaultInstance)
+        .build())
+
+    private def sendControl(ctrl: UdfControlResponse): Unit =
+      responseLock.synchronized {
+        responseObserver.onNext(
+          UdfResponse.newBuilder().setControl(ctrl).build())
+      }
+
+    // Emit ErrorResponse(ProtocolError) followed immediately by
+    // CancelResponse. No in-flight work to drain, so the Cancelling /
+    // Cancelled intermediate states are bypassed.
+    private def closeWithProtocolError(msg: String): Unit = {
+      sendControl(UdfControlResponse.newBuilder()
+        .setError(ErrorResponse.newBuilder()
+          .setError(ExecutionError.newBuilder()
+            .setProtocol(ProtocolError.newBuilder().setMessage(msg).build())
+            .build())
+          .build())
+        .build())
+      sendCancelResponseAndFinalize()
+    }
+  }
+
+  // 
===========================================================================
+  // ENGINE SIDE (gRPC client)
+  // 
===========================================================================
+
+  /**
+   * Minimal engine client that drives the Execute stream and collects results.
+   *
+   * The request stream is half-closed (onCompleted) only after the session
+   * outcome is known from the server: on receiving FinishResponse,
+   * CancelResponse, or a gRPC error. This keeps the stream open long enough
+   * for Cancel to follow Finish when needed.
+   */
+  private class EngineClient(stub: UdfWorkerGrpc.UdfWorkerStub) {
+    private val results = new LinkedBlockingQueue[Array[Byte]]()
+    private val done = new CountDownLatch(1)
+    @volatile var executionError: Option[ExecutionError] = None
+    @volatile var streamError: Option[Throwable] = None
+    private val requestCompleted = new AtomicBoolean(false)
+    // Counted down on InitResponse (success or failure) or on terminal error.
+    // The engine MUST wait for this before sending any DataRequest or Finish.
+    private val initResponseLatch = new CountDownLatch(1)
+
+    private val responseObserver = new StreamObserver[UdfResponse] {
+      override def onNext(response: UdfResponse): Unit = {
+        response.getResponseCase match {
+          case UdfResponse.ResponseCase.DATA =>
+            results.add(response.getData.getData.toByteArray)
+
+          case UdfResponse.ResponseCase.CONTROL =>
+            val ctrl = response.getControl
+            ctrl.getControlCase match {
+              case UdfControlResponse.ControlCase.INIT =>
+                // InitResponse received. If error is set, init failed.
+                val resp = ctrl.getInit
+                if (resp.hasError) {
+                  executionError = Some(resp.getError)
+                  if (!requestCompleted.get()) sendCancel("aborting after init error")
+                }
+                initResponseLatch.countDown()
+                // Data phase begins only on success (no error).
+
+              case UdfControlResponse.ControlCase.ERROR =>
+                // Data-phase error. Send Cancel so the worker can abort cleanly;
+                // the error is surfaced after CancelResponse arrives.
+                executionError = Some(ctrl.getError.getError)
+                if (!requestCompleted.get()) {
+                  sendCancel("aborting after ErrorResponse")
+                }
+
+              case UdfControlResponse.ControlCase.FINISH =>
+                completeRequestStream()
+                done.countDown()
+
+              case UdfControlResponse.ControlCase.CANCEL =>
+                completeRequestStream()
+                done.countDown()
+
+              case unexpected =>
+                throw new IllegalStateException(
+                  s"unexpected control response: $unexpected")
+            }
+
+          case unexpected =>
+            throw new IllegalStateException(
+              s"unexpected response type: $unexpected")
+        }
+      }
+
+      override def onError(t: Throwable): Unit = {
+        streamError = Some(t)
+        completeRequestStream()
+        initResponseLatch.countDown()
+        done.countDown()
+      }
+
+      override def onCompleted(): Unit = {
+        initResponseLatch.countDown()
+        done.countDown()
+      }
+    }
+
+    private val requestObserver: StreamObserver[UdfRequest] = stub.execute(responseObserver)
+
+    def sendInit(
+        payloadBytes: Array[Byte],
+        sendChunked: Boolean = false,
+        protocolVersion: Int = SUPPORTED_VERSION): Unit = {
+      if (sendChunked) {
+        requestObserver.onNext(UdfRequest.newBuilder()
+          .setControl(UdfControlRequest.newBuilder()
+            .setInit(Init.newBuilder()
+              .setProtocolVersion(protocolVersion)
+              .setIsChunkingPayload(true)
+              .setDataFormat(UDFWorkerDataFormat.ARROW)
+              .setUdf(UdfPayload.newBuilder()
+                .setPayload(ByteString.EMPTY)
+                .setFormat("echo")
+                .build())
+              .build())
+            .build())
+          .build())
+        requestObserver.onNext(UdfRequest.newBuilder()
+          .setControl(UdfControlRequest.newBuilder()
+            .setPayload(PayloadChunk.newBuilder()
+              .setData(ByteString.copyFrom(payloadBytes))
+              .setLast(true)
+              .build())
+            .build())
+          .build())
+      } else {
+        requestObserver.onNext(UdfRequest.newBuilder()
+          .setControl(UdfControlRequest.newBuilder()
+            .setInit(Init.newBuilder()
+              .setProtocolVersion(protocolVersion)
+              .setDataFormat(UDFWorkerDataFormat.ARROW)
+              .setUdf(UdfPayload.newBuilder()
+                .setPayload(ByteString.copyFrom(payloadBytes))
+                .setFormat("echo")
+                .build())
+              .build())
+            .build())
+          .build())
+      }
+    }
+
+    def sendData(data: Array[Byte]): Unit = {
+      awaitInitResponse()
+      requestObserver.onNext(UdfRequest.newBuilder()
+        .setData(DataRequest.newBuilder()
+          .setData(ByteString.copyFrom(data))
+          .build())
+        .build())
+    }
+
+    // Sends Init with is_chunking_payload=true but no chunks. Tests then
+    // drive the chunks themselves via sendPayloadChunk.
+    def sendInitChunked(protocolVersion: Int = SUPPORTED_VERSION): Unit = {
+      requestObserver.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setInit(Init.newBuilder()
+            .setProtocolVersion(protocolVersion)
+            .setIsChunkingPayload(true)
+            .setDataFormat(UDFWorkerDataFormat.ARROW)
+            .setUdf(UdfPayload.newBuilder()
+              .setPayload(ByteString.EMPTY)
+              .setFormat("echo")
+              .build())
+            .build())
+          .build())
+        .build())
+    }
+
+    // Sends a single PayloadChunk. Does not wait for InitResponse --
+    // chunks are part of the init handshake itself.
+    def sendPayloadChunk(data: Array[Byte], last: Boolean): Unit = {
+      requestObserver.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setPayload(PayloadChunk.newBuilder()
+            .setData(ByteString.copyFrom(data))
+            .setLast(last)
+            .build())
+          .build())
+        .build())
+    }
+
+    def sendFinish(): Unit = {
+      awaitInitResponse()
+      if (requestCompleted.get()) return
+      requestObserver.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setFinish(Finish.getDefaultInstance)
+          .build())
+        .build())
+      // Request stream stays open: Cancel may still follow Finish.
+      // completeRequestStream() is called by the response observer.
+    }
+
+    // The engine MUST wait for InitResponse before sending any DataRequest
+    // or Finish. Under directExecutor this returns immediately because
+    // sendInit's InitResponse callback runs synchronously.
+    private def awaitInitResponse(): Unit = {
+      if (!initResponseLatch.await(5, TimeUnit.SECONDS)) {
+        throw new IllegalStateException("InitResponse not received within timeout")
+      }
+    }
+
+    def sendCancel(reason: String = ""): Unit = {
+      // If a terminator already arrived (FinishResponse / CancelResponse),
+      // the request stream has been half-closed and Cancel arrives too
+      // late -- silently ignore, matching the proto's Cancel-after-Finish
+      // contract.
+      if (requestCompleted.get()) return
+      requestObserver.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setCancel(Cancel.newBuilder().setReason(reason).build())
+          .build())
+        .build())
+      // Request stream stays open until the response terminator arrives;
+      // completeRequestStream() is called by the response observer.
+    }
+
+    def completeRequestStream(): Unit = {
+      if (requestCompleted.compareAndSet(false, true)) {
+        requestObserver.onCompleted()
+      }
+    }
+
+    def awaitDone(timeoutMs: Long = 5000): Boolean =
+      done.await(timeoutMs, TimeUnit.MILLISECONDS)
+
+    def drainResults(): Seq[Array[Byte]] = {
+      val buf = new java.util.ArrayList[Array[Byte]]()
+      results.drainTo(buf)
+      import scala.jdk.CollectionConverters._
+      buf.asScala.toSeq
+    }
+  }
+
+  // 
===========================================================================
+  // TESTS
+  // 
===========================================================================
+
+  test("echo: single DataRequest round-trip") {
+    val client = new EngineClient(stub)
+    client.sendInit("dummy-payload".getBytes)
+    client.sendData("hello".getBytes)
+    client.sendFinish()
+
+    assert(client.awaitDone(), "stream did not complete in time")
+    assert(client.streamError.isEmpty, s"unexpected stream error: ${client.streamError}")
+    assert(client.executionError.isEmpty, s"unexpected execution error: ${client.executionError}")
+    val results = client.drainResults()
+    assert(results.length == 1)
+    assert(new String(results.head) == "hello")
+  }
+
+  test("echo: multiple DataRequest batches are all echoed") {
+    val client = new EngineClient(stub)
+    client.sendInit("dummy-payload".getBytes)
+    Seq("batch1", "batch2", "batch3").foreach(b => client.sendData(b.getBytes))
+    client.sendFinish()
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+    val results = client.drainResults().map(new String(_))
+    assert(results == Seq("batch1", "batch2", "batch3"))
+  }
+
+  // The engine drives the request side from a producer thread while the
+  // response observer fires on a gRPC-managed callback thread. gRPC's
+  // bidirectional streaming and HTTP/2 flow control manage the interleaving;
+  // no explicit coordination is needed beyond the protocol ordering invariants.
+  test("echo: concurrent sending and receiving (producer/consumer pattern)") {
+    val asyncStub = UdfWorkerGrpc.newStub(channel)
+
+    val receivedCount = new java.util.concurrent.atomic.AtomicInteger(0)
+    val doneLatch = new CountDownLatch(1)
+    @volatile var streamErr: Option[Throwable] = None
+    val requestCompleted = new AtomicBoolean(false)
+    // reqObs is assigned after responseObs is created. AtomicReference
+    // gives the response observer (which may run on a gRPC callback
+    // thread) a safe view of the assignment made by the test thread.
+    val reqObsRef = new java.util.concurrent.atomic.AtomicReference[StreamObserver[UdfRequest]]()
+
+    val responseObs = new StreamObserver[UdfResponse] {
+      private def completeRequestStream(): Unit =
+        if (requestCompleted.compareAndSet(false, true)) reqObsRef.get().onCompleted()
+
+      override def onNext(r: UdfResponse): Unit = r.getResponseCase match {
+        case UdfResponse.ResponseCase.DATA => receivedCount.incrementAndGet()
+        case UdfResponse.ResponseCase.CONTROL =>
+          val c = r.getControl
+          c.getControlCase match {
+            case UdfControlResponse.ControlCase.INIT => // InitResponse: data phase can proceed
+            case UdfControlResponse.ControlCase.FINISH =>
+              completeRequestStream()
+              doneLatch.countDown()
+            case unexpected =>
+              throw new IllegalStateException(
+                s"unexpected control response: $unexpected")
+          }
+        case unexpected =>
+          throw new IllegalStateException(
+            s"unexpected response type: $unexpected")
+      }
+      override def onError(t: Throwable): Unit = {
+        streamErr = Some(t)
+        completeRequestStream()
+        doneLatch.countDown()
+      }
+      override def onCompleted(): Unit = doneLatch.countDown()
+    }
+    reqObsRef.set(asyncStub.execute(responseObs))
+
+    val producer = new Thread(() => {
+      val reqObs = reqObsRef.get()
+      reqObs.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setInit(Init.newBuilder()
+            .setProtocolVersion(SUPPORTED_VERSION)
+            .setDataFormat(UDFWorkerDataFormat.ARROW)
+            .setUdf(UdfPayload.newBuilder()
+              .setPayload(ByteString.copyFromUtf8("payload"))
+              .setFormat("echo").build())
+            .build())
+          .build())
+        .build())
+      (1 to 5).foreach { i =>
+        reqObs.onNext(UdfRequest.newBuilder()
+          .setData(DataRequest.newBuilder()
+            .setData(ByteString.copyFromUtf8(s"batch-$i")).build())
+          .build())
+      }
+      reqObs.onNext(UdfRequest.newBuilder()
+        .setControl(UdfControlRequest.newBuilder()
+          .setFinish(Finish.getDefaultInstance).build())
+        .build())
+      // Request stream stays open; completeRequestStream() is called by
+      // the response observer on FinishResponse or gRPC error.
+    }, "producer")
+    producer.start()
+
+    assert(doneLatch.await(10, TimeUnit.SECONDS), "stream did not complete")
+    assert(streamErr.isEmpty, s"unexpected error: $streamErr")
+    assert(receivedCount.get() == 5, s"expected 5 echoes, got ${receivedCount.get()}")
+  }
+
+  test("echo: chunked payload delivery") {
+    val client = new EngineClient(stub)
+    client.sendInit("chunked-payload".getBytes, sendChunked = true)
+    client.sendData("data".getBytes)
+    client.sendFinish()
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+    assert(new String(client.drainResults().head) == "data")
+  }
+
+  test("echo: generator-style UDF (zero DataRequests, engine sends Finish after Init)") {
+    val client = new EngineClient(stub)
+    client.sendInit("generator-payload".getBytes)
+    client.sendFinish()
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+    assert(client.drainResults().isEmpty)
+  }
+
+  test("cancel: engine cancels mid-stream before sending Finish") {
+    val client = new EngineClient(stub)
+    client.sendInit("dummy-payload".getBytes)
+    client.sendData("batch1".getBytes)
+    client.sendCancel("task interrupted")
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+  }
+
+  // Cancel MAY follow Finish. The worker sends CancelResponse if Cancel arrives
+  // before FinishResponse is sent, or FinishResponse if Cancel arrives too late.
+  // The engine must accept either outcome.
+  test("cancel: engine sends Cancel after Finish -- accepts FinishResponse or CancelResponse") {
+    val client = new EngineClient(stub)
+    client.sendInit("dummy-payload".getBytes)
+    client.sendData("data".getBytes)
+    client.sendFinish()
+    client.sendCancel("task interrupted after finish")
+
+    assert(client.awaitDone(), "stream did not complete")
+    assert(client.streamError.isEmpty,
+      s"Cancel-after-Finish must not cause a gRPC error: ${client.streamError}")
+  }
+
+  test("ErrorResponse: worker signals UserError, engine sends Cancel and receives CancelResponse") {
+    val client = new EngineClient(stub)
+    client.sendInit("dummy-payload".getBytes)
+    client.sendData(ERROR_TRIGGER.toByteArray)
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty, s"expected no gRPC error, got ${client.streamError}")
+    assert(client.executionError.isDefined, "expected an ExecutionError")
+    assert(client.executionError.get.hasUser, "expected UserError kind")
+    assert(client.executionError.get.getUser.getErrorClass == "SimulatedError")
+  }
+
+  test("protocol error: second Init is rejected with ProtocolError + CancelResponse") {
+    val client = new EngineClient(stub)
+    client.sendInit("payload".getBytes)
+    client.sendInit("second-init".getBytes)
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty, "expected ProtocolError, not a gRPC stream error")
+    assert(client.executionError.isDefined, "expected an ExecutionError")
+    assert(client.executionError.get.hasProtocol, "expected ProtocolError kind")
+  }
+
+  test("init error: inline payload triggers init failure") {
+    val client = new EngineClient(stub)
+    client.sendInit(INIT_ERROR_TRIGGER.toByteArray)
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty, s"expected no gRPC error, got ${client.streamError}")
+    assert(client.executionError.isDefined, "expected an init error")
+    assert(client.executionError.get.hasWorker, "expected WorkerError kind")
+    assert(client.executionError.get.getWorker.getMessage == "simulated init failure")
+  }
+
+  test("init error: chunked payload assembled across chunks triggers init failure") {
+    val client = new EngineClient(stub)
+    client.sendInit(INIT_ERROR_TRIGGER.toByteArray, sendChunked = true)
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty, s"expected no gRPC error, got ${client.streamError}")
+    assert(client.executionError.isDefined, "expected an init error")
+    assert(client.executionError.get.hasWorker, "expected WorkerError kind")
+  }
+
+  test("init error: unsupported protocol version triggers init failure") {
+    val client = new EngineClient(stub)
+    client.sendInit("payload".getBytes, protocolVersion = SUPPORTED_VERSION + 999)
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty, s"expected no gRPC error, got ${client.streamError}")
+    assert(client.executionError.isDefined, "expected an init error")
+    assert(client.executionError.get.hasProtocol, "expected ProtocolError kind")
+  }
+
+  test("cancel: Cancel before Init is accepted (AwaitingInit state)") {
+    val client = new EngineClient(stub)
+    client.sendCancel("aborting before init")
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty, s"expected no gRPC error, got ${client.streamError}")
+    assert(client.executionError.isEmpty, "Cancel before Init is normal abort, not an error")
+  }
+
+  test("cancel: Cancel during chunked payload delivery (AwaitingChunks state)") {
+    val client = new EngineClient(stub)
+    client.sendInitChunked()
+    client.sendPayloadChunk("partial".getBytes, last = false)
+    // No final chunk: worker is still accumulating when Cancel arrives.
+    client.sendCancel("aborting mid-chunking")
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+    assert(client.executionError.isEmpty)
+  }
+
+  test("echo: chunked payload assembled from multiple non-final chunks") {
+    val client = new EngineClient(stub)
+    client.sendInitChunked()
+    client.sendPayloadChunk("part1".getBytes, last = false)
+    client.sendPayloadChunk("part2".getBytes, last = false)
+    client.sendPayloadChunk("final".getBytes, last = true)
+    // The accumulator state machine should have produced InitResponse after
+    // the last=true chunk; the data-phase round-trip below verifies the
+    // worker correctly transitioned to Data.
+    client.sendData("after-chunks".getBytes)
+    client.sendFinish()
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+    assert(new String(client.drainResults().head) == "after-chunks")
+  }
+
+  test("protocol error: PayloadChunk in Data state (no chunking flag on Init)") {
+    val client = new EngineClient(stub)
+    client.sendInit("payload".getBytes) // non-chunked init transitions to Data
+    client.sendPayloadChunk("oops".getBytes, last = true)
+
+    assert(client.awaitDone())
+    assert(client.streamError.isEmpty)
+    assert(client.executionError.isDefined, "expected a protocol error")
+    assert(client.executionError.get.hasProtocol)
+  }
+
+  test("Manage: heartbeat is acknowledged") {
+    val blockingStub = UdfWorkerGrpc.newBlockingStub(channel)
+    val resp = blockingStub.manage(WorkerRequest.newBuilder()
+      .setHeartbeat(Heartbeat.getDefaultInstance)
+      .build())
+    assert(resp.hasHeartbeat, "expected HeartbeatResponse")
+  }
+
+  test("Manage: ShutdownRequest is acknowledged") {
+    val blockingStub = UdfWorkerGrpc.newBlockingStub(channel)
+    val resp = blockingStub.manage(WorkerRequest.newBuilder()
+      .setShutdown(ShutdownRequest.newBuilder().setReason("test done").build())
+      .build())
+    assert(resp.hasShutdown, "expected ShutdownResponse")
+  }
+}
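
Editorial note (not part of the commit): the tests above exercise a worker-side state machine whose states are named AwaitingInit, AwaitingChunks and Data in the test titles. A minimal sketch of those transitions follows, assuming a synchronous worker that sends FinishResponse as soon as Finish arrives. The `Closed` state, the `step` and `run` helpers, and the flattened message types are hypothetical names introduced for illustration; they are not the generated protobuf classes or the worker implementation in this commit. The sketch also collapses the engine's Cancel round-trip after an error into a direct `ProtocolError` followed by `CancelResponse`.

```scala
// Hypothetical, simplified model of the Execute stream ordering rules.
// Not the real worker: message types are stand-ins for the protobuf classes.
object ExecuteStateMachine {
  sealed trait State
  case object AwaitingInit extends State   // nothing received yet
  case object AwaitingChunks extends State // Init(is_chunking_payload = true) seen
  case object Data extends State           // InitResponse sent, data phase open
  case object Closed extends State         // assumed name: a terminator was sent

  sealed trait Msg
  final case class Init(chunked: Boolean) extends Msg
  final case class PayloadChunk(last: Boolean) extends Msg
  case object DataRequest extends Msg
  case object Finish extends Msg
  case object Cancel extends Msg

  sealed trait Reply
  case object InitResponse extends Reply
  case object FinishResponse extends Reply
  case object CancelResponse extends Reply
  case object ProtocolError extends Reply

  // One transition: returns the next state and the replies emitted for `m`.
  def step(s: State, m: Msg): (State, List[Reply]) = (s, m) match {
    // After a terminator, late messages (e.g. Cancel after FinishResponse) are ignored.
    case (Closed, _) => (Closed, Nil)
    // Cancel is accepted in every open state, including before Init.
    case (_, Cancel) => (Closed, List(CancelResponse))
    case (AwaitingInit, Init(false)) => (Data, List(InitResponse))
    case (AwaitingInit, Init(true)) => (AwaitingChunks, Nil)
    case (AwaitingChunks, PayloadChunk(false)) => (AwaitingChunks, Nil)
    // InitResponse is withheld until the chunk marked last = true.
    case (AwaitingChunks, PayloadChunk(true)) => (Data, List(InitResponse))
    // DataResponse emission is omitted from this sketch.
    case (Data, DataRequest) => (Data, Nil)
    case (Data, Finish) => (Closed, List(FinishResponse))
    // Anything else is out of order: report it and terminate the session.
    case _ => (Closed, List(ProtocolError, CancelResponse))
  }

  // Feeds a whole request sequence through `step` and collects the replies.
  def run(msgs: Seq[Msg]): List[Reply] =
    msgs.foldLeft((AwaitingInit: State, List.empty[Reply])) {
      case ((st, out), m) =>
        val (next, replies) = step(st, m)
        (next, out ++ replies)
    }._2
}
```

Under these assumptions, `ExecuteStateMachine.run(Seq(Init(false), Init(false)))` models the "second Init" test, and a `PayloadChunk` sent after a non-chunked `Init` takes the same fallback branch as the "PayloadChunk in Data state" test.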
diff --git a/udf/worker/proto/pom.xml b/udf/worker/proto/pom.xml
index 55c27003943c..8868a50707cb 100644
--- a/udf/worker/proto/pom.xml
+++ b/udf/worker/proto/pom.xml
@@ -43,6 +43,21 @@
       <artifactId>protobuf-java</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-api</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+      <version>${io.grpc.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
@@ -58,30 +73,54 @@
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
       <plugin>
-        <groupId>com.github.os72</groupId>
-        <artifactId>protoc-jar-maven-plugin</artifactId>
-        <version>${protoc-jar-maven-plugin.version}</version>
+        <groupId>eu.maveniverse.maven.plugins</groupId>
+        <artifactId>nisse-plugin3</artifactId>
+        <version>0.7.0</version>
         <executions>
           <execution>
-            <phase>generate-sources</phase>
+            <id>set-os-detector-properties</id>
             <goals>
-              <goal>run</goal>
+              <goal>inject-properties</goal>
+            </goals>
+            <phase>validate</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>0.6.1</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+          <protoSourceRoot>src/main/protobuf</protoSourceRoot>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
             </goals>
-            <configuration>
-              <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
-              <protocVersion>${protobuf.version}</protocVersion>
-              <inputDirectories>
-                <include>src/main/protobuf</include>
-              </inputDirectories>
-            </configuration>
           </execution>
         </executions>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <shadedArtifactAttached>false</shadedArtifactAttached>
+        <!--
+          Override the parent pom's shade configuration completely (rather than
+          merging) so this module's shade output contains only the proto module
+          plus its relocated protobuf-java classes. The shaded fat jar is published
+          as an attached `-shaded` classifier artifact, keeping the main artifact
+          unshaded so consumers (e.g. spark-udf-worker-core) and their tests can
+          continue to compile against the unrelocated com.google.protobuf.ByteString.
+          This mirrors the SBT setup where `udf-worker-proto/assembly` 
produces a
+          shaded fat jar separately from the regular `package` output.
+        -->
+        <configuration combine.self="override">
+          <shadedArtifactAttached>true</shadedArtifactAttached>
+          <shadedClassifierName>shaded</shadedClassifierName>
           <shadeTestJar>false</shadeTestJar>
           <artifactSet>
             <includes>
@@ -106,6 +145,14 @@
             </filter>
           </filters>
         </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
       <plugin>
         <groupId>net.alchim31.maven</groupId>
diff --git a/udf/worker/proto/src/main/protobuf/common.proto b/udf/worker/proto/src/main/protobuf/common.proto
index ee032def73ef..7028b1357187 100644
--- a/udf/worker/proto/src/main/protobuf/common.proto
+++ b/udf/worker/proto/src/main/protobuf/common.proto
@@ -26,7 +26,7 @@ option java_multiple_files = true;
 // The UDF in & output data format.
 enum UDFWorkerDataFormat {
     UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0;
-    
+
     // The worker accepts and produces Apache arrow batches.
     ARROW = 1;
 }
@@ -42,7 +42,7 @@ enum UDFWorkerDataFormat {
 enum UDFProtoCommunicationPattern {
     UDF_PROTO_COMMUNICATION_PATTERN_UNSPECIFIED = 0;
 
-    // Data exachanged as a bidrectional
+    // Data exchanged as a bidirectional
     // stream of bytes.
     BIDIRECTIONAL_STREAMING = 1;
 }
diff --git a/udf/worker/proto/src/main/protobuf/udf_protocol.proto b/udf/worker/proto/src/main/protobuf/udf_protocol.proto
new file mode 100644
index 000000000000..a66e09af09fe
--- /dev/null
+++ b/udf/worker/proto/src/main/protobuf/udf_protocol.proto
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "common.proto";
+
+package org.apache.spark.udf.worker;
+
+option java_package = "org.apache.spark.udf.worker";
+option java_multiple_files = true;
+
+// =====================================================================
+// Language-agnostic UDF execution protocol.
+//
+// The Spark engine acts as the gRPC client; a UDF worker (in any
+// language) acts as the gRPC server.
+// =====================================================================
+
+// The default UDF gRPC service. A worker that exposes this service
+// MUST do so over the default connection of the worker specification.
+//
+// Future revisions of the worker specification may introduce additional
+// dedicated connections for specific purposes (e.g. a separate channel
+// for streaming state store access in stateful UDFs).
+service UdfWorker {
+    // Per-execution stream. See [[UdfControlRequest]] for the complete
+    // wire protocol and ordering invariants.
+    //
+    // Error contract: a gRPC error on this stream signals a transport or
+    // connection failure. Application-level errors (user code exceptions,
+    // worker errors, protocol violations) are reported via [[ErrorResponse]]
+    // or [[InitResponse.error]] so the stream lifecycle remains intact.
+    //
+    // Hanging UDFs: this protocol does not define what constitutes a
+    // hanging UDF -- the protocol cannot distinguish user code that is
+    // genuinely stuck from code that is merely slow or intentionally
+    // long-running. In-band [[Cancel]] is also not a reliable remedy:
+    // a Cancel queued behind a blocked batch (or behind a full sender
+    // buffer) is never delivered to the worker. Detection and recovery
+    // are layered on top of this protocol -- typically via worker-side
+    // watchdog timers configurable at the session or UDF level.
+    //
+    // Stream lifecycle: the engine MUST half-close the request stream
+    // (call onCompleted() on the gRPC request side) only after the session
+    // outcome is known: on receiving [[FinishResponse]] or [[CancelResponse]]
+    // (clean termination), or on receiving a gRPC error. Deferring the
+    // half-close keeps the stream open long enough for [[Cancel]] to follow
+    // [[Finish]] when needed (see [[Finish]] for the full contract).
+    //
+    // For stateful execution, the state is maintained per bi-directional
+    // stream, mapping to a `WorkerSession` on the engine side.
+    rpc Execute(stream UdfRequest) returns (stream UdfResponse);
+
+    // Worker-scoped management RPC for heartbeat, capability query, and
+    // graceful shutdown. Workers MUST ensure this RPC remains serviceable
+    // regardless of how many [[Execute]] streams are in flight; failing to
+    // do so can prevent the engine from detecting a hung worker or
+    // initiating a clean shutdown.
+    rpc Manage(WorkerRequest) returns (WorkerResponse);
+}
+
+// =====================================================================
+// Execute stream -- envelope
+// =====================================================================
+
+// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]]
+// / [[Finish]] / [[Cancel]]) or a data message.
+message UdfRequest {
+    oneof request {
+        UdfControlRequest control = 1;
+        DataRequest       data    = 2;
+    }
+}
+
+// Worker -> Engine. Either a control response ([[InitResponse]] /
+// [[FinishResponse]] / [[CancelResponse]] / [[ErrorResponse]]) or a
+// data response message.
+message UdfResponse {
+    oneof response {
+        UdfControlResponse control = 1;
+        DataResponse       data    = 2;
+    }
+}
+
+// Engine -> Worker control messages.
+//
+// Wire protocol for one Execute stream (both directions):
+//
+//   Engine -> Worker:  Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)?
+//                                                               | Cancel
+//   Worker -> Engine:          InitResponse  -> (DataResponse)* -> (ErrorResponse)? -> (FinishResponse | CancelResponse)
+//
+// [[InitResponse]] may carry [[InitResponse.error]] to signal an init
+// failure; in that case no [[DataResponse]] messages follow.
+//
+// DataRequest and DataResponse are independent streams: the worker
+// may emit DataResponse messages at any point after InitResponse,
+// including before the first DataRequest arrives. For generator-style
+// UDFs that produce output without consuming input, there may be zero
+// DataRequest messages -- the engine sends Finish directly after
+// receiving InitResponse. The arrows above denote ordering constraints
+// within each direction, not a request/response pairing.
+//
+// Ordering invariants:
+//   - PayloadChunk* sit between Init and InitResponse, used only when
+//     [[Init.is_chunking_payload]] = true. The worker waits for the
+//     chunk carrying [[PayloadChunk.last]] = true before sending
+//     InitResponse; otherwise InitResponse MAY follow Init immediately.
+//   - InitResponse MUST precede any DataResponse.
+//   - The engine MUST wait for InitResponse before sending any
+//     DataRequest or Finish. Cancel is the only message permitted
+//     before InitResponse.
+//   - ErrorResponse (if any) is emitted after all DataResponse messages,
+//     only from the data-processing phase, and at most once per stream
+//     (multiple errors are aggregated internally). The terminator is
+//     always CancelResponse: the engine MUST send Cancel
+//     (Cancel-after-Finish if Finish was already sent) and the worker
+//     replies with CancelResponse. [[InitResponse.error]] follows the
+//     same termination rule. Errors raised inside the finish or cancel
+//     callback are carried in [[FinishResponse.error]] /
+//     [[CancelResponse.error]] respectively.
+//   - Stream terminators -- Cancel MUST NOT precede Finish:
+//       (a) Finish alone        -> FinishResponse
+//       (b) Cancel alone        -> CancelResponse
+//       (c) Finish then Cancel  -> CancelResponse if FinishResponse has
+//           not yet been sent, otherwise FinishResponse (see [[Finish]]).
+//
+// A worker that receives messages out of order (e.g. a second Init,
+// a PayloadChunk after InitResponse, or a DataRequest/Finish before
+// InitResponse) MUST report the violation as a [[ProtocolError]] --
+// carried in [[InitResponse.error]] when the violation occurs in the
+// init phase (no [[InitResponse]] sent yet), in [[FinishResponse.error]]
+// or [[CancelResponse.error]] when it occurs while producing a
+// terminator, and in [[ErrorResponse.error]] otherwise -- and close
+// the stream with [[CancelResponse]]. (Cancel is permitted at any
+// point after the stream opens, including before Init -- it terminates
+// the session.)
+message UdfControlRequest {
+    oneof control {
+        Init         init    = 1;
+        PayloadChunk payload = 2;
+        Finish       finish  = 3;
+        Cancel       cancel  = 4;
+    }
+}
+
+// Worker -> Engine control messages.
+message UdfControlResponse {
+    oneof control {
+        InitResponse   init   = 1;
+        FinishResponse finish = 2;
+        CancelResponse cancel = 3;
+        ErrorResponse  error  = 4;
+    }
+}
+
+// =====================================================================
+// Init phase
+// =====================================================================
+
+// Sent once, as the first message on an Execute stream. Describes
+// the UDF body to run plus the minimum metadata the worker needs to
+// start processing it.
+//
+// Today the protocol mandates exactly one Init per UDF execution
+// (one Init -> data -> Finish). This is the simplest contract and
+// covers all currently supported UDF kinds. In the future we may
+// evolve to support multiple init phases on the same stream -- e.g.
+// when worker setup requires an interactive handshake (negotiate a
+// schema, exchange capabilities, fetch driver-side metadata, ...)
+// before the data plane opens. Such an extension would be additive
+// and would not change the single-Init semantics already in use.
+//
+// Engine vs. client split:
+//   * Most fields on Init are engine-side. They describe what
+//     flows on the wire for this session ([[data_format]] /
+//     [[input_schema]] / [[output_schema]] -- matching the worker
+//     spec, not the function's view) and what per-session
+//     context the worker needs ([[timezone]], [[session_conf]],
+//     [[task_context]], [[parameters]]).
+//   * [[UdfPayload]] carries everything the client side of Spark
+//     (where the UDF is defined and serialized) packs -- the
+//     serialized callable, an opaque format tag, and any encoder
+//     metadata bundled with the callable. The wire protocol does
+//     not enumerate encoder shapes; that is left to the client and
+//     worker to agree on per UDF type.
+message Init {
+    // (Optional) Protocol version declared by the engine for this stream.
+    // Allows the worker to detect version mismatches early and reject
+    // streams using a protocol revision it does not support. When not set,
+    // the worker SHOULD assume the initial protocol version.
+    optional uint32 protocol_version = 1;
+
+    // (Required) Wire format used for [[DataRequest.data]] and
+    // [[DataResponse.data]] for the life of this session. Must be
+    // one of the formats the worker declared in
+    // [[WorkerCapabilities.supported_data_formats]]; the client side
+    // of the protocol picks one at planning time and sticks with it.
+    //
+    // Workers MUST reject an [[Init]] whose [[data_format]] is
+    // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`, or whose value is not
+    // present in their declared
+    // [[WorkerCapabilities.supported_data_formats]]. The latter covers
+    // unknown enum values that proto3 passes through as numeric
+    // constants -- e.g. a newer engine selecting a format the worker
+    // does not implement.
+    UDFWorkerDataFormat data_format = 2;
+
+    // (Required) The UDF body to execute on the worker for this
+    // session. Exactly one payload per Execute stream.
+    UdfPayload udf = 3;
+
+    // (Optional) When true, the UDF payload is delivered via
+    // [[PayloadChunk]] messages rather than inline in
+    // [[UdfPayload.payload]]; see [[PayloadChunk]] for the chunking
+    // contract.
+    optional bool is_chunking_payload = 4;
+
+    // (Optional) Schema of the input data plane in the wire format
+    // declared by [[data_format]] -- e.g. an Arrow IPC schema when
+    // data_format = ARROW. This is an engine-side requirement: it
+    // describes the bytes the engine will actually put on
+    // [[DataRequest.data]] for this session, matching what the
+    // worker advertised in its spec. It is NOT necessarily the
+    // schema the function definer expressed; the UDF's own type
+    // information lives inside [[UdfPayload]], typically embedded
+    // alongside the callable in [[UdfPayload.payload]] (e.g. as
+    // input/output encoders chosen per UDF type).
+    //
+    // Left unset when the worker can derive the schema from the
+    // payload alone.
+    optional bytes input_schema = 5;
+
+    // (Optional) Schema of the output data plane in the wire format
+    // declared by [[data_format]]. Same semantics as
+    // [[input_schema]] -- engine-side requirement describing the
+    // bytes the engine expects on [[DataResponse.data]].
+    optional bytes output_schema = 6;
+
+    // (Optional; defaults to an empty map.) Per-task context
+    // provided by the engine. Common keys identify the task instance
+    // for diagnostics, logging, and stateful workers -- e.g.
+    // partition id, task attempt id, stage id, micro-batch id.
+    // Engine and worker agree on the keys they share; the protocol
+    // does not enumerate them.
+    map<string, string> task_context = 7;
+
+    // (Optional; defaults to an empty map.) Worker-private knobs not
+    // already captured by typed fields above. Free-form; both sides
+    // agree on the keys they need.
+    //
+    // Any key that two languages converge on is a candidate for
+    // promotion to a structured proto field -- once promoted, it gets
+    // a typed field number from the reserved range right after this
+    // block and is removed from [[session_conf]]. [[timezone]] below
+    // is an example of a key that has already been promoted.
+    map<string, string> session_conf = 8;
+
+    // (Optional) Session timezone, promoted out of [[session_conf]]
+    // because every eval needs it for timestamp encoding/decoding.
+    //
+    // Format follows Spark's `spark.sql.session.timeZone` config --
+    // typically an IANA TZ id (e.g. "America/Los_Angeles") or a
+    // fixed offset (e.g. "+08:00"). The engine MUST pass the value
+    // it would resolve from the session conf without further
+    // transformation, so the worker can interpret it the same way
+    // Spark does.
+    optional string timezone = 9;
+
+    // Reserved for future typed Init fields, in particular keys
+    // graduated from [[session_conf]] (see the [[timezone]] precedent
+    // above). Numbers >= 100 are intentionally NOT reserved here; if
+    // a future revision needs an opaque escape-hatch field, give it a
+    // number >= 100 alongside [[parameters]] and add a field-level
+    // comment so the convention stays visible.
+    reserved 10 to 99;
+
+    // (Optional) Engine-packed opaque parameters specific to a
+    // particular kind of UDF execution. The escape hatch for
+    // anything the engine needs the worker to see at init time
+    // that is not already captured by the typed fields above and
+    // does not fit naturally into [[task_context]]. The encoding
+    // is agreed between the engine and the worker; the protocol
+    // does not interpret it. The matching response, also opaque
+    // bytes, is returned via [[InitResponse.data]].
+    //
+    // Numbers >= 100 are reserved by convention for opaque
+    // escape-hatch fields like this one; new typed fields use the
+    // reserved 10..99 range.
+    //
+    // Client-side init data (anything packed by the layer that
+    // defines and serializes the UDF) does NOT belong here -- it
+    // travels inside [[UdfPayload.payload]] instead.
+    optional bytes parameters = 100;
+}
+
+// Acknowledgment for [[Init]]. Always emitted exactly once, whether init
+// succeeds or fails. When [[PayloadChunk]] is used to deliver the UDF
+// payload, the worker MUST also wait until end-of-chunking to emit it
+// (see [[PayloadChunk]]).
+//
+// On init failure the worker sets [[error]]; no [[DataResponse]] messages
+// follow. The engine MUST send [[Cancel]] and the worker responds with
+// [[CancelResponse]].
+//
+// The init phase is a bidirectional handshake: the worker can return
+// inline bytes for the engine to consume before data starts flowing.
+// This enables certain UDF execution types to communicate init-time
+// results back to the engine early -- for example, signalling that
+// execution should be skipped entirely (e.g. the UDF determined during
+// init that its output is empty), or returning an output schema derived
+// from the payload. The semantics of those bytes are agreed between the
+// client side of the protocol and the worker; this message itself is
+// otherwise opaque.
+message InitResponse {
+    // (Optional) Inline init result returned by the worker. Opaque
+    // to the protocol; the client side of the protocol and the
+    // worker agree on what (if anything) it carries.
+    optional bytes data = 1;
+
+    // (Optional) Error raised during init. See the message-level
+    // comment for the termination rule.
+    optional ExecutionError error = 2;
+}
+
+// Optional. Used to stream the single UDF payload when it does not
+// fit in a single gRPC message. The default is to send the payload
+// inline on [[UdfPayload.payload]]; chunking is only needed when a
+// payload exceeds the gRPC message size limit.
+//
+// When used, at least one chunk MUST be sent after [[Init]] and
+// before the first [[DataRequest]], with the final chunk carrying
+// [[PayloadChunk.last]] = true. The worker concatenates the
+// inline [[UdfPayload.payload]] (if any) followed by all chunks in
+// arrival order to form the final payload.
+//
+// Chunks are part of the Init handshake, not standalone control
+// messages: they extend [[Init.udf.payload]] and are not
+// individually acknowledged. The single [[InitResponse]] covers
+// Init plus all of its chunks together. [[PayloadChunk.last]] = true
+// is the canonical end-of-chunking signal; the worker MUST NOT send
+// [[InitResponse]] before receiving it.
+//
+// Validation: when [[UdfPayload.payload_size]] is set, receivers MUST
+// verify the total assembled length matches; when
+// [[UdfPayload.payload_crc32]] is set, receivers MUST verify the CRC
+// over the assembled bytes. Either mismatch is a [[ProtocolError]].
+message PayloadChunk {
+    // (Required, non-empty.) Bytes appended to the [[Init.udf]]
+    // payload.
+    bytes data = 1;
+
+    // Marks the final chunk. Non-final chunks omit this field. See
+    // the message-level comment for the end-of-chunking contract.
+    //
+    // Kept `optional` so future revisions can distinguish "engine did
+    // not set this field" from "engine set false" without renumbering.
+    optional bool last = 2;
+}
+
+// =====================================================================
+// Data phase
+//
+// `data` is intentionally a top-level `bytes` field on both request
+// and response messages -- not nested inside a wrapper -- so that
+// implementations can avoid an extra copy when reading or writing
+// the payload. The wire format (Arrow IPC etc.) is declared once per
+// session via [[Init.data_format]] and stays the same for the life
+// of the stream.
+//
+// Backpressure: this protocol currently relies on gRPC's transport-level
+// (HTTP/2) flow control for backpressure.
+// =====================================================================
+
+// Engine -> Worker per-batch payload.
+message DataRequest {
+    // (Required) Encoded data bytes for one batch in the
+    // session-declared format. What "empty" means for a batch is
+    // defined by the session's [[Init.data_format]] -- for Arrow IPC
+    // even a zero-row batch carries a non-empty header, while future
+    // formats may permit truly zero-length payloads. Validation
+    // beyond "non-empty bytes" is delegated to the format decoder.
+    bytes data = 1;
+}
+
+// Worker -> Engine per-batch payload. The worker emits zero or more
+// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
+// [[CancelResponse]]. Sink-style UDFs (which consume input but
+// produce no output rows on the data plane) emit exactly zero.
+message DataResponse {
+    // (Required) Encoded data bytes for one batch in the
+    // session-declared format. See [[DataRequest.data]] for the
+    // meaning of "empty".
+    bytes data = 1;
+}
+
+// =====================================================================
+// Finish / Cancel phase
+// =====================================================================
+
+// Sent by the engine when all input data has been submitted and normal
+// completion is expected. The worker MUST drain any remaining output,
+// then emit [[FinishResponse]] and close the response stream.
+//
+// [[Cancel]] MAY follow [[Finish]] on the same stream if the engine
+// wants to abort processing of already-submitted data (e.g. a Spark
+// task is interrupted after all input batches were sent). [[Cancel]]
+// MUST NOT precede [[Finish]]; if the engine cancels before sending
+// all data it sends [[Cancel]] without [[Finish]].
+//
+// Worker behavior when [[Cancel]] follows [[Finish]]:
+//   - If [[FinishResponse]] has not yet been sent, the worker MUST
+//     abort output, run cleanup, and send [[CancelResponse]].
+//   - If [[FinishResponse]] has already been sent, [[Cancel]] arrives
+//     too late and is ignored; the engine receives [[FinishResponse]].
+// The engine MUST therefore be prepared to receive either
+// [[FinishResponse]] or [[CancelResponse]] when it sends both.
+message Finish {}
+
+// Worker -> Engine completion message. Carries per-execution summary metrics.
+//
+// Metrics design:
+//   - [[FinishResponse.metrics]] / [[CancelResponse.metrics]]: per-execution
+//     metrics accumulated up to the point of stream termination (e.g. rows
+//     processed, time per phase). Emitted once per [[Execute]] stream,
+//     regardless of whether the stream ended cleanly or was cancelled.
+//   - [[HeartbeatResponse.metrics]]: worker-global metrics aggregated across
+//     all sessions (e.g. total rows processed, memory usage). Emitted
+//     periodically via the [[Manage]] RPC.
+//   - Real-time / dynamic per-execution metrics (e.g. incremental progress
+//     during a long-running UDF) are not yet defined. They will be
+//     introduced as a separate streaming message in a future revision
+//     without changing this contract. TODO(SPARK-56922).
+message FinishResponse {
+    // Per-execution metrics accumulated over the session. Free-form;
+    // names are worker-defined.
+    map<string, string> metrics = 1;
+
+    // (Optional) Inline finish result returned by the worker.
+    // Mirrors [[InitResponse.data]] -- the finish phase allows the
+    // engine to interact with the UDF after data has stopped
+    // flowing, with the worker returning opaque bytes the engine (or
+    // higher-level code) may consume during teardown. The semantics
+    // of those bytes are agreed between the client side of the
+    // protocol and the worker.
+    optional bytes data = 2;
+
+    // (Optional) Error raised by the finish callback (if any), invoked
+    // after all input data has been consumed. This field is only set on
+    // the non-error execution path: if [[ErrorResponse]] was sent during
+    // data processing, the terminator is [[CancelResponse]], not
+    // [[FinishResponse]]. The engine SHOULD surface this as an exception.
+    optional ExecutionError error = 3;
+}
+
+// Engine -> Worker explicit cancel. Distinct from a gRPC stream error
+// so the worker can run cleanup deterministically (release file
+// handles, drop temp state, etc.). After receiving [[Cancel]] the
+// worker MUST stop emitting [[DataResponse]] messages, run cleanup,
+// and emit [[CancelResponse]] before closing.
+//
+// [[Cancel]] is the cooperative cancellation path and may be sent
+// either instead of [[Finish]] (engine cancels before all data is
+// submitted) or after [[Finish]] (engine aborts processing of
+// already-submitted data -- see [[Finish]] for the full contract).
+// A broken gRPC connection is the involuntary fallback -- in that
+// case gRPC surfaces an error on the stream (see [[Execute]]).
+//
+// Cancellation latency: [[Cancel]] is delivered in-order on the same
+// stream as [[DataRequest]] messages. A worker that processes batches
+// synchronously on the gRPC callback thread observes Cancel only after
+// the queued batches have been processed; a worker that hands batches
+// off to a thread pool (and returns from the handler quickly) observes
+// Cancel as soon as gRPC delivers it, regardless of whether in-flight
+// batches have finished. Workers SHOULD set a cancellation flag on
+// receipt so in-flight processing can abort at the next safe
+// checkpoint. The pathological case is a hung UDF combined with a full
+// sender buffer: Cancel may never reach the worker at all (see the
+// [[Execute]] service comment for the hanging-UDF discussion).
+//
+// Worker behavior on involuntary stream error: when the worker observes
+// a gRPC error on the Execute stream (i.e. the engine-side connection
+// dropped), it MUST treat this as equivalent to [[Cancel]] for cleanup
+// purposes -- stop producing output and release resources. The worker
+// MUST NOT attempt to send [[CancelResponse]] or any other message,
+// since the stream is already dead.
+message Cancel {
+    // (Optional) Free-form reason for diagnostics.
+    optional string reason = 1;
+}
+
+// Worker -> Engine acknowledgment of [[Cancel]], or the terminator after
+// [[ErrorResponse]] or [[InitResponse.error]] (the execution is always
+// considered aborted when an error occurred). Carries any per-execution
+// metrics accumulated up to the point of cancellation -- even a partial
+// execution may produce useful diagnostics (e.g. rows processed before
+// abort).
+message CancelResponse {
+    // Per-execution metrics accumulated up to cancellation. Free-form;
+    // names are worker-defined. See [[FinishResponse.metrics]] for the
+    // full metrics design.
+    map<string, string> metrics = 1;
+
+    // (Optional) Error raised by the cancel callback (if any), invoked
+    // when [[Cancel]] is received. The engine SHOULD surface this
+    // alongside any prior [[ExecutionError]] (e.g. as a suppressed
+    // exception).
+    optional ExecutionError error = 2;
+}
+
+// Worker -> Engine application-level error raised during the
+// data-processing phase. See the ordering invariants on
+// [[UdfControlRequest]] for the wire position, the termination rule,
+// and the engine's reaction.
+message ErrorResponse {
+    ExecutionError error = 1;
+}
+
+// Application-level error detail, used as the [[error]] field in all
+// [[UdfControlResponse]] response messages: [[InitResponse]],
+// [[ErrorResponse]], [[FinishResponse]], and [[CancelResponse]].
+// Distinct from a gRPC stream error, which indicates a transport failure.
+message ExecutionError {
+    // Exactly one kind MUST be set.
+    oneof kind {
+        UserError     user     = 1;
+        WorkerError   worker   = 2;
+        ProtocolError protocol = 3;
+    }
+}
+
+// Error raised by the user's UDF code.
+message UserError {
+    // (Required) Human-readable error message from the UDF.
+    string message = 1;
+
+    // (Optional) Full stack trace or traceback in the worker's
+    // language-specific format. Forwarded to the user as-is.
+    optional string traceback = 2;
+
+    // (Optional) Language-specific error class name (e.g. "ValueError"
+    // in Python, "RuntimeException" in Java).
+    optional string error_class = 3;
+}
+
+// Error originating from the worker implementation itself, not user code.
+message WorkerError {
+    // (Required) Human-readable description of the worker error.
+    string message = 1;
+
+    // (Optional) Stack trace for diagnostics.
+    optional string traceback = 2;
+}
+
+// Protocol violation detected by the worker (e.g. messages received out
+// of order, unsupported [[Init.protocol_version]]). Sending this type
+// instead of closing with a gRPC error keeps the stream lifecycle intact:
+// [[FinishResponse]] or [[CancelResponse]] still follows.
+message ProtocolError {
+    // (Required) Description of the protocol violation.
+    string message = 1;
+}
+
+// The single UDF body delivered to the worker on [[Init]]. Opaque to
+// the engine: the engine forwards [[payload]] and [[format]]
+// unchanged, and the worker decodes them per the format the client
+// and worker have agreed on.
+message UdfPayload {
+    // (Required, may be empty when chunked.) Serialized UDF bundle,
+    // opaque to the engine. The encoding is declared in [[format]].
+    //
+    // The bundle is not necessarily just the serialized callable;
+    // it is up to the client side of the protocol and the worker to
+    // agree on what is packed inside it -- e.g. custom encoders for
+    // user-defined types, type hints, or any other metadata the
+    // worker needs to invoke the UDF.
+    //
+    // For payloads too large to fit in a single gRPC message, this
+    // field MAY be left empty (zero-length bytes) and the bytes
+    // delivered via the [[PayloadChunk]] mechanism instead. See
+    // [[PayloadChunk]] for chunking semantics.
+    bytes payload = 1;
+
+    // (Required, non-empty.) Format tag identifying the encoding of
+    // [[payload]]. The protocol does not enumerate the values: the
+    // client side of the protocol and the worker agree on the
+    // namespace, and each worker recognises the tags it knows how
+    // to decode. The engine forwards this string unchanged.
+    string format = 2;
+
+    // (Optional) Total payload size in bytes. Useful when chunked
+    // streaming is used so the worker can pre-allocate buffers.
+    optional int64 payload_size = 3;
+
+    // (Optional) CRC32 checksum (IEEE 802.3 polynomial, zlib-compatible)
+    // of the assembled [[payload]] bytes. When set, the worker MUST
+    // recompute the CRC over the assembled payload (inline +
+    // [[PayloadChunk.data]] bytes in arrival order) and reject the
+    // session as a [[ProtocolError]] on mismatch. Strongly recommended
+    // when the payload is delivered via [[PayloadChunk]], where the
+    // assembly spans multiple messages on the same stream.
+    optional uint32 payload_crc32 = 6;
+
+    // (Optional) Human-readable name for diagnostics and metrics.
+    optional string name = 4;
+
+    // (Optional) Worker / language-specific dispatch hint. A
+    // free-form string the worker uses to pick the code path that
+    // handles this payload. The protocol does not enumerate eval
+    // types because they are language-specific; the client side of
+    // the protocol and the worker agree on the namespace and the
+    // values.
+    //
+    // When the worker can derive the eval type from the payload
+    // itself (embedded metadata, format tag, etc.), this field is
+    // left unset. Otherwise the client side of the protocol sets it
+    // explicitly.
+    optional string eval_type = 5;
+}
+
+// =====================================================================
+// Manage RPC -- worker-scoped operations independent of Execute
+// =====================================================================
+
+// Engine -> Worker management request. Exactly one operation is set
+// per call; new operations (capability query, profiling, etc.) can be
+// added as additional branches without changing the RPC signature.
+message WorkerRequest {
+    oneof manage {
+        Heartbeat       heartbeat = 1;
+        ShutdownRequest shutdown  = 2;
+    }
+}
+
+// Worker -> Engine management response. The set branch MUST correspond
+// to the request's operation -- e.g. [[Heartbeat]] is answered with
+// [[HeartbeatResponse]]. A mismatched branch is a protocol error.
+message WorkerResponse {
+    oneof manage {
+        HeartbeatResponse heartbeat = 1;
+        ShutdownResponse  shutdown  = 2;
+    }
+}
+
+// Liveness probe. The engine may send this periodically to detect a
+// hung worker process. The worker SHOULD reply within a short,
+// bounded time.
+//
+// This is an application-level liveness check distinct from gRPC's
+// transport-level keepalive: gRPC keepalive proves the TCP connection
+// is alive, whereas [[Heartbeat]] proves the worker's request-handling
+// thread is responsive. Deployments may use either or both; they do
+// not replace each other.
+//
+// What the engine does in response to a missed heartbeat (e.g.,
+// tearing down the worker) is outside the scope of this protocol and
+// depends on the worker management mode defined in the worker
+// specification.
+//
+// Note: [[Heartbeat]] can only detect a hung worker process (one
+// whose request-handling thread is unresponsive). It cannot detect
+// user code that is executing but taking unexpectedly long -- during
+// init, data processing, or finish -- because such code is
+// indistinguishable from legitimately slow UDF execution. Handling
+// hanging user code (e.g. via UDF-level timeouts) is the
+// responsibility of the UDF author or the worker implementation, and
+// is outside the scope of this protocol.
+message Heartbeat {
+    // Reserved for future additive fields (e.g. an engine-side
+    // sequence number or a request-id tag for correlating heartbeats
+    // when sent over a long-lived connection).
+    reserved 1;
+}
+
+// Acknowledgment for [[Heartbeat]].
+message HeartbeatResponse {
+    // (Optional) Worker-global metrics aggregated across all active sessions
+    // (e.g. total rows processed, active session count, memory usage).
+    // Complements the per-execution metrics in [[FinishResponse.metrics]].
+    // Free-form; names are worker-defined.
+    map<string, string> metrics = 1;
+}
+
+// Engine-initiated graceful shutdown request. This lets the worker
+// know the engine has finished with it and intends no further Execute
+// streams. OS-level process management is outside the scope of this
+// protocol and is defined in the worker specification.
+//
+// Interaction with in-flight Execute streams: when [[cancel_sessions]]
+// is false (the default), in-flight Execute streams MUST be allowed to
+// complete normally (via [[Finish]] or [[Cancel]]). When [[cancel_sessions]]
+// is true, the worker MUST cancel all in-flight streams immediately by
+// sending [[ErrorResponse]] wrapping a [[WorkerError]] followed by
+// [[CancelResponse]] on each stream, then exit. The engine SHOULD NOT
+// send [[ShutdownRequest]] while it still intends to start new Execute
+// streams. Once all Execute streams have terminated (cleanly or via gRPC
+// error), the worker SHOULD exit.
+message ShutdownRequest {
+    // (Optional) Free-form reason for diagnostics.
+    optional string reason = 1;
+
+    // (Optional; defaults to false.) When true, the worker MUST cancel
+    // all in-flight Execute streams immediately rather than waiting for
+    // them to complete naturally.
+    optional bool cancel_sessions = 2;
+}
+
+// Worker -> Engine acknowledgment of [[ShutdownRequest]].
+message ShutdownResponse {
+    // True when all Execute streams were fully settled (finished or
+    // cancelled) before this response was sent. False when the worker
+    // acknowledged the request but streams were still in flight (only
+    // possible when [[ShutdownRequest.cancel_sessions]] is false and the
+    // worker sends the response before draining).
+    bool sessions_settled = 1;
+}
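
As an illustration of the `payload_crc32` contract on `UdfPayload`, here is a minimal Python sketch (not part of this commit) of how a worker might assemble the inline payload plus `PayloadChunk.data` bytes in arrival order and verify the checksum. It relies on `zlib.crc32`, which matches the zlib-compatible CRC32 named in the field comment. The names `assemble_payload` and `PayloadChecksumMismatch` are hypothetical; a real worker would presumably report the mismatch as a `ProtocolError` rather than raise a local exception.

```python
import zlib
from typing import Iterable, Optional


class PayloadChecksumMismatch(Exception):
    """Hypothetical local error; stands in for reporting a ProtocolError."""


def assemble_payload(inline: bytes,
                     chunks: Iterable[bytes],
                     expected_crc32: Optional[int] = None) -> bytes:
    """Concatenate inline bytes and chunk bytes in arrival order.

    When expected_crc32 is given, the CRC is updated incrementally as
    each chunk arrives and compared once assembly is complete.
    """
    crc = zlib.crc32(inline)
    parts = [inline]
    for chunk in chunks:
        # zlib.crc32 accepts the running CRC as its second argument.
        crc = zlib.crc32(chunk, crc)
        parts.append(chunk)
    if expected_crc32 is not None and (crc & 0xFFFFFFFF) != expected_crc32:
        raise PayloadChecksumMismatch(
            f"expected {expected_crc32:#010x}, got {crc & 0xFFFFFFFF:#010x}")
    return b"".join(parts)
```

Updating the CRC per chunk avoids a second pass over the assembled bytes; the comparison still only happens after the last chunk has arrived.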

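
The `Cancel` comment recommends that workers set a cancellation flag on receipt so in-flight processing can abort at the next safe checkpoint. The sketch below (again not part of this commit) shows one way that could look in Python, using `threading.Event` as the flag. The function name `process_batch`, the `checkpoint_every` parameter, and the row-level checkpoint granularity are all assumptions made for illustration; the protocol does not prescribe where checkpoints go.

```python
import threading
from typing import Callable, Iterable, List


def process_batch(rows: Iterable[int],
                  udf: Callable[[int], int],
                  cancelled: threading.Event,
                  checkpoint_every: int = 2) -> List[int]:
    """Apply udf to each row, polling the cancellation flag periodically.

    The handler that receives Cancel is assumed to call cancelled.set();
    this loop notices the flag at the next checkpoint and stops early,
    returning only the results produced so far.
    """
    results: List[int] = []
    for index, row in enumerate(rows):
        # Safe checkpoint: between rows, never in the middle of a UDF call.
        if index % checkpoint_every == 0 and cancelled.is_set():
            break
        results.append(udf(row))
    return results
```

A smaller `checkpoint_every` lowers cancellation latency at the cost of more frequent flag checks. A UDF call that never returns is not helped by this pattern, which is the hanging-UDF case the `Cancel` comment calls out.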

