Repository: kylin
Updated Branches:
  refs/heads/master-hbase0.98 0dc56aa4a -> 05980fcb1 (forced update)
KYLIN-2442 calculate raw data size using custom counter

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/405dee26
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/405dee26
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/405dee26

Branch: refs/heads/master-hbase0.98
Commit: 405dee26d7fe463d15bf5f1d7690359c9e83f678
Parents: 43c0566
Author: Li Yang <liy...@apache.org>
Authored: Fri Feb 10 16:56:21 2017 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Mon Feb 13 10:20:54 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |   1 -
 .../kylin/engine/mr/common/HadoopCmdOutput.java |   9 +-
 .../engine/mr/common/MapReduceExecutable.java   |   8 +-
 .../mr/steps/FactDistinctColumnPartitioner.java |   4 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java |   2 +-
 .../mr/steps/FactDistinctColumnsMapper.java     | 262 +++++++++++++++++++
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 230 ----------------
 7 files changed, 272 insertions(+), 244 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 36c12a1..1ec23b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -100,7 +100,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
 
         return baseCuboidStep;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 9d016cc..2a480e6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.RawDataCounter;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class HadoopCmdOutput {
 
     private String mapInputRecords;
     private String hdfsBytesWritten;
-    private String hdfsBytesRead;
+    private String rawInputBytesRead;
 
     public String getMapInputRecords() {
         return mapInputRecords;
@@ -77,8 +78,8 @@ public class HadoopCmdOutput {
         return hdfsBytesWritten;
     }
 
-    public String getHdfsBytesRead() {
-        return hdfsBytesRead;
+    public String getRawInputBytesRead() {
+        return rawInputBytesRead;
     }
 
     public void updateJobCounter() {
@@ -95,7 +96,7 @@ public class HadoopCmdOutput {
 
             mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
             hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
-            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+            rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             output.append(e.getLocalizedMessage());
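The HadoopCmdOutput change above is the consumer side of the new counter: instead of the filesystem-level "FileSystemCounters" / "HDFS_BYTES_READ" metric, which reflects bytes read from storage and can diverge from the logical data size when the flat table is compressed, the job now reads the mapper-defined RawDataCounter.BYTES. For readers unfamiliar with Hadoop's enum-counter mechanism, here is a minimal self-contained sketch of the pattern; the RawSizeMapper class and its generic types are hypothetical, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper showing the enum-counter pattern: every task-side
    // increment against the enum constant is aggregated by the MapReduce
    // framework into a single job-level value.
    public class RawSizeMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        public enum RawDataCounter { BYTES }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // charge the logical size of the record, not the on-disk bytes
            context.getCounter(RawDataCounter.BYTES).increment(value.getLength());
        }
    }

After the job completes, the driver reads the aggregated value the same way updateJobCounter() does: job.getCounters().findCounter(RawDataCounter.BYTES).getValue().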
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 6de07ca..2e7a289 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -202,14 +202,14 @@ public class MapReduceExecutable extends AbstractExecutable {
     private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
         hadoopCmdOutput.updateJobCounter();
         info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead());
         info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
 
         String saveAs = getParam(KEY_COUNTER_SAVEAS);
         if (saveAs != null) {
             String[] saveAsNames = saveAs.split(",");
             saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+            saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), saveAsNames, 1, info);
             saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
         }
     }
@@ -244,10 +244,6 @@ public class MapReduceExecutable extends AbstractExecutable {
         setParam(KEY_PARAMS, param);
     }
 
-    public String getCounterSaveAs() {
-        return getParam(KEY_COUNTER_SAVEAS);
-    }
-
     public void setCounterSaveAs(String value) {
         setParam(KEY_COUNTER_SAVEAS, value);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index e8817a5..5fcfe42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -29,10 +29,10 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
     @Override
     public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
         Text key = skey.getText();
-        if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
+        if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
-        } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
+        } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
             // the last but one reducer is for partition col
             return numReduceTasks - 2;
         } else {
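The partitioner hunk only tracks the class rename, but it is the reader's one glimpse of the routing convention the marker bytes serve: keys whose first byte is MARK_FOR_HLL (0xFF) or MARK_FOR_PARTITION_COL (0xFE) bypass ordinary partitioning and are pinned to the last and last-but-one reducers. A simplified restatement of that rule follows, with illustrative names only; the hunk does not show the default branch, so defaultPartition stands in for whatever the real class computes there:

    // Illustrative restatement of the marker-byte routing convention.
    public class MarkerRouting {

        static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
        static final byte MARK_FOR_HLL = (byte) 0xFF;

        static int route(byte firstKeyByte, int defaultPartition, int numReduceTasks) {
            if (firstKeyByte == MARK_FOR_HLL) {
                return numReduceTasks - 1;      // last reducer merges HLL sketches
            } else if (firstKeyByte == MARK_FOR_PARTITION_COL) {
                return numReduceTasks - 2;      // last-but-one gets the partition column
            } else {
                return defaultPartition;        // ordinary dictionary-column traffic
            }
        }

        public static void main(String[] args) {
            // with 5 reducers: HLL -> 4, partition column -> 3, others unchanged
            System.out.println(route(MARK_FOR_HLL, 2, 5));           // 4
            System.out.println(route(MARK_FOR_PARTITION_COL, 2, 5)); // 3
            System.out.println(route((byte) 0, 2, 5));               // 2
        }
    }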
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index aded600..ee0989a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -140,7 +140,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
 
-        job.setMapperClass(FactDistinctHiveColumnsMapper.class);
+        job.setMapperClass(FactDistinctColumnsMapper.class);
         job.setCombinerClass(FactDistinctColumnsCombiner.class);
         job.setMapOutputKeyClass(SelfDefineSortableKey.class);
         job.setMapOutputValueClass(Text.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
new file mode 100644
index 0000000..9d0ff10
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ */
+public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsMapper.class);
+
+    public static enum RawDataCounter { BYTES };
+
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected int nRowKey;
+    private Integer[][] allCuboidsBitSet = null;
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private HashFunction hf = null;
+    private int rowCount = 0;
+    private int samplingPercentage;
+    private ByteArray[] row_hashcodes = null;
+    private ByteBuffer tmpbuf;
+    private static final Text EMPTY_TEXT = new Text();
+    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
+    public static final byte MARK_FOR_HLL = (byte) 0xFF;
+
+    private int partitionColumnIndex = -1;
+    private boolean needFetchPartitionCol = true;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        tmpbuf = ByteBuffer.allocate(4096);
+        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            List<Long> cuboidIdList = Lists.newArrayList();
+            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
+            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+            allCuboidsHLL = new HLLCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+            }
+
+            hf = Hashing.murmur3_32();
+            row_hashcodes = new ByteArray[nRowKey];
+            for (int i = 0; i < nRowKey; i++) {
+                row_hashcodes[i] = new ByteArray();
+            }
+
+            TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            if (partitionColRef != null) {
+                partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
+            }
+
+            // check whether need fetch the partition col values
+            if (partitionColumnIndex < 0) {
+                // if partition col not on cube, no need
+                needFetchPartitionCol = false;
+            } else {
+                needFetchPartitionCol = true;
+            }
+        }
+    }
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
+        allCuboids.add(cuboidId);
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < nRowKey; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+        }
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+
+        context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
+
+        for (int i = 0; i < factDictCols.size(); i++) {
+            String fieldValue = row[dictionaryColumnIndex[i]];
+            if (fieldValue == null)
+                continue;
+
+            int reducerIndex;
+            if (uhcIndex[i] == 0) {
+                //for the normal dictionary column
+                reducerIndex = columnIndexToReducerBeginId.get(i);
+            } else {
+                //for the uhc
+                reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+            }
+
+            tmpbuf.clear();
+            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+            tmpbuf.put(Bytes.toBytes(fieldValue));
+            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+            sortableKey.setText(outputKey);
+            //judge type
+            sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
+            context.write(sortableKey, EMPTY_TEXT);
+
+            // log a few rows for troubleshooting
+            if (rowCount < 10) {
+                logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+            }
+        }
+
+        if (collectStatistics) {
+            if (rowCount % 100 < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (needFetchPartitionCol == true) {
+                String fieldValue = row[partitionColumnIndex];
+                if (fieldValue != null) {
+                    tmpbuf.clear();
+                    tmpbuf.put(MARK_FOR_PARTITION_COL);
+                    tmpbuf.put(Bytes.toBytes(fieldValue));
+                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                    sortableKey.setText(outputKey);
+                    sortableKey.setTypeId((byte) 0);
+                    context.write(sortableKey, EMPTY_TEXT);
+                }
+            }
+        }
+        rowCount++;
+    }
+
+    private long countSizeInBytes(String[] row) {
+        int size = 0;
+        for (String s : row) {
+            size += s == null ? 1 : utf8Length(s);
+            size++; // delimiter
+        }
+        return size;
+    }
+
+    // calculating length in UTF-8 of Java String without actually encoding it
+    public static int utf8Length(CharSequence sequence) {
+        int count = 0;
+        for (int i = 0, len = sequence.length(); i < len; i++) {
+            char ch = sequence.charAt(i);
+            if (ch <= 0x7F) {
+                count++;
+            } else if (ch <= 0x7FF) {
+                count += 2;
+            } else if (Character.isHighSurrogate(ch)) {
+                count += 4;
+                ++i;
+            } else {
+                count += 3;
+            }
+        }
+        return count;
+    }
+
+    private void putRowKeyToHLL(String[] row) {
+
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            if (colValue != null) {
+                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+            } else {
+                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+            }
+        }
+
+        // user the row key column hash to get a consolidated hash for each cuboid
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+            }
+
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            HLLCounter hll;
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = allCuboidsHLL[i];
+
+                tmpbuf.clear();
+                tmpbuf.put(MARK_FOR_HLL); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                sortableKey.setText(outputKey);
+                sortableKey.setTypeId((byte) 0);
+                context.write(sortableKey, outputValue);
+            }
+        }
+    }
+}
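The heart of the patch is the pair countSizeInBytes()/utf8Length() in the new mapper: each row is charged one byte per null field, one delimiter byte per field, and otherwise the UTF-8 encoded length of the value, computed arithmetically so no per-field byte array is materialized. As a quick standalone plausibility check (not part of the patch; the class name and sample strings are mine), the surrogate-aware arithmetic can be compared with the byte counts of the JDK's real encoder:

    import java.nio.charset.StandardCharsets;

    // Standalone check that the arithmetic in utf8Length() matches the
    // byte count produced by the JDK's actual UTF-8 encoder.
    public class Utf8LengthCheck {

        static int utf8Length(CharSequence sequence) {
            int count = 0;
            for (int i = 0, len = sequence.length(); i < len; i++) {
                char ch = sequence.charAt(i);
                if (ch <= 0x7F) {
                    count++;                // ASCII: 1 byte
                } else if (ch <= 0x7FF) {
                    count += 2;             // up to U+07FF: 2 bytes
                } else if (Character.isHighSurrogate(ch)) {
                    count += 4;             // surrogate pair: 4 bytes total
                    ++i;                    // consume the low surrogate too
                } else {
                    count += 3;             // rest of the BMP: 3 bytes
                }
            }
            return count;
        }

        public static void main(String[] args) {
            String[] samples = { "plain ASCII", "Grüße", "中文", "emoji \uD83D\uDE00" };
            for (String s : samples) {
                int computed = utf8Length(s);
                int actual = s.getBytes(StandardCharsets.UTF_8).length;
                System.out.println(s + ": computed=" + computed + ", actual=" + actual);
                if (computed != actual) {
                    throw new AssertionError("mismatch for: " + s);
                }
            }
        }
    }

Note the convention this implies: the counter approximates the size of the delimited flat-table text (nulls as one byte, plus one delimiter per field) rather than an exact on-disk figure, consistent with the goal of measuring raw data size independently of the storage format.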
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
deleted file mode 100644
index ed65343..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
-/**
- */
-public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
-
-    private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class);
-
-    protected boolean collectStatistics = false;
-    protected CuboidScheduler cuboidScheduler = null;
-    protected int nRowKey;
-    private Integer[][] allCuboidsBitSet = null;
-    private HLLCounter[] allCuboidsHLL = null;
-    private Long[] cuboidIds;
-    private HashFunction hf = null;
-    private int rowCount = 0;
-    private int samplingPercentage;
-    private ByteArray[] row_hashcodes = null;
-    private ByteBuffer tmpbuf;
-    private static final Text EMPTY_TEXT = new Text();
-    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
-    public static final byte MARK_FOR_HLL = (byte) 0xFF;
-
-    private int partitionColumnIndex = -1;
-    private boolean needFetchPartitionCol = true;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.setup(context);
-        tmpbuf = ByteBuffer.allocate(4096);
-        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
-        if (collectStatistics) {
-            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
-            cuboidScheduler = new CuboidScheduler(cubeDesc);
-            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
-
-            List<Long> cuboidIdList = Lists.newArrayList();
-            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
-            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
-
-            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
-            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
-
-            allCuboidsHLL = new HLLCounter[cuboidIds.length];
-            for (int i = 0; i < cuboidIds.length; i++) {
-                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
-            }
-
-            hf = Hashing.murmur3_32();
-            row_hashcodes = new ByteArray[nRowKey];
-            for (int i = 0; i < nRowKey; i++) {
-                row_hashcodes[i] = new ByteArray();
-            }
-
-            TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (partitionColRef != null) {
-                partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
-            }
-
-            // check whether need fetch the partition col values
-            if (partitionColumnIndex < 0) {
-                // if partition col not on cube, no need
-                needFetchPartitionCol = false;
-            } else {
-                needFetchPartitionCol = true;
-            }
-        }
-    }
-
-    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
-        allCuboids.add(cuboidId);
-        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        int position = 0;
-        for (int i = 0; i < nRowKey; i++) {
-            if ((mask & cuboidId) > 0) {
-                indice[position] = i;
-                position++;
-            }
-            mask = mask >> 1;
-        }
-
-        allCuboidsBitSet.add(indice);
-        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
-        for (Long childId : children) {
-            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
-        }
-    }
-
-    @Override
-    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-
-        for (int i = 0; i < factDictCols.size(); i++) {
-            String fieldValue = row[dictionaryColumnIndex[i]];
-            if (fieldValue == null)
-                continue;
-
-            int reducerIndex;
-            if (uhcIndex[i] == 0) {
-                //for the normal dictionary column
-                reducerIndex = columnIndexToReducerBeginId.get(i);
-            } else {
-                //for the uhc
-                reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
-            }
-
-            tmpbuf.clear();
-            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
-            tmpbuf.put(Bytes.toBytes(fieldValue));
-            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-            sortableKey.setText(outputKey);
-            //judge type
-            sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
-            context.write(sortableKey, EMPTY_TEXT);
-
-            // log a few rows for troubleshooting
-            if (rowCount < 10) {
-                logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
-            }
-        }
-
-        if (collectStatistics) {
-            if (rowCount % 100 < samplingPercentage) {
-                putRowKeyToHLL(row);
-            }
-
-            if (needFetchPartitionCol == true) {
-                String fieldValue = row[partitionColumnIndex];
-                if (fieldValue != null) {
-                    tmpbuf.clear();
-                    tmpbuf.put(MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(Bytes.toBytes(fieldValue));
-                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                    sortableKey.setText(outputKey);
-                    sortableKey.setTypeId((byte) 0);
-                    context.write(sortableKey, EMPTY_TEXT);
-                }
-            }
-        }
-        rowCount++;
-    }
-
-    private void putRowKeyToHLL(String[] row) {
-
-        //generate hash for each row key column
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-            } else {
-                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-            }
-        }
-
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
-            }
-
-            allCuboidsHLL[i].add(hc.hash().asBytes());
-        }
-    }
-
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HLLCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
-
-                tmpbuf.clear();
-                tmpbuf.put(MARK_FOR_HLL); // one byte
-                tmpbuf.putLong(cuboidIds[i]);
-                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                hllBuf.clear();
-                hll.writeRegisters(hllBuf);
-                outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                sortableKey.setText(outputKey);
-                sortableKey.setTypeId((byte) 0);
-                context.write(sortableKey, outputValue);
-            }
-        }
-    }
-}