This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 4557e8f  [Improve] add thrift max message size options (#233)
4557e8f is described below

commit 4557e8fcf64f294a9f3773d7090306166c0c3429
Author: wudi <676366...@qq.com>
AuthorDate: Thu Oct 24 11:51:08 2024 +0800

    [Improve] add thrift max message size options (#233)
---
 .../java/org/apache/doris/spark/backend/BackendClient.java  | 13 ++++++++++---
 .../org/apache/doris/spark/cfg/ConfigurationOptions.java    |  3 +++
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
index b10797b..04c3288 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
@@ -43,6 +43,9 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
+import static 
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;
+
 /**
  * Client to request Doris BE
  */
@@ -58,6 +61,7 @@ public class BackendClient {
     private final int retries;
     private final int socketTimeout;
     private final int connectTimeout;
+    private final int thriftMaxMessageSize;
 
     public BackendClient(Routing routing, Settings settings) throws 
ConnectedFailedException {
         this.routing = routing;
@@ -67,8 +71,9 @@ public class BackendClient {
                 ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
         this.retries = 
settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
                 ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
-        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. 
retries set to '{}'.",
-                this.connectTimeout, this.socketTimeout, this.retries);
+        this.thriftMaxMessageSize = 
settings.getIntegerProperty(DORIS_THRIFT_MAX_MESSAGE_SIZE, 
DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. 
retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
+                this.connectTimeout, this.socketTimeout, this.retries, 
this.thriftMaxMessageSize);
         open();
     }
 
@@ -79,7 +84,9 @@ public class BackendClient {
             logger.debug("Attempt {} to connect {}.", attempt, routing);
             try {
                 TBinaryProtocol.Factory factory = new 
TBinaryProtocol.Factory();
-                transport = new TSocket(new TConfiguration(), 
routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
+                TConfiguration.Builder configBuilder = TConfiguration.custom();
+                configBuilder.setMaxMessageSize(thriftMaxMessageSize);
+                transport = new TSocket(configBuilder.build(), 
routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
                 TProtocol protocol = factory.getProtocol(transport);
                 client = new TDorisExternalService.Client(protocol);
                 logger.trace("Connect status before open transport to {} is 
'{}'.", routing, isConnected);
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 3b9b554..cf0630f 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -167,4 +167,7 @@ public interface ConfigurationOptions {
 
     String DORIS_ARROW_FLIGHT_SQL_PORT = "doris.arrow-flight-sql.port";
 
+    String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
+    int DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to