This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 73e0104fc7f14c12fc4e85bbc1427501b8c001bb
Author: jlf <longfei.ji...@kyligence.io>
AuthorDate: Fri Jun 16 16:31:57 2023 +0800

    fix async query error
---
 .../apache/kylin/query/engine/AsyncQueryJob.java   | 17 ++++++--
 .../kylin/query/engine/AsyncQueryJobTest.java      | 46 ++++++++++++++++++++--
 .../kylin/engine/spark/job/NSparkExecutable.java   |  4 +-
 3 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java
index f3430aa361..5676e7259a 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java
@@ -23,6 +23,7 @@ import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -30,6 +31,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.KylinRuntimeException;
+import org.apache.kylin.common.extension.KylinInfoExtension;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.metadata.MetadataStore;
 import org.apache.kylin.common.util.BufferedLogger;
@@ -38,6 +40,9 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.spark.job.DefaultSparkBuildJobHandler;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.JobTypeEnum;
@@ -49,9 +54,6 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
 
 import lombok.val;
 
@@ -197,4 +199,13 @@ public class AsyncQueryJob extends NSparkExecutable {
         }
         return HadoopUtil.getHadoopConfDir();
     }
+
+    @Override
+    public void modifyDump(Properties props) {
+        super.modifyDump(props);
+        if (!KylinInfoExtension.getFactory().checkKylinInfo()) {
+            props.setProperty("kylin.streaming.enabled", KylinConfig.FALSE);
+            props.remove("kylin.second-storage.class");
+        }
+    }
 }
diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
index e424e78d39..4ba275bf48 100644
--- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java
@@ -41,12 +41,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.extension.KylinInfoExtension;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.ShellException;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.query.util.QueryParams;
@@ -57,10 +61,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 
-import org.apache.kylin.guava30.shaded.common.io.ByteSource;
 import lombok.val;
 
 public class AsyncQueryJobTest extends NLocalFileMetadataTestCase {
@@ -316,4 +317,43 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase {
             Assert.assertTrue(executeResult.output().contains("--conf 'spark.executor.cores=3'"));
         }
     }
+
+    @Test
+    public void testModifyDump() {
+        AsyncQueryJob asyncQueryJob = new AsyncQueryJob() {
+            @Override
+            protected ExecuteResult runSparkSubmit(String hadoopConf, String kylinJobJar, String appArgs) {
+                return ExecuteResult.createSucceed();
+            }
+        };
+
+        val properties = new Properties();
+        properties.setProperty("kylin.extension.info.factory",
+                "org.apache.kylin.common.extension.KylinInfoExtension$Factory");
+        properties.setProperty("kylin.second-storage.class",
+                "org.apache.kylin.common.extension.KylinInfoExtension$Factory");
+        properties.setProperty("kylin.streaming.enabled", "true");
+        try (val mockedStatic = Mockito.mockStatic(KylinInfoExtension.class)) {
+            val kylinInfoExtensionFactory = Mockito.mock(KylinInfoExtension.Factory.class);
+            mockedStatic.when(KylinInfoExtension::getFactory).thenReturn(kylinInfoExtensionFactory);
+
+            Mockito.when(kylinInfoExtensionFactory.checkKylinInfo()).thenReturn(false);
+            val properties1 = new Properties();
+            properties1.putAll(properties);
+            asyncQueryJob.modifyDump(properties1);
+            Assert.assertNull(properties1.get("kylin.extension.info.factory"));
+            Assert.assertNull(properties1.get("kylin.second-storage.class"));
+            Assert.assertEquals("false", properties1.get("kylin.streaming.enabled"));
+
+            Mockito.when(kylinInfoExtensionFactory.checkKylinInfo()).thenReturn(true);
+            val properties2 = new Properties();
+            properties2.putAll(properties);
+            asyncQueryJob.modifyDump(properties2);
+            Assert.assertNull(properties2.get("kylin.extension.info.factory"));
+            Assert.assertEquals(properties.getProperty("kylin.second-storage.class"),
+                    properties2.get("kylin.second-storage.class"));
+            Assert.assertEquals(properties.getProperty("kylin.streaming.enabled"),
+                    properties2.get("kylin.streaming.enabled"));
+        }
+    }
 }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index f73da6d1bc..5e21a23224 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -550,7 +550,7 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         FileUtils.forceDelete(tmpDir);
     }
 
-    private void modifyDump(Properties props) {
+    protected void modifyDump(Properties props) {
         sparkJobHandler.modifyDump(props);
         removeUnNecessaryDump(props);
     }
@@ -577,6 +577,8 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
 
         props.remove("kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions");
         props.remove("kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions");
+
+        props.remove("kylin.extension.info.factory");
     }
 
     private void deleteSnapshotDirectoryOnExists() {

Reply via email to