This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
commit b53dc00c2c895b3879c9f2053243a3e92e6c9f29 Author: Youngwb <yangwenbo_mail...@163.com> AuthorDate: Wed Dec 18 13:08:21 2019 +0800 Fix bug when spark on doris run long time (#2485) --- README.md | 1 + .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java | 2 ++ src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 9 +++++++++ src/main/thrift/doris/DorisExternalService.thrift | 2 ++ 4 files changed, 14 insertions(+) diff --git a/README.md b/README.md index 76f9d95..2b0d2ef 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ dorisSparkRDD.collect() | doris.request.retries | 3 | 向Doris发送请求的重试次数 | | doris.request.connect.timeout.ms | 30000 | 向Doris发送请求的连接超时时间 | | doris.request.read.timeout.ms | 30000 | 向Doris发送请求的读取超时时间 | +| doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 | | doris.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。<br />此数值设置越小,则会生成越多的Partition。<br />从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 | | doris.batch.size | 1024 | 一次从BE读取数据的最大行数。<br />增大此数值可减少Spark与Doris之间建立连接的次数。<br />从而减轻网络延迟所带来的的额外时间开销。 | diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 63dede1..d9b4231 100644 --- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -40,9 +40,11 @@ public interface ConfigurationOptions { String DORIS_REQUEST_RETRIES = "doris.request.retries"; String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms"; String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms"; + String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s"; int DORIS_REQUEST_RETRIES_DEFAULT = 3; int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; + int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; String DORIS_TABLET_SIZE = "doris.request.tablet.size"; int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index dabd826..16c5feb 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -64,7 +64,15 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { DORIS_BATCH_SIZE_DEFAULT } + val queryDorisTimeout = Try { + settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt + } getOrElse { + logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)) + DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT + } + params.setBatch_size(batchSize) + params.setQuery_timeout(queryDorisTimeout) params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) @@ -74,6 +82,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { s"table: ${params.getTable}, " + s"tabletId: ${params.getTablet_ids}, " + s"batch size: $batchSize, " + + s"query timeout: $queryDorisTimeout, " + s"user: ${params.getUser}, " + s"query plan: ${params.opaqued_query_plan}") diff --git a/src/main/thrift/doris/DorisExternalService.thrift b/src/main/thrift/doris/DorisExternalService.thrift index 9a0b9b5..d3f7e8e 100644 --- a/src/main/thrift/doris/DorisExternalService.thrift +++ b/src/main/thrift/doris/DorisExternalService.thrift @@ -54,6 +54,8 @@ struct TScanOpenParams { 10: optional string passwd // max keep alive time min 11: optional i16 keep_alive_min + + 12: optional i32 query_timeout } struct TScanColumnDesc { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org