Copilot commented on code in PR #7385:
URL: https://github.com/apache/kyuubi/pull/7385#discussion_r3043403763


##########
externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperation.scala:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.dataagent.schema.{DataAgentTRowSetGenerator, 
SchemaHelper}
+import 
org.apache.kyuubi.engine.dataagent.schema.DataAgentTRowSetGenerator.COL_STRING_TYPE
+import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, 
OperationState}
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, 
FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+abstract class DataAgentOperation(session: Session) extends 
AbstractOperation(session) {
+
+  @volatile protected var iter: FetchIterator[Array[String]] = _
+
+  protected lazy val conf: KyuubiConf = session.sessionManager.getConf
+
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
+    validateDefaultFetchOrientation(order)
+    // Allow fetching during RUNNING state for streaming support
+    if (state != OperationState.FINISHED && state != OperationState.RUNNING) {
+      throw new IllegalStateException(
+        s"Expected state FINISHED or RUNNING, but found $state")
+    }
+    require(iter != null, s"Operation $statementId result iterator not 
initialized")
+    setHasResultSet(true)
+    order match {
+      case FETCH_NEXT =>
+        iter.fetchNext()
+      case FETCH_PRIOR =>
+        iter.fetchPrior(rowSetSize)
+      case FETCH_FIRST =>
+        iter.fetchAbsolute(0)
+    }
+
+    val taken = iter.take(rowSetSize).map(_.toSeq)
+    val resultRowSet = new DataAgentTRowSetGenerator().toTRowSet(
+      taken.toSeq,
+      Seq(COL_STRING_TYPE),
+      getProtocolVersion)
+    resultRowSet.setStartRowOffset(iter.getPosition)

Review Comment:
   `setStartRowOffset(iter.getPosition)` uses the iterator position *after* 
consuming `taken`, which typically points to the end of the returned batch 
rather than its start. Set the offset to the batch start (e.g., 
`iter.getFetchStart`, or `iter.getPosition - taken.size`) so clients see 
correct paging/offset semantics. The suggestion below does this by reading 
`iter.getPosition` into `startRowOffset` before `take` is called.
   ```suggestion
       val startRowOffset = iter.getPosition
       val taken = iter.take(rowSetSize).map(_.toSeq)
       val resultRowSet = new DataAgentTRowSetGenerator().toTRowSet(
         taken.toSeq,
         Seq(COL_STRING_TYPE),
         getProtocolVersion)
       resultRowSet.setStartRowOffset(startRowOffset)
   ```



##########
externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import java.util.concurrent.RejectedExecutionException
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.slf4j.MDC
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.dataagent.provider.{DataAgentProvider, 
ProviderRunRequest}
+import org.apache.kyuubi.engine.dataagent.runtime.event.{AgentError, 
AgentEvent, AgentFinish, ApprovalRequest, ContentDelta, EventType, StepEnd, 
StepStart, ToolCall, ToolResult}
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+class ExecuteStatement(
+    session: Session,
+    override val statement: String,
+    confOverlay: Map[String, String],
+    override val shouldRunAsync: Boolean,
+    queryTimeout: Long,
+    dataAgentProvider: DataAgentProvider)
+  extends DataAgentOperation(session) with Logging {
+
+  import ExecuteStatement.JSON
+
+  private val operationLog: OperationLog = 
OperationLog.createOperationLog(session, getHandle)
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+  private val incrementalIter = new IncrementalFetchIterator[Array[String]]()
+
+  override protected def runInternal(): Unit = {
+    addTimeoutMonitor(queryTimeout)
+    iter = incrementalIter
+
+    val asyncOperation = new Runnable {
+      override def run(): Unit = {
+        executeStatement()
+      }
+    }
+
+    try {
+      val sessionManager = session.sessionManager
+      val backgroundHandle = 
sessionManager.submitBackgroundOperation(asyncOperation)
+      setBackgroundHandle(backgroundHandle)
+    } catch {
+      case rejected: RejectedExecutionException =>
+        setState(OperationState.ERROR)
+        val ke =
+          KyuubiSQLException("Error submitting query in background, query 
rejected", rejected)
+        setOperationException(ke)
+        shutdownTimeoutMonitor()
+        throw ke
+    }
+  }
+
+  private def toJson(build: ObjectNode => Unit): String = {
+    val node = JSON.createObjectNode()
+    build(node)
+    JSON.writeValueAsString(node)
+  }
+
+  private def executeStatement(): Unit = {
+    setState(OperationState.RUNNING)
+
+    try {
+      val sessionId = session.handle.identifier.toString
+      val operationId = getHandle.identifier.toString
+      MDC.put("operationId", operationId)
+      MDC.put("sessionId", sessionId)
+      val request = new ProviderRunRequest(statement)
+      // Merge session-level conf with per-statement confOverlay (overlay 
takes precedence)
+      val mergedConf = session.conf ++ confOverlay
+      
mergedConf.get(KyuubiConf.ENGINE_DATA_AGENT_LLM_MODEL.key).foreach(request.modelName)
+      val approvalMode = mergedConf.getOrElse(
+        KyuubiConf.ENGINE_DATA_AGENT_APPROVAL_MODE.key,
+        
session.sessionManager.getConf.get(KyuubiConf.ENGINE_DATA_AGENT_APPROVAL_MODE))
+      request.approvalMode(approvalMode)
+
+      val eventConsumer: AgentEvent => Unit = { (event: AgentEvent) =>
+        val sseType = event.eventType().sseEventName()
+        event.eventType() match {
+          case EventType.AGENT_START =>
+            incrementalIter.append(Array(toJson { n =>
+              n.put("type", sseType)
+            }))
+          case EventType.STEP_START =>
+            val stepStart = event.asInstanceOf[StepStart]
+            incrementalIter.append(Array(toJson { n =>
+              n.put("type", sseType); n.put("step", stepStart.stepNumber())
+            }))
+          case EventType.CONTENT_DELTA =>
+            val delta = event.asInstanceOf[ContentDelta]
+            incrementalIter.append(Array(toJson { n =>
+              n.put("type", sseType); n.put("text", delta.text())
+            }))
+          case EventType.TOOL_CALL =>
+            val toolCall = event.asInstanceOf[ToolCall]
+            incrementalIter.append(Array(toJson { n =>
+              n.put("type", sseType)
+              n.put("id", toolCall.toolCallId())
+              n.put("name", toolCall.toolName())
+              n.put("args", toolCall.toolArgs().toString)
+            }))
+          case EventType.TOOL_RESULT =>
+            val toolResult = event.asInstanceOf[ToolResult]
+            incrementalIter.append(Array(toJson { n =>
+              n.put("type", sseType)
+              n.put("id", toolResult.toolCallId())
+              n.put("name", toolResult.toolName())
+              n.put("output", toolResult.output())
+            }))

Review Comment:
   `toolArgs().toString` is not JSON (it serializes as a Java `Map#toString` 
like `{k=v}`), which makes the emitted rows inconsistent and hard for clients 
to parse reliably. Serialize `toolArgs` as a proper JSON object (e.g., via 
`ObjectMapper.valueToTree(...)`/`set(...)`) and consider including 
`toolResult.isError()` in the `TOOL_RESULT` payload so consumers can 
distinguish failures without string parsing.



##########
build/dist:
##########
@@ -363,6 +364,18 @@ for jar in $(ls "$DISTDIR/jars/"); do
   fi
 done
 
+# Copy data-agent engines
+cp 
"$KYUUBI_HOME/externals/kyuubi-data-agent-engine/target/kyuubi-data-agent-engine_${SCALA_VERSION}-${VERSION}.jar"
 "$DISTDIR/externals/engines/data-agent/"
+cp -r 
"$KYUUBI_HOME"/externals/kyuubi-data-agent-engine/target/scala-$SCALA_VERSION/jars/*.jar
 "$DISTDIR/externals/engines/data-agent/"
+
+# Share the jars w/ server to reduce binary size
+# shellcheck disable=SC2045
+for jar in $(ls "$DISTDIR/jars/"); do
+  if [[ -f "$DISTDIR/externals/engines/data-agent/$jar" ]]; then
+    (cd $DISTDIR/externals/engines/data-agent; ln -snf "../../../jars/$jar" 
"$DISTDIR/externals/engines/data-agent/$jar")

Review Comment:
   `$DISTDIR` is unquoted in the `cd` command, which will break if the path 
contains spaces (and can reintroduce shellcheck warnings). Quote the `cd` path 
and, since you already `cd` into the directory, prefer linking to `"$jar"` 
rather than writing the absolute destination path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org

Reply via email to