This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git

commit b53dc00c2c895b3879c9f2053243a3e92e6c9f29
Author: Youngwb <yangwenbo_mail...@163.com>
AuthorDate: Wed Dec 18 13:08:21 2019 +0800

    Fix bug when Spark on Doris runs for a long time (#2485)
---
 README.md                                                        | 1 +
 .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java    | 2 ++
 src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 9 +++++++++
 src/main/thrift/doris/DorisExternalService.thrift                | 2 ++
 4 files changed, 14 insertions(+)

diff --git a/README.md b/README.md
index 76f9d95..2b0d2ef 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,7 @@ dorisSparkRDD.collect()
 | doris.request.retries            | 3                 | 向Doris发送请求的重试次数       
                             |
 | doris.request.connect.timeout.ms | 30000             | 向Doris发送请求的连接超时时间     
                           |
 | doris.request.read.timeout.ms    | 30000             | 向Doris发送请求的读取超时时间     
                           |
+| doris.request.query.timeout.s    | 3600              | 
查询doris的超时时间,默认值为1小时,-1表示无超时限制             |
 | doris.request.tablet.size        | Integer.MAX_VALUE | 一个RDD 
Partition对应的Doris Tablet个数。<br />此数值设置越小,则会生成越多的Partition。<br 
/>从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 |
 | doris.batch.size                 | 1024              | 一次从BE读取数据的最大行数。<br 
/>增大此数值可减少Spark与Doris之间建立连接的次数。<br />从而减轻网络延迟所带来的的额外时间开销。 |
 
diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java 
b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 63dede1..d9b4231 100644
--- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -40,9 +40,11 @@ public interface ConfigurationOptions {
     String DORIS_REQUEST_RETRIES = "doris.request.retries";
     String DORIS_REQUEST_CONNECT_TIMEOUT_MS = 
"doris.request.connect.timeout.ms";
     String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms";
+    String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s";
     int DORIS_REQUEST_RETRIES_DEFAULT = 3;
     int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
     int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
 
     String DORIS_TABLET_SIZE = "doris.request.tablet.size";
     int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala 
b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index dabd826..16c5feb 100644
--- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -64,7 +64,15 @@ class ScalaValueReader(partition: PartitionDefinition, 
settings: Settings) {
         DORIS_BATCH_SIZE_DEFAULT
     }
 
+    val queryDorisTimeout = Try {
+      settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, 
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
+    } getOrElse {
+      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, 
DORIS_REQUEST_QUERY_TIMEOUT_S, 
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
+      DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
+    }
+
     params.setBatch_size(batchSize)
+    params.setQuery_timeout(queryDorisTimeout)
     params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
     params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))
 
@@ -74,6 +82,7 @@ class ScalaValueReader(partition: PartitionDefinition, 
settings: Settings) {
         s"table: ${params.getTable}, " +
         s"tabletId: ${params.getTablet_ids}, " +
         s"batch size: $batchSize, " +
+        s"query timeout: $queryDorisTimeout, " +
         s"user: ${params.getUser}, " +
         s"query plan: ${params.opaqued_query_plan}")
 
diff --git a/src/main/thrift/doris/DorisExternalService.thrift 
b/src/main/thrift/doris/DorisExternalService.thrift
index 9a0b9b5..d3f7e8e 100644
--- a/src/main/thrift/doris/DorisExternalService.thrift
+++ b/src/main/thrift/doris/DorisExternalService.thrift
@@ -54,6 +54,8 @@ struct TScanOpenParams {
   10: optional string passwd
     // max keep alive time min
   11: optional i16 keep_alive_min
+
+  12: optional i32 query_timeout
 }
 
 struct TScanColumnDesc {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to