This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 15305eaf86fb8925f559e3b3d0efda05f4a022f8
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Wed Aug 12 08:51:38 2020 +0800

    KYLIN-4662 Migrate from third-party Spark to official Apache Spark

    Use Apache Spark 2.4.6 as default spark version
---
 .../sql/execution/KylinFileSourceScanExec.scala      |  4 ++--
 .../datasource/ResetShufflePartition.scala           |  3 ++-
 .../spark/sql/hive/utils/QueryMetricUtils.scala      |  9 ++++++---
 .../engine/spark/application/SparkApplication.java   |  5 ++---
 .../apache/kylin/engine/spark/job/CubeBuildJob.java  | 21 +++++++++++++--------
 .../apache/kylin/engine/spark/job/CubeMergeJob.java  |  7 ++++---
 .../kylin/engine/spark/utils/JobMetricsUtils.scala   | 11 +----------
 .../kylin/engine/spark/utils/Repartitioner.java      |  4 ++--
 .../kylin/query/pushdown/SparkSqlClient.scala        |  5 +++--
 .../org/apache/kylin/query/runtime/SparkEngine.java  |  4 ++--
 .../kylin/query/runtime/plans/ResultPlan.scala       |  5 +++--
 .../scala/org/apache/spark/sql/SparderContext.scala  |  8 --------
 .../query/pushdown/PushDownRunnerSparkImplTest.java  | 10 ++++++----
 pom.xml                                              |  6 +++---
 .../query/relnode/OLAPToEnumerableConverter.java     | 21 +++++++++++++--------
 15 files changed, 62 insertions(+), 61 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 9bc8801..90ff597 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -148,7 +148,7 @@ class KylinFileSourceScanExec(
     }
 
     val filePartitions = Seq.tabulate(shardSpec.numShards) { shardId =>
-      FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil))
+      FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
@@ -205,7 +205,7 @@ class KylinFileSourceScanExec(
         val newPartition =
           FilePartition(
             partitions.size,
-            currentFiles.toArray.toSeq) // Copy to a new Array.
+            currentFiles.toArray) // Copy to a new Array.
         partitions += newPartition
       }
       currentFiles.clear()
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index 9308717..aaed1d9 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -34,7 +34,8 @@ trait ResetShufflePartition extends Logging {
         KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 * 2) + 1,
         defaultParallelism).toInt
     }
-    sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitionsNum.toString)
+    //sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
+    // partitionsNum.toString)
     logInfo(s"Set partition to $partitionsNum, total bytes ${QueryContextFacade.current().getSourceScanBytes}")
   }
 }
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index 4b43078..dccbba3 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -29,11 +29,14 @@ object QueryMetricUtils extends Logging {
     try {
       val metrics = plan.collect {
         case exec: KylinFileSourceScanExec =>
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          (exec.metrics.apply("numOutputRows").value, 0l)
         case exec: FileSourceScanExec =>
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          (exec.metrics.apply("numOutputRows").value, 0l)
         case exec: HiveTableScanExec =>
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          (exec.metrics.apply("numOutputRows").value, 0l)
       }
       val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)).toList.asJava
       val scanBytes = metrics.map(metric => java.lang.Long.valueOf(metric._2)).toList.asJava
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 5719732..da51c45 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -24,7 +24,6 @@ import org.apache.kylin.engine.spark.job.KylinBuildEnv;
 import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
 import org.apache.kylin.engine.spark.job.SparkJobConstants;
 import org.apache.kylin.engine.spark.job.UdfManager;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
 import java.io.IOException;
@@ -180,7 +179,7 @@ public abstract class SparkApplication {
                     .getOrCreate();
 
             // for spark metrics
-            JobMetricsUtils.registerListener(ss);
+            //JobMetricsUtils.registerListener(ss);
 
             UdfManager.create(ss);
 
@@ -194,7 +193,7 @@ public abstract class SparkApplication {
             infos.jobEnd();
         }
         if (ss != null && !ss.conf().get("spark.master").startsWith("local")) {
-            JobMetricsUtils.unRegisterListener(ss);
+            //JobMetricsUtils.unRegisterListener(ss);
             ss.stop();
         }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index e55dfcf..40dcb01 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -215,6 +215,7 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
+            long parentDSCnt = parentDS.count();
 
             for (LayoutEntity index : toBuildCuboids) {
                 Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
@@ -226,7 +227,7 @@ public class CubeBuildJob extends SparkApplication {
 
                     @Override
                     public LayoutEntity build() throws IOException {
-                        return buildIndex(seg, index, parentDS, st, info.getLayoutId());
+                        return buildIndex(seg, index, parentDS, st, info.getLayoutId(), parentDSCnt);
                     }
                 }, config);
                 allIndexesInCurrentLayer.add(index);
@@ -288,7 +289,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private LayoutEntity buildIndex(SegmentInfo seg, LayoutEntity cuboid, Dataset<Row> parent,
-                                    SpanningTree spanningTree, long parentId) throws IOException {
+                                    SpanningTree spanningTree, long parentId, long parentDSCnt) throws IOException {
         String parentName = String.valueOf(parentId);
         if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
             parentName = "flat table";
@@ -304,7 +305,7 @@ public class CubeBuildJob extends SparkApplication {
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
-            saveAndUpdateLayout(afterSort, seg, layoutEntity);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
                     spanningTree, false);
@@ -316,14 +317,15 @@ public class CubeBuildJob extends SparkApplication {
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
-            saveAndUpdateLayout(afterSort, seg, layoutEntity);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
         }
 
         ss.sparkContext().setJobDescription(null);
         logger.info("Finished Build index :{}, in segment:{}", cuboid.getId(), seg.id());
         return layoutEntity;
     }
 
-    private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout) throws IOException {
+    private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout,
+                                     long parentDSCnt) throws IOException {
         long layoutId = layout.getId();
 
         // for spark metrics
@@ -343,10 +345,13 @@ public class CubeBuildJob extends SparkApplication {
         long rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT());
         if (rowCount == -1) {
             infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
-            logger.warn("Can not get cuboid row cnt.");
+            logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows.");
+            layout.setRows(dataset.count());
+            layout.setSourceRows(parentDSCnt);
+        } else {
+            layout.setRows(rowCount);
+            layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
-        layout.setRows(rowCount);
-        layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short)shardNum);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index aaa3c21..3d54492 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -166,10 +166,11 @@ public class CubeMergeJob extends SparkApplication {
 
             if (rowCount == -1) {
                 infos.recordAbnormalLayouts(layout.getId(),
                         "'Job metrics seems null, use count() to collect cuboid rows.'");
-                logger.warn("Can not get cuboid row cnt.");
+                logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows.");
+                layout.setRows(dataset.count());
+            } else {
+                layout.setRows(rowCount);
             }
-
-            layout.setRows(rowCount);
             layout.setSourceRows(sourceCount);
             int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index e5d02cf..a8231ac 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec
-import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
-
 
 object JobMetricsUtils extends Logging {
 
@@ -43,7 +41,7 @@ object JobMetricsUtils extends Logging {
       metrics = collectOutputRows(execution.executedPlan)
       logInfo(s"Collect output rows successfully. $metrics")
     } else {
-      logError(s"Collect output rows failed.")
+      logWarning(s"Collect output rows failed.")
     }
     metrics
   }
@@ -93,13 +91,6 @@ object JobMetricsUtils extends Logging {
 
     sparkListener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case e: PostQueryExecutionForKylin =>
-          val nExecutionId = e.localProperties.getProperty(QueryExecutionCache.N_EXECUTION_ID_KEY, "")
-          if (nExecutionId != "" && e.queryExecution != null) {
-            QueryExecutionCache.setQueryExecution(nExecutionId, e.queryExecution)
-          } else {
-            logWarning("executionIdStr is null, can't get QueryExecution from SQLExecution.")
-          }
         case _ => // Ignore
       }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index 340564c..a62ead8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -150,7 +150,7 @@ public class Repartitioner {
 
         Dataset<Row> data;
         if (needRepartitionForShardByColumns()) {
-            ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
+            //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
             data = storage.getFrom(tempPath, ss).repartition(repartitionNum,
                     NSparkCubingUtil.getColumns(getShardByColumns()))
                     .sortWithinPartitions(sortCols);
@@ -163,7 +163,7 @@ public class Repartitioner {
         storage.saveTo(path, data, ss);
         if (needRepartitionForShardByColumns()) {
-            ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", null);
+            //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", null);
         }
 
         if (readFileSystem.delete(tempResourcePath, true)) {
             logger.info("Delete temp cuboid path successful. Temp path: {}.", tempPath);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 7c37c01..db05c66 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -64,7 +64,8 @@ object SparkSqlClient {
       val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
       val sourceTableSize = ResourceDetectUtils.getResourceSize(paths: _*) + "b"
       val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
-      df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitions)
+      //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
+      // partitions)
       logger.info(s"Auto set spark.sql.shuffle.partitions $partitions")
     } catch {
       case e: Throwable =>
@@ -97,7 +98,7 @@ object SparkSqlClient {
       } else throw e
     } finally {
-      df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
+      //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
       HadoopUtil.setCurrentConfiguration(null)
     }
   }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 4a3e65d..0b97d7e 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -37,7 +37,7 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution().logical());
+        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
         return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
     }
 
@@ -45,7 +45,7 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution().logical());
+        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
         return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
     }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index ef9cd54..2bf4069 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -83,7 +83,8 @@ object ResultPlan extends Logging {
     sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     val queryId = QueryContextFacade.current().getQueryId
     sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
-    df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitionsNum.toString)
+    //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
+    // partitionsNum.toString)
     QueryContextFacade.current().setDataset(df)
 
     sparkContext.setJobGroup(jobGroup,
@@ -136,7 +137,7 @@ object ResultPlan extends Logging {
     val r = body
     // remember clear local properties.
     df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", null)
-    df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
+    //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
     SparderContext.setDF(df)
     TableScanPlan.cacheDf.get().clear()
     HadoopUtil.setCurrentConfiguration(null)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 0a9c7c1..f70ea67 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -20,13 +20,11 @@ package org.apache.spark.sql
 
 import java.lang.{Boolean => JBoolean, String => JString}
 
-import org.apache.kylin.query.runtime.plans.QueryToExecutionIDCache
 import org.apache.spark.memory.MonitorEnv
 import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.kylin.query.UdfManager
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.KylinSession._
@@ -37,8 +35,6 @@ import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
 
-import scala.collection.JavaConverters._
-
 // scalastyle:off
 object SparderContext extends Logging {
 
@@ -150,7 +146,6 @@ object SparderContext extends Logging {
               .currentThread()
               .getContextClassLoader
               .toString)
-            registerListener(sparkSession.sparkContext)
             initMonitorEnv()
             APP_MASTER_TRACK_URL = null
           } catch {
@@ -178,9 +173,6 @@ object SparderContext extends Logging {
 
     val sparkListener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case e: PostQueryExecutionForKylin =>
-          val queryID = e.localProperties.getProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, "")
-          QueryToExecutionIDCache.setQueryExecutionID(queryID, e.executionId.toString)
         case _ => // Ignore
       }
     }
diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
index 8815648..80ce7ee 100644
--- a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
+++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
@@ -78,19 +78,21 @@ public class PushDownRunnerSparkImplTest extends LocalFileMetadataTestCase {
         queries.add("SELECT cast(ORDER_ID as integer) FROM TEST_KYLIN_FACT limit 10");
         queries.add("SELECT cast(LSTG_SITE_ID as long) FROM TEST_KYLIN_FACT limit 10");
         queries.add("SELECT cast(LSTG_SITE_ID as short) FROM TEST_KYLIN_FACT limit 10");
-        queries.add("SELECT CAST(ORDER_ID AS VARCHAR) FROM TEST_KYLIN_FACT limit 10");
-        queries.add("SELECT CAST(ORDER_ID AS char) FROM TEST_KYLIN_FACT limit 10");
+        queries.add("SELECT CAST(ORDER_ID AS varchar(20)) FROM TEST_KYLIN_FACT limit 10");
+        queries.add("SELECT CAST(ORDER_ID AS char(20)) FROM TEST_KYLIN_FACT limit 10");
 
         queries.add("select SELLER_ID,ITEM_COUNT,sum(price)\n" + //
                 "from (\n" + //
                 "SELECT SELLER_ID, ITEM_COUNT,price\n" + //
-                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS varchar), '-'), CAST(month(CAST(CAL_DT AS date)) AS varchar)) AS prt_mth\n" + //
+                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS varchar(4)), '-'),\n" + //
+                "CAST(month(CAST(CAL_DT AS date)) AS varchar(2))) AS prt_mth\n" + //
                 "FROM TEST_KYLIN_FACT) \n" + //
                 "group by SELLER_ID,ITEM_COUNT,price limit 10");
         //
         queries.add("select SELLER_ID,ITEM_COUNT,sum(price)\n" + //
                 "from (\n" + //
                 "SELECT SELLER_ID, ITEM_COUNT,price\n" + //
-                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS char), '-'), CAST(month(CAST(CAL_DT AS date)) AS varchar)) AS prt_mth\n" + //
+                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS char(4)), '-'),\n" + //
+                "CAST(month(CAST(CAL_DT AS date)) AS char(2))) AS prt_mth\n" + //
                 "FROM TEST_KYLIN_FACT) \n" + //
                 "group by SELLER_ID,ITEM_COUNT,price limit 10");
diff --git a/pom.xml b/pom.xml
index 5c8fb2f..09b1f98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,8 +80,8 @@
     <kafka.version>1.0.0</kafka.version>
 
     <!-- Spark versions -->
-    <spark.version>2.4.1-os-kylin-r3</spark.version>
-    <janino.version>3.0.9</janino.version>
+    <spark.version>2.4.6</spark.version>
+    <janino.version>3.0.16</janino.version>
 
     <kryo.version>4.0.0</kryo.version>
 
@@ -131,7 +131,7 @@
     <guava-testlib.version>28.2-jre</guava-testlib.version>
 
     <!-- Commons -->
-    <commons-lang3.version>3.4</commons-lang3.version>
+    <commons-lang3.version>3.5</commons-lang3.version>
     <commons-email.version>1.5</commons-email.version>
     <commons-validator.version>1.4.0</commons-validator.version>
     <commons-compress.version>1.18</commons-compress.version>
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index 4d7a373..20e5f21 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -21,6 +21,10 @@ package org.apache.kylin.query.relnode;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
 import org.apache.calcite.adapter.enumerable.JavaRowFormat;
@@ -46,8 +50,6 @@ import org.apache.kylin.query.exec.SparderMethod;
 import org.apache.kylin.query.routing.RealizationChooser;
 import org.apache.kylin.query.security.QueryInterceptor;
 import org.apache.kylin.query.security.QueryInterceptorUtil;
-
-import com.google.common.collect.Lists;
 import org.apache.kylin.query.util.QueryInfoCollector;
 
 /**
@@ -55,6 +57,9 @@ import org.apache.kylin.query.util.QueryInfoCollector;
  * see org.apache.calcite.plan.OLAPRelMdRowCount#shouldIntercept(org.apache.calcite.rel.RelNode)
  */
 public class OLAPToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+
+    private static final Logger logger = LoggerFactory.getLogger(OLAPToEnumerableConverter.class);
+
     private static final String SPARDER_CALL_METHOD_NAME = "enumerable";
 
     public OLAPToEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
@@ -76,8 +81,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
     public Result implement(EnumerableRelImplementor enumImplementor, Prefer pref) {
         if (System.getProperty("calcite.debug") != null) {
             String dumpPlan = RelOptUtil.dumpPlan("", this, false, SqlExplainLevel.DIGEST_ATTRIBUTES);
-            System.out.println("EXECUTION PLAN BEFORE REWRITE");
-            System.out.println(dumpPlan);
+            logger.debug("EXECUTION PLAN BEFORE REWRITE");
+            logger.debug(dumpPlan);
         }
 
         QueryContextFacade.current().setWithoutSyntaxError(true);
@@ -98,8 +103,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
 
         if (System.getProperty("calcite.debug") != null) {
             String dumpPlan = RelOptUtil.dumpPlan("", this, false, SqlExplainLevel.DIGEST_ATTRIBUTES);
-            System.out.println("EXECUTION PLAN AFTER OLAPCONTEXT IS SET");
-            System.out.println(dumpPlan);
+            logger.debug("EXECUTION PLAN AFTER OLAPCONTEXT IS SET");
+            logger.debug(dumpPlan);
         }
 
         RealizationChooser.selectRealization(contexts);
@@ -139,8 +144,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
 
         if (System.getProperty("calcite.debug") != null) {
             String dumpPlan = RelOptUtil.dumpPlan("", this, false, SqlExplainLevel.DIGEST_ATTRIBUTES);
-            System.out.println("EXECUTION PLAN AFTER REWRITE");
-            System.out.println(dumpPlan);
+            logger.debug("EXECUTION PLAN AFTER REWRITE");
+            logger.debug(dumpPlan);
             QueryContextFacade.current().setCalcitePlan(this.copy(getTraitSet(), getInputs()));
         }
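
Editor's note: the hunks above drop the custom PostQueryExecutionForKylin event that only existed in the third-party Spark build. As a minimal sketch of how a comparable query-execution hook could be wired up on stock Apache Spark 2.4.x, the snippet below uses the standard QueryExecutionListener API; it is not part of the commit, and the object name and the local-property key are invented for illustration.

    // Illustrative sketch only -- not part of the commit above.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    object QueryExecutionHookSketch {   // hypothetical helper object
      def register(spark: SparkSession): Unit = {
        spark.listenerManager.register(new QueryExecutionListener {
          override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
            // A query id could be propagated via a local property, as the removed listener did.
            val queryId = spark.sparkContext.getLocalProperty("kylin.query.id") // hypothetical key
            if (queryId != null) {
              println(s"query $queryId finished in ${durationNs / 1000000} ms")
            }
          }
          override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
        })
      }
    }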