This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 1d43e0a KYLIN-4800 Add canary tool for sparder-context 1d43e0a is described below commit 1d43e0a4ac81648f89666365e06e2519ae53271b Author: yaqian.zhang <598593...@qq.com> AuthorDate: Wed Oct 28 09:09:02 2020 +0800 KYLIN-4800 Add canary tool for sparder-context --- .../org/apache/kylin/common/KylinConfigBase.java | 21 +++ kylin-spark-project/kylin-spark-query/pom.xml | 7 + .../kylin/query/monitor/SparderContextCanary.java | 145 +++++++++++++++++++++ .../org/apache/spark/sql/SparderContext.scala | 6 + .../query/monitor/SparderContextCanaryTest.java | 94 +++++++++++++ 5 files changed, 273 insertions(+) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 1ffdc16..45afaa2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2879,6 +2879,27 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(this.getOptional("kylin.query.auto-sparder-context", "false")); } + /** + * Sparder is considered unavailable when the check task is unresponsive for more than this time (in milliseconds) + */ + public int getSparderCanaryErrorResponseMs() { + return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-error-response-ms", "3000")); + } + + /** + * Sparder is restarted once the accumulated error count of the health check reaches this threshold + */ + public int getThresholdToRestartSparder() { + return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-threshold-to-restart-spark", "3")); + } + + /** + * Time period in minutes between two sparder health checks + */ + public int getSparderCanaryPeriodMinutes() { + return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3")); + } + // 
============================================================================ // Spark with Kerberos // ============================================================================ diff --git a/kylin-spark-project/kylin-spark-query/pom.xml b/kylin-spark-project/kylin-spark-query/pom.xml index ee8ce74..59493eb 100644 --- a/kylin-spark-project/kylin-spark-query/pom.xml +++ b/kylin-spark-project/kylin-spark-query/pom.xml @@ -56,6 +56,13 @@ <type>test-jar</type> </dependency> <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-spark-engine</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> <version>${awaitility.version}</version> diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java new file mode 100644 index 0000000..d0950c1 --- /dev/null +++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/monitor/SparderContextCanary.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.monitor; + +import org.apache.kylin.common.KylinConfig; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.KylinSparkEnv; +import org.apache.spark.sql.SparderContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SparderContextCanary { + private static final Logger logger = LoggerFactory.getLogger(SparderContextCanary.class); + private static volatile boolean isStarted = false; + + private static final int THRESHOLD_TO_RESTART_SPARK = KylinConfig.getInstanceFromEnv().getThresholdToRestartSparder(); + private static final int PERIOD_MINUTES = KylinConfig.getInstanceFromEnv().getSparderCanaryPeriodMinutes(); + + private static volatile int errorAccumulated = 0; + private static volatile long lastResponseTime = -1; + private static volatile boolean sparderRestarting = false; + + private SparderContextCanary() { + } + + public static int getErrorAccumulated() { + return errorAccumulated; + } + + @SuppressWarnings("unused") + public long getLastResponseTime() { + return lastResponseTime; + } + + @SuppressWarnings("unused") + public boolean isSparderRestarting() { + return sparderRestarting; + } + + public static void init() { + if (!isStarted) { + synchronized (SparderContextCanary.class) { + if (!isStarted) { + isStarted = true; + logger.info("Start monitoring Sparder"); + Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(SparderContextCanary::monitor, + PERIOD_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES); + } + } + } + } + + public static boolean isError() { + return 
errorAccumulated >= THRESHOLD_TO_RESTART_SPARK; + } + + public static void monitor() { + try { + long startTime = System.currentTimeMillis(); + // check sparder context + if (!SparderContext.isSparkAvailable()) { + logger.info("Sparder is unavailable, need to restart immediately."); + errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK); + } else { + try { + JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparderContext.getSparkSession().sparkContext()); + jsc.setLocalProperty("spark.scheduler.pool", "vip_tasks"); + + long t = System.currentTimeMillis(); + long ret = numberCount(jsc).get(KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), + TimeUnit.MILLISECONDS); + logger.info("SparderContextCanary numberCount returned successfully with value {}, takes {} ms.", ret, + (System.currentTimeMillis() - t)); + // reset errorAccumulated once good context is confirmed + errorAccumulated = 0; + } catch (TimeoutException te) { + errorAccumulated++; + logger.error("SparderContextCanary numberCount timeout, didn't return in {} ms, error {} times.", + KylinConfig.getInstanceFromEnv().getSparderCanaryErrorResponseMs(), errorAccumulated); + } catch (ExecutionException ee) { + logger.error("SparderContextCanary numberCount occurs exception, need to restart immediately.", ee); + errorAccumulated = Math.max(errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK); + } catch (Exception e) { + errorAccumulated++; + logger.error("SparderContextCanary numberCount occurs exception.", e); + } + } + + lastResponseTime = System.currentTimeMillis() - startTime; + logger.debug("Sparder context errorAccumulated:{}", errorAccumulated); + + if (isError()) { + sparderRestarting = true; + try { + // Take repair action if error accumulated exceeds threshold + logger.warn("Repairing sparder context"); + if ("true".equals(System.getProperty("spark.local"))) { + SparderContext.setSparkSession(KylinSparkEnv.getSparkSession()); + } else { + 
SparderContext.restartSpark(); + } + } catch (Throwable th) { + logger.error("Restart sparder context failed.", th); + } + sparderRestarting = false; + } + } catch (Throwable th) { + logger.error("Error when monitoring Sparder.", th); + } + } + + // for canary + private static JavaFutureAction<Long> numberCount(JavaSparkContext jsc) { + List<Integer> list = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + list.add(i); + } + + return jsc.parallelize(list).countAsync(); + } +} diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala index bd994e2..638a9ac 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference import org.apache.commons.io.FileUtils import org.apache.kylin.common.KylinConfig +import org.apache.kylin.query.monitor.SparderContextCanary import org.apache.kylin.spark.classloader.ClassLoaderUtils import org.apache.spark.{SparkConf, SparkContext, SparkEnv} import org.apache.spark.sql.execution.datasource.KylinSourceStrategy @@ -194,6 +195,11 @@ object SparderContext extends Logging { logInfo("Initializing Spark, waiting for done.") initializingThread.join() } + + if (System.getProperty("spark.local") ne "true") { + //monitor sparder + SparderContextCanary.init() + } } } diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java new file mode 100644 index 0000000..7a22892 --- /dev/null +++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/monitor/SparderContextCanaryTest.java @@ -0,0 
+1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.monitor; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.TempMetadataBuilder; +import org.apache.kylin.engine.spark.LocalWithSparkSessionTest; +import org.apache.kylin.job.exception.SchedulerException; +import org.apache.spark.sql.KylinSparkEnv; +import org.apache.spark.sql.SparderContext; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SparderContextCanaryTest extends LocalWithSparkSessionTest { + @Override + @Before + public void setup() throws SchedulerException { + super.setup(); + SparderContext.setSparkSession(KylinSparkEnv.getSparkSession()); + } + + @After + public void after() { + super.after(); + } + + @Test + public void testSparderKilled() { + // first check should be good + Boolean ss = SparderContext.isSparkAvailable(); + Assert.assertTrue(SparderContext.isSparkAvailable()); + + // stop sparder and check again, the sparder context should auto-restart + SparderContext.getSparkSession().stop(); + Assert.assertFalse(SparderContext.isSparkAvailable()); + + SparderContextCanary.monitor(); + + 
Assert.assertTrue(SparderContext.isSparkAvailable()); + + SparderContextCanary.monitor(); + Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated()); + } + + @Test + public void testSparderTimeout() { + // first check should be GOOD + Assert.assertTrue(SparderContext.isSparkAvailable()); + + // set kylin.canary.sparder-context-error-response-ms to 1 + // so that SparderContextCanary numberCount will time out + Assert.assertEquals(0, SparderContextCanary.getErrorAccumulated()); + System.setProperty("kylin.canary.sparder-context-error-response-ms", "1"); + SparderContextCanary.monitor(); + + // errorAccumulated increases + Assert.assertEquals(1, SparderContextCanary.getErrorAccumulated()); + + // reach the threshold to restart spark; errorAccumulated is only reset by a later successful check. + SparderContextCanary.monitor(); + Assert.assertEquals(2, SparderContextCanary.getErrorAccumulated()); + SparderContextCanary.monitor(); + Assert.assertEquals(3, SparderContextCanary.getErrorAccumulated()); + + Assert.assertTrue(SparderContext.isSparkAvailable()); + + System.clearProperty("kylin.canary.sparder-context-error-response-ms"); + + } + + public void createTestMetadata() { + String tempMetadataDir = TempMetadataBuilder.prepareNLocalTempMetadata(); + KylinConfig.setKylinConfigForLocalTest(tempMetadataDir); + getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "false"); + } +}