This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 0ef5cac4bc748f6f9c2550e0191bce44245fdc44
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Fri May 5 10:40:07 2023 +0800

    KYLIN-5687 Fix async query and resource detect step jobs incorrectly updating metadata
---
 .../kylin/query/engine/AsyncQueryApplication.java  | 20 ++++++++-
 .../engine/spark/application/SparkApplication.java | 36 +++++++++++++--
 .../kylin/engine/spark/job/RDSegmentBuildJob.java  |  5 +++
 .../spark/job/ResourceDetectBeforeMergingJob.java  |  5 +++
 .../spark/job/ResourceDetectBeforeSampling.java    |  5 +++
 .../apache/spark/application/JobWorkSpace.scala    | 47 +++-----------------
 .../spark/application/SparkApplicationTest.java    | 51 +++++++++++++++++++++-
 .../kylin/query/runtime/plan/ResultPlan.scala      |  2 +-
 8 files changed, 123 insertions(+), 48 deletions(-)

diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
index 24a977bee2..ab6d943ffc 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
@@ -32,14 +32,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.query.util.AsyncQueryUtil;
-import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.metadata.query.QueryHistorySql;
 import org.apache.kylin.metadata.query.QueryHistorySqlParam;
 import org.apache.kylin.metadata.query.QueryMetricsContext;
 import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
 import org.apache.kylin.metadata.query.util.QueryHistoryUtil;
+import org.apache.kylin.query.util.AsyncQueryUtil;
+import org.apache.kylin.query.util.QueryParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +92,21 @@ public class AsyncQueryApplication extends SparkApplication {
         return config.getAsyncQuerySparkConfigOverride();
     }
 
+    @Override
+    protected void waiteForResourceSuccess() {
+        // do nothing
+    }
+
+    @Override
+    public void logJobInfo() {
+        // do nothing
+    }
+
+    @Override
+    public void updateJobErrorInfo(JobFailed jobFailed) {
+        // do nothing
+    }
+
     private void saveQueryHistory(QueryContext queryContext, QueryParams queryParams) {
         if (StringUtils.isEmpty(queryContext.getMetrics().getCorrectedSql())) {
             queryContext.getMetrics().setCorrectedSql(queryContext.getUserSQL());
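The three no-op overrides above are the substance of the async-query fix: SparkApplication drives the job lifecycle and calls these hooks at fixed points, so a job type opts out of metadata writes simply by overriding them. A minimal sketch of the hook pattern, with illustrative names rather than the real Kylin classes:

    // Hypothetical classes; only the override pattern mirrors the commit.
    abstract class LifecycleJob {
        final void run() {
            doWork();
            logJobInfo(); // metadata side effect, overridable by subclasses
        }

        abstract void doWork();

        void logJobInfo() {
            System.out.println("writing job metadata");
        }
    }

    class AsyncQueryJob extends LifecycleJob {
        @Override
        void doWork() {
            System.out.println("executing async query");
        }

        @Override
        void logJobInfo() {
            // do nothing: an async query owns no job metadata
        }

        public static void main(String[] args) {
            new AsyncQueryJob().run(); // runs the query, writes no metadata
        }
    }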
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index b087d758df..9d9af5980f 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +73,7 @@ import org.apache.kylin.engine.spark.job.SegmentBuildJob;
 import org.apache.kylin.engine.spark.job.SparkJobConstants;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
+import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.engine.spark.utils.HDFSUtils;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
@@ -106,6 +108,8 @@ import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 import lombok.val;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
@@ -114,7 +118,8 @@ public abstract class SparkApplication implements Application {
     private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
     private Map<String, String> params = Maps.newHashMap();
     public static final String JOB_NAME_PREFIX = "job_step_";
-    private IJobProgressReport report;
+    private static final String JOB_ERROR_API = "/kylin/api/jobs/error";
+    protected IJobProgressReport report;
 
     protected volatile KylinConfig config;
     protected volatile String jobId;
@@ -298,7 +303,7 @@ public abstract class SparkApplication implements Application {
             TimeZoneUtils.setDefaultTimeZone(config);
 
             /// wait until resource is enough
-            waiteForResource(atomicSparkConf.get(), buildEnv);
+            waiteForResource();
 
             ///
             logger.info("Prepare job environment");
@@ -461,7 +466,7 @@ public abstract class SparkApplication implements Application {
         helper.applySparkConf(sparkConf);
     }
 
-    private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception {
+    protected void waiteForResource() {
         val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
         infos.recordStageId(waiteForResource.getId());
         waiteForResource.execute();
@@ -530,6 +535,31 @@
         }
     }
 
+    public void updateJobErrorInfo(JobFailed jobFailed) throws JsonProcessingException {
+        val stageId = infos.getStageId();
+        val jobStepId = StringUtils.replace(infos.getJobStepId(), SparkApplication.JOB_NAME_PREFIX, "");
+
+        val failedStepId = StringUtils.isBlank(stageId) ? jobStepId : stageId;
+        val failedSegmentId = infos.getSegmentId();
+        val failedStack = ExceptionUtils.getStackTrace(jobFailed.throwable());
+        val failedReason = this.getAtomicUnreachableSparkMaster().get()
"Unable connect spark master to reach timeout maximum time" + : jobFailed.reason(); + + val payload = new HashMap<String, Object>(5); + payload.put("project", project); + payload.put("job_id", jobId); + payload.put("failed_step_id", failedStepId); + payload.put("failed_segment_id", failedSegmentId); + payload.put("failed_stack", failedStack); + payload.put("failed_reason", failedReason); + val json = JsonUtil.writeValueAsString(payload); + val paramsMap = new HashMap<String, String>(); + paramsMap.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout())); + paramsMap.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true)); + this.getReport().updateSparkJobInfo(paramsMap, JOB_ERROR_API, json); + } + private Map<String, String> getReportParams() { val reportParams = new HashMap<String, String>(); reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout())); diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java index 0abb71ec21..4a1aa2fa8f 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java @@ -49,6 +49,11 @@ public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect { } } + @Override + protected void waiteForResourceSuccess() { + // do nothing + } + private void detectPartition() throws IOException { for (NDataSegment dataSegment : readOnlySegments) { val buildParam = new BuildParam(); diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java index 468eb50420..2f063cf6d1 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java @@ -79,6 +79,11 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication implements return LogJobInfoUtils.resourceDetectBeforeMergingJobInfo(); } + @Override + protected void waiteForResourceSuccess() { + // do nothing + } + public static void main(String[] args) { ResourceDetectBeforeMergingJob resourceDetectJob = new ResourceDetectBeforeMergingJob(); resourceDetectJob.execute(args); diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java index 3b74081556..2f77a528bf 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java @@ -74,4 +74,9 @@ public class ResourceDetectBeforeSampling extends SparkApplication implements Re ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId), tableName + "_" + ResourceDetectUtils.samplingDetectItemFileSuffix()), tableLeafTaskNums); } + + @Override + protected void waiteForResourceSuccess() { + // do nothing + } } diff --git 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index 3013cb5742..44c13b575c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -18,18 +18,16 @@
 
 package org.apache.spark.application
 
-import org.apache.commons.lang.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.kylin.common.util.{JsonUtil, Unsafe}
+import java.util
+import java.util.concurrent.CountDownLatch
+
+import org.apache.kylin.common.util.Unsafe
 import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.{KylinBuildEnv, ParamsConstants}
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
 import org.apache.kylin.engine.spark.scheduler._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.KylinJobEventLoop
 
-import java.util
-import java.util.concurrent.CountDownLatch
-
 /**
  * Spark driver part, construct the real spark job [SparkApplication]
  */
@@ -103,7 +101,7 @@ class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: Jo
     try {
       logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
       KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
-      updateJobErrorInfo(jf)
+      worker.getApplication.updateJobErrorInfo(jf)
       stop()
     } finally {
       statusCode = 1
@@ -116,37 +114,4 @@ class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: Jo
     worker.stop()
     eventLoop.stop()
   }
-
-  def updateJobErrorInfo(jf: JobFailed): Unit = {
-    val infos = KylinBuildEnv.get().buildJobInfos
-    val context = worker.getApplication
-
-    val project = context.getProject
-    val jobId = context.getJobId
-
-    val stageId = infos.getStageId
-    val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "")
-    val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId
-
-    val failedSegmentId = infos.getSegmentId
-    val failedStack = ExceptionUtils.getStackTrace(jf.throwable)
-    val failedReason =
-      if (context.getAtomicUnreachableSparkMaster.get()) "Unable connect spark master to reach timeout maximum time"
-      else jf.reason
-    val url = "/kylin/api/jobs/error"
-
-    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5)
-    payload.put("project", project)
-    payload.put("job_id", jobId)
-    payload.put("failed_step_id", failedStepId)
-    payload.put("failed_segment_id", failedSegmentId)
-    payload.put("failed_stack", failedStack)
-    payload.put("failed_reason", failedReason)
-    val json = JsonUtil.writeValueAsString(payload)
-    val params = new util.HashMap[String, String]()
-    val config = KylinBuildEnv.get().kylinConfig
-    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
-    params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
-    context.getReport.updateSparkJobInfo(params, url, json);
-  }
 }
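After this deletion the driver's failure path is a single virtual call, worker.getApplication.updateJobErrorInfo(jf): the payload assembly that JobWorkSpace used to duplicate now runs, or is skipped, according to the running application's override. Roughly, as a sketch with hypothetical names:

    // Hypothetical types; shows only the delegation the commit introduces.
    interface ErrorReporting {
        void updateJobErrorInfo(String reason);
    }

    class Workspace {
        private final ErrorReporting application;

        Workspace(ErrorReporting application) {
            this.application = application;
        }

        void onJobFailed(String reason) {
            // Before: the workspace built the JSON payload and called the REST
            // API itself, identically for every job type.
            // After: delegate, so a no-op override (async query) takes effect.
            application.updateJobErrorInfo(reason);
        }
    }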
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index 80d5f6fdd1..a0e5207da3 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -25,16 +25,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import lombok.val;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.job.BuildJobInfos;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.job.MockJobProgressReport;
 import org.apache.kylin.engine.spark.job.ParamsConstants;
 import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -51,9 +54,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import lombok.val;
+
 public class SparkApplicationTest extends NLocalWithSparkSessionTest {
 
     File tempDir = new File("./temp/");
@@ -241,4 +249,45 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTest {
         application.exchangeSparkConf(sparkConf);
     }
 
+    @Test
+    public void testUpdateJobErrorInfo() throws JsonProcessingException {
+        val config = getTestConfig();
+        val project = "test_project";
+        SparkApplication application = Mockito.spy(new SparkApplication() {
+            @Override
+            protected void doExecute() {
+            }
+        });
+
+        application.config = config;
+        application.jobId = "job_id";
+        application.project = project;
+
+        BuildJobInfos infos = new BuildJobInfos();
+        infos.recordStageId("stage_id");
+        infos.recordJobStepId("job_step_id");
+        infos.recordSegmentId("segment_id");
+
+        application.infos = infos;
+        MockJobProgressReport mockJobProgressReport = Mockito.spy(new MockJobProgressReport());
+        Mockito.when(application.getReport()).thenReturn(mockJobProgressReport);
+
+        JobFailed jobFailed = Mockito.mock(JobFailed.class);
+        Mockito.when(jobFailed.reason()).thenReturn("test job failed");
+        try (MockedStatic<ExceptionUtils> exceptionUtilsMockedStatic = Mockito.mockStatic(ExceptionUtils.class)) {
+            exceptionUtilsMockedStatic.when(() -> ExceptionUtils.getStackTrace(jobFailed.throwable()))
+                    .thenReturn("test stack trace");
+            application.updateJobErrorInfo(jobFailed);
+        }
+
+        val paramsMap = new HashMap<String, String>();
+        paramsMap.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
+        paramsMap.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
+
+        val json = "{\"project\":\"test_project\",\"failed_segment_id\":\"segment_id\",\"failed_stack\":\"test stack "
+                + "trace\",\"job_id\":\"job_id\",\"failed_reason\":\"test job failed\",\"failed_step_id\":\"stage_id\"}";
+
+        Mockito.verify(application.getReport(), Mockito.times(1)).updateSparkJobInfo(paramsMap, "/kylin/api/jobs/error",
+                json);
+    }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index a1ad366e6e..3f23ab26ac 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -423,7 +423,7 @@ object ResultPlan extends LogEx {
   }
 
   def uploadAsyncQueryResult(file: File, path: String, queryId: String, format: String): Unit = {
-    HadoopUtil.getWorkingFileSystem
+    AsyncQueryUtil.getFileSystem
       .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + "." + format))
     if (file.exists()) file.delete()
   }
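The last hunk reroutes async-query result uploads from the default working file system to the one returned by AsyncQueryUtil.getFileSystem, so results land on the storage that async-query readers actually scan. A minimal sketch under that assumption; the helper name is invented, and only the Hadoop FileSystem.copyFromLocalFile call mirrors the hunk:

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class AsyncResultUpload {
        static void upload(FileSystem asyncQueryFs, File local, String dir,
                String queryId, String format) throws IOException {
            Path dst = new Path(dir + "/" + queryId + "." + format);
            // delSrc = true, overwrite = true, matching the production call
            asyncQueryFs.copyFromLocalFile(true, true, new Path(local.getPath()), dst);
        }
    }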