KYLIN-2421 Add spark engine to Integration Test
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5da53936 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5da53936 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5da53936 Branch: refs/heads/master-hbase0.98 Commit: 5da53936502136c0d56236e148da2751aa1462c9 Parents: 855301d Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Jan 20 11:28:57 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sat Feb 4 19:37:59 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 10 ++ .../measure/bitmap/RoaringBitmapCounter.java | 3 +- .../bitmap/RoaringBitmapCounterFactory.java | 3 +- .../measure/percentile/PercentileCounter.java | 22 ++- .../percentile/PercentileSerializer.java | 6 +- .../kylin/measure/topn/TopNAggregator.java | 5 +- .../percentile/PercentileCounterTest.java | 47 ++++++ .../kylin/engine/mr/BatchCubingJobBuilder2.java | 8 +- .../engine/spark/KylinKryoRegistrator.java | 161 +++++++++++++++++++ .../spark/SparkBatchCubingJobBuilder2.java | 12 +- .../apache/kylin/engine/spark/SparkCubing.java | 123 +------------- .../kylin/engine/spark/SparkCubingByLayer.java | 65 ++++---- .../localmeta/cube_desc/ci_inner_join_cube.json | 14 +- examples/test_case_data/sandbox/core-site.xml | 2 + .../test_case_data/sandbox/kylin.properties | 29 ++-- kylin-it/pom.xml | 21 +++ .../kylin/provision/BuildCubeWithEngine.java | 25 +++ 17 files changed, 355 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6a88fc4..fe15b1e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -68,6 +68,12 @@ abstract public class KylinConfigBase implements Serializable { return sparkHome; } + sparkHome = System.getProperty("SPARK_HOME"); + if (StringUtils.isNotEmpty(sparkHome)) { + logger.info("SPARK_HOME was set to " + sparkHome); + return sparkHome; + } + return getKylinHome() + File.separator + "spark"; } @@ -760,6 +766,10 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.engine.spark.env.hadoop-conf-dir", ""); } + public void setHadoopConfDir(String hadoopConfDir) { + setProperty("kylin.engine.spark.env.hadoop-conf-dir", hadoopConfDir); + } + public String getSparkAdditionalJars() { return getOptional("kylin.engine.spark.additional-jars", ""); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java index fb9dcfc..eec45f2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java @@ -24,6 +24,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap; import java.io.DataOutputStream; import java.io.IOException; +import java.io.Serializable; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Iterator; @@ -31,7 +32,7 @@ import java.util.Iterator; /** * A {@link BitmapCounter} based on roaring bitmap. */ -public class RoaringBitmapCounter implements BitmapCounter { +public class RoaringBitmapCounter implements BitmapCounter, Serializable { private ImmutableRoaringBitmap bitmap; http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java index a71df95..822afa2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java @@ -21,9 +21,10 @@ package org.apache.kylin.measure.bitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; import java.io.IOException; +import java.io.Serializable; import java.nio.ByteBuffer; -public class RoaringBitmapCounterFactory implements BitmapCounterFactory { +public class RoaringBitmapCounterFactory implements BitmapCounterFactory, Serializable { public static final BitmapCounterFactory INSTANCE = new RoaringBitmapCounterFactory(); private RoaringBitmapCounterFactory() {} http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java index bf505cf..f86a796 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java @@ -18,6 +18,9 @@ package org.apache.kylin.measure.percentile; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; @@ -30,7 +33,7 @@ public class PercentileCounter implements Serializable { double compression; double quantileRatio; - TDigest registers; + transient TDigest registers; public PercentileCounter(double compression) { this(compression, INVALID_QUANTILE_RATIO); @@ -94,4 +97,21 @@ public class PercentileCounter implements Serializable { public void clear() { reInitRegisters(); } + + private void writeObject(ObjectOutputStream out) throws IOException { + registers.compress(); + int bound = registers.byteSize(); + ByteBuffer buf = ByteBuffer.allocate(bound); + registers.asSmallBytes(buf); + out.defaultWriteObject(); + out.writeInt(bound); + out.write(buf.array(), 0, bound); + } + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + int bound = in.readInt(); + ByteBuffer buf = ByteBuffer.allocate(bound); + in.read(buf.array(), 0, bound); + registers = AVLTreeDigest.fromBytes(buf); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java index a0a2a77..d7e4204 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java @@ -25,7 +25,7 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer; public class PercentileSerializer extends DataTypeSerializer<PercentileCounter> { // be thread-safe and avoid repeated obj creation - private ThreadLocal<PercentileCounter> current = new ThreadLocal<>(); + private transient ThreadLocal<PercentileCounter> current = null; private double compression; @@ -49,6 +49,10 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter> } private PercentileCounter current() { + if (current == null) { + current = new ThreadLocal<>(); + } + PercentileCounter counter = current.get(); if (counter == null) { counter = new PercentileCounter(compression); http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java index b5e316f..bc2bc36 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java @@ -46,10 +46,11 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> { @Override public TopNCounter<ByteArray> aggregate(TopNCounter<ByteArray> value1, TopNCounter<ByteArray> value2) { - TopNCounter<ByteArray> aggregated = new TopNCounter<>(capacity * 2); + int thisCapacity = value1.getCapacity(); + TopNCounter<ByteArray> aggregated = new TopNCounter<>(thisCapacity * 2); aggregated.merge(value1); aggregated.merge(value2); - aggregated.retain(capacity); + aggregated.retain(thisCapacity); return aggregated; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java index abaa409..94a1233 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java @@ -20,11 +20,19 @@ package org.apache.kylin.measure.percentile; import static org.junit.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.Collections; import java.util.List; import java.util.Random; +import org.apache.commons.io.IOUtils; import org.apache.kylin.common.util.MathUtil; +import org.junit.Assert; import org.junit.Test; import com.google.common.collect.Lists; @@ -76,4 +84,43 @@ public class PercentileCounterTest { assertEquals(expectedResult, actualResult, 0); } + + @Test + public void testSerialization() { + double compression = 100; + double quantile = 0.5; + ByteArrayOutputStream os = new ByteArrayOutputStream(1024); + ObjectOutputStream out = null; + PercentileCounter origin_counter = null; + try { + out = new ObjectOutputStream(os); + + origin_counter = new PercentileCounter(compression, quantile); + out.writeObject(origin_counter); + + } catch (IOException e) { + e.printStackTrace(); + } finally { + IOUtils.closeQuietly(out); + } + + InputStream is = new ByteArrayInputStream(os.toByteArray()); + PercentileCounter serialized_counter = null; + ObjectInputStream in = null; + try { + in = new ObjectInputStream(is); + serialized_counter = (PercentileCounter)in.readObject(); + + Assert.assertNotNull(serialized_counter); + Assert.assertNotNull(serialized_counter.registers); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } finally { + IOUtils.closeQuietly(os); + IOUtils.closeQuietly(is); + } + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 0f604e2..106077c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -31,7 +31,6 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob; import org.apache.kylin.engine.mr.steps.SaveStatisticsStep; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.execution.AbstractExecutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +64,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { // Phase 3: Build Cube addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute - result.addTask(createInMemCubingStep(jobId, cuboidRootPath)); // inmem cubing, only selected algorithm will execute + addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute outputSide.addStepPhase3_BuildCube(result); // Phase 4: Update Metadata & Cleanup @@ -96,7 +95,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return result; } - protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) { + protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) { // base cuboid job MapReduceExecutable cubeStep = new MapReduceExecutable(); @@ -113,8 +112,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { cubeStep.setMapReduceParams(cmd.toString()); cubeStep.setMapReduceJobClass(getInMemCuboidJob()); -// cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES); - return cubeStep; + result.addTask(cubeStep); } protected Class<? extends AbstractHadoopJob> getInMemCuboidJob() { http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java new file mode 100644 index 0000000..3d33aa8 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.spark; + +import com.esotericsoftware.kryo.Kryo; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.spark.serializer.KryoRegistrator; +import org.reflections.Reflections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Set; + +/** + * Registor for registering classes and serializers to Kryo + */ +public class KylinKryoRegistrator implements KryoRegistrator { + protected static final Logger logger = LoggerFactory.getLogger(KylinKryoRegistrator.class); + + @Override + public void registerClasses(Kryo kryo) { + + Set<Class> kyroClasses = Sets.newLinkedHashSet(); + kyroClasses.add(byte[].class); + kyroClasses.add(int[].class); + kyroClasses.add(byte[][].class); + kyroClasses.add(String[].class); + kyroClasses.add(String[][].class); + kyroClasses.add(Object[].class); + kyroClasses.add(java.math.BigDecimal.class); + kyroClasses.add(java.util.ArrayList.class); + kyroClasses.add(java.util.LinkedList.class); + kyroClasses.add(java.util.HashSet.class); + kyroClasses.add(java.util.LinkedHashSet.class); + kyroClasses.add(java.util.LinkedHashMap.class); + kyroClasses.add(java.util.HashMap.class); + kyroClasses.add(java.util.TreeMap.class); + kyroClasses.add(java.util.Properties.class); + kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class)); + + kyroClasses.add(org.apache.spark.sql.Row[].class); + kyroClasses.add(org.apache.spark.sql.Row.class); + kyroClasses.add(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.class); + kyroClasses.add(org.apache.spark.sql.types.StructType.class); + kyroClasses.add(org.apache.spark.sql.types.StructField[].class); + kyroClasses.add(org.apache.spark.sql.types.StructField.class); + kyroClasses.add(org.apache.spark.sql.types.DateType$.class); + kyroClasses.add(org.apache.spark.sql.types.Metadata.class); + kyroClasses.add(org.apache.spark.sql.types.StringType$.class); + kyroClasses.add(Hashing.murmur3_128().getClass()); + kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class); + kyroClasses.add(org.apache.spark.sql.types.Decimal.class); + kyroClasses.add(scala.math.BigDecimal.class); + kyroClasses.add(java.math.MathContext.class); + kyroClasses.add(java.math.RoundingMode.class); + kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class); + kyroClasses.add(java.util.Random.class); + kyroClasses.add(java.util.concurrent.atomic.AtomicLong.class); + + kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class); + kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class); + kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class); + kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class); + kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class); + kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class); + kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class); + kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class); + kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class); + kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class); + kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class); + kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class); + kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class); + kyroClasses.add(org.apache.kylin.common.util.Array.class); + kyroClasses.add(org.apache.kylin.metadata.model.Segments.class); + kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class); + kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class); + kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class); + kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class); + kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class); + kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class); + kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class); + kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class); + kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class); + kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class); + kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class); + kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class); + kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class); + kyroClasses.add(org.apache.kylin.measure.topn.DoubleDeltaSerializer.class); + kyroClasses.add(org.apache.kylin.measure.bitmap.RoaringBitmapCounter.class); + kyroClasses.add(org.roaringbitmap.buffer.MutableRoaringArray.class); + kyroClasses.add(org.roaringbitmap.buffer.MappeableContainer[].class); + kyroClasses.add(org.roaringbitmap.buffer.MutableRoaringBitmap.class); + kyroClasses.add(org.roaringbitmap.buffer.MappeableArrayContainer.class); + kyroClasses.add(org.apache.kylin.measure.bitmap.RoaringBitmapCounterFactory.class); + kyroClasses.add(org.apache.kylin.measure.topn.Counter.class); + kyroClasses.add(org.apache.kylin.measure.topn.TopNCounter.class); + kyroClasses.add(org.apache.kylin.measure.percentile.PercentileSerializer.class); + kyroClasses.add(com.tdunning.math.stats.AVLTreeDigest.class); + kyroClasses.add(com.tdunning.math.stats.Centroid.class); + + addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList"); + addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer"); + addClassQuitely(kyroClasses, "scala.collection.immutable.Map$EmptyMap$"); + addClassQuitely(kyroClasses, "org.apache.spark.sql.catalyst.expressions.GenericInternalRow"); + addClassQuitely(kyroClasses, "org.apache.spark.unsafe.types.UTF8String"); + addClassQuitely(kyroClasses, "com.tdunning.math.stats.AVLGroupTree"); + + for (Class kyroClass : kyroClasses) { + kryo.register(kyroClass); + } + + // TODO: should use JavaSerializer for PercentileCounter after Kryo bug be fixed: https://github.com/EsotericSoftware/kryo/issues/489 + // kryo.register(PercentileCounter.class, new JavaSerializer()); + } + + private static void addClassQuitely(Set<Class> kyroClasses, String className) { + try { + kyroClasses.add(Class.forName(className)); + } catch (ClassNotFoundException e) { + logger.error("failed to load class", e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 208a0c9..76b73b6 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -26,7 +26,6 @@ import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,11 +42,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { @Override protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { - - } - - @Override - protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) { IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); final SparkExecutable sparkExecutable = new SparkExecutable(); sparkExecutable.setClassName(SparkCubingByLayer.class.getName()); @@ -71,7 +65,11 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { sparkExecutable.setJars(jars.toString()); sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE); - return sparkExecutable; + result.addTask(sparkExecutable); + } + + @Override + protected void addInMemCubingSteps(final CubingJob result, String jobId, String cuboidRootPath) { } http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 0437a80..2a0981a 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -20,10 +20,8 @@ package org.apache.kylin.engine.spark; import java.io.File; import java.io.FileFilter; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -31,17 +29,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import javax.annotation.Nullable; - import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; @@ -84,7 +78,6 @@ import org.apache.kylin.engine.spark.util.IteratorUtils; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.hllc.HLLCounter; -import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -108,16 +101,12 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; -import org.reflections.Reflections; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -545,109 +534,6 @@ public class SparkCubing extends AbstractApplication { } } - public static Collection<String> getKyroClasses() { - Set<Class> kyroClasses = Sets.newHashSet(); - kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class)); - kyroClasses.addAll(new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class)); - - kyroClasses.add(HashMap.class); - kyroClasses.add(org.apache.spark.sql.Row[].class); - kyroClasses.add(org.apache.spark.sql.Row.class); - kyroClasses.add(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.class); - kyroClasses.add(org.apache.spark.sql.types.StructType.class); - kyroClasses.add(org.apache.spark.sql.types.StructField[].class); - kyroClasses.add(org.apache.spark.sql.types.StructField.class); - kyroClasses.add(org.apache.spark.sql.types.DateType$.class); - kyroClasses.add(org.apache.spark.sql.types.Metadata.class); - kyroClasses.add(org.apache.spark.sql.types.StringType$.class); - kyroClasses.add(Hashing.murmur3_128().getClass()); - kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class); - kyroClasses.add(Object[].class); - kyroClasses.add(int[].class); - kyroClasses.add(byte[].class); - kyroClasses.add(byte[][].class); - kyroClasses.add(String[].class); - kyroClasses.add(String[][].class); - kyroClasses.add(org.apache.spark.sql.types.Decimal.class); - kyroClasses.add(scala.math.BigDecimal.class); - kyroClasses.add(java.math.BigDecimal.class); - kyroClasses.add(java.math.MathContext.class); - kyroClasses.add(java.math.RoundingMode.class); - kyroClasses.add(java.util.ArrayList.class); - kyroClasses.add(java.util.LinkedList.class); - kyroClasses.add(java.util.HashSet.class); - kyroClasses.add(java.util.LinkedHashSet.class); - kyroClasses.add(java.util.LinkedHashMap.class); - kyroClasses.add(java.util.TreeMap.class); - kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class); - - kyroClasses.add(java.util.HashMap.class); - kyroClasses.add(java.util.Properties.class); - kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class); - kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class); - kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class); - kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class); - kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class); - kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class); - kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class); - kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class); - kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class); - kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class); - kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class); - kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class); - kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class); - kyroClasses.add(org.apache.kylin.common.util.Array.class); - kyroClasses.add(org.apache.kylin.metadata.model.Segments.class); - kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class); - kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class); - kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class); - kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class); - kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class); - kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class); - kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class); - kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class); - kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class); - kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class); - kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class); - kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class); - kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class); - kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class); - kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class); - kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class); - kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class); - kyroClasses.add(org.apache.kylin.measure.topn.DoubleDeltaSerializer.class); - kyroClasses.add(org.apache.kylin.measure.topn.Counter.class); - - try { - kyroClasses.add(Class.forName("com.google.common.collect.EmptyImmutableList")); - } catch (ClassNotFoundException e) { - logger.error("failed to load class", e); - } - - ArrayList<String> result = Lists.newArrayList(); - for (Class kyroClass : kyroClasses) { - result.add(kyroClass.getName()); - } - result.add("scala.collection.immutable.Map$EmptyMap$"); - result.add("org.apache.spark.sql.catalyst.expressions.GenericInternalRow"); - result.add("org.apache.spark.unsafe.types.UTF8String"); - return result; - } - @Override protected void execute(OptionsHelper optionsHelper) throws Exception { final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); @@ -658,15 +544,8 @@ public class SparkCubing extends AbstractApplication { //serialization conf conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator"); conf.set("spark.kryo.registrationRequired", "true"); - final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() { - @Override - public boolean apply(@Nullable String input) { - return input != null && input.trim().length() > 0; - } - }); - System.out.println("kyro classes:" + allClasses.toString()); - conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ",")); JavaSparkContext sc = new JavaSparkContext(conf); HiveContext sqlContext = new HiveContext(sc.sc()); http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index d6790aa..8892a73 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -17,13 +17,10 @@ */ package org.apache.kylin.engine.spark; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -71,7 +68,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; -import javax.annotation.Nullable; import java.io.File; import java.io.FileFilter; import java.io.Serializable; @@ -79,7 +75,6 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; -import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses; /** */ @@ -129,11 +124,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } private static final void prepare() { - final File file = new File(SparkFiles.get("kylin.properties")); - final String confPath = file.getParentFile().getAbsolutePath(); + File file = new File(SparkFiles.get("kylin.properties")); + String confPath = file.getParentFile().getAbsolutePath(); logger.info("conf directory:" + confPath); System.setProperty(KylinConfig.KYLIN_CONF, confPath); ClassUtil.addClasspath(confPath); + } @Override @@ -144,17 +140,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); - SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + ", segment " + segmentId); + SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId); //serialization conf conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator"); conf.set("spark.kryo.registrationRequired", "true"); - final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() { - @Override - public boolean apply(@Nullable String input) { - return input != null && input.trim().length() > 0; - } - }); - conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ",")); JavaSparkContext sc = new JavaSparkContext(conf); setupClasspath(sc, confPath); @@ -176,11 +166,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue())); final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); - - final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); final int measureNum = cubeDesc.getMeasures().size(); - final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); int countMeasureIndex = 0; for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { @@ -204,12 +190,20 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa // encode with dimension encoding, transform to <ByteArray, Object[]> RDD final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() { transient boolean initialized = false; + BaseCuboidBuilder baseCuboidBuilder = null; @Override public Tuple2<ByteArray, Object[]> call(Row row) throws Exception { if (initialized == false) { - prepare(); - initialized = true; + synchronized (SparkCubingByLayer.class) { + if (initialized == false) { + prepare(); + long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); + initialized = true; + } + } } String[] rowArray = rowToArray(row); @@ -235,7 +229,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa }); logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions()); - Long totalCount = 0L; + Long totalCount = 0L; if (kylinConfig.isSparkSanityCheckEnabled()) { totalCount = encodedBaseRDD.count(); logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count()); @@ -267,8 +261,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); logger.info("Level " + level + " partition number: " + partition); allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel); - if (kylinConfig.isSparkSanityCheckEnabled() == true) { - sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex); + if (kylinConfig.isSparkSanityCheckEnabled() == true) { + sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex); } saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, confOverwrite); allRDDs[level - 1].unpersist(); @@ -288,17 +282,18 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) { - final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); - rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { - BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures()); - @Override - public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { - ByteBuffer valueBuf = codec.encode(tuple2._2()); - byte[] encodedBytes = new byte[valueBuf.position()]; - System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); - return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes)); - } - }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); + final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); + rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { + BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures()); + + @Override + public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { + ByteBuffer valueBuf = codec.encode(tuple2._2()); + byte[] encodedBytes = new byte[valueBuf.position()]; + System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); + return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes)); + } + }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json index 0fda3b3..99013ce 100644 --- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json +++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json @@ -267,16 +267,6 @@ }, "returntype" : "raw" } - }, { - "name" : "GVM_PERCENTILE", - "function" : { - "expression" : "PERCENTILE", - "parameter" : { - "type" : "column", - "value" : "TEST_KYLIN_FACT.PRICE" - }, - "returntype" : "percentile(100)" - } } ], "dictionaries": [ { "column": "TEST_KYLIN_FACT.TEST_COUNT_DISTINCT_BITMAP", @@ -368,7 +358,7 @@ "name" : "f3", "columns" : [ { "qualifier" : "m", - "measure_refs" : [ "TEST_EXTENDED_COLUMN", "TRANS_ID_RAW", "PRICE_RAW", "CAL_DT_RAW", "BUYER_CONTACT", "SELLER_CONTACT", "GVM_PERCENTILE" ] + "measure_refs" : [ "TEST_EXTENDED_COLUMN", "TRANS_ID_RAW", "PRICE_RAW", "CAL_DT_RAW", "BUYER_CONTACT", "SELLER_CONTACT" ] } ] } ] }, @@ -448,7 +438,7 @@ "status_need_notify" : [ ], "auto_merge_time_ranges" : null, "retention_range" : 0, - "engine_type" : 2, + "engine_type" : 4, "storage_type" : 2, "override_kylin_properties": { "kylin.cube.algorithm": "LAYER" http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/examples/test_case_data/sandbox/core-site.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/core-site.xml b/examples/test_case_data/sandbox/core-site.xml index 7660a7e..a4ad5c6 100644 --- a/examples/test_case_data/sandbox/core-site.xml +++ b/examples/test_case_data/sandbox/core-site.xml @@ -178,9 +178,11 @@ <value>false</value> </property> + <!-- <property> <name>net.topology.script.file.name</name> <value>/etc/hadoop/conf/topology_script.py</value> </property> + --> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 6cb5148..91566ae 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -162,23 +162,24 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600 kylin.env=DEV kylin.source.hive.keep-flat-table=false -### Spark as Engine ### -kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox -kylin.engine.spark.sanity-check-enabled=false + +# Estimate the RDD partition numbers, the test cubes have a couple memory-hungry measure so the estimation is wild +kylin.engine.spark.rdd-partition-cut-mb=100 ### Spark conf overwrite for cube engine +kylin.engine.spark-conf.spark.yarn.submit.file.replication=1 kylin.engine.spark-conf.spark.master=yarn -kylin.engine.spark-conf.spark.submit.deployMode=client -kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=512 -kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384 -kylin.engine.spark-conf.spark.executor.memory=1G +kylin.engine.spark-conf.spark.submit.deployMode=cluster +kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=384 +kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=256 +kylin.engine.spark-conf.spark.executor.memory=768M kylin.engine.spark-conf.spark.executor.cores=1 kylin.engine.spark-conf.spark.executor.instances=1 kylin.engine.spark-conf.spark.storage.memoryFraction=0.3 -kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history -kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history -#kylin.engine.spark-conf.spark.yarn.queue=default -#kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar -#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec - - +kylin.engine.spark-conf.spark.eventLog.enabled=true +kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///spark-history +kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///spark-history +kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar +kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current +kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current +kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/kylin-it/pom.xml ---------------------------------------------------------------------- diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml index 9662806..91104ba 100644 --- a/kylin-it/pom.xml +++ b/kylin-it/pom.xml @@ -36,6 +36,7 @@ <properties> <hdp.version/> <fastBuildMode/> + <engineType/> </properties> <!-- Dependencies. --> @@ -238,6 +239,25 @@ <artifactId>kafka_2.10</artifactId> <scope>provided</scope> </dependency> + + <!-- Spark dependency --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_2.10</artifactId> + <scope>provided</scope> + </dependency> </dependencies> @@ -296,6 +316,7 @@ <arguments> <argument>-Dhdp.version=${hdp.version}</argument> <argument>-DfastBuildMode=${fastBuildMode}</argument> + <argument>-DengineType=${engineType}</argument> <argument>-Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties</argument> <argument>-classpath</argument> <classpath/> http://git-wip-us.apache.org/repos/asf/kylin/blob/5da53936/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 08cc6b9..726d72f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -40,10 +40,12 @@ import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.job.DeployUtil; @@ -68,9 +70,11 @@ import com.google.common.collect.Lists; public class BuildCubeWithEngine { private CubeManager cubeManager; + private CubeDescManager cubeDescManager; private DefaultScheduler scheduler; protected ExecutableManager jobService; private static boolean fastBuildMode = false; + private static int engineType; private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithEngine.class); @@ -110,7 +114,15 @@ public class BuildCubeWithEngine { logger.info("Will not use fast build mode"); } + String specifiedEngineType = System.getProperty("engineType"); + if (StringUtils.isNotEmpty(specifiedEngineType)) { + engineType = Integer.parseInt(specifiedEngineType); + } else { + engineType = 2; + } + System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); + System.setProperty("SPARK_HOME", "/usr/local/spark"); // need manually create and put spark to this folder on Jenkins if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169"); } @@ -154,6 +166,7 @@ public class BuildCubeWithEngine { } } + cubeDescManager = CubeDescManager.getInstance(kylinConfig); } public void after() { @@ -251,6 +264,9 @@ public class BuildCubeWithEngine { String cubeName = "ci_left_join_cube"; clearSegment(cubeName); + // ci_left_join_cube has percentile which isn't supported by Spark engine now + // updateCubeEngineType(cubeName); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); f.setTimeZone(TimeZone.getTimeZone("GMT")); long date1 = 0; @@ -278,6 +294,7 @@ public class BuildCubeWithEngine { String cubeName = "ci_inner_join_cube"; clearSegment(cubeName); + //updateCubeEngineType(cubeName); SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); f.setTimeZone(TimeZone.getTimeZone("GMT")); @@ -295,6 +312,14 @@ public class BuildCubeWithEngine { return false; } + private void updateCubeEngineType(String cubeName) throws IOException { + CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName); + if (cubeDesc.getEngineType() != engineType) { + cubeDesc.setEngineType(engineType); + cubeDescManager.updateCubeDesc(cubeDesc); + } + } + private void clearSegment(String cubeName) throws Exception { CubeInstance cube = cubeManager.getCube(cubeName); // remove all existing segments