This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3369
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ddf9432884f14b745c62ad6e74a331f299397733
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Wed May 30 17:35:38 2018 +0800

    KYLIN-3378 Support Kafka table join with Hive tables
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 -
 .../org/apache/kylin/common/util/BasicTest.java    |  12 -
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  28 +-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  15 +-
 .../model/validation/rule/StreamingCubeRule.java   |   6 -
 .../java/org/apache/kylin/job/JoinedFlatTable.java | 183 +---------
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   9 +-
 .../org/apache/kylin/metadata/model/TblColRef.java |  15 +-
 examples/test_case_data/sandbox/kylin.properties   |   2 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  68 ++--
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 375 ++++++---------------
 .../kylin/source/kafka/config/KafkaConfig.java     |  12 +
 .../source/kafka/hadoop/KafkaFlatTableJob.java     |   7 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |  14 +-
 .../source/kafka/hadoop/KafkaInputFormat.java      |   8 +-
 15 files changed, 211 insertions(+), 547 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5d543f5..cdb3755 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -760,10 +760,6 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", "false"));
     }
 
-    public String getHiveDatabaseForStreamingTable() {
-        return this.getOptional("kylin.source.hive.database-for-streaming-table", "default");
-    }
-
     public String getHiveDatabaseForIntermediateTable() {
         return this.getOptional("kylin.source.hive.database-for-flat-table", "default");
     }
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index fcf302d..1c1e389 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -244,16 +244,4 @@ public class BasicTest {
         cal.setTimeInMillis(t);
         return dateFormat.format(cal.getTime());
     }
-
-    @Test
-    public void testStringSplit() {
-        String[] strings = new String[] {"abc", "bcd"};
-
-        String delimeter = ",";
-        String concated = StringUtils.join(Arrays.asList(strings), delimeter);
-
-        String[] newStrings = concated.split("\\" + delimeter);
-
-        Assert.assertEquals(strings, newStrings);
-    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 63df4aa..70ad13e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -31,8 +31,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
@@ -48,8 +46,6 @@ public class CubeJoinedFlatTableDesc
implements IJoinedFlatTableDesc, Serializab protected final CubeDesc cubeDesc;/// protected final CubeSegment cubeSegment; protected final boolean includingDerived; - TableRef tableRef; - DataModelDesc dataModelDesc;/// private int columnCount = 0; private List<TblColRef> columnList = Lists.newArrayList(); @@ -139,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab } } + @Override + public List<TblColRef> getFactColumns() { + final List<TblColRef> factColumns = Lists.newArrayList(); + for (TblColRef col : this.getAllColumns()) { + if (col.getTableRef().equals(getDataModel().getRootFactTable())) { + // only fetch the columns from fact table + factColumns.add(col); + } + } + return factColumns; + } + // sanity check the input record (in bytes) matches what's expected public void sanityCheck(BytesSplitter bytesSplitter) { if (columnCount != bytesSplitter.getBufferSize()) { @@ -175,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab @Override public SegmentRange getSegRange() { + if (cubeSegment.isOffsetCube()) { + return null; + } return cubeSegment.getSegRange(); } @@ -189,13 +200,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab } @Override - public TableRef getTable() {///// - return tableRef; - } - - @Override - public void setTableName(String tableName) { - this.tableName = tableName; + public boolean useAlias() { + return true; } @Override diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index b314cc2..f09478e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -28,7 +28,6 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentRange; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; /** @@ -106,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ } @Override + public List<TblColRef> getFactColumns() { + return flatDesc.getFactColumns(); + } + + @Override public DataModelDesc getDataModel() { return flatDesc.getDataModel(); } @@ -131,13 +135,8 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ } @Override - public TableRef getTable() {/// - return null; - } - - @Override - public void setTableName(String tableName) { - + public boolean useAlias() { + return flatDesc.useAlias(); } @Override diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java index dab8fa4..647f4c1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java @@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ResultLevel; import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.IEngineAware; import 
org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.TblColRef; @@ -49,11 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> { return; } - if (cube.getEngineType() == IEngineAware.ID_SPARK) { - context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead."); - return; - } - if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) { context.addResult(ResultLevel.ERROR, "Must define a partition column."); return; diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index ac38730..0769dcf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -44,6 +44,8 @@ import org.apache.kylin.metadata.model.TblColRef; public class JoinedFlatTable { + public static final String TEXTFILE = "TEXTFILE"; + public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) { return storageDfsDir + "/" + flatDesc.getTableName(); } @@ -78,10 +80,10 @@ public class JoinedFlatTable { if (i > 0) { ddl.append(","); } - ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n"); + ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n"); } ddl.append(")" + "\n"); - if ("TEXTFILE".equals(storageFormat)) { + if (TEXTFILE.equals(storageFormat)) { ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n"); } ddl.append("STORED AS " + storageFormat + "\n"); @@ -90,41 +92,6 @@ public class JoinedFlatTable { return ddl.toString(); } - public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir) { - String storageFormat = flatDesc.getDataModel().getConfig().getFlatTableStorageFormat(); - return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat); - } - - public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String storageFormat) { - String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter(); - return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat, fieldDelimiter); - } - - public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String storageFormat, String fieldDelimiter) { - StringBuilder ddl = new StringBuilder(); - - ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getDataModel().getRootFactTableName() + "\n");////flatDesc.getTableName() - - ddl.append("(" + "\n"); - for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { - TblColRef col = flatDesc.getAllColumns().get(i); - if (i > 0) { - ddl.append(","); - } - ddl.append(col.getName() + " " + getHiveDataType(col.getDatatype()) + "\n"); - } - ddl.append(")" + "\n"); - if ("TEXTFILE".equals(storageFormat)) { - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + fieldDelimiter + "'\n"); - } - ddl.append("STORED AS " + storageFormat + "\n"); - ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n"); - ddl.append("ALTER TABLE " + flatDesc.getDataModel().getRootFactTableName() + " SET TBLPROPERTIES('auto.purge'='true');\n"); - return ddl.toString(); - } - public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) { StringBuilder ddl = new StringBuilder(); 
ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n"); @@ -161,43 +128,10 @@ public class JoinedFlatTable { + ";\n"; } - public static String generateInsertDataStatement1(IJoinedFlatTableDesc flatDesc) { - CubeSegment segment = ((CubeSegment) flatDesc.getSegment()); - KylinConfig kylinConfig; - if (null == segment) { - kylinConfig = KylinConfig.getInstanceFromEnv(); - } else { - kylinConfig = (flatDesc.getSegment()).getConfig(); - } - - if (kylinConfig.isAdvancedFlatTableUsed()) { - try { - Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass()); - Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class, - JobEngineConfig.class); - return (String) method.invoke(null, flatDesc); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement1(flatDesc) - + ";\n"; - } - - public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) { - return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) - + ";\n"; - } - public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { return generateSelectDataStatement(flatDesc, false, null); } - public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc) { - return generateSelectDataStatement1(flatDesc, false, null); - } - public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine, String[] skipAs) { final String sep = singleLine ? " " : "\n"; @@ -215,7 +149,7 @@ public class JoinedFlatTable { if (skipAsList.contains(colTotalName)) { sql.append(col.getExpressionInSourceDB() + sep); } else { - sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep); + sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep); } } appendJoinStatement(flatDesc, sql, singleLine); @@ -223,40 +157,6 @@ public class JoinedFlatTable { return sql.toString(); } - public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc, boolean singleLine, - String[] skipAs) { - final String sep = singleLine ? " " : "\n"; - final List<String> skipAsList = (skipAs == null) ? 
new ArrayList<String>() : Arrays.asList(skipAs); - - StringBuilder sql = new StringBuilder(); - sql.append("SELECT" + sep); - - for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { - TblColRef col = flatDesc.getAllColumns().get(i); - if (i > 0) { - sql.append(","); - } - String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName()); - if (skipAsList.contains(colTotalName)) { - sql.append(col.getExpressionInSourceDB() + sep); - } else { - sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep); - } - } - appendJoinStatement1(flatDesc, sql, singleLine); - appendWhereStatement1(flatDesc, sql, singleLine); - return sql.toString(); - } - - public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) { - final StringBuilder sql = new StringBuilder(); - final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable(); - sql.append("dfs -mkdir -p " + outputDir + ";\n"); - sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity() - + " " + rootTbl.getAlias() + "\n"); - appendWhereStatement(flatDesc, sql); - return sql.toString(); - } public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { final String sep = singleLine ? " " : "\n"; @@ -264,7 +164,7 @@ public class JoinedFlatTable { DataModelDesc model = flatDesc.getDataModel(); TableRef rootTable = model.getRootFactTable(); - sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);//这flatDesc.getTableName() rootTable.getTableIdentity() + sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep); for (JoinTableDesc lookupDesc : model.getJoinTables()) { JoinDesc join = lookupDesc.getJoin(); @@ -275,42 +175,8 @@ public class JoinedFlatTable { TblColRef[] fk = join.getForeignKeyColumns(); if (pk.length != fk.length) { throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); - } String joinType = join.getType().toUpperCase(); - - sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep); - sql.append("ON "); - for (int i = 0; i < pk.length; i++) { - if (i > 0) { - sql.append(" AND "); - } - sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB()); } - sql.append(sep); - - dimTableCache.add(dimTable); - } - } - } - } - - public static void appendJoinStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { - final String sep = singleLine ? 
" " : "\n"; - Set<TableRef> dimTableCache = new HashSet<>(); - - DataModelDesc model = flatDesc.getDataModel(); - TableRef rootTable = model.getRootFactTable(); - sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);//这flatDesc.getTableName() rootTable.getTableIdentity() - - for (JoinTableDesc lookupDesc : model.getJoinTables()) { - JoinDesc join = lookupDesc.getJoin(); - if (join != null && join.getType().equals("") == false) { - TableRef dimTable = lookupDesc.getTableRef(); - if (!dimTableCache.contains(dimTable)) { - TblColRef[] pk = join.getPrimaryKeyColumns(); - TblColRef[] fk = join.getForeignKeyColumns(); - if (pk.length != fk.length) { - throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); - } String joinType = join.getType().toUpperCase(); + String joinType = join.getType().toUpperCase(); sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep); sql.append("ON "); @@ -318,7 +184,7 @@ public class JoinedFlatTable { if (i > 0) { sql.append(" AND "); } - sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB()); + sql.append(fk[i].getExpressionInSourceDB() + " = " + pk[i].getExpressionInSourceDB()); } sql.append(sep); @@ -330,7 +196,7 @@ public class JoinedFlatTable { private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) { if (redistCol != null) { - sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n"); + sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n"); } else { sql.append(" DISTRIBUTE BY RAND()").append(";\n"); } @@ -372,36 +238,13 @@ public class JoinedFlatTable { sql.append(whereBuilder.toString()); } - private static void appendWhereStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { - final String sep = singleLine ? " " : "\n"; - - StringBuilder whereBuilder = new StringBuilder(); - whereBuilder.append("WHERE 1=1"); - - DataModelDesc model = flatDesc.getDataModel(); - if (StringUtils.isNotEmpty(model.getFilterCondition())) { - whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") "); - } - - /*if (flatDesc.getSegment() != null) { - PartitionDesc partDesc = model.getPartitionDesc(); - if (partDesc != null && partDesc.getPartitionDateColumn() != null) { - SegmentRange segRange = flatDesc.getSegRange(); - - if (segRange != null && !segRange.isInfinite()) { - whereBuilder.append(" AND ("); - whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, - flatDesc.getSegment(), segRange)); - whereBuilder.append(")" + sep); - } - } - }*/ - sql.append(whereBuilder.toString()); + private static String colName(TblColRef col) { + return colName(col, true); } - private static String colName(TblColRef col) { - return col.getTableAlias() + "_" + col.getName(); + private static String colName(TblColRef col, boolean useAlias) { + return useAlias ? 
col.getTableAlias() + "_" + col.getName() : col.getName(); } private static String getHiveDataType(String javaDataType) { diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index 46e21f3..8f86a52 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -30,7 +30,7 @@ public interface IJoinedFlatTableDesc { List<TblColRef> getAllColumns(); - void setAllColumns(List<TblColRef> tblColRefList); + List<TblColRef> getFactColumns(); int getColumnIndex(TblColRef colRef); @@ -43,11 +43,6 @@ public interface IJoinedFlatTableDesc { // optionally present ISegment getSegment(); - /// - TableRef getTable(); - - void setDataModel(DataModelDesc dataModelDesc); - - void setTableName(String tableName); + boolean useAlias(); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index 928d3d2..ee33e8a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -31,7 +31,6 @@ import org.apache.kylin.metadata.datatype.DataType; public class TblColRef implements Serializable { private static final String INNER_TABLE_NAME = "_kylin_table"; - private String name; // used by projection rewrite, see OLAPProjectRel public enum InnerDataTypeEnum { @@ -120,7 +119,7 @@ public class TblColRef implements Serializable { private String identity; private String parserDescription; - public TblColRef(ColumnDesc column) {///// + TblColRef(ColumnDesc column) { this.column = column; } @@ -149,10 +148,6 @@ public class TblColRef implements Serializable { return column.getName(); } - public void setName(String name) { - this.name = name; - } - public TableRef getTableRef() { return table; } @@ -173,14 +168,6 @@ public class TblColRef implements Serializable { } } - public String getExpressionInSourceDB1() { - return identity; - } - - public void setExpressionInSourceDB(String identity) { - this.identity = identity; - }/// - public String getTable() { if (column.getTable() == null) { return null; diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index d428d44..ae9dad2 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -70,7 +70,7 @@ kylin.job.retry=0 # you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands) -kylin.job.use-remote-cli=true +kylin.job.use-remote-cli=false # Only necessary when kylin.job.use-remote-cli=true kylin.job.remote-cli-hostname=sandbox diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index a96f4d5..0e791eb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -23,6 +23,7 @@ import java.util.Collections; import 
java.util.List; import java.util.Set; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveCmdBuilder; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput { GarbageCollectionStep step = new GarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP); - step.setIntermediateTableIdentity(getIntermediateTableIdentity()); - step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)); + step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity())); + step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir))); step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables); jobFlow.addTask(step); } @@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput { private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException { StringBuffer output = new StringBuffer(); - final String hiveTable = this.getIntermediateTableIdentity(); - if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); - hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); - config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); - output.append("Hive table " + hiveTable + " is dropped. \n"); - rmdirOnHDFS(getExternalDataPath()); - output.append( - "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final List<String> hiveTables = this.getIntermediateTables(); + for (String hiveTable : hiveTables) { + if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { + hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); + hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); + + output.append("Hive table " + hiveTable + " is dropped. \n"); + } } + config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); + rmdirOnHDFS(getExternalDataPaths()); + output.append( + "Path " + getExternalDataPaths() + " is deleted. 
\n"); + return output.toString(); } - private void rmdirOnHDFS(String path) throws IOException { - Path externalDataPath = new Path(path); - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - if (fs.exists(externalDataPath)) { - fs.delete(externalDataPath, true); + private void rmdirOnHDFS(List<String> paths) throws IOException { + for (String path : paths) { + Path externalDataPath = new Path(path); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + if (fs.exists(externalDataPath)) { + fs.delete(externalDataPath, true); + } } } - public void setIntermediateTableIdentity(String tableIdentity) { - setParam("oldHiveTable", tableIdentity); + public void setIntermediateTables(List<String> tableIdentity) { + setParam("oldHiveTables", StringUtil.join(tableIdentity, ",")); } - private String getIntermediateTableIdentity() { - return getParam("oldHiveTable"); + private List<String> getIntermediateTables() { + List<String> intermediateTables = Lists.newArrayList(); + String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ","); + for (String t : tables) { + intermediateTables.add(t); + } + return intermediateTables; } - public void setExternalDataPath(String externalDataPath) { - setParam("externalDataPath", externalDataPath); + public void setExternalDataPaths(List<String> externalDataPaths) { + setParam("externalDataPaths", StringUtil.join(externalDataPaths, ",")); } - private String getExternalDataPath() { - return getParam("externalDataPath"); + private List<String> getExternalDataPaths() { + String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ","); + List<String> result = Lists.newArrayList(); + for (String s : paths) { + result.add(s); + } + return result; } public void setHiveViewIntermediateTableIdentities(String tableIdentities) { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 4ca60c5..b2c5360 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -20,15 +20,9 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.UUID; -import javax.annotation.Nullable; - -import com.google.common.collect.Sets; -import org.apache.hadoop.fs.FileSystem; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -36,10 +30,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.HiveCmdBuilder; -import org.apache.kylin.cube.CubeDescManager; -import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; @@ -49,23 +39,16 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; -import org.apache.kylin.job.common.ShellExecutable; import 
org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.metadata.model.JoinDesc; -import org.apache.kylin.metadata.model.JoinTableDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.CreateFlatHiveTableStep; import org.apache.kylin.source.hive.HiveMRInput; @@ -75,8 +58,6 @@ import org.apache.kylin.source.kafka.job.MergeOffsetStep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide.materializeViewHql; - public class KafkaMRInput implements IMRInput { private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class); @@ -103,13 +84,11 @@ public class KafkaMRInput implements IMRInput { private final CubeSegment cubeSegment; private final JobEngineConfig conf; private String delimiter; - private static final Logger logger = LoggerFactory.getLogger(KafkaTableInputFormat.class); public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { this.cubeSegment = cubeSegment; this.conf = conf; this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter(); - //delimiter="|"; //fixme } @Override @@ -136,266 +115,142 @@ public class KafkaMRInput implements IMRInput { public static class BatchCubingInputSide implements IMRBatchCubingInputSide { - final JobEngineConfig conf; final CubeSegment seg; - CubeDesc cubeDesc ; - KylinConfig config; - private String outputPath; - protected IJoinedFlatTableDesc flatDesc;// - final protected String hiveTableDatabase;// - final protected String hiveIntermediateTableDatabase1;// - final protected String hdfsWorkingDir;// - - String hiveViewIntermediateTables = "";// + private CubeDesc cubeDesc ; + private KylinConfig config; + protected IJoinedFlatTableDesc flatDesc; + protected String hiveTableDatabase; + private List<String> intermediateTables = Lists.newArrayList(); + private List<String> intermediatePaths = Lists.newArrayList(); public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) { - config = KylinConfig.getInstanceFromEnv();// - this.flatDesc = flatDesc;// - this.hiveTableDatabase = config.getHiveDatabaseForStreamingTable();// - this.hiveIntermediateTableDatabase1 = config.getHiveDatabaseForIntermediateTable();// - this.hdfsWorkingDir = config.getHdfsWorkingDirectory();// - this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + config = seg.getConfig(); + this.flatDesc = flatDesc; + this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable(); this.seg = seg; + this.cubeDesc = seg.getCubeDesc(); } - //这下面7个方法都是我新加的 - protected void addStepPhase1_DoCreateHiveTable(DefaultChainedExecutable jobFlow) { - final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); - outputPath = getJobWorkingDir(jobFlow);/// - - jobFlow.addTask(createHiveTable(hiveInitStatements, cubeName)); - } - - protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) { - final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveIntermediateTableDatabase1); - final String jobWorkingDir = getJobWorkingDir(jobFlow); - - //change hdfspath - ///outputPath = hdfsWorkingDir + "kylin-" + UUID.randomUUID().toString();/// - jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); - } - - protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) { - String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId()); - if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) { - checkAndCreateWorkDir(jobWorkingDir); - } - return jobWorkingDir; - } + @Override + public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { - private void checkAndCreateWorkDir(String jobWorkingDir) { - try { - Path path = new Path(jobWorkingDir); - FileSystem fileSystem = HadoopUtil.getFileSystem(path); - if (!fileSystem.exists(path)) { - logger.info("Create jobWorkDir : " + jobWorkingDir); - fileSystem.mkdirs(path); - } - } catch (IOException e) { - logger.error("Could not create lookUp table dir : " + jobWorkingDir); + boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0; + final String baseLocation = getJobWorkingDir(jobFlow); + if (onlyOneTable) { + // directly use flat table name + final String intermediateFactTable = flatDesc.getTableName(); + jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + intermediateFactTable)); + jobFlow.addTask(createMockFactTable(intermediateFactTable, baseLocation)); + intermediateTables.add(intermediateFactTable); + intermediatePaths.add(baseLocation + "/" + intermediateFactTable); + } else { + final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeDesc.getName().toLowerCase() + "_" + + seg.getUuid().replaceAll("-", "_") + "_fact"; + jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName)); + jobFlow.addTask(createMockFactTable(mockFactTableName, baseLocation)); + jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation)); + intermediateTables.add(flatDesc.getTableName()); + intermediateTables.add(mockFactTableName); + intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName()); + intermediatePaths.add(baseLocation + "/" + mockFactTableName); } } + private AbstractExecutable createFlatTable(String mockRootTableName, String baseLocation) { + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); - private AbstractExecutable createHiveTable(String hiveInitStatements, - String cubeName) { - final String dropTableHql = JoinedFlatTable.generateDropTableStatement1(flatDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement1(flatDesc, outputPath); - - CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); - step.setInitStatement(hiveInitStatements); - step.setCreateTableStatement(dropTableHql + createTableHql); - CubingExecutableUtil.setCubeName(cubeName, step.getParams()); - 
step.setName(ExecutableConstants.STEP_NAME_CREATE_HIVE_TABLE); - return step; - } - - private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, - String cubeName) { - - //TableRef tableRef = flatDesc.getDataModel().getRootFactTable(); - //change table name and columns - //ColumnDesc columnDesc = new ColumnDesc(); - //TableDesc tableDesc = tableRef.getTableDesc(); - //tableDesc.setName(flatDesc.getDataModel().getRootFactTableName());///////// - //tableRef.setTableIdentity(tableDesc.getIdentity()); - - /*TblColRef tblColRef = null; - List<TblColRef> tblColRefList = Lists.newArrayList(); - ColumnDesc[] columnDescs = tableDesc.getColumns(); - for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { - TblColRef col = flatDesc.getAllColumns().get(i); - col.setName(colName(col)); - columnDescs[i].setName(colName(col)); - columnDesc.setName(columnDescs[i].getName()); - tblColRef = new TblColRef(columnDesc); - tblColRef.setExpressionInSourceDB(tableRef.getAlias() + "." + tableRef.getAlias() + "_" + columnDesc.getName()); - tblColRefList.add(tblColRef); - } - CubeJoinedFlatTableDesc cubeJoinedFlatTableDesc = new CubeJoinedFlatTableDesc(seg); - cubeJoinedFlatTableDesc.setAllColumns(tblColRefList);*/ - //tblColRef.setName(tableRef.getAlias() + "_" + columnDesc.getName()); - - //tableDesc.setColumns(columnDescs); - //tableRef.setTableDesc(tableDesc); - //tableRef.setColumn(tblColRef); - /*DataModelDesc dataModelDesc = flatDesc.getDataModel(); - dataModelDesc.setRootFactTable(tableRef); - dataModelDesc.setRootFactTableName(flatDesc.getTableName());*/ - cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cubeName); - flatDesc.setTableName(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeDesc.getName().toLowerCase() + "_" - + seg.getUuid().replaceAll("-", "_") + Math.round(Math.random() * 10)); - //flatDesc.setDataModel(dataModelDesc); - - //from hive to hive final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); - String insertDataHqls = JoinedFlatTable.generateInsertDataStatement1(flatDesc); - outputPath = jobWorkingDir + "/" + flatDesc.getTableName(); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc); + insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockRootTableName + " "); CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + CubingExecutableUtil.setCubeName(cubeDesc.getName(), step.getParams()); step.setInitStatement(hiveInitStatements); step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls); - CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - return step; - } - private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) { - HiveMRInput.RedistributeFlatHiveTableStep step = new HiveMRInput.RedistributeFlatHiveTableStep(); - step.setInitStatement(hiveInitStatements); - step.setIntermediateTable(flatDesc.getTableName()); - step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc)); - CubingExecutableUtil.setCubeName(cubeName, step.getParams()); - step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); return step; } - protected 
void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { - final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); - final String jobWorkingDir = getJobWorkingDir(jobFlow); - - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); - if (task != null) { - jobFlow.addTask(task); - } + protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) { + return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId()); } - private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, - String jobWorkingDir) { - ShellExecutable step = new ShellExecutable(); - step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); - - KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); - TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig); - final Set<TableDesc> lookupViewsTables = Sets.newHashSet(); + private AbstractExecutable createMockFactTable(final String mockTalbeName, String baseLocation) { + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); + final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() { - String prj = flatDesc.getDataModel().getProject(); - for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) { - TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj); - if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) { - lookupViewsTables.add(tableDesc); + @Override + public String getTableName() { + return mockTalbeName; } - } - if (lookupViewsTables.size() == 0) { - return null; - } - - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride()); - hiveCmdBuilder.addStatement(hiveInitStatements); - for (TableDesc lookUpTableDesc : lookupViewsTables) { - String identity = lookUpTableDesc.getIdentity(); - String intermediate = lookUpTableDesc.getMaterializedName(); - if (lookUpTableDesc.isView()) { - String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir); - hiveCmdBuilder.addStatement(materializeViewHql); - hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";"; + @Override + public DataModelDesc getDataModel() { + return cubeDesc.getModel(); } - } - hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, - hiveViewIntermediateTables.length() - 1); - - step.setCmd(hiveCmdBuilder.build()); - return step; - } - - private static String colName(TblColRef col) { - return col.getTableAlias() + "_" + col.getName(); - } + @Override + public List<TblColRef> getAllColumns() { + return flatDesc.getFactColumns(); + } - @Override - public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId())); + @Override + public List<TblColRef> getFactColumns() { + return null; + } - //下面开始是我的 - final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) - .getConfig(); - final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); - ////String jobWorkingDir = getJobWorkingDir(jobFlow);///// - - //judge the number of tables - Set<TableRef> dimTableCache = new HashSet<>(); - 
DataModelDesc model = flatDesc.getDataModel(); - for (JoinTableDesc lookupDesc : model.getJoinTables()) { - JoinDesc join = lookupDesc.getJoin(); - if (join != null && join.getType().equals("") == false) { - TableRef dimTable = lookupDesc.getTableRef(); - if (!dimTableCache.contains(dimTable)) { - TblColRef[] pk = join.getPrimaryKeyColumns(); - TblColRef[] fk = join.getForeignKeyColumns(); - if (pk.length != fk.length) { - throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); - } - dimTable.getTableIdentity(); - dimTable.getAlias(); - dimTableCache.add(dimTable); - } + @Override + public int getColumnIndex(TblColRef colRef) { + return 0; } - } - if(dimTableCache.size() == 0){ - logger.info("you only have a table"); - }else{ - // create hive table first - addStepPhase1_DoCreateHiveTable(jobFlow); - //createHiveTable(hiveInitStatements, jobWorkingDir, cubeName); - //change hdfspath - //jobWorkingDir = hdfsWorkingDir + "kylin-" + UUID.randomUUID().toString(); + @Override + public SegmentRange getSegRange() { + return null; + } - // next create flat table - //addStepPhase1_DoCreateFlatTable(jobFlow); - addStepPhase1_DoCreateFlatTable(jobFlow);//addStepPhase1_DoCreateFlatTable(jobFlow); - //createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName); + @Override + public TblColRef getDistributedBy() { + return null; + } + @Override + public TblColRef getClusterBy() { + return null; + } - // then count and redistribute - if (cubeConfig.isHiveRedistributeEnabled()) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName)); + @Override + public ISegment getSegment() { + return null; } + @Override + public boolean useAlias() { + return false; + } + }; + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation, JoinedFlatTable.TEXTFILE); - // special for hive - addStepPhase1_DoMaterializeLookupTable(jobFlow); - } + CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + step.setInitStatement(hiveInitStatements); + step.setCreateTableStatement(dropTableHql + createTableHql); + CubingExecutableUtil.setCubeName(cubeDesc.getName(), step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_HIVE_TABLE); + return step; } - private MapReduceExecutable createSaveKafkaDataStep(String jobId) { - MapReduceExecutable result = new MapReduceExecutable(); - IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg); - outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); + private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) { + MapReduceExecutable result = new MapReduceExecutable(); result.setName("Save data from Kafka"); result.setMapReduceJobClass(KafkaFlatTableJob.class); JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system"); StringBuilder cmd = new StringBuilder(); jobBuilderSupport.appendMapReduceParameters(cmd); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, 
"Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); @@ -405,20 +260,18 @@ public class KafkaMRInput implements IMRInput { @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - GarbageCollectionStep step = new GarbageCollectionStep(); - step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP); - step.setDataPath(outputPath); + HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep(); + step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP); + step.setIntermediateTables(intermediateTables); + step.setExternalDataPaths(intermediatePaths); jobFlow.addTask(step); } @Override public IMRTableInputFormat getFlatTableInputFormat() { - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable()); - List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns(); - - return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf); + String intermediateHiveTable = hiveTableDatabase + "." + flatDesc.getTableName(); + return new HiveMRInput.HiveTableInputFormat(intermediateHiveTable); } } @@ -443,36 +296,4 @@ public class KafkaMRInput implements IMRInput { } } - public static class GarbageCollectionStep extends AbstractExecutable { - private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class); - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - try { - rmdirOnHDFS(getDataPath()); - } catch (IOException e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return ExecuteResult.createError(e); - } - - return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n"); - } - - private void rmdirOnHDFS(String path) throws IOException { - Path externalDataPath = new Path(path); - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - if (fs.exists(externalDataPath)) { - fs.delete(externalDataPath, true); - } - } - - public void setDataPath(String externalDataPath) { - setParam("dataPath", externalDataPath); - } - - private String getDataPath() { - return getParam("dataPath"); - } - - } } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index 696c20c..c31132b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("margin") private long margin; + @JsonProperty("splitRows") + private int splitRows=1000000; + //"configA=1;configB=2" @JsonProperty("parserProperties") private String parserProperties; @@ -157,6 +160,15 @@ public class KafkaConfig extends RootPersistentEntity { return sb.toString(); } + + public int getSplitRows() { + return splitRows; + } + + public void setSplitRows(int splitRows) { + this.splitRows = splitRows; + } + @Override public KafkaConfig clone() { try { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index b71ca84..b4bc17b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -29,7 +29,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; @@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; + public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows"; + @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); + job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows())); job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name setupMapper(cube.getSegmentById(segmentId)); job.setNumReduceTasks(0); @@ -154,7 +157,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.setInputFormatClass(KafkaInputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); job.setNumReduceTasks(0); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java index a3c5e62..f88fe3a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java @@ -38,6 +38,7 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.StreamingParser; @@ -63,17 +64,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl config = AbstractHadoopJob.loadKylinPropsAndMetadata(); String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); - CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter(); - //delimiter="|"; //fixme logger.info("Use delimiter: " + delimiter); - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config); - KafkaConfig kafkaConfig = 
kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable()); - List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns(); + final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config); + final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable()); + + final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); + final List<TblColRef> allColumns = flatTableDesc.getFactColumns(); try { - streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns); + streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns); } catch (ReflectiveOperationException e) { throw new IllegalArgumentException(e); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index 3681a98..c3ed47f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -55,7 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN)); final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX)); - final Integer spiltsSetnum = 1000;//这个参数是用来将来获取到从前台传来的数据的,现先假定一个固定值 + final Integer spiltRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS)); final Map<Integer, Long> startOffsetMap = Maps.newHashMap(); final Map<Integer, Long> endOffsetMap = Maps.newHashMap(); @@ -83,14 +83,14 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { long new_start = startOffsetMap.get(partitionId); long end = endOffsetMap.get(partitionId); while (end > new_start) { - if ((end - new_start) <= spiltsSetnum && (end > new_start)) { + if ((end - new_start) <= spiltRows && (end > new_start)) { InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end); splits.add(split); break; } else { - InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum); + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltRows); splits.add(split); - new_start += spiltsSetnum; + new_start += spiltRows; } } } -- To stop receiving notification emails like this one, please contact shaofeng...@apache.org.
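
In brief, one piece of the change above: KafkaConfig gains a "splitRows" setting (JSON property "splitRows", default 1000000, exposed through getSplitRows() and handed to the MR job as kafka.split.rows), and KafkaInputFormat.getSplits() now uses it to chop each Kafka partition's [start, end) offset range into input splits of at most splitRows messages instead of relying on a hard-coded constant. The stand-alone sketch below mirrors that chunking loop; the class name, the long[] pair representation and the sample offsets are illustrative only and do not appear in the commit.

    import java.util.ArrayList;
    import java.util.List;

    public class OffsetSplitSketch {

        // Split one partition's offset range [start, end) into chunks of at most splitRows
        // messages, the same way KafkaInputFormat.getSplits() builds its KafkaInputSplit objects.
        static List<long[]> chunk(long start, long end, long splitRows) {
            List<long[]> splits = new ArrayList<>();
            long newStart = start;
            while (end > newStart) {
                if ((end - newStart) <= splitRows) {
                    splits.add(new long[] { newStart, end });
                    break;
                } else {
                    splits.add(new long[] { newStart, newStart + splitRows });
                    newStart += splitRows;
                }
            }
            return splits;
        }

        public static void main(String[] args) {
            // A partition holding 2.5M messages with the default splitRows of 1M
            // yields three splits: [0, 1M), [1M, 2M), [2M, 2.5M).
            for (long[] s : chunk(0L, 2_500_000L, 1_000_000L)) {
                System.out.println(s[0] + " .. " + s[1]);
            }
        }
    }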
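Likewise, the interface change in IJoinedFlatTableDesc (getTable()/setTableName() replaced by useAlias(), plus the new getFactColumns()) feeds the two-argument colName(col, useAlias) in JoinedFlatTable: the joined flat table keeps alias-prefixed column names, while the mock fact table that receives the raw Kafka data reports useAlias() == false and is created with plain column names. A minimal sketch of that naming rule follows; the table alias and column name are made up, and the real method takes a TblColRef rather than strings.

    public class ColNameSketch {

        // Mirrors JoinedFlatTable.colName(TblColRef, boolean): alias-prefixed name for the
        // joined flat table, plain column name for the mock Kafka fact table.
        static String colName(String tableAlias, String columnName, boolean useAlias) {
            return useAlias ? tableAlias + "_" + columnName : columnName;
        }

        public static void main(String[] args) {
            System.out.println(colName("KAFKA_FACT", "ORDER_ID", true));  // KAFKA_FACT_ORDER_ID
            System.out.println(colName("KAFKA_FACT", "ORDER_ID", false)); // ORDER_ID
        }
    }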