tmp
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6f2ae5ce Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6f2ae5ce Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6f2ae5ce Branch: refs/heads/KYLIN-1971 Commit: 6f2ae5ceb33ee8117f1565c541733b2355359933 Parents: e119cc9 Author: Yang Li <liy...@apache.org> Authored: Sat Oct 22 19:28:27 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sat Oct 22 19:28:27 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 19 +++++++------ .../kylin/job/dataGen/FactTableGenerator.java | 11 ++++--- .../kylin/cube/CubeCapabilityChecker.java | 12 ++------ .../org/apache/kylin/cube/model/CubeDesc.java | 5 +--- .../kylin/cube/model/v1_4_0/CubeDesc.java | 7 +---- .../kylin/metadata/model/DataModelDesc.java | 10 +++---- .../kylin/metadata/model/PartitionDesc.java | 23 ++------------- .../apache/kylin/metadata/model/TableRef.java | 6 ++++ .../apache/kylin/source/kafka/KafkaMRInput.java | 30 +++++++------------- 9 files changed, 43 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index be9b2a9..4a24ad2 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -18,7 +18,13 @@ package org.apache.kylin.job; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; @@ -37,6 +43,7 @@ import org.apache.kylin.job.streaming.StreamingTableDataGenerator; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.HiveClient; import org.apache.kylin.source.hive.HiveCmdBuilder; @@ -48,9 +55,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; -import kafka.message.Message; -import kafka.message.MessageAndOffset; - public class DeployUtil { private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class); @@ -146,16 +150,13 @@ public class DeployUtil { public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getFactTable()); - TableDesc tableDesc = cubeInstance.getFactTableDesc(); //load into kafka streamDataLoader.loadIntoKafka(data); logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString()); //csv data for H2 use - List<TblColRef> tableColumns = Lists.newArrayList(); - for (ColumnDesc columnDesc : tableDesc.getColumns()) { - tableColumns.add(columnDesc.getRef()); - } + TableRef factTable = cubeInstance.getDataModelDesc().getFactTableRef(); + List<TblColRef> tableColumns = Lists.newArrayList(factTable.getColumns()); TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null); StringBuilder sb = new StringBuilder(); for (String json : data) { http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java index 5aac4de..ce5d8c4 100644 --- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java +++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java @@ -555,14 +555,13 @@ public class FactTableGenerator { } private LinkedList<String> createRow(TreeMap<String, String> factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> defaultColumns) throws Exception { - KylinConfig config = KylinConfig.getInstanceFromEnv(); LinkedList<String> columnValues = new LinkedList<String>(); long currentRowTime = -1; - for (ColumnDesc cDesc : MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) { + for (TblColRef col : cube.getDataModelDesc().getFactTableRef().getColumns()) { - String colName = cDesc.getName(); + String colName = col.getName(); if (factTableCol2LookupCol.containsKey(colName)) { @@ -572,15 +571,15 @@ public class FactTableGenerator { columnValues.add(candidates.get(r.nextInt(candidates.size()))); } else if (usedCols.contains(colName)) { // if the current column is a metric or dimension column in fact table - columnValues.add(createCell(cDesc)); + columnValues.add(createCell(col.getColumnDesc())); } else { // otherwise this column is not useful in OLAP - columnValues.add(createDefaultsCell(cDesc.getTypeName())); + columnValues.add(createDefaultsCell(col.getColumnDesc().getTypeName())); defaultColumns.add(colName); } - if (cDesc.getRef().equals(this.cube.getDescriptor().getModel().getPartitionDesc().getPartitionDateColumnRef())) { + if (col.equals(cube.getDataModelDesc().getPartitionDesc().getPartitionDateColumnRef())) { currentRowTime = format.parse(columnValues.get(columnValues.size() - 1)).getTime(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java index 1eada16..24f15e0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java @@ -25,22 +25,18 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.basic.BasicMeasureType; -import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter; -import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.CapabilityResult; -import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence; +import org.apache.kylin.metadata.realization.SQLDigest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,11 +78,7 @@ public class CubeCapabilityChecker { } else { //deal with query on lookup table, like https://issues.apache.org/jira/browse/KYLIN-2030 if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable)) { - TableDesc tableDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(digest.factTable); - Set<TblColRef> dimCols = Sets.newHashSet(); - for (ColumnDesc columnDesc : tableDesc.getColumns()) { - dimCols.add(columnDesc.getRef()); - } + Set<TblColRef> dimCols = Sets.newHashSet(cube.getDataModelDesc().getFactTableRef().getColumns()); tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result, dimCols); } else { logger.info("Skip tryDimensionAsMeasures because current cube {} does not touch lookup table {} at all", cube.getName(), digest.factTable); http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index bfb51d0..5c73f21 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -61,7 +61,6 @@ import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -785,9 +784,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } private TblColRef initDimensionColRef(DimensionDesc dim, String colName) { - TableRef table = dim.getTableRef(); - TblColRef col = table.getColumn(colName); - checkArgument(col != null, "No column '%s' found in table %s", colName, table); + TblColRef col = model.findColumn(dim.getTable(), colName); // always use FK instead PK, FK could be shared by more than one lookup tables JoinDesc join = dim.getJoin(); http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java index 24c9ceb..b5df1bf 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java @@ -607,12 +607,7 @@ public class CubeDesc extends RootPersistentEntity { } private TblColRef initDimensionColRef(DimensionDesc dim, String colName) { - TableDesc table = dim.getTableDesc(); - ColumnDesc col = table.findColumnByName(colName); - if (col == null) - throw new IllegalArgumentException("No column '" + colName + "' found in table " + table); - - TblColRef ref = col.getRef(); + TblColRef ref = model.findColumn(dim.getTable(), colName); // always use FK instead PK, FK could be shared by more than one lookup tables JoinDesc join = dim.getJoin(); http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java index d1df410..0d37bb8 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java @@ -263,9 +263,9 @@ public class DataModelDesc extends RootPersistentEntity { tableNameMap.clear(); initTableAlias(tables); - initJoinColumns(tables); + initJoinColumns(); ModelDimensionDesc.capicalizeStrings(dimensions); - initPartitionDesc(tables); + initPartitionDesc(); } private void initTableAlias(Map<String, TableDesc> tables) { @@ -315,12 +315,12 @@ public class DataModelDesc extends RootPersistentEntity { } } - private void initPartitionDesc(Map<String, TableDesc> tables) { + private void initPartitionDesc() { if (this.partitionDesc != null) - this.partitionDesc.init(tables); + this.partitionDesc.init(this); } - private void initJoinColumns(Map<String, TableDesc> tables) { + private void initJoinColumns() { for (LookupDesc lookup : this.lookups) { TableRef dimTable = lookup.getTableRef(); JoinDesc join = lookup.getJoin(); http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java index 6487bfa..1006b83 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.StringSplitter; import org.apache.kylin.metadata.datatype.DataType; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -31,8 +30,6 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; /** - * @author xduo - * */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class PartitionDesc { @@ -66,28 +63,12 @@ public class PartitionDesc { private TblColRef partitionDateColumnRef; private IPartitionConditionBuilder partitionConditionBuilder; - public void init(Map<String, TableDesc> tables) { + public void init(DataModelDesc model) { if (StringUtils.isEmpty(partitionDateColumn)) return; partitionDateColumn = partitionDateColumn.toUpperCase(); - - String[] columns = StringSplitter.split(partitionDateColumn, "."); - - if (null != columns && columns.length == 3) { - String tableName = columns[0].toUpperCase() + "." + columns[1].toUpperCase(); - - TableDesc table = tables.get(tableName); - ColumnDesc col = table.findColumnByName(columns[2]); - if (col != null) { - partitionDateColumnRef = new TblColRef(col); - } else { - throw new IllegalStateException("The column '" + partitionDateColumn + "' provided in 'partition_date_column' doesn't exist."); - } - } else { - throw new IllegalStateException("The 'partition_date_column' format is invalid: " + partitionDateColumn + ", it should be {db}.{table}.{column}."); - } - + partitionDateColumnRef = model.findColumn(partitionDateColumn); partitionConditionBuilder = (IPartitionConditionBuilder) ClassUtil.newInstance(partitionConditionBuilderClz); } http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java index 626fb16..975094c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java @@ -18,6 +18,8 @@ package org.apache.kylin.metadata.model; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import com.google.common.collect.Maps; @@ -64,6 +66,10 @@ public class TableRef { return columns.get(name); } + public Collection<TblColRef> getColumns() { + return Collections.unmodifiableCollection(columns.values()); + } + @Override public boolean equals(Object o) { if (this == o) http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 4d1f5c9..54d8c92 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -17,10 +17,10 @@ */ package org.apache.kylin.source.kafka; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -38,20 +38,18 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; import org.apache.kylin.source.kafka.job.MergeOffsetStep; import org.apache.kylin.source.kafka.job.SeekOffsetStep; import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep; -import javax.annotation.Nullable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; +import com.google.common.collect.Lists; public class KafkaMRInput implements IMRInput { @@ -59,7 +57,7 @@ public class KafkaMRInput implements IMRInput { @Override public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { - this.cubeSegment = (CubeSegment)flatDesc.getSegment(); + this.cubeSegment = (CubeSegment) flatDesc.getSegment(); return new BatchCubingInputSide(cubeSegment); } @@ -67,14 +65,8 @@ public class KafkaMRInput implements IMRInput { public IMRTableInputFormat getTableInputFormat(TableDesc table) { KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity()); - List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() { - @Nullable - @Override - public TblColRef apply(ColumnDesc input) { - return input.getRef(); - } - }); - + TableRef tableRef = cubeSegment.getCubeInstance().getDataModelDesc().findTable(table.getIdentity()); + List<TblColRef> columns = Lists.newArrayList(tableRef.getColumns()); return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null); }