This is an automated email from the ASF dual-hosted git repository.

yhcast0 pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 73f2091f75 KYLIN-6019 fix calculate scanRows and scanBytes (#2268)
73f2091f75 is described below

commit 73f2091f75157ca4d782de9a7080338fd3d07cab
Author: Guoliang Sun <[email protected]>
AuthorDate: Fri Feb 21 15:36:59 2025 +0800

    KYLIN-6019 fix calculate scanRows and scanBytes (#2268)
    
    Co-authored-by: 夏旭晨 <[email protected]>
---
 .../org/apache/kylin/common/msg/MsgPicker.java     | 27 +++++--
 .../apache/kylin/common/KylinConfigBaseTest.java   | 17 ++++-
 .../apache/spark/sql/hive/QueryMetricUtils.scala   | 14 +++-
 .../sql/execution/SparkQueryMetricUtilsSuite.scala | 84 ++++++++++++++++++++--
 .../sql/hive/MockHiveTableScanExecFactory.scala}   | 26 ++-----
 5 files changed, 134 insertions(+), 34 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
index 6aa69b94a9..5cf19447ac 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
@@ -20,20 +20,33 @@ package org.apache.kylin.common.msg;
 
 import com.alibaba.ttl.TransmittableThreadLocal;
 public class MsgPicker {
-    private static TransmittableThreadLocal<Message> msg = new TransmittableThreadLocal<>();
+    private static final ThreadLocal<Message> msg = new ThreadLocal<>();
+    private static final String CHINESE_LANGUAGE_CODE = "cn";
 
+    /**
+     * Sets the message based on the given language code.
+     * If the language code is null or not recognized, sets the default English message.
+     *
+     * @param lang the language code (e.g., "cn" for Chinese)
+     */
     public static void setMsg(String lang) {
-        //if ("cn".equals(lang))
-        //msg.set(CnMessage.getInstance());
-        //else
-        msg.set(Message.getInstance());
+        if (CHINESE_LANGUAGE_CODE.equals(lang)) {
+            msg.set(CnMessage.getInstance());
+        } else {
+            msg.set(Message.getInstance());
+        }
     }
 
+    /**
+     * Gets the current message. If no message is set, returns the default English message.
+     *
+     * @return the current message or the default English message if none is set
+     */
     public static Message getMsg() {
         Message ret = msg.get();
-        if (ret == null) { // use English by default
+        if (ret == null) {
             ret = Message.getInstance();
-            msg.set(Message.getInstance());
+            msg.set(ret);
         }
         return ret;
     }
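
For reference, a minimal usage sketch of the reworked MsgPicker, assuming (as the diff implies) that CnMessage extends Message and both getInstance() methods return singletons; the object name below is just a scratch name:

    import org.apache.kylin.common.msg.{CnMessage, Message, MsgPicker}

    object MsgPickerSketch extends App {
      // "cn" now binds the Chinese bundle to the current thread.
      MsgPicker.setMsg("cn")
      assert(MsgPicker.getMsg.isInstanceOf[CnMessage])

      // Any other code (or null) falls back to the English default.
      MsgPicker.setMsg("fr")
      assert(MsgPicker.getMsg eq Message.getInstance())
    }

With a plain ThreadLocal each thread holds its own binding, so unlike a TransmittableThreadLocal the selection does not propagate to pooled worker threads.
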
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 3ef0e75bc6..0d2f359f19 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -50,10 +50,13 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -1525,6 +1528,16 @@ class KylinConfigBaseTest {
         Assertions.assertEquals(5002, map.get(13).get(1));
     }
 
+    private void assertUnorderedEqualsIgnoringEmpty(String expected, String actual) {
+        Set<String> actualSet = new HashSet<>(Arrays.asList(actual.split(",")));
+        Set<String> expectedSet = new HashSet<>(Arrays.asList(expected.split(",")));
+
+        actualSet.removeIf(StringUtils::isBlank);
+        expectedSet.removeIf(StringUtils::isBlank);
+
+        Assertions.assertEquals(expectedSet, actualSet, "expectedSet: " + expectedSet + " === actualSet: " + actualSet);
+    }
+
     @Test
     public void getKylinExtJarsPath() throws Exception {
         val config = KylinConfig.getInstanceFromEnv();
@@ -1541,9 +1554,9 @@ class KylinConfigBaseTest {
         FileUtils.write(glutenCelebornJar, "gluten celeborn jar");
 
         val withGluten = config.getKylinExtJarsPath(true);
-        val withGlutenExpected = "," + glutenJar.getAbsolutePath() + "," + celebornJar.getAbsolutePath() + ","
+        val withGlutenExpected = "," + celebornJar.getAbsolutePath() + "," + glutenJar.getAbsolutePath() + ","
                 + mysqlJar.getAbsolutePath() + "," + glutenCelebornJar.getAbsolutePath();
-        Assertions.assertEquals(withGlutenExpected, withGluten);
+        assertUnorderedEqualsIgnoringEmpty(withGlutenExpected, withGluten);
 
         val withoutGluten = config.getKylinExtJarsPath(false);
         val withoutExpected = "," + celebornJar.getAbsolutePath() + "," + mysqlJar.getAbsolutePath();
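
The helper above makes the assertion order-insensitive: getKylinExtJarsPath joins jar paths with commas in evidently no guaranteed order, so the test compares the comma-separated pieces as sets and ignores the empty segments left by leading or trailing commas. A standalone sketch of the same comparison (the jar paths and object name are placeholders):

    object PathSetSketch extends App {
      // Split a comma-joined path list into a set, dropping empty segments.
      def toPathSet(joined: String): Set[String] =
        joined.split(",").filter(_.nonEmpty).toSet

      // Same jars in a different order, plus a stray leading comma, compare equal.
      assert(toPathSet(",/x/gluten.jar,/x/celeborn.jar") ==
        toPathSet("/x/celeborn.jar,/x/gluten.jar"))
    }
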
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala
index 1e504d2854..d0c70dc205 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala
@@ -18,12 +18,13 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.AppStatus
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 
 import scala.collection.JavaConverters._
@@ -57,6 +58,17 @@ object QueryMetricUtils extends Logging {
        (scanRow + exec.metrics.apply("numOutputRows").value, scanBytes + exec.metrics.apply("readBytes").value)
      case exec: HiveTableScanExec =>
        (scanRow + exec.metrics.apply("numOutputRows").value, scanBytes + exec.metrics.apply("readBytes").value)
+      case transformer: HiveTableScanExecTransformer =>
+        (scanRow + transformer.metrics.apply("numOutputRows").value, scanBytes + transformer.metrics.apply("outputBytes").value)
+      case exec: BatchScanExec =>
+        // avoid empty metrics
+        val readBytes = exec.metrics.get("readBytes").map(_.value).getOrElse(0L)
+        (scanRow + exec.metrics.apply("numOutputRows").value, scanBytes + readBytes)
+      case transformer: BatchScanExecTransformer =>
+        // avoid empty metrics
+        val numOutputRows = transformer.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+        val outputBytes = transformer.metrics.get("outputBytes").map(_.value).getOrElse(0L)
+        (scanRow + numOutputRows, scanBytes + outputBytes)
       case exec: ShuffleQueryStageExec =>
         collectAdaptiveSparkPlanExecMetrics(exec.plan, scanRow, scanBytes)
       case exec: AdaptiveSparkPlanExec =>
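
The new Gluten and BatchScanExec cases read metrics with Map.get plus a default instead of apply, so a scan node that never registered a given metric contributes zero rather than throwing a NoSuchElementException. A minimal sketch of the lookup pattern, using a plain Map of values as a stand-in for SparkPlan.metrics (object name is a scratch name):

    object MetricLookupSketch extends App {
      // Stand-in for a node's metrics map; "outputBytes" is deliberately absent.
      val metricValues: Map[String, Long] = Map("numOutputRows" -> 1000L)

      // apply("outputBytes") would throw NoSuchElementException;
      // get(...).getOrElse(0L) degrades to zero instead.
      val rows = metricValues.get("numOutputRows").getOrElse(0L)
      val bytes = metricValues.get("outputBytes").getOrElse(0L)
      assert((rows, bytes) == (1000L, 0L))
    }
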
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala
index 66d2ea1eef..7fcdd4f1f6 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala
@@ -34,6 +34,9 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.File
+
+import org.apache.gluten.execution.BatchScanExecTransformer
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase
 import org.apache.spark.sql._
@@ -44,14 +47,15 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, Adapti
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.hive.QueryMetricUtils
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, MockHiveTableScanExecFactory, QueryMetricUtils}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
-
-import java.io.File
+import org.mockito.Mockito._
 
 class SparkQueryMetricUtilsSuite extends QueryTest with SharedSparkSession {
 
@@ -59,7 +63,7 @@ class SparkQueryMetricUtilsSuite extends QueryTest with SharedSparkSession {
 
   lazy val metaStore: NLocalFileMetadataTestCase = new NLocalFileMetadataTestCase
 
-  protected def metadata : Seq[String] = {
+  protected def metadata: Seq[String] = {
     Seq(fitPathForUT(ut_meta))
   }
 
@@ -252,4 +256,76 @@ class SparkQueryMetricUtilsSuite extends QueryTest with SharedSparkSession {
 
   }
 
+  def testLeafExecMetrics(leafExecNode: LeafExecNode, setScanRows: (Int) => Unit, setScanBytes: (Int) => Unit): Unit = {
+    setScanRows(1000)
+    setScanBytes(56698)
+    val collectScanMetrics = QueryMetricUtils.collectScanMetrics(leafExecNode)
+    assert(1000 == collectScanMetrics._1.get(0))
+    assert(56698 == collectScanMetrics._2.get(0))
+    val collectScanMetrics2 = QueryMetricUtils.collectAdaptiveSparkPlanExecMetrics(leafExecNode, 200, 250)
+    assert(1200 == collectScanMetrics2._1)
+    assert(56948 == collectScanMetrics2._2)
+    val dataWritingCommandExec = mock(classOf[DataWritingCommandExec])
+    when(dataWritingCommandExec.metrics).thenReturn(
+      Map(
+        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+        "readBytes" -> SQLMetrics.createMetric(sparkContext, "readBytes"),
+        "outputBytes" -> SQLMetrics.createMetric(sparkContext, "outputBytes")
+      )
+    )
+    // these metrics should not be counted
+    dataWritingCommandExec.metrics("numOutputRows").+=(1000)
+    dataWritingCommandExec.metrics("readBytes").+=(5691)
+    dataWritingCommandExec.metrics("outputBytes").+=(5691)
+    when(dataWritingCommandExec.child).thenReturn(leafExecNode)
+    val collectScanMetrics3 = QueryMetricUtils.collectScanMetrics(dataWritingCommandExec)
+    assert(1000 == collectScanMetrics3._1.get(0))
+    assert(56698 == collectScanMetrics3._2.get(0))
+    val collectScanMetrics4 = QueryMetricUtils.collectAdaptiveSparkPlanExecMetrics(leafExecNode, 200, 250)
+    assert(1200 == collectScanMetrics4._1)
+    assert(56948 == collectScanMetrics4._2)
+  }
+
+  test("sparkPlan metrics for Batch scanBytes and ScanRows") {
+    val mockBatchScanExec = mock(classOf[BatchScanExec])
+    when(mockBatchScanExec.metrics).thenReturn(
+      Map(
+        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+        "readBytes" -> SQLMetrics.createMetric(sparkContext, "read bytes")
+      )
+    )
+    val mockBatchScanExecTransformer = mock(classOf[BatchScanExecTransformer])
+    when(mockBatchScanExecTransformer.metrics).thenReturn(
+      Map(
+        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+        "outputBytes" -> SQLMetrics.createMetric(sparkContext, "outputBytes")
+      )
+    )
+    testLeafExecMetrics(mockBatchScanExec, mockBatchScanExec.metrics("numOutputRows").+=(_),
+      mockBatchScanExec.metrics("readBytes").+=(_))
+    testLeafExecMetrics(mockBatchScanExecTransformer, mockBatchScanExecTransformer.metrics("numOutputRows").+=(_),
+      mockBatchScanExecTransformer.metrics("outputBytes").+=(_))
+  }
+
+  test("sparkPlan metrics for Hive scanBytes and ScanRows") {
+    val mockHiveTableScanExec = MockHiveTableScanExecFactory.getHiveTableScanExec
+    when(mockHiveTableScanExec.metrics).thenReturn(
+      Map(
+        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+        "readBytes" -> SQLMetrics.createMetric(sparkContext, "read bytes")
+      )
+    )
+    testLeafExecMetrics(mockHiveTableScanExec, mockHiveTableScanExec.metrics("numOutputRows").+=(_),
+      mockHiveTableScanExec.metrics("readBytes").+=(_))
+    val mockHiveTableScanExecTrans = mock(classOf[HiveTableScanExecTransformer])
+    when(mockHiveTableScanExecTrans.metrics).thenReturn(
+      Map(
+        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+        "outputBytes" -> SQLMetrics.createMetric(sparkContext, "outputBytes")
+      )
+    )
+    testLeafExecMetrics(mockHiveTableScanExecTrans, mockHiveTableScanExecTrans.metrics("numOutputRows").+=(_),
+      mockHiveTableScanExecTrans.metrics("outputBytes").+=(_))
+  }
+
 }
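
The expected values in testLeafExecMetrics above encode the accumulation contract of collectAdaptiveSparkPlanExecMetrics: the stubbed metric values are added onto the running totals passed in. A worked check of the arithmetic the assertions rely on (object name is a scratch name):

    object AccumulationSketch extends App {
      // Running totals (200 rows, 250 bytes) plus stubbed metrics (1000, 56698).
      val (startRows, startBytes) = (200L, 250L)
      val (metricRows, metricBytes) = (1000L, 56698L)
      assert((startRows + metricRows, startBytes + metricBytes) == (1200L, 56948L))
    }
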
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/hive/MockHiveTableScanExecFactory.scala
similarity index 56%
copy from src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
copy to src/spark-project/sparder/src/test/scala/org/apache/spark/sql/hive/MockHiveTableScanExecFactory.scala
index 6aa69b94a9..b4d86f2a09 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
+++ b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/hive/MockHiveTableScanExecFactory.scala
@@ -15,26 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.spark.sql.hive
 
-package org.apache.kylin.common.msg;
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.mockito.Mockito
 
-import com.alibaba.ttl.TransmittableThreadLocal;
-public class MsgPicker {
-    private static TransmittableThreadLocal<Message> msg = new TransmittableThreadLocal<>();
-
-    public static void setMsg(String lang) {
-        //if ("cn".equals(lang))
-        //msg.set(CnMessage.getInstance());
-        //else
-        msg.set(Message.getInstance());
-    }
-
-    public static Message getMsg() {
-        Message ret = msg.get();
-        if (ret == null) { // use English by default
-            ret = Message.getInstance();
-            msg.set(Message.getInstance());
-        }
-        return ret;
-    }
+object MockHiveTableScanExecFactory {
+  def getHiveTableScanExec: LeafExecNode = Mockito.mock(classOf[HiveTableScanExec])
 }
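
The factory sits in the org.apache.spark.sql.hive package presumably because HiveTableScanExec is package-private in Spark, so a test in another package obtains the mock through it:

    // From SparkQueryMetricUtilsSuite, which lives outside the hive package:
    val mockHiveTableScanExec = MockHiveTableScanExecFactory.getHiveTableScanExec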
