KYLIN-1726 Scalable streaming cubing Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/acde3396 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/acde3396 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/acde3396
Branch: refs/heads/master Commit: acde339623d43fa9b441614bc64ca7960e9255fe Parents: a2b693c Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Jul 3 21:43:16 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Aug 10 10:10:10 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 11 ++++---- .../kylin/job/streaming/KafkaDataLoader.java | 11 ++++++-- .../kylin/job/streaming/StreamDataLoader.java | 22 +++++++++++++++ .../java/org/apache/kylin/source/ISource.java | 4 +++ .../org/apache/kylin/source/SourceFactory.java | 5 ++++ .../kylin/engine/mr/BatchCubingJobBuilder.java | 1 + .../kylin/engine/mr/JobBuilderSupport.java | 2 +- .../engine/mr/common/AbstractHadoopJob.java | 28 ++++++++++++++++++++ .../apache/kylin/engine/mr/steps/CuboidJob.java | 1 + .../engine/mr/steps/FactDistinctColumnsJob.java | 3 +++ .../kylin/engine/mr/steps/InMemCuboidJob.java | 1 + .../test_streaming_table_cube_desc.json | 3 +++ .../kafka/DEFAULT.STREAMING_TABLE.json | 2 +- .../kafka/default.streaming_table.json | 2 +- .../streaming/DEFAULT.STREAMING_TABLE.json | 2 +- .../streaming/default.streaming_table.json | 2 +- .../table/DEFAULT.STREAMING_TABLE.json | 3 ++- .../kylin/provision/BuildCubeWithStream.java | 3 ++- .../apache/kylin/source/hive/HiveSource.java | 8 ++++++ 19 files changed, 99 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 6128770..986edf6 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -36,7 +36,7 @@ import 
org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.dataGen.FactTableGenerator; -import org.apache.kylin.job.streaming.KafkaDataLoader; +import org.apache.kylin.job.streaming.StreamDataLoader; import org.apache.kylin.job.streaming.StreamingTableDataGenerator; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; @@ -45,7 +45,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.HiveClient; import org.apache.kylin.source.hive.HiveCmdBuilder; import org.apache.kylin.source.kafka.TimedJsonStreamParser; -import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.maven.model.Model; import org.apache.maven.model.io.xpp3.MavenXpp3Reader; import org.slf4j.Logger; @@ -148,15 +147,15 @@ public class DeployUtil { deployHiveTables(); } - public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException { + public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable()); List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable()); TableDesc tableDesc = cubeInstance.getFactTableDesc(); //load into kafka - KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data); - KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2); - logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic()); + streamDataLoader.loadIntoKafka(data); + streamDataLoader.loadIntoKafka(data2); + logger.info("Write {} 
messages into {}", data.size(), streamDataLoader.toString()); //csv data for H2 use List<TblColRef> tableColumns = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java index 5242ff2..0eaae20 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java @@ -30,6 +30,7 @@ import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; +import org.apache.kylin.source.kafka.config.KafkaConfig; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; @@ -38,9 +39,15 @@ import kafka.producer.ProducerConfig; /** * Load prepared data into kafka(for test use) */ -public class KafkaDataLoader { +public class KafkaDataLoader extends StreamDataLoader { + List<KafkaClusterConfig> kafkaClusterConfigs; - public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) { + public KafkaDataLoader(KafkaConfig kafkaConfig) { + super(kafkaConfig); + this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs(); + } + + public void loadIntoKafka(List<String> messages) { KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0); String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java ---------------------------------------------------------------------- 
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java new file mode 100644 index 0000000..50fc883 --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java @@ -0,0 +1,22 @@ +package org.apache.kylin.job.streaming; + +import org.apache.kylin.source.kafka.config.KafkaClusterConfig; +import org.apache.kylin.source.kafka.config.KafkaConfig; + +import java.util.List; + +/** + */ +public abstract class StreamDataLoader { + protected KafkaConfig kafkaConfig; + public StreamDataLoader(KafkaConfig kafkaConfig) { + this.kafkaConfig = kafkaConfig; + } + + abstract public void loadIntoKafka(List<String> messages); + + @Override + public String toString() { + return "kafka topic " + kafkaConfig.getTopic(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/ISource.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index 3cd8a02..e9216f9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -20,9 +20,13 @@ package org.apache.kylin.source; import org.apache.kylin.metadata.model.TableDesc; +import java.util.List; + public interface ISource { public <I> I adaptToBuildEngine(Class<I> engineInterface); public ReadableTable createReadableTable(TableDesc tableDesc); + + public List<String> getMRDependentResources(TableDesc table); } http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java ---------------------------------------------------------------------- diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java index f701a0f..e82c6ed 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java @@ -18,6 +18,7 @@ package org.apache.kylin.source; +import java.util.List; import java.util.Map; import org.apache.kylin.common.KylinConfig; @@ -45,4 +46,8 @@ public class SourceFactory { return tableSource(table).adaptToBuildEngine(engineInterface); } + public static List<String> getMRDependentResources(TableDesc table) { + return tableSource(table).getMRDependentResources(table); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index ec9b1c6..5a098a8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -99,6 +99,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 3e9aff6..86451c9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -73,7 +73,7 @@ public class JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step"); - + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); result.setMapReduceParams(cmd.toString()); return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 02928e0..04ecc71 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.regex.Matcher; @@ -67,6 +68,7 @@ import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.MetadataManager; import 
org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,6 +166,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { String kylinHiveDependency = System.getProperty("kylin.hive.dependency"); String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency"); + String kylinKafkaDependency = System.getProperty("kylin.kafka.dependency"); logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); Configuration jobConf = job.getConfiguration(); @@ -221,6 +224,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } + // for kafka dependencies + if (kylinKafkaDependency != null) { + kylinKafkaDependency = kylinKafkaDependency.replace(":", ","); + + logger.info("Kafka Dependencies Before Filtered: " + kylinHiveDependency); + + if (kylinDependency.length() > 0) + kylinDependency.append(","); + kylinDependency.append(kylinKafkaDependency); + } else { + + logger.info("No Kafka dependency jars set in the environment, will find them from jvm:"); + + try { + String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")); + kylinDependency.append(kafkaClientJarPath).append(","); + logger.info("kafka jar file: " + kafkaClientJarPath); + + } catch (ClassNotFoundException e) { + logger.error("Cannot found kafka dependency jars: " + e); + } + } + // for KylinJobMRLibDir String mrLibDir = kylinConf.getKylinJobMRLibDir(); if (!StringUtils.isBlank(mrLibDir)) { @@ -442,6 +468,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { for (String tableName : cube.getDescriptor().getModel().getAllTables()) { TableDesc table = metaMgr.getTableDesc(tableName); dumpList.add(table.getResourcePath()); + List<String> dependentResources = SourceFactory.getMRDependentResources(table); + 
dumpList.addAll(dependentResources); } for (CubeSegment segment : cube.getSegments()) { dumpList.addAll(segment.getDictionaryPaths()); http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index f037d2e..f3524f8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -109,6 +109,7 @@ public class CuboidJob extends AbstractHadoopJob { } job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 39aae72..f091ab9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -54,6 +54,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_CUBING_JOB_ID); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_SEGMENT_NAME); options.addOption(OPTION_STATISTICS_ENABLED); @@ -62,6 +63,8 @@ public class FactDistinctColumnsJob extends 
AbstractHadoopJob { parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String job_id = getOptionValue(OPTION_CUBING_JOB_ID); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id); String cubeName = getOptionValue(OPTION_CUBE_NAME); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index f5076e4..510dbe8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -95,6 +95,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { } job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json index 0267db5..23e5b00 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json @@ -105,6 +105,9 @@ "joint_dims" : [ ] } } ], + "override_kylin_properties": { + "kylin.cube.algorithm": "random" + }, "notify_list" : [ ], "status_need_notify" : [ ], 
"auto_merge_time_ranges" : null, http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json index c97927d..6a64cce 100644 --- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json +++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json @@ -1,7 +1,7 @@ { "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "default.streaming_table", + "name": "DEFAULT.STREAMING_TABLE", "topic": "test_streaming_table_topic_xyz", "timeout": 60000, "bufferSize": 65536, http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/default.streaming_table.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json index c97927d..6a64cce 100644 --- a/examples/test_case_data/localmeta/kafka/default.streaming_table.json +++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json @@ -1,7 +1,7 @@ { "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "default.streaming_table", + "name": "DEFAULT.STREAMING_TABLE", "topic": "test_streaming_table_topic_xyz", "timeout": 60000, "bufferSize": 65536, http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json index 6eb4a88..85a477b 100644 --- 
a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json +++ b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json @@ -1,6 +1,6 @@ { "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "default.streaming_table", + "name": "DEFAULT.STREAMING_TABLE", "type": "kafka", "last_modified": 0 } http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/default.streaming_table.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json index 6eb4a88..85a477b 100644 --- a/examples/test_case_data/localmeta/streaming/default.streaming_table.json +++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json @@ -1,6 +1,6 @@ { "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "default.streaming_table", + "name": "DEFAULT.STREAMING_TABLE", "type": "kafka", "last_modified": 0 } http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json index 82f6fdb..5bcfa35 100644 --- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json +++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json @@ -41,5 +41,6 @@ ], "database": "DEFAULT", "source_type": 1, - "last_modified": 0 + "last_modified": 0, + "source_type" : 1 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 95f0f3d..9490560 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -33,6 +33,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.streaming.KafkaDataLoader; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -99,7 +100,7 @@ public class BuildCubeWithStream { streamingConfig.setTopic(UUID.randomUUID().toString()); KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig); + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig)); } public void cleanup() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index b7dbff0..e9cebea 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -18,11 +18,14 @@ package org.apache.kylin.source.hive; +import com.google.common.collect.Lists; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.TableDesc; import 
org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; +import java.util.List; + //used by reflection public class HiveSource implements ISource { @@ -41,4 +44,9 @@ public class HiveSource implements ISource { return new HiveTable(tableDesc); } + @Override + public List<String> getMRDependentResources(TableDesc table) { + return Lists.newArrayList(); + } + }