This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 1b63cec KYLIN-4730 Add scan bytes metric to the query results
1b63cec is described below
commit 1b63cec0b57f1d5367abd37dd4364f78cf741c42
Author: Zhichao Zhang <[email protected]>
AuthorDate: Fri Aug 28 19:08:08 2020 +0800
KYLIN-4730 Add scan bytes metric to the query results
---
.../apache/spark/sql/execution/datasource/FilePruner.scala | 4 ----
.../org/apache/spark/sql/hive/utils/QueryMetricUtils.scala | 4 ++--
.../org/apache/kylin/query/pushdown/SparkSqlClient.scala | 5 +++++
.../scala/org/apache/kylin/query/runtime/SparkEngine.java | 8 ++++++--
.../java/org/apache/kylin/rest/service/QueryService.java | 12 ++++++++----
.../java/org/apache/kylin/rest/response/SQLResponseTest.java | 3 ++-
6 files changed, 23 insertions(+), 13 deletions(-)
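For orientation: the change threads per-operator scan metrics from the executed Spark plan into Kylin's per-query context, and from there onto the SQLResponse. A minimal sketch of that accumulation pattern, with a simplified QueryContext standing in for Kylin's QueryContextFacade (names here are illustrative, not the real API surface):

    import java.util.concurrent.atomic.AtomicLong

    // Simplified stand-in for Kylin's per-query context: each scan operator
    // adds its contribution; the response reads the total once at the end.
    class QueryContext {
      private val scannedBytes = new AtomicLong(0L)
      def addAndGetScannedBytes(delta: Long): Long = scannedBytes.addAndGet(delta)
      def getScannedBytes: Long = scannedBytes.get()
    }

    object MetricFlowSketch {
      def main(args: Array[String]): Unit = {
        val ctx = new QueryContext
        Seq(1024L, 2048L).foreach(ctx.addAndGetScannedBytes) // one entry per scan
        println(s"totalScanBytes = ${ctx.getScannedBytes}")  // 3072
      }
    }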
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 2ee32a0..14d63e1 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -186,7 +186,6 @@ class FilePruner(
   }

   var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
-  var totalSize = 0L

   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     if (cached.containsKey((partitionFilters, dataFilters))) {
@@ -224,9 +223,6 @@ class FilePruner(
     val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum
     logInfo(s"totalFileSize is ${totalFileSize}")
     setShufflePartitions(totalFileSize, session)
-    totalSize = totalFileSize
-    // val sourceRows = selected.map(seg => cubeInstance.getSegment(seg.segmentID).getLayout(layout.getId).getRows).sum
-    // QueryContextFacade.current().addAndGetSourceScanRows(sourceRows)
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
       cached.put((partitionFilters, dataFilters), value)
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index 2f89f03..d4f8c50 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -39,8 +39,8 @@ object QueryMetricUtils extends Logging {
exec.metrics.apply("metadataTime").value,
exec.metrics.apply("scanTime").value, -1l)
case exec: HiveTableScanExec =>
//(exec.metrics.apply("numOutputRows").value,
exec.metrics.apply("readBytes").value)
- (exec.metrics.apply("numOutputRows").value,
exec.metrics.apply("numFiles").value,
- exec.metrics.apply("metadataTime").value,
exec.metrics.apply("scanTime").value, -1l)
+ // There is only 'numOutputRows' metric in HiveTableScanExec
+ (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l)
}
       val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)).toList.asJava
       val scanFiles = metrics.map(metrics => java.lang.Long.valueOf(metrics._2)).toList.asJava
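Note the convention in this hunk: every scan node yields a fixed-arity tuple (rows, files, metadataTime, scanTime, bytes) and fills the slots it cannot report with -1l, so downstream code never special-cases the node type. A rough illustration of the same idea, using plain maps instead of Spark's SQLMetric objects (ParquetScan/HiveScan are hypothetical stand-ins for FileSourceScanExec/HiveTableScanExec):

    object ScanMetricsSketch {
      sealed trait ScanNode { def metrics: Map[String, Long] }
      case class ParquetScan(metrics: Map[String, Long]) extends ScanNode
      case class HiveScan(metrics: Map[String, Long]) extends ScanNode

      // Fixed shape: (rows, files, metadataTime, scanTime, bytes); -1 = unavailable.
      def collect(node: ScanNode): (Long, Long, Long, Long, Long) = node match {
        case ParquetScan(m) =>
          (m("numOutputRows"), m("numFiles"), m("metadataTime"), m("scanTime"), -1L)
        case HiveScan(m) =>
          // A Hive table scan only reports its output row count.
          (m("numOutputRows"), -1L, -1L, -1L, -1L)
      }

      def main(args: Array[String]): Unit =
        println(collect(HiveScan(Map("numOutputRows" -> 42L)))) // (42,-1,-1,-1,-1)
    }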
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 4cc2802..5ce4f8f 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -87,6 +87,11 @@ object SparkSqlClient {
       val rowList = frame.collect().map(_.toSeq.map(_.asInstanceOf[String]).asJava).toSeq.asJava
       val fieldList = df.schema.map(field => SparkTypeUtil.convertSparkFieldToJavaField(field)).asJava
       val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
+      QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScannedBytes(scanBytes.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetMetadataTime(metadataTime.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScanTime(scanTime.asScala.map(Long2long(_)).sum)
       Pair.newPair(rowList, fieldList)
     } catch {
       case e: Throwable =>
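The asScala.map(Long2long(_)).sum chain repeats five times above because collectScanMetrics returns java.util.List[java.lang.Long] values (convenient for the Java callers), which must be converted and unboxed before Scala can sum them. In isolation:

    import scala.collection.JavaConverters._

    object BoxedSumSketch {
      def main(args: Array[String]): Unit = {
        // Shape returned by QueryMetricUtils: one boxed value per scan operator.
        val scanBytes: java.util.List[java.lang.Long] =
          java.util.Arrays.asList(java.lang.Long.valueOf(100L), java.lang.Long.valueOf(200L))
        // asScala wraps the Java list; Predef.Long2long unboxes each element.
        val total = scanBytes.asScala.map(Long2long(_)).sum
        println(total) // 300
      }
    }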
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 6d444b6..ed51427 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -37,7 +37,9 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        if (System.getProperty("calcite.debug") != null) {
+            log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        }
         return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
     }
@@ -45,7 +47,9 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        if (System.getProperty("calcite.debug") != null) {
+            log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        }
         return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index ee79c99..25a4fcf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1187,10 +1187,14 @@ public class QueryService extends BasicService {
             SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
                     exceptionMessage, isPartialResult, isPushDown);
             response.setTotalScanCount(queryContext.getScannedRows());
-            response.setTotalScanFiles(queryContext.getScanFiles());
-            response.setMetadataTime(queryContext.getMedataTime());
-            response.setTotalSparkScanTime(queryContext.getScanTime());
-            response.setTotalScanBytes(queryContext.getScannedBytes());
+            response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 :
+                    queryContext.getScanFiles());
+            response.setMetadataTime((queryContext.getMedataTime() < 0) ? -1 :
+                    queryContext.getMedataTime());
+            response.setTotalSparkScanTime((queryContext.getScanTime() < 0) ? -1 :
+                    queryContext.getScanTime());
+            response.setTotalScanBytes((queryContext.getScannedBytes() < 0) ?
+                    (queryContext.getSourceScanBytes() < 1 ? -1 : queryContext.getSourceScanBytes()) : queryContext.getScannedBytes());
             response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
             response.setSparkPool(queryContext.getSparkPool());
             if (getConfig().isQueryCacheSignatureEnabled()) {
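The nested ternary on setTotalScanBytes encodes a two-level fallback: use the bytes actually measured by the Spark scan when available, otherwise fall back to the source-level figure, otherwise report -1 for "unknown". The same logic written out as a plain function (a sketch of the decision only, not QueryService's actual shape):

    object ScanBytesFallbackSketch {
      // scannedBytes: measured at execution; sourceScanBytes: source-level figure.
      // Negative (or, for the source figure, zero) means "not available".
      def totalScanBytes(scannedBytes: Long, sourceScanBytes: Long): Long =
        if (scannedBytes >= 0) scannedBytes
        else if (sourceScanBytes >= 1) sourceScanBytes
        else -1L

      def main(args: Array[String]): Unit = {
        println(totalScanBytes(4096L, 0L)) // 4096: measured value wins
        println(totalScanBytes(-1L, 512L)) // 512: fall back to the source figure
        println(totalScanBytes(-1L, 0L))   // -1: nothing known
      }
    }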
diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index f939eb0..48f4791 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -34,7 +34,8 @@ public class SQLResponseTest {
     public void testInterfaceConsistency() throws IOException {
         String[] attrArray = new String[] { "columnMetas", "results", "cube", "affectedRowCount", "isException",
                 "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache", "storageCacheUsed",
-                "sparkPool", "pushDown", "traceUrl", "totalScanBytes" };
+                "sparkPool", "pushDown", "traceUrl", "totalScanBytes", "totalScanFiles",
+                "metadataTime", "totalSparkScanTime" };
         SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
         String jsonStr = JsonUtil.writeValueAsString(sqlResponse);