This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 28f684b6f9ab8d392fac17675376ab0b5814cd28 Author: Zhixiong Chen <[email protected]> AuthorDate: Thu Oct 27 15:42:25 2022 +0800 add connect timeout and ext config from yml Co-authored-by: chenzhx <[email protected]> --- .../kap/newten/clickhouse/ClickHouseUtils.java | 4 +- .../kap/clickhouse/ClickHouseStorage.java | 7 +++ .../kyligence/kap/clickhouse/job/ClickHouse.java | 11 +++- .../management/ClickHouseConfigLoader.java | 2 + .../kap/clickhouse/MockSecondStorage.java | 2 + .../kap/secondstorage/config/ClusterInfo.java | 24 +++++++- .../engine/spark/NLocalWithSparkSessionTest.java | 68 ++++++++++++---------- .../utils/HiveTransactionTableHelperTest.java | 17 +++++- 8 files changed, 96 insertions(+), 39 deletions(-) diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java index 0ca4863f8c..c6431ad78c 100644 --- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java +++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/newten/clickhouse/ClickHouseUtils.java @@ -275,7 +275,7 @@ public class ClickHouseUtils { int pairNum = clickhouse.length / replica; IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>())); ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000") - .setCluster(clusterNode); + .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode); int i = 0; for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) { Node node = new Node(); @@ -302,7 +302,7 @@ public class ClickHouseUtils { int pairNum = clickhouse.length / replica; IntStream.range(0, pairNum).forEach(idx -> clusterNode.put("pair" + idx, new ArrayList<>())); ClusterInfo cluster = new ClusterInfo().setKeepAliveTimeout("600000").setSocketTimeout("600000") - .setCluster(clusterNode); + .setConnectTimeout("3000").setExtConfig("maxWait=10").setCluster(clusterNode); int i = 0; for (JdbcDatabaseContainer<?> jdbcDatabaseContainer : clickhouse) { Node node = new Node(); diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java index 0f4ebbfb83..9e5c578127 100644 --- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java +++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/ClickHouseStorage.java @@ -178,6 +178,13 @@ public class ClickHouseStorage implements SecondStoragePlugin { if (StringUtils.isNotEmpty(cluster.getSocketTimeout())) { param.put(ClickHouse.SOCKET_TIMEOUT, cluster.getSocketTimeout()); } + if (StringUtils.isNotEmpty(cluster.getConnectTimeout())) { + int timeout = Integer.parseInt(cluster.getConnectTimeout()) / 1000; + param.put(ClickHouse.CONNECT_TIMEOUT, Integer.toString(timeout)); + } + if (StringUtils.isNotEmpty(cluster.getExtConfig())) { + param.put(ClickHouse.EXT_CONFIG, cluster.getExtConfig()); + } if (StringUtils.isNotEmpty(node.getUser())) { param.put(ClickHouse.USER, node.getUser()); } diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java index 7a0100c17b..9664bde2e7 100644 --- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java +++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickHouse.java @@ -52,6 +52,8 @@ public class ClickHouse implements Closeable { public static final String USER = "user"; public static final String SOCKET_TIMEOUT = "socket_timeout"; public static final String KEEP_ALIVE_TIMEOUT = "keepAliveTimeout"; + public static final String CONNECT_TIMEOUT = "connect_timeout"; + public static final String EXT_CONFIG = "extConfig"; public static final String CLIENT_NAME = "client_name"; private final String shardName; @@ -110,8 +112,15 @@ public class ClickHouse implements Closeable { if (!param.isEmpty()) { base.append('?'); List<String> paramList = new ArrayList<>(); - param.forEach((name, value) -> paramList.add(name + "=" + value)); + param.forEach((name, value) -> { + if (!ClickHouse.EXT_CONFIG.equals(name)){ + paramList.add(name + "=" + value); + } + }); base.append(String.join("&", paramList)); + if(param.get(ClickHouse.EXT_CONFIG) != null) { + base.append("&").append(param.get(ClickHouse.EXT_CONFIG)); + } } return base.toString(); } diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java index a9f2794065..06c4143328 100644 --- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java +++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/management/ClickHouseConfigLoader.java @@ -66,10 +66,12 @@ public class ClickHouseConfigLoader implements SecondStorageConfigLoader { clusterDesc.addPropertyParameters("cluster", String.class, List.class); clusterDesc.addPropertyParameters("socketTimeout", String.class); clusterDesc.addPropertyParameters("keepAliveTimeout", String.class); + clusterDesc.addPropertyParameters("connectTimeout", String.class); clusterDesc.addPropertyParameters("installPath", String.class); clusterDesc.addPropertyParameters("logPath", String.class); clusterDesc.addPropertyParameters("userName", String.class); clusterDesc.addPropertyParameters("password", String.class); + clusterDesc.addPropertyParameters("extConfig", String.class); constructor.addTypeDescription(clusterDesc); val nodeDesc = new TypeDescription(Node.class); nodeDesc.addPropertyParameters("name", String.class); diff --git a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java index 60191478a6..50f58b933a 100644 --- a/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java +++ b/src/second-storage/clickhouse/src/test/java/io/kyligence/kap/clickhouse/MockSecondStorage.java @@ -49,6 +49,7 @@ public class MockSecondStorage { ClusterInfo cluster = new ClusterInfo(); cluster.setKeepAliveTimeout("600000"); cluster.setSocketTimeout("600000"); + cluster.setConnectTimeout("3000"); cluster.setCluster(Collections.emptyMap()); File file = File.createTempFile("clickhouse", ".yaml"); ClickHouseConfigLoader.getConfigYaml().dump(JsonUtil.readValue(JsonUtil.writeValueAsString(cluster), @@ -62,6 +63,7 @@ public class MockSecondStorage { ClusterInfo cluster = new ClusterInfo(); cluster.setKeepAliveTimeout("600000"); cluster.setSocketTimeout("600000"); + cluster.setConnectTimeout("3000"); Map<String, List<Node>> clusterNodes = new HashMap<>(); cluster.setCluster(clusterNodes); val it = nodes.listIterator(); diff --git a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java index e57010a1f9..ab29f99695 100644 --- a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java +++ b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/config/ClusterInfo.java @@ -36,12 +36,14 @@ public class ClusterInfo { private Map<String, List<Node>> cluster; private String socketTimeout; private String keepAliveTimeout; + private String connectTimeout; private String installPath; private String logPath; //username of machine private String userName; private String password; + private String extConfig; @JsonIgnore public List<Node> getNodes() { @@ -93,6 +95,24 @@ public class ClusterInfo { return keepAliveTimeout; } + public String getConnectTimeout() { + return connectTimeout; + } + + public ClusterInfo setConnectTimeout(String connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public String getExtConfig() { + return extConfig; + } + + public ClusterInfo setExtConfig(String extConfig) { + this.extConfig = extConfig; + return this; + } + public ClusterInfo setKeepAliveTimeout(String keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; return this; @@ -135,11 +155,13 @@ public class ClusterInfo { public ClusterInfo(ClusterInfo cluster) { this.cluster = Maps.newHashMap(cluster.getCluster()); this.keepAliveTimeout = cluster.getKeepAliveTimeout(); - this.socketTimeout = cluster.getKeepAliveTimeout(); + this.socketTimeout = cluster.getSocketTimeout(); + this.connectTimeout = cluster.getConnectTimeout(); this.logPath = cluster.getLogPath(); this.userName = cluster.getUserName(); this.password = cluster.getPassword(); this.installPath = cluster.getInstallPath(); + this.extConfig = cluster.getExtConfig(); } public boolean emptyCluster() { diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java index 326386b865..df13963a68 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/NLocalWithSparkSessionTest.java @@ -17,10 +17,15 @@ */ package org.apache.kylin.engine.spark; -import com.google.common.base.Preconditions; -import org.apache.kylin.engine.spark.job.NSparkMergingJob; -import lombok.extern.slf4j.Slf4j; -import lombok.val; +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Random; +import java.util.Set; + import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.hadoop.util.Shell; @@ -28,6 +33,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.common.util.TempMetadataBuilder; +import org.apache.kylin.engine.spark.job.NSparkMergingJob; import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.ExecutableState; @@ -60,14 +66,10 @@ import org.junit.Before; import org.junit.BeforeClass; import org.sparkproject.guava.collect.Sets; -import java.io.File; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Objects; -import java.util.Random; -import java.util.Set; +import com.google.common.base.Preconditions; + +import lombok.val; +import lombok.extern.slf4j.Slf4j; @Slf4j public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase implements Serializable { @@ -129,7 +131,9 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple @AfterClass public static void afterClass() { - ss.close(); + if (ss != null) { + ss.close(); + } FileUtils.deleteQuietly(new File("../kap-it/metastore_db")); } @@ -211,29 +215,29 @@ public class NLocalWithSparkSessionTest extends NLocalFileMetadataTestCase imple if (type.isIntegerFamily()) switch (type.getName()) { - case "tinyint": - return DataTypes.ByteType; - case "smallint": - return DataTypes.ShortType; - case "integer": - case "int4": - return DataTypes.IntegerType; - default: - return DataTypes.LongType; + case "tinyint": + return DataTypes.ByteType; + case "smallint": + return DataTypes.ShortType; + case "integer": + case "int4": + return DataTypes.IntegerType; + default: + return DataTypes.LongType; } if (type.isNumberFamily()) switch (type.getName()) { - case "float": - return DataTypes.FloatType; - case "double": - return DataTypes.DoubleType; - default: - if (type.getPrecision() == -1 || type.getScale() == -1) { - return DataTypes.createDecimalType(19, 4); - } else { - return DataTypes.createDecimalType(type.getPrecision(), type.getScale()); - } + case "float": + return DataTypes.FloatType; + case "double": + return DataTypes.DoubleType; + default: + if (type.getPrecision() == -1 || type.getScale() == -1) { + return DataTypes.createDecimalType(19, 4); + } else { + return DataTypes.createDecimalType(type.getPrecision(), type.getScale()); + } } if (type.isStringFamily()) diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java index 185c9fc0ff..dca40c9f68 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/utils/HiveTransactionTableHelperTest.java @@ -37,8 +37,11 @@ import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Maps; @@ -51,6 +54,15 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest { private final String STORAGE_DFS_DIR = "/test"; private final String FILED_DELIMITER = "|"; + @BeforeClass + public static void beforeClass() { + if (SparderEnv.isSparkAvailable()) { + SparderEnv.getSparkSession().close(); + } + SparkSession.clearActiveSession(); + SparkSession.clearDefaultSession(); + } + @Before public void setup() { { @@ -72,9 +84,8 @@ public class HiveTransactionTableHelperTest extends NLocalWithSparkSessionTest { SystemPropertiesCache.setProperty("kylin.source.provider.9", "io.kyligence.kap.engine.spark.source.NSparkDataSource"); SystemPropertiesCache.setProperty("kylin.build.resource.read-transactional-table-enabled", "true"); - KylinBuildEnv kylinBuildEnv = KylinBuildEnv.getOrCreate(getTestConfig()); - NTableMetadataManager tableMgr = NTableMetadataManager - .getInstance(getTestConfig(), "tdh"); + KylinBuildEnv kylinBuildEnv = new KylinBuildEnv(getTestConfig()); + NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "tdh"); TableDesc fact = tableMgr.getTableDesc("TDH_TEST.LINEORDER_PARTITION"); fact.setTransactional(true); String result = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(fact, Maps.newHashMap(), "LO_ORDERKEY", kylinBuildEnv);
