KYLIN-1726 Scalable streaming cubing Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a9c75eae Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a9c75eae Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a9c75eae
Branch: refs/heads/stream_m1 Commit: a9c75eaefc0cce4e1073cf4b90a830c6876d599b Parents: cd116a6 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Jul 3 21:43:16 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Jul 3 21:43:16 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 11 ++++---- .../kylin/job/streaming/KafkaDataLoader.java | 11 ++++++-- .../kylin/job/streaming/StreamDataLoader.java | 22 +++++++++++++++ .../java/org/apache/kylin/source/ISource.java | 4 +++ .../org/apache/kylin/source/SourceFactory.java | 5 ++++ .../kylin/engine/mr/BatchCubingJobBuilder.java | 1 + .../kylin/engine/mr/JobBuilderSupport.java | 2 +- .../engine/mr/common/AbstractHadoopJob.java | 28 ++++++++++++++++++++ .../apache/kylin/engine/mr/steps/CuboidJob.java | 1 + .../engine/mr/steps/FactDistinctColumnsJob.java | 3 +++ .../kylin/engine/mr/steps/InMemCuboidJob.java | 1 + .../test_streaming_table_cube_desc.json | 3 +++ .../kafka/default.streaming_table.json | 2 +- .../streaming/default.streaming_table.json | 2 +- .../table/DEFAULT.STREAMING_TABLE.json | 3 ++- .../kylin/provision/BuildCubeWithStream.java | 3 ++- .../apache/kylin/source/hive/HiveSource.java | 8 ++++++ 17 files changed, 97 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index d56dd64..5357faa 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -39,7 +39,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.job.dataGen.FactTableGenerator; -import org.apache.kylin.job.streaming.KafkaDataLoader; +import org.apache.kylin.job.streaming.StreamDataLoader; import org.apache.kylin.job.streaming.StreamingTableDataGenerator; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; @@ -48,7 +48,6 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.HiveClient; import org.apache.kylin.source.hive.HiveCmdBuilder; import org.apache.kylin.source.kafka.TimedJsonStreamParser; -import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.maven.model.Model; import org.apache.maven.model.io.xpp3.MavenXpp3Reader; import org.slf4j.Logger; @@ -150,15 +149,15 @@ public class DeployUtil { deployHiveTables(); } - public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException { + public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable()); List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable()); TableDesc tableDesc = cubeInstance.getFactTableDesc(); //load into kafka - KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data); - KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2); - logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic()); + streamDataLoader.loadIntoKafka(data); + streamDataLoader.loadIntoKafka(data2); + logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString()); //csv data for H2 use List<TblColRef> tableColumns = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java index 31fc670..756ef36 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java @@ -34,13 +34,20 @@ import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; +import org.apache.kylin.source.kafka.config.KafkaConfig; /** * Load prepared data into kafka(for test use) */ -public class KafkaDataLoader { +public class KafkaDataLoader extends StreamDataLoader { + List<KafkaClusterConfig> kafkaClusterConfigs; - public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) { + public KafkaDataLoader(KafkaConfig kafkaConfig) { + super(kafkaConfig); + this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs(); + } + + public void loadIntoKafka(List<String> messages) { KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0); String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java new file mode 100644 index 0000000..50fc883 --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java @@ -0,0 +1,22 @@ +package org.apache.kylin.job.streaming; + +import org.apache.kylin.source.kafka.config.KafkaClusterConfig; +import org.apache.kylin.source.kafka.config.KafkaConfig; + +import java.util.List; + +/** + */ +public abstract class StreamDataLoader { + protected KafkaConfig kafkaConfig; + public StreamDataLoader(KafkaConfig kafkaConfig) { + this.kafkaConfig = kafkaConfig; + } + + abstract public void loadIntoKafka(List<String> messages); + + @Override + public String toString() { + return "kafka topic " + kafkaConfig.getTopic(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/core-metadata/src/main/java/org/apache/kylin/source/ISource.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index 3cd8a02..e9216f9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -20,9 +20,13 @@ package org.apache.kylin.source; import org.apache.kylin.metadata.model.TableDesc; +import java.util.List; + public interface ISource { public <I> I adaptToBuildEngine(Class<I> engineInterface); public ReadableTable createReadableTable(TableDesc tableDesc); + + public List<String> getMRDependentResources(TableDesc table); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java index f701a0f..e82c6ed 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java @@ -18,6 +18,7 @@ package org.apache.kylin.source; +import java.util.List; import java.util.Map; import org.apache.kylin.common.KylinConfig; @@ -45,4 +46,8 @@ public class SourceFactory { return tableSource(table).adaptToBuildEngine(engineInterface); } + public static List<String> getMRDependentResources(TableDesc table) { + return tableSource(table).getMRDependentResources(table); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index ec9b1c6..5a098a8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -99,6 +99,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index d7676f1..830de9e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -76,7 +76,7 @@ public class JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step"); - + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); result.setMapReduceParams(cmd.toString()); return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 5472928..26592cd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.regex.Matcher; @@ -68,6 +69,7 @@ import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,6 +168,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { String kylinHiveDependency = System.getProperty("kylin.hive.dependency"); String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency"); + String kylinKafkaDependency = System.getProperty("kylin.kafka.dependency"); logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); Configuration jobConf = job.getConfiguration(); @@ -223,6 +226,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } + // for hive dependencies + if (kylinKafkaDependency != null) { + kylinKafkaDependency = kylinKafkaDependency.replace(":", ","); + + logger.info("Kafka Dependencies Before Filtered: " + kylinHiveDependency); + + if (kylinDependency.length() > 0) + kylinDependency.append(","); + kylinDependency.append(kylinKafkaDependency); + } else { + + logger.info("No Kafka dependency jars set in the environment, will find them from jvm:"); + + try { + String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")); + kylinDependency.append(kafkaClientJarPath).append(","); + logger.info("kafka jar file: " + kafkaClientJarPath); + + } catch (ClassNotFoundException e) { + logger.error("Cannot found kafka dependency jars: " + e); + } + } + // for KylinJobMRLibDir String mrLibDir = kylinConf.getKylinJobMRLibDir(); if (!StringUtils.isBlank(mrLibDir)) { @@ -433,6 +459,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { for (String tableName : cube.getDescriptor().getModel().getAllTables()) { TableDesc table = metaMgr.getTableDesc(tableName); dumpList.add(table.getResourcePath()); + List<String> dependentResources = SourceFactory.getMRDependentResources(table); + dumpList.addAll(dependentResources); } for (CubeSegment segment : cube.getSegments()) { dumpList.addAll(segment.getDictionaryPaths()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index 85ae9c7..d1893f4 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -109,6 +109,7 @@ public class CuboidJob extends AbstractHadoopJob { } job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 90253ba..1bbe893 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -55,6 +55,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_CUBING_JOB_ID); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_SEGMENT_NAME); options.addOption(OPTION_STATISTICS_ENABLED); @@ -63,6 +64,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { parseOptions(options, args); job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String job_id = getOptionValue(OPTION_CUBING_JOB_ID); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id); String cubeName = getOptionValue(OPTION_CUBE_NAME); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 4b2ff37..62964bb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -97,6 +97,7 @@ public class InMemCuboidJob extends AbstractHadoopJob { } job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId); logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json index 0267db5..23e5b00 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json @@ -105,6 +105,9 @@ "joint_dims" : [ ] } } ], + "override_kylin_properties": { + "kylin.cube.algorithm": "random" + }, "notify_list" : [ ], "status_need_notify" : [ ], "auto_merge_time_ranges" : null, http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/kafka/default.streaming_table.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json index c97927d..6a64cce 100644 --- a/examples/test_case_data/localmeta/kafka/default.streaming_table.json +++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json @@ -1,7 +1,7 @@ { "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "default.streaming_table", + "name": "DEFAULT.STREAMING_TABLE", "topic": "test_streaming_table_topic_xyz", "timeout": 60000, "bufferSize": 65536, http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/streaming/default.streaming_table.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json index 6eb4a88..85a477b 100644 --- a/examples/test_case_data/localmeta/streaming/default.streaming_table.json +++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json @@ -1,6 +1,6 @@ { "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193", - "name": "default.streaming_table", + "name": "DEFAULT.STREAMING_TABLE", "type": "kafka", "last_modified": 0 } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json index 6ead441..bba4b8b 100644 --- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json +++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json @@ -40,5 +40,6 @@ } ], "database": "DEFAULT", - "last_modified": 0 + "last_modified": 0, + "source_type" : 1 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index c426ea4..314276e 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -33,6 +33,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; +import org.apache.kylin.job.streaming.KafkaDataLoader; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -98,7 +99,7 @@ public class BuildCubeWithStream { streamingConfig.setTopic(UUID.randomUUID().toString()); KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig); + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig)); } public static void afterClass() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/a9c75eae/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index b7dbff0..e9cebea 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -18,11 +18,14 @@ package org.apache.kylin.source.hive; +import com.google.common.collect.Lists; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; +import java.util.List; + //used by reflection public class HiveSource implements ISource { @@ -41,4 +44,9 @@ public class HiveSource implements ISource { return new HiveTable(tableDesc); } + @Override + public List<String> getMRDependentResources(TableDesc table) { + return Lists.newArrayList(); + } + }