This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2973d0f9d6c [fix](arrow-flight-sql) Fix kill timeout 
FlightSqlConnection and FlightSqlConnectProcessor close (#41770)
2973d0f9d6c is described below

commit 2973d0f9d6c93dfe1636688c2e272c5dda1fd1af
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Oct 16 17:07:34 2024 +0800

    [fix](arrow-flight-sql) Fix kill timeout FlightSqlConnection and 
FlightSqlConnectProcessor close (#41770)
    
    1. Doris cancels a connection that has not responded for a long time.
    A MySQL connection exits directly, but an Arrow Flight connection
    does not exit immediately, so it may be cancelled repeatedly, printing
    a log each time. The timeout is `wait_timeout` in the session variables.
    2. FlightSqlConnectProcessor now uses try-with-resources so it is closed correctly.
---
 .../java/org/apache/doris/qe/ConnectContext.java   |  16 +--
 .../arrowflight/DorisFlightSqlProducer.java        | 120 ++++++++++-----------
 .../arrowflight/FlightSqlConnectProcessor.java     |   3 +
 .../sessions/FlightSqlConnectContext.java          |   3 +-
 4 files changed, 74 insertions(+), 68 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 783844f12f5..2493b8e6203 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -951,7 +951,8 @@ public class ConnectContext {
 
     // kill operation with no protect.
     public void kill(boolean killConnection) {
-        LOG.warn("kill query from {}, kill mysql connection: {}", 
getRemoteHostPortString(), killConnection);
+        LOG.warn("kill query from {}, kill {} connection: {}", 
getRemoteHostPortString(), getConnectType(),
+                killConnection);
 
         if (killConnection) {
             isKilled = true;
@@ -964,10 +965,10 @@ public class ConnectContext {
 
     // kill operation with no protect by timeout.
     private void killByTimeout(boolean killConnection) {
-        LOG.warn("kill query from {}, kill mysql connection: {} reason time 
out", getRemoteHostPortString(),
-                killConnection);
-
         if (killConnection) {
+            LOG.warn("kill wait timeout connection, connection type: {}, 
connectionId: {}, remote: {}, "
+                            + "wait timeout: {}",
+                    getConnectType(), connectionId, getRemoteHostPortString(), 
sessionVariable.getWaitTimeoutS());
             isKilled = true;
             // Close channel to break connection with client
             closeChannel();
@@ -976,6 +977,10 @@ public class ConnectContext {
         // cancelQuery by time out
         StmtExecutor executorRef = executor;
         if (executorRef != null) {
+            LOG.warn("kill time out query, remote: {}, at the same time kill 
connection is {},"
+                            + " connection type: {}, connectionId: {}",
+                    getRemoteHostPortString(), killConnection,
+                    getConnectType(), connectionId);
             executorRef.cancel(new Status(TStatusCode.TIMEOUT,
                     "query is timeout, killed by timeout checker"));
         }
@@ -999,9 +1004,6 @@ public class ConnectContext {
         if (command == MysqlCommand.COM_SLEEP) {
             if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
                 // Need kill this connection.
-                LOG.warn("kill wait timeout connection, remote: {}, wait 
timeout: {}",
-                        getRemoteHostPortString(), 
sessionVariable.getWaitTimeoutS());
-
                 killFlag = true;
                 killConnection = true;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 16195469af9..5d431b386b7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -186,64 +186,66 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             Preconditions.checkState(!query.isEmpty());
             // After the previous query was executed, there was no 
getStreamStatement to take away the result.
             connectContext.getFlightSqlChannel().reset();
-            final FlightSqlConnectProcessor flightSQLConnectProcessor = new 
FlightSqlConnectProcessor(connectContext);
-
-            flightSQLConnectProcessor.handleQuery(query);
-            if (connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
-                throw new RuntimeException("after executeQueryStatement 
handleQuery");
-            }
-
-            if (connectContext.isReturnResultFromLocal()) {
-                // set/use etc. stmt returns an OK result by default.
-                if (connectContext.getFlightSqlChannel().resultNum() == 0) {
-                    // a random query id and add empty results
-                    String queryId = UUID.randomUUID().toString();
-                    connectContext.getFlightSqlChannel().addOKResult(queryId, 
query);
+            try (FlightSqlConnectProcessor flightSQLConnectProcessor = new 
FlightSqlConnectProcessor(connectContext)) {
+                flightSQLConnectProcessor.handleQuery(query);
+                if (connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
+                    throw new RuntimeException("after executeQueryStatement 
handleQuery");
+                }
 
-                    final ByteString handle = 
ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
-                    TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
-                            .build();
-                    return getFlightInfoForSchema(ticketStatement, descriptor,
-                            
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
+                if (connectContext.isReturnResultFromLocal()) {
+                    // set/use etc. stmt returns an OK result by default.
+                    if (connectContext.getFlightSqlChannel().resultNum() == 0) 
{
+                        // a random query id and add empty results
+                        String queryId = UUID.randomUUID().toString();
+                        
connectContext.getFlightSqlChannel().addOKResult(queryId, query);
+
+                        final ByteString handle = 
ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
+                        TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder()
+                                .setStatementHandle(handle).build();
+                        return getFlightInfoForSchema(ticketStatement, 
descriptor,
+                                
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot()
+                                        .getSchema());
+                    } else {
+                        // A Flight Sql request can only contain one statement 
that returns result,
+                        // otherwise expected thrown exception during 
execution.
+                        
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
+
+                        // The tokens used for authentication between 
getStreamStatement and getFlightInfoStatement
+                        // are different. So put the peerIdentity into the 
ticket and then getStreamStatement is used to
+                        // find the correct ConnectContext.
+                        // queryId is used to find query results.
+                        final ByteString handle = ByteString.copyFromUtf8(
+                                peerIdentity + ":" + 
DebugUtil.printId(connectContext.queryId()));
+                        TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder()
+                                .setStatementHandle(handle).build();
+                        return getFlightInfoForSchema(ticketStatement, 
descriptor, connectContext.getFlightSqlChannel()
+                                
.getResult(DebugUtil.printId(connectContext.queryId())).getVectorSchemaRoot()
+                                .getSchema());
+                    }
                 } else {
-                    // A Flight Sql request can only contain one statement 
that returns result,
-                    // otherwise expected thrown exception during execution.
-                    
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
-
-                    // The tokens used for authentication between 
getStreamStatement and getFlightInfoStatement
-                    // are different. So put the peerIdentity into the ticket 
and then getStreamStatement is used to
-                    // find the correct ConnectContext.
-                    // queryId is used to find query results.
-                    final ByteString handle = ByteString.copyFromUtf8(
-                            peerIdentity + ":" + 
DebugUtil.printId(connectContext.queryId()));
+                    // Now only query stmt will pull results from BE.
+                    final ByteString handle;
+                    if 
(connectContext.getSessionVariable().enableParallelResultSink()) {
+                        handle = 
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + 
query);
+                    } else {
+                        // only one instance
+                        handle = 
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + 
query);
+                    }
+                    Schema schema = 
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+                    if (schema == null) {
+                        throw CallStatus.INTERNAL.withDescription("fetch arrow 
flight schema is null")
+                                .toRuntimeException();
+                    }
                     TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
                             .build();
-                    return getFlightInfoForSchema(ticketStatement, descriptor,
-                            
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
-                                    .getVectorSchemaRoot().getSchema());
-                }
-            } else {
-                // Now only query stmt will pull results from BE.
-                final ByteString handle;
-                if 
(connectContext.getSessionVariable().enableParallelResultSink()) {
-                    handle = 
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" + 
query);
-                } else {
-                    // only one instance
-                    handle = 
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" + 
query);
-                }
-                Schema schema = 
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
-                if (schema == null) {
-                    throw CallStatus.INTERNAL.withDescription("fetch arrow 
flight schema is null").toRuntimeException();
+                    Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
+                    // TODO Support multiple endpoints.
+                    Location location = 
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+                            connectContext.getResultFlightServerAddr().port);
+                    List<FlightEndpoint> endpoints = 
Collections.singletonList(new FlightEndpoint(ticket, location));
+                    // TODO Set in BE callback after query end, Client will 
not callback.
+                    return new FlightInfo(schema, descriptor, endpoints, -1, 
-1);
                 }
-                TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
-                        .build();
-                Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
-                // TODO Support multiple endpoints.
-                Location location = 
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
-                        connectContext.getResultFlightServerAddr().port);
-                List<FlightEndpoint> endpoints = Collections.singletonList(new 
FlightEndpoint(ticket, location));
-                // TODO Set in BE callback after query end, Client will not 
callback.
-                return new FlightInfo(schema, descriptor, endpoints, -1, -1);
             }
         } catch (Exception e) {
             String errMsg = "get flight info statement failed, " + 
e.getMessage() + ", " + Util.getRootCauseMessage(e)
@@ -296,8 +298,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
         final ByteString bytes = Objects.isNull(parameterSchema) ? 
ByteString.EMPTY
                 : ByteString.copyFrom(serializeMetadata(parameterSchema));
         return ActionCreatePreparedStatementResult.newBuilder()
-                
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData)))
-                .setParameterSchema(bytes)
+                
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))).setParameterSchema(bytes)
                 .setPreparedStatementHandle(handle).build();
     }
 
@@ -326,12 +327,11 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                 Schema metaData = connectContext.getFlightSqlChannel()
                         .createOneOneSchemaRoot("ResultMeta", 
"UNIMPLEMENTED").getSchema();
                 listener.onNext(new Result(
-                        Any.pack(buildCreatePreparedStatementResult(handle, 
parameterSchema, metaData))
-                                .toByteArray()));
+                        Any.pack(buildCreatePreparedStatementResult(handle, 
parameterSchema, metaData)).toByteArray()));
             } catch (Exception e) {
-                String errMsg = "create prepared statement failed, " + 
e.getMessage() + ", "
-                        + Util.getRootCauseMessage(e) + ", error code: " + 
connectContext.getState().getErrorCode()
-                        + ", error msg: " + 
connectContext.getState().getErrorMessage();
+                String errMsg = "create prepared statement failed, " + 
e.getMessage() + ", " + Util.getRootCauseMessage(
+                        e) + ", error code: " + 
connectContext.getState().getErrorCode() + ", error msg: "
+                        + connectContext.getState().getErrorMessage();
                 LOG.warn(errMsg, e);
                 
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
                 return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index fe0648a0680..b812bf81d8a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -53,6 +53,8 @@ import java.util.concurrent.TimeoutException;
 
 /**
  * Process one flgiht sql connection.
+ *
+ * Must use try-with-resources.
  */
 public class FlightSqlConnectProcessor extends ConnectProcessor implements 
AutoCloseable {
     private static final Logger LOG = 
LogManager.getLogger(FlightSqlConnectProcessor.class);
@@ -177,6 +179,7 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
     @Override
     public void close() throws Exception {
         ctx.setCommand(MysqlCommand.COM_SLEEP);
+        ctx.clear();
         // TODO support query profile
         for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
             asynExecutor.finalizeQuery();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index 4badae03b31..b90d7505923 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -63,6 +63,7 @@ public class FlightSqlConnectContext extends ConnectContext {
         if (flightSqlChannel != null) {
             flightSqlChannel.close();
         }
+        connectScheduler.unregisterConnection(this);
     }
 
     // kill operation with no protect.
@@ -72,8 +73,8 @@ public class FlightSqlConnectContext extends ConnectContext {
 
         if (killConnection) {
             isKilled = true;
+            // Close channel and break connection with client.
             closeChannel();
-            connectScheduler.unregisterConnection(this);
         }
         // Now, cancel running query.
         cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query 
killed by user"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to