This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 15305eaf86fb8925f559e3b3d0efda05f4a022f8
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Wed Aug 12 08:51:38 2020 +0800

    KYLIN-4662 Migrate from third-party Spark to official Apache Spark
    
    Use Apache Spark 2.4.6 as the default Spark version
---
 .../sql/execution/KylinFileSourceScanExec.scala     |  4 ++--
 .../datasource/ResetShufflePartition.scala          |  3 ++-
 .../spark/sql/hive/utils/QueryMetricUtils.scala     |  9 ++++++---
 .../engine/spark/application/SparkApplication.java  |  5 ++---
 .../apache/kylin/engine/spark/job/CubeBuildJob.java | 21 +++++++++++++--------
 .../apache/kylin/engine/spark/job/CubeMergeJob.java |  7 ++++---
 .../kylin/engine/spark/utils/JobMetricsUtils.scala  | 11 +----------
 .../kylin/engine/spark/utils/Repartitioner.java     |  4 ++--
 .../kylin/query/pushdown/SparkSqlClient.scala       |  5 +++--
 .../org/apache/kylin/query/runtime/SparkEngine.java |  4 ++--
 .../kylin/query/runtime/plans/ResultPlan.scala      |  5 +++--
 .../scala/org/apache/spark/sql/SparderContext.scala |  8 --------
 .../query/pushdown/PushDownRunnerSparkImplTest.java | 10 ++++++----
 pom.xml                                             |  6 +++---
 .../query/relnode/OLAPToEnumerableConverter.java    | 21 +++++++++++++--------
 15 files changed, 62 insertions(+), 61 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 9bc8801..90ff597 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -148,7 +148,7 @@ class KylinFileSourceScanExec(
       }
 
     val filePartitions = Seq.tabulate(shardSpec.numShards) { shardId =>
-      FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil))
+      FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
@@ -205,7 +205,7 @@ class KylinFileSourceScanExec(
         val newPartition =
           FilePartition(
             partitions.size,
-            currentFiles.toArray.toSeq) // Copy to a new Array.
+            currentFiles.toArray) // Copy to a new Array.
         partitions += newPartition
       }
       currentFiles.clear()
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index 9308717..aaed1d9 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -34,7 +34,8 @@ trait ResetShufflePartition extends Logging {
        KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 * 2) + 1,
         defaultParallelism).toInt
     }
-    sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitionsNum.toString)
+    //sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
+    //  partitionsNum.toString)
     logInfo(s"Set partition to $partitionsNum, total bytes ${QueryContextFacade.current().getSourceScanBytes}")
   }
 }
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index 4b43078..dccbba3 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -29,11 +29,14 @@ object QueryMetricUtils extends Logging {
     try {
       val metrics = plan.collect {
         case exec: KylinFileSourceScanExec =>
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          (exec.metrics.apply("numOutputRows").value, 0l)
         case exec: FileSourceScanExec =>
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          (exec.metrics.apply("numOutputRows").value, 0l)
         case exec: HiveTableScanExec =>
-          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
+          (exec.metrics.apply("numOutputRows").value, 0l)
       }
      val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)).toList.asJava
      val scanBytes = metrics.map(metric => java.lang.Long.valueOf(metric._2)).toList.asJava
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 5719732..da51c45 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -24,7 +24,6 @@ import org.apache.kylin.engine.spark.job.KylinBuildEnv;
 import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
 import org.apache.kylin.engine.spark.job.SparkJobConstants;
 import org.apache.kylin.engine.spark.job.UdfManager;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
 import java.io.IOException;
@@ -180,7 +179,7 @@ public abstract class SparkApplication {
                     .getOrCreate();
 
             // for spark metrics
-            JobMetricsUtils.registerListener(ss);
+            //JobMetricsUtils.registerListener(ss);
 
             UdfManager.create(ss);
 
@@ -194,7 +193,7 @@ public abstract class SparkApplication {
                 infos.jobEnd();
             }
             if (ss != null && !ss.conf().get("spark.master").startsWith("local")) {
-                JobMetricsUtils.unRegisterListener(ss);
+                //JobMetricsUtils.unRegisterListener(ss);
                 ss.stop();
             }
         }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index e55dfcf..40dcb01 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -215,6 +215,7 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
+            long parentDSCnt = parentDS.count();
 
             for (LayoutEntity index : toBuildCuboids) {
                 Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
@@ -226,7 +227,7 @@ public class CubeBuildJob extends SparkApplication {
 
                     @Override
                     public LayoutEntity build() throws IOException {
-                        return buildIndex(seg, index, parentDS, st, info.getLayoutId());
+                        return buildIndex(seg, index, parentDS, st, info.getLayoutId(), parentDSCnt);
                     }
                 }, config);
                 allIndexesInCurrentLayer.add(index);
@@ -288,7 +289,7 @@ public class CubeBuildJob extends SparkApplication {
     }
 
     private LayoutEntity buildIndex(SegmentInfo seg, LayoutEntity cuboid, Dataset<Row> parent,
-                                    SpanningTree spanningTree, long parentId) throws IOException {
+                                    SpanningTree spanningTree, long parentId, long parentDSCnt) throws IOException {
         String parentName = String.valueOf(parentId);
         if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
             parentName = "flat table";
@@ -304,7 +305,7 @@ public class CubeBuildJob extends SparkApplication {
             Set<Integer> orderedDims = layoutEntity.getOrderedDimensions().keySet();
             Dataset<Row> afterSort = afterPrj.select(NSparkCubingUtil.getColumns(orderedDims))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(orderedDims));
-            saveAndUpdateLayout(afterSort, seg, layoutEntity);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
         } else {
             Dataset<Row> afterAgg = CuboidAggregator.agg(ss, parent, dimIndexes, cuboid.getOrderedMeasures(),
                     spanningTree, false);
@@ -316,14 +317,15 @@ public class CubeBuildJob extends SparkApplication {
                     .select(NSparkCubingUtil.getColumns(rowKeys, layoutEntity.getOrderedMeasures().keySet()))
                     .sortWithinPartitions(NSparkCubingUtil.getColumns(rowKeys));
 
-            saveAndUpdateLayout(afterSort, seg, layoutEntity);
+            saveAndUpdateLayout(afterSort, seg, layoutEntity, parentDSCnt);
         }
         ss.sparkContext().setJobDescription(null);
         logger.info("Finished Build index :{}, in segment:{}", cuboid.getId(), seg.id());
         return layoutEntity;
     }
 
-    private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout) throws IOException {
+    private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout,
+                                     long parentDSCnt) throws IOException {
         long layoutId = layout.getId();
 
         // for spark metrics
@@ -343,10 +345,13 @@ public class CubeBuildJob extends SparkApplication {
         long rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT());
         if (rowCount == -1) {
            infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
-            logger.warn("Can not get cuboid row cnt.");
+            logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows.");
+            layout.setRows(dataset.count());
+            layout.setSourceRows(parentDSCnt);
+        } else {
+            layout.setRows(rowCount);
+            layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
-        layout.setRows(rowCount);
-        layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short)shardNum);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index aaa3c21..3d54492 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -166,10 +166,11 @@ public class CubeMergeJob extends SparkApplication {
         if (rowCount == -1) {
             infos.recordAbnormalLayouts(layout.getId(),
                     "'Job metrics seems null, use count() to collect cuboid rows.'");
-            logger.warn("Can not get cuboid row cnt.");
+            logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows.");
+            layout.setRows(dataset.count());
+        } else {
+            layout.setRows(rowCount);
         }
-
-        layout.setRows(rowCount);
         layout.setSourceRows(sourceCount);
 
         int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index e5d02cf..a8231ac 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec
-import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
-
 
 object JobMetricsUtils extends Logging {
 
@@ -43,7 +41,7 @@ object JobMetricsUtils extends Logging {
       metrics = collectOutputRows(execution.executedPlan)
       logInfo(s"Collect output rows successfully. $metrics")
     } else {
-      logError(s"Collect output rows failed.")
+      logWarning(s"Collect output rows failed.")
     }
     metrics
   }
@@ -93,13 +91,6 @@ object JobMetricsUtils extends Logging {
     sparkListener = new SparkListener {
 
       override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case e: PostQueryExecutionForKylin =>
-          val nExecutionId = e.localProperties.getProperty(QueryExecutionCache.N_EXECUTION_ID_KEY, "")
-          if (nExecutionId != "" && e.queryExecution != null) {
-            QueryExecutionCache.setQueryExecution(nExecutionId, e.queryExecution)
-          } else {
-            logWarning("executionIdStr is null, can't get QueryExecution from SQLExecution.")
-          }
         case _ => // Ignore
       }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index 340564c..a62ead8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -150,7 +150,7 @@ public class Repartitioner {
             Dataset<Row> data;
 
             if (needRepartitionForShardByColumns()) {
-                ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
+                //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", "false");
                 data = storage.getFrom(tempPath, ss).repartition(repartitionNum,
                         NSparkCubingUtil.getColumns(getShardByColumns()))
                         .sortWithinPartitions(sortCols);
@@ -163,7 +163,7 @@ public class Repartitioner {
 
             storage.saveTo(path, data, ss);
             if (needRepartitionForShardByColumns()) {
-                ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", null);
+                //ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.enabled", null);
             }
             if (readFileSystem.delete(tempResourcePath, true)) {
                 logger.info("Delete temp cuboid path successful. Temp path: {}.", tempPath);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 7c37c01..db05c66 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -64,7 +64,8 @@ object SparkSqlClient {
                                val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
                                val sourceTableSize = ResourceDetectUtils.getResourceSize(paths: _*) + "b"
                                val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
-                               df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitions)
+                               //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
+                               //      partitions)
                                logger.info(s"Auto set spark.sql.shuffle.partitions $partitions")
                        } catch {
                                case e: Throwable =>
@@ -97,7 +98,7 @@ object SparkSqlClient {
                                }
                                else throw e
                } finally {
-                       df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
+                       //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
                        HadoopUtil.setCurrentConfiguration(null)
                }
        }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 4a3e65d..0b97d7e 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -37,7 +37,7 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution().logical());
+        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
         return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
 
     }
@@ -45,7 +45,7 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution().logical());
+        log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
         return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
     }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index ef9cd54..2bf4069 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -83,7 +83,8 @@ object ResultPlan extends Logging {
     sparkContext.setLocalProperty("spark.scheduler.pool", pool)
     val queryId = QueryContextFacade.current().getQueryId
     sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
-    df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitionsNum.toString)
+    //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
+    //  partitionsNum.toString)
     QueryContextFacade.current().setDataset(df)
 
     sparkContext.setJobGroup(jobGroup,
@@ -136,7 +137,7 @@ object ResultPlan extends Logging {
     val r = body
     // remember clear local properties.
     df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", null)
-    df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
+    //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
     SparderContext.setDF(df)
     TableScanPlan.cacheDf.get().clear()
     HadoopUtil.setCurrentConfiguration(null)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 0a9c7c1..f70ea67 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -20,13 +20,11 @@ package org.apache.spark.sql
 
 import java.lang.{Boolean => JBoolean, String => JString}
 
-import org.apache.kylin.query.runtime.plans.QueryToExecutionIDCache
 import org.apache.spark.memory.MonitorEnv
 import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.kylin.query.UdfManager
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.KylinSession._
@@ -37,8 +35,6 @@ import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
 
-import scala.collection.JavaConverters._
-
 // scalastyle:off
 object SparderContext extends Logging {
   @volatile
@@ -150,7 +146,6 @@ object SparderContext extends Logging {
                   .currentThread()
                   .getContextClassLoader
                   .toString)
-              registerListener(sparkSession.sparkContext)
               initMonitorEnv()
               APP_MASTER_TRACK_URL = null
             } catch {
@@ -178,9 +173,6 @@ object SparderContext extends Logging {
     val sparkListener = new SparkListener {
 
       override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case e: PostQueryExecutionForKylin =>
-          val queryID = e.localProperties.getProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, "")
-          QueryToExecutionIDCache.setQueryExecutionID(queryID, e.executionId.toString)
         case _ => // Ignore
       }
     }
diff --git a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
index 8815648..80ce7ee 100644
--- a/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
+++ b/kylin-spark-project/kylin-spark-query/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
@@ -78,19 +78,21 @@ public class PushDownRunnerSparkImplTest extends LocalFileMetadataTestCase {
         queries.add("SELECT cast(ORDER_ID as integer) FROM TEST_KYLIN_FACT limit 10");
         queries.add("SELECT cast(LSTG_SITE_ID as long) FROM TEST_KYLIN_FACT limit 10");
         queries.add("SELECT cast(LSTG_SITE_ID as short) FROM TEST_KYLIN_FACT limit 10");
-        queries.add("SELECT CAST(ORDER_ID AS VARCHAR) FROM TEST_KYLIN_FACT limit 10");
-        queries.add("SELECT CAST(ORDER_ID AS char) FROM TEST_KYLIN_FACT limit 10");
+        queries.add("SELECT CAST(ORDER_ID AS varchar(20)) FROM TEST_KYLIN_FACT limit 10");
+        queries.add("SELECT CAST(ORDER_ID AS char(20)) FROM TEST_KYLIN_FACT limit 10");
         queries.add("select SELLER_ID,ITEM_COUNT,sum(price)\n" + //
                 "from (\n" + //
                 "SELECT SELLER_ID, ITEM_COUNT,price\n" + //
-                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS varchar), '-'), CAST(month(CAST(CAL_DT AS date)) AS varchar)) AS prt_mth\n" + //
+                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS varchar(4)), '-'),\n" + //
+                "CAST(month(CAST(CAL_DT AS date)) AS varchar(2))) AS prt_mth\n" + //
                 "FROM TEST_KYLIN_FACT) \n" + //
                 "group by SELLER_ID,ITEM_COUNT,price limit 10"); //
 
         queries.add("select SELLER_ID,ITEM_COUNT,sum(price)\n" + //
                 "from (\n" + //
                 "SELECT SELLER_ID, ITEM_COUNT,price\n" + //
-                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS char), '-'), CAST(month(CAST(CAL_DT AS date)) AS varchar)) AS prt_mth\n" + //
+                "\t, concat(concat(CAST(year(CAST(CAL_DT AS date)) AS char(4)), '-'),\n" + //
+                "CAST(month(CAST(CAL_DT AS date)) AS char(2))) AS prt_mth\n" + //
                 "FROM TEST_KYLIN_FACT) \n" + //
                 "group by SELLER_ID,ITEM_COUNT,price limit 10");
 
diff --git a/pom.xml b/pom.xml
index 5c8fb2f..09b1f98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,8 +80,8 @@
     <kafka.version>1.0.0</kafka.version>
 
     <!-- Spark versions -->
-    <spark.version>2.4.1-os-kylin-r3</spark.version>
-    <janino.version>3.0.9</janino.version>
+    <spark.version>2.4.6</spark.version>
+    <janino.version>3.0.16</janino.version>
 
     <kryo.version>4.0.0</kryo.version>
 
@@ -131,7 +131,7 @@
     <guava-testlib.version>28.2-jre</guava-testlib.version>
 
     <!-- Commons -->
-    <commons-lang3.version>3.4</commons-lang3.version>
+    <commons-lang3.version>3.5</commons-lang3.version>
     <commons-email.version>1.5</commons-email.version>
     <commons-validator.version>1.4.0</commons-validator.version>
     <commons-compress.version>1.18</commons-compress.version>
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index 4d7a373..20e5f21 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -21,6 +21,10 @@ package org.apache.kylin.query.relnode;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
 import org.apache.calcite.adapter.enumerable.JavaRowFormat;
@@ -46,8 +50,6 @@ import org.apache.kylin.query.exec.SparderMethod;
 import org.apache.kylin.query.routing.RealizationChooser;
 import org.apache.kylin.query.security.QueryInterceptor;
 import org.apache.kylin.query.security.QueryInterceptorUtil;
-
-import com.google.common.collect.Lists;
 import org.apache.kylin.query.util.QueryInfoCollector;
 
 /**
@@ -55,6 +57,9 @@ import org.apache.kylin.query.util.QueryInfoCollector;
  * see org.apache.calcite.plan.OLAPRelMdRowCount#shouldIntercept(org.apache.calcite.rel.RelNode)
  */
 public class OLAPToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+
+    private static final Logger logger = LoggerFactory.getLogger(OLAPToEnumerableConverter.class);
+
     private static final String SPARDER_CALL_METHOD_NAME = "enumerable";
 
     public OLAPToEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
@@ -76,8 +81,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
     public Result implement(EnumerableRelImplementor enumImplementor, Prefer pref) {
         if (System.getProperty("calcite.debug") != null) {
             String dumpPlan = RelOptUtil.dumpPlan("", this, false, SqlExplainLevel.DIGEST_ATTRIBUTES);
-            System.out.println("EXECUTION PLAN BEFORE REWRITE");
-            System.out.println(dumpPlan);
+            logger.debug("EXECUTION PLAN BEFORE REWRITE");
+            logger.debug(dumpPlan);
         }
 
         QueryContextFacade.current().setWithoutSyntaxError(true);
@@ -98,8 +103,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
 
         if (System.getProperty("calcite.debug") != null) {
             String dumpPlan = RelOptUtil.dumpPlan("", this, false, SqlExplainLevel.DIGEST_ATTRIBUTES);
-            System.out.println("EXECUTION PLAN AFTER OLAPCONTEXT IS SET");
-            System.out.println(dumpPlan);
+            logger.debug("EXECUTION PLAN AFTER OLAPCONTEXT IS SET");
+            logger.debug(dumpPlan);
         }
 
         RealizationChooser.selectRealization(contexts);
@@ -139,8 +144,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
 
             if (System.getProperty("calcite.debug") != null) {
                 String dumpPlan = RelOptUtil.dumpPlan("", this, false, SqlExplainLevel.DIGEST_ATTRIBUTES);
-                System.out.println("EXECUTION PLAN AFTER REWRITE");
-                System.out.println(dumpPlan);
+                logger.debug("EXECUTION PLAN AFTER REWRITE");
+                logger.debug(dumpPlan);
                 QueryContextFacade.current().setCalcitePlan(this.copy(getTraitSet(), getInputs()));
             }
 
