KYLIN-2170 fix cleanup() in mapper and reducer

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47de9611
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47de9611
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47de9611

Branch: refs/heads/KYLIN-2006
Commit: 47de9611be71e3adeebe5da5041b37db5de9fa28
Parents: d5d2b9c
Author: Li Yang <liy...@apache.org>
Authored: Tue Nov 8 18:45:59 2016 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Tue Nov 8 18:56:33 2016 +0800

----------------------------------------------------------------------
 .../mr/steps/FactDistinctColumnsReducer.java    | 39 ++++++------
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++++++-----
 .../engine/mr/steps/InMemCuboidMapper.java      | 24 +++++---
 .../steps/RowKeyDistributionCheckerMapper.java  | 12 ++--
 .../cardinality/ColumnCardinalityMapper.java    | 22 ++++---
 .../cardinality/ColumnCardinalityReducer.java   | 39 ++++++------
 .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++-
 .../steps/RangeKeyDistributionReducer.java      | 64 +++++++++++---------
 8 files changed, 137 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index c8624bb..ecbc6c2 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -176,25 +176,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-
-        if (isStatistics == false) {
-            if (colValues.size() > 0) {
-                outputDistinctValues(col, colValues, context);
-                colValues.clear();
-            }
-        } else {
-            //output the hll info;
-            long grandTotal = 0;
-            for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
-                grandTotal += hll.getCountEstimate();
+        try {
+            if (isStatistics == false) {
+                if (colValues.size() > 0) {
+                    outputDistinctValues(col, colValues, context);
+                    colValues.clear();
+                }
+            } else {
+                //output the hll info;
+                long grandTotal = 0;
+                for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
+                    grandTotal += hll.getCountEstimate();
+                }
+                double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+    
+                int mapperNumber = baseCuboidRowCountInMappers.size();
+    
+                writeMapperAndCuboidStatistics(context); // for human check
+                CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+                        cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
             }
-            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-            
-            int mapperNumber = baseCuboidRowCountInMappers.size();
-
-            writeMapperAndCuboidStatistics(context); // for human check
-            CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
-                    cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+        } catch (Throwable ex) {
+            logger.error("", ex);
+            throw ex; // KYLIN-2170: log, then rethrow (Java 7 precise rethrow) so the task fails instead of losing output
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 86ef487..177c9f6 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -197,22 +197,27 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HyperLogLogPlusCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
-
-                keyBuffer.clear();
-                keyBuffer.put(MARK_FOR_HLL); // one byte
-                keyBuffer.putLong(cuboidIds[i]);
-                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
-                hllBuf.clear();
-                hll.writeRegisters(hllBuf);
-                outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                context.write(outputKey, outputValue);
+        try {
+            if (collectStatistics) {
+                ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+                // output each cuboid's hll to reducer, key is 0 - cuboidId
+                HyperLogLogPlusCounter hll;
+                for (int i = 0; i < cuboidIds.length; i++) {
+                    hll = allCuboidsHLL[i];
+    
+                    keyBuffer.clear();
+                    keyBuffer.put(MARK_FOR_HLL); // one byte
+                    keyBuffer.putLong(cuboidIds[i]);
+                    outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+                    hllBuf.clear();
+                    hll.writeRegisters(hllBuf);
+                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                    context.write(outputKey, outputValue);
+                }
             }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            throw ex; // KYLIN-2170: report, then rethrow so missing HLL output fails the task
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 1d90d01..dac93cb 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -133,18 +133,23 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
     protected void cleanup(Context context) throws IOException, InterruptedException {
         logger.info("Totally handled " + counter + " records!");
 
-        while (!future.isDone()) {
-            if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
-                break;
-            }
-        }
-
         try {
-            future.get();
-        } catch (Exception e) {
-            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+            while (!future.isDone()) {
+                if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
+                    break;
+                }
+            }
+    
+            try {
+                future.get();
+            } catch (Exception e) {
+                throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+            }
+            queue.clear();
+        } catch (Throwable ex) {
+            logger.error("", ex);
+            throw ex; // KYLIN-2170: without this the "Failed to build cube" IOException above would be swallowed
         }
-        queue.clear();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index fa2ff73..21e97a3 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -77,10 +77,15 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        LongWritable outputValue = new LongWritable();
-        for (Entry<Text, Long> kv : resultMap.entrySet()) {
-            outputValue.set(kv.getValue());
-            context.write(kv.getKey(), outputValue);
+        try {
+            LongWritable outputValue = new LongWritable();
+            for (Entry<Text, Long> kv : resultMap.entrySet()) {
+                outputValue.set(kv.getValue());
+                context.write(kv.getKey(), outputValue);
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            throw ex; // KYLIN-2170: report, then rethrow so a failed write fails the task
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index f27bee3..8c624e3 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -96,15 +96,20 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
+        try {
+            Iterator<Integer> it = hllcMap.keySet().iterator();
+            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            while (it.hasNext()) {
+                int key = it.next();
+                HyperLogLogPlusCounter hllc = hllcMap.get(key);
+                buf.clear();
+                hllc.writeRegisters(buf);
+                buf.flip();
+                context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            throw ex; // KYLIN-2170: report, then rethrow so missing HLL output fails the task
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 858d84c..2551af3 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -69,24 +69,28 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        List<Integer> keys = new ArrayList<Integer>();
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        while (it.hasNext()) {
-            keys.add(it.next());
+        try {
+            List<Integer> keys = new ArrayList<Integer>();
+            Iterator<Integer> it = hllcMap.keySet().iterator();
+            while (it.hasNext()) {
+                keys.add(it.next());
+            }
+            Collections.sort(keys);
+            it = keys.iterator();
+            while (it.hasNext()) {
+                int key = it.next();
+                HyperLogLogPlusCounter hllc = hllcMap.get(key);
+                ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+                buf.clear();
+                hllc.writeRegisters(buf);
+                buf.flip();
+                context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
+                // context.write(new Text("ErrorRate_" + key), new
+                // LongWritable((long)hllc.getErrorRate()));
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            throw ex; // KYLIN-2170: report, then rethrow so missing cardinality output fails the task
         }
-        Collections.sort(keys);
-        it = keys.iterator();
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
-            // context.write(new Text("ErrorRate_" + key), new
-            // LongWritable((long)hllc.getErrorRate()));
-        }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
index c2190fb..c82d58d 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
@@ -62,9 +62,14 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
+        try {
+            if (lastKey != null) {
+                outputValue.set(bytesRead);
+                context.write(lastKey, outputValue);
+            }
+        } catch (Throwable ex) {
+            ex.printStackTrace();
+            throw ex; // KYLIN-2170: report, then rethrow so a failed write fails the task
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index a4b7956..e9918d4 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -97,37 +97,43 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-        int nRegion = Math.round((float) gbPoints.size() / cut);
-        nRegion = Math.max(minRegionCount, nRegion);
-        nRegion = Math.min(maxRegionCount, nRegion);
-
-        int gbPerRegion = gbPoints.size() / nRegion;
-        gbPerRegion = Math.max(1, gbPerRegion);
-
-        if (hfileSizeGB <= 0) {
-            hfileSizeGB = gbPerRegion;
-        }
-        int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
-        hfilePerRegion = Math.max(1, hfilePerRegion);
-
-        System.out.println(nRegion + " regions");
-        System.out.println(gbPerRegion + " GB per region");
-        System.out.println(hfilePerRegion + " hfile per region");
-
-        Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
-        SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
-        int hfileCountInOneRegion = 0;
-        for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
-            hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
-            if (++hfileCountInOneRegion >= hfilePerRegion) {
-                Text key = gbPoints.get(i);
-                outputValue.set(i);
-                System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-                context.write(key, outputValue);
-
-                hfileCountInOneRegion = 0;
+        try {
+            int nRegion = Math.round((float) gbPoints.size() / cut);
+            nRegion = Math.max(minRegionCount, nRegion);
+            nRegion = Math.min(maxRegionCount, nRegion);
+    
+            int gbPerRegion = gbPoints.size() / nRegion;
+            gbPerRegion = Math.max(1, gbPerRegion);
+    
+            if (hfileSizeGB <= 0) {
+                hfileSizeGB = gbPerRegion;
+            }
+            int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
+            hfilePerRegion = Math.max(1, hfilePerRegion);
+    
+            System.out.println(nRegion + " regions");
+            System.out.println(gbPerRegion + " GB per region");
+            System.out.println(hfilePerRegion + " hfile per region");
+    
+            Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+            // try-with-resources closes the partition writer even if an append or write below fails
+            try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) {
+                int hfileCountInOneRegion = 0;
+                for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+                    hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+                    if (++hfileCountInOneRegion >= hfilePerRegion) {
+                        Text key = gbPoints.get(i);
+                        outputValue.set(i);
+                        System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+                        context.write(key, outputValue);
+    
+                        hfileCountInOneRegion = 0;
+                    }
+                }
             }
+        } catch (Throwable ex) {
+            logger.error("", ex);
+            throw ex; // KYLIN-2170: log, then rethrow so a bad region split fails the job instead of going unnoticed
         }
-        hfilePartitionWriter.close();
     }
 }

Reply via email to