This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3378
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d37c236ee9865846fed4909eec983f7644f6fd35 Author: GinaZhai <na.z...@kyligence.io> AuthorDate: Fri May 25 18:25:48 2018 +0800 KYLIN-3378 Kafka join with hive Signed-off-by: shaofengshi <shaofeng...@apache.org> --- .../org/apache/kylin/common/KylinConfigBase.java | 39 ++++- .../org/apache/kylin/common/util/BasicTest.java | 3 +- .../kylin/cube/model/CubeJoinedFlatTableDesc.java | 24 ++- .../cube/model/CubeJoinedFlatTableEnrich.java | 10 ++ .../model/validation/rule/StreamingCubeRule.java | 11 -- .../java/org/apache/kylin/job/JoinedFlatTable.java | 44 ++--- .../kylin/job/constant/ExecutableConstants.java | 1 + .../kylin/metadata/model/IJoinedFlatTableDesc.java | 4 + .../org/apache/kylin/source/hive/HiveMRInput.java | 68 +++++--- source-kafka/pom.xml | 4 + .../apache/kylin/source/kafka/KafkaMRInput.java | 194 ++++++++++++++------- .../org/apache/kylin/source/kafka/KafkaSource.java | 5 +- .../kylin/source/kafka/config/KafkaConfig.java | 12 ++ .../source/kafka/hadoop/KafkaFlatTableJob.java | 7 +- .../source/kafka/hadoop/KafkaFlatTableMapper.java | 24 ++- .../source/kafka/hadoop/KafkaInputFormat.java | 18 +- .../apache/kylin/source/kafka/SpiltNumTest.java | 163 +++++++++++++++++ 17 files changed, 490 insertions(+), 141 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 689d08f..cdb3755 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -225,7 +225,11 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.env", "DEV"); } - private String cachedHdfsWorkingDirectory; + static public String cachedHdfsWorkingDirectory;////// + + public void setHdfsWorkingDirectory(String cachedHdfsWorkingDirectory){/////// + this.cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory; + } public String getHdfsWorkingDirectory() { if (cachedHdfsWorkingDirectory != null) @@ -260,6 +264,39 @@ abstract public class KylinConfigBase implements Serializable { return cachedHdfsWorkingDirectory; } + public String getHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) {// + if (cachedHdfsWorkingDirectory != null) + return cachedHdfsWorkingDirectory; + + String root = getOptional("kylin.env.hdfs-working-dir", "/kylin"); + + Path path = new Path(root); + if (!path.isAbsolute()) + throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root); + + // make sure path is qualified + try { + FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration()); + path = fs.makeQualified(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // append metadata-url prefix + root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString(); + + if (!root.endsWith("/")) + root += "/"; + + cachedHdfsWorkingDirectory = root; + if (cachedHdfsWorkingDirectory.startsWith("file:")) { + cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("file:", "file://"); + } else if (cachedHdfsWorkingDirectory.startsWith("maprfs:")) { + cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("maprfs:", "maprfs://"); + } + return cachedHdfsWorkingDirectory; + } + public String getZookeeperBasePath() { return getOptional("kylin.env.zookeeper-base-path", "/kylin"); } diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java 
b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 1c1e389..6ae238b 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -231,7 +231,8 @@ public class BasicTest { String[] origin = new String[] {"ab,c", "cd|e"}; - String delimiter = "\u001F"; // "\t"; + // test with sequence file default delimiter + String delimiter = "\01"; //"\u001F"; "\t"; String concated = StringUtils.join(Arrays.asList(origin), delimiter); String[] newValues = concated.split(delimiter); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index d50a5af..70ad13e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -42,8 +42,8 @@ import com.google.common.collect.Maps; @SuppressWarnings("serial") public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable { - protected final String tableName; - protected final CubeDesc cubeDesc; + protected String tableName;///// + protected final CubeDesc cubeDesc;/// protected final CubeSegment cubeSegment; protected final boolean includingDerived; @@ -135,6 +135,18 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab } } + @Override + public List<TblColRef> getFactColumns() { + final List<TblColRef> factColumns = Lists.newArrayList(); + for (TblColRef col : this.getAllColumns()) { + if (col.getTableRef().equals(getDataModel().getRootFactTable())) { + // only fetch the columns from fact table + factColumns.add(col); + } + } + return factColumns; + } + // sanity check the input record (in bytes) matches what's expected public void sanityCheck(BytesSplitter bytesSplitter) { if (columnCount != bytesSplitter.getBufferSize()) { @@ -171,6 +183,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab @Override public SegmentRange getSegRange() { + if (cubeSegment.isOffsetCube()) { + return null; + } return cubeSegment.getSegRange(); } @@ -185,6 +200,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab } @Override + public boolean useAlias() { + return true; + } + + @Override public TblColRef getClusterBy() { return cubeDesc.getClusteredByColumn(); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index 73da802..f09478e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -105,6 +105,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ } @Override + public List<TblColRef> getFactColumns() { + return flatDesc.getFactColumns(); + } + + @Override public DataModelDesc getDataModel() { return flatDesc.getDataModel(); } @@ -130,6 +135,11 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializ } @Override + public boolean useAlias() { + return flatDesc.useAlias(); + } + + @Override public TblColRef getClusterBy() { return flatDesc.getClusterBy(); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java index 4438706..647f4c1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java @@ -24,7 +24,6 @@ import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ResultLevel; import org.apache.kylin.cube.model.validation.ValidateContext; import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.TblColRef; @@ -49,16 +48,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> { return; } - if (model.getLookupTables().size() > 0) { - context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed."); - return; - } - - if (cube.getEngineType() == IEngineAware.ID_SPARK) { - context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead."); - return; - } - if (model.getPartitionDesc() == null || model.getPartitionDesc().getPartitionDateColumn() == null) { context.addResult(ResultLevel.ERROR, "Must define a partition column."); return; diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 528bcf0..0769dcf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -44,6 +44,8 @@ import org.apache.kylin.metadata.model.TblColRef; public class JoinedFlatTable { + public static final String TEXTFILE = "TEXTFILE"; + public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) { return storageDfsDir + "/" + flatDesc.getTableName(); } @@ -61,13 +63,13 @@ public class JoinedFlatTable { } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String storageFormat) { + String storageFormat) { String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter(); return generateCreateTableStatement(flatDesc, storageDfsDir, storageFormat, fieldDelimiter); } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String storageFormat, String fieldDelimiter) { + String storageFormat, String fieldDelimiter) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n"); @@ -78,10 +80,10 @@ public class JoinedFlatTable { if (i > 0) { ddl.append(","); } - ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n"); + ddl.append(colName(col, flatDesc.useAlias()) + " " + getHiveDataType(col.getDatatype()) + "\n"); } ddl.append(")" + "\n"); - if ("TEXTFILE".equals(storageFormat)) { + if (TEXTFILE.equals(storageFormat)) { ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\" + fieldDelimiter + "'\n"); } ddl.append("STORED AS " + storageFormat + "\n"); @@ -96,6 +98,12 @@ public class JoinedFlatTable { return ddl.toString(); } + public static String generateDropTableStatement1(IJoinedFlatTableDesc flatDesc) { + StringBuilder ddl = new StringBuilder(); + ddl.append("DROP TABLE IF EXISTS " + flatDesc.getDataModel().getRootFactTableName() + ";").append("\n"); + return 
ddl.toString(); + }/// + public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) { CubeSegment segment = ((CubeSegment) flatDesc.getSegment()); KylinConfig kylinConfig; @@ -120,11 +128,6 @@ public class JoinedFlatTable { + ";\n"; } - public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) { - return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) - + ";\n"; - } - public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { return generateSelectDataStatement(flatDesc, false, null); } @@ -146,7 +149,7 @@ public class JoinedFlatTable { if (skipAsList.contains(colTotalName)) { sql.append(col.getExpressionInSourceDB() + sep); } else { - sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep); + sql.append(col.getExpressionInSourceDB() + " as " + colName(col, true) + sep); } } appendJoinStatement(flatDesc, sql, singleLine); @@ -154,15 +157,6 @@ public class JoinedFlatTable { return sql.toString(); } - public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) { - final StringBuilder sql = new StringBuilder(); - final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable(); - sql.append("dfs -mkdir -p " + outputDir + ";\n"); - sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + rootTbl.getTableIdentity() - + " " + rootTbl.getAlias() + "\n"); - appendWhereStatement(flatDesc, sql); - return sql.toString(); - } public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { final String sep = singleLine ? " " : "\n"; @@ -175,7 +169,6 @@ public class JoinedFlatTable { for (JoinTableDesc lookupDesc : model.getJoinTables()) { JoinDesc join = lookupDesc.getJoin(); if (join != null && join.getType().equals("") == false) { - String joinType = join.getType().toUpperCase(); TableRef dimTable = lookupDesc.getTableRef(); if (!dimTableCache.contains(dimTable)) { TblColRef[] pk = join.getPrimaryKeyColumns(); @@ -183,6 +176,8 @@ public class JoinedFlatTable { if (pk.length != fk.length) { throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); } + String joinType = join.getType().toUpperCase(); + sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep); sql.append("ON "); for (int i = 0; i < pk.length; i++) { @@ -201,7 +196,7 @@ public class JoinedFlatTable { private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) { if (redistCol != null) { - sql.append(" DISTRIBUTE BY ").append(colName(redistCol)).append(";\n"); + sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n"); } else { sql.append(" DISTRIBUTE BY RAND()").append(";\n"); } @@ -243,8 +238,13 @@ public class JoinedFlatTable { sql.append(whereBuilder.toString()); } + private static String colName(TblColRef col) { - return col.getTableAlias() + "_" + col.getName(); + return colName(col, true); + } + + private static String colName(TblColRef col, boolean useAlias) { + return useAlias ? 
col.getTableAlias() + "_" + col.getName() : col.getName(); } private static String getHiveDataType(String javaDataType) { diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index b9a3651..42f0dbf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -34,6 +34,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary"; + public static final String STEP_NAME_CREATE_HIVE_TABLE = "Create Hive Table"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index 0589829..8f86a52 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc { List<TblColRef> getAllColumns(); + List<TblColRef> getFactColumns(); + int getColumnIndex(TblColRef colRef); SegmentRange getSegRange(); @@ -41,4 +43,6 @@ public interface IJoinedFlatTableDesc { // optionally present ISegment getSegment(); + boolean useAlias(); + } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index a96f4d5..0e791eb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +34,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveCmdBuilder; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -284,8 +286,8 @@ public class HiveMRInput implements IMRInput { GarbageCollectionStep step = new GarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP); - step.setIntermediateTableIdentity(getIntermediateTableIdentity()); - step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)); + step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity())); + step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir))); step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables); jobFlow.addTask(step); } @@ -435,42 +437,58 @@ public class HiveMRInput implements IMRInput { private String 
cleanUpIntermediateFlatTable(KylinConfig config) throws IOException { StringBuffer output = new StringBuffer(); - final String hiveTable = this.getIntermediateTableIdentity(); - if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); - hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); - config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); - output.append("Hive table " + hiveTable + " is dropped. \n"); - rmdirOnHDFS(getExternalDataPath()); - output.append( - "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final List<String> hiveTables = this.getIntermediateTables(); + for (String hiveTable : hiveTables) { + if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { + hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); + hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); + + output.append("Hive table " + hiveTable + " is dropped. \n"); + } } + config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); + rmdirOnHDFS(getExternalDataPaths()); + output.append( + "Path " + getExternalDataPaths() + " is deleted. \n"); + return output.toString(); } - private void rmdirOnHDFS(String path) throws IOException { - Path externalDataPath = new Path(path); - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - if (fs.exists(externalDataPath)) { - fs.delete(externalDataPath, true); + private void rmdirOnHDFS(List<String> paths) throws IOException { + for (String path : paths) { + Path externalDataPath = new Path(path); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + if (fs.exists(externalDataPath)) { + fs.delete(externalDataPath, true); + } } } - public void setIntermediateTableIdentity(String tableIdentity) { - setParam("oldHiveTable", tableIdentity); + public void setIntermediateTables(List<String> tableIdentity) { + setParam("oldHiveTables", StringUtil.join(tableIdentity, ",")); } - private String getIntermediateTableIdentity() { - return getParam("oldHiveTable"); + private List<String> getIntermediateTables() { + List<String> intermediateTables = Lists.newArrayList(); + String[] tables = StringUtil.splitAndTrim(getParam("oldHiveTables"), ","); + for (String t : tables) { + intermediateTables.add(t); + } + return intermediateTables; } - public void setExternalDataPath(String externalDataPath) { - setParam("externalDataPath", externalDataPath); + public void setExternalDataPaths(List<String> externalDataPaths) { + setParam("externalDataPaths", StringUtil.join(externalDataPaths, ",")); } - private String getExternalDataPath() { - return getParam("externalDataPath"); + private List<String> getExternalDataPaths() { + String[] paths = StringUtil.splitAndTrim(getParam("externalDataPaths"), ","); + List<String> result = Lists.newArrayList(); + for (String s : paths) { + result.add(s); + } + return result; } public void setHiveViewIntermediateTableIdentities(String tableIdentities) { diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index 2ef4cdf..55df7f0 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -66,5 +66,9 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-source-hive</artifactId> + 
</dependency> </dependencies> </project> diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 223e303..f37bf50 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.source.kafka; import java.io.IOException; @@ -22,7 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import org.apache.hadoop.fs.FileSystem; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -30,8 +30,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; @@ -41,16 +41,17 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.hive.CreateFlatHiveTableStep; +import org.apache.kylin.source.hive.HiveMRInput; import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; import org.apache.kylin.source.kafka.job.MergeOffsetStep; import org.slf4j.Logger; @@ -58,18 +59,19 @@ import org.slf4j.LoggerFactory; public class KafkaMRInput implements IMRInput { - CubeSegment cubeSegment; + private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class); + private CubeSegment cubeSegment; @Override public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { this.cubeSegment = (CubeSegment) flatDesc.getSegment(); - return new BatchCubingInputSide(cubeSegment); + return new BatchCubingInputSide(cubeSegment, flatDesc); } @Override public IMRTableInputFormat getTableInputFormat(TableDesc table) { - return new KafkaTableInputFormat(cubeSegment, null, null, null); + return new KafkaTableInputFormat(cubeSegment, null); } @Override @@ -80,12 +82,11 @@ public class KafkaMRInput implements IMRInput { public static class KafkaTableInputFormat implements 
IMRTableInputFormat { private final CubeSegment cubeSegment; private final JobEngineConfig conf; - private final String delimiter; + private String delimiter = "\01"; - public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { + public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) { this.cubeSegment = cubeSegment; this.conf = conf; - this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter(); } @Override @@ -114,30 +115,132 @@ public class KafkaMRInput implements IMRInput { final JobEngineConfig conf; final CubeSegment seg; - private String outputPath; - - public BatchCubingInputSide(CubeSegment seg) { + private CubeDesc cubeDesc ; + private KylinConfig config; + protected IJoinedFlatTableDesc flatDesc; + protected String hiveTableDatabase; + private List<String> intermediateTables = Lists.newArrayList(); + private List<String> intermediatePaths = Lists.newArrayList(); + private String cubeName; + + public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) { this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + this.config = seg.getConfig(); + this.flatDesc = flatDesc; + this.hiveTableDatabase = config.getHiveDatabaseForIntermediateTable(); this.seg = seg; + this.cubeDesc = seg.getCubeDesc(); + this.cubeName = seg.getCubeInstance().getName(); } @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId())); + + boolean onlyOneTable = cubeDesc.getModel().getLookupTables().size() == 0; + final String baseLocation = getJobWorkingDir(jobFlow); + if (onlyOneTable) { + // directly use flat table location + final String intermediateFactTable = flatDesc.getTableName(); + final String tableLocation = baseLocation + "/" + intermediateFactTable; + jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), tableLocation)); + intermediatePaths.add(tableLocation); + } else { + final String mockFactTableName = MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeName.toLowerCase() + "_" + + seg.getUuid().replaceAll("-", "_") + "_fact"; + jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), baseLocation + "/" + mockFactTableName)); + jobFlow.addTask(createFlatTable(mockFactTableName, baseLocation)); + } + } + private AbstractExecutable createFlatTable(final String mockFactTableName, String baseLocation) { + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); + + final IJoinedFlatTableDesc mockfactDesc = new IJoinedFlatTableDesc() { + + @Override + public String getTableName() { + return mockFactTableName; + } + + @Override + public DataModelDesc getDataModel() { + return cubeDesc.getModel(); + } + + @Override + public List<TblColRef> getAllColumns() { + return flatDesc.getFactColumns(); + } + + @Override + public List<TblColRef> getFactColumns() { + return null; + } + + @Override + public int getColumnIndex(TblColRef colRef) { + return 0; + } + + @Override + public SegmentRange getSegRange() { + return null; + } + + @Override + public TblColRef getDistributedBy() { + return null; + } + + @Override + public TblColRef getClusterBy() { + return null; + } + + @Override + public ISegment getSegment() { + return null; + } + + @Override + public boolean useAlias() { + return false; + } + }; + final String dropFactTableHql = JoinedFlatTable.generateDropTableStatement(mockfactDesc); + final String createFactTableHql = 
JoinedFlatTable.generateCreateTableStatement(mockfactDesc, baseLocation); + + + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, baseLocation); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc); + insertDataHqls = insertDataHqls.replace(flatDesc.getDataModel().getRootFactTableName() + " ", mockFactTableName + " "); + + CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setInitStatement(hiveInitStatements); + step.setCreateTableStatement(dropFactTableHql + createFactTableHql + dropTableHql + createTableHql + insertDataHqls); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + + intermediateTables.add(flatDesc.getTableName()); + intermediateTables.add(mockFactTableName); + intermediatePaths.add(baseLocation + "/" + flatDesc.getTableName()); + intermediatePaths.add(baseLocation + "/" + mockFactTableName); + return step; } - private MapReduceExecutable createSaveKafkaDataStep(String jobId) { - MapReduceExecutable result = new MapReduceExecutable(); + protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) { + return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId()); + } - IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg); - outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); + private MapReduceExecutable createSaveKafkaDataStep(String jobId, String location) { + MapReduceExecutable result = new MapReduceExecutable(); result.setName("Save data from Kafka"); result.setMapReduceJobClass(KafkaFlatTableJob.class); JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system"); StringBuilder cmd = new StringBuilder(); jobBuilderSupport.appendMapReduceParameters(cmd); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, location); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); @@ -147,20 +250,17 @@ public class KafkaMRInput implements IMRInput { @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - GarbageCollectionStep step = new GarbageCollectionStep(); - step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP); - step.setDataPath(outputPath); + HiveMRInput.GarbageCollectionStep step = new HiveMRInput.GarbageCollectionStep(); + step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP); + step.setIntermediateTables(intermediateTables); + step.setExternalDataPaths(intermediatePaths); jobFlow.addTask(step); } @Override public IMRTableInputFormat getFlatTableInputFormat() { - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getCubeInstance().getRootFactTable()); - List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns(); - - return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf); + return new KafkaTableInputFormat(seg, conf); } } @@ 
-178,43 +278,11 @@ public class KafkaMRInput implements IMRInput { final MergeOffsetStep result = new MergeOffsetStep(); result.setName("Merge offset step"); - CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setCubeName(cubeSegment.getCubeInstance().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams()); CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams()); jobFlow.addTask(result); } } - public static class GarbageCollectionStep extends AbstractExecutable { - private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class); - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - try { - rmdirOnHDFS(getDataPath()); - } catch (IOException e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return ExecuteResult.createError(e); - } - - return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n"); - } - - private void rmdirOnHDFS(String path) throws IOException { - Path externalDataPath = new Path(path); - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - if (fs.exists(externalDataPath)) { - fs.delete(externalDataPath, true); - } - } - - public void setDataPath(String externalDataPath) { - setParam("dataPath", externalDataPath); - } - - private String getDataPath() { - return getParam("dataPath"); - } - - } } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 0ab83c6..1d65b96 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; @@ -224,7 +225,9 @@ public class KafkaSource implements ISource { public List<String> getRelatedKylinResources(TableDesc table) { List<String> dependentResources = Lists.newArrayList(); dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity())); - dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); + if (table.getSourceType() == ISourceAware.ID_STREAMING) { + dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); + } return dependentResources; } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index 696c20c..c31132b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -67,6 +67,9 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("margin") private long margin; + @JsonProperty("splitRows") + private int splitRows=1000000; + //"configA=1;configB=2" @JsonProperty("parserProperties") private String parserProperties; @@ -157,6 +160,15 @@ public class KafkaConfig extends RootPersistentEntity { return 
sb.toString(); } + + public int getSplitRows() { + return splitRows; + } + + public void setSplitRows(int splitRows) { + this.splitRows = splitRows; + } + @Override public KafkaConfig clone() { try { diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java index b71ca84..e106a0a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -25,7 +25,7 @@ import java.util.Map; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -64,6 +64,8 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; + public static final String CONFIG_KAFKA_SPLIT_ROWS = "kafka.split.rows"; + @Override public int run(String[] args) throws Exception { Options options = new Options(); @@ -111,6 +113,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); + job.getConfiguration().set(CONFIG_KAFKA_SPLIT_ROWS, String.valueOf(kafkaConfig.getSplitRows())); job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name setupMapper(cube.getSegmentById(segmentId)); job.setNumReduceTasks(0); @@ -152,7 +155,7 @@ public class KafkaFlatTableJob extends AbstractHadoopJob { job.setMapperClass(KafkaFlatTableMapper.class); job.setInputFormatClass(KafkaInputFormat.class); - job.setOutputKeyClass(NullWritable.class); + job.setOutputKeyClass(BytesWritable.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setNumReduceTasks(0); diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java index 9fe29ca..b452b12 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; @@ -38,14 +37,18 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.KafkaConfigManager; import 
org.apache.kylin.source.kafka.StreamingParser; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> { +public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, BytesWritable, Text> { - private NullWritable outKey = NullWritable.get(); + private BytesWritable outKey = new BytesWritable(); + private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class); private Text outValue = new Text(); private KylinConfig config; private CubeSegment cubeSegment; @@ -60,15 +63,18 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl config = AbstractHadoopJob.loadKylinPropsAndMetadata(); String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); - CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + final CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); - this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter(); - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable()); - List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns(); + this.delimiter = "\01";//sequence file default delimiter + logger.info("Use delimiter: " + delimiter); + final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config); + final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable()); + + final IJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); + final List<TblColRef> allColumns = flatTableDesc.getFactColumns(); try { - streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), columns); + streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getAllParserProperties(), allColumns); } catch (ReflectiveOperationException e) { throw new IllegalArgumentException(e); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index c996c5f..c3ed47f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN)); final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX)); + final Integer spiltRows = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_SPLIT_ROWS)); final Map<Integer, Long> startOffsetMap = Maps.newHashMap(); final Map<Integer, Long> endOffsetMap = Maps.newHashMap(); @@ -79,9 +80,18 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { throw new IllegalStateException("Partition '" + partitionId + "' not exists."); } - if (endOffsetMap.get(partitionId) > 
startOffsetMap.get(partitionId)) { - InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)); - splits.add(split); + long new_start = startOffsetMap.get(partitionId); + long end = endOffsetMap.get(partitionId); + while (end > new_start) { + if ((end - new_start) <= spiltRows && (end > new_start)) { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end); + splits.add(split); + break; + } else { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltRows); + splits.add(split); + new_start += spiltRows; + } } } } @@ -93,4 +103,4 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { return new KafkaInputRecordReader(); } -} +} \ No newline at end of file diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java new file mode 100644 index 0000000..9dfb641 --- /dev/null +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.kafka; + +import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit; +import org.junit.Assert; +import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.InputSplit; + +import com.google.common.collect.Maps; + +public class SpiltNumTest { + + public static List<InputSplit> getSplits() { + + final String brokers = "brokers"; + final String inputTopic = "topic"; + final Integer spiltsSetnum = 10; + + final Map<Integer, Long> startOffsetMap = Maps.newHashMap(); + final Map<Integer, Long> endOffsetMap = Maps.newHashMap(); + startOffsetMap.put(0, Long.valueOf(0)); + endOffsetMap.put(0, Long.valueOf(15)); + startOffsetMap.put(1, Long.valueOf(4)); + endOffsetMap.put(1, Long.valueOf(26)); + startOffsetMap.put(2, Long.valueOf(15)); + endOffsetMap.put(2, Long.valueOf(47)); + startOffsetMap.put(3, Long.valueOf(39)); + endOffsetMap.put(3, Long.valueOf(41)); + + final List<InputSplit> splits = new ArrayList<InputSplit>(); + for (int i = 0; i < 4; i++) { + int partitionId = i; + long new_start = startOffsetMap.get(partitionId); + long end = endOffsetMap.get(partitionId); + while (end > new_start) { + if ((end - new_start) <= spiltsSetnum && (end > new_start)) { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end); + splits.add(split); + break; + } else { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum); + splits.add(split); + new_start += spiltsSetnum; + } + } + } + return splits; + } + + @Test + public void testSpiltNum(){ + int slen = 0; + List<InputSplit> splits = getSplits(); + slen = splits.size(); + Assert.assertEquals(slen, 10); + } + + @Test + public void testSpilt(){ + boolean flag = false; + boolean flag1 = false; + boolean flag2 = false; + boolean flag3 = false; + boolean flag4 = false; + boolean flag5 = false; + boolean flag6 = false; + boolean flag7 = false; + boolean flag8 = false; + boolean flag9 = false; + boolean flag10 = false; + boolean result = false; + List<InputSplit> splits = getSplits(); + for(Object eachspilt : splits){ + flag = eachspilt.toString().contains("brokers-topic-0-0-10"); + if(flag){ + break; + } + } + for(Object eachspilt : splits){ + flag1 = eachspilt.toString().contains("brokers-topic-0-10-15"); + if(flag1){ + break; + } + } + for(Object eachspilt : splits){ + flag2 = eachspilt.toString().contains("brokers-topic-1-4-14"); + if(flag2){ + break; + } + } + for(Object eachspilt : splits){ + flag3 = eachspilt.toString().contains("brokers-topic-1-14-24"); + if(flag3){ + break; + } + } + for(Object eachspilt : splits){ + flag4 = eachspilt.toString().contains("brokers-topic-1-24-26"); + if(flag4){ + break; + } + } + for(Object eachspilt : splits){ + flag5 = eachspilt.toString().contains("brokers-topic-2-15-25"); + if(flag5) { + break; + } + } + for(Object eachspilt : splits){ + flag6 = eachspilt.toString().contains("brokers-topic-2-25-35"); + if(flag6){ + break; + } + } + for(Object eachspilt : splits){ + flag7 = eachspilt.toString().contains("brokers-topic-2-35-45"); + if(flag7){ + break; + } + } + for(Object eachspilt : splits){ + flag8 = eachspilt.toString().contains("brokers-topic-2-45-47"); + if(flag8){ + break; + } + } + for(Object eachspilt : splits){ + flag9 = eachspilt.toString().contains("brokers-topic-3-39-41"); + if(flag9){ + break; + } + } + for(Object eachspilt : splits){ + flag10 = eachspilt.toString().contains("brokers-topic-0-4-47"); + 
if(flag10){ + break; + } + } + result = flag && flag1 && flag2 && flag3 && flag4 && flag5 && flag6 && flag7 && flag8 && flag9; + Assert.assertTrue(result); + Assert.assertNotEquals(flag10, true); + } +}
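
For reference, below is a minimal, dependency-free sketch of the per-partition offset chunking that the patched KafkaInputFormat.getSplits() performs using the new kafka.split.rows / KafkaConfig.splitRows setting. The OffsetRange class, the main() driver, and the hard-coded partition offsets are illustrative only (the offsets mirror the fixtures used in SpiltNumTest); the real code builds KafkaInputSplit objects from live broker metadata rather than plain ranges.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the per-partition offset chunking introduced in
    // KafkaInputFormat.getSplits(): each partition's [start, end) offset range
    // is cut into pieces of at most splitRows messages.
    public class OffsetSplitSketch {

        // Illustrative stand-in for KafkaInputSplit: a half-open offset range.
        static final class OffsetRange {
            final int partition;
            final long start;
            final long end;
            OffsetRange(int partition, long start, long end) {
                this.partition = partition;
                this.start = start;
                this.end = end;
            }
            @Override
            public String toString() {
                return partition + "-" + start + "-" + end;
            }
        }

        // Same loop shape as the patch: emit chunks of at most splitRows offsets,
        // with the final chunk taking whatever remainder is left.
        static List<OffsetRange> split(int partition, long start, long end, long splitRows) {
            List<OffsetRange> result = new ArrayList<>();
            long newStart = start;
            while (end > newStart) {
                if (end - newStart <= splitRows) {
                    result.add(new OffsetRange(partition, newStart, end));
                    break;
                }
                result.add(new OffsetRange(partition, newStart, newStart + splitRows));
                newStart += splitRows;
            }
            return result;
        }

        public static void main(String[] args) {
            long splitRows = 10; // corresponds to kafka.split.rows / KafkaConfig.splitRows
            // per-partition {startOffset, endOffset}, mirroring SpiltNumTest
            long[][] partitionOffsets = { {0, 15}, {4, 26}, {15, 47}, {39, 41} };
            List<OffsetRange> all = new ArrayList<>();
            for (int p = 0; p < partitionOffsets.length; p++) {
                all.addAll(split(p, partitionOffsets[p][0], partitionOffsets[p][1], splitRows));
            }
            // 2 + 3 + 4 + 1 = 10 splits, matching the count asserted in SpiltNumTest.
            System.out.println(all.size() + " splits: " + all);
        }
    }

Capping each split at splitRows offsets keeps one large partition from becoming a single oversized mapper input, so the "Save data from Kafka" step can parallelize across offset ranges as well as across partitions.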