Repository: kylin

Updated Branches:
  refs/heads/KYLIN-2434 [created] 5131351f7
KYLIN-2434 support config kylin.source.hive.database-for-flat-table in Spark cubing


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5131351f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5131351f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5131351f

Branch: refs/heads/KYLIN-2434
Commit: 5131351f706606eeb544f6241b091a141ff2c17b
Parents: 19c87e7
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sat Mar 18 10:39:57 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Sat Mar 18 10:39:57 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/spark/SparkBatchCubingJobBuilder2.java   | 2 +-
 .../org/apache/kylin/engine/spark/SparkCubingByLayer.java | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5131351f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index e0b3e6c..66b154d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,7 +47,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5131351f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f7ed2d0..259cd1f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -82,17 +82,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class);
 
-    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
     public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("table").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("table");
 
     private Options options;
 
     public SparkCubingByLayer() {
         options = new Options();
-        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_INPUT_TABLE);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_CONF_PATH);
@@ -134,7 +134,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializable {
 
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
-        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
@@ -154,7 +154,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializable {
         final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
 
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.table(envConfig.getHiveDatabaseForIntermediateTable() + "." + hiveTable);
+        final DataFrame intermediateTable = sqlContext.table(hiveTable);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
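Note (illustration only, not part of the commit): the change moves the database lookup from the Spark application to the job builder, which now passes a fully-qualified "database.table" name through the new "table" option, so SparkCubingByLayer can call sqlContext.table(hiveTable) directly. A minimal self-contained Java sketch of that composition follows; the class name and the example database/table values are hypothetical, and the property name comes from the commit title.

// Sketch only; FlatTableNameSketch and the example values are hypothetical.
public class FlatTableNameSketch {

    // Mirrors the builder-side logic from the diff: prefix the flat table with the
    // database configured by kylin.source.hive.database-for-flat-table.
    static String qualify(String flatTableDatabase, String flatTableName) {
        return flatTableDatabase + "." + flatTableName;
    }

    public static void main(String[] args) {
        // Example values only.
        String hiveTable = qualify("kylin_flat_db", "kylin_intermediate_example_cube_seg1");
        System.out.println(hiveTable); // kylin_flat_db.kylin_intermediate_example_cube_seg1
        // SparkCubingByLayer consumes this value as-is (sqlContext.table(hiveTable))
        // instead of prepending getHiveDatabaseForIntermediateTable() itself.
    }
}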