KYLIN-2633 Upgrade Spark to 2.x
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cfaeb7e7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cfaeb7e7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cfaeb7e7

Branch: refs/heads/2.1.x
Commit: cfaeb7e7eb12eefa7c0ecd40ceabeea6b5dbf62e
Parents: 309fac0
Author: 许çé <xuyim...@hys-inc.cn>
Authored: Fri Jul 7 00:13:24 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Fri Jul 21 16:13:02 2017 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |  2 +-
 build/script/download-spark.sh                  |  4 ++--
 .../main/resources/kylin-defaults.properties    |  3 ++-
 .../gtrecord/GTCubeStorageQueryBase.java        |  2 +-
 engine-spark/pom.xml                            |  6 ++---
 .../apache/kylin/engine/spark/SparkCubing.java  | 22 +++++++++---------
 .../kylin/engine/spark/SparkCubingByLayer.java  | 11 +++++----
 .../kylin/engine/spark/SparkHiveDemo.java       |  5 ++--
 .../test_case_data/sandbox/kylin.properties    |  4 ++--
 kylin-it/pom.xml                                | 24 +++++++++++++++++++-
 pom.xml                                         | 10 ++++----
 server/pom.xml                                  |  2 +-
 source-kafka/pom.xml                            |  2 +-
 13 files changed, 61 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0a64dde..b988445 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -152,7 +152,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
             <scope>provided</scope>
         </dependency>


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index b73331e..8025591 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -27,8 +27,8 @@ if [[ `uname -a` =~ "Darwin" ]]; then
     alias md5cmd="md5 -q"
 fi
 
-spark_version="1.6.3"
-spark_pkg_md5="ce8a2e7529aac0f0175194061769dbd4"
+spark_version="2.1.1"
+spark_pkg_md5="195daab700e4332fcdaf7c66236de542"
 
 if [ ! -f "build/spark-${spark_version}-bin-hadoop2.6.tgz" ]
 then


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index cb511e7..ee25637 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -218,7 +218,7 @@ kylin.engine.spark.max-partition=5000
 
 # Spark conf (default is in spark/conf/spark-defaults.conf)
 kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=cluster
+#kylin.engine.spark-conf.spark.submit.deployMode=cluster
 kylin.engine.spark-conf.spark.yarn.queue=default
 kylin.engine.spark-conf.spark.executor.memory=1G
 kylin.engine.spark-conf.spark.executor.cores=2
@@ -226,6 +226,7 @@ kylin.engine.spark-conf.spark.executor.instances=1
 kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
 #kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 10f735e..22f5fc9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -83,7 +83,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
                 logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
                 continue;
-            } 
+            }
             scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), request.getGroups(),
                     request.getMetrics(), request.getFilter(), request.getHavingFilter(), request.getContext());
             if (!scanner.isSegmentSkipped())


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 93b6f9b..26a6e31 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -51,19 +51,19 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
+            <artifactId>spark-core_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.10</artifactId>
+            <artifactId>spark-sql_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.10</artifactId>
+            <artifactId>spark-hive_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 2a0981a..57fd315 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -98,7 +98,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.slf4j.Logger;
@@ -179,7 +179,7 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
-    private void writeDictionary(DataFrame intermediateTable, String cubeName, String segmentId) throws Exception {
+    private void writeDictionary(Dataset<Row> intermediateTable, String cubeName, String segmentId) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
@@ -204,9 +204,9 @@ public class SparkCubing extends AbstractApplication {
         for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
             final String column = columns[entry.getKey()];
             final TblColRef tblColRef = entry.getValue();
-            final DataFrame frame = intermediateTable.select(column).distinct();
+            final Dataset<Row> frame = intermediateTable.select(column).distinct();
 
-            final Row[] rows = frame.collect();
+            final List<Row> rows = frame.collectAsList();
             dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
                 @Override
                 public Iterator<String> iterator() {
@@ -215,13 +215,13 @@
 
                         @Override
                         public boolean hasNext() {
-                            return i < rows.length;
+                            return i < rows.size();
                         }
 
                         @Override
                         public String next() {
                             if (hasNext()) {
-                                final Row row = rows[i++];
+                                final Row row = rows.get(i++);
                                 final Object o = row.get(0);
                                 return o != null ? o.toString() : null;
                             } else {
@@ -367,7 +367,7 @@ public class SparkCubing extends AbstractApplication {
         final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
             @Override
-            public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+            public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
                 long t = System.currentTimeMillis();
                 prepare();
@@ -390,7 +390,7 @@ public class SparkCubing extends AbstractApplication {
                     throw new RuntimeException(e);
                 }
                 System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
-                return sparkCuboidWriter.getResult();
+                return sparkCuboidWriter.getResult().iterator();
             }
         });
@@ -430,7 +430,7 @@ public class SparkCubing extends AbstractApplication {
             }
         }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
             @Override
-            public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+            public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
                 return new Iterable<Tuple2<byte[], byte[]>>() {
                     final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
                     final Object[] input = new Object[measureSize];
@@ -458,7 +458,7 @@ public class SparkCubing extends AbstractApplication {
                             }
                         });
                     }
-                };
+                }.iterator();
             }
         }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
             @Override
@@ -549,7 +549,7 @@ public class SparkCubing extends AbstractApplication {
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
+        final Dataset<Row> intermediateTable = sqlContext.sql("select * from " + hiveTable);
         final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 91aa9f7..587ff78 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -59,7 +59,7 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
@@ -73,6 +73,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
@@ -155,7 +156,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
 
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame intermediateTable = sqlContext.table(hiveTable);
+        final Dataset<Row> intermediateTable = sqlContext.table(hiveTable);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -354,7 +355,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         @Override
-        public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+        public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
             if (initialized == false) {
                 prepare();
                 initialized = true;
@@ -368,7 +369,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
             // if still empty or null
             if (myChildren == null || myChildren.size() == 0) {
-                return EMTPY_ITERATOR;
+                return EMTPY_ITERATOR.iterator();
             }
 
             List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
@@ -382,7 +383,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 tuples.add(new Tuple2<>(new ByteArray(newKey), tuple2._2()));
             }
 
-            return tuples;
+            return tuples.iterator();
         }
     }


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
index e1ba470..58d4222 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
@@ -22,7 +22,8 @@ import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 
 /**
@@ -45,7 +46,7 @@ public class SparkHiveDemo extends AbstractApplication {
         SparkConf conf = new SparkConf().setAppName("Simple Application");
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
+        final Dataset<Row> dataFrame = sqlContext.sql("select * from test_kylin_fact");
         System.out.println("count * of the table:" + dataFrame.count());
     }
 }


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 6a571df..619bf99 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -170,7 +170,7 @@ kylin.engine.spark.rdd-partition-cut-mb=100
 ### Spark conf overwrite for cube engine
 kylin.engine.spark-conf.spark.yarn.submit.file.replication=1
 kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=cluster
+#kylin.engine.spark-conf.spark.submit.deployMode=cluster
 kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=384
 kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=256
 kylin.engine.spark-conf.spark.executor.memory=768M
@@ -184,7 +184,7 @@ kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin
 kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
-
+kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 
 ### QUERY PUSH DOWN ###
 #kylin.query.pushdown.runner-class-name=org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index e82867f..9583b42 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -233,7 +233,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
             <scope>provided</scope>
         </dependency>
 
@@ -249,23 +249,45 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.10</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
     </dependencies>


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 506d734..732deac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
         <kafka.version>0.10.1.0</kafka.version>
 
         <!-- Spark versions -->
-        <spark.version>1.6.3</spark.version>
+        <spark.version>2.1.1</spark.version>
         <kryo.version>4.0.0</kryo.version>
 
         <!-- <reflections.version>0.9.10</reflections.version> -->
@@ -577,19 +577,19 @@
             <!-- Spark dependency -->
             <dependency>
                 <groupId>org.apache.spark</groupId>
-                <artifactId>spark-core_2.10</artifactId>
+                <artifactId>spark-core_2.11</artifactId>
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>org.apache.spark</groupId>
-                <artifactId>spark-sql_2.10</artifactId>
+                <artifactId>spark-sql_2.11</artifactId>
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>org.apache.spark</groupId>
-                <artifactId>spark-hive_2.10</artifactId>
+                <artifactId>spark-hive_2.11</artifactId>
                 <version>${spark.version}</version>
                 <scope>provided</scope>
             </dependency>
@@ -602,7 +602,7 @@
             <!-- Kafka dependency -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>
-                <artifactId>kafka_2.10</artifactId>
+                <artifactId>kafka_2.11</artifactId>
                 <version>${kafka.version}</version>
                 <scope>provided</scope>
             </dependency>
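
For reference, the Java changes above track the two breaking API changes between Spark 1.6 and 2.x: DataFrame is replaced by Dataset<Row> (so an array-returning collect() becomes collectAsList()), and the Java FlatMapFunction / PairFlatMapFunction contracts now return an Iterator instead of an Iterable, hence the added ".iterator()" calls. The following condensed sketch shows both patterns against Spark 2.1; the table and column names are placeholders, not Kylin's.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public class Spark2MigrationSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark2-migration-sketch");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext sqlContext = new HiveContext(sc.sc()); // still present in 2.1, though deprecated

        // Spark 1.6: DataFrame df = ...; Row[] rows = df.collect();
        // Spark 2.x: DataFrame is Dataset<Row>, and collectAsList() returns a List<Row>.
        Dataset<Row> distinctValues = sqlContext.table("some_intermediate_table") // placeholder table
                .select("some_column").distinct();
        List<Row> rows = distinctValues.collectAsList();
        System.out.println("distinct values: " + rows.size());

        // Spark 1.6: FlatMapFunction.call() returned Iterable<R>.
        // Spark 2.x: call() must return Iterator<R>.
        JavaRDD<String> words = sc.parallelize(Arrays.asList("a b", "c d"))
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                });
        System.out.println("words: " + words.count());

        sc.stop();
    }
}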
http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index f35cb44..fe82e3f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -256,7 +256,7 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
+            <artifactId>spark-core_2.11</artifactId>
             <scope>provided</scope>
         </dependency>


http://git-wip-us.apache.org/repos/asf/kylin/blob/cfaeb7e7/source-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index e4ca76f..9ce665e 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -45,7 +45,7 @@
         <!-- Provided -->
        <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
         </dependency>
 
         <!-- Env & Test -->
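
With the build now on Spark 2.1.1, the HiveContext used by SparkHiveDemo still compiles but is deprecated; the Spark 2.x entry point is SparkSession with Hive support enabled. Below is a minimal sketch of the equivalent demo, illustrative only and not part of this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSessionHiveDemo {
    public static void main(String[] args) {
        // SparkSession replaces SQLContext/HiveContext as the single entry point in Spark 2.x.
        SparkSession spark = SparkSession.builder()
                .appName("Simple Application")
                .enableHiveSupport()
                .getOrCreate();

        // Same query as SparkHiveDemo, now returning a Dataset<Row>.
        Dataset<Row> dataFrame = spark.sql("select * from test_kylin_fact");
        System.out.println("count * of the table:" + dataFrame.count());

        spark.stop();
    }
}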