Repository: kylin Updated Branches: refs/heads/master ddec049a6 -> c8efa5483
http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java new file mode 100644 index 0000000..dfc46b6 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps.fdc2; + +import com.google.common.collect.Lists; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +
/**
 * Mapper that turns every flat-table row into one output key per non-null
 * dictionary column value.
 *
 * <p>Each key is a single marker byte followed by the bytes of the value. The marker is
 * the low byte of the dictionary column index, {@link #MARK_FOR_PARTITION_COL} for a
 * partition column value, or {@link #MARK_FOR_HLL} for a cuboid statistics record.
 *
 * <p>When statistics collection is enabled, a sample of the rows is additionally hashed
 * into one HyperLogLog counter per cuboid; the counters are emitted in
 * {@link #cleanup(Context)}.
 */
public class FactDistinctHiveColumnsMapper2<KEYIN> extends FactDistinctColumnsMapperBase2<KEYIN, Object> {

    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
    public static final byte MARK_FOR_HLL = (byte) 0xFF;

    private static final Text EMPTY_TEXT = new Text();

    /** Rows are sampled in windows of this many rows; samplingPercentage rows of each window are used. */
    private static final int SAMPLING_WINDOW = 100;

    /** Initial size of the reusable key buffer; it grows on demand for longer values. */
    private static final int INITIAL_KEY_BUFFER_SIZE = 4096;

    protected boolean collectStatistics = false;
    protected CuboidScheduler cuboidScheduler = null;
    protected int nRowKey;
    private Integer[][] allCuboidsBitSet = null;
    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
    private Long[] cuboidIds;
    private HashFunction hf = null;
    /** Position of the current row inside the sampling window, always in [0, SAMPLING_WINDOW). */
    private int rowCount = 0;
    private int samplingPercentage;
    private ByteArray[] row_hashcodes = null;
    private ByteBuffer keyBuffer;

    private int partitionColumnIndex = -1;
    private boolean needFetchPartitionCol = true;

    /**
     * Allocates the key buffer and, when statistics are enabled in the job configuration,
     * prepares the per-cuboid HLL counters and decides whether partition column values
     * have to be emitted separately.
     */
    @Override
    protected void setup(Context context) throws IOException {
        super.setup(context);
        keyBuffer = ByteBuffer.allocate(INITIAL_KEY_BUFFER_SIZE);
        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
        if (!collectStatistics) {
            return;
        }

        samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
        cuboidScheduler = new CuboidScheduler(cubeDesc);
        nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;

        List<Long> cuboidIdList = Lists.newArrayList();
        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
        addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);

        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
        cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);

        allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
        for (int i = 0; i < cuboidIds.length; i++) {
            allCuboidsHLL[i] = new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
        }

        hf = Hashing.murmur3_32();
        row_hashcodes = new ByteArray[nRowKey];
        for (int i = 0; i < nRowKey; i++) {
            row_hashcodes[i] = new ByteArray();
        }

        TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
        if (partitionColRef != null) {
            partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
        }

        // check whether need fetch the partition col values
        if (partitionColumnIndex < 0) {
            // if partition col not on cube, no need
            needFetchPartitionCol = false;
        } else {
            for (int x : dictionaryColumnIndex) {
                if (x == partitionColumnIndex) {
                    // if partition col already build dict, no need
                    needFetchPartitionCol = false;
                    break;
                }
            }
        }
    }

    /**
     * Records {@code cuboidId} and the row key positions it contains, then recurses into
     * all cuboids spanned from it.
     *
     * @param cuboidId         cuboid to register
     * @param allCuboidsBitSet receives, per cuboid, the indexes of its row key columns
     * @param allCuboids       receives the cuboid ids in the same order
     */
    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
        allCuboids.add(cuboidId);
        Integer[] indice = new Integer[Long.bitCount(cuboidId)];

        long mask = Long.highestOneBit(baseCuboidId);
        int position = 0;
        for (int i = 0; i < nRowKey; i++) {
            // test the bit itself (!= 0) and shift unsigned so the sign bit can never leak in
            if ((mask & cuboidId) != 0) {
                indice[position] = i;
                position++;
            }
            mask = mask >>> 1;
        }

        allCuboidsBitSet.add(indice);
        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
        for (Long childId : children) {
            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
        }
    }

    /**
     * Emits one key per non-null dictionary column value of the row and, when statistics
     * are enabled, feeds sampled rows into the HLL counters and emits the partition
     * column value.
     */
    @Override
    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
        String[] row = flatTableInputFormat.parseMapperInput(record);

        try {
            for (int i = 0; i < factDictCols.size(); i++) {
                String fieldValue = row[dictionaryColumnIndex[i]];
                if (fieldValue == null)
                    continue;
                fillOutputKey((byte) i, fieldValue); // one byte is enough for the column index
                sortableKey.setTypeId(typeIdOf(factDictCols.get(i).getType()));
                context.write(sortableKey, EMPTY_TEXT);
            }
        } catch (Exception ex) {
            handleErrorRecord(row, ex);
        }

        if (collectStatistics) {
            if (rowCount < samplingPercentage) {
                putRowKeyToHLL(row);
            }

            if (needFetchPartitionCol) {
                String fieldValue = row[partitionColumnIndex];
                if (fieldValue != null) {
                    fillOutputKey(MARK_FOR_PARTITION_COL, fieldValue);
                    sortableKey.setTypeId((byte) 0);
                    context.write(sortableKey, EMPTY_TEXT);
                }
            }
        }

        // keep the counter inside [0, SAMPLING_WINDOW) so the window is exactly 100 rows long
        rowCount = (rowCount + 1) % SAMPLING_WINDOW;
    }

    /**
     * Builds {@code marker + value} in the reusable buffer and loads it into
     * {@code outputKey} / {@code sortableKey}. The buffer is reset for every key, and
     * enlarged if a single value does not fit, so long rows cannot overflow it.
     */
    private void fillOutputKey(byte marker, String fieldValue) {
        byte[] valueBytes = Bytes.toBytes(fieldValue);
        int required = 1 + valueBytes.length;
        if (keyBuffer.capacity() < required) {
            keyBuffer = ByteBuffer.allocate(required);
        }
        keyBuffer.clear();
        keyBuffer.put(marker);
        keyBuffer.put(valueBytes);
        outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
        sortableKey.setText(outputKey);
    }

    /** Maps a column data type to the type id byte stored in the sortable key. */
    private static byte typeIdOf(DataType type) {
        if (!type.isNumberFamily()) {
            return (byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal();
        }
        if (type.isIntegerFamily()) {
            return (byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal();
        }
        return (byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal();
    }

    /** Hashes the row key columns of one row and adds the combined hash of each cuboid to its HLL counter. */
    private void putRowKeyToHLL(String[] row) {

        //generate hash for each row key column
        for (int i = 0; i < nRowKey; i++) {
            Hasher hc = hf.newHasher();
            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
            if (colValue != null) {
                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
            } else {
                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
            }
        }

        // use the row key column hash to get a consolidated hash for each cuboid
        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
            Hasher hc = hf.newHasher();
            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
            }

            allCuboidsHLL[i].add(hc.hash().asBytes());
        }
    }

    /**
     * When statistics are enabled, writes one record per cuboid: the key is
     * {@link #MARK_FOR_HLL} followed by the cuboid id, the value holds the HLL registers.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!collectStatistics) {
            return;
        }

        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
        for (int i = 0; i < cuboidIds.length; i++) {
            HyperLogLogPlusCounter hll = allCuboidsHLL[i];

            keyBuffer.clear();
            keyBuffer.put(MARK_FOR_HLL); // one byte
            keyBuffer.putLong(cuboidIds[i]);
            outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
            sortableKey.setText(outputKey);
            sortableKey.setTypeId((byte) 0);
            hllBuf.clear();
            hll.writeRegisters(hllBuf);
            outputValue.set(hllBuf.array(), 0, hllBuf.position());
            context.write(sortableKey, outputValue);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java new file mode 100644 index 0000000..cadbcbf --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.mr.steps.fdc2; + + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + + +/** + * Created by xiefan on 16-11-1. 
+ */
/**
 * Map output key made of a type id byte and a {@link Text} payload.
 *
 * <p>The first byte of the payload is a marker (for example a column index); the
 * remaining bytes are the value. Keys whose type id denotes a number are ordered by the
 * numeric value of that remainder, all other keys by the raw bytes of the payload.
 */
public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> {

    private static final Logger logger = LoggerFactory.getLogger(SelfDefineSortableKey.class);

    /** Hadoop {@code Text} always stores UTF-8. */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8;

    private byte typeId; //non-numeric(0000 0000) int(0000 0001) other numeric(0000 0010)

    private Text text;

    public SelfDefineSortableKey() {
    }

    public SelfDefineSortableKey(byte typeId, Text text) {
        this.typeId = typeId;
        this.text = text;
    }

    /**
     * Orders keys by raw text when {@code o} is not numeric, otherwise by the numeric
     * value that follows the marker byte.
     *
     * @return the comparison result; 0 when either value is empty or cannot be parsed
     *         as a number of the declared family (the problem is logged)
     */
    @Override
    public int compareTo(SelfDefineSortableKey o) {
        if (!o.isNumberFamily()) {
            return this.text.compareTo(o.text);
        }

        String str1 = decodeValue(this.text);
        String str2 = decodeValue(o.text);
        if (str1.isEmpty() || str2.isEmpty()) {
            //should not achieve here
            logger.error("none numeric value!");
            return 0;
        }

        if (o.isIntegerFamily()) { //integer type
            try {
                return Long.compare(Long.parseLong(str1), Long.parseLong(str2));
            } catch (NumberFormatException e) {
                logger.error("NumberFormatException when parse integer family number.str1:{} str2:{}", str1, str2, e);
                return 0;
            }
        }

        //other numeric type
        try {
            return Double.compare(Double.parseDouble(str1), Double.parseDouble(str2));
        } catch (NumberFormatException e) {
            logger.error("NumberFormatException when parse double family number.str1:{} str2:{}", str1, str2, e);
            return 0;
        }
    }

    /**
     * Decodes the value part of a key, i.e. everything after the marker byte. Only the
     * first {@code getLength()} bytes of the backing array are valid, so the array
     * length must not be used.
     */
    private static String decodeValue(Text text) {
        int valueLength = text.getLength() - 1;
        if (valueLength <= 0) {
            return "";
        }
        return new String(text.getBytes(), 1, valueLength, UTF8);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeByte(typeId);
        text.write(dataOutput);
    }

    /**
     * Restores both fields written by {@link #write(DataOutput)}. The payload object is
     * created on demand because instances built with the no-arg constructor have none.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.typeId = dataInput.readByte();
        if (text == null) {
            text = new Text();
        }
        text.readFields(dataInput);
    }

    public short getTypeId() {
        return typeId;
    }

    public Text getText() {
        return text;
    }

    public boolean isNumberFamily() {
        return typeId != TypeFlag.NONE_NUMERIC_TYPE.ordinal();
    }

    public boolean isIntegerFamily() {
        return (typeId == TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
    }

    public boolean isOtherNumericFamily() {
        return (typeId == TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
    }

    public void setTypeId(byte typeId) {
        this.typeId = typeId;
    }

    public void setText(Text text) {
        this.text = text;
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java new file mode 100644 index 0000000..c69acfd --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps.fdc2; + +/** + * Created by xiefan on 16-11-2. + * + * Value family of a column. The ordinal() of a constant is cast to a byte and stored as the type id of a SelfDefineSortableKey, which is written to and compared by that byte, so the declaration order below must not be changed. + */ +public enum TypeFlag { + NONE_NUMERIC_TYPE, /* ordinal 0: not a number; keys are compared by their raw text */ + INTEGER_FAMILY_TYPE, /* ordinal 1: keys are compared after Long.parseLong */ + DOUBLE_FAMILY_TYPE /* ordinal 2: keys are compared after Double.parseDouble */ +} http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java new file mode 100644 index 0000000..70197ac --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java @@ -0,0 +1,214 @@ +package org.apache.kylin.engine.mr.steps; + +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.dict.NumberDictionary; +import org.apache.kylin.dict.NumberDictionaryBuilder; +import org.apache.kylin.dict.NumberDictionaryForest; +import org.apache.kylin.dict.NumberDictionaryForestBuilder; +import org.apache.kylin.dict.StringBytesConverter; +import org.apache.kylin.dict.TrieDictionaryForestBuilder; +import org.apache.kylin.engine.mr.steps.fdc2.SelfDefineSortableKey; +import org.apache.kylin.engine.mr.steps.fdc2.TypeFlag; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Created by xiefan on 16-11-2. 
+ */
/**
 * Tests that values sorted through {@link SelfDefineSortableKey} can be fed to a
 * {@link NumberDictionaryForestBuilder} and that the resulting dictionary round-trips
 * every value and assigns ids in the sorted order.
 */
public class NumberDictionaryForestTest {

    @Test
    public void testNumberDictionaryForestLong() {
        testData(randomLongData(10), TypeFlag.INTEGER_FAMILY_TYPE);
    }

    @Test
    public void testNumberDictionaryForestDouble() {
        testData(randomDoubleData(10), TypeFlag.DOUBLE_FAMILY_TYPE);
    }

    /**
     * Sorts the values as sortable keys of the given family, builds a dictionary forest
     * from the sorted values and checks value/id round-trips and id ordering.
     */
    private void testData(List<String> list, TypeFlag flag) {
        // simulate the sort done by the map-reduce job
        List<SelfDefineSortableKey> keyList = createKeyList(list, (byte) flag.ordinal());
        Collections.sort(keyList);

        // build tree
        NumberDictionaryForestBuilder<String> b = new NumberDictionaryForestBuilder<String>(
                new StringBytesConverter(), 0);
        TrieDictionaryForestBuilder.MaxTrieTreeSize = 0;
        for (SelfDefineSortableKey key : keyList) {
            b.addValue(printKey(key));
        }
        NumberDictionaryForest<String> dict = b.build();
        dict.dump(System.out);

        List<Integer> resultIds = new ArrayList<>();
        for (SelfDefineSortableKey key : keyList) {
            String fieldValue = getFieldValue(key);
            int id = dict.getIdFromValue(fieldValue);
            resultIds.add(id);
            assertEquals(fieldValue, dict.getValueFromId(id));
        }
        assertTrue(isIncreasedOrder(resultIds, new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o1.compareTo(o2);
            }
        }));
    }

    @Test
    public void serializeTest() {
        List<String> testData = new ArrayList<>();
        testData.add("1");
        testData.add("2");
        testData.add("100");
        NumberDictionaryForestBuilder<String> b = new NumberDictionaryForestBuilder<String>(new StringBytesConverter());
        for (String str : testData) {
            b.addValue(str);
        }
        NumberDictionaryForest<String> dict = b.build();
        dict = testSerialize(dict);
        dict.dump(System.out);
        for (String str : testData) {
            assertEquals(str, dict.getValueFromId(dict.getIdFromValue(str)));
        }
    }

    /** Builds a forest dictionary and a plain number dictionary from the same values and dumps both. */
    @Test
    public void testVerySmallDouble() {
        List<String> testData = new ArrayList<>();
        testData.add(-1.0 + "");
        testData.add(Double.MIN_VALUE + "");
        testData.add("1.01");
        testData.add("2.0");
        NumberDictionaryForestBuilder<String> b = new NumberDictionaryForestBuilder<String>(new StringBytesConverter());
        for (String str : testData) {
            b.addValue(str);
        }
        NumberDictionaryForest<String> dict = b.build();
        dict.dump(System.out);

        NumberDictionaryBuilder<String> b2 = new NumberDictionaryBuilder<>(new StringBytesConverter());
        for (String str : testData) {
            b2.addValue(str);
        }
        NumberDictionary<String> dict2 = b2.build(0);
        dict2.dump(System.out);
    }

    /** Writes the dictionary to a byte array and reads it back into a new instance. */
    private static NumberDictionaryForest<String> testSerialize(NumberDictionaryForest<String> dict) {
        try {
            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            DataOutputStream dataout = new DataOutputStream(bout);
            dict.write(dataout);
            dataout.close();
            ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
            DataInputStream datain = new DataInputStream(bin);
            NumberDictionaryForest<String> r = new NumberDictionaryForest<>();
            r.readFields(datain);
            datain.close();
            return r;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<String> randomLongData(int count) {
        Random rand = new Random(System.currentTimeMillis());
        List<String> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(rand.nextLong() + "");
        }
        list.add(Long.MAX_VALUE + "");
        list.add(Long.MIN_VALUE + "");
        return list;
    }

    private List<String> randomDoubleData(int count) {
        Random rand = new Random(System.currentTimeMillis());
        List<String> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(rand.nextDouble() + "");
        }
        list.add("-1");
        return list;
    }

    private List<String> randomStringData(int count) {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(UUID.randomUUID().toString());
        }
        list.add("123");
        list.add("123");
        return list;
    }

    /** Wraps every value into a key laid out as one partition id byte followed by the value bytes. */
    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList, byte typeFlag) {
        int partationId = 0;
        ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>();
        for (String str : strNumList) {
            ByteBuffer keyBuffer = ByteBuffer.allocate(4096);
            int offset = keyBuffer.position();
            keyBuffer.put(Bytes.toBytes(partationId)[3]);
            keyBuffer.put(Bytes.toBytes(str));
            Text outputKey = new Text();
            outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
            keyList.add(new SelfDefineSortableKey(typeFlag, outputKey));
        }
        return keyList;
    }

    /** Prints the type id and value of the key and returns the value. */
    private String printKey(SelfDefineSortableKey key) {
        String fieldValue = getFieldValue(key);
        System.out.println("type flag:" + key.getTypeId() + " fieldValue:" + fieldValue);
        return fieldValue;
    }

    /** Decodes the value that follows the leading partition id byte, using only the valid bytes of the text. */
    private String getFieldValue(SelfDefineSortableKey key) {
        Text text = key.getText();
        return new String(text.getBytes(), 1, text.getLength() - 1, java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Returns true when no element is greater than its successor according to {@code comp}. */
    private <T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp) {
        T previous = null;
        for (T t : list) {
            if (previous != null && comp.compare(previous, t) > 0) {
                return false;
            }
            previous = t;
        }
        return true;
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java new file mode 100644 index 0000000..858bba4 --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java @@ -0,0 +1,228 @@ +package org.apache.kylin.engine.mr.steps; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.Array; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.dict.NumberDictionaryForest; +import org.apache.kylin.dict.NumberDictionaryForestBuilder; +import org.apache.kylin.dict.StringBytesConverter; +import org.apache.kylin.dict.TrieDictionary; +import org.apache.kylin.dict.TrieDictionaryBuilder; +import org.apache.kylin.dict.TrieDictionaryForest; +import org.apache.kylin.dict.TrieDictionaryForestBuilder; +import org.apache.kylin.engine.mr.steps.fdc2.SelfDefineSortableKey; +import org.apache.kylin.engine.mr.steps.fdc2.TypeFlag; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +/** + * Created by xiefan on 16-11-2. 
+ */ +public class SelfDefineSortableKeyTest { + + @Test + public void testSortLong(){ + Random rand = new Random(System.currentTimeMillis()); + ArrayList<Long> longList = new ArrayList<>(); + int count = 10; + for(int i=0;i<count;i++){ + longList.add(rand.nextLong()); + } + longList.add(0L); + longList.add(0L); //test duplicate + longList.add(-1L); //test negative number + longList.add(Long.MAX_VALUE); + longList.add(Long.MIN_VALUE); + + System.out.println("test numbers:"+longList); + ArrayList<String> strNumList = listToStringList(longList); + //System.out.println("test num strs list:"+strNumList); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.INTEGER_FAMILY_TYPE.ordinal()); + System.out.println(keyList.get(0).isIntegerFamily()); + Collections.sort(keyList); + ArrayList<String> strListAftereSort = new ArrayList<>(); + for(SelfDefineSortableKey key : keyList){ + String str = printKey(key); + strListAftereSort.add(str); + } + assertTrue(isIncreasedOrder(strListAftereSort, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + Long l1 = Long.parseLong(o1); + Long l2 = Long.parseLong(o2); + return l1.compareTo(l2); + } + })); + } + + @Test + public void testSortDouble(){ + Random rand = new Random(System.currentTimeMillis()); + ArrayList<Double> doubleList = new ArrayList<>(); + int count = 10; + for(int i=0;i<count;i++){ + doubleList.add(rand.nextDouble()); + } + doubleList.add(0.0); + doubleList.add(0.0); //test duplicate + doubleList.add(-1.0); //test negative number + doubleList.add(Double.MAX_VALUE); + doubleList.add(-Double.MAX_VALUE); + //System.out.println(Double.MIN_VALUE); + + System.out.println("test numbers:"+doubleList); + ArrayList<String> strNumList = listToStringList(doubleList); + //System.out.println("test num strs list:"+strNumList); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); + 
System.out.println(keyList.get(0).isOtherNumericFamily()); + Collections.sort(keyList); + ArrayList<String> strListAftereSort = new ArrayList<>(); + for(SelfDefineSortableKey key : keyList){ + String str = printKey(key); + strListAftereSort.add(str); + } + assertTrue(isIncreasedOrder(strListAftereSort, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + Double d1 = Double.parseDouble(o1); + Double d2 = Double.parseDouble(o2); + return d1.compareTo(d2); + } + })); + } + + @Test + public void testSortNormalString(){ + int count = 10; + ArrayList<String> strList = new ArrayList<>(); + for(int i=0;i<count;i++){ + UUID uuid = UUID.randomUUID(); + strList.add(uuid.toString()); + } + strList.add("hello"); + strList.add("hello"); //duplicate + strList.add("123"); + strList.add(""); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strList, (byte)TypeFlag.NONE_NUMERIC_TYPE.ordinal()); + System.out.println(keyList.get(0).isOtherNumericFamily()); + Collections.sort(keyList); + ArrayList<String> strListAftereSort = new ArrayList<>(); + for(SelfDefineSortableKey key : keyList){ + String str = printKey(key); + strListAftereSort.add(str); + } + assertTrue(isIncreasedOrder(strListAftereSort, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + })); + } + + @Test + public void testIllegalNumber(){ + Random rand = new Random(System.currentTimeMillis()); + ArrayList<Double> doubleList = new ArrayList<>(); + int count = 10; + for(int i=0;i<count;i++){ + doubleList.add(rand.nextDouble()); + } + doubleList.add(0.0); + doubleList.add(0.0); //test duplicate + doubleList.add(-1.0); //test negative number + doubleList.add(Double.MAX_VALUE); + doubleList.add(-Double.MAX_VALUE); + //System.out.println(Double.MIN_VALUE); + + System.out.println("test numbers:"+doubleList); + ArrayList<String> strNumList = listToStringList(doubleList); + strNumList.add("fjaeif"); //illegal type + 
//System.out.println("test num strs list:"+strNumList); + ArrayList<SelfDefineSortableKey> keyList = createKeyList(strNumList, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal()); + System.out.println(keyList.get(0).isOtherNumericFamily()); + Collections.sort(keyList); + for(SelfDefineSortableKey key : keyList){ + printKey(key); + } + + } + + @Test + public void testEnum(){ + TypeFlag flag = TypeFlag.DOUBLE_FAMILY_TYPE; + System.out.println((byte)flag.ordinal()); + int t = (byte)flag.ordinal(); + System.out.println(t); + } + + + + private<T> ArrayList<String> listToStringList(ArrayList<T> list){ + ArrayList<String> strList = new ArrayList<>(); + for(T t : list){ + System.out.println(t.toString()); + strList.add(t.toString()); + } + return strList; + } + + private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList,byte typeFlag){ + int partationId = 0; + ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>(); + for(String str : strNumList){ + ByteBuffer keyBuffer = ByteBuffer.allocate(4096); + int offset = keyBuffer.position(); + keyBuffer.put(Bytes.toBytes(partationId)[3]); + keyBuffer.put(Bytes.toBytes(str)); + //System.out.println(Arrays.toString(keyBuffer.array())); + byte[] valueField = Bytes.copy(keyBuffer.array(),1,keyBuffer.position()-offset-1); + //System.out.println("new string:"+new String(valueField)); + //System.out.println("arrays toString:"+Arrays.toString(valueField)); + Text outputKey = new Text(); + outputKey.set(keyBuffer.array(),offset,keyBuffer.position()-offset); + SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(typeFlag,outputKey); + keyList.add(sortableKey); + } + return keyList; + } + + private String printKey(SelfDefineSortableKey key){ + byte[] data = key.getText().getBytes(); + byte[] fieldValue = Bytes.copy(data,1,data.length-1); + System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+new String(fieldValue)); + return new String(fieldValue); + } + + private String 
getFieldValue(SelfDefineSortableKey key){ + byte[] data = key.getText().getBytes(); + byte[] fieldValue = Bytes.copy(data,1,data.length-1); + return new String(fieldValue); + } + + private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){ + int flag; + T previous = null; + for(T t : list){ + if(previous == null) previous = t; + else{ + flag = comp.compare(previous,t); + if(flag > 0) return false; + previous = t; + } + } + return true; + } + + +}