Repository: kylin Updated Branches: refs/heads/yang21-cdh5.7 c02911381 -> d5956a425 (forced update)
refine mapper and reducer log Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3ecb0d9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3ecb0d9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3ecb0d9 Branch: refs/heads/yang21-cdh5.7 Commit: d3ecb0d9c381dbb035c7cada7d3c798e24fef1d1 Parents: 8001887 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Dec 1 18:01:55 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Dec 1 18:01:55 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/engine/mr/KylinMapper.java | 17 +++++++--- .../apache/kylin/engine/mr/KylinReducer.java | 17 +++++++--- .../engine/mr/steps/BaseCuboidMapperBase.java | 1 - .../kylin/engine/mr/steps/CuboidReducer.java | 21 ++++++------ .../engine/mr/steps/HiveToBaseCuboidMapper.java | 10 ++---- .../engine/mr/steps/InMemCuboidMapper.java | 34 ++++++++------------ .../engine/mr/steps/InMemCuboidReducer.java | 20 ++++++------ .../kylin/engine/mr/steps/NDCuboidMapper.java | 22 ++++++------- 8 files changed, 70 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java index a01f7a2..c5af2fe 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java @@ -18,18 +18,21 @@ package org.apache.kylin.engine.mr; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** */ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class); + protected int mapCounter = 0; + protected void bindCurrentConfiguration(Configuration conf) { logger.info("The conf for current mapper will be " + System.identityHashCode(conf)); HadoopUtil.setCurrentConfiguration(conf); @@ -38,6 +41,10 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, @Override final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { + if (mapCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Accepting Mapper Key with ordinal: " + mapCounter); + } + doMap(key, value, context); } catch (IOException ex) { // KYLIN-2170 logger.error("", ex); @@ -53,11 +60,11 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, throw ex; } } - + protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.map(key, value, context); } - + @Override final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { @@ -76,7 +83,7 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, throw ex; } } - + protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java index 2b63ce0..83266ea 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java @@ -18,18 +18,22 @@ package org.apache.kylin.engine.mr; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** */ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class); - + + protected int reduceCounter = 0; + + protected void bindCurrentConfiguration(Configuration conf) { HadoopUtil.setCurrentConfiguration(conf); } @@ -37,6 +41,9 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI @Override final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { + if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Accepting Reducer Key with ordinal: " + reduceCounter); + } doReduce(key, values, context); } catch (IOException ex) { // KYLIN-2170 logger.error("", ex); @@ -52,11 +59,11 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI throw ex; } } - + protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.reduce(key, values, context); } - + @Override final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index dd0a031..2ad5f53 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -69,7 +69,6 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K protected CubeJoinedFlatTableEnrich intermediateTableDesc; protected String intermediateTableRowDelimiter; protected byte byteRowDelimiter; - protected int counter; protected MeasureIngester<?>[] aggrIngesters; protected Map<TblColRef, Dictionary<String>> dictionaryMap; protected Object[] measures; http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index 9543f0a..03c925e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -18,10 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; @@ -35,9 +31,12 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + /** * @author George Song (ysong1) - * */ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { @@ -50,7 +49,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { private BufferedMeasureCodec codec; private MeasureAggregators aggs; - private int counter; + private int vcounter = 0; private int cuboidLevel; private boolean[] needAggr; private Object[] input; @@ -90,12 +89,18 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { aggs.reset(); for (Text value : values) { + + if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handling value with ordinal: " + vcounter + "!"); + } + codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input); if (cuboidLevel > 0) { aggs.aggregate(input, needAggr); } else { aggs.aggregate(input); } + } aggs.collectStates(result); @@ -104,10 +109,6 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(key, outputValue); - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index d9c5312..f4e8af7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -18,11 +18,10 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; - import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.BatchConstants; + +import java.io.IOException; /** * @author George Song (ysong1) @@ -39,11 +38,6 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O @Override public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException { - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } - try { //put a record into the shared bytesSplitter String[] row = flatTableInputFormat.parseMapperInput(value); http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 15bfd2e..cf5abaf 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -18,18 +18,7 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; @@ -51,7 +40,17 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** */ @@ -64,7 +63,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr private CubeSegment cubeSegment; private IMRTableInputFormat flatTableInputFormat; - private int counter; private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64); private Future<?> future; @@ -120,10 +118,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr while (!future.isDone()) { if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } break; } } @@ -131,10 +125,10 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr @Override protected void doCleanup(Context context) throws IOException, InterruptedException { - logger.info("Totally handled " + counter + " records!"); + logger.info("Totally handled " + mapCounter + " records!"); while (!future.isDone()) { - if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { + if (queue.offer(Collections.<String>emptyList(), 1, TimeUnit.SECONDS)) { break; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index d0a7062..a57ddb8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -18,10 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; @@ -37,6 +33,10 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + /** */ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> { @@ -46,7 +46,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra private BufferedMeasureCodec codec; private MeasureAggregators aggs; - private int counter; + private int vcounter; private Object[] input; private Object[] result; @@ -74,10 +74,14 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra @Override public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException { - aggs.reset(); for (ByteArrayWritable value : values) { + + if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handling value with ordinal: " + vcounter); + } + codec.decode(value.asBuffer(), input); aggs.aggregate(input); } @@ -92,10 +96,6 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra context.write(outputKey, outputValue); - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d3ecb0d9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 8107e52..54d9e23 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -18,9 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.util.Collection; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ByteArray; @@ -41,9 +38,11 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collection; + /** * @author George Song (ysong1) - * */ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { @@ -97,9 +96,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId for (int i = 0; i < parentCuboidIdActualLength; i++) { if ((mask & parentCuboidId) > 0) {// if the this bit position equals - // 1 + // 1 if ((mask & childCuboidId) > 0) {// if the child cuboid has this - // column + // column System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length); offset += splitBuffers[index].length; } @@ -123,24 +122,21 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { long cuboidId = rowKeySplitter.split(key.getBytes()); Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); - Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); // if still empty or null if (myChildren == null || myChildren.size() == 0) { context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L); - skipCounter++; - if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Skipped " + skipCounter + " records!"); + if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Skipping record with ordinal " + skipCounter); } return; } context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L); - handleCounter++; - if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + handleCounter + " records!"); + if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handling record with ordinal: " + handleCounter); } for (Long child : myChildren) {