Merge commit '727920b4d3642aaa3657d90b7f3dce7dcdd68fe2'
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b50030da Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b50030da Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b50030da Branch: refs/heads/2622-2764 Commit: b50030da4cd64d424a82e9d2b752c00917e8e681 Parents: 068baf6 727920b Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Aug 31 15:42:16 2017 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Aug 31 21:33:14 2017 +0800 ---------------------------------------------------------------------- assembly/pom.xml | 2 +- atopcalcite/pom.xml | 2 +- core-common/pom.xml | 2 +- .../org/apache/kylin/common/KylinConfig.java | 57 ++- .../apache/kylin/common/KylinConfigBase.java | 1 + .../org/apache/kylin/common/KylinConfigExt.java | 2 +- .../org/apache/kylin/common/KylinVersion.java | 4 +- .../org/apache/kylin/common/util/ClassUtil.java | 13 +- core-cube/pom.xml | 2 +- .../kylin/cube/gridtable/CubeCodeSystem.java | 4 + .../org/apache/kylin/gridtable/GTRecord.java | 8 + .../kylin/gridtable/GTSampleCodeSystem.java | 4 + .../apache/kylin/gridtable/IGTCodeSystem.java | 3 + core-dictionary/pom.xml | 2 +- core-job/pom.xml | 2 +- .../job/execution/DefaultChainedExecutable.java | 13 +- .../kylin/job/execution/ExecutableManager.java | 12 +- core-metadata/pom.xml | 2 +- .../measure/bitmap/BitmapCounterFactory.java | 2 + .../kylin/measure/bitmap/BitmapSerializer.java | 38 +- .../measure/bitmap/RoaringBitmapCounter.java | 10 + .../bitmap/RoaringBitmapCounterFactory.java | 5 + .../measure/percentile/PercentileCounter.java | 28 +- .../metadata/datatype/DataTypeSerializer.java | 13 + .../percentile/PercentileCounterTest.java | 46 --- core-storage/pom.xml | 2 +- .../gtrecord/GTCubeStorageQueryBase.java | 53 ++- .../gtrecord/SequentialCubeTupleIterator.java | 17 +- engine-mr/pom.xml | 2 +- .../engine/mr/common/AbstractHadoopJob.java | 128 +++--- .../kylin/engine/mr/common/BatchConstants.java | 1 + .../kylin/engine/mr/common/HadoopCmdOutput.java | 19 +- .../engine/mr/common/JobInfoConverter.java | 7 +- .../engine/mr/common/JobRelatedMetaUtil.java | 72 ++++ engine-spark/pom.xml | 2 +- .../engine/spark/KylinKryoRegistrator.java | 11 +- .../spark/SparkBatchCubingJobBuilder2.java | 5 + .../kylin/engine/spark/SparkCubingByLayer.java | 392 +++++++++++-------- .../kylin/engine/spark/SparkExecutable.java | 58 ++- .../spark/util/PercentileCounterSerializer.java | 55 +++ .../spark/util/PercentileSerializerTest.java | 67 ++++ .../acl/0928468a-9fab-4185-9a14-6f2e7c74823f | 27 ++ .../acl/2fbca32a-a33e-4b69-83dd-0bb8b1f8c53b | 27 ++ .../acl/2fbca32a-a33e-4b69-83dd-0bb8b1f8c91b | 24 ++ jdbc/pom.xml | 2 +- kylin-it/pom.xml | 18 +- .../job/ITDistributedSchedulerBaseTest.java | 2 + .../jdbc/ITJdbcSourceTableLoaderTest.java | 114 ++++++ .../source/jdbc/ITJdbcTableReaderTest.java | 109 ++++++ .../test/resources/query/sql_limit/query05.sql | 21 + pom.xml | 2 +- query/pom.xml | 2 +- .../query/adhoc/PushDownRunnerJdbcImpl.java | 2 - .../kylin/query/relnode/OLAPAggregateRel.java | 4 +- .../kylin/query/relnode/OLAPWindowRel.java | 2 - .../kylin/query/routing/RealizationChooser.java | 7 +- server-base/pom.xml | 2 +- .../kylin/rest/controller/CubeController.java | 49 ++- .../rest/controller2/CubeControllerV2.java | 19 +- .../apache/kylin/rest/service/AclService.java | 4 +- .../rest/service/AclTableMigrationTool.java | 13 +- .../apache/kylin/rest/service/JobService.java | 4 + .../apache/kylin/rest/service/QueryService.java | 4 +- .../kylin/rest/util/Log4jConfigListener.java | 1 + server/pom.xml | 2 +- server/src/main/resources/kylinSecurity.xml | 2 + .../kylin/rest/service/JobServiceTest.java | 33 ++ source-hive/pom.xml | 7 +- .../HiveColumnCardinalityUpdateJob.java | 3 +- .../apache/kylin/source/jdbc/JdbcExplorer.java | 6 +- source-kafka/pom.xml | 2 +- .../kylin/source/kafka/DateTimeParser.java | 62 +++ storage-hbase/pom.xml | 2 +- .../kylin/storage/hbase/HBaseConnection.java | 2 +- .../kylin/storage/hbase/HBaseResourceStore.java | 7 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 1 + .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +- .../hbase/cube/v2/HBaseReadonlyStore.java | 33 +- .../coprocessor/endpoint/CubeVisitService.java | 20 +- .../endpoint/generated/CubeVisitProtos.java | 151 +++++-- .../endpoint/protobuf/CubeVisit.proto | 1 + .../storage/hbase/steps/CreateHTableJob.java | 3 +- .../steps/HDFSPathGarbageCollectionStep.java | 2 +- .../hbase/util/DeployCoprocessorCLI.java | 5 +- tomcat-ext/pom.xml | 2 +- tool-assembly/pom.xml | 2 +- tool/pom.xml | 2 +- webapp/app/css/AdminLTE.css | 6 +- webapp/app/js/controllers/access.js | 1 - webapp/app/js/controllers/cubeAdvanceSetting.js | 8 +- webapp/app/js/controllers/cubeEdit.js | 52 +++ webapp/app/js/controllers/cubeMeasures.js | 68 +++- .../js/controllers/modelConditionsSettings.js | 9 + webapp/app/js/controllers/modelDataModel.js | 3 +- webapp/app/js/controllers/sourceMeta.js | 2 - webapp/app/js/model/cubeDescModel.js | 6 - webapp/app/js/model/cubeListModel.js | 10 +- webapp/app/js/model/jobListModel.js | 4 - webapp/app/js/model/streamingListModel.js | 6 - webapp/app/less/layout.less | 2 +- webapp/app/partials/admin/admin.html | 16 +- .../cubeDesigner/advanced_settings.html | 55 ++- webapp/app/partials/cubeDesigner/info.html | 2 +- webapp/app/partials/cubeDesigner/measures.html | 22 +- webapp/app/partials/cubes/cube_detail.html | 4 +- webapp/app/partials/cubes/cube_json_edit.html | 2 +- .../modelDesigner/conditions_settings.html | 1 + .../app/partials/modelDesigner/data_model.html | 4 +- .../app/partials/modelDesigner/model_info.html | 2 +- .../app/partials/projects/project_create.html | 2 +- .../app/partials/projects/project_detail.html | 2 +- 111 files changed, 1692 insertions(+), 576 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/assembly/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --cc core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index cc08056,1d5e0ec..dfb9a28 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@@ -59,27 -60,27 +60,47 @@@ public class KylinConfig extends KylinC // thread-local instances, will override SYS_ENV_INSTANCE private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>(); + + static { + /* + * Make Calcite to work with Unicode. + * + * Sets default char set for string literals in SQL and row types of + * RelNode. This is more a label used to compare row type equality. For + * both SQL string and row record, they are passed to Calcite in String + * object and does not require additional codec. + * + * Ref SaffronProperties.defaultCharset + * Ref SqlUtil.translateCharacterSetName() + * Ref NlsString constructor() + */ + // copied from org.apache.calcite.util.ConversionUtil.NATIVE_UTF16_CHARSET_NAME + String NATIVE_UTF16_CHARSET_NAME = (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ? "UTF-16BE" : "UTF-16LE"; + System.setProperty("saffron.default.charset", NATIVE_UTF16_CHARSET_NAME); + System.setProperty("saffron.default.nationalcharset", NATIVE_UTF16_CHARSET_NAME); + System.setProperty("saffron.default.collation.name", NATIVE_UTF16_CHARSET_NAME + "$en_US"); + } + static { + /* + * Make Calcite to work with Unicode. + * + * Sets default char set for string literals in SQL and row types of + * RelNode. This is more a label used to compare row type equality. For + * both SQL string and row record, they are passed to Calcite in String + * object and does not require additional codec. + * + * Ref SaffronProperties.defaultCharset + * Ref SqlUtil.translateCharacterSetName() + * Ref NlsString constructor() + */ + // copied from org.apache.calcite.util.ConversionUtil.NATIVE_UTF16_CHARSET_NAME + String NATIVE_UTF16_CHARSET_NAME = (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) ? "UTF-16BE" : "UTF-16LE"; + System.setProperty("saffron.default.charset", NATIVE_UTF16_CHARSET_NAME); + System.setProperty("saffron.default.nationalcharset", NATIVE_UTF16_CHARSET_NAME); + System.setProperty("saffron.default.collation.name", NATIVE_UTF16_CHARSET_NAME + "$en_US"); + } + public static KylinConfig getInstanceFromEnv() { synchronized (KylinConfig.class) { KylinConfig config = THREAD_ENV_INSTANCE.get(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java ---------------------------------------------------------------------- diff --cc core-common/src/main/java/org/apache/kylin/common/KylinVersion.java index c6913fc,ad23421..e52feab --- a/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinVersion.java @@@ -95,7 -89,7 +95,7 @@@ public class KylinVersion implements Co /** * Require MANUAL updating kylin version per ANY upgrading. */ - private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("2.1.0.20403"); - private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("2.2.0"); ++ private static final KylinVersion CURRENT_KYLIN_VERSION = new KylinVersion("2.2.0.20500"); private static final KylinVersion VERSION_200 = new KylinVersion("2.0.0"); @@@ -211,4 -199,4 +211,4 @@@ } } --} ++} http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --cc core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index c3a160a,f65e4b5..00e355f --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@@ -295,20 -296,12 +295,28 @@@ public class GTRecord implements Compar } } + public void loadColumnsFromColumnBlocks(ImmutableBitSet[] selectedColumnBlocks, ImmutableBitSet selectedCols, + ByteBuffer buf) { + int pos = buf.position(); + for (ImmutableBitSet selectedColBlock : selectedColumnBlocks) { + for (int i = 0; i < selectedColBlock.trueBitCount(); i++) { + int c = selectedColBlock.trueBitAt(i); + int len = info.codeSystem.codeLength(c, buf); + if (selectedCols.get(c)) { + cols[c].set(buf.array(), buf.arrayOffset() + pos, len); + } + pos += len; + buf.position(pos); + } + } + } + + /** change pointers to point to data in given buffer, this + * method allows to defined specific column to load */ + public void loadColumns(int selectedCol, ByteBuffer buf) { + int pos = buf.position(); + int len = info.codeSystem.codeLength(selectedCol, buf); + cols[selectedCol].set(buf.array(), buf.arrayOffset() + pos, len); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --cc core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index b40aa5d,7b7b9ca..a3af511 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@@ -95,11 -95,10 +96,11 @@@ public abstract class GTCubeStorageQuer return ITupleIterator.EMPTY_TUPLE_ITERATOR; return new SequentialCubeTupleIterator(scanners, request.getCuboid(), request.getDimensions(), - request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest); + request.getGroups(), request.getMetrics(), returnTupleInfo, request.getContext(), sqlDigest); } - protected GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + protected GTCubeStorageQueryRequest getStorageQueryRequest(StorageContext context, SQLDigest sqlDigest, + TupleInfo returnTupleInfo) { context.setStorageQuery(this); //cope with queries with no aggregations http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index a378a1d,fc8fb4e..54f77c0 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@@ -68,9 -66,6 +66,7 @@@ import org.apache.kylin.cube.CubeSegmen import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.model.TableDesc; - import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.project.ProjectManager; - import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -78,23 -73,22 +74,43 @@@ public abstract class AbstractHadoopJob extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); - protected static final Option OPTION_PROJECT = OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg().isRequired(true).withDescription("Project name.").create(BatchConstants.ARG_PROJECT); -- protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME); -- protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME); -- protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID); ++ protected static final Option OPTION_PROJECT = OptionBuilder.withArgName(BatchConstants.ARG_PROJECT).hasArg() ++ .isRequired(true).withDescription("Project name.").create(BatchConstants.ARG_PROJECT); ++ protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg() ++ .isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)") ++ .create(BatchConstants.ARG_JOB_NAME); ++ protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() ++ .isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube") ++ .create(BatchConstants.ARG_CUBE_NAME); ++ protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID) ++ .hasArg().isRequired(false).withDescription("ID of cubing job executable") ++ .create(BatchConstants.ARG_CUBING_JOB_ID); // @Deprecated -- protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME); -- protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID); -- protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT); -- protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT); -- protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT); -- protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL); -- protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION).hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); -- protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); -- -- protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED); -- protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); -- protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); ++ protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME) ++ .hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME); ++ protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg() ++ .isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID); ++ protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() ++ .isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT); ++ protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT) ++ .hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT); ++ protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() ++ .isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT); ++ protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg() ++ .isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL); ++ protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION) ++ .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); ++ protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME) ++ .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); ++ ++ protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder ++ .withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false) ++ .withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED); ++ protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT) ++ .hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); ++ protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder ++ .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false) ++ .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; @@@ -152,7 -146,7 +168,9 @@@ } else { job.waitForCompletion(true); retVal = job.isSuccessful() ? 0 : 1; -- logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") + formatTime((System.nanoTime() - start) / 1000000L)); ++ logger.debug("Job '" + job.getJobName() + "' finished " ++ + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") ++ + formatTime((System.nanoTime() - start) / 1000000L)); } return retVal; } @@@ -173,7 -167,7 +191,8 @@@ Configuration jobConf = job.getConfiguration(); String classpath = jobConf.get(MAP_REDUCE_CLASSPATH); if (classpath == null || classpath.length() == 0) { -- logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value."); ++ logger.info("Didn't find " + MAP_REDUCE_CLASSPATH ++ + " in job configuration, will run 'mapred classpath' to get the default value."); classpath = getDefaultMapRedClasspath(); logger.info("The default mapred classpath is: " + classpath); } @@@ -206,11 -200,11 +225,13 @@@ StringUtil.appendWithSeparator(kylinDependency, hiveExecJarPath); logger.debug("hive-exec jar file: " + hiveExecJarPath); -- String hiveHCatJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.hive.hcatalog.mapreduce.HCatInputFormat")); ++ String hiveHCatJarPath = ClassUtil ++ .findContainingJar(Class.forName("org.apache.hive.hcatalog.mapreduce.HCatInputFormat")); StringUtil.appendWithSeparator(kylinDependency, hiveHCatJarPath); logger.debug("hive-catalog jar file: " + hiveHCatJarPath); -- String hiveMetaStoreJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.hadoop.hive.metastore.api.Table")); ++ String hiveMetaStoreJarPath = ClassUtil ++ .findContainingJar(Class.forName("org.apache.hadoop.hive.metastore.api.Table")); StringUtil.appendWithSeparator(kylinDependency, hiveMetaStoreJarPath); logger.debug("hive-metastore jar file: " + hiveMetaStoreJarPath); } catch (ClassNotFoundException e) { @@@ -226,7 -220,7 +247,8 @@@ } else { logger.debug("No Kafka dependency jar set in the environment, will find them from classpath:"); try { -- String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")); ++ String kafkaClientJarPath = ClassUtil ++ .findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")); StringUtil.appendWithSeparator(kylinDependency, kafkaClientJarPath); logger.debug("kafka jar file: " + kafkaClientJarPath); @@@ -449,7 -443,7 +471,8 @@@ } protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException { - dumpKylinPropsAndMetadata(cube.getProject(), collectCubeMetadata(cube), cube.getConfig(), conf); - dumpKylinPropsAndMetadata(JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(), conf); ++ dumpKylinPropsAndMetadata(cube.getProject(), JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(), ++ conf); } protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException { @@@ -463,28 -457,12 +486,13 @@@ protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException { Set<String> dumpList = new LinkedHashSet<>(); - dumpList.addAll(collectCubeMetadata(segment.getCubeInstance())); + dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance())); dumpList.addAll(segment.getDictionaryPaths()); - dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf); + dumpKylinPropsAndMetadata(segment.getProject(), dumpList, segment.getConfig(), conf); } - private Set<String> collectCubeMetadata(CubeInstance cube) { - // cube, model_desc, cube_desc, table - Set<String> dumpList = new LinkedHashSet<>(); - dumpList.add(cube.getResourcePath()); - dumpList.add(cube.getDescriptor().getModel().getResourcePath()); - dumpList.add(cube.getDescriptor().getResourcePath()); - - for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) { - TableDesc table = tableRef.getTableDesc(); - dumpList.add(table.getResourcePath()); - dumpList.addAll(SourceFactory.getMRDependentResources(table)); - } - - return dumpList; - } - - protected void dumpKylinPropsAndMetadata(String prj, Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException { - protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException { ++ protected void dumpKylinPropsAndMetadata(String prj, Set<String> dumpList, KylinConfig kylinConfig, ++ Configuration conf) throws IOException { File tmp = File.createTempFile("kylin_job_meta", ""); FileUtils.forceDelete(tmp); // we need a directory, so delete the file first @@@ -494,13 -472,9 +502,13 @@@ // write kylin.properties File kylinPropsFile = new File(metaDir, "kylin.properties"); kylinConfig.writeProperties(kylinPropsFile); - + + if (prj != null) { + dumpList.add(ProjectManager.getInstance(kylinConfig).getProject(prj).getResourcePath()); + } + // write resources - dumpResources(kylinConfig, metaDir, dumpList); + JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList); // hadoop distributed cache String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()); @@@ -557,7 -514,7 +548,8 @@@ HadoopUtil.deletePath(conf, path); } -- public static double getTotalMapInputMB(Job job) throws ClassNotFoundException, IOException, InterruptedException, JobException { ++ public static double getTotalMapInputMB(Job job) ++ throws ClassNotFoundException, IOException, InterruptedException, JobException { if (job == null) { throw new JobException("Job is null"); } @@@ -574,11 -531,11 +566,13 @@@ return totalMapInputMB; } -- protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException { ++ protected double getTotalMapInputMB() ++ throws ClassNotFoundException, IOException, InterruptedException, JobException { return getTotalMapInputMB(job); } -- protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException { ++ protected int getMapInputSplitCount() ++ throws ClassNotFoundException, JobException, IOException, InterruptedException { if (job == null) { throw new JobException("Job is null"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java index 0000000,46b1d3c..c34245b mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java @@@ -1,0 -1,71 +1,72 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.common; + + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.persistence.RawResource; + import org.apache.kylin.common.persistence.ResourceStore; + import org.apache.kylin.cube.CubeInstance; + import org.apache.kylin.metadata.model.TableDesc; + import org.apache.kylin.metadata.model.TableRef; + import org.apache.kylin.source.SourceFactory; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.File; + import java.io.IOException; + import java.util.LinkedHashSet; + import java.util.Set; + + public class JobRelatedMetaUtil { + private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class); + + public static Set<String> collectCubeMetadata(CubeInstance cube) { + // cube, model_desc, cube_desc, table + Set<String> dumpList = new LinkedHashSet<>(); + dumpList.add(cube.getResourcePath()); + dumpList.add(cube.getDescriptor().getModel().getResourcePath()); + dumpList.add(cube.getDescriptor().getResourcePath()); ++ dumpList.add(cube.getProjectInstance().getResourcePath()); + + for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) { + TableDesc table = tableRef.getTableDesc(); + dumpList.add(table.getResourcePath()); + dumpList.addAll(SourceFactory.getMRDependentResources(table)); + } + + return dumpList; + } + + public static void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException { + long startTime = System.currentTimeMillis(); + + ResourceStore from = ResourceStore.getStore(kylinConfig); + KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()); + ResourceStore to = ResourceStore.getStore(localConfig); + for (String path : dumpList) { + RawResource res = from.getResource(path); + if (res == null) + throw new IllegalStateException("No resource found at -- " + path); + to.putResource(path, res.inputStream, res.timestamp); + res.inputStream.close(); + } + + logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-spark/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java ---------------------------------------------------------------------- diff --cc engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java index ac56075,2991b82..e287739 mode 100644,100755..100755 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --cc engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 544e072,a03e238..dab5fb7 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@@ -66,8 -69,7 +70,7 @@@ import org.apache.spark.api.java.functi import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; - import org.apache.spark.broadcast.Broadcast; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.storage.StorageLevel; @@@ -186,62 -170,24 +171,24 @@@ public class SparkCubingByLayer extend allNormalMeasure = allNormalMeasure && needAggr[i]; } logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure); - StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); - // encode with dimension encoding, transform to <ByteArray, Object[]> RDD - final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() { - volatile transient boolean initialized = false; - BaseCuboidBuilder baseCuboidBuilder = null; - - @Override - public Tuple2<ByteArray, Object[]> call(Row row) throws Exception { - if (initialized == false) { - synchronized (SparkCubingByLayer.class) { - if (initialized == false) { - prepare(); - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); - initialized = true; - } - } - } - - String[] rowArray = rowToArray(row); - baseCuboidBuilder.resetAggrs(); - byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); - Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); - return new Tuple2<>(new ByteArray(rowKey), result); - } - - private String[] rowToArray(Row row) { - String[] result = new String[row.size()]; - for (int i = 0; i < row.size(); i++) { - final Object o = row.get(i); - if (o != null) { - result[i] = o.toString(); - } else { - result[i] = null; - } - } - return result; - } + HiveContext sqlContext = new HiveContext(sc.sc()); - final DataFrame intermediateTable = sqlContext.table(hiveTable); ++ final Dataset intermediateTable = sqlContext.table(hiveTable); - }); + // encode with dimension encoding, transform to <ByteArray, Object[]> RDD + final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD() + .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl)); - logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions()); Long totalCount = 0L; - if (kylinConfig.isSparkSanityCheckEnabled()) { + if (envConfig.isSparkSanityCheckEnabled()) { totalCount = encodedBaseRDD.count(); - logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count()); } - final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures()); - final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators); + final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl); BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction; if (allNormalMeasure == false) { - reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr); + reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr); } final int totalLevels = cubeDesc.getBuildLevel(); @@@ -336,26 -389,43 +390,43 @@@ private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList(0); - class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { - - CubeSegment cubeSegment; - CubeDesc cubeDesc; - NDCuboidBuilder ndCuboidBuilder; - RowKeySplitter rowKeySplitter; - transient boolean initialized = false; + static class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { + + private String cubeName; + private String segmentId; + private String metaUrl; + private CubeSegment cubeSegment; + private CubeDesc cubeDesc; + private CuboidScheduler cuboidScheduler; + private NDCuboidBuilder ndCuboidBuilder; + private RowKeySplitter rowKeySplitter; + private volatile transient boolean initialized = false; + + public CuboidFlatMap(String cubeName, String segmentId, String metaUrl) { + this.cubeName = cubeName; + this.segmentId = segmentId; + this.metaUrl = metaUrl; + } - CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, NDCuboidBuilder ndCuboidBuilder) { - this.cubeSegment = cubeSegment; - this.cubeDesc = cubeDesc; - this.ndCuboidBuilder = ndCuboidBuilder; + public void init() { + KylinConfig kConfig = getKylinConfigForExecutor(metaUrl); + CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName); + this.cubeSegment = cubeInstance.getSegmentById(segmentId); + this.cubeDesc = cubeInstance.getDescriptor(); - this.cuboidScheduler = new CuboidScheduler(cubeDesc); ++ this.cuboidScheduler = cubeDesc.getCuboidScheduler(); + this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment)); this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } @Override - public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { + public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { if (initialized == false) { - prepare(); - initialized = true; + synchronized (SparkCubingByLayer.class) { + if (initialized == false) { + init(); + initialized = true; + } + } } byte[] key = tuple2._1().array(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java ---------------------------------------------------------------------- diff --cc kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java index 1960e32,1960e32..483d8f7 --- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java @@@ -52,6 -52,6 +52,8 @@@ public class ITDistributedSchedulerBase Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState()); Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState()); Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState()); ++ ++ Thread.sleep(5000); Assert.assertEquals(null, getServerName(segmentId1)); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java ---------------------------------------------------------------------- diff --cc kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java index 0000000,977c0f4..1bf3bfe mode 000000,100644..100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcSourceTableLoaderTest.java @@@ -1,0 -1,112 +1,114 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.source.jdbc; + + import static org.junit.Assert.assertTrue; + + import java.sql.Connection; + import java.sql.DriverManager; + import java.sql.SQLException; + + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.LocalFileMetadataTestCase; + import org.apache.kylin.common.util.Pair; + import org.apache.kylin.metadata.MetadataManager; + import org.apache.kylin.metadata.model.ISourceAware; + import org.apache.kylin.metadata.model.TableDesc; + import org.apache.kylin.metadata.model.TableExtDesc; ++import org.apache.kylin.metadata.project.ProjectInstance; + import org.apache.kylin.query.H2Database; + import org.apache.kylin.source.ISource; + import org.apache.kylin.source.ISourceMetadataExplorer; + import org.apache.kylin.source.SourceFactory; + import org.apache.kylin.source.datagen.ModelDataGenerator; + import org.junit.After; + import org.junit.Before; + import org.junit.Test; + + public class ITJdbcSourceTableLoaderTest extends LocalFileMetadataTestCase implements ISourceAware { + + protected KylinConfig config = null; + protected static Connection h2Connection = null; + + @Before + public void setup() throws Exception { + + super.createTestMetadata(); + + System.setProperty("kylin.source.jdbc.connection-url", "jdbc:h2:mem:db" + "_jdbc_source_table_loader"); + System.setProperty("kylin.source.jdbc.driver", "org.h2.Driver"); + System.setProperty("kylin.source.jdbc.user", "sa"); + System.setProperty("kylin.source.jdbc.pass", ""); + + config = KylinConfig.getInstanceFromEnv(); + + h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + "_jdbc_source_table_loader", "sa", ""); + - H2Database h2DB = new H2Database(h2Connection, config); ++ String project = ProjectInstance.DEFAULT_PROJECT_NAME; ++ H2Database h2DB = new H2Database(h2Connection, config, project); + + MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + ModelDataGenerator gen = new ModelDataGenerator(mgr.getDataModelDesc("ci_left_join_model"), 10000); + gen.generate(); + + h2DB.loadAllTables(); + + } + + @After + public void after() throws Exception { + + super.cleanupTestMetadata(); + + if (h2Connection != null) { + try { + h2Connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + System.clearProperty("kylin.source.jdbc.connection-url"); + System.clearProperty("kylin.source.jdbc.driver"); + System.clearProperty("kylin.source.jdbc.user"); + System.clearProperty("kylin.source.jdbc.pass"); + + } + + @Test + public void test() throws Exception { + + ISource source = SourceFactory.getSource(new ITJdbcSourceTableLoaderTest()); + ISourceMetadataExplorer explr = source.getSourceMetadataExplorer(); + Pair<TableDesc, TableExtDesc> pair; + - pair = explr.loadTableMetadata("DEFAULT", "TEST_KYLIN_FACT"); ++ pair = explr.loadTableMetadata("DEFAULT", "TEST_KYLIN_FACT", ProjectInstance.DEFAULT_PROJECT_NAME); + assertTrue(pair.getFirst().getIdentity().equals("DEFAULT.TEST_KYLIN_FACT")); + - pair = explr.loadTableMetadata("EDW", "TEST_CAL_DT"); ++ pair = explr.loadTableMetadata("EDW", "TEST_CAL_DT", ProjectInstance.DEFAULT_PROJECT_NAME); + assertTrue(pair.getFirst().getIdentity().equals("EDW.TEST_CAL_DT")); + + } + + @Override + public int getSourceType() { + return ISourceAware.ID_JDBC; + } + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java ---------------------------------------------------------------------- diff --cc kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java index 0000000,22e0b14..41a35fe mode 000000,100644..100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/jdbc/ITJdbcTableReaderTest.java @@@ -1,0 -1,107 +1,109 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.source.jdbc; + + import java.sql.Connection; + import java.sql.DriverManager; + import java.sql.SQLException; + + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.LocalFileMetadataTestCase; + import org.apache.kylin.metadata.MetadataManager; + import org.apache.kylin.metadata.model.ISourceAware; ++import org.apache.kylin.metadata.project.ProjectInstance; + import org.apache.kylin.query.H2Database; + import org.apache.kylin.source.datagen.ModelDataGenerator; + import org.junit.After; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Test; + + public class ITJdbcTableReaderTest extends LocalFileMetadataTestCase implements ISourceAware { + + protected KylinConfig config = null; + protected static Connection h2Connection = null; + + @Before + public void setup() throws Exception { + + super.createTestMetadata(); + + System.setProperty("kylin.source.jdbc.connection-url", "jdbc:h2:mem:db" + "_jdbc_table_reader"); + System.setProperty("kylin.source.jdbc.driver", "org.h2.Driver"); + System.setProperty("kylin.source.jdbc.user", "sa"); + System.setProperty("kylin.source.jdbc.pass", ""); + + config = KylinConfig.getInstanceFromEnv(); + + h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + "_jdbc_table_reader", "sa", ""); + - H2Database h2DB = new H2Database(h2Connection, config); ++ String project = ProjectInstance.DEFAULT_PROJECT_NAME; ++ H2Database h2DB = new H2Database(h2Connection, config, project); + + MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + ModelDataGenerator gen = new ModelDataGenerator(mgr.getDataModelDesc("ci_left_join_model"), 10000); + gen.generate(); + + h2DB.loadAllTables(); + + } + + @After + public void after() throws Exception { + + super.cleanupTestMetadata(); + + if (h2Connection != null) { + try { + h2Connection.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + + System.clearProperty("kylin.source.jdbc.connection-url"); + System.clearProperty("kylin.source.jdbc.driver"); + System.clearProperty("kylin.source.jdbc.user"); + System.clearProperty("kylin.source.jdbc.pass"); + + } + + @Test + public void test() throws Exception { + + JdbcTableReader reader = new JdbcTableReader("default", "test_kylin_fact"); + int rowNumber = 0; + while (reader.next()) { + String[] row = reader.getRow(); + Assert.assertEquals(11, row.length); + + rowNumber++; + } + + reader.close(); + Assert.assertEquals(10000, rowNumber); + + } + + @Override + public int getSourceType() { + return ISourceAware.ID_JDBC; + } + + } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java ---------------------------------------------------------------------- diff --cc query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java index cd0b2ca,3431c45..cfb0cbd --- a/query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java +++ b/query/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java @@@ -63,11 -63,12 +63,12 @@@ public class RealizationChooser Map<DataModelDesc, Set<IRealization>> modelMap = makeOrderedModelMap(context); if (modelMap.size() == 0) { - throw new NoRealizationFoundException("No model found for" + toErrorMsg(context)); + throw new NoRealizationFoundException("No model found for " + toErrorMsg(context)); } - for (DataModelDesc model : modelMap.keySet()) { - Map<String, String> aliasMap = matches(model, context); + for (Map.Entry<DataModelDesc, Set<IRealization>> entry : modelMap.entrySet()) { + final DataModelDesc model = entry.getKey(); + final Map<String, String> aliasMap = matches(model, context); if (aliasMap != null) { fixModel(context, model, aliasMap); http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index a370292,a2cf0fb..4244cf3 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@@ -99,7 -100,7 +99,11 @@@ public class CubeController extends Bas @RequestMapping(value = "", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody -- public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { ++ public List<CubeInstance> getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, ++ @RequestParam(value = "modelName", required = false) String modelName, ++ @RequestParam(value = "projectName", required = false) String projectName, ++ @RequestParam(value = "limit", required = false) Integer limit, ++ @RequestParam(value = "offset", required = false) Integer offset) { List<CubeInstance> cubes; cubes = cubeService.listAllCubes(cubeName, projectName, modelName, true); @@@ -148,7 -149,7 +152,8 @@@ * @throws UnknownHostException * @throws IOException */ -- @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { RequestMethod.GET }, produces = { "application/json" }) ++ @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { RequestMethod.GET }, produces = { ++ "application/json" }) @ResponseBody public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@@ -168,7 -169,7 +173,8 @@@ * @param notifyList * @throws IOException */ -- @RequestMapping(value = "/{cubeName}/notify_list", method = { RequestMethod.PUT }, produces = { "application/json" }) ++ @RequestMapping(value = "/{cubeName}/notify_list", method = { RequestMethod.PUT }, produces = { ++ "application/json" }) @ResponseBody public void updateNotifyList(@PathVariable String cubeName, @RequestBody List<String> notifyList) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@@ -208,9 -209,9 +214,11 @@@ * * @throws IOException */ -- @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = { RequestMethod.PUT }, produces = { "application/json" }) ++ @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = { ++ RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody -- public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName, @RequestParam(value = "lookupTable") String lookupTable) { ++ public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName, ++ @RequestParam(value = "lookupTable") String lookupTable) { try { final CubeManager cubeMgr = cubeService.getCubeManager(); final CubeInstance cube = cubeMgr.getCube(cubeName); @@@ -226,7 -227,7 +234,8 @@@ * * @throws IOException */ -- @RequestMapping(value = "/{cubeName}/segs/{segmentName}", method = { RequestMethod.DELETE }, produces = { "application/json" }) ++ @RequestMapping(value = "/{cubeName}/segs/{segmentName}", method = { RequestMethod.DELETE }, produces = { ++ "application/json" }) @ResponseBody public CubeInstance deleteSegment(@PathVariable String cubeName, @PathVariable String segmentName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@@ -261,7 -262,7 +270,8 @@@ @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest req) { -- return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, null, null, req.getBuildType(), req.isForce() || req.isForceMergeEmptySegment()); ++ return buildInternal(cubeName, req.getStartTime(), req.getEndTime(), 0, 0, null, null, req.getBuildType(), ++ req.isForce() || req.isForceMergeEmptySegment()); } /** Build/Rebuild a cube segment by source offset */ @@@ -287,11 -288,11 +297,14 @@@ @RequestMapping(value = "/{cubeName}/rebuild2", method = { RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody public JobInstance rebuild2(@PathVariable String cubeName, @RequestBody JobBuildRequest2 req) { -- return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), req.getSourceOffsetEnd(), req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), req.isForce()); ++ return buildInternal(cubeName, 0, 0, req.getSourceOffsetStart(), req.getSourceOffsetEnd(), ++ req.getSourcePartitionOffsetStart(), req.getSourcePartitionOffsetEnd(), req.getBuildType(), ++ req.isForce()); } private JobInstance buildInternal(String cubeName, long startTime, long endTime, // -- long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force) { ++ long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, ++ Map<Integer, Long> sourcePartitionOffsetEnd, String buildType, boolean force) { try { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); @@@ -300,7 -301,7 +313,8 @@@ throw new InternalErrorException("Cannot find cube " + cubeName); } return jobService.submitJob(cube, startTime, endTime, startOffset, endOffset, // -- sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, submitter); ++ sourcePartitionOffsetStart, sourcePartitionOffsetEnd, CubeBuildTypeEnum.valueOf(buildType), force, ++ submitter); } catch (Throwable e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e.getLocalizedMessage(), e); @@@ -453,7 -454,7 +467,8 @@@ try { desc.setUuid(UUID.randomUUID().toString()); -- String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject(); ++ String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME ++ : cubeRequest.getProject(); ProjectInstance project = cubeService.getProjectManager().getProject(projectName); if (project == null) { throw new BadRequestException("Project " + projectName + " doesn't exist"); @@@ -485,7 -486,7 +500,8 @@@ return cubeRequest; } -- String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject(); ++ String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME ++ : cubeRequest.getProject(); try { CubeInstance cube = cubeService.getCubeManager().getCube(cubeRequest.getCubeName()); @@@ -497,13 -498,13 +513,15 @@@ //cube renaming is not allowed if (!cube.getDescriptor().getName().equalsIgnoreCase(desc.getName())) { -- String error = "Cube Desc renaming is not allowed: desc.getName(): " + desc.getName() + ", cubeRequest.getCubeName(): " + cubeRequest.getCubeName(); ++ String error = "Cube Desc renaming is not allowed: desc.getName(): " + desc.getName() ++ + ", cubeRequest.getCubeName(): " + cubeRequest.getCubeName(); updateRequest(cubeRequest, false, error); return cubeRequest; } if (cube.getSegments().size() != 0 && !cube.getDescriptor().consistentWith(desc)) { -- String error = "CubeDesc " + desc.getName() + " is inconsistent with existing. Try purge that cube first or avoid updating key cube desc fields."; ++ String error = "CubeDesc " + desc.getName() ++ + " is inconsistent with existing. Try purge that cube first or avoid updating key cube desc fields."; updateRequest(cubeRequest, false, error); return cubeRequest; } @@@ -653,7 -654,7 +671,8 @@@ * @param cubeName * @return */ -- @RequestMapping(value = "/{cubeName}/init_start_offsets", method = { RequestMethod.PUT }, produces = { "application/json" }) ++ @RequestMapping(value = "/{cubeName}/init_start_offsets", method = { RequestMethod.PUT }, produces = { ++ "application/json" }) @ResponseBody public GeneralResponse initStartOffsets(@PathVariable String cubeName) { checkCubeName(cubeName); http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java index cdc09b9,aba2cf9..f8d5f83 --- a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java @@@ -101,7 -102,7 +101,8 @@@ public class CubeControllerV2 extends B @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "pageOffset", required = false, defaultValue = "0") Integer pageOffset, -- @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize) throws IOException { ++ @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize) ++ throws IOException { HashMap<String, Object> data = new HashMap<String, Object>(); List<CubeInstanceResponse> response = new ArrayList<CubeInstanceResponse>(); @@@ -109,13 -110,9 +110,13 @@@ // official cubes for (CubeInstance cube : cubes) { - response.add(createCubeInstanceResponse(cube)); + try { + response.add(createCubeInstanceResponse(cube)); + } catch (Exception e) { + logger.error("Error creating cube instance response, skipping.", e); + } } -- ++ // draft cubes for (Draft d : cubeService.listCubeDrafts(cubeName, modelName, projectName, exactMatch)) { CubeDesc c = (CubeDesc) d.getEntity(); @@@ -144,7 -141,7 +145,7 @@@ return new EnvelopeResponse(ResponseCode.CODE_SUCCESS, data, ""); } -- ++ private boolean contains(List<CubeInstanceResponse> response, String name) { for (CubeInstanceResponse r : response) { if (r.getName().equals(name)) @@@ -156,38 -153,33 +157,38 @@@ private CubeInstanceResponse createCubeInstanceResponseFromDraft(Draft d) { CubeDesc desc = (CubeDesc) d.getEntity(); Preconditions.checkState(desc.isDraft()); -- ++ CubeInstance mock = new CubeInstance(); mock.setName(desc.getName()); mock.setDescName(desc.getName()); mock.setStatus(RealizationStatusEnum.DISABLED); -- ++ CubeInstanceResponse r = new CubeInstanceResponse(mock); -- ++ r.setModel(desc.getModelName()); r.setProject(d.getProject()); r.setDraft(true); -- ++ return r; } private CubeInstanceResponse createCubeInstanceResponse(CubeInstance cube) { Preconditions.checkState(!cube.getDescriptor().isDraft()); -- ++ CubeInstanceResponse r = new CubeInstanceResponse(cube); r.setModel(cube.getDescriptor().getModelName()); r.setPartitionDateStart(cube.getDescriptor().getPartitionDateStart()); - r.setPartitionDateColumn(cube.getModel().getPartitionDesc().getPartitionDateColumn()); - r.setIs_streaming( - cube.getModel().getRootFactTable().getTableDesc().getSourceType() == ISourceAware.ID_STREAMING); + // cuz model doesn't have a state the label a model is broken, + // so in some case the model can not be loaded due to some check failed, + // but the cube in this model can still be loaded. + if (cube.getModel() != null) { + r.setPartitionDateColumn(cube.getModel().getPartitionDesc().getPartitionDateColumn()); + r.setIs_streaming( + cube.getModel().getRootFactTable().getTableDesc().getSourceType() == ISourceAware.ID_STREAMING); + } r.setProject(projectService.getProjectOfCube(cube.getName())); -- ++ return r; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/AclService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java index fe91f52,e8b675e..f302307 --- a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java @@@ -223,11 -223,13 +223,14 @@@ public class AclTableMigrationTool private Map<String, AceInfo> getAllAceInfo(Result result) throws IOException { Map<String, AceInfo> allAceInfoMap = new HashMap<>(); NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(Bytes.toBytes(AclConstant.ACL_ACES_FAMILY)); - for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) { - String sid = new String(entry.getKey()); - AceInfo aceInfo = aceSerializer.deserialize(entry.getValue()); - if (null != aceInfo) { - allAceInfoMap.put(sid, aceInfo); - if(familyMap != null && !familyMap.isEmpty()) { ++ ++ if (familyMap != null && !familyMap.isEmpty()) { + for (Map.Entry<byte[], byte[]> entry : familyMap.entrySet()) { + String sid = new String(entry.getKey()); + AceInfo aceInfo = aceSerializer.deserialize(entry.getValue()); + if (null != aceInfo) { + allAceInfoMap.put(sid, aceInfo); + } } } return allAceInfoMap; http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --cc server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 5ac595f,673b11b..51ec902 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@@ -849,9 -805,9 +849,10 @@@ public class QueryService extends Basic List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws SQLException { CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true); + + PreparedStatement preparedStatement = null; try { - conn.prepareStatement(correctedSql); + preparedStatement = conn.prepareStatement(correctedSql); throw new IllegalStateException("Should have thrown OnlyPrepareEarlyAbortException"); } catch (Exception e) { Throwable rootCause = ExceptionUtils.getRootCause(e); @@@ -883,12 -839,13 +884,13 @@@ } } finally { CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(false); + DBUtils.closeQuietly(preparedStatement); } - return getSqlResponse(isPushDown, results, columnMetas); + return buildSqlResponse(isPushDown, results, columnMetas); } - private SQLResponse getSqlResponse(Boolean isPushDown, List<List<String>> results, + private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> results, List<SelectedColumnMeta> columnMetas) { boolean isPartialResult = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java ---------------------------------------------------------------------- diff --cc server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index b47f05f,a509f88..44a30f1 --- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@@ -19,12 -19,20 +19,22 @@@ package org.apache.kylin.rest.service; import java.io.IOException; +import java.sql.SQLException; + import java.util.Collections; + import java.util.List; + import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.job.constant.JobTimeFilterEnum; + import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.JobException; + import org.apache.kylin.job.execution.AbstractExecutable; + import org.apache.kylin.job.execution.ExecutableContext; + import org.apache.kylin.job.execution.ExecutableManager; + import org.apache.kylin.job.execution.ExecutableState; + import org.apache.kylin.job.execution.ExecuteResult; + import org.apache.kylin.job.execution.Output; import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.query.QueryConnection; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; @@@ -52,4 -60,32 +62,27 @@@ public class JobServiceTest extends Ser Assert.assertNull(jobService.getJobInstance("job_not_exist")); Assert.assertNotNull(jobService.searchJobs(null, null, null, 0, 0, JobTimeFilterEnum.ALL)); } + + @Test + public void testExceptionOnLostJobOutput() { + ExecutableManager manager = ExecutableManager.getInstance(jobService.getConfig()); + AbstractExecutable executable = new TestJob(); + manager.addJob(executable); - List<CubingJob> jobs = jobService.innerSearchCubingJobs("cube", - "jobName", - Collections.<ExecutableState>emptySet(), - 0, - Long.MAX_VALUE, - Collections.<String, Output>emptyMap(), - true, - "project"); ++ List<CubingJob> jobs = jobService.innerSearchCubingJobs("cube", "jobName", ++ Collections.<ExecutableState> emptySet(), 0, Long.MAX_VALUE, Collections.<String, Output> emptyMap(), ++ true, "project"); + Assert.assertEquals(0, jobs.size()); + } + + public static class TestJob extends CubingJob { + - public TestJob(){ ++ public TestJob() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + return new ExecuteResult(ExecuteResult.State.SUCCEED, ""); + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b50030da/source-kafka/pom.xml ----------------------------------------------------------------------