KYLIN-1815 Clean up packages by removing spark and kafka
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b514ae16 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b514ae16 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b514ae16 Branch: refs/heads/stream_m1 Commit: b514ae16a1487537601f37427b84f864be4ad2f2 Parents: 8d8ec06 Author: Li Yang <liy...@apache.org> Authored: Thu Jun 23 12:00:50 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Jun 23 12:00:50 2016 +0800 ---------------------------------------------------------------------- assembly/pom.xml | 6 +- core-common/pom.xml | 4 - .../apache/kylin/common/KylinConfigBase.java | 20 ++- core-job/pom.xml | 6 + .../realization/RealizationRegistry.java | 8 +- engine-spark/pom.xml | 5 + .../kylin/provision/BuildCubeWithSpark.java | 157 ------------------- 7 files changed, 28 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/assembly/pom.xml ---------------------------------------------------------------------- diff --git a/assembly/pom.xml b/assembly/pom.xml index 8aabe16..d643b62 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -52,11 +52,6 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-spark</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-streaming</artifactId> <version>${project.parent.version}</version> </dependency> @@ -181,6 +176,7 @@ <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>${kafka.version}</version> + <scope>provided</scope> <!-- FIXME: Should be provided just like hive and hbase, inflates job jar from 9 MB to 21 MB --> </dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/core-common/pom.xml 
---------------------------------------------------------------------- diff --git a/core-common/pom.xml b/core-common/pom.xml index f3708bd..80d2b4b 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -77,10 +77,6 @@ <artifactId>guava</artifactId> </dependency> <dependency> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </dependency> - <dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 2a3a2a0..0d48dcd 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -88,7 +88,7 @@ abstract public class KylinConfigBase implements Serializable { public Properties getAllProperties() { return properties; } - + final protected Map<String, String> getPropertiesByPrefix(String prefix) { Map<String, String> result = Maps.newLinkedHashMap(); for (Entry<Object, Object> entry : getAllProperties().entrySet()) { @@ -202,6 +202,11 @@ abstract public class KylinConfigBase implements Serializable { return new StringBuffer(root).append(StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).append("/").toString(); } + public String[] getRealizationProviders() { + return getOptionalStringArray("kylin.realization.providers", // + new String[] { "org.apache.kylin.cube.CubeManager", "org.apache.kylin.storage.hybrid.HybridManager", "org.apache.kylin.invertedindex.IIManager" }); + } + public CliCommandExecutor getCliCommandExecutor() throws IOException { CliCommandExecutor exec = new CliCommandExecutor(); if (getRunAsRemoteCommand()) { @@ -399,7 +404,6 
@@ abstract public class KylinConfigBase implements Serializable { setProperty("kylin.cluster.name", clusterName); } - public int getWorkersPerServer() { //for sequence sql use return Integer.parseInt(getOptional("kylin.rest.workers.per.server", "1")); @@ -413,11 +417,11 @@ abstract public class KylinConfigBase implements Serializable { return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60))); } - public double getJobCuboidSizeRatio(){ + public double getJobCuboidSizeRatio() { return Double.parseDouble(getOptional("kylin.job.cuboid.size.ratio", "0.25")); } - public double getJobCuboidSizeMemHungryRatio(){ + public double getJobCuboidSizeMemHungryRatio() { return Double.parseDouble(getOptional("kylin.job.cuboid.size.memhungry.ratio", "0.05")); } @@ -692,11 +696,11 @@ abstract public class KylinConfigBase implements Serializable { public int getCubeStatsHLLPrecision() { return Integer.parseInt(getOptional("kylin.job.cubing.inmem.sampling.hll.precision", "14")); } - + public String getJobControllerLock() { return getOptional("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperJobLock"); } - + public Map<Integer, String> getJobEngines() { Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.job.engine.")); // ref constants in IEngineAware @@ -731,7 +735,6 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.enable.scheduler", "0")); } - public String getZookeeperAddress() { return this.getOptional("kylin.zookeeper.address"); } @@ -743,6 +746,7 @@ abstract public class KylinConfigBase implements Serializable { public String getRestAddress() { return this.getOptional("kylin.rest.address", "localhost:7070"); } + public void setRestAddress(String restAddress) { setProperty("kylin.rest.address", restAddress); } @@ -778,7 +782,7 @@ abstract public class KylinConfigBase implements Serializable { public long getStorageCleanupTimeThreshold() { 
return Long.valueOf(this.getOptional("kylin.storage.cleanup.time.threshold", "172800000")); //default two days } - + public int getAppendDictEntrySize() { return Integer.parseInt(getOptional("kylin.dict.append.entry.size", "10000000")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/core-job/pom.xml ---------------------------------------------------------------------- diff --git a/core-job/pom.xml b/core-job/pom.xml index d744058..8a1211f 100644 --- a/core-job/pom.xml +++ b/core-job/pom.xml @@ -45,6 +45,12 @@ <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> </dependency> <!-- Env & Test --> http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java index 4190052..e5b0033 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/RealizationRegistry.java @@ -26,8 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.common.KylinConfig; -import org.reflections.Reflections; -import org.reflections.scanners.SubTypesScanner; +import org.apache.kylin.common.util.ClassUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,10 +80,11 @@ public class RealizationRegistry { providers = Maps.newConcurrentMap(); // use reflection to load providers - final Set<Class<? 
extends IRealizationProvider>> realizationProviders = new Reflections("org.apache.kylin", new SubTypesScanner()).getSubTypesOf(IRealizationProvider.class); + String[] providerNames = config.getRealizationProviders(); List<Throwable> es = Lists.newArrayList(); - for (Class<? extends IRealizationProvider> cls : realizationProviders) { + for (String clsName : providerNames) { try { + Class<? extends IRealizationProvider> cls = ClassUtil.forName(clsName, IRealizationProvider.class); IRealizationProvider p = (IRealizationProvider) cls.getMethod("getInstance", KylinConfig.class).invoke(null, config); providers.put(p.getRealizationType(), p); http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/engine-spark/pom.xml ---------------------------------------------------------------------- diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml index 1e4f4df..207fc4d 100644 --- a/engine-spark/pom.xml +++ b/engine-spark/pom.xml @@ -67,6 +67,11 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + </dependency> + <!-- Hadoop dependency --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/kylin/blob/b514ae16/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java deleted file mode 100644 index 4424b89..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithSpark.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.provision; - -import java.io.File; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.TimeZone; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractKylinTestCase; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.spark.SparkBatchCubingEngine; -import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.lock.MockJobLock; -import org.apache.kylin.job.manager.ExecutableManager; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import com.google.common.collect.Lists; - -//TODO: convert it to a normal class rather than a test 
case, like in BuildCubeWithEngine -@Ignore -public class BuildCubeWithSpark { - - private CubeManager cubeManager; - private DefaultScheduler scheduler; - protected ExecutableManager jobService; - - private static final Log logger = LogFactory.getLog(BuildCubeWithSpark.class); - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { - break; - } else { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - @BeforeClass - public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); - if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); - } - - } - - @Before - public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); - - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - for (String jobId : jobService.getAllJobIds()) { - jobService.deleteJob(jobId); - } - scheduler = DefaultScheduler.createInstance(); - scheduler.init(new JobEngineConfig(kylinConfig), new MockJobLock()); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - - } - - @After - public void after() { - 
HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - @Test - public void test() throws Exception { - final CubeSegment segment = createSegment(); - String confPath = new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath(); - KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); - String coprocessor = KylinConfig.getInstanceFromEnv().getCoprocessorLocalJar(); - logger.info("confPath location:" + confPath); - logger.info("coprocessor location:" + coprocessor); - final DefaultChainedExecutable cubingJob = new SparkBatchCubingEngine(confPath, coprocessor).createBatchCubingJob(segment, "BuildCubeWithSpark"); - jobService.addJob(cubingJob); - waitForJob(cubingJob.getId()); - - if (jobService.getOutput(cubingJob.getId()).getState() != ExecutableState.SUCCEED) { - throw new RuntimeException("The job '" + cubingJob.getId() + "' is failed."); - } - } - - private void clearSegment(String cubeName) throws Exception { - CubeInstance cube = cubeManager.getCube(cubeName); - // remove all existing segments - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); - cubeManager.updateCube(cubeBuilder); - } - - private CubeSegment createSegment() throws Exception { - String cubeName = "test_kylin_cube_with_slr_left_join_empty"; - clearSegment(cubeName); - - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long dateStart = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); - long dateEnd = f.parse("2050-11-12").getTime(); - - // this cube's start date is 0, end date is 20501112000000 - List<String> result = Lists.newArrayList(); - return cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, dateEnd, 0, 0); - - } - -} \ No newline at end of file