This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 95d2a5b KYLIN-3457 Distribute by multi column if not set distribute column 95d2a5b is described below commit 95d2a5befaa596b2d502026d53a791fa31d20bbe Author: chao long <wayn...@qq.com> AuthorDate: Thu Jul 19 11:37:12 2018 +0800 KYLIN-3457 Distribute by multi column if not set distribute column --- .../org/apache/kylin/common/KylinConfigBase.java | 4 +++ .../java/org/apache/kylin/job/JoinedFlatTable.java | 36 +++++++++++++++++----- .../apache/kylin/source/hive/HiveInputBase.java | 5 +-- .../org/apache/kylin/source/hive/HiveMRInput.java | 11 +++---- .../apache/kylin/source/hive/HiveSparkInput.java | 11 +++---- 5 files changed, 45 insertions(+), 22 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 637502e..b2331e1 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -788,6 +788,10 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column"); } + public int getHiveRedistributeColumnCount() { + return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3")); + } + public int getDefaultVarcharPrecision() { int v = Integer.parseInt(getOptional("kylin.source.hive.default-varchar-precision", "256")); if (v < 1) { diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index a6c6daa..392323e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -25,9 +25,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.google.common.collect.Lists; import 
org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -188,8 +191,13 @@ public class JoinedFlatTable { } } - private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) { - sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n"); + private static void appendDistributeStatement(StringBuilder sql, List<TblColRef> redistCols) { + sql.append(" DISTRIBUTE BY "); + for (TblColRef redistCol : redistCols) { + sql.append(colName(redistCol, true)).append(","); + } + sql.deleteCharAt(sql.length() - 1); + sql.append(";\n"); } private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) { @@ -252,16 +260,30 @@ public class JoinedFlatTable { return hiveDataType; } - public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) { + public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { final String tableName = flatDesc.getTableName(); StringBuilder sql = new StringBuilder(); sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName); - TblColRef clusterCol = flatDesc.getClusterBy(); - if (clusterCol != null) { - appendClusterStatement(sql, clusterCol); + if (flatDesc.getClusterBy() != null) { + appendClusterStatement(sql, flatDesc.getClusterBy()); + } else if (flatDesc.getDistributedBy() != null) { + appendDistributeStatement(sql, Lists.newArrayList(flatDesc.getDistributedBy())); } else { - appendDistributeStatement(sql, flatDesc.getDistributedBy()); + int redistColumnCount = KylinConfig.getInstanceFromEnv().getHiveRedistributeColumnCount(); + + RowKeyColDesc[] 
rowKeyColDescs = cubeDesc.getRowkey().getRowKeyColumns(); + + if (rowKeyColDescs.length < redistColumnCount) + redistColumnCount = rowKeyColDescs.length; + + List<TblColRef> redistColumns = Lists.newArrayListWithCapacity(redistColumnCount); + + for (int i = 0; i < redistColumnCount; i++) { + redistColumns.add(rowKeyColDescs[i].getColRef()); + } + + appendDistributeStatement(sql, redistColumns); } return sql.toString(); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index eae2e1c..9a2c242 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; @@ -81,11 +82,11 @@ public class HiveInputBase { } protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName, - IJoinedFlatTableDesc flatDesc) { + IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); step.setInitStatement(hiveInitStatements); step.setIntermediateTable(flatDesc.getTableName()); - step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc)); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc)); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); return step; diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index bfea632..d1b4fc9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -28,6 +28,7 @@ import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; @@ -118,8 +119,9 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) - .getConfig(); + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + final KylinConfig cubeConfig = cubeInstance.getConfig(); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); // create flat table first @@ -127,9 +129,7 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc)); - } + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); } // special for hive @@ -154,7 +154,6 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { 
} } - @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java index 779835b..881be1a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.ISparkInput; @@ -75,8 +76,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) - .getConfig(); + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + final KylinConfig cubeConfig = cubeInstance.getConfig(); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); // create flat table first @@ -84,9 +85,7 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc)); - } + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); } 
// special for hive @@ -103,8 +102,6 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput { } } - - @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);