This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit f61397f1729d05d2d5e70c5e99387aa538cbc46b Author: Jiale He <jialeheb...@gmail.com> AuthorDate: Fri Jan 6 16:22:20 2023 +0800 [DIRTY] fix start streaming job throw NPE --- .../apache/kylin/rest/service/BasicService.java | 1 + .../kylin/rest/service/KafkaServiceTest.java | 5 ----- .../kylin/rest/service/StreamingJobService.java | 14 +++++++------- .../jobs/impl/StreamingJobLauncherTest.java | 22 ++++++++++++++++++---- 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java index b349943496..e6cfd173f1 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -35,6 +35,7 @@ import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.streaming.DataParserManager; import org.apache.kylin.rest.response.EnvelopeResponse; diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java index 1ce8eef38f..78254267bc 100644 --- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java +++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/KafkaServiceTest.java @@ -46,9 +46,7 @@ import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclUtil; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import 
org.mockito.Mock; import org.mockito.Mockito; import org.springframework.mock.web.MockMultipartFile; @@ -58,9 +56,6 @@ import lombok.val; public class KafkaServiceTest extends NLocalFileMetadataTestCase { - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Mock private final KafkaService kafkaService = Mockito.spy(KafkaService.class); diff --git a/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java b/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java index 689565b1c5..3658119bcd 100644 --- a/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java +++ b/src/streaming-service/src/main/java/org/apache/kylin/rest/service/StreamingJobService.java @@ -47,13 +47,6 @@ import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.execution.JobTypeEnum; import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.metadata.model.SegmentRange; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.rest.response.DataResult; -import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.rest.util.PagingUtil; import org.apache.kylin.metadata.cube.model.NDataLayout; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; @@ -64,17 +57,24 @@ import org.apache.kylin.metadata.cube.utils.StreamingUtils; import org.apache.kylin.metadata.model.FusionModelManager; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import 
org.apache.kylin.metadata.project.EnhancedUnitOfWork; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.streaming.StreamingJobRecord; import org.apache.kylin.metadata.streaming.StreamingJobRecordManager; import org.apache.kylin.metadata.streaming.StreamingJobStats; import org.apache.kylin.metadata.streaming.StreamingJobStatsManager; import org.apache.kylin.rest.request.StreamingJobActionEnum; import org.apache.kylin.rest.request.StreamingJobFilter; +import org.apache.kylin.rest.response.DataResult; import org.apache.kylin.rest.response.StreamingJobDataStatsResponse; import org.apache.kylin.rest.response.StreamingJobResponse; +import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.ModelUtils; +import org.apache.kylin.rest.util.PagingUtil; import org.apache.kylin.streaming.constants.StreamingConstants; import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler; import org.apache.kylin.streaming.manager.StreamingJobManager; diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java index 11640895c5..2bef1b243d 100644 --- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java +++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java @@ -47,7 +47,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; import org.junit.runner.RunWith; @@ -70,8 +69,6 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase { public TemporaryFolder temporaryFolder = new 
TemporaryFolder(); @Rule public TestName testName = new TestName(); - @Rule - public ExpectedException thrown = ExpectedException.none(); @Before public void setUp() throws Exception { @@ -277,7 +274,7 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase { } @Test - public void testLaunchBuildJobException_Yarn() throws Exception { + public void testLaunchBuildJobException_Yarn() { try { overwriteSystemProp("streaming.local", "true"); @@ -524,6 +521,23 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase { Assert.assertNotNull(mockup.sparkConf.get("spark.yarn.am.extraJavaOptions")); } + @Test + public void testAddParserJar() throws Exception { + val modelId = "e78a89dd-847f-4574-8afa-8768b4228b72"; + val launcher = new StreamingJobLauncher(); + launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD); + val mockup = new MockupSparkLauncher(); + ReflectionTestUtils.setField(launcher, "launcher", mockup); + val mockLaunch = PowerMockito.spy(launcher); + PowerMockito.when(mockLaunch, "getParserName").thenReturn("io.kyligence.kap.parser.TimedJsonStreamParser2"); + DataParserInfo dataParserInfo = new DataParserInfo(PROJECT, DEFAULT_PARSER_NAME, "default"); + PowerMockito.when(mockLaunch, "getDataParser", Mockito.anyString()).thenReturn(dataParserInfo); + PowerMockito.doReturn("default").when(mockLaunch, "getParserJarPath", dataParserInfo); + ReflectionTestUtils.invokeMethod(mockLaunch, "addParserJar", mockup); + mockup.startApplication(); + Assert.assertTrue(mockup.jars.contains("default")); + } + static class MockupSparkLauncher extends SparkLauncher { private Map<String, String> sparkConf; private List<String> files;