Revert "Revert "refactor BuildCubeWithStream"" This reverts commit 8e9c4550bb562b497442b17eec6485ae96e848d8.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/be18158d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/be18158d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/be18158d Branch: refs/heads/KYLIN-1726-2 Commit: be18158dcc5ce739c272b9345d3b2296c3936ee3 Parents: 8cbffb4 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Sep 24 14:58:43 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 27 10:17:40 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 7 +- .../kylin/provision/BuildCubeWithStream.java | 10 +- .../kylin/provision/BuildCubeWithStream2.java | 145 +------------------ 3 files changed, 12 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/be18158d/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 9b282e3..9e9df05 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -187,6 +187,7 @@ public class DeployUtil { File tmpFile = File.createTempFile(factTableName, "csv"); FileOutputStream out = new FileOutputStream(tmpFile); + InputStream tempIn = null; try { if (store.exists(factTablePath)) { InputStream oldContent = store.getResource(factTablePath).inputStream; @@ -194,13 +195,15 @@ public class DeployUtil { } IOUtils.copy(in, out); IOUtils.closeQuietly(in); + IOUtils.closeQuietly(out); store.deleteResource(factTablePath); - in = new FileInputStream(tmpFile); - store.putResource(factTablePath, in, System.currentTimeMillis()); + tempIn = new FileInputStream(tmpFile); + store.putResource(factTablePath, tempIn, System.currentTimeMillis()); } finally { IOUtils.closeQuietly(out); IOUtils.closeQuietly(in); + IOUtils.closeQuietly(tempIn); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/be18158d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 6e5313f..bfe1d0a 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -62,10 +62,10 @@ public class BuildCubeWithStream { private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class); - private CubeManager cubeManager; + protected CubeManager cubeManager; private DefaultScheduler scheduler; protected ExecutableManager jobService; - private static final String cubeName = "test_streaming_table_cube"; + static final String cubeName = "test_streaming_table_cube"; private KafkaConfig kafkaConfig; private MockKafka kafkaServer; @@ -114,13 +114,13 @@ public class BuildCubeWithStream { Assert.assertEquals(topicName, topicMetadata.topic()); } - private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { + protected void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); logger.info("Test data inserted into Kafka"); } - private void clearSegment(String cubeName) throws Exception { + protected void clearSegment(String cubeName) throws Exception { CubeInstance cube = cubeManager.getCube(cubeName); // remove all existing segments CubeUpdate cubeBuilder = new CubeUpdate(cube); @@ -187,7 +187,7 @@ public class BuildCubeWithStream { return job.getId(); } - private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { + protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); http://git-wip-us.apache.org/repos/asf/kylin/blob/be18158d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java index 2812446..7959701 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java @@ -18,13 +18,11 @@ package org.apache.kylin.provision; -import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.List; import java.util.Random; import java.util.TimeZone; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,32 +30,9 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.streaming.StreamingConfig; -import org.apache.kylin.engine.streaming.StreamingManager; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; -import org.apache.kylin.job.streaming.Kafka10DataLoader; import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.config.BrokerConfig; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,79 +42,12 @@ import static java.lang.Thread.sleep; /** * for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently. */ -public class BuildCubeWithStream2 { +public class BuildCubeWithStream2 extends BuildCubeWithStream { private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class); - - private CubeManager cubeManager; - private DefaultScheduler scheduler; - protected ExecutableManager jobService; - private static final String cubeName = "test_streaming_table_cube"; - - private KafkaConfig kafkaConfig; - private MockKafka kafkaServer; private static boolean generateData = true; - public void before() throws Exception { - deployEnv(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - final String factTable = cubeInstance.getFactTable(); - - final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); - final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable); - kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName()); - - String topicName = UUID.randomUUID().toString(); - String localIp = NetworkUtils.getLocalIp(); - BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0); - brokerConfig.setHost(localIp); - kafkaConfig.setTopic(topicName); - KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig); - - startEmbeddedKafka(topicName, brokerConfig); - } - - private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { - //Start mock Kakfa - String zkConnectionStr = "sandbox:2181"; - ZkConnection zkConnection = new ZkConnection(zkConnectionStr); - // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); - kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); - kafkaServer.start(); - - kafkaServer.createTopic(topicName, 3, 1); - kafkaServer.waitTopicUntilReady(topicName); - - MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName); - Assert.assertEquals(topicName, topicMetadata.topic()); - } - - private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { - if (numberOfRecords <= 0) - return; - Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); - logger.info("Test data inserted into Kafka"); - } - - private void clearSegment(String cubeName) throws Exception { - CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); - } - + @Override public void build() throws Exception { clearSegment(cubeName); SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); @@ -204,55 +112,6 @@ public class BuildCubeWithStream2 { } - - private ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); - DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); - jobService.addJob(job); - waitForJob(job.getId()); - return job.getStatus(); - } - - protected void deployEnv() throws IOException { - DeployUtil.overrideJobJarLocations(); - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - } - - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); - if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); - } - - public static void afterClass() throws Exception { - HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - public void after() { - kafkaServer.stop(); - DefaultScheduler.destroyInstance(); - } - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR || job.getStatus() == ExecutableState.DISCARDED) { - break; - } else { - try { - sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - public static void main(String[] args) throws Exception { try { beforeClass();