tmp

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6f2ae5ce
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6f2ae5ce
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6f2ae5ce

Branch: refs/heads/KYLIN-1971
Commit: 6f2ae5ceb33ee8117f1565c541733b2355359933
Parents: e119cc9
Author: Yang Li <liy...@apache.org>
Authored: Sat Oct 22 19:28:27 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sat Oct 22 19:28:27 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   | 19 +++++++------
 .../kylin/job/dataGen/FactTableGenerator.java   | 11 ++++---
 .../kylin/cube/CubeCapabilityChecker.java       | 12 ++------
 .../org/apache/kylin/cube/model/CubeDesc.java   |  5 +---
 .../kylin/cube/model/v1_4_0/CubeDesc.java       |  7 +----
 .../kylin/metadata/model/DataModelDesc.java     | 10 +++----
 .../kylin/metadata/model/PartitionDesc.java     | 23 ++-------------
 .../apache/kylin/metadata/model/TableRef.java   |  6 ++++
 .../apache/kylin/source/kafka/KafkaMRInput.java | 30 +++++++-------------
 9 files changed, 43 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java 
b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index be9b2a9..4a24ad2 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,7 +18,13 @@
 
 package org.apache.kylin.job;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -37,6 +43,7 @@ import 
org.apache.kylin.job.streaming.StreamingTableDataGenerator;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
 import org.apache.kylin.source.hive.HiveCmdBuilder;
@@ -48,9 +55,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-
 public class DeployUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(DeployUtil.class);
 
@@ -146,16 +150,13 @@ public class DeployUtil {
     public static void prepareTestDataForStreamingCube(long startTime, long 
endTime, int numberOfRecords, String cubeName, StreamDataLoader 
streamDataLoader) throws IOException {
         CubeInstance cubeInstance = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         List<String> data = 
StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, 
cubeInstance.getFactTable());
-        TableDesc tableDesc = cubeInstance.getFactTableDesc();
         //load into kafka
         streamDataLoader.loadIntoKafka(data);
         logger.info("Write {} messages into {}", data.size(), 
streamDataLoader.toString());
 
         //csv data for H2 use
-        List<TblColRef> tableColumns = Lists.newArrayList();
-        for (ColumnDesc columnDesc : tableDesc.getColumns()) {
-            tableColumns.add(columnDesc.getRef());
-        }
+        TableRef factTable = cubeInstance.getDataModelDesc().getFactTableRef();
+        List<TblColRef> tableColumns = 
Lists.newArrayList(factTable.getColumns());
         TimedJsonStreamParser timedJsonStreamParser = new 
TimedJsonStreamParser(tableColumns, null);
         StringBuilder sb = new StringBuilder();
         for (String json : data) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java 
b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 5aac4de..ce5d8c4 100644
--- 
a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ 
b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -555,14 +555,13 @@ public class FactTableGenerator {
     }
 
     private LinkedList<String> createRow(TreeMap<String, String> 
factTableCol2LookupCol, TreeSet<String> usedCols, TreeSet<String> 
defaultColumns) throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
         LinkedList<String> columnValues = new LinkedList<String>();
 
         long currentRowTime = -1;
 
-        for (ColumnDesc cDesc : 
MetadataManager.getInstance(config).getTableDesc(factTableName).getColumns()) {
+        for (TblColRef col : 
cube.getDataModelDesc().getFactTableRef().getColumns()) {
 
-            String colName = cDesc.getName();
+            String colName = col.getName();
 
             if (factTableCol2LookupCol.containsKey(colName)) {
 
@@ -572,15 +571,15 @@ public class FactTableGenerator {
                 columnValues.add(candidates.get(r.nextInt(candidates.size())));
             } else if (usedCols.contains(colName)) {
                 // if the current column is a metric or dimension column in 
fact table
-                columnValues.add(createCell(cDesc));
+                columnValues.add(createCell(col.getColumnDesc()));
             } else {
 
                 // otherwise this column is not useful in OLAP
-                columnValues.add(createDefaultsCell(cDesc.getTypeName()));
+                
columnValues.add(createDefaultsCell(col.getColumnDesc().getTypeName()));
                 defaultColumns.add(colName);
             }
 
-            if 
(cDesc.getRef().equals(this.cube.getDescriptor().getModel().getPartitionDesc().getPartitionDateColumnRef()))
 {
+            if 
(col.equals(cube.getDataModelDesc().getPartitionDesc().getPartitionDateColumnRef()))
 {
                 currentRowTime = 
format.parse(columnValues.get(columnValues.size() - 1)).getTime();
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 1eada16..24f15e0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -25,22 +25,18 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.CapabilityResult;
-import org.apache.kylin.metadata.realization.SQLDigest;
 import 
org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.SQLDigest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,11 +78,7 @@ public class CubeCapabilityChecker {
             } else {
                 //deal with query on lookup table, like 
https://issues.apache.org/jira/browse/KYLIN-2030
                 if 
(cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable)) {
-                    TableDesc tableDesc = 
MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(digest.factTable);
-                    Set<TblColRef> dimCols = Sets.newHashSet();
-                    for (ColumnDesc columnDesc : tableDesc.getColumns()) {
-                        dimCols.add(columnDesc.getRef());
-                    }
+                    Set<TblColRef> dimCols = 
Sets.newHashSet(cube.getDataModelDesc().getFactTableRef().getColumns());
                     tryDimensionAsMeasures(unmatchedAggregations, digest, 
cube, result, dimCols);
                 } else {
                     logger.info("Skip tryDimensionAsMeasures because current 
cube {} does not touch lookup table {} at all", cube.getName(), 
digest.factTable);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index bfb51d0..5c73f21 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -61,7 +61,6 @@ import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -785,9 +784,7 @@ public class CubeDesc extends RootPersistentEntity 
implements IEngineAware {
     }
 
     private TblColRef initDimensionColRef(DimensionDesc dim, String colName) {
-        TableRef table = dim.getTableRef();
-        TblColRef col = table.getColumn(colName);
-        checkArgument(col != null, "No column '%s' found in table %s", 
colName, table);
+        TblColRef col = model.findColumn(dim.getTable(), colName);
 
         // always use FK instead PK, FK could be shared by more than one 
lookup tables
         JoinDesc join = dim.getJoin();

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java
index 24c9ceb..b5df1bf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/v1_4_0/CubeDesc.java
@@ -607,12 +607,7 @@ public class CubeDesc extends RootPersistentEntity {
     }
 
     private TblColRef initDimensionColRef(DimensionDesc dim, String colName) {
-        TableDesc table = dim.getTableDesc();
-        ColumnDesc col = table.findColumnByName(colName);
-        if (col == null)
-            throw new IllegalArgumentException("No column '" + colName + "' 
found in table " + table);
-
-        TblColRef ref = col.getRef();
+        TblColRef ref = model.findColumn(dim.getTable(), colName);
 
         // always use FK instead PK, FK could be shared by more than one 
lookup tables
         JoinDesc join = dim.getJoin();

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
index d1df410..0d37bb8 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java
@@ -263,9 +263,9 @@ public class DataModelDesc extends RootPersistentEntity {
         tableNameMap.clear();
         
         initTableAlias(tables);
-        initJoinColumns(tables);
+        initJoinColumns();
         ModelDimensionDesc.capicalizeStrings(dimensions);
-        initPartitionDesc(tables);
+        initPartitionDesc();
     }
 
     private void initTableAlias(Map<String, TableDesc> tables) {
@@ -315,12 +315,12 @@ public class DataModelDesc extends RootPersistentEntity {
         }
     }
 
-    private void initPartitionDesc(Map<String, TableDesc> tables) {
+    private void initPartitionDesc() {
         if (this.partitionDesc != null)
-            this.partitionDesc.init(tables);
+            this.partitionDesc.init(this);
     }
 
-    private void initJoinColumns(Map<String, TableDesc> tables) {
+    private void initJoinColumns() {
         for (LookupDesc lookup : this.lookups) {
             TableRef dimTable = lookup.getTableRef();
             JoinDesc join = lookup.getJoin();

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 6487bfa..1006b83 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -31,8 +30,6 @@ import 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
- * @author xduo
- * 
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
 public class PartitionDesc {
@@ -66,28 +63,12 @@ public class PartitionDesc {
     private TblColRef partitionDateColumnRef;
     private IPartitionConditionBuilder partitionConditionBuilder;
 
-    public void init(Map<String, TableDesc> tables) {
+    public void init(DataModelDesc model) {
         if (StringUtils.isEmpty(partitionDateColumn))
             return;
 
         partitionDateColumn = partitionDateColumn.toUpperCase();
-
-        String[] columns = StringSplitter.split(partitionDateColumn, ".");
-
-        if (null != columns && columns.length == 3) {
-            String tableName = columns[0].toUpperCase() + "." + 
columns[1].toUpperCase();
-
-            TableDesc table = tables.get(tableName);
-            ColumnDesc col = table.findColumnByName(columns[2]);
-            if (col != null) {
-                partitionDateColumnRef = new TblColRef(col);
-            } else {
-                throw new IllegalStateException("The column '" + 
partitionDateColumn + "' provided in 'partition_date_column' doesn't exist.");
-            }
-        } else {
-            throw new IllegalStateException("The 'partition_date_column' 
format is invalid: " + partitionDateColumn + ", it should be 
{db}.{table}.{column}.");
-        }
-
+        partitionDateColumnRef = model.findColumn(partitionDateColumn);
         partitionConditionBuilder = (IPartitionConditionBuilder) 
ClassUtil.newInstance(partitionConditionBuilderClz);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 626fb16..975094c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
@@ -64,6 +66,10 @@ public class TableRef {
         return columns.get(name);
     }
     
+    public Collection<TblColRef> getColumns() {
+        return Collections.unmodifiableCollection(columns.values());
+    }
+    
     @Override
     public boolean equals(Object o) {
         if (this == o)

http://git-wip-us.apache.org/repos/asf/kylin/blob/6f2ae5ce/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 4d1f5c9..54d8c92 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -17,10 +17,10 @@
 */
 package org.apache.kylin.source.kafka;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import org.apache.kylin.metadata.model.ISegment;
-import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -38,20 +38,18 @@ import 
org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 import org.apache.kylin.source.kafka.job.SeekOffsetStep;
 import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 public class KafkaMRInput implements IMRInput {
 
@@ -59,7 +57,7 @@ public class KafkaMRInput implements IMRInput {
 
     @Override
     public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        this.cubeSegment = (CubeSegment)flatDesc.getSegment();
+        this.cubeSegment = (CubeSegment) flatDesc.getSegment();
         return new BatchCubingInputSide(cubeSegment);
     }
 
@@ -67,14 +65,8 @@ public class KafkaMRInput implements IMRInput {
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
         KafkaConfigManager kafkaConfigManager = 
KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
         KafkaConfig kafkaConfig = 
kafkaConfigManager.getKafkaConfig(table.getIdentity());
-        List<TblColRef> columns = 
Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, 
TblColRef>() {
-            @Nullable
-            @Override
-            public TblColRef apply(ColumnDesc input) {
-                return input.getRef();
-            }
-        });
-
+        TableRef tableRef = 
cubeSegment.getCubeInstance().getDataModelDesc().findTable(table.getIdentity());
+        List<TblColRef> columns = Lists.newArrayList(tableRef.getColumns());
         return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, 
null);
     }
 

Reply via email to