KYLIN-2170 fix cleanup() in mapper and reducer
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47de9611 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47de9611 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47de9611 Branch: refs/heads/KYLIN-2006 Commit: 47de9611be71e3adeebe5da5041b37db5de9fa28 Parents: d5d2b9c Author: Li Yang <liy...@apache.org> Authored: Tue Nov 8 18:45:59 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Tue Nov 8 18:56:33 2016 +0800 ---------------------------------------------------------------------- .../mr/steps/FactDistinctColumnsReducer.java | 39 ++++++------ .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++++++----- .../engine/mr/steps/InMemCuboidMapper.java | 24 +++++--- .../steps/RowKeyDistributionCheckerMapper.java | 12 ++-- .../cardinality/ColumnCardinalityMapper.java | 22 ++++--- .../cardinality/ColumnCardinalityReducer.java | 39 ++++++------ .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++- .../steps/RangeKeyDistributionReducer.java | 64 +++++++++++--------- 8 files changed, 137 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index c8624bb..ecbc6c2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -176,25 +176,28 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri @Override protected void cleanup(Context context) throws IOException, InterruptedException { - - if (isStatistics == false) { - if (colValues.size() > 0) { - outputDistinctValues(col, colValues, context); - colValues.clear(); - } - } else { - //output the hll info; - long grandTotal = 0; - for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { - grandTotal += hll.getCountEstimate(); + try { + if (isStatistics == false) { + if (colValues.size() > 0) { + outputDistinctValues(col, colValues, context); + colValues.clear(); + } + } else { + //output the hll info; + long grandTotal = 0; + for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { + grandTotal += hll.getCountEstimate(); + } + double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + + int mapperNumber = baseCuboidRowCountInMappers.size(); + + writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // + cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } - double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - - int mapperNumber = baseCuboidRowCountInMappers.size(); - - writeMapperAndCuboidStatistics(context); // for human check - CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // - cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); + } catch (Throwable ex) { + logger.error("", ex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 86ef487..177c9f6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -197,22 +197,26 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap @Override protected void cleanup(Context context) throws IOException, InterruptedException { - if (collectStatistics) { - ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - // output each cuboid's hll to reducer, key is 0 - cuboidId - HyperLogLogPlusCounter hll; - for (int i = 0; i < cuboidIds.length; i++) { - hll = allCuboidsHLL[i]; - - keyBuffer.clear(); - keyBuffer.put(MARK_FOR_HLL); // one byte - keyBuffer.putLong(cuboidIds[i]); - outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); - hllBuf.clear(); - hll.writeRegisters(hllBuf); - outputValue.set(hllBuf.array(), 0, hllBuf.position()); - context.write(outputKey, outputValue); + try { + if (collectStatistics) { + ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + // output each cuboid's hll to reducer, key is 0 - cuboidId + HyperLogLogPlusCounter hll; + for (int i = 0; i < cuboidIds.length; i++) { + hll = allCuboidsHLL[i]; + + keyBuffer.clear(); + keyBuffer.put(MARK_FOR_HLL); // one byte + keyBuffer.putLong(cuboidIds[i]); + outputKey.set(keyBuffer.array(), 0, keyBuffer.position()); + hllBuf.clear(); + hll.writeRegisters(hllBuf); + outputValue.set(hllBuf.array(), 0, hllBuf.position()); + context.write(outputKey, outputValue); + } } + } catch (Throwable ex) { + ex.printStackTrace(); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 1d90d01..dac93cb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -133,18 +133,22 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr protected void cleanup(Context context) throws IOException, InterruptedException { logger.info("Totally handled " + counter + " records!"); - while (!future.isDone()) { - if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { - break; - } - } - try { - future.get(); - } catch (Exception e) { - throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); + while (!future.isDone()) { + if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { + break; + } + } + + try { + future.get(); + } catch (Exception e) { + throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e); + } + queue.clear(); + } catch (Throwable ex) { + logger.error("", ex); } - queue.clear(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java index fa2ff73..21e97a3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java @@ -77,10 +77,14 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex @Override protected void cleanup(Context context) throws IOException, InterruptedException { - LongWritable outputValue = new LongWritable(); - for (Entry<Text, Long> kv : resultMap.entrySet()) { - outputValue.set(kv.getValue()); - context.write(kv.getKey(), outputValue); + try { + LongWritable outputValue = new LongWritable(); + for (Entry<Text, Long> kv : resultMap.entrySet()) { + outputValue.set(kv.getValue()); + context.write(kv.getKey(), outputValue); + } + } catch (Throwable ex) { + ex.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index f27bee3..8c624e3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -96,15 +96,19 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab @Override protected void cleanup(Context context) throws IOException, InterruptedException { - Iterator<Integer> it = hllcMap.keySet().iterator(); - ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - while (it.hasNext()) { - int key = it.next(); - HyperLogLogPlusCounter hllc = hllcMap.get(key); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit())); + try { + Iterator<Integer> it = hllcMap.keySet().iterator(); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + while (it.hasNext()) { + int key = it.next(); + HyperLogLogPlusCounter hllc = hllcMap.get(key); + buf.clear(); + hllc.writeRegisters(buf); + buf.flip(); + context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit())); + } + } catch (Throwable ex) { + ex.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java index 858d84c..2551af3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java @@ -69,24 +69,27 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri @Override protected void cleanup(Context context) throws IOException, InterruptedException { - List<Integer> keys = new ArrayList<Integer>(); - Iterator<Integer> it = hllcMap.keySet().iterator(); - while (it.hasNext()) { - keys.add(it.next()); + try { + List<Integer> keys = new ArrayList<Integer>(); + Iterator<Integer> it = hllcMap.keySet().iterator(); + while (it.hasNext()) { + keys.add(it.next()); + } + Collections.sort(keys); + it = keys.iterator(); + while (it.hasNext()) { + int key = it.next(); + HyperLogLogPlusCounter hllc = hllcMap.get(key); + ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); + buf.clear(); + hllc.writeRegisters(buf); + buf.flip(); + context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate())); + // context.write(new Text("ErrorRate_" + key), new + // LongWritable((long)hllc.getErrorRate())); + } + } catch (Throwable ex) { + ex.printStackTrace(); } - Collections.sort(keys); - it = keys.iterator(); - while (it.hasNext()) { - int key = it.next(); - HyperLogLogPlusCounter hllc = hllcMap.get(key); - ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate())); - // context.write(new Text("ErrorRate_" + key), new - // LongWritable((long)hllc.getErrorRate())); - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java index c2190fb..c82d58d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java @@ -62,9 +62,13 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo @Override protected void cleanup(Context context) throws IOException, InterruptedException { - if (lastKey != null) { - outputValue.set(bytesRead); - context.write(lastKey, outputValue); + try { + if (lastKey != null) { + outputValue.set(bytesRead); + context.write(lastKey, outputValue); + } + } catch (Throwable ex) { + ex.printStackTrace(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java index a4b7956..e9918d4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java @@ -97,37 +97,41 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable @Override protected void cleanup(Context context) throws IOException, InterruptedException { - int nRegion = Math.round((float) gbPoints.size() / cut); - nRegion = Math.max(minRegionCount, nRegion); - nRegion = Math.min(maxRegionCount, nRegion); - - int gbPerRegion = gbPoints.size() / nRegion; - gbPerRegion = Math.max(1, gbPerRegion); - - if (hfileSizeGB <= 0) { - hfileSizeGB = gbPerRegion; - } - int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB); - hfilePerRegion = Math.max(1, hfilePerRegion); - - System.out.println(nRegion + " regions"); - System.out.println(gbPerRegion + " GB per region"); - System.out.println(hfilePerRegion + " hfile per region"); - - Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); - SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class); - int hfileCountInOneRegion = 0; - for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { - hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get()); - if (++hfileCountInOneRegion >= hfilePerRegion) { - Text key = gbPoints.get(i); - outputValue.set(i); - System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); - context.write(key, outputValue); - - hfileCountInOneRegion = 0; + try { + int nRegion = Math.round((float) gbPoints.size() / cut); + nRegion = Math.max(minRegionCount, nRegion); + nRegion = Math.min(maxRegionCount, nRegion); + + int gbPerRegion = gbPoints.size() / nRegion; + gbPerRegion = Math.max(1, gbPerRegion); + + if (hfileSizeGB <= 0) { + hfileSizeGB = gbPerRegion; + } + int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB); + hfilePerRegion = Math.max(1, hfilePerRegion); + + System.out.println(nRegion + " regions"); + System.out.println(gbPerRegion + " GB per region"); + System.out.println(hfilePerRegion + " hfile per region"); + + Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); + SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class); + int hfileCountInOneRegion = 0; + for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { + hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get()); + if (++hfileCountInOneRegion >= hfilePerRegion) { + Text key = gbPoints.get(i); + outputValue.set(i); + System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); + context.write(key, outputValue); + + hfileCountInOneRegion = 0; + } } + hfilePartitionWriter.close(); + } catch (Throwable ex) { + logger.error("", ex); } - hfilePartitionWriter.close(); } }