This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3369
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6d623664ad513bd5a2ad7e9694cdd70f5ef7e6f6
Author: GinaZhai <na.z...@kyligence.io>
AuthorDate: Fri May 25 18:25:48 2018 +0800

    KYLIN-3369 Kafka join with hive

    Signed-off-by: shaofengshi <shaofeng...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  43 +++-
 .../org/apache/kylin/common/util/BasicTest.java    |  12 +
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  18 +-
 .../cube/model/CubeJoinedFlatTableEnrich.java      |  11 +
 .../model/validation/rule/StreamingCubeRule.java   |   5 -
 .../java/org/apache/kylin/job/JoinedFlatTable.java | 167 ++++++++++++-
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/metadata/model/IJoinedFlatTableDesc.java |   9 +
 .../org/apache/kylin/metadata/model/TblColRef.java |  15 +-
 examples/test_case_data/sandbox/kylin.properties   |   2 +-
 source-kafka/pom.xml                               |   4 +
 .../apache/kylin/source/kafka/KafkaMRInput.java    | 268 ++++++++++++++++++++-
 .../org/apache/kylin/source/kafka/KafkaSource.java |   5 +-
 .../source/kafka/hadoop/KafkaFlatTableMapper.java  |   5 +
 .../source/kafka/hadoop/KafkaInputFormat.java      |  18 +-
 .../apache/kylin/source/kafka/SpiltNumTest.java    | 163 +++++++++++++
 16 files changed, 721 insertions(+), 25 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 689d08f..5d543f5 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -225,7 +225,11 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.env", "DEV"); } - private String cachedHdfsWorkingDirectory; + public static String cachedHdfsWorkingDirectory; + + public void setHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) { + this.cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory; + } public String getHdfsWorkingDirectory() { if (cachedHdfsWorkingDirectory != null) @@ -260,6 +264,39 @@ abstract public class KylinConfigBase implements Serializable { return cachedHdfsWorkingDirectory; } + public String getHdfsWorkingDirectory(String cachedHdfsWorkingDirectory) { + if (cachedHdfsWorkingDirectory != null) + return cachedHdfsWorkingDirectory; + + String root = getOptional("kylin.env.hdfs-working-dir", "/kylin"); + + Path path = new Path(root); + if (!path.isAbsolute()) + throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root); + + // make sure path is qualified + try { + FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration()); + path = fs.makeQualified(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // append metadata-url prefix + root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString(); + + if (!root.endsWith("/")) + root += "/"; + + cachedHdfsWorkingDirectory = root; + if (cachedHdfsWorkingDirectory.startsWith("file:")) { + cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("file:", "file://"); + } else if (cachedHdfsWorkingDirectory.startsWith("maprfs:")) { + cachedHdfsWorkingDirectory = cachedHdfsWorkingDirectory.replace("maprfs:", "maprfs://"); + } + return cachedHdfsWorkingDirectory; + } + public String getZookeeperBasePath() { return getOptional("kylin.env.zookeeper-base-path", "/kylin"); } @@ -723,6 +760,10 @@ abstract public class KylinConfigBase implements Serializable {
"false")); } + public String getHiveDatabaseForStreamingTable() { + return this.getOptional("kylin.source.hive.database-for-streaming-table", "default"); + } + public String getHiveDatabaseForIntermediateTable() { return this.getOptional("kylin.source.hive.database-for-flat-table", "default"); } diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index 1c1e389..fcf302d 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -244,4 +244,16 @@ public class BasicTest { cal.setTimeInMillis(t); return dateFormat.format(cal.getTime()); } + + @Test + public void testStringSplit() { + String[] strings = new String[] {"abc", "bcd"}; + + String delimeter = ","; + String concated = StringUtils.join(Arrays.asList(strings), delimeter); + + String[] newStrings = concated.split("\\" + delimeter); + + Assert.assertEquals(strings, newStrings); + } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index d50a5af..63df4aa 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -31,6 +31,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.base.Preconditions; @@ -42,10 +44,12 @@ import com.google.common.collect.Maps; @SuppressWarnings("serial") public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable { - protected final String tableName; - protected final CubeDesc cubeDesc; + protected String tableName;///// + protected final CubeDesc cubeDesc;/// protected final CubeSegment cubeSegment; protected final boolean includingDerived; + TableRef tableRef; + DataModelDesc dataModelDesc;/// private int columnCount = 0; private List<TblColRef> columnList = Lists.newArrayList(); @@ -185,6 +189,16 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab } @Override + public TableRef getTable() {///// + return tableRef; + } + + @Override + public void setTableName(String tableName) { + this.tableName = tableName; + } + + @Override public TblColRef getClusterBy() { return cubeDesc.getClusteredByColumn(); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index 73da802..b314cc2 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -28,6 +28,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; /** @@ -130,6 +131,16 @@ public class CubeJoinedFlatTableEnrich 
implements IJoinedFlatTableDesc, Serializ } @Override + public TableRef getTable() {/// + return null; + } + + @Override + public void setTableName(String tableName) { + + } + + @Override public TblColRef getClusterBy() { return flatDesc.getClusterBy(); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java index 4438706..dab8fa4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/StreamingCubeRule.java @@ -49,11 +49,6 @@ public class StreamingCubeRule implements IValidatorRule<CubeDesc> { return; } - if (model.getLookupTables().size() > 0) { - context.addResult(ResultLevel.ERROR, "Streaming Cube doesn't support star-schema so far; only one fact table is allowed."); - return; - } - if (cube.getEngineType() == IEngineAware.ID_SPARK) { context.addResult(ResultLevel.ERROR, "Spark engine doesn't support streaming source, select MapReduce engine instead."); return; diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 528bcf0..ac38730 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -61,13 +61,13 @@ public class JoinedFlatTable { } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String storageFormat) { + String storageFormat) { String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter(); return generateCreateTableStatement(flatDesc, storageDfsDir, storageFormat, fieldDelimiter); } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, - String storageFormat, String fieldDelimiter) { + String storageFormat, String fieldDelimiter) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n"); @@ -90,12 +90,53 @@ public class JoinedFlatTable { return ddl.toString(); } + public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir) { + String storageFormat = flatDesc.getDataModel().getConfig().getFlatTableStorageFormat(); + return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat); + } + + public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir, + String storageFormat) { + String fieldDelimiter = flatDesc.getDataModel().getConfig().getFlatTableFieldDelimiter(); + return generateCreateTableStatement1(flatDesc, storageDfsDir, storageFormat, fieldDelimiter); + } + + public static String generateCreateTableStatement1(IJoinedFlatTableDesc flatDesc, String storageDfsDir, + String storageFormat, String fieldDelimiter) { + StringBuilder ddl = new StringBuilder(); + + ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getDataModel().getRootFactTableName() + "\n");////flatDesc.getTableName() + + ddl.append("(" + "\n"); + for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { + TblColRef col = flatDesc.getAllColumns().get(i); + if (i > 0) { + ddl.append(","); + } + ddl.append(col.getName() + " " + getHiveDataType(col.getDatatype()) + "\n"); + } + ddl.append(")" + "\n"); + if ("TEXTFILE".equals(storageFormat)) { + ddl.append("ROW 
FORMAT DELIMITED FIELDS TERMINATED BY '" + fieldDelimiter + "'\n"); + } + ddl.append("STORED AS " + storageFormat + "\n"); + ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n"); + ddl.append("ALTER TABLE " + flatDesc.getDataModel().getRootFactTableName() + " SET TBLPROPERTIES('auto.purge'='true');\n"); + return ddl.toString(); + } + public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) { StringBuilder ddl = new StringBuilder(); ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n"); return ddl.toString(); } + public static String generateDropTableStatement1(IJoinedFlatTableDesc flatDesc) { + StringBuilder ddl = new StringBuilder(); + ddl.append("DROP TABLE IF EXISTS " + flatDesc.getDataModel().getRootFactTableName() + ";").append("\n"); + return ddl.toString(); + }/// + public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) { CubeSegment segment = ((CubeSegment) flatDesc.getSegment()); KylinConfig kylinConfig; @@ -120,6 +161,30 @@ public class JoinedFlatTable { + ";\n"; } + public static String generateInsertDataStatement1(IJoinedFlatTableDesc flatDesc) { + CubeSegment segment = ((CubeSegment) flatDesc.getSegment()); + KylinConfig kylinConfig; + if (null == segment) { + kylinConfig = KylinConfig.getInstanceFromEnv(); + } else { + kylinConfig = (flatDesc.getSegment()).getConfig(); + } + + if (kylinConfig.isAdvancedFlatTableUsed()) { + try { + Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass()); + Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class, + JobEngineConfig.class); + return (String) method.invoke(null, flatDesc); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement1(flatDesc) + + ";\n"; + } + public static String generateInsertPartialDataStatement(IJoinedFlatTableDesc flatDesc) { return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";\n"; @@ -129,6 +194,10 @@ public class JoinedFlatTable { return generateSelectDataStatement(flatDesc, false, null); } + public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc) { + return generateSelectDataStatement1(flatDesc, false, null); + } + public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine, String[] skipAs) { final String sep = singleLine ? " " : "\n"; @@ -154,6 +223,31 @@ public class JoinedFlatTable { return sql.toString(); } + public static String generateSelectDataStatement1(IJoinedFlatTableDesc flatDesc, boolean singleLine, + String[] skipAs) { + final String sep = singleLine ? " " : "\n"; + final List<String> skipAsList = (skipAs == null) ? 
new ArrayList<String>() : Arrays.asList(skipAs); + + StringBuilder sql = new StringBuilder(); + sql.append("SELECT" + sep); + + for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { + TblColRef col = flatDesc.getAllColumns().get(i); + if (i > 0) { + sql.append(","); + } + String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName()); + if (skipAsList.contains(colTotalName)) { + sql.append(col.getExpressionInSourceDB() + sep); + } else { + sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep); + } + } + appendJoinStatement1(flatDesc, sql, singleLine); + appendWhereStatement1(flatDesc, sql, singleLine); + return sql.toString(); + } + public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) { final StringBuilder sql = new StringBuilder(); final TableRef rootTbl = flatDesc.getDataModel().getRootFactTable(); @@ -170,26 +264,61 @@ public class JoinedFlatTable { DataModelDesc model = flatDesc.getDataModel(); TableRef rootTable = model.getRootFactTable(); - sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep); + sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep); // here: rootTable.getTableIdentity(), not flatDesc.getTableName() for (JoinTableDesc lookupDesc : model.getJoinTables()) { JoinDesc join = lookupDesc.getJoin(); if (join != null && join.getType().equals("") == false) { - String joinType = join.getType().toUpperCase(); TableRef dimTable = lookupDesc.getTableRef(); if (!dimTableCache.contains(dimTable)) { TblColRef[] pk = join.getPrimaryKeyColumns(); TblColRef[] fk = join.getForeignKeyColumns(); if (pk.length != fk.length) { throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); + } String joinType = join.getType().toUpperCase(); + + sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep); + sql.append("ON "); + for (int i = 0; i < pk.length; i++) { + if (i > 0) { + sql.append(" AND "); + } + sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB()); } + sql.append(sep); + + dimTableCache.add(dimTable); + } + } + } + } + + public static void appendJoinStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { + final String sep = singleLine ? " " : "\n"; + Set<TableRef> dimTableCache = new HashSet<>(); + + DataModelDesc model = flatDesc.getDataModel(); + TableRef rootTable = model.getRootFactTable(); + sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep); // here: rootTable.getTableIdentity(), not flatDesc.getTableName() + + for (JoinTableDesc lookupDesc : model.getJoinTables()) { + JoinDesc join = lookupDesc.getJoin(); + if (join != null && join.getType().equals("") == false) { + TableRef dimTable = lookupDesc.getTableRef(); + if (!dimTableCache.contains(dimTable)) { + TblColRef[] pk = join.getPrimaryKeyColumns(); + TblColRef[] fk = join.getForeignKeyColumns(); + if (pk.length != fk.length) { + throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); + } String joinType = join.getType().toUpperCase(); + sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep); sql.append("ON "); for (int i = 0; i < pk.length; i++) { if (i > 0) { sql.append(" AND "); } - sql.append(fk[i].getExpressionInSourceDB() + " = " + pk[i].getExpressionInSourceDB()); + sql.append(fk[i].getExpressionInSourceDB1() + " = " + pk[i].getExpressionInSourceDB()); } sql.append(sep); @@ -243,6 +372,34 @@ public class JoinedFlatTable { sql.append(whereBuilder.toString()); } + private static void appendWhereStatement1(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { + final String sep = singleLine ? " " : "\n"; + + StringBuilder whereBuilder = new StringBuilder(); + whereBuilder.append("WHERE 1=1"); + + DataModelDesc model = flatDesc.getDataModel(); + if (StringUtils.isNotEmpty(model.getFilterCondition())) { + whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") "); + } + + /*if (flatDesc.getSegment() != null) { + PartitionDesc partDesc = model.getPartitionDesc(); + if (partDesc != null && partDesc.getPartitionDateColumn() != null) { + SegmentRange segRange = flatDesc.getSegRange(); + + if (segRange != null && !segRange.isInfinite()) { + whereBuilder.append(" AND ("); + whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, + flatDesc.getSegment(), segRange)); + whereBuilder.append(")" + sep); + } + } + }*/ + + sql.append(whereBuilder.toString()); + } + private static String colName(TblColRef col) { return col.getTableAlias() + "_" + col.getName(); }

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index b9a3651..42f0dbf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -34,6 +34,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary"; + public static final String STEP_NAME_CREATE_HIVE_TABLE = "Create Hive Table"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index 0589829..46e21f3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -30,6 +30,8 @@ public interface IJoinedFlatTableDesc { List<TblColRef> getAllColumns(); + void setAllColumns(List<TblColRef> tblColRefList); + int getColumnIndex(TblColRef colRef); SegmentRange getSegRange(); @@ -41,4 +43,11 @@ public interface IJoinedFlatTableDesc { // optionally present ISegment getSegment(); + // accessors added for the streaming (Kafka) flat-table build + TableRef getTable(); + + void setDataModel(DataModelDesc dataModelDesc); + + void setTableName(String tableName); + }

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index ee33e8a..928d3d2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -31,6 +31,7 @@ import org.apache.kylin.metadata.datatype.DataType; public class TblColRef implements Serializable { private static final String INNER_TABLE_NAME = "_kylin_table"; + private String name; // used by projection rewrite, see OLAPProjectRel public enum InnerDataTypeEnum { @@ -119,7 +120,7 @@ public class TblColRef implements Serializable { private String identity; private String parserDescription; - TblColRef(ColumnDesc column) { + public TblColRef(ColumnDesc column) { this.column = column; } @@ -148,6 +149,10 @@ public class TblColRef implements Serializable { return column.getName(); } + public void setName(String name) { + this.name = name; + } + + public TableRef getTableRef() { return table; } @@ -168,6 +173,14 @@ public class TblColRef implements Serializable { } } + public String getExpressionInSourceDB1() { + return identity; + } + + public void setExpressionInSourceDB(String identity) { + this.identity = identity; + } + public String getTable() { if (column.getTable() == null) { return null; }

diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index ae9dad2..d428d44 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -70,7 +70,7 @@ kylin.job.retry=0 # you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands) -kylin.job.use-remote-cli=false +kylin.job.use-remote-cli=true # Only necessary when kylin.job.use-remote-cli=true kylin.job.remote-cli-hostname=sandbox

diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index 2ef4cdf..55df7f0 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -66,5 +66,9 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-source-hive</artifactId> + </dependency> </dependencies> </project>

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 223e303..4ca60c5 100644 ---
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -14,14 +14,20 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.source.kafka; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; +import javax.annotation.Nullable; + +import com.google.common.collect.Sets; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -31,7 +37,11 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; @@ -39,6 +49,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.ExecuteException; @@ -46,24 +57,35 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.JoinDesc; +import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.hive.CreateFlatHiveTableStep; +import org.apache.kylin.source.hive.HiveMRInput; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; import org.apache.kylin.source.kafka.job.MergeOffsetStep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide.materializeViewHql; + public class KafkaMRInput implements IMRInput { + private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class); CubeSegment cubeSegment; @Override public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { this.cubeSegment = (CubeSegment) flatDesc.getSegment(); - return new BatchCubingInputSide(cubeSegment); + return new BatchCubingInputSide(cubeSegment, flatDesc); } @Override @@ -80,12 +102,14 @@ public class KafkaMRInput implements IMRInput { public static class KafkaTableInputFormat implements 
IMRTableInputFormat { private final CubeSegment cubeSegment; private final JobEngineConfig conf; - private final String delimiter; + private String delimiter; + private static final Logger logger = LoggerFactory.getLogger(KafkaTableInputFormat.class); public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { this.cubeSegment = cubeSegment; this.conf = conf; this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter(); + //delimiter="|"; //fixme } @Override @@ -114,16 +138,250 @@ public class KafkaMRInput implements IMRInput { final JobEngineConfig conf; final CubeSegment seg; + CubeDesc cubeDesc; + KylinConfig config; private String outputPath; - - public BatchCubingInputSide(CubeSegment seg) { + protected IJoinedFlatTableDesc flatDesc; + protected final String hiveTableDatabase; + protected final String hiveIntermediateTableDatabase1; + protected final String hdfsWorkingDir; + + String hiveViewIntermediateTables = ""; + + public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc flatDesc) { + config = KylinConfig.getInstanceFromEnv(); + this.flatDesc = flatDesc; + this.hiveTableDatabase = config.getHiveDatabaseForStreamingTable(); + this.hiveIntermediateTableDatabase1 = config.getHiveDatabaseForIntermediateTable(); + this.hdfsWorkingDir = config.getHdfsWorkingDirectory(); this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); this.seg = seg; } + // the seven methods below are newly added + protected void addStepPhase1_DoCreateHiveTable(DefaultChainedExecutable jobFlow) { + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); + outputPath = getJobWorkingDir(jobFlow); + + jobFlow.addTask(createHiveTable(hiveInitStatements, cubeName)); + } + + protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) { + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveIntermediateTableDatabase1); + final String jobWorkingDir = getJobWorkingDir(jobFlow); + + // change hdfs path + // outputPath = hdfsWorkingDir + "kylin-" + UUID.randomUUID().toString(); + jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); + } + + protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) { + String jobWorkingDir = JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId()); + if (KylinConfig.getInstanceFromEnv().getHiveTableDirCreateFirst()) { + checkAndCreateWorkDir(jobWorkingDir); + } + return jobWorkingDir; + } + + private void checkAndCreateWorkDir(String jobWorkingDir) { + try { + Path path = new Path(jobWorkingDir); + FileSystem fileSystem = HadoopUtil.getFileSystem(path); + if (!fileSystem.exists(path)) { + logger.info("Create jobWorkDir : " + jobWorkingDir); + fileSystem.mkdirs(path); + } + } catch (IOException e) { + logger.error("Could not create lookUp table dir : " + jobWorkingDir); + } + } + + private AbstractExecutable createHiveTable(String hiveInitStatements, + String cubeName) { + final String dropTableHql = JoinedFlatTable.generateDropTableStatement1(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement1(flatDesc, outputPath); + + CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + step.setInitStatement(hiveInitStatements); + step.setCreateTableStatement(dropTableHql + createTableHql); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_HIVE_TABLE); + return step; + }
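checkAndCreateWorkDir above only logs on IOException instead of failing the job, so a missing or unwritable directory surfaces later in the Hive step that writes into it. A minimal standalone sketch of the same existence check, assuming a default Hadoop Configuration is available on the classpath:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WorkDirCheck {
    // Create the job working directory if it does not exist yet.
    public static void ensureDir(String jobWorkingDir) throws IOException {
        Path path = new Path(jobWorkingDir);
        FileSystem fs = path.getFileSystem(new Configuration());
        if (!fs.exists(path)) {
            fs.mkdirs(path); // recursive create; succeeds even if parents already exist
        }
    }
}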
+ private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, + String cubeName) { + + //TableRef tableRef = flatDesc.getDataModel().getRootFactTable(); + //change table name and columns + //ColumnDesc columnDesc = new ColumnDesc(); + //TableDesc tableDesc = tableRef.getTableDesc(); + //tableDesc.setName(flatDesc.getDataModel().getRootFactTableName()); + //tableRef.setTableIdentity(tableDesc.getIdentity()); + + /*TblColRef tblColRef = null; + List<TblColRef> tblColRefList = Lists.newArrayList(); + ColumnDesc[] columnDescs = tableDesc.getColumns(); + for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { + TblColRef col = flatDesc.getAllColumns().get(i); + col.setName(colName(col)); + columnDescs[i].setName(colName(col)); + columnDesc.setName(columnDescs[i].getName()); + tblColRef = new TblColRef(columnDesc); + tblColRef.setExpressionInSourceDB(tableRef.getAlias() + "." + tableRef.getAlias() + "_" + columnDesc.getName()); + tblColRefList.add(tblColRef); + } + CubeJoinedFlatTableDesc cubeJoinedFlatTableDesc = new CubeJoinedFlatTableDesc(seg); + cubeJoinedFlatTableDesc.setAllColumns(tblColRefList);*/ + //tblColRef.setName(tableRef.getAlias() + "_" + columnDesc.getName()); + + //tableDesc.setColumns(columnDescs); + //tableRef.setTableDesc(tableDesc); + //tableRef.setColumn(tblColRef); + /*DataModelDesc dataModelDesc = flatDesc.getDataModel(); + dataModelDesc.setRootFactTable(tableRef); + dataModelDesc.setRootFactTableName(flatDesc.getTableName());*/ + cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cubeName); + flatDesc.setTableName(MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + cubeDesc.getName().toLowerCase() + "_" + + seg.getUuid().replaceAll("-", "_") + Math.round(Math.random() * 10)); + //flatDesc.setDataModel(dataModelDesc); + + // from hive to hive + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement1(flatDesc); + outputPath = jobWorkingDir + "/" + flatDesc.getTableName(); + + CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + step.setInitStatement(hiveInitStatements); + step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + return step; + } + + private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) { + HiveMRInput.RedistributeFlatHiveTableStep step = new HiveMRInput.RedistributeFlatHiveTableStep(); + step.setInitStatement(hiveInitStatements); + step.setIntermediateTable(flatDesc.getTableName()); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc)); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); + return step; + }
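Put together, the statement bundle that createFlatHiveTableStep submits to Hive looks roughly like the following, for a hypothetical model joining a streaming fact table to one lookup table (all table and column names are illustrative; <seg_uuid> plus a random digit come from the flatDesc.setTableName call above):

DROP TABLE IF EXISTS kylin_intermediate_mycube_<seg_uuid>3;
CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_mycube_<seg_uuid>3 ( ... ) STORED AS SEQUENCEFILE LOCATION '<job_working_dir>/kylin_intermediate_mycube_<seg_uuid>3';
INSERT OVERWRITE TABLE kylin_intermediate_mycube_<seg_uuid>3
SELECT SS.ORDER_ID as SS_ORDER_ID, SS.CAT_ID as SS_CAT_ID, C.CAT_NAME as C_CAT_NAME
FROM DEFAULT.STREAMING_SALES as SS
INNER JOIN DEFAULT.CATEGORY as C
ON SS.CAT_ID = C.CAT_ID
WHERE 1=1;

Note that appendWhereStatement1 keeps only the model's filter condition; the segment-range predicate is commented out in this patch, so each build selects from the whole external table.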
+ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); + final String jobWorkingDir = getJobWorkingDir(jobFlow); + + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); + if (task != null) { + jobFlow.addTask(task); + } + } + + private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, + String jobWorkingDir) { + ShellExecutable step = new ShellExecutable(); + step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); + + KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); + TableMetadataManager metadataManager = TableMetadataManager.getInstance(kylinConfig); + final Set<TableDesc> lookupViewsTables = Sets.newHashSet(); + + String prj = flatDesc.getDataModel().getProject(); + for (JoinTableDesc lookupDesc : flatDesc.getDataModel().getJoinTables()) { + TableDesc tableDesc = metadataManager.getTableDesc(lookupDesc.getTable(), prj); + if (lookupDesc.getKind() == DataModelDesc.TableKind.LOOKUP && tableDesc.isView()) { + lookupViewsTables.add(tableDesc); + } + } + + if (lookupViewsTables.size() == 0) { + return null; + } + + HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.overwriteHiveProps(kylinConfig.getHiveConfigOverride()); + hiveCmdBuilder.addStatement(hiveInitStatements); + for (TableDesc lookUpTableDesc : lookupViewsTables) { + String identity = lookUpTableDesc.getIdentity(); + String intermediate = lookUpTableDesc.getMaterializedName(); + if (lookUpTableDesc.isView()) { + String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir); + hiveCmdBuilder.addStatement(materializeViewHql); + hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";"; + } + } + + hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, + hiveViewIntermediateTables.length() - 1); + + step.setCmd(hiveCmdBuilder.build()); + return step; + } + + private static String colName(TblColRef col) { + return col.getTableAlias() + "_" + col.getName(); + } + @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId())); + + // the code below is newly added + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) + .getConfig(); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(hiveTableDatabase); + // String jobWorkingDir = getJobWorkingDir(jobFlow); + + // count the lookup tables joined in the model + Set<TableRef> dimTableCache = new HashSet<>(); + DataModelDesc model = flatDesc.getDataModel(); + for (JoinTableDesc lookupDesc : model.getJoinTables()) { + JoinDesc join = lookupDesc.getJoin(); + if (join != null && join.getType().equals("") == false) { + TableRef dimTable = lookupDesc.getTableRef(); + if (!dimTableCache.contains(dimTable)) { + TblColRef[] pk = join.getPrimaryKeyColumns(); + TblColRef[] fk = join.getForeignKeyColumns(); + if (pk.length != fk.length) { + throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); + } + dimTableCache.add(dimTable); + } + } + } + if (dimTableCache.size() == 0) { + logger.info("the model has only the fact table, no lookup tables; skip the Hive join steps"); + } else { + // create hive table first + addStepPhase1_DoCreateHiveTable(jobFlow); + + // next create flat table + addStepPhase1_DoCreateFlatTable(jobFlow); + + // then count and redistribute + if (cubeConfig.isHiveRedistributeEnabled()) { + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName)); + } + + // special for hive + addStepPhase1_DoMaterializeLookupTable(jobFlow); + } } private MapReduceExecutable createSaveKafkaDataStep(String jobId) {

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 0ab83c6..1d65b96 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; @@ -224,7 +225,9 @@ public class KafkaSource implements ISource { public List<String> getRelatedKylinResources(TableDesc table) { List<String> dependentResources = Lists.newArrayList(); dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity())); - dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); + if (table.getSourceType() == ISourceAware.ID_STREAMING) { + dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); + } return dependentResources; }

diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java index 9fe29ca..a3c5e62 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java @@ -42,10 +42,13 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.StreamingParser; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, NullWritable, Text> { private NullWritable outKey = NullWritable.get(); + private static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableMapper.class); private Text outValue = new Text(); private KylinConfig config; private CubeSegment cubeSegment; @@ -63,6 +66,8 @@ public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritabl CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); this.cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); this.delimiter = cubeSegment.getConfig().getFlatTableFieldDelimiter(); + //delimiter="|"; //fixme + logger.info("Use delimiter: " + delimiter); KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config); KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cubeSegment.getCubeInstance().getRootFactTable()); List<TblColRef> columns = new CubeJoinedFlatTableDesc(cubeSegment).getAllColumns();
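The mapper change above only resolves and logs the flat-table delimiter; the row it ultimately writes is the parsed column values joined with that delimiter. A minimal sketch of that framing, with the parser output mocked as a plain list (StreamingParser's real API is not shown here, and "|" is just an example delimiter):

import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;

public class DelimitedRowSketch {
    // Join one parsed Kafka message into the delimited row written to the flat table.
    public static String toRow(List<String> columnValues, String delimiter) {
        return StringUtils.join(columnValues, delimiter);
    }

    public static void main(String[] args) {
        // Prints: 2018-05-25|1001|59.90
        System.out.println(toRow(Arrays.asList("2018-05-25", "1001", "59.90"), "|"));
    }
}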
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index c996c5f..3681a98 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -55,6 +55,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN)); final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX)); + final Integer spiltsSetnum = 1000; // this value should eventually be passed in from the front end; assume a fixed value for now final Map<Integer, Long> startOffsetMap = Maps.newHashMap(); final Map<Integer, Long> endOffsetMap = Maps.newHashMap(); @@ -79,9 +80,18 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { throw new IllegalStateException("Partition '" + partitionId + "' not exists."); } - if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) { - InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)); - splits.add(split); + long new_start = startOffsetMap.get(partitionId); + long end = endOffsetMap.get(partitionId); + while (end > new_start) { + if ((end - new_start) <= spiltsSetnum && (end > new_start)) { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end); + splits.add(split); + break; + } else { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum); + splits.add(split); + new_start += spiltsSetnum; + } } } } @@ -93,4 +103,4 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { return new KafkaInputRecordReader(); } -} +} \ No newline at end of file
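The new loop above caps every Kafka input split at spiltsSetnum offsets instead of emitting one split per partition. A self-contained sketch of the same chunking arithmetic, with plain long pairs standing in for KafkaInputSplit:

import java.util.ArrayList;
import java.util.List;

public class OffsetChunking {
    // Cut [start, end) into consecutive ranges of at most 'chunk' offsets.
    static List<long[]> chunk(long start, long end, long chunk) {
        List<long[]> splits = new ArrayList<>();
        while (end > start) {
            long stop = Math.min(start + chunk, end);
            splits.add(new long[] { start, stop });
            start = stop;
        }
        return splits;
    }

    public static void main(String[] args) {
        // Partition 0 of the test below: offsets [0, 15) with chunk 10 -> "0-10", "10-15"
        for (long[] s : chunk(0, 15, 10)) {
            System.out.println(s[0] + "-" + s[1]);
        }
    }
}

With four partitions and the offsets used in SpiltNumTest below, this yields the ten splits the test expects.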
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java new file mode 100644 index 0000000..9dfb641 --- /dev/null +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/SpiltNumTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.source.kafka; + +import org.apache.kylin.source.kafka.hadoop.KafkaInputSplit; +import org.junit.Assert; +import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.InputSplit; + +import com.google.common.collect.Maps; + +public class SpiltNumTest { + + public static List<InputSplit> getSplits() { + + final String brokers = "brokers"; + final String inputTopic = "topic"; + final Integer spiltsSetnum = 10; + + final Map<Integer, Long> startOffsetMap = Maps.newHashMap(); + final Map<Integer, Long> endOffsetMap = Maps.newHashMap(); + startOffsetMap.put(0, Long.valueOf(0)); + endOffsetMap.put(0, Long.valueOf(15)); + startOffsetMap.put(1, Long.valueOf(4)); + endOffsetMap.put(1, Long.valueOf(26)); + startOffsetMap.put(2, Long.valueOf(15)); + endOffsetMap.put(2, Long.valueOf(47)); + startOffsetMap.put(3, Long.valueOf(39)); + endOffsetMap.put(3, Long.valueOf(41)); + + final List<InputSplit> splits = new ArrayList<InputSplit>(); + for (int i = 0; i < 4; i++) { + int partitionId = i; + long new_start = startOffsetMap.get(partitionId); + long end = endOffsetMap.get(partitionId); + while (end > new_start) { + if ((end - new_start) <= spiltsSetnum && (end > new_start)) { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, end); + splits.add(split); + break; + } else { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, new_start, new_start + spiltsSetnum); + splits.add(split); + new_start += spiltsSetnum; + } + } + } + return splits; + } + + @Test + public void testSpiltNum() { + List<InputSplit> splits = getSplits(); + Assert.assertEquals(10, splits.size()); + } + + @Test + public void testSpilt() { + List<InputSplit> splits = getSplits(); + String[] expected = new String[] { "brokers-topic-0-0-10", "brokers-topic-0-10-15", "brokers-topic-1-4-14", + "brokers-topic-1-14-24", "brokers-topic-1-24-26", "brokers-topic-2-15-25", "brokers-topic-2-25-35", + "brokers-topic-2-35-45", "brokers-topic-2-45-47", "brokers-topic-3-39-41" }; + for (String split : expected) { + Assert.assertTrue("missing split " + split, containsSplit(splits, split)); + } + // a split spanning a whole unchunked range must not exist + Assert.assertFalse(containsSplit(splits, "brokers-topic-0-4-47")); + } + + private static boolean containsSplit(List<InputSplit> splits, String name) { + for (InputSplit split : splits) { + if (split.toString().contains(name)) { + return true; + } + } + return false; + } +}

--
To stop receiving notification emails like this one, please contact shaofeng...@apache.org.