KYLIN-2434 Support the kylin.source.hive.database-for-flat-table config property in Spark cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3ae6f9c9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3ae6f9c9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3ae6f9c9 Branch: refs/heads/KYLIN-2501 Commit: 3ae6f9c9120360cdc9fc238a2a9208fa9813aea6 Parents: 98664f0 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Mar 18 10:39:57 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Mar 19 09:47:47 2017 +0800 ---------------------------------------------------------------------- .../kylin/engine/spark/SparkBatchCubingJobBuilder2.java | 2 +- .../org/apache/kylin/engine/spark/SparkCubingByLayer.java | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3ae6f9c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index e0b3e6c..66b154d 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -47,7 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { sparkExecutable.setClassName(SparkCubingByLayer.class.getName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); - sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName()); + 
sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); http://git-wip-us.apache.org/repos/asf/kylin/blob/3ae6f9c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index f7ed2d0..f70fd30 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -77,22 +77,23 @@ import java.util.List; /** + * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase. 
*/ public class SparkCubingByLayer extends AbstractApplication implements Serializable { protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class); - public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); + public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); private Options options; public SparkCubingByLayer() { options = new Options(); - options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_INPUT_TABLE); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_CONF_PATH); @@ -134,7 +135,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa @Override protected void execute(OptionsHelper optionsHelper) throws Exception { - final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE); final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); final String segmentId = 
optionsHelper.getOptionValue(OPTION_SEGMENT_ID); final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); @@ -154,7 +155,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final KylinConfig envConfig = KylinConfig.getInstanceFromEnv(); HiveContext sqlContext = new HiveContext(sc.sc()); - final DataFrame intermediateTable = sqlContext.table(envConfig.getHiveDatabaseForIntermediateTable() + "." + hiveTable); + final DataFrame intermediateTable = sqlContext.table(hiveTable); final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName); final CubeDesc cubeDesc = cubeInstance.getDescriptor();