Repository: kylin
Updated Branches:
  refs/heads/master ddec049a6 -> c8efa5483


http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
new file mode 100644
index 0000000..dfc46b6
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ */
+public class FactDistinctHiveColumnsMapper2<KEYIN> extends 
FactDistinctColumnsMapperBase2<KEYIN, Object> {
+
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected int nRowKey;
+    private Integer[][] allCuboidsBitSet = null;
+    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private HashFunction hf = null;
+    private int rowCount = 0;
+    private int samplingPercentage;
+    private ByteArray[] row_hashcodes = null;
+    private ByteBuffer keyBuffer;
+    private static final Text EMPTY_TEXT = new Text();
+    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
+    public static final byte MARK_FOR_HLL = (byte) 0xFF;
+
+    private int partitionColumnIndex = -1;
+    private boolean needFetchPartitionCol = true;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        keyBuffer = ByteBuffer.allocate(4096);
+        collectStatistics = 
Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            samplingPercentage = 
Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            List<Long> cuboidIdList = Lists.newArrayList();
+            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+            allCuboidsBitSet = allCuboidsBitSetList.toArray(new 
Integer[cuboidIdList.size()][]);
+            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+            allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new 
HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+            }
+
+            hf = Hashing.murmur3_32();
+            row_hashcodes = new ByteArray[nRowKey];
+            for (int i = 0; i < nRowKey; i++) {
+                row_hashcodes[i] = new ByteArray();
+            }
+
+            TblColRef partitionColRef = 
cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            if (partitionColRef != null) {
+                partitionColumnIndex = 
intermediateTableDesc.getColumnIndex(partitionColRef);
+            }
+
+            // check whether need fetch the partition col values
+            if (partitionColumnIndex < 0) {
+                // if partition col not on cube, no need
+                needFetchPartitionCol = false;
+            } else {
+                for (int x : dictionaryColumnIndex) {
+                    if (x == partitionColumnIndex) {
+                        // if partition col already build dict, no need
+                        needFetchPartitionCol = false;
+                        break;
+                    }
+                }
+            }
+
+        }
+    }
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> 
allCuboidsBitSet, List<Long> allCuboids) {
+        allCuboids.add(cuboidId);
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < nRowKey; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+        Collection<Long> children = 
cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+        }
+    }
+
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws 
IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+
+        keyBuffer.clear();
+        try {
+            for (int i = 0; i < factDictCols.size(); i++) {
+                String fieldValue = row[dictionaryColumnIndex[i]];
+                if (fieldValue == null)
+                    continue;
+                int offset = keyBuffer.position();
+                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+                keyBuffer.put(Bytes.toBytes(fieldValue));
+                outputKey.set(keyBuffer.array(), offset, keyBuffer.position() 
- offset);
+                sortableKey.setText(outputKey);
+                //judge type
+                DataType type = factDictCols.get(i).getType();
+                if (!type.isNumberFamily()) {
+                    sortableKey.setTypeId((byte) 
TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+                } else if (type.isIntegerFamily()) {
+                    sortableKey.setTypeId((byte) 
TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+                } else {
+                    sortableKey.setTypeId((byte) 
TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+                }
+                context.write(sortableKey, EMPTY_TEXT);
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(row, ex);
+        }
+
+        if (collectStatistics) {
+            if (rowCount < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (needFetchPartitionCol == true) {
+                String fieldValue = row[partitionColumnIndex];
+                if (fieldValue != null) {
+                    int offset = keyBuffer.position();
+                    keyBuffer.put(MARK_FOR_PARTITION_COL);
+                    keyBuffer.put(Bytes.toBytes(fieldValue));
+                    outputKey.set(keyBuffer.array(), offset, 
keyBuffer.position() - offset);
+                    sortableKey.setText(outputKey);
+                    sortableKey.setTypeId((byte) 0);
+                    context.write(sortableKey, EMPTY_TEXT);
+                }
+            }
+        }
+
+        if (rowCount++ == 100)
+            rowCount = 0;
+    }
+
+    private void putRowKeyToHLL(String[] row) {
+
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = 
row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            if (colValue != null) {
+                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+            } else {
+                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+            }
+        }
+
+        // user the row key column hash to get a consolidated hash for each 
cuboid
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; 
position++) {
+                
hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+            }
+
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, 
InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = 
ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            HyperLogLogPlusCounter hll;
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = allCuboidsHLL[i];
+
+                keyBuffer.clear();
+                keyBuffer.put(MARK_FOR_HLL); // one byte
+                keyBuffer.putLong(cuboidIds[i]);
+                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+                sortableKey.setText(outputKey);
+                sortableKey.setTypeId((byte) 0);
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                context.write(sortableKey, outputValue);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
new file mode 100644
index 0000000..cadbcbf
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ * Created by xiefan on 16-11-1.
+ */
+public class SelfDefineSortableKey implements 
WritableComparable<SelfDefineSortableKey> {
+
+    private byte typeId; //non-numeric(0000 0000) int(0000 0001) other 
numberic(0000 0010)
+
+    private Text text;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SelfDefineSortableKey.class);
+
+    public SelfDefineSortableKey() {
+    }
+
+    public SelfDefineSortableKey(byte typeId, Text text) {
+        this.typeId = typeId;
+        this.text = text;
+    }
+
+    @Override
+    public int compareTo(SelfDefineSortableKey o) {
+        if (!o.isNumberFamily()) {
+            return this.text.compareTo(o.text);
+        } else {
+            byte[] data1 = this.text.getBytes();
+            byte[] data2 = o.text.getBytes();
+            String str1 = new String(data1, 1, data1.length - 1);
+            String str2 = new String(data2, 1, data2.length - 1);
+            if (str1 == null || str1.equals("") || str2 == null || 
str2.equals("")) {
+                //should not achieve here
+                logger.error("none numeric value!");
+                return 0;
+            }
+            if (o.isIntegerFamily()) {  //integer type
+                try {
+                    Long num1 = Long.parseLong(str1);
+                    Long num2 = Long.parseLong(str2);
+                    return num1.compareTo(num2);
+                } catch (NumberFormatException e) {
+                    System.out.println("NumberFormatException when parse 
integer family number.str1:" + str1 + " str2:" + str2);
+                    logger.error("NumberFormatException when parse integer 
family number.str1:" + str1 + " str2:" + str2);
+                    e.printStackTrace();
+                    return 0;
+                }
+            } else {  //other numeric type
+                try {
+                    Double num1 = Double.parseDouble(str1);
+                    Double num2 = Double.parseDouble(str2);
+                    return num1.compareTo(num2);
+                } catch (NumberFormatException e) {
+                    System.out.println("NumberFormatException when parse 
double family number.str1:" + str1 + " str2:" + str2);
+                    logger.error("NumberFormatException when parse doul family 
number.str1:" + str1 + " str2:" + str2);
+                    //e.printStackTrace();
+                    return 0;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        dataOutput.writeByte(typeId);
+        text.write(dataOutput);
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        dataInput.readByte();
+        text.readFields(dataInput);
+    }
+
+    public short getTypeId() {
+        return typeId;
+    }
+
+    public Text getText() {
+        return text;
+    }
+
+    public boolean isNumberFamily() {
+        if (typeId == TypeFlag.NONE_NUMERIC_TYPE.ordinal()) return false;
+        return true;
+    }
+
+    public boolean isIntegerFamily() {
+        return (typeId == TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+    }
+
+    public boolean isOtherNumericFamily() {
+        return (typeId == TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+    }
+
+    public void setTypeId(byte typeId) {
+        this.typeId = typeId;
+    }
+
+    public void setText(Text text) {
+        this.text = text;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java
new file mode 100644
index 0000000..c69acfd
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+public enum TypeFlag {
+    NONE_NUMERIC_TYPE,
+    INTEGER_FAMILY_TYPE,
+    DOUBLE_FAMILY_TYPE
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
new file mode 100644
index 0000000..70197ac
--- /dev/null
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
@@ -0,0 +1,214 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dict.NumberDictionary;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.NumberDictionaryForest;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryForestBuilder;
+import org.apache.kylin.engine.mr.steps.fdc2.SelfDefineSortableKey;
+import org.apache.kylin.engine.mr.steps.fdc2.TypeFlag;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+
+
+public class NumberDictionaryForestTest {
+    @Test
+    public void testNumberDictionaryForestLong(){
+        List<String> list = randomLongData(10);
+        testData(list,TypeFlag.INTEGER_FAMILY_TYPE);
+    }
+
+    @Test
+    public void testNumberDictionaryForestDouble(){
+        List<String> list = randomDoubleData(10);
+
+        testData(list,TypeFlag.DOUBLE_FAMILY_TYPE);
+    }
+
+    private void testData(List<String> list,TypeFlag flag){
+        //stimulate map-reduce job
+        ArrayList<SelfDefineSortableKey> keyList = 
createKeyList(list,(byte)flag.ordinal());
+        Collections.sort(keyList);
+        //build tree
+        NumberDictionaryForestBuilder<String> b = new 
NumberDictionaryForestBuilder<String>(
+                new StringBytesConverter(),0);
+        TrieDictionaryForestBuilder.MaxTrieTreeSize = 0;
+        for(SelfDefineSortableKey key : keyList){
+            String fieldValue = printKey(key);
+            b.addValue(fieldValue);
+        }
+        NumberDictionaryForest<String> dict = b.build();
+        dict.dump(System.out);
+        ArrayList<Integer> resultIds = new ArrayList<>();
+        for(SelfDefineSortableKey key : keyList){
+            String fieldValue = getFieldValue(key);
+            resultIds.add(dict.getIdFromValue(fieldValue));
+            
assertEquals(fieldValue,dict.getValueFromId(dict.getIdFromValue(fieldValue)));
+        }
+        assertTrue(isIncreasedOrder(resultIds, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1.compareTo(o2);
+            }
+        }));
+    }
+
+    @Test
+    public void serializeTest() {
+        List<String> testData = new ArrayList<>();
+        testData.add("1");
+        testData.add("2");
+        testData.add("100");
+        //TrieDictionaryForestBuilder.MaxTrieTreeSize = 0;
+        NumberDictionaryForestBuilder<String> b = new 
NumberDictionaryForestBuilder<String>(new StringBytesConverter());
+        for(String str : testData)
+            b.addValue(str);
+        NumberDictionaryForest<String> dict = b.build();
+        dict = testSerialize(dict);
+        dict.dump(System.out);
+        for (String str : testData) {
+            assertEquals(str, dict.getValueFromId(dict.getIdFromValue(str)));
+        }
+    }
+
+    @Test
+    public void testVerySmallDouble(){
+        List<String> testData = new ArrayList<>();
+        testData.add(-1.0+"");
+        testData.add(Double.MIN_VALUE+"");
+        testData.add("1.01");
+        testData.add("2.0");
+        NumberDictionaryForestBuilder<String> b = new 
NumberDictionaryForestBuilder<String>(new StringBytesConverter());
+        for(String str : testData)
+            b.addValue(str);
+        NumberDictionaryForest<String> dict = b.build();
+        dict.dump(System.out);
+
+        NumberDictionaryBuilder<String> b2 = new NumberDictionaryBuilder<>(new 
StringBytesConverter());
+        for(String str : testData)
+            b2.addValue(str);
+        NumberDictionary<String> dict2 = b2.build(0);
+        dict2.dump(System.out);
+
+    }
+
+    private static NumberDictionaryForest<String> 
testSerialize(NumberDictionaryForest<String> dict) {
+        try {
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            DataOutputStream dataout = new DataOutputStream(bout);
+            dict.write(dataout);
+            dataout.close();
+            ByteArrayInputStream bin = new 
ByteArrayInputStream(bout.toByteArray());
+            DataInputStream datain = new DataInputStream(bin);
+            NumberDictionaryForest<String> r = new NumberDictionaryForest<>();
+            //r.dump(System.out);
+            r.readFields(datain);
+            //r.dump(System.out);
+            datain.close();
+            return r;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<String> randomLongData(int count){
+        Random rand = new Random(System.currentTimeMillis());
+        ArrayList<String> list = new ArrayList<>();
+        for(int i=0;i<count;i++){
+            list.add(rand.nextLong()+"");
+        }
+        list.add(Long.MAX_VALUE+"");
+        list.add(Long.MIN_VALUE+"");
+        return list;
+    }
+
+    private List<String> randomDoubleData(int count){
+        Random rand = new Random(System.currentTimeMillis());
+        ArrayList<String> list = new ArrayList<>();
+        for(int i=0;i<count;i++){
+            list.add(rand.nextDouble()+"");
+        }
+        list.add("-1");
+        return list;
+    }
+
+    private List<String> randomStringData(int count){
+        Random rand = new Random(System.currentTimeMillis());
+        ArrayList<String> list = new ArrayList<>();
+        for(int i=0;i<count;i++){
+            list.add(UUID.randomUUID().toString());
+        }
+        list.add("123");
+        list.add("123");
+        return list;
+    }
+
+    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> 
strNumList,byte typeFlag){
+        int partationId = 0;
+        ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>();
+        for(String str : strNumList){
+            ByteBuffer keyBuffer = ByteBuffer.allocate(4096);
+            int offset = keyBuffer.position();
+            keyBuffer.put(Bytes.toBytes(partationId)[3]);
+            keyBuffer.put(Bytes.toBytes(str));
+            //System.out.println(Arrays.toString(keyBuffer.array()));
+            byte[] valueField = 
Bytes.copy(keyBuffer.array(),1,keyBuffer.position()-offset-1);
+            //System.out.println("new string:"+new String(valueField));
+            //System.out.println("arrays 
toString:"+Arrays.toString(valueField));
+            Text outputKey = new Text();
+            
outputKey.set(keyBuffer.array(),offset,keyBuffer.position()-offset);
+            SelfDefineSortableKey sortableKey = new 
SelfDefineSortableKey(typeFlag,outputKey);
+            keyList.add(sortableKey);
+        }
+        return keyList;
+    }
+
+    private String printKey(SelfDefineSortableKey key){
+        byte[] data = key.getText().getBytes();
+        byte[] fieldValue = Bytes.copy(data,1,data.length-1);
+        System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+new 
String(fieldValue));
+        return new String(fieldValue);
+    }
+
+    private String getFieldValue(SelfDefineSortableKey key){
+        byte[] data = key.getText().getBytes();
+        byte[] fieldValue = Bytes.copy(data,1,data.length-1);
+        return new String(fieldValue);
+    }
+
+    private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){
+        int flag;
+        T previous = null;
+        for(T t : list){
+            if(previous == null) previous = t;
+            else{
+                flag = comp.compare(previous,t);
+                if(flag > 0) return false;
+                previous = t;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
new file mode 100644
index 0000000..858bba4
--- /dev/null
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
@@ -0,0 +1,228 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dict.NumberDictionaryForest;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dict.TrieDictionaryForest;
+import org.apache.kylin.dict.TrieDictionaryForestBuilder;
+import org.apache.kylin.engine.mr.steps.fdc2.SelfDefineSortableKey;
+import org.apache.kylin.engine.mr.steps.fdc2.TypeFlag;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+public class SelfDefineSortableKeyTest {
+
+    @Test
+    public void testSortLong(){
+        Random rand = new Random(System.currentTimeMillis());
+        ArrayList<Long> longList = new ArrayList<>();
+        int count = 10;
+        for(int i=0;i<count;i++){
+            longList.add(rand.nextLong());
+        }
+        longList.add(0L);
+        longList.add(0L); //test duplicate
+        longList.add(-1L); //test negative number
+        longList.add(Long.MAX_VALUE);
+        longList.add(Long.MIN_VALUE);
+
+        System.out.println("test numbers:"+longList);
+        ArrayList<String> strNumList = listToStringList(longList);
+        //System.out.println("test num strs list:"+strNumList);
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, 
(byte)TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+        System.out.println(keyList.get(0).isIntegerFamily());
+        Collections.sort(keyList);
+        ArrayList<String> strListAftereSort = new ArrayList<>();
+        for(SelfDefineSortableKey key : keyList){
+            String str = printKey(key);
+            strListAftereSort.add(str);
+        }
+        assertTrue(isIncreasedOrder(strListAftereSort, new 
Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+                Long l1 = Long.parseLong(o1);
+                Long l2 = Long.parseLong(o2);
+                return l1.compareTo(l2);
+            }
+        }));
+    }
+
+    @Test
+    public void testSortDouble(){
+        Random rand = new Random(System.currentTimeMillis());
+        ArrayList<Double> doubleList = new ArrayList<>();
+        int count = 10;
+        for(int i=0;i<count;i++){
+            doubleList.add(rand.nextDouble());
+        }
+        doubleList.add(0.0);
+        doubleList.add(0.0); //test duplicate
+        doubleList.add(-1.0); //test negative number
+        doubleList.add(Double.MAX_VALUE);
+        doubleList.add(-Double.MAX_VALUE);
+        //System.out.println(Double.MIN_VALUE);
+
+        System.out.println("test numbers:"+doubleList);
+        ArrayList<String> strNumList = listToStringList(doubleList);
+        //System.out.println("test num strs list:"+strNumList);
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, 
(byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        System.out.println(keyList.get(0).isOtherNumericFamily());
+        Collections.sort(keyList);
+        ArrayList<String> strListAftereSort = new ArrayList<>();
+        for(SelfDefineSortableKey key : keyList){
+            String str = printKey(key);
+            strListAftereSort.add(str);
+        }
+        assertTrue(isIncreasedOrder(strListAftereSort, new 
Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+                Double d1 = Double.parseDouble(o1);
+                Double d2 = Double.parseDouble(o2);
+                return d1.compareTo(d2);
+            }
+        }));
+    }
+
+    @Test
+    public void testSortNormalString(){
+        int count = 10;
+        ArrayList<String> strList = new ArrayList<>();
+        for(int i=0;i<count;i++){
+            UUID uuid = UUID.randomUUID();
+            strList.add(uuid.toString());
+        }
+        strList.add("hello");
+        strList.add("hello"); //duplicate
+        strList.add("123");
+        strList.add("");
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, 
(byte)TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+        System.out.println(keyList.get(0).isOtherNumericFamily());
+        Collections.sort(keyList);
+        ArrayList<String> strListAftereSort = new ArrayList<>();
+        for(SelfDefineSortableKey key : keyList){
+            String str = printKey(key);
+            strListAftereSort.add(str);
+        }
+        assertTrue(isIncreasedOrder(strListAftereSort, new 
Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+               return o1.compareTo(o2);
+            }
+        }));
+    }
+
+    @Test
+    public void testIllegalNumber(){
+        Random rand = new Random(System.currentTimeMillis());
+        ArrayList<Double> doubleList = new ArrayList<>();
+        int count = 10;
+        for(int i=0;i<count;i++){
+            doubleList.add(rand.nextDouble());
+        }
+        doubleList.add(0.0);
+        doubleList.add(0.0); //test duplicate
+        doubleList.add(-1.0); //test negative number
+        doubleList.add(Double.MAX_VALUE);
+        doubleList.add(-Double.MAX_VALUE);
+        //System.out.println(Double.MIN_VALUE);
+
+        System.out.println("test numbers:"+doubleList);
+        ArrayList<String> strNumList = listToStringList(doubleList);
+        strNumList.add("fjaeif"); //illegal type
+        //System.out.println("test num strs list:"+strNumList);
+        ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, 
(byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        System.out.println(keyList.get(0).isOtherNumericFamily());
+        Collections.sort(keyList);
+        for(SelfDefineSortableKey key : keyList){
+            printKey(key);
+        }
+
+    }
+
+    @Test
+    public void testEnum(){
+        TypeFlag flag = TypeFlag.DOUBLE_FAMILY_TYPE;
+        System.out.println((byte)flag.ordinal());
+        int t = (byte)flag.ordinal();
+        System.out.println(t);
+    }
+
+
+
+    private<T> ArrayList<String> listToStringList(ArrayList<T> list){
+        ArrayList<String> strList = new ArrayList<>();
+        for(T t : list){
+            System.out.println(t.toString());
+            strList.add(t.toString());
+        }
+        return strList;
+    }
+
+    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> 
strNumList,byte typeFlag){
+        int partationId = 0;
+        ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>();
+        for(String str : strNumList){
+            ByteBuffer keyBuffer = ByteBuffer.allocate(4096);
+            int offset = keyBuffer.position();
+            keyBuffer.put(Bytes.toBytes(partationId)[3]);
+            keyBuffer.put(Bytes.toBytes(str));
+            //System.out.println(Arrays.toString(keyBuffer.array()));
+            byte[] valueField = 
Bytes.copy(keyBuffer.array(),1,keyBuffer.position()-offset-1);
+            //System.out.println("new string:"+new String(valueField));
+            //System.out.println("arrays 
toString:"+Arrays.toString(valueField));
+            Text outputKey = new Text();
+            
outputKey.set(keyBuffer.array(),offset,keyBuffer.position()-offset);
+            SelfDefineSortableKey sortableKey = new 
SelfDefineSortableKey(typeFlag,outputKey);
+            keyList.add(sortableKey);
+        }
+        return keyList;
+    }
+
+    private String printKey(SelfDefineSortableKey key){
+        byte[] data = key.getText().getBytes();
+        byte[] fieldValue = Bytes.copy(data,1,data.length-1);
+        System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+new 
String(fieldValue));
+        return new String(fieldValue);
+    }
+
+    private String getFieldValue(SelfDefineSortableKey key){
+        byte[] data = key.getText().getBytes();
+        byte[] fieldValue = Bytes.copy(data,1,data.length-1);
+        return new String(fieldValue);
+    }
+
+    private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){
+        int flag;
+        T previous = null;
+        for(T t : list){
+            if(previous == null) previous = t;
+            else{
+                flag = comp.compare(previous,t);
+                if(flag > 0) return false;
+                previous = t;
+            }
+        }
+        return true;
+    }
+
+
+}

Reply via email to