http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 413fd8d..672f3e0 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -123,8 +123,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -133,40 +132,33 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         }
         {
             LogicalTupleFilter filter = and(timeComp2, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp4, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
-            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
-                    and(timeComp6, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
-            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-                    r.get(0).fuzzyKeys.toString());
+            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
         }
         {
             LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
@@ -175,16 +167,14 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             //skip FALSE filter
             LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             //TRUE or FALSE filter
             LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -192,8 +182,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             //TRUE or other filter
             LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -206,8 +195,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -217,8 +205,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());//scan range are [close,close]
         }
@@ -263,8 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         // merge too many ranges
         {
-            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
-                    and(timeComp4, ageComp3));
+            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3));
             CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(3, r.size());
@@ -279,10 +265,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void verifyFirstRow() throws IOException {
-        doScanAndVerify(table,
-                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
-                        .setFilterPushDown(null).createGTScanRequest(),
-                "[1421193600000, 30, Yang, 10, 10.5]", //
+        doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
                 "[1421193600000, 30, Luke, 10, 10.5]", //
                 "[1421280000000, 20, Dong, 10, 10.5]", //
                 "[1421280000000, 20, Jason, 10, 10.5]", //
@@ -315,18 +298,12 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
         LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
 
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-                .setFilterPushDown(filter).createGTScanRequest();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" }).setFilterPushDown(filter).createGTScanRequest();
 
         // note the unEvaluatable column 1 in filter is added to group by
-        assertEquals(
-                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-                req.toString());
+        assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 
-        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
-                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
-                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
     }
 
     @Test
@@ -337,34 +314,26 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
         LogicalTupleFilter filter = and(fComp1, fComp2);
 
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-                .setFilterPushDown(filter).createGTScanRequest();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" }).setFilterPushDown(filter).createGTScanRequest();
 
         // note the evaluatable column 1 in filter is added to returned columns but not in group by
-        assertEquals(
-                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-                req.toString());
+        assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 
-        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
-                "[1421366400000, 20, null, 40, null]");
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]");
     }
 
     @Test
     public void verifyAggregateAndHavingFilter() throws IOException {
         GTInfo info = table.getInfo();
-        
+
         TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
         havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
         CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
-        
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
-                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
-        
-        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
-                "[null, 30, null, null, 52.5]");
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" }).setHavingFilterPushDown(havingFilter).createGTScanRequest();
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]", "[null, 30, null, null, 52.5]");
     }
-    
+
     @Test
     public void testFilterScannerPerf() throws IOException {
         GridTable table = newTestPerfTable();
@@ -385,11 +354,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
     }
 
     @SuppressWarnings("unused")
-    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
-            throws IOException {
+    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
         long start = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setFilterPushDown(filter).createGTScanRequest();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {
@@ -397,8 +364,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         }
         scanner.close();
         long end = System.currentTimeMillis();
-        System.out.println(
-                (end - start) + "ms with filter cache enabled=" + FilterResultCache.ENABLED + ", " + i + " rows");
+        System.out.println((end - start) + "ms with filter cache enabled=" + FilterResultCache.ENABLED + ", " + i + " rows");
     }
 
     @Test
@@ -418,9 +384,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         colMapping.add(extColB);
 
         TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-        assertEquals(
-                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
-                newFilter.toString());
+        assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]", newFilter.toString());
     }
 
     @Test
@@ -441,9 +405,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         // $1<"9" round up to $1<"10"
         TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-        assertEquals(
-                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
-                newFilter.toString());
+        assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]", newFilter.toString());
     }
 
     @Test
@@ -464,8 +426,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         // $1<="9" round down to FALSE
         TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-        assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]",
-                newFilter.toString());
+        assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], []]", newFilter.toString());
     }
 
     @Test
@@ -486,9 +447,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         // $1 in ("9", "10", "15") has only "10" left
        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-        assertEquals(
-                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
-                newFilter.toString());
+        assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]", newFilter.toString());
     }
 
     private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
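The last four hunks above hinge on how GTUtil rounds out-of-dictionary constants before push-down, as the in-code comments state: a "<" bound rounds up to the next dictionary value, a "<=" bound with nothing at or below it collapses to FALSE, and an IN list is pruned to values the dictionary actually contains. A minimal sketch of that rounding rule over a sorted dictionary (hypothetical helper, not the GTUtil code):

    import java.util.Arrays;
    import java.util.TreeSet;

    class DictRoundingSketch {
        // "$1 < 9" is rewritten as "$1 < 10" when 10 is the next dictionary entry.
        static Integer roundUp(TreeSet<Integer> dict, int c) {
            return dict.ceiling(c); // null -> no possible match, filter becomes FALSE
        }

        // "$1 <= 9" collapses to FALSE when no dictionary entry is <= 9.
        static Integer roundDown(TreeSet<Integer> dict, int c) {
            return dict.floor(c); // null -> FALSE
        }

        public static void main(String[] args) {
            TreeSet<Integer> dict = new TreeSet<>(Arrays.asList(10, 20));
            System.out.println(roundUp(dict, 9));   // 10, mirrors the LT test
            System.out.println(roundDown(dict, 9)); // null, mirrors the "round down to FALSE" test
        }
    }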
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
index 022a800..1627b4f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
@@ -71,26 +71,20 @@ public class SortedIteratorMergerWithLimitTest {
 
     @Test
     public void basic1() {
-        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2),
-                new CloneableInteger(3));
-        List<CloneableInteger> b = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2),
-                new CloneableInteger(3));
-        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2),
-                new CloneableInteger(5));
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3));
+        List<CloneableInteger> b = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3));
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
         List<Iterator<CloneableInteger>> input = Lists.newArrayList();
         input.add(a.iterator());
         input.add(b.iterator());
         input.add(c.iterator());
-        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(
-                input.iterator(), 3, getComp());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
         Iterator<CloneableInteger> iterator = merger.getIterator();
         List<CloneableInteger> result = Lists.newArrayList();
         while (iterator.hasNext()) {
             result.add(iterator.next());
         }
-        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(1),
-                new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(2),
-                new CloneableInteger(3), new CloneableInteger(3)), result);
+        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(3), new CloneableInteger(3)), result);
     }
 
     @Test
@@ -98,21 +92,18 @@ public class SortedIteratorMergerWithLimitTest {
 
         List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2));
         List<CloneableInteger> b = Lists.newArrayList();
-        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2),
-                new CloneableInteger(5));
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
         List<Iterator<CloneableInteger>> input = Lists.newArrayList();
         input.add(a.iterator());
         input.add(b.iterator());
         input.add(c.iterator());
-        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(
-                input.iterator(), 3, getComp());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
         Iterator<CloneableInteger> iterator = merger.getIterator();
         List<CloneableInteger> result = Lists.newArrayList();
         while (iterator.hasNext()) {
             result.add(iterator.next());
         }
-        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2),
-                new CloneableInteger(2), new CloneableInteger(5)), result);
+        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(5)), result);
     }
 
     @Test(expected = IllegalStateException.class)
@@ -120,14 +111,12 @@ public class SortedIteratorMergerWithLimitTest {
 
         List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2), new CloneableInteger(1));
         List<CloneableInteger> b = Lists.newArrayList();
-        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2),
-                new CloneableInteger(5));
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
         List<Iterator<CloneableInteger>> input = Lists.newArrayList();
         input.add(a.iterator());
         input.add(b.iterator());
         input.add(c.iterator());
-        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(
-                input.iterator(), 3, getComp());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
         Iterator<CloneableInteger> iterator = merger.getIterator();
         List<CloneableInteger> result = Lists.newArrayList();
         while (iterator.hasNext()) {
@@ -135,4 +124,4 @@ public class SortedIteratorMergerWithLimitTest {
         }
     }
 
-}
+}
\ No newline at end of file
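The class under test merges several already-sorted iterators into one sorted stream with a row limit; the last test shows that an unsorted input iterator is rejected with IllegalStateException. The core of such a merger is a plain k-way merge over a min-heap; a self-contained sketch of the idea (illustrative only, not the Kylin implementation):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    class KWayMergeSketch {
        // Pairs an iterator with its current head element so the heap can order sources.
        private static final class Head<T> {
            T value;
            final Iterator<T> rest;
            Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
        }

        static <T> List<T> merge(List<Iterator<T>> inputs, Comparator<T> comp) {
            PriorityQueue<Head<T>> heap = new PriorityQueue<>((a, b) -> comp.compare(a.value, b.value));
            for (Iterator<T> it : inputs)
                if (it.hasNext())
                    heap.add(new Head<>(it.next(), it));
            List<T> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                Head<T> h = heap.poll(); // smallest current head across all sources
                out.add(h.value);
                if (h.rest.hasNext()) {
                    h.value = h.rest.next();
                    heap.add(h); // re-enter the heap with the source's next element
                }
            }
            return out;
        }
    }

With the three lists from basic1() this plain merge yields 1,1,1,2,2,2,3,3,5; the limit-aware Kylin class additionally stops early, which is why the expected result in basic1() ends at the second 3.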
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 9b44277..1ec23b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -42,8 +42,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
     public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
-        Preconditions.checkArgument(!newSegment.isEnableSharding(),
-                "V1 job engine does not support building sharded cubes");
+        Preconditions.checkArgument(!newSegment.isEnableSharding(), "V1 job engine does not support building sharded cubes");
         this.inputSide = MRUtil.getBatchCubingInputSide(seg);
         this.outputSide = MRUtil.getBatchCubingOutputSide((CubeSegment) seg);
@@ -70,8 +69,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1),
-                    getCuboidOutputPathsByLevel(cuboidRootPath, i), i));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i));
         }
 
         outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
@@ -96,8 +94,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
@@ -118,8 +115,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, parentPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + level);
 
         ndCuboidStep.setMapReduceParams(cmd.toString());
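The builder above chains one MapReduce step per cuboid level: the base cuboid is built from the flat table at level 0, and every following step reads getCuboidOutputPathsByLevel(cuboidRootPath, i - 1) and writes level i. A toy illustration of that parent/child path chaining (the paths and the "level_" naming are stand-ins, not the real getCuboidOutputPathsByLevel output):

    // Level i consumes level i-1's output, exactly as the for-loop above wires the steps.
    public class CuboidPathChain {
        public static void main(String[] args) {
            String cuboidRootPath = "/kylin/example_job/cuboid"; // illustrative
            int groupRowkeyColumnsCount = 3;
            for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
                String parent = cuboidRootPath + "/level_" + (i - 1);
                String output = cuboidRootPath + "/level_" + i;
                System.out.println("step " + i + ": " + parent + " -> " + output);
            }
        }
    }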
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index c177f1f..106077c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -81,8 +81,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= maxLevel; i++) {
-            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1),
-                    getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
         }
     }
 
@@ -108,8 +107,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                "Kylin_Cube_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         cubeStep.setMapReduceParams(cmd.toString());
@@ -134,14 +132,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
-        // baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+//        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }
 
@@ -149,8 +146,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return BaseCuboidJob.class;
     }
 
-    private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level,
-            String jobId) {
+    private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level, String jobId) {
         // ND cuboid job
         MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
 
@@ -162,8 +158,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, parentPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + level);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 5dfa834..0b4ae40 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -41,8 +41,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
 
     public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        Preconditions.checkArgument(!mergeSegment.isEnableSharding(),
-                "V1 job engine does not support merging sharded cubes");
+        Preconditions.checkArgument(!mergeSegment.isEnableSharding(), "V1 job engine does not support merging sharded cubes");
         this.outputSide = MRUtil.getBatchMergeOutputSide(mergeSegment);
     }
 
@@ -57,8 +56,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
         final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
-        Preconditions.checkState(mergingSegments.size() > 1,
-                "there should be more than 2 segments to merge, target segment " + cubeSegment);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + cubeSegment);
         final List<String> mergingSegmentIds = Lists.newArrayList();
         final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -91,8 +89,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
-                "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 17a75d2..badf628 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -50,8 +50,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         final String jobId = result.getId();
 
         final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
-        Preconditions.checkState(mergingSegments.size() > 1,
-                "there should be more than 2 segments to merge, target segment " + cubeSegment);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + cubeSegment);
         final List<String> mergingSegmentIds = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
@@ -73,8 +72,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds,
-            String mergedStatisticsFolder) {
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
         MergeStatisticsStep result = new MergeStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
index b385140..a504899 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -113,8 +113,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
      * negative if left is smaller than right.
     */
     public int compareTo(ByteArrayWritable that) {
-        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset,
-                that.length);
+        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
     }
 
     /**
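WritableComparator.compareBytes performs an unsigned, lexicographic byte-by-byte comparison, so ByteArrayWritable keys sort in raw binary order. A small usage sketch of the same call (values illustrative):

    import org.apache.hadoop.io.WritableComparator;

    public class CompareBytesDemo {
        public static void main(String[] args) {
            byte[] a = { 0x01, (byte) 0xFF };
            byte[] b = { 0x02, 0x00 };
            // The call compareTo above delegates to; negative means a sorts first.
            int cmp = WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length);
            System.out.println(cmp < 0); // true: 0x01 sorts before 0x02
        }
    }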
"; if (!config.getConfig().allowCubeAppearInMultipleProjects()) { throw new RuntimeException(msg); } else { @@ -101,8 +99,7 @@ public class CubingJob extends DefaultChainedExecutable { result.setProjectName(projList.get(0).getName()); CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); - result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " - + format.format(new Date(System.currentTimeMillis()))); + result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis()))); result.setSubmitter(submitter); result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList()); return result; @@ -130,13 +127,11 @@ public class CubingJob extends DefaultChainedExecutable { @Override protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) { - CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()) - .getCube(CubingExecutableUtil.getCubeName(this.getParams())); + CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(CubingExecutableUtil.getCubeName(this.getParams())); final Output output = getManager().getOutput(getId()); String logMsg; state = output.getState(); - if (state != ExecutableState.ERROR - && !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString())) { + if (state != ExecutableState.ERROR && !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString())) { logger.info("state:" + state + " no need to notify users"); return null; } @@ -165,8 +160,7 @@ public class CubingJob extends DefaultChainedExecutable { content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins"); content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString()); content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter")); - content = content.replaceAll("\\$\\{error_log\\}", - Matcher.quoteReplacement(StringUtil.noBlank(logMsg, "no error message"))); + content = content.replaceAll("\\$\\{error_log\\}", Matcher.quoteReplacement(StringUtil.noBlank(logMsg, "no error message"))); try { InetAddress inetAddress = InetAddress.getLocalHost(); @@ -175,8 +169,7 @@ public class CubingJob extends DefaultChainedExecutable { logger.warn(e.getLocalizedMessage(), e); } - String title = "[" + state.toString() + "] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - " - + CubingExecutableUtil.getCubeName(this.getParams()); + String title = "[" + state.toString() + "] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - " + CubingExecutableUtil.getCubeName(this.getParams()); return Pair.of(title, content); } @@ -203,8 +196,7 @@ public class CubingJob extends DefaultChainedExecutable { */ @Override protected void handleMetaDataPersistException(Exception exception) { - String title = "[ERROR] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - " - + CubingExecutableUtil.getCubeName(this.getParams()); + String title = "[ERROR] - [" + getDeployEnvName() + "] - [" + getProjectName() + "] - " + CubingExecutableUtil.getCubeName(this.getParams()); String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE; final String UNKNOWN = "UNKNOWN"; String errMsg = null; @@ -225,8 +217,7 @@ public class CubingJob extends DefaultChainedExecutable { content = 
content.replaceAll("\\$\\{mr_waiting\\}", UNKNOWN); content = content.replaceAll("\\$\\{last_update_time\\}", UNKNOWN); content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter")); - content = content.replaceAll("\\$\\{error_log\\}", - Matcher.quoteReplacement(StringUtil.noBlank(errMsg, "no error message"))); + content = content.replaceAll("\\$\\{error_log\\}", Matcher.quoteReplacement(StringUtil.noBlank(errMsg, "no error message"))); try { InetAddress inetAddress = InetAddress.getLocalHost(); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java index b448f56..c036445 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java @@ -70,7 +70,7 @@ public class DFSFileTable implements IReadableTable { } return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond()); } - + @Override public boolean exists() throws IOException { try { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java index 49a3169..0c9c3fc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java @@ -188,8 +188,7 @@ public class DFSFileTableReader implements TableReader { if (expectedColumnNumber > 0) { for (String delim : DETECT_DELIMS) { if (StringSplitter.split(line, delim).length == expectedColumnNumber) { - logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber - + " columns -- " + line); + logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line); return delim; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java index 443d6a1..69bba0a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java @@ -94,8 +94,7 @@ public interface IMROutput2 { * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with * dictionary encoding; Mx is measure value serialization form. */ - public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, - DefaultChainedExecutable jobFlow); + public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow); /** Add step that does any necessary clean up. 
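The "Auto detect delim" logic above is a simple strategy: try each candidate delimiter on a sample line and keep the first one that produces the expected column count. A standalone sketch of the same idea (the candidate list is illustrative, not Kylin's DETECT_DELIMS constant):

    import java.util.regex.Pattern;

    public class DelimDetectSketch {
        static String detectDelim(String line, int expectedColumnNumber) {
            for (String delim : new String[] { "\177", "|", "\t", "," }) {
                if (line.split(Pattern.quote(delim), -1).length == expectedColumnNumber)
                    return delim; // first delimiter yielding the expected columns wins
            }
            return null; // detection failed
        }

        public static void main(String[] args) {
            System.out.println(detectDelim("a|b|c", 3)); // prints "|"
        }
    }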
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 443d6a1..69bba0a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -94,8 +94,7 @@ public interface IMROutput2 {
          * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
         * dictionary encoding; Mx is measure value serialization form.
         */
-        public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments,
-                DefaultChainedExecutable jobFlow);
+        public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow);
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
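The interface javadoc above fixes the cuboid file layout: key = CUBOID (an 8-byte cuboid ID) followed by dictionary-encoded dimension values D1..Dn, value = the serialized measures M1..Mm. A sketch of composing such a key under exactly those assumptions (the encoded dimension bytes are placeholders):

    import java.nio.ByteBuffer;

    public class CuboidRowKeySketch {
        static byte[] rowKey(long cuboidId, byte[][] encodedDims) {
            int len = 8;
            for (byte[] d : encodedDims)
                len += d.length;
            ByteBuffer buf = ByteBuffer.allocate(len);
            buf.putLong(cuboidId); // CUBOID: 8 bytes
            for (byte[] d : encodedDims)
                buf.put(d); // Dx: dictionary-encoded dimension value
            return buf.array();
        }

        public static void main(String[] args) {
            byte[] key = rowKey(255L, new byte[][] { { 0x01 }, { 0x0A, 0x0B } });
            System.out.println(key.length); // 11 = 8 + 1 + 2
        }
    }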
KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, } } - protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - throws IOException, InterruptedException { + protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.map(key, value, context); } @Override - final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - throws IOException, InterruptedException { + final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { doCleanup(context); } catch (IOException ex) { // KYLIN-2170 @@ -87,7 +84,6 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, } } - protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - throws IOException, InterruptedException { + protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java index e3aef14..e47af9c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java @@ -39,8 +39,7 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI } @Override - final public void reduce(KEYIN key, Iterable<VALUEIN> values, - Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Accepting Reducer Key with ordinal: " + reduceCounter); @@ -62,14 +61,12 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI } } - protected void doReduce(KEYIN key, Iterable<VALUEIN> values, - Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { + protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.reduce(key, values, context); } @Override - final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - throws IOException, InterruptedException { + final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { doCleanup(context); } catch (IOException ex) { // KYLIN-2170 @@ -87,7 +84,6 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI } } - protected void doCleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) - throws IOException, InterruptedException { + protected void doCleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java 
---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 11105cc..cbb68d2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -44,8 +44,7 @@ public class MRUtil { } public static IMRTableInputFormat getTableInputFormat(String tableName) { - return SourceFactory.createEngineAdapter(getTableDesc(tableName), IMRInput.class) - .getTableInputFormat(getTableDesc(tableName)); + return SourceFactory.createEngineAdapter(getTableDesc(tableName), IMRInput.class).getTableInputFormat(getTableDesc(tableName)); } public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java index 2d1c5fc..bcf4b98 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java @@ -85,7 +85,7 @@ public class SortedColumnDFSFile implements IReadableTable { public TableSignature getSignature() throws IOException { return dfsFileTable.getSignature(); } - + @Override public boolean exists() throws IOException { return dfsFileTable.exists(); @@ -104,8 +104,7 @@ public class SortedColumnDFSFile implements IReadableTable { Long num2 = Long.parseLong(str2); return num1.compareTo(num2); } catch (NumberFormatException e) { - logger.error("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" - + str2); + logger.error("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" + str2); e.printStackTrace(); return 0; } @@ -120,8 +119,7 @@ public class SortedColumnDFSFile implements IReadableTable { Double num2 = Double.parseDouble(str2); return num1.compareTo(num2); } catch (NumberFormatException e) { - logger.error( - "NumberFormatException when parse doul family number.str1:" + str1 + " str2:" + str2); + logger.error("NumberFormatException when parse doul family number.str1:" + str1 + " str2:" + str2); return 0; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java index c2679bc..bb00442 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFileReader.java @@ -17,13 +17,13 @@ */ package org.apache.kylin.engine.mr; +import org.apache.kylin.source.IReadableTable; + import java.io.IOException; import java.util.Collection; import java.util.Comparator; import java.util.PriorityQueue; -import org.apache.kylin.source.IReadableTable; - /** * Created by xiefan on 16-11-22. 
*/ @@ -37,8 +37,7 @@ public class SortedColumnDFSFileReader implements IReadableTable.TableReader { private String[] row; - public SortedColumnDFSFileReader(Collection<IReadableTable.TableReader> readers, - final Comparator<String> comparator) { + public SortedColumnDFSFileReader(Collection<IReadableTable.TableReader> readers, final Comparator<String> comparator) { this.readers = readers; this.comparator = comparator; pq = new PriorityQueue<ReaderBuffer>(11, new Comparator<ReaderBuffer>() { http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 4515773..764cbdd 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -77,41 +77,22 @@ import org.slf4j.LoggerFactory; public abstract class AbstractHadoopJob extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); - protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg() - .isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)") - .create(BatchConstants.ARG_JOB_NAME); - protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() - .isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube") - .create(BatchConstants.ARG_CUBE_NAME); - protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID) - .hasArg().isRequired(false).withDescription("ID of cubing job executable") - .create(BatchConstants.ARG_CUBING_JOB_ID); + protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME); + protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. 
For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME); + protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID); // @Deprecated - protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME) - .hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME); - protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg() - .isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID); - protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg() - .isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT); - protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT) - .hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT); - protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() - .isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT); - protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg() - .isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL); - protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION) - .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); - protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME) - .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); - - protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder - .withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false) - .withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED); - protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT) - .hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); - protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder - .withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false) - .withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); + protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME); + protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID); + protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT); + protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input 
format").create(BatchConstants.ARG_INPUT_FORMAT); + protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT); + protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL); + protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION).hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); + protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); + + protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED); + protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); + protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; @@ -169,9 +150,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } else { job.waitForCompletion(true); retVal = job.isSuccessful() ? 0 : 1; - logger.debug("Job '" + job.getJobName() + "' finished " - + (job.isSuccessful() ? "successfully in " : "with failures. Time taken ") - + formatTime((System.nanoTime() - start) / 1000000L)); + logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. 
Time taken ") + formatTime((System.nanoTime() - start) / 1000000L)); } return retVal; } @@ -194,8 +173,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { Configuration jobConf = job.getConfiguration(); String classpath = jobConf.get(MAP_REDUCE_CLASSPATH); if (classpath == null || classpath.length() == 0) { - logger.info("Didn't find " + MAP_REDUCE_CLASSPATH - + " in job configuration, will run 'mapred classpath' to get the default value."); + logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value."); classpath = getDefaultMapRedClasspath(); logger.info("The default mapred classpath is: " + classpath); } @@ -234,13 +212,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { StringUtil.appendWithSeparator(kylinDependency, hiveExecJarPath); logger.info("hive-exec jar file: " + hiveExecJarPath); - String hiveHCatJarPath = ClassUtil - .findContainingJar(Class.forName("org.apache.hive.hcatalog.mapreduce.HCatInputFormat")); + String hiveHCatJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.hive.hcatalog.mapreduce.HCatInputFormat")); StringUtil.appendWithSeparator(kylinDependency, hiveHCatJarPath); logger.info("hive-catalog jar file: " + hiveHCatJarPath); - String hiveMetaStoreJarPath = ClassUtil - .findContainingJar(Class.forName("org.apache.hadoop.hive.metastore.api.Table")); + String hiveMetaStoreJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.hadoop.hive.metastore.api.Table")); StringUtil.appendWithSeparator(kylinDependency, hiveMetaStoreJarPath); logger.info("hive-metastore jar file: " + hiveMetaStoreJarPath); } catch (ClassNotFoundException e) { @@ -256,8 +232,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } else { logger.info("No Kafka dependency jar set in the environment, will find them from classpath:"); try { - String kafkaClientJarPath = ClassUtil - .findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")); + String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer")); StringUtil.appendWithSeparator(kylinDependency, kafkaClientJarPath); logger.info("kafka jar file: " + kafkaClientJarPath); @@ -328,7 +303,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { logger.warn("The directory of kylin dependency '" + fileName + "' does not exist, skip"); continue; } - + if (fs.getFileStatus(p).isDirectory()) { appendTmpDir(job, fs, p, jarList, fileList); continue; @@ -515,8 +490,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { return dumpList; } - protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) - throws IOException { + protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException { File tmp = File.createTempFile("kylin_job_meta", ""); FileUtils.forceDelete(tmp); // we need a directory, so delete the file first @@ -585,8 +559,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { HadoopUtil.deletePath(conf, path); } - public static double getTotalMapInputMB(Job job) - throws ClassNotFoundException, IOException, InterruptedException, JobException { + public static double getTotalMapInputMB(Job job) throws ClassNotFoundException, IOException, InterruptedException, JobException { if (job == null) { throw new 
JobException("Job is null"); } @@ -603,13 +576,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { return totalMapInputMB; } - protected double getTotalMapInputMB() - throws ClassNotFoundException, IOException, InterruptedException, JobException { + protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException { return getTotalMapInputMB(job); } - protected int getMapInputSplitCount() - throws ClassNotFoundException, JobException, IOException, InterruptedException { + protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException { if (job == null) { throw new JobException("Job is null"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index cb478c7..07b636b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -18,11 +18,7 @@ package org.apache.kylin.engine.mr.common; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.google.common.collect.Sets; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeSegment; @@ -39,7 +35,10 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; /** */ @@ -60,9 +59,8 @@ public class BaseCuboidBuilder implements java.io.Serializable { protected KylinConfig kylinConfig; - public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, - CubeJoinedFlatTableEnrich intermediateTableDesc, AbstractRowKeyEncoder rowKeyEncoder, - MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) { + public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc, + AbstractRowKeyEncoder rowKeyEncoder, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) { this.kylinConfig = kylinConfig; this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; @@ -75,8 +73,7 @@ public class BaseCuboidBuilder implements java.io.Serializable { measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); } - public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, - CubeJoinedFlatTableEnrich intermediateTableDesc) { + public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) { this.kylinConfig = kylinConfig; this.cubeDesc = cubeDesc; this.cubeSegment = cubeSegment; http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index 0b288f3..8f400c3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -43,8 +43,7 @@ public class CubeStatsWriter { } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) - throws IOException { + Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); List<Long> allCuboids = new ArrayList<Long>();
@@ -52,12 +51,11 @@ public class CubeStatsWriter { Collections.sort(allCuboids); ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), - SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); + SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); try { // mapper overlap ratio at key -1 writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); - + // mapper number at key -2 writer.append(new LongWritable(-2), new BytesWritable(Bytes.toBytes(mapperNumber)));
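writeCuboidStatistics lays the statistics out in a single SequenceFile: reserved negative keys (-1 for the mapper overlap ratio, -2 for the mapper number) precede the sorted cuboid IDs. A minimal sketch of that layout with Hadoop's SequenceFile API; the path and payload bytes are placeholders, not the values Kylin actually encodes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;

    // Sketch: reserved negative keys carry job-level stats, non-negative keys
    // carry per-cuboid payloads, all in one LongWritable/BytesWritable file.
    public class StatsFileSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path("/tmp/cuboid_statistics.seq"); // illustrative path
            SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(LongWritable.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class));
            try {
                writer.append(new LongWritable(-1), new BytesWritable(new byte[] { /* overlap ratio */ }));
                writer.append(new LongWritable(-2), new BytesWritable(new byte[] { /* mapper number */ }));
                writer.append(new LongWritable(0L), new BytesWritable(new byte[] { /* HLL for cuboid 0 */ }));
            } finally {
                IOUtils.closeStream(writer); // same close-in-finally discipline as the original
            }
        }
    }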
out.println("============================================================================"); out.println("Statistics of " + seg); out.println(); - out.println( - "Cube statistics hll precision: " + cuboidRowEstimatesHLL.values().iterator().next().getPrecision()); + out.println("Cube statistics hll precision: " + cuboidRowEstimatesHLL.values().iterator().next().getPrecision()); out.println("Total cuboids: " + cuboidRows.size()); out.println("Total estimated rows: " + SumHelper.sumLong(cuboidRows.values())); out.println("Total estimated size(MB): " + SumHelper.sumDouble(cuboidSizes.values())); @@ -262,8 +257,7 @@ public class CubeStatsReader { ret += cuboidSizeMap.get(cuboidId); } - logger.info("Estimating size for layer {}, all cuboids are {}, total size is {}", level, - StringUtils.join(layeredCuboids.get(level), ","), ret); + logger.info("Estimating size for layer {}, all cuboids are {}, total size is {}", level, StringUtils.join(layeredCuboids.get(level), ","), ret); return ret; } @@ -286,8 +280,7 @@ public class CubeStatsReader { } } - private static void printCuboidInfoTree(long parent, long cuboidID, final CuboidScheduler scheduler, - Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) { + private static void printCuboidInfoTree(long parent, long cuboidID, final CuboidScheduler scheduler, Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) { printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out); List<Long> children = scheduler.getSpanningCuboid(cuboidID); @@ -298,8 +291,7 @@ public class CubeStatsReader { } } - private static void printOneCuboidInfo(long parent, long cuboidID, Map<Long, Long> cuboidRows, - Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) { + private static void printOneCuboidInfo(long parent, long cuboidID, Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) { StringBuffer sb = new StringBuffer(); for (int i = 0; i < depth; i++) { sb.append(" "); @@ -312,8 +304,7 @@ public class CubeStatsReader { sb.append(", est row: ").append(rowCount).append(", est MB: ").append(formatDouble(size)); if (parent != -1) { - sb.append(", shrink: ").append(formatDouble(100.0 * cuboidRows.get(cuboidID) / cuboidRows.get(parent))) - .append("%"); + sb.append(", shrink: ").append(formatDouble(100.0 * cuboidRows.get(cuboidID) / cuboidRows.get(parent))).append("%"); } out.println(sb.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index 0b288f3..8f400c3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -43,8 +43,7 @@ public class CubeStatsWriter { } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) - throws IOException { + Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { Path seqFilePath 
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java index 167d58c..d66e4eb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DefaultSslProtocolSocketFactory.java
@@ -53,8 +53,7 @@ public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFact /** * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int) */ - public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) - throws IOException, UnknownHostException { + public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException { return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort); }
@@ -92,8 +91,7 @@ public class DefaultSslProtocolSocketFact * @throws IllegalArgumentException * DOCUMENT ME! */ - public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, - final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException { + public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException { if (params == null) { throw new IllegalArgumentException("Parameters may not be null"); }
@@ -118,8 +116,7 @@ public class DefaultSslProtocolSocketFact /** * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean) */ - public Socket createSocket(Socket socket, String host, int port, boolean autoClose) - throws IOException, UnknownHostException { + public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException { return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose); }
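The five-argument createSocket overload exists so commons-httpclient can enforce its connection timeout; the body elided from the hunk conventionally branches on HttpConnectionParams.getConnectionTimeout(). A sketch of that conventional shape using plain JSSE, offered as an assumption about the elided code rather than a copy of it:

    import java.io.IOException;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import javax.net.ssl.SSLContext;

    // Sketch of the timeout-aware socket-factory pattern: with no deadline,
    // connect directly; with a deadline, bind an unconnected socket and
    // connect with a timeout. All names here are illustrative.
    public class TimeoutSocketSketch {
        static Socket createSocket(SSLContext ctx, String host, int port,
                                   InetAddress localAddress, int localPort,
                                   int timeoutMillis) throws IOException {
            if (timeoutMillis == 0) {
                // no deadline: let the SSL factory connect immediately
                return ctx.getSocketFactory().createSocket(host, port, localAddress, localPort);
            }
            // with a deadline: create unconnected, bind the local end, connect with timeout
            Socket socket = ctx.getSocketFactory().createSocket();
            socket.bind(new InetSocketAddress(localAddress, localPort));
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return socket;
        }
    }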
http://git-wip-us.apache.org/repos/asf/kylin/blob/19585846/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java index fce4353..5da1947 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -94,7 +94,7 @@ public class HadoopCmdOutput { } this.output.append(counters.toString()).append("\n"); logger.debug(counters.toString()); - + String bytsWrittenCounterName = "HDFS_BYTES_WRITTEN"; String fsScheme = FileSystem.get(job.getConfiguration()).getScheme(); if (("wasb").equalsIgnoreCase(fsScheme)) {
@@ -103,8 +103,7 @@ public class HadoopCmdOutput { } mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue()); - hdfsBytesWritten = String - .valueOf(counters.findCounter("FileSystemCounters", bytsWrittenCounterName).getValue()); + hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", bytsWrittenCounterName).getValue()); rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue()); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e);
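The counter hunk swaps the bytes-written counter name when the default filesystem is WASB, since Azure blob storage reports writes under a different counter than HDFS_BYTES_WRITTEN. A sketch of that scheme-dependent lookup; the WASB-specific counter name is elided in the hunk, so it stays a parameter here:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    // Sketch: pick the bytes-written counter name by filesystem scheme, then
    // read the standard MapReduce counters, mirroring the hunk above.
    public class CounterSketch {
        static void printCounters(Job job, String wasbBytesWrittenCounterName) throws Exception {
            Counters counters = job.getCounters();
            String bytesWrittenCounterName = "HDFS_BYTES_WRITTEN";
            String fsScheme = FileSystem.get(job.getConfiguration()).getScheme();
            if ("wasb".equalsIgnoreCase(fsScheme)) {
                bytesWrittenCounterName = wasbBytesWrittenCounterName; // assumed per the elided branch
            }
            long mapInputRecords = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
            long bytesWritten = counters.findCounter("FileSystemCounters", bytesWrittenCounterName).getValue();
            System.out.println("map input records: " + mapInputRecords + ", bytes written: " + bytesWritten);
        }
    }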