This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 0ef5cac4bc748f6f9c2550e0191bce44245fdc44
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Fri May 5 10:40:07 2023 +0800

    KYLIN-5687 Fix async query and resource detect step jobs incorrectly updating metadata
---
 .../kylin/query/engine/AsyncQueryApplication.java  | 20 ++++++++-
 .../engine/spark/application/SparkApplication.java | 36 +++++++++++++--
 .../kylin/engine/spark/job/RDSegmentBuildJob.java  |  5 +++
 .../spark/job/ResourceDetectBeforeMergingJob.java  |  5 +++
 .../spark/job/ResourceDetectBeforeSampling.java    |  5 +++
 .../apache/spark/application/JobWorkSpace.scala    | 47 +++-----------------
 .../spark/application/SparkApplicationTest.java    | 51 +++++++++++++++++++++-
 .../kylin/query/runtime/plan/ResultPlan.scala      |  2 +-
 8 files changed, 123 insertions(+), 48 deletions(-)
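
In short, the patch applies a template-method refactor: the metadata-updating
hooks (waiteForResourceSuccess, logJobInfo, updateJobErrorInfo) live on
SparkApplication, and job types that must not touch job metadata, namely the
async query application and the resource detect steps, override them as
no-ops. A minimal, self-contained sketch of the pattern (illustrative class
names, not Kylin's real ones):

    // Stand-in for org.apache.kylin.engine.spark.scheduler.JobFailed.
    interface JobFailed {
        String reason();
        Throwable throwable();
    }

    // Stand-in for SparkApplication: the default hooks update job metadata.
    abstract class SparkJobBase {
        public void updateJobErrorInfo(JobFailed jf) {
            // The real code POSTs a JSON payload to /kylin/api/jobs/error.
            System.out.println("report job error: " + jf.reason());
        }

        protected void waiteForResourceSuccess() {
            // The real code records the WAITE_FOR_RESOURCE stage in metadata.
            System.out.println("record resource-wait stage");
        }
    }

    // Stand-in for AsyncQueryApplication / the ResourceDetect* jobs:
    // the hooks become no-ops, so no job metadata is ever written.
    class AsyncQueryJob extends SparkJobBase {
        @Override
        public void updateJobErrorInfo(JobFailed jf) {
            // do nothing
        }

        @Override
        protected void waiteForResourceSuccess() {
            // do nothing
        }
    }

The driver-side change in JobWorkSpace.scala completes the picture: on
failure it now calls worker.getApplication.updateJobErrorInfo(jf), so virtual
dispatch selects the no-op for these job types instead of unconditionally
posting error metadata.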

diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
index 24a977bee2..ab6d943ffc 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryApplication.java
@@ -32,14 +32,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.query.util.AsyncQueryUtil;
-import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.metadata.query.QueryHistorySql;
 import org.apache.kylin.metadata.query.QueryHistorySqlParam;
 import org.apache.kylin.metadata.query.QueryMetricsContext;
 import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
 import org.apache.kylin.metadata.query.util.QueryHistoryUtil;
+import org.apache.kylin.query.util.AsyncQueryUtil;
+import org.apache.kylin.query.util.QueryParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +92,21 @@ public class AsyncQueryApplication extends SparkApplication {
         return config.getAsyncQuerySparkConfigOverride();
     }
 
+    @Override
+    protected void waiteForResourceSuccess() {
+        // do nothing
+    }
+
+    @Override
+    public void logJobInfo() {
+        // do nothing
+    }
+
+    @Override
+    public void updateJobErrorInfo(JobFailed jobFailed) {
+        // do nothing
+    }
+
     private void saveQueryHistory(QueryContext queryContext, QueryParams 
queryParams) {
         if (StringUtils.isEmpty(queryContext.getMetrics().getCorrectedSql())) {
             queryContext.getMetrics().setCorrectedSql(queryContext.getUserSQL());
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index b087d758df..9d9af5980f 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +73,7 @@ import org.apache.kylin.engine.spark.job.SegmentBuildJob;
 import org.apache.kylin.engine.spark.job.SparkJobConstants;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
+import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.engine.spark.utils.HDFSUtils;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
@@ -106,6 +108,8 @@ import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 import lombok.val;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
@@ -114,7 +118,8 @@ public abstract class SparkApplication implements Application {
     private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
     private Map<String, String> params = Maps.newHashMap();
     public static final String JOB_NAME_PREFIX = "job_step_";
-    private IJobProgressReport report;
+    private static final String JOB_ERROR_API = "/kylin/api/jobs/error";
+    protected IJobProgressReport report;
 
     protected volatile KylinConfig config;
     protected volatile String jobId;
@@ -298,7 +303,7 @@ public abstract class SparkApplication implements Application {
             TimeZoneUtils.setDefaultTimeZone(config);
 
             /// wait until resource is enough
-            waiteForResource(atomicSparkConf.get(), buildEnv);
+            waiteForResource();
 
             ///
             logger.info("Prepare job environment");
@@ -461,7 +466,7 @@ public abstract class SparkApplication implements Application {
         helper.applySparkConf(sparkConf);
     }
 
-    private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception {
+    protected void waiteForResource() {
         val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
         infos.recordStageId(waiteForResource.getId());
         waiteForResource.execute();
@@ -530,6 +535,31 @@ public abstract class SparkApplication implements Application {
         }
     }
 
+    public void updateJobErrorInfo(JobFailed jobFailed) throws JsonProcessingException {
+        val stageId = infos.getStageId();
+        val jobStepId = StringUtils.replace(infos.getJobStepId(), SparkApplication.JOB_NAME_PREFIX, "");
+
+        val failedStepId = StringUtils.isBlank(stageId) ? jobStepId : stageId;
+        val failedSegmentId = infos.getSegmentId();
+        val failedStack = ExceptionUtils.getStackTrace(jobFailed.throwable());
+        val failedReason = this.getAtomicUnreachableSparkMaster().get()
+                ? "Unable connect spark master to reach timeout maximum time"
+                : jobFailed.reason();
+
+        val payload = new HashMap<String, Object>(5);
+        payload.put("project", project);
+        payload.put("job_id", jobId);
+        payload.put("failed_step_id", failedStepId);
+        payload.put("failed_segment_id", failedSegmentId);
+        payload.put("failed_stack", failedStack);
+        payload.put("failed_reason", failedReason);
+        val json = JsonUtil.writeValueAsString(payload);
+        val paramsMap = new HashMap<String, String>();
+        paramsMap.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
+        paramsMap.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
+        this.getReport().updateSparkJobInfo(paramsMap, JOB_ERROR_API, json);
+    }
+
     private Map<String, String> getReportParams() {
         val reportParams = new HashMap<String, String>();
         reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java
index 0abb71ec21..4a1aa2fa8f 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java
@@ -49,6 +49,11 @@ public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect {
         }
     }
 
+    @Override
+    protected void waiteForResourceSuccess() {
+        // do nothing 
+    }
+
     private void detectPartition() throws IOException {
         for (NDataSegment dataSegment : readOnlySegments) {
             val buildParam = new BuildParam();
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
index 468eb50420..2f063cf6d1 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
@@ -79,6 +79,11 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication implements
         return LogJobInfoUtils.resourceDetectBeforeMergingJobInfo();
     }
 
+    @Override
+    protected void waiteForResourceSuccess() {
+        // do nothing
+    }
+
     public static void main(String[] args) {
         ResourceDetectBeforeMergingJob resourceDetectJob = new ResourceDetectBeforeMergingJob();
         resourceDetectJob.execute(args);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
index 3b74081556..2f77a528bf 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
@@ -74,4 +74,9 @@ public class ResourceDetectBeforeSampling extends SparkApplication implements Re
         ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId),
                 tableName + "_" + ResourceDetectUtils.samplingDetectItemFileSuffix()), tableLeafTaskNums);
     }
+
+    @Override
+    protected void waiteForResourceSuccess() {
+        // do nothing
+    }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index 3013cb5742..44c13b575c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -18,18 +18,16 @@
 
 package org.apache.spark.application
 
-import org.apache.commons.lang.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.kylin.common.util.{JsonUtil, Unsafe}
+import java.util
+import java.util.concurrent.CountDownLatch
+
+import org.apache.kylin.common.util.Unsafe
 import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.{KylinBuildEnv, ParamsConstants}
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
 import org.apache.kylin.engine.spark.scheduler._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.KylinJobEventLoop
 
-import java.util
-import java.util.concurrent.CountDownLatch
-
 /**
  * Spark driver part, construct the real spark job [SparkApplication]
  */
@@ -103,7 +101,7 @@ class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: Jo
     try {
       logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
       KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
-      updateJobErrorInfo(jf)
+      worker.getApplication.updateJobErrorInfo(jf)
       stop()
     } finally {
       statusCode = 1
@@ -116,37 +114,4 @@ class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: Jo
     worker.stop()
     eventLoop.stop()
   }
-
-  def updateJobErrorInfo(jf: JobFailed): Unit = {
-    val infos = KylinBuildEnv.get().buildJobInfos
-    val context = worker.getApplication
-
-    val project = context.getProject
-    val jobId = context.getJobId
-
-    val stageId = infos.getStageId
-    val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "")
-    val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId
-
-    val failedSegmentId = infos.getSegmentId
-    val failedStack = ExceptionUtils.getStackTrace(jf.throwable)
-    val failedReason =
-      if (context.getAtomicUnreachableSparkMaster.get()) "Unable connect spark master to reach timeout maximum time"
-      else jf.reason
-    val url = "/kylin/api/jobs/error"
-
-    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5)
-    payload.put("project", project)
-    payload.put("job_id", jobId)
-    payload.put("failed_step_id", failedStepId)
-    payload.put("failed_segment_id", failedSegmentId)
-    payload.put("failed_stack", failedStack)
-    payload.put("failed_reason", failedReason)
-    val json = JsonUtil.writeValueAsString(payload)
-    val params = new util.HashMap[String, String]()
-    val config = KylinBuildEnv.get().kylinConfig
-    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
-    params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
-    context.getReport.updateSparkJobInfo(params, url, json);
-  }
 }
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
index 80d5f6fdd1..a0e5207da3 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/application/SparkApplicationTest.java
@@ -25,16 +25,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import lombok.val;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.job.BuildJobInfos;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
+import org.apache.kylin.engine.spark.job.MockJobProgressReport;
 import org.apache.kylin.engine.spark.job.ParamsConstants;
 import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
+import org.apache.kylin.engine.spark.scheduler.JobFailed;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -51,9 +54,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import lombok.val;
+
 public class SparkApplicationTest extends NLocalWithSparkSessionTest {
 
     File tempDir = new File("./temp/");
@@ -241,4 +249,45 @@ public class SparkApplicationTest extends NLocalWithSparkSessionTest {
         application.exchangeSparkConf(sparkConf);
     }
 
+    @Test
+    public void testUpdateJobErrorInfo() throws JsonProcessingException {
+        val config = getTestConfig();
+        val project = "test_project";
+        SparkApplication application = Mockito.spy(new SparkApplication() {
+            @Override
+            protected void doExecute() {
+            }
+        });
+
+        application.config = config;
+        application.jobId = "job_id";
+        application.project = project;
+
+        BuildJobInfos infos = new BuildJobInfos();
+        infos.recordStageId("stage_id");
+        infos.recordJobStepId("job_step_id");
+        infos.recordSegmentId("segment_id");
+
+        application.infos = infos;
+        MockJobProgressReport mockJobProgressReport = Mockito.spy(new MockJobProgressReport());
+        Mockito.when(application.getReport()).thenReturn(mockJobProgressReport);
+
+        JobFailed jobFailed = Mockito.mock(JobFailed.class);
+        Mockito.when(jobFailed.reason()).thenReturn("test job failed");
+        try (MockedStatic<ExceptionUtils> exceptionUtilsMockedStatic = Mockito.mockStatic(ExceptionUtils.class)) {
+            exceptionUtilsMockedStatic.when(() -> ExceptionUtils.getStackTrace(jobFailed.throwable()))
+                    .thenReturn("test stack trace");
+            application.updateJobErrorInfo(jobFailed);
+        }
+
+        val paramsMap = new HashMap<String, String>();
+        paramsMap.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
+        paramsMap.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
+
+        val json = "{\"project\":\"test_project\",\"failed_segment_id\":\"segment_id\",\"failed_stack\":\"test stack "
+                + "trace\",\"job_id\":\"job_id\",\"failed_reason\":\"test job failed\",\"failed_step_id\":\"stage_id\"}";
+
+        Mockito.verify(application.getReport(), Mockito.times(1)).updateSparkJobInfo(paramsMap, "/kylin/api/jobs/error",
+                json);
+    }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index a1ad366e6e..3f23ab26ac 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -423,7 +423,7 @@ object ResultPlan extends LogEx {
   }
 
   def uploadAsyncQueryResult(file: File, path: String, queryId: String, format: String): Unit = {
-    HadoopUtil.getWorkingFileSystem
+    AsyncQueryUtil.getFileSystem
       .copyFromLocalFile(true, true, new Path(file.getPath), new Path(path + "/" + queryId + "." + format))
     if (file.exists()) file.delete()
   }
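
This last hunk also makes uploadAsyncQueryResult resolve the target file
system via AsyncQueryUtil.getFileSystem rather than HadoopUtil.getWorkingFileSystem,
presumably so async query results land on the file system the async query
utilities manage. For reference, a hedged sketch of the error payload the new
SparkApplication#updateJobErrorInfo assembles and posts to
/kylin/api/jobs/error (field names taken from the diff; the values are the
ones the test above expects, and Jackson is assumed on the classpath):

    import java.util.HashMap;
    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JobErrorPayloadSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> payload = new HashMap<>();
            payload.put("project", "test_project");
            // The stage id when present, otherwise the job step id with
            // the "job_step_" prefix stripped.
            payload.put("failed_step_id", "stage_id");
            payload.put("failed_segment_id", "segment_id");
            payload.put("failed_stack", "test stack trace");
            payload.put("job_id", "job_id");
            payload.put("failed_reason", "test job failed");
            // Serialize the same way the production code does via JsonUtil.
            System.out.println(new ObjectMapper().writeValueAsString(payload));
        }
    }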
