This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 73e0104fc7f14c12fc4e85bbc1427501b8c001bb Author: jlf <longfei.ji...@kyligence.io> AuthorDate: Fri Jun 16 16:31:57 2023 +0800 fix async query error --- .../apache/kylin/query/engine/AsyncQueryJob.java | 17 ++++++-- .../kylin/query/engine/AsyncQueryJobTest.java | 46 ++++++++++++++++++++-- .../kylin/engine/spark/job/NSparkExecutable.java | 4 +- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java index f3430aa361..5676e7259a 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/AsyncQueryJob.java @@ -23,6 +23,7 @@ import static org.apache.kylin.query.util.AsyncQueryUtil.ASYNC_QUERY_JOB_ID_PRE; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -30,6 +31,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.extension.KylinInfoExtension; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.metadata.MetadataStore; import org.apache.kylin.common.util.BufferedLogger; @@ -38,6 +40,9 @@ import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.engine.spark.job.DefaultSparkBuildJobHandler; import org.apache.kylin.engine.spark.job.NSparkExecutable; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.job.execution.JobTypeEnum; @@ -49,9 +54,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; import lombok.val; @@ -197,4 +199,13 @@ public class AsyncQueryJob extends NSparkExecutable { } return HadoopUtil.getHadoopConfDir(); } + + @Override + public void modifyDump(Properties props) { + super.modifyDump(props); + if (!KylinInfoExtension.getFactory().checkKylinInfo()) { + props.setProperty("kylin.streaming.enabled", KylinConfig.FALSE); + props.remove("kylin.second-storage.class"); + } + } } diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java index e424e78d39..4ba275bf48 100644 --- a/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/engine/AsyncQueryJobTest.java @@ -41,12 +41,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.extension.KylinInfoExtension; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.ShellException; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.guava30.shaded.common.io.ByteSource; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.query.util.QueryParams; @@ -57,10 +61,7 @@ import org.junit.Test; import org.mockito.Mockito; import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; -import org.apache.kylin.guava30.shaded.common.io.ByteSource; import lombok.val; public class AsyncQueryJobTest extends NLocalFileMetadataTestCase { @@ -316,4 +317,43 @@ public class AsyncQueryJobTest extends NLocalFileMetadataTestCase { Assert.assertTrue(executeResult.output().contains("--conf 'spark.executor.cores=3'")); } } + + @Test + public void testModifyDump() { + AsyncQueryJob asyncQueryJob = new AsyncQueryJob() { + @Override + protected ExecuteResult runSparkSubmit(String hadoopConf, String kylinJobJar, String appArgs) { + return ExecuteResult.createSucceed(); + } + }; + + val properties = new Properties(); + properties.setProperty("kylin.extension.info.factory", + "org.apache.kylin.common.extension.KylinInfoExtension$Factory"); + properties.setProperty("kylin.second-storage.class", + "org.apache.kylin.common.extension.KylinInfoExtension$Factory"); + properties.setProperty("kylin.streaming.enabled", "true"); + try (val mockedStatic = Mockito.mockStatic(KylinInfoExtension.class)) { + val kylinInfoExtensionFactory = Mockito.mock(KylinInfoExtension.Factory.class); + mockedStatic.when(KylinInfoExtension::getFactory).thenReturn(kylinInfoExtensionFactory); + + Mockito.when(kylinInfoExtensionFactory.checkKylinInfo()).thenReturn(false); + val properties1 = new Properties(); + properties1.putAll(properties); + asyncQueryJob.modifyDump(properties1); + Assert.assertNull(properties1.get("kylin.extension.info.factory")); + Assert.assertNull(properties1.get("kylin.second-storage.class")); + Assert.assertEquals("false", properties1.get("kylin.streaming.enabled")); + + Mockito.when(kylinInfoExtensionFactory.checkKylinInfo()).thenReturn(true); + val properties2 = new Properties(); + properties2.putAll(properties); + asyncQueryJob.modifyDump(properties2); + Assert.assertNull(properties2.get("kylin.extension.info.factory")); + Assert.assertEquals(properties.getProperty("kylin.second-storage.class"), + properties2.get("kylin.second-storage.class")); + Assert.assertEquals(properties.getProperty("kylin.streaming.enabled"), + properties2.get("kylin.streaming.enabled")); + } + } } diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index f73da6d1bc..5e21a23224 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -550,7 +550,7 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage FileUtils.forceDelete(tmpDir); } - private void modifyDump(Properties props) { + protected void modifyDump(Properties props) { sparkJobHandler.modifyDump(props); removeUnNecessaryDump(props); } @@ -577,6 +577,8 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage props.remove("kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions"); props.remove("kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions"); + + props.remove("kylin.extension.info.factory"); } private void deleteSnapshotDirectoryOnExists() {