This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f61397f1729d05d2d5e70c5e99387aa538cbc46b
Author: Jiale He <jialeheb...@gmail.com>
AuthorDate: Fri Jan 6 16:22:20 2023 +0800

    [DIRTY] fix start streaming job throw NPE
---
 .../apache/kylin/rest/service/BasicService.java    |  1 +
 .../kylin/rest/service/KafkaServiceTest.java       |  5 -----
 .../kylin/rest/service/StreamingJobService.java    | 14 +++++++-------
 .../jobs/impl/StreamingJobLauncherTest.java        | 22 ++++++++++++++++++----
 4 files changed, 26 insertions(+), 16 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java
index b349943496..e6cfd173f1 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -35,6 +35,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.streaming.DataParserManager;
 import org.apache.kylin.rest.response.EnvelopeResponse;
diff --git 
a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java
 
b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java
index 1ce8eef38f..78254267bc 100644
--- 
a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java
+++ 
b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java
@@ -46,9 +46,7 @@ import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclUtil;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.springframework.mock.web.MockMultipartFile;
@@ -58,9 +56,6 @@ import lombok.val;
 
 public class KafkaServiceTest extends NLocalFileMetadataTestCase {
 
-    @Rule
-    public ExpectedException expectedException = ExpectedException.none();
-
     @Mock
     private final KafkaService kafkaService = Mockito.spy(KafkaService.class);
 
diff --git 
a/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java
 
b/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java
index 689565b1c5..3658119bcd 100644
--- 
a/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java
+++ 
b/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java
@@ -47,13 +47,6 @@ import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.rest.response.DataResult;
-import org.apache.kylin.rest.util.AclEvaluate;
-import org.apache.kylin.rest.util.PagingUtil;
 import org.apache.kylin.metadata.cube.model.NDataLayout;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
@@ -64,17 +57,24 @@ import org.apache.kylin.metadata.cube.utils.StreamingUtils;
 import org.apache.kylin.metadata.model.FusionModelManager;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.streaming.StreamingJobRecord;
 import org.apache.kylin.metadata.streaming.StreamingJobRecordManager;
 import org.apache.kylin.metadata.streaming.StreamingJobStats;
 import org.apache.kylin.metadata.streaming.StreamingJobStatsManager;
 import org.apache.kylin.rest.request.StreamingJobActionEnum;
 import org.apache.kylin.rest.request.StreamingJobFilter;
+import org.apache.kylin.rest.response.DataResult;
 import org.apache.kylin.rest.response.StreamingJobDataStatsResponse;
 import org.apache.kylin.rest.response.StreamingJobResponse;
+import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.ModelUtils;
+import org.apache.kylin.rest.util.PagingUtil;
 import org.apache.kylin.streaming.constants.StreamingConstants;
 import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler;
 import org.apache.kylin.streaming.manager.StreamingJobManager;
diff --git 
a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
 
b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
index 11640895c5..2bef1b243d 100644
--- 
a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
+++ 
b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
@@ -47,7 +47,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
@@ -70,8 +69,6 @@ public class StreamingJobLauncherTest extends 
NLocalFileMetadataTestCase {
     public TemporaryFolder temporaryFolder = new TemporaryFolder();
     @Rule
     public TestName testName = new TestName();
-    @Rule
-    public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setUp() throws Exception {
@@ -277,7 +274,7 @@ public class StreamingJobLauncherTest extends 
NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testLaunchBuildJobException_Yarn() throws Exception {
+    public void testLaunchBuildJobException_Yarn() {
 
         try {
             overwriteSystemProp("streaming.local", "true");
@@ -524,6 +521,23 @@ public class StreamingJobLauncherTest extends 
NLocalFileMetadataTestCase {
         
Assert.assertNotNull(mockup.sparkConf.get("spark.yarn.am.extraJavaOptions"));
     }
 
+    @Test
+    public void testAddParserJar() throws Exception {
+        val modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
+        val launcher = new StreamingJobLauncher();
+        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
+        val mockup = new MockupSparkLauncher();
+        ReflectionTestUtils.setField(launcher, "launcher", mockup);
+        val mockLaunch = PowerMockito.spy(launcher);
+        PowerMockito.when(mockLaunch, 
"getParserName").thenReturn("io.kyligence.kap.parser.TimedJsonStreamParser2");
+        DataParserInfo dataParserInfo = new DataParserInfo(PROJECT, 
DEFAULT_PARSER_NAME, "default");
+        PowerMockito.when(mockLaunch, "getDataParser", 
Mockito.anyString()).thenReturn(dataParserInfo);
+        PowerMockito.doReturn("default").when(mockLaunch, "getParserJarPath", 
dataParserInfo);
+        ReflectionTestUtils.invokeMethod(mockLaunch, "addParserJar", mockup);
+        mockup.startApplication();
+        Assert.assertTrue(mockup.jars.contains("default"));
+    }
+
     static class MockupSparkLauncher extends SparkLauncher {
         private Map<String, String> sparkConf;
         private List<String> files;

Reply via email to