Repository: kylin Updated Branches: refs/heads/KYLIN-2442 [created] 77be3eb04
KYLIN-2442 calculate raw data size using custom counter Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/77be3eb0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/77be3eb0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/77be3eb0 Branch: refs/heads/KYLIN-2442 Commit: 77be3eb0437daac90a65391570a1ca9ed2e7dd72 Parents: ecf6a69 Author: Li Yang <liy...@apache.org> Authored: Fri Feb 10 16:56:21 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Feb 10 16:56:21 2017 +0800 ---------------------------------------------------------------------- .../kylin/engine/mr/BatchCubingJobBuilder.java | 1 - .../kylin/engine/mr/common/HadoopCmdOutput.java | 9 +++--- .../engine/mr/common/MapReduceExecutable.java | 8 ++--- .../mr/steps/FactDistinctHiveColumnsMapper.java | 32 ++++++++++++++++++++ 4 files changed, 39 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index 36c12a1..1ec23b6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -100,7 +100,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); - baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES); return baseCuboidStep; } http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java index 9d016cc..73ee7ce 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.kylin.engine.mr.steps.FactDistinctHiveColumnsMapper.RawDataCounter; import org.apache.kylin.job.constant.ExecutableConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +68,7 @@ public class HadoopCmdOutput { private String mapInputRecords; private String hdfsBytesWritten; - private String hdfsBytesRead; + private String rawInputBytesRead; public String getMapInputRecords() { return mapInputRecords; @@ -77,8 +78,8 @@ public class HadoopCmdOutput { return hdfsBytesWritten; } - public String getHdfsBytesRead() { - return hdfsBytesRead; + public String getRawInputBytesRead() { + return rawInputBytesRead; } public void updateJobCounter() { @@ -95,7 +96,7 @@ public class HadoopCmdOutput { mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue()); hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue()); - hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue()); + rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue()); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); output.append(e.getLocalizedMessage()); http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 6de07ca..2e7a289 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -202,14 +202,14 @@ public class MapReduceExecutable extends AbstractExecutable { private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) { hadoopCmdOutput.updateJobCounter(); info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords()); - info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead()); + info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead()); info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten()); String saveAs = getParam(KEY_COUNTER_SAVEAS); if (saveAs != null) { String[] saveAsNames = saveAs.split(","); saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info); - saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info); + saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), saveAsNames, 1, info); saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info); } } @@ -244,10 +244,6 @@ public class MapReduceExecutable extends AbstractExecutable { setParam(KEY_PARAMS, param); } - public String getCounterSaveAs() { - return getParam(KEY_COUNTER_SAVEAS); - } - public void setCounterSaveAs(String value) { setParam(KEY_COUNTER_SAVEAS, value); } http://git-wip-us.apache.org/repos/asf/kylin/blob/77be3eb0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index ed65343..b6efd79 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -44,6 +44,8 @@ import com.google.common.hash.Hashing; public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> { private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class); + + public static enum RawDataCounter { BYTES }; protected boolean collectStatistics = false; protected CuboidScheduler cuboidScheduler = null; @@ -130,6 +132,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap @Override public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { String[] row = flatTableInputFormat.parseMapperInput(record); + + context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); for (int i = 0; i < factDictCols.size(); i++) { String fieldValue = row[dictionaryColumnIndex[i]]; @@ -181,6 +185,34 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap rowCount++; } + private long countSizeInBytes(String[] row) { + int size = 0; + for (String s : row) { + size += s == null ? 1 : utf8Length(s); + size++; // delimiter + } + return size; + } + + // calculating length in UTF-8 of Java String without actually encoding it + public static int utf8Length(CharSequence sequence) { + int count = 0; + for (int i = 0, len = sequence.length(); i < len; i++) { + char ch = sequence.charAt(i); + if (ch <= 0x7F) { + count++; + } else if (ch <= 0x7FF) { + count += 2; + } else if (Character.isHighSurrogate(ch)) { + count += 4; + ++i; + } else { + count += 3; + } + } + return count; + } + private void putRowKeyToHLL(String[] row) { //generate hash for each row key column