This is an automated email from the ASF dual-hosted git repository.
pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 83cab86ac4 KYLIN-6026 When table sourceType is JDBC, skip `calculateViewMetas`
83cab86ac4 is described below
commit 83cab86ac4ee74f9d0c60e9ded305e47255ccade
Author: jlf <[email protected]>
AuthorDate: Fri Nov 15 15:28:53 2024 +0800
KYLIN-6026 When table sourceType is JDBC, skip `calculateViewMetas`
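In short: the sampling job previously always called the name-based
calculateViewMetasIfNeeded, which probes Spark's Hive catalog; for
JDBC-sourced tables that probe is now skipped, presumably because their
identities live in the JDBC source rather than the Hive catalog. A minimal,
runnable sketch of the new dispatch (the case class, the ID constant, and the
println bodies are simplified stand-ins for illustration, not Kylin's real
types):

    object ViewMetaSketch {
      // Assumption: stand-in for ISourceAware.ID_JDBC from kylin-metadata.
      val ID_JDBC = 8

      // Simplified stand-in for org.apache.kylin.metadata.model.TableDesc.
      case class TableDesc(sourceType: Int, backTickIdentity: String)

      // New overload: short-circuit for JDBC sources, otherwise delegate
      // to the pre-existing name-based overload.
      def calculateViewMetasIfNeeded(tableDesc: TableDesc): Unit = {
        val tableName = tableDesc.backTickIdentity
        if (tableDesc.sourceType == ID_JDBC) {
          println(s"Table [$tableName] sourceType is JDBC, skip to calculate view meta")
          return
        }
        calculateViewMetasIfNeeded(tableName)
      }

      // Pre-existing overload; the real version inspects the Hive catalog.
      def calculateViewMetasIfNeeded(tableName: String): Unit =
        println(s"resolving view metas for $tableName")

      def main(args: Array[String]): Unit = {
        calculateViewMetasIfNeeded(TableDesc(ID_JDBC, "`UT`.`TEST`")) // skipped
        calculateViewMetasIfNeeded(TableDesc(9, "`DEFAULT`.`T`"))     // assumed non-JDBC id, falls through
      }
    }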
---
.../kylin/engine/spark/job/TableAnalysisJob.scala | 13 +++++--
.../spark/stats/analyzer/TableAnalyzerTest.java | 41 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
index 26cbcc54a9..e42a8b238a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
@@ -26,7 +26,7 @@ import org.apache.kylin.engine.spark.builder.CreateFlatTable
import org.apache.kylin.engine.spark.source.SparkSqlUtil
import org.apache.kylin.engine.spark.stats.analyzer.TableAnalyzerJob
import org.apache.kylin.engine.spark.utils.SparkConfHelper
-import org.apache.kylin.metadata.model.TableDesc
+import org.apache.kylin.metadata.model.{ISourceAware, TableDesc}
import org.apache.kylin.metadata.project.NProjectManager
import org.apache.kylin.source.SourceFactory
import org.apache.spark.application.NoRetryException
@@ -68,7 +68,7 @@ class TableAnalysisJob(tableDesc: TableDesc,
throw new NoRetryException("Source table missing columns. Please reload table before sampling.")
}
- calculateViewMetasIfNeeded(tableDesc.getBackTickIdentity)
+ calculateViewMetasIfNeeded(tableDesc)
val dat = dataFrame.localLimit(rowsTakenInEachPartition)
val sampledDataset = CreateFlatTable.changeSchemaToAliasDotName(dat, tableDesc.getBackTickIdentity)
@@ -80,6 +80,15 @@ class TableAnalysisJob(tableDesc: TableDesc,
aggData ++ sampledDataset.limit(10).collect()
}
+ def calculateViewMetasIfNeeded(tableDesc: TableDesc): Unit = {
+ val tableName = tableDesc.getBackTickIdentity
+ if (tableDesc.getSourceType == ISourceAware.ID_JDBC) {
+ logInfo(s"Table [$tableName] sourceType is JDBC, skip to calculate view
meta")
+ return
+ }
+ calculateViewMetasIfNeeded(tableName)
+ }
+
def calculateViewMetasIfNeeded(tableName: String): Unit = {
if (ss.conf.get("spark.sql.catalogImplementation") == "hive") {
if (ss.catalog.tableExists(tableName)) {
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java
index b4b0da3588..f3e1760168 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/stats/analyzer/TableAnalyzerTest.java
@@ -24,19 +24,31 @@ import java.util.stream.Collectors;
import org.apache.kylin.GlutenDisabled;
import org.apache.kylin.GlutenRunner;
+import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
+import org.apache.kylin.engine.spark.job.TableAnalysisJob;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import lombok.val;
import lombok.var;
@@ -45,6 +57,8 @@ import lombok.var;
public class TableAnalyzerTest extends NLocalWithSparkSessionTestBase {
private NTableMetadataManager tableMgr;
+ @Mock
+ private Appender appender = Mockito.mock(Appender.class);
@Before
public void setup() {
@@ -56,6 +70,15 @@ public class TableAnalyzerTest extends NLocalWithSparkSessionTestBase {
sparkConf.set(SparkConfHelper.EXECUTOR_CORES, "1");
ss = SparkSession.builder().config(sparkConf).getOrCreate();
SparderEnv.setSparkSession(ss);
+
+ Mockito.when(appender.getName()).thenReturn("mocked");
+ Mockito.when(appender.isStarted()).thenReturn(true);
+ ((Logger) LogManager.getRootLogger()).addAppender(appender);
+ }
+
+ @After
+ public void after() {
+ ((Logger) LogManager.getRootLogger()).removeAppender(appender);
}
@Test
@@ -164,4 +187,22 @@ public class TableAnalyzerTest extends NLocalWithSparkSessionTestBase {
Assert.assertEquals(100.0 / 10000, tableExt.getTotalRows() / 10000.0, 0.1);
}
+
+ @Test
+ public void testJdbcTableSkipCalculateViewMetas() {
+ val tableDesc = Mockito.mock(TableDesc.class);
+ Mockito.when(tableDesc.getSourceType()).thenReturn(ISourceAware.ID_JDBC);
+ Mockito.when(tableDesc.getBackTickIdentity()).thenReturn("UT.TEST");
+
+ val tableAnalyzeExec = new TableAnalysisJob(tableDesc, getProject(), 100, ss, RandomUtil.randomUUIDStr());
+ tableAnalyzeExec.calculateViewMetasIfNeeded(tableDesc);
+
+ ArgumentCaptor<LogEvent> logCaptor = ArgumentCaptor.forClass(LogEvent.class);
+ Mockito.verify(appender, Mockito.atLeast(0)).append(logCaptor.capture());
+ var log = logCaptor.getAllValues().stream()
+ .filter(event -> event.getLoggerName().equals(TableAnalysisJob.class.getName()))
+ .filter(event -> event.getLevel().equals(Level.INFO))
+ .map(event -> event.getMessage().getFormattedMessage()).findFirst().orElseThrow(AssertionError::new);
+ Assert.assertEquals("Table [UT.TEST] sourceType is JDBC, skip to
calculate view meta", log);
+ }
}
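Aside on the verification style used in the test above: instead of asserting
on the job directly, it attaches a mocked Log4j2 Appender to the root logger
and filters the captured events by logger name and level; Mockito.atLeast(0)
can never fail, it only drains every append(...) call into the captor. A
self-contained sketch of the same pattern (hypothetical standalone code, not
Kylin's; Configurator.setRootLevel is added so an INFO event reaches the mock
under a default Log4j2 config):

    import org.apache.logging.log4j.{Level, LogManager}
    import org.apache.logging.log4j.core.config.Configurator
    import org.apache.logging.log4j.core.{Appender, LogEvent, Logger}
    import org.mockito.{ArgumentCaptor, Mockito}

    object LogCaptureSketch {
      def main(args: Array[String]): Unit = {
        val appender = Mockito.mock(classOf[Appender])
        Mockito.when(appender.getName).thenReturn("mocked")
        Mockito.when(appender.isStarted).thenReturn(true) // Log4j2 skips stopped appenders

        Configurator.setRootLevel(Level.INFO) // default root level is ERROR
        val root = LogManager.getRootLogger.asInstanceOf[Logger]
        root.addAppender(appender)
        try {
          LogManager.getLogger("sketch").info("hello from the sketch")

          val captor: ArgumentCaptor[LogEvent] = ArgumentCaptor.forClass(classOf[LogEvent])
          Mockito.verify(appender, Mockito.atLeast(0)).append(captor.capture())
          captor.getAllValues.forEach(e =>
            println(s"${e.getLoggerName} ${e.getLevel}: ${e.getMessage.getFormattedMessage}"))
        } finally root.removeAppender(appender)
      }
    }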