This is an automated email from the ASF dual-hosted git repository.
yhcast0 pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 73f2091f75 KYLIN-6019 fix calculate scanRows and scanBytes (#2268)
73f2091f75 is described below
commit 73f2091f75157ca4d782de9a7080338fd3d07cab
Author: Guoliang Sun <[email protected]>
AuthorDate: Fri Feb 21 15:36:59 2025 +0800
KYLIN-6019 fix calculate scanRows and scanBytes (#2268)
Co-authored-by: 夏旭晨 <[email protected]>
---
.../org/apache/kylin/common/msg/MsgPicker.java | 27 +++++--
.../apache/kylin/common/KylinConfigBaseTest.java | 17 ++++-
.../apache/spark/sql/hive/QueryMetricUtils.scala | 14 +++-
.../sql/execution/SparkQueryMetricUtilsSuite.scala | 84 ++++++++++++++++++++--
.../sql/hive/MockHiveTableScanExecFactory.scala} | 26 ++-----
5 files changed, 134 insertions(+), 34 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
index 6aa69b94a9..5cf19447ac 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
@@ -20,20 +20,33 @@ package org.apache.kylin.common.msg;
import com.alibaba.ttl.TransmittableThreadLocal;
public class MsgPicker {
- private static TransmittableThreadLocal<Message> msg = new TransmittableThreadLocal<>();
+ private static final ThreadLocal<Message> msg = new ThreadLocal<>();
+ private static final String CHINESE_LANGUAGE_CODE = "cn";
+ /**
+ * Sets the message based on the given language code.
+ * If the language code is null or not recognized, sets the default English message.
+ *
+ * @param lang the language code (e.g., "cn" for Chinese)
+ */
public static void setMsg(String lang) {
- //if ("cn".equals(lang))
- //msg.set(CnMessage.getInstance());
- //else
- msg.set(Message.getInstance());
+ if (CHINESE_LANGUAGE_CODE.equals(lang)) {
+ msg.set(CnMessage.getInstance());
+ } else {
+ msg.set(Message.getInstance());
+ }
}
+ /**
+ * Gets the current message. If no message is set, returns the default English message.
+ *
+ * @return the current message or the default English message if none is set
+ */
public static Message getMsg() {
Message ret = msg.get();
- if (ret == null) { // use English by default
+ if (ret == null) {
ret = Message.getInstance();
- msg.set(Message.getInstance());
+ msg.set(ret);
}
return ret;
}
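For reference, a minimal sketch (not part of the patch) of how the restored language switch is expected to behave; it assumes CnMessage extends Message and that both live alongside MsgPicker in org.apache.kylin.common.msg:

    import org.apache.kylin.common.msg.{CnMessage, Message, MsgPicker}

    // "cn" now selects the Chinese bundle again; any other (or null) code falls back to English
    MsgPicker.setMsg("cn")
    assert(MsgPicker.getMsg.isInstanceOf[CnMessage])
    MsgPicker.setMsg("en")
    assert(MsgPicker.getMsg.getClass == classOf[Message])

Note that the holder is now a plain ThreadLocal rather than a TransmittableThreadLocal, so the chosen bundle is no longer propagated to tasks handed off to pooled worker threads.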
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 3ef0e75bc6..0d2f359f19 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -50,10 +50,13 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -1525,6 +1528,16 @@ class KylinConfigBaseTest {
Assertions.assertEquals(5002, map.get(13).get(1));
}
+ private void assertUnorderedEqualsIgnoringEmpty(String expected, String actual) {
+ Set<String> actualSet = new HashSet<>(Arrays.asList(actual.split(",")));
+ Set<String> expectedSet = new HashSet<>(Arrays.asList(expected.split(",")));
+
+ actualSet.removeIf(StringUtils::isBlank);
+ expectedSet.removeIf(StringUtils::isBlank);
+
+ Assertions.assertEquals(expectedSet, actualSet, "expectedSet: " + expectedSet + " === actualSet: " + actualSet);
+ }
+
@Test
public void getKylinExtJarsPath() throws Exception {
val config = KylinConfig.getInstanceFromEnv();
@@ -1541,9 +1554,9 @@ class KylinConfigBaseTest {
FileUtils.write(glutenCelebornJar, "gluten celeborn jar");
val withGluten = config.getKylinExtJarsPath(true);
- val withGlutenExpected = "," + glutenJar.getAbsolutePath() + "," + celebornJar.getAbsolutePath() + ","
+ val withGlutenExpected = "," + celebornJar.getAbsolutePath() + "," + glutenJar.getAbsolutePath() + ","
+ mysqlJar.getAbsolutePath() + "," + glutenCelebornJar.getAbsolutePath();
- Assertions.assertEquals(withGlutenExpected, withGluten);
+ assertUnorderedEqualsIgnoringEmpty(withGlutenExpected, withGluten);
val withoutGluten = config.getKylinExtJarsPath(false);
val withoutExpected = "," + celebornJar.getAbsolutePath() + "," + mysqlJar.getAbsolutePath();
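The expected string is now checked with assertUnorderedEqualsIgnoringEmpty, presumably because getKylinExtJarsPath does not guarantee the order in which the jar paths are concatenated; only the set of paths is stable. A rough Scala sketch of the same comparison idea (illustrative only, not code from the patch):

    // order-insensitive comparison of comma-separated paths, ignoring empty segments
    def sameJarSet(expected: String, actual: String): Boolean = {
      def toSet(s: String): Set[String] = s.split(",").filter(_.nonEmpty).toSet
      toSet(expected) == toSet(actual)
    }

    assert(sameJarSet(",/tmp/a.jar,/tmp/b.jar", "/tmp/b.jar,,/tmp/a.jar"))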
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala
index 1e504d2854..d0c70dc205 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/hive/QueryMetricUtils.scala
@@ -18,12 +18,13 @@
package org.apache.spark.sql.hive
-import org.apache.gluten.execution.FileSourceScanExecTransformer
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.AppStatus
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import scala.collection.JavaConverters._
@@ -57,6 +58,17 @@ object QueryMetricUtils extends Logging {
(scanRow + exec.metrics.apply("numOutputRows").value, scanBytes + exec.metrics.apply("readBytes").value)
case exec: HiveTableScanExec =>
(scanRow + exec.metrics.apply("numOutputRows").value, scanBytes + exec.metrics.apply("readBytes").value)
+ case transformer: HiveTableScanExecTransformer =>
+ (scanRow + transformer.metrics.apply("numOutputRows").value, scanBytes + transformer.metrics.apply("outputBytes").value)
+ case exec: BatchScanExec =>
+ // avoid empty metrics
+ val readBytes = exec.metrics.get("readBytes").map(_.value).getOrElse(0L)
+ (scanRow + exec.metrics.apply("numOutputRows").value, scanBytes + readBytes)
+ case transformer: BatchScanExecTransformer =>
+ // avoid empty metrics
+ val numOutputRows = transformer.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+ val outputBytes = transformer.metrics.get("outputBytes").map(_.value).getOrElse(0L)
+ (scanRow + numOutputRows, scanBytes + outputBytes)
case exec: ShuffleQueryStageExec =>
collectAdaptiveSparkPlanExecMetrics(exec.plan, scanRow, scanBytes)
case exec: AdaptiveSparkPlanExec =>
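The core of the fix is the defensive metric lookup in the new branches: metrics.apply(name) throws NoSuchElementException when a scan operator has not registered that metric, so the BatchScanExec / BatchScanExecTransformer cases read readBytes, outputBytes, and numOutputRows via get(...).getOrElse(0L) instead ("avoid empty metrics"). A minimal sketch of the pattern, detached from the Spark/Gluten classes (a plain Map[String, Long] stands in for the real SQLMetric map):

    // Missing metrics contribute 0 instead of throwing NoSuchElementException.
    def metricOrZero(metrics: Map[String, Long], name: String): Long =
      metrics.getOrElse(name, 0L)

    val metrics = Map("numOutputRows" -> 1000L) // no "readBytes" registered
    val scanBytes = metricOrZero(metrics, "readBytes") // 0L, collection keeps going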
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala
index 66d2ea1eef..7fcdd4f1f6 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/execution/SparkQueryMetricUtilsSuite.scala
@@ -34,6 +34,9 @@
package org.apache.spark.sql.execution
+import java.io.File
+
+import org.apache.gluten.execution.BatchScanExecTransformer
import org.apache.hadoop.fs.Path
import org.apache.kylin.common.util.NLocalFileMetadataTestCase
import org.apache.spark.sql._
@@ -44,14 +47,15 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, Adapti
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.hive.QueryMetricUtils
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, MockHiveTableScanExecFactory, QueryMetricUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
-
-import java.io.File
+import org.mockito.Mockito._
class SparkQueryMetricUtilsSuite extends QueryTest with SharedSparkSession {
@@ -59,7 +63,7 @@ class SparkQueryMetricUtilsSuite extends QueryTest with SharedSparkSession {
lazy val metaStore: NLocalFileMetadataTestCase = new NLocalFileMetadataTestCase
- protected def metadata : Seq[String] = {
+ protected def metadata: Seq[String] = {
Seq(fitPathForUT(ut_meta))
}
@@ -252,4 +256,76 @@ class SparkQueryMetricUtilsSuite extends QueryTest with SharedSparkSession {
}
+ def testLeafExecMetrics(leafExecNode: LeafExecNode, setScanRows: (Int) => Unit, setScanBytes: (Int) => Unit): Unit = {
+ setScanRows(1000)
+ setScanBytes(56698)
+ val collectScanMetrics = QueryMetricUtils.collectScanMetrics(leafExecNode)
+ assert(1000 == collectScanMetrics._1.get(0))
+ assert(56698 == collectScanMetrics._2.get(0))
+ val collectScanMetrics2 = QueryMetricUtils.collectAdaptiveSparkPlanExecMetrics(leafExecNode, 200, 250)
+ assert(1200 == collectScanMetrics2._1)
+ assert(56948 == collectScanMetrics2._2)
+ val dataWritingCommandExec = mock(classOf[DataWritingCommandExec])
+ when(dataWritingCommandExec.metrics).thenReturn(
+ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "readBytes" -> SQLMetrics.createMetric(sparkContext, "readBytes"),
+ "outputBytes" -> SQLMetrics.createMetric(sparkContext, "outputBytes")
+ )
+ )
+ // these metrics should not be counted
+ dataWritingCommandExec.metrics("numOutputRows").+=(1000)
+ dataWritingCommandExec.metrics("readBytes").+=(5691)
+ dataWritingCommandExec.metrics("outputBytes").+=(5691)
+ when(dataWritingCommandExec.child).thenReturn(leafExecNode)
+ val collectScanMetrics3 = QueryMetricUtils.collectScanMetrics(dataWritingCommandExec)
+ assert(1000 == collectScanMetrics3._1.get(0))
+ assert(56698 == collectScanMetrics3._2.get(0))
+ val collectScanMetrics4 = QueryMetricUtils.collectAdaptiveSparkPlanExecMetrics(leafExecNode, 200, 250)
+ assert(1200 == collectScanMetrics4._1)
+ assert(56948 == collectScanMetrics4._2)
+ }
+
+ test("sparkPlan metrics for Batch scanBytes and ScanRows") {
+ val mockBatchScanExec = mock(classOf[BatchScanExec])
+ when(mockBatchScanExec.metrics).thenReturn(
+ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "readBytes" -> SQLMetrics.createMetric(sparkContext, "read bytes")
+ )
+ )
+ val mockBatchScanExecTransformer = mock(classOf[BatchScanExecTransformer])
+ when(mockBatchScanExecTransformer.metrics).thenReturn(
+ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "outputBytes" -> SQLMetrics.createMetric(sparkContext, "outputBytes")
+ )
+ )
+ testLeafExecMetrics(mockBatchScanExec, mockBatchScanExec.metrics("numOutputRows").+=(_), mockBatchScanExec.metrics("readBytes").+=(_))
+ testLeafExecMetrics(mockBatchScanExecTransformer, mockBatchScanExecTransformer.metrics("numOutputRows").+=(_), mockBatchScanExecTransformer.metrics("outputBytes").+=(_))
+ }
+
+ test("sparkPlan metrics for Hive scanBytes and ScanRows") {
+ val mockHiveTableScanExec = MockHiveTableScanExecFactory.getHiveTableScanExec
+ when(mockHiveTableScanExec.metrics).thenReturn(
+ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "readBytes" -> SQLMetrics.createMetric(sparkContext, "read bytes")
+ )
+ )
+ testLeafExecMetrics(mockHiveTableScanExec, mockHiveTableScanExec.metrics("numOutputRows").+=(_), mockHiveTableScanExec.metrics("readBytes").+=(_))
+ val mockHiveTableScanExecTrans = mock(classOf[HiveTableScanExecTransformer])
+ when(mockHiveTableScanExecTrans.metrics).thenReturn(
+ Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
+ "outputBytes" -> SQLMetrics.createMetric(sparkContext, "outputBytes")
+ )
+ )
+ testLeafExecMetrics(mockHiveTableScanExecTrans, mockHiveTableScanExecTrans.metrics("numOutputRows").+=(_), mockHiveTableScanExecTrans.metrics("outputBytes").+=(_))
+ }
+
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/hive/MockHiveTableScanExecFactory.scala
similarity index 56%
copy from src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
copy to src/spark-project/sparder/src/test/scala/org/apache/spark/sql/hive/MockHiveTableScanExecFactory.scala
index 6aa69b94a9..b4d86f2a09 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
+++ b/src/spark-project/sparder/src/test/scala/org/apache/spark/sql/hive/MockHiveTableScanExecFactory.scala
@@ -15,26 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.sql.hive
-package org.apache.kylin.common.msg;
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.mockito.Mockito
-import com.alibaba.ttl.TransmittableThreadLocal;
-public class MsgPicker {
- private static TransmittableThreadLocal<Message> msg = new TransmittableThreadLocal<>();
-
- public static void setMsg(String lang) {
- //if ("cn".equals(lang))
- //msg.set(CnMessage.getInstance());
- //else
- msg.set(Message.getInstance());
- }
-
- public static Message getMsg() {
- Message ret = msg.get();
- if (ret == null) { // use English by default
- ret = Message.getInstance();
- msg.set(Message.getInstance());
- }
- return ret;
- }
+object MockHiveTableScanExecFactory {
+ def getHiveTableScanExec: LeafExecNode = Mockito.mock(classOf[HiveTableScanExec])
}
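MockHiveTableScanExecFactory is a test-only helper: presumably because HiveTableScanExec is private to the org.apache.spark.sql.hive package in Spark, the Mockito mock has to be created from a class inside that package and is handed out as a plain LeafExecNode. A sketch of how the suite above consumes it (assumes Mockito on the classpath and a live sparkContext from SharedSparkSession):

    import org.apache.spark.sql.execution.LeafExecNode
    import org.apache.spark.sql.execution.metric.SQLMetrics
    import org.apache.spark.sql.hive.MockHiveTableScanExecFactory
    import org.mockito.Mockito.when

    // The concrete scan class stays hidden; callers only ever see a LeafExecNode.
    val scan: LeafExecNode = MockHiveTableScanExecFactory.getHiveTableScanExec
    // Stub the metric map the same way the suite does, then feed the node to QueryMetricUtils.
    when(scan.metrics).thenReturn(Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")))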