This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 842a0601a643ac83d894ec8a248673c8fd13cfb5 Author: Yaguang Jia <jiayagu...@foxmail.com> AuthorDate: Thu Feb 23 18:00:57 2023 +0800 KYLIN-5528 support collect sparder gc * KYLIN-5528 support collect sparder gc * KYLIN-5528 add gc option to executor extraJavaOption * KYLIN-5528 change timeout to 30s --- .../src/main/resources/config/init.properties | 2 +- .../apache/kylin/rest/service/SystemService.java | 5 + .../org/apache/kylin/common/KylinConfigBase.java | 14 +++ .../src/main/resources/kylin-defaults0.properties | 2 +- .../src/main/resources/config/init.properties | 2 +- .../src/main/resources/config/init.properties | 2 +- .../kylin/rest/controller/NQueryController.java | 2 +- .../src/main/resources/config/init.properties | 2 +- .../engine/spark/job/NSparkExecutableTest.java | 15 ++- .../asyncprofiler/AsyncProfiling.scala | 2 +- .../QueryAsyncProfilerDriverPlugin.scala | 2 +- .../QueryAsyncProfilerSparkPlugin.scala | 2 +- .../diagnose/DiagnoseConstant.scala} | 21 +++- .../diagnose/DiagnoseDriverPlugin.scala} | 50 +++++---- .../plugin/diagnose/DiagnoseExecutorPlugin.scala | 124 +++++++++++++++++++++ .../query/plugin/diagnose/DiagnoseHelper.scala | 62 +++++++++++ .../diagnose/DiagnoseSparkPlugin.scala} | 9 +- .../scala/org/apache/spark/sql/KylinSession.scala | 56 ++++++---- .../scala/org/apache/spark/sql/SparderEnv.scala | 5 +- .../SparkPluginWithMeta.scala} | 4 +- .../asyncprofiler/AsyncProfilingTest.scala | 5 +- .../QueryAsyncProfilerSparkPluginTest.scala | 2 +- .../QueryProfilerDriverPluginTest.scala} | 5 +- .../diagnose/DiagnoseExecutorPluginTest.scala | 109 ++++++++++++++++++ .../query/plugin/diagnose/DiagnosePluginTest.scala | 82 ++++++++++++++ 25 files changed, 506 insertions(+), 80 deletions(-) diff --git a/src/common-booter/src/main/resources/config/init.properties b/src/common-booter/src/main/resources/config/init.properties index 5dbf475978..3ff1076256 100644 --- a/src/common-booter/src/main/resources/config/init.properties +++ b/src/common-booter/src/main/resources/config/init.properties @@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir} # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here -kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M +kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...] 
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java index fa467e414d..25c961be8c 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java @@ -60,6 +60,7 @@ import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.query.plugin.diagnose.DiagnoseHelper; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.BackupRequest; import org.apache.kylin.rest.request.DiagProgressRequest; @@ -138,6 +139,8 @@ public class SystemService extends BasicService { String[] arguments; // full if (StringUtils.isEmpty(jobId) && StringUtils.isEmpty(queryId)) { + // Sparder executor gc log should be collected before FULL and QUERY diag package + DiagnoseHelper.collectSparderExecutorGc(); if (startTime == null && endTime == null) { startTime = Long.toString(System.currentTimeMillis() - 259200000L); endTime = Long.toString(System.currentTimeMillis()); @@ -153,6 +156,8 @@ public class SystemService extends BasicService { arguments = new String[] { jobOpt, jobId, "-destDir", exportFile.getAbsolutePath(), "-diagId", uuid }; diagPackageType = JOB; } else { //query + // Sparder executor gc log should be collected before FULL and QUERY diag package + DiagnoseHelper.collectSparderExecutorGc(); arguments = new String[] { "-project", project, "-query", queryId, "-destDir", exportFile.getAbsolutePath(), "-diagId", uuid }; diagPackageType = QUERY; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 32af04760a..0490ce7b84 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2565,6 +2565,10 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.job.notification-on-empty-data-load", FALSE)); } + public boolean getJobErrorNotificationEnabled() { + return Boolean.parseBoolean(getOptional("kylin.job.notification-on-job-error", FALSE)); + } + public Long getStorageResourceSurvivalTimeThreshold() { return TimeUtil.timeStringAs(this.getOptional("kylin.storage.resource-survival-time-threshold", "7d"), TimeUnit.MILLISECONDS); @@ -3812,6 +3816,16 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.build.allow-non-strict-count-check", FALSE)); } + public long queryDiagnoseCollectionTimeout() { + return TimeUtil.timeStringAs(getOptional("kylin.query.diagnose-collection-timeout", "30s"), + TimeUnit.MILLISECONDS); + } + + public boolean queryDiagnoseEnable() { + return !Boolean.parseBoolean(System.getProperty("spark.local", FALSE)) + && Boolean.parseBoolean(getOptional("kylin.query.diagnose-enabled", TRUE)); 
+ } + // ============================================================================ // Cost based index Planner // ============================================================================ diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties index cbacd5d78a..9aee2acdda 100644 --- a/src/core-common/src/main/resources/kylin-defaults0.properties +++ b/src/core-common/src/main/resources/kylin-defaults0.properties @@ -115,7 +115,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir} # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here -kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M +kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...] kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer diff --git a/src/data-loading-booter/src/main/resources/config/init.properties b/src/data-loading-booter/src/main/resources/config/init.properties index 5dbf475978..3ff1076256 100644 --- a/src/data-loading-booter/src/main/resources/config/init.properties +++ b/src/data-loading-booter/src/main/resources/config/init.properties @@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir} # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here -kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M +kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...] 
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer diff --git a/src/query-booter/src/main/resources/config/init.properties b/src/query-booter/src/main/resources/config/init.properties index 5dbf475978..3ff1076256 100644 --- a/src/query-booter/src/main/resources/config/init.properties +++ b/src/query-booter/src/main/resources/config/init.properties @@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir} # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here -kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M +kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...] 
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java index ea088d38a8..21936b8faf 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java @@ -71,7 +71,7 @@ import org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNo import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.metadata.query.QueryHistoryRequest; import org.apache.kylin.metadata.query.util.QueryHisTransformStandardUtil; -import org.apache.kylin.query.asyncprofiler.AsyncProfiling; +import org.apache.kylin.query.plugin.asyncprofiler.AsyncProfiling; import org.apache.kylin.rest.cluster.ClusterManager; import org.apache.kylin.rest.request.SQLFormatRequest; import org.apache.kylin.rest.response.QueryStatisticsResponse; diff --git a/src/server/src/main/resources/config/init.properties b/src/server/src/main/resources/config/init.properties index 5dbf475978..3ff1076256 100644 --- a/src/server/src/main/resources/config/init.properties +++ b/src/server/src/main/resources/config/init.properties @@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir} # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here -kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M +kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...] 
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java index ce5a26c72d..2fcf39b87d 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java @@ -29,6 +29,8 @@ import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.metadata.cube.model.NBatchConstants; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin; +import org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -152,12 +154,11 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase { String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc); Assert.assertNotNull(cmd); - Assert.assertTrue( - cmd.contains("spark.plugins=org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin")); + Assert.assertTrue(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName())); } overwriteSystemProp("kylin.engine.spark-conf.spark.plugins", - "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin"); + QueryAsyncProfilerSparkPlugin.class.getCanonicalName()); { val desc = sparkExecutable.getSparkAppDesc(); desc.setHadoopConfDir(hadoopConf); @@ -166,9 +167,8 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase { String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc); Assert.assertNotNull(cmd); - Assert.assertTrue( - cmd.contains("spark.plugins=org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin," - + "org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin")); + Assert.assertTrue(cmd.contains("spark.plugins=" + QueryAsyncProfilerSparkPlugin.class.getCanonicalName() + + "," + BuildAsyncProfilerSparkPlugin.class.getCanonicalName())); } overwriteSystemProp("kylin.engine.async-profiler-enabled", "false"); @@ -180,8 +180,7 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase { String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc); Assert.assertNotNull(cmd); - Assert.assertFalse( - cmd.contains("spark.plugins=org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin")); + Assert.assertFalse(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName())); } overwriteSystemProp("kylin.engine.spark-conf.spark.driver.extraJavaOptions", diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala similarity index 98% rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala rename to 
src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala index d27cfc76f5..8147ba94ec 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.asyncprofiler import org.apache.kylin.common.KylinConfig import org.apache.kylin.common.asyncprofiler.Message._ diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala similarity index 97% copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala index 8af9631561..f8fe0e38a6 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.asyncprofiler import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool import org.apache.spark.SparkContext diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala similarity index 95% copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala index fd4306d3af..a2e4af8554 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.asyncprofiler import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala similarity index 66% copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala index fd4306d3af..98c3cbf585 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala @@ -16,14 +16,23 @@ * limitations under the License. */ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.diagnose -import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin -import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin} +object DiagnoseConstant { + // state + val STATE_WAIT = "STATE_WAIT" + val STATE_COLLECT = "STATE_COLLECT" -class QueryAsyncProfilerSparkPlugin extends SparkPlugin { + // executor message + val NEXTCMD = "NEXTCMD" + val SENDRESULT = "SENDRESULT" + val HDFSDIR = "HDFSDIR" - override def driverPlugin(): DriverPlugin = new QueryAsyncProfilerDriverPlugin + // driver message + val NOP = "NOP" + val COLLECT = "COLLECT" + + // empty + val EMPTY = "" - override def executorPlugin(): ExecutorPlugin = new AsyncProfilerExecutorPlugin } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala similarity index 51% rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala index 8af9631561..fed11cd45e 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala @@ -16,34 +16,36 @@ * limitations under the License. 
*/ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.diagnose -import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool -import org.apache.spark.SparkContext -import org.apache.spark.api.plugin.{DriverPlugin, PluginContext} +import org.apache.kylin.common.KylinConfig +import org.apache.spark.api.plugin.DriverPlugin import org.apache.spark.internal.Logging -import java.util - -class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging { - - override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { - // Sparder Driver and KE are always in one JVM, in client mode - AsyncProfilerTool.loadAsyncProfilerLib(true) - super.init(sc, pluginContext) - } - +class DiagnoseDriverPlugin extends DriverPlugin with Logging { override def receive(message: Any): AnyRef = { - import org.apache.kylin.common.asyncprofiler.Message._ + message match { + case DiagnoseConstant.NEXTCMD => + getNextCommand() + case DiagnoseConstant.HDFSDIR => + KylinConfig.getInstanceFromEnv.getHdfsWorkingDirectory + case DiagnoseConstant.SENDRESULT => + countDownGcResult() + case _ => DiagnoseConstant.EMPTY + } + } - val (command, executorId, param) = processMessage(message.toString) - command match { - case NEXT_COMMAND => - AsyncProfiling.nextCommand() - case RESULT => - AsyncProfiling.cacheExecutorResult(param, executorId) - "" - case _ => "" + def getNextCommand(): AnyRef = { + if (DiagnoseConstant.STATE_COLLECT.equals(DiagnoseHelper.state)) { + DiagnoseConstant.COLLECT + } else { + DiagnoseConstant.NOP } } -} \ No newline at end of file + + def countDownGcResult(): AnyRef = { + DiagnoseHelper.countDownGcResult() + DiagnoseConstant.EMPTY + } + +} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala new file mode 100644 index 0000000000..c6c811b7c3 --- /dev/null +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.query.plugin.diagnose + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.kylin.common.util.ExecutorServiceUtil +import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext} +import org.apache.spark.internal.Logging +import org.joda.time.DateTime + +import java.io.File +import java.util +import java.util.concurrent.{Executors, TimeUnit} + +class DiagnoseExecutorPlugin extends ExecutorPlugin with Logging { + + private val SPARDER_LOG: String = "_sparder_logs" + private val LOCAL_GC_FILE_PREFIX: String = "gc" + private val DATE_PATTERN = "yyyy-MM-dd" + private val checkingInterval: Long = 10000L + private val configuration: Configuration = new Configuration() + private val fileSystem: FileSystem = FileSystem.get(configuration) + + private val scheduledExecutorService = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Diagnose-%d").build()) + + private var state = DiagnoseConstant.STATE_WAIT + private var curContainerDir = new File(".") + private var sparderLogDir: String = "" + private var ctx: PluginContext = _ + + override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = { + ctx = _ctx + val diagnose = new Runnable { + override def run(): Unit = checkAndDiagnose() + } + logInfo("Diagnose executor plugin is initializing ...") + scheduledExecutorService.scheduleWithFixedDelay( + diagnose, 0, checkingInterval, TimeUnit.MILLISECONDS) + } + + def checkAndDiagnose(): Unit = { + try { + val replay: AnyRef = ctx.ask(DiagnoseConstant.NEXTCMD) + logDebug(s"Executor ${ctx.executorID()} get replay $replay from driver ...") + replay match { + case DiagnoseConstant.COLLECT => + if (DiagnoseConstant.STATE_WAIT.equals(state)) { + state = DiagnoseConstant.STATE_COLLECT + logDebug(s"Set executor state to $state") + collectGcLog() + ctx.send(DiagnoseConstant.SENDRESULT) + } + case DiagnoseConstant.NOP => + if (!DiagnoseConstant.STATE_WAIT.equals(state)) { + state = DiagnoseConstant.STATE_WAIT + logDebug(s"Set executor state to $state") + } + case _ => "" + } + } catch { + case e: Exception => + logInfo("Error while communication/Diagnose", e) + } + } + + def collectGcLog(): Unit = { + logDebug(s"Collectting sparder gc log file ...") + if (sparderLogDir.isEmpty) { + val reply = ctx.ask(DiagnoseConstant.HDFSDIR).toString + if (reply.isEmpty) { + logWarning(s"Can not get kylin working dir, will not collect sparder executor gc log.") + return + } else { + sparderLogDir = reply + SPARDER_LOG + logInfo(s"HDFS sparder log dir is setting to ${sparderLogDir}") + } + } + val filePath = sparderLogDir + File.separator + new DateTime().toString(DATE_PATTERN) + + File.separator + ctx.conf().getAppId + File.separator + val fileNamePrefix = "executor-%s-".format(ctx.executorID()) + + curContainerDir.listFiles().filter(file => file.getName.startsWith(LOCAL_GC_FILE_PREFIX)) + .map(file => copyLocalFileToHdfs(new Path(file.getAbsolutePath), new Path(filePath, fileNamePrefix + file.getName))) + } + + def copyLocalFileToHdfs(local: Path, hdfs: Path): Unit = { + logInfo(s"Local gc file path is: ${local}, target hdfs file is: ${hdfs}") + fileSystem.copyFromLocalFile(local, hdfs) + } + + override def shutdown(): Unit = { + ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3) + super.shutdown() + } + + // for test only + def setCtx(_ctx: PluginContext): Unit = { + ctx = _ctx + } + + // 
for test only + def setContainerDir(_curContainerDir: File): Unit = { + curContainerDir = _curContainerDir + } +} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala new file mode 100644 index 0000000000..9e0c3d2133 --- /dev/null +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.plugin.diagnose + +import org.apache.kylin.common.KylinConfig +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparderEnv + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +object DiagnoseHelper extends Logging { + var state: String = DiagnoseConstant.STATE_WAIT + var resultCollectionTimeout: Long = KylinConfig.getInstanceFromEnv.queryDiagnoseCollectionTimeout + var gcResult: CountDownLatch = _ + + // activeExecutorCount will be set for UT + var activeExecutorCount: Int = 0 + + def collectSparderExecutorGc(): Unit = { + if (!KylinConfig.getInstanceFromEnv.isUTEnv) { + activeExecutorCount = SparderEnv.getActiveExecutorIds.size + } + initGcCount(activeExecutorCount) + + setState(DiagnoseConstant.STATE_COLLECT) + + if (gcResult.await(resultCollectionTimeout, TimeUnit.MILLISECONDS)) { + logInfo("All executor gc logs have been uploaded to hdfs") + } else { + logWarning("Timeout while waiting for gc log result") + } + setState(DiagnoseConstant.STATE_WAIT) + } + + def initGcCount(count: Int): Unit = { + gcResult = new CountDownLatch(count) + } + + def setState(_state: String): Unit = { + DiagnoseHelper.state = _state + } + + def countDownGcResult(): Unit = { + gcResult.countDown() + } +} diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala similarity index 72% rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala index fd4306d3af..52a90e93de 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala @@ -16,14 +16,13 @@ * limitations under the License. 
*/ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.diagnose -import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin} -class QueryAsyncProfilerSparkPlugin extends SparkPlugin { +class DiagnoseSparkPlugin extends SparkPlugin { - override def driverPlugin(): DriverPlugin = new QueryAsyncProfilerDriverPlugin + override def driverPlugin(): DriverPlugin = new DiagnoseDriverPlugin - override def executorPlugin(): ExecutorPlugin = new AsyncProfilerExecutorPlugin + override def executorPlugin(): ExecutorPlugin = new DiagnoseExecutorPlugin } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala index 1f6ad0ebba..2ebf1d4826 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala @@ -18,28 +18,28 @@ package org.apache.spark.sql -import java.io._ -import java.net.URI -import java.nio.file.Paths - -import scala.collection.JavaConverters._ - import org.apache.hadoop.fs.Path import org.apache.hadoop.security.UserGroupInformation -import org.apache.kylin.common.{KapConfig, KylinConfig} import org.apache.kylin.common.util.{HadoopUtil, Unsafe} +import org.apache.kylin.common.{KapConfig, KylinConfig} import org.apache.kylin.metadata.query.BigQueryThresholdUpdater +import org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin +import org.apache.kylin.query.plugin.diagnose.DiagnoseSparkPlugin import org.apache.kylin.query.util.ExtractFactory -import org.springframework.expression.common.TemplateParserContext -import org.springframework.expression.spel.standard.SpelExpressionParser - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder -import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.{SQLConf, SessionState, SharedState, StaticSQLConf} import org.apache.spark.sql.udf.UdfManager import org.apache.spark.util.{KylinReflectUtils, Utils} +import org.apache.spark.{SparkConf, SparkContext} +import org.springframework.expression.common.TemplateParserContext +import org.springframework.expression.spel.standard.SpelExpressionParser + +import java.io._ +import java.net.URI +import java.nio.file.Paths +import scala.collection.JavaConverters._ class KylinSession( @transient val sc: SparkContext, @@ -112,6 +112,7 @@ class KylinSession( object KylinSession extends Logging { val NORMAL_FAIR_SCHEDULER_FILE_NAME: String = "/fairscheduler.xml" val QUERY_LIMIT_FAIR_SCHEDULER_FILE_NAME: String = "/query-limit-fair-scheduler.xml" + val SPARK_PLUGINS_KEY = "spark.plugins" implicit class KylinBuilder(builder: Builder) { var queryCluster: Boolean = true @@ -328,16 +329,30 @@ object KylinSession extends Logging { } } + checkAndSetSparkPlugins(sparkConf) + + sparkConf + } + + def checkAndSetSparkPlugins(sparkConf: SparkConf): Unit = { + // Sparder diagnose plugin + if (kapConfig.getKylinConfig.queryDiagnoseEnable()) { + addSparkPlugin(sparkConf, classOf[DiagnoseSparkPlugin].getCanonicalName) + } + + // Query profile plugin if (kapConfig.getKylinConfig.asyncProfilingEnabled()) { - val plugins = 
sparkConf.get("spark.plugins", "") - if (plugins.isEmpty) { - sparkConf.set("spark.plugins", "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin") - } else { - sparkConf.set("spark.plugins", "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin," + plugins) - } + addSparkPlugin(sparkConf, classOf[QueryAsyncProfilerSparkPlugin].getCanonicalName) } + } - sparkConf + def addSparkPlugin(sparkConf: SparkConf, pluginName: String): Unit = { + val plugins = sparkConf.get(SPARK_PLUGINS_KEY, "") + if (plugins.isEmpty) { + sparkConf.set(SPARK_PLUGINS_KEY, pluginName) + } else { + sparkConf.set(SPARK_PLUGINS_KEY, pluginName + "," + plugins) + } } def buildCluster(): KylinBuilder = { @@ -411,7 +426,8 @@ object KylinSession extends Logging { val parser = new SpelExpressionParser() val parserCtx = new TemplateParserContext() while ( { - templateLine = fileReader.readLine(); templateLine != null + templateLine = fileReader.readLine(); + templateLine != null }) { processedLine = parser.parseExpression(templateLine, parserCtx).getValue(params, classOf[String]) + "\r\n" fileWriter.write(processedLine) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala index 59ce599fce..cfb469d547 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala @@ -346,5 +346,8 @@ object SparderEnv extends Logging { configuration } - + // Return the list of currently active executors + def getActiveExecutorIds(): Seq[String] = { + getSparkSession.sparkContext.getExecutorIds() + } } diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala similarity index 94% rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala index 9db5d72000..064fb4516f 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin import org.apache.kylin.common.util.NLocalFileMetadataTestCase import org.apache.spark.{SparkContext, SparkFunSuite} @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll import java.io.File -trait AsyncPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll { +trait SparkPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll { @transient var sc: SparkContext = _ protected val ut_meta = "../examples/test_case_data/localmeta" diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala similarity index 94% rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala index 33f497ee33..81188e5aef 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.asyncprofiler import org.apache.kylin.common.KylinConfig +import org.apache.kylin.query.plugin.SparkPluginWithMeta import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin import org.apache.spark.launcher.SparkLauncher import org.apache.spark.{SparkConf, SparkContext} @@ -26,7 +27,7 @@ import org.mockito.Mockito.mock import java.io.{File, OutputStream} -class AsyncProfilingTest extends AsyncPluginWithMeta { +class AsyncProfilingTest extends SparkPluginWithMeta { val sparkPluginName: String = classOf[BuildAsyncProfilerSparkPlugin].getName val flagFileDir: String = System.getProperty("java.io.tmpdir") + "default/jobStepId/" diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala similarity index 96% rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala index 1ecbbe6a23..d0e21562cf 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.asyncprofiler import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin import org.junit.Assert diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala similarity index 92% rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala index 44c564b3ba..74db3cc3a1 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala @@ -16,13 +16,14 @@ * limitations under the License. */ -package org.apache.kylin.query.asyncprofiler +package org.apache.kylin.query.plugin.asyncprofiler +import org.apache.kylin.query.plugin.SparkPluginWithMeta import org.apache.spark.launcher.SparkLauncher import org.apache.spark.{SparkConf, SparkContext} import org.junit.Assert -class QueryAsyncProfilerDriverPluginTest extends AsyncPluginWithMeta { +class QueryAsyncProfilerDriverPluginTest extends SparkPluginWithMeta { val sparkPluginName: String = classOf[QueryAsyncProfilerSparkPlugin].getName diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala new file mode 100644 index 0000000000..c725c3f83b --- /dev/null +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.query.plugin.diagnose + +import com.codahale.metrics.MetricRegistry +import org.apache.kylin.query.plugin.SparkPluginWithMeta +import org.apache.spark.api.plugin.PluginContext +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.resource.ResourceInformation +import org.apache.spark.{SparkConf, SparkContext} +import org.junit.Assert + +import java.nio.file.Files +import java.util + +class DiagnoseExecutorPluginTest extends SparkPluginWithMeta { + + val sparkPluginName: String = classOf[DiagnoseSparkPlugin].getName + val executorPluginTest = new DiagnoseExecutorPlugin() + val mockPluginCtx = new MockPluginCtx + val tempContainerDir = Files.createTempDirectory("PluginContainerTest") + + override def beforeAll(): Unit = { + super.beforeAll() + val conf = new SparkConf() + .setAppName(getClass.getName) + .set(SparkLauncher.SPARK_MASTER, "local[1]") + .set("spark.plugins", sparkPluginName) + sc = new SparkContext(conf) + + mockPluginCtx.sparkConf = sc.getConf + mockPluginCtx.hdfsDir = tempContainerDir.toString + mockPluginCtx.mockId = "mockId" + + executorPluginTest.setCtx(mockPluginCtx) + executorPluginTest.setContainerDir(tempContainerDir.toFile) + } + + override def afterAll(): Unit = { + super.afterAll() + mockPluginCtx.clear() + } + + + test("Test executor plugin") { + val filePath = Files.createTempFile(tempContainerDir, "gc", "log") + Assert.assertTrue(filePath.toFile.exists()) + + mockPluginCtx.message = DiagnoseConstant.COLLECT + executorPluginTest.checkAndDiagnose() + + mockPluginCtx.message = DiagnoseConstant.NOP + executorPluginTest.checkAndDiagnose() + + mockPluginCtx.message = DiagnoseConstant.EMPTY + executorPluginTest.checkAndDiagnose() + } +} + +class MockPluginCtx() extends PluginContext { + var message: String = _ + var mockId: String = _ + var hdfsDir: String = _ + var sparkConf: SparkConf = _ + + override def ask(input: Any): String = { + if (DiagnoseConstant.HDFSDIR.equals(input)) { + hdfsDir + } else { + message + } + } + + override def executorID(): String = mockId + + override def conf(): SparkConf = sparkConf + + override def metricRegistry(): MetricRegistry = null + + override def hostname(): String = "MockHostname" + + override def resources(): util.Map[String, ResourceInformation] = null + + override def send(message: Any): Unit = {} + + def clear(): Unit = { + message = null + mockId = null + hdfsDir = null + sparkConf = null + } + +} diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala new file mode 100644 index 0000000000..75df31685e --- /dev/null +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.plugin.diagnose + +import org.apache.kylin.common.KylinConfig +import org.apache.kylin.query.plugin.SparkPluginWithMeta +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.{SparkConf, SparkContext} +import org.junit.Assert + +class DiagnosePluginTest extends SparkPluginWithMeta { + + val sparkPluginName: String = classOf[DiagnoseSparkPlugin].getName + val diagPlugin = new DiagnoseSparkPlugin + val driverPluginTest = new DiagnoseDriverPlugin() + val executorPluginTest = new DiagnoseExecutorPlugin() + + override def beforeAll(): Unit = { + super.beforeAll() + val conf = new SparkConf() + .setAppName(getClass.getName) + .set(SparkLauncher.SPARK_MASTER, "local[1]") + .set("spark.plugins", sparkPluginName) + sc = new SparkContext(conf) + } + + + test("Test trigger gc collection") { + // get gc success + DiagnoseHelper.collectSparderExecutorGc() + Assert.assertEquals(DiagnoseConstant.STATE_WAIT, DiagnoseHelper.state) + + // get gc failed + DiagnoseHelper.resultCollectionTimeout = 100L + DiagnoseHelper.activeExecutorCount = 1 + DiagnoseHelper.collectSparderExecutorGc() + Assert.assertEquals(DiagnoseConstant.STATE_WAIT, DiagnoseHelper.state) + } + + + test("Test driver plugin") { + // NEXTCMD + DiagnoseHelper.setState(DiagnoseConstant.STATE_WAIT) + var reply = driverPluginTest.receive(DiagnoseConstant.NEXTCMD) + Assert.assertEquals(DiagnoseConstant.NOP, reply) + + DiagnoseHelper.setState(DiagnoseConstant.STATE_COLLECT) + reply = driverPluginTest.receive(DiagnoseConstant.NEXTCMD) + Assert.assertEquals(DiagnoseConstant.COLLECT, reply) + + // SENDRESULT + DiagnoseHelper.initGcCount(2) + reply = driverPluginTest.receive(DiagnoseConstant.SENDRESULT) + Assert.assertEquals(DiagnoseConstant.EMPTY, reply) + Assert.assertEquals(1, DiagnoseHelper.gcResult.getCount) + + // HDFSDIR + reply = driverPluginTest.receive(DiagnoseConstant.HDFSDIR) + Assert.assertEquals(KylinConfig.getInstanceFromEnv.getHdfsWorkingDirectory, reply) + + // Other + reply = driverPluginTest.receive("Other") + Assert.assertEquals(DiagnoseConstant.EMPTY, reply) + } + +}
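
For readers following the patch, the core mechanism is a simple poll/acknowledge protocol between the Sparder executor plugin and the driver plugin: each executor periodically asks the driver for the next command (NEXTCMD), the driver answers COLLECT while DiagnoseHelper is in STATE_COLLECT and NOP otherwise, and after uploading its local gc* files the executor reports SENDRESULT, which counts down a latch the driver side waits on. The sketch below is a minimal, self-contained Scala model of that flow; the names Driver, Executor, Msg and uploadGcLogs are illustrative stand-ins and not part of the commit, whose real classes are DiagnoseDriverPlugin, DiagnoseExecutorPlugin and DiagnoseHelper shown in the diff above.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Message constants mirroring DiagnoseConstant in the patch.
object Msg {
  val NEXTCMD = "NEXTCMD"
  val COLLECT = "COLLECT"
  val NOP = "NOP"
  val SENDRESULT = "SENDRESULT"
}

// Driver side: answers executor polls based on the current state and
// counts down a latch when an executor reports its gc logs are uploaded.
class Driver(executorCount: Int) {
  @volatile var collecting = false            // corresponds to STATE_COLLECT / STATE_WAIT
  val done = new CountDownLatch(executorCount)

  def receive(message: String): String = message match {
    case Msg.NEXTCMD    => if (collecting) Msg.COLLECT else Msg.NOP
    case Msg.SENDRESULT => done.countDown(); ""
    case _              => ""
  }
}

// Executor side: polls the driver and, on COLLECT, uploads its local gc* files
// (simulated here by a callback) before acknowledging with SENDRESULT.
class Executor(id: String, driver: Driver, uploadGcLogs: () => Unit) {
  def poll(): Unit = driver.receive(Msg.NEXTCMD) match {
    case Msg.COLLECT =>
      uploadGcLogs()
      driver.receive(Msg.SENDRESULT)
    case _ => // NOP: nothing to do this round
  }
}

object DiagnoseProtocolSketch extends App {
  val driver = new Driver(executorCount = 2)
  val executors = Seq("0", "1").map(id =>
    new Executor(id, driver, () => println(s"executor $id uploaded gc logs")))

  driver.collecting = true                        // diag package build triggers collection
  executors.foreach(_.poll())                     // in the real plugin this runs on a scheduled thread
  val ok = driver.done.await(30, TimeUnit.SECONDS) // default kylin.query.diagnose-collection-timeout is 30s
  driver.collecting = false                        // back to STATE_WAIT
  println(s"all gc logs collected: $ok")
}
```

In the actual patch the feature is toggled by kylin.query.diagnose-enabled (default true, skipped in spark.local mode) and the wait above is bounded by kylin.query.diagnose-collection-timeout (default 30s), both added to KylinConfigBase in this commit.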