This is an automated email from the ASF dual-hosted git repository.

pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new c5d9939366 KYLIN-6050 fix use default schema when query internal table directly (#2300)
c5d9939366 is described below

commit c5d9939366c081994880af10e07d60390904a48c
Author: Guoliang Sun <[email protected]>
AuthorDate: Tue Feb 25 16:23:25 2025 +0800

    KYLIN-6050 fix use default schema when query internal table directly (#2300)
    
    ---------
    
    Co-authored-by: Xuecheng Shan <[email protected]>
    Co-authored-by: Xuecheng Shan <[email protected]>
---
 .../org/apache/kylin/common/msg/MsgPicker.java     |  1 -
 .../common/persistence/transaction/UnitOfWork.java |  4 +-
 .../apache/kylin/common/KylinConfigBaseTest.java   |  3 +-
 .../config/initialize/OpsAppInitializerTest.java   |  7 ++-
 .../apache/kylin/query/util/SchemaConverter.java   | 49 +++++++++++++++---
 .../kylin/query/util/SchemaConverterTest.java      | 59 ++++++++++++++++++++--
 6 files changed, 106 insertions(+), 17 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
index 5cf19447ac..0b7ce72e0a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.common.msg;
 
-import com.alibaba.ttl.TransmittableThreadLocal;
 public class MsgPicker {
     private static final ThreadLocal<Message> msg = new ThreadLocal<>();
     private static final String CHINESE_LANGUAGE_CODE = "cn";
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
index c7d9319ca6..805515a19d 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
@@ -46,6 +46,7 @@ import org.apache.kylin.common.persistence.lock.DeadLockException;
 import org.apache.kylin.common.persistence.lock.LockInterruptException;
 import org.apache.kylin.common.persistence.lock.TransactionLock;
 import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.persistence.resources.SystemRawResource;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
@@ -240,7 +241,8 @@ public class UnitOfWork {
         Set<String> copyForWriteResources = UnitOfWork.get().getCopyForWriteItems();
         val eventList = data.stream().map(x -> {
             String resPath = x.generateKeyWithType();
-            if (x.getContent() != null && !copyForWriteResources.contains(resPath)) {
+            if (x.getContent() != null && !(x instanceof SystemRawResource)
+                    && !copyForWriteResources.contains(resPath)) {
                 throw new IllegalStateException(
                         "Transaction try to modify a resource without copyForWrite: " + x.getMetaKey());
             }
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 7017a3fa06..295ced8a66 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1563,7 +1563,8 @@ class KylinConfigBaseTest {
         Assertions.assertEquals(withoutExpected, withoutGluten);
     }
 
-    void testGetServerAddress() {
+    @Test
+    public void testGetServerAddress() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         config.setProperty("kylin.server.address", "127.0.0.1");
         Assertions.assertEquals("127.0.0.1", config.getServerAddress());
diff --git a/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java b/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
index ba5722b6e6..5562a80c21 100644
--- a/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
+++ b/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
@@ -40,8 +40,10 @@ import org.apache.kylin.tool.MetadataTool;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -52,6 +54,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @PowerMockIgnore({ "com.sun.security.*", "org.w3c.*", "javax.xml.*", "org.xml.*", "org.apache.cxf.*",
         "javax.management.*", "javax.script.*", "org.apache.hadoop.*", "javax.security.*", "java.security.*",
         "javax.crypto.*", "javax.net.ssl.*", "org.apache.kylin.common.asyncprofiler.AsyncProfiler" })
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class OpsAppInitializerTest extends NLocalFileMetadataTestCase {
     OpsService opsService;
 
@@ -72,7 +75,7 @@ public class OpsAppInitializerTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testMetadataBackupInterrupted() throws IOException {
+    public void testMetadataBackupInterruptedA() throws IOException {
         String path = opsService.backupMetadata(null);
         JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv());
         OpsService.MetadataBackup.getRunningTask()
@@ -85,7 +88,7 @@ public class OpsAppInitializerTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testMetadataRestoreInterrupted() throws IOException {
+    public void testMetadataRestoreInterruptedB() throws IOException {
         String path = opsService.backupMetadata(null);
         JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv());
         await().atMost(2, TimeUnit.MINUTES).until(() -> {
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
index f2f429850b..0df7bc20b0 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
@@ -18,7 +18,10 @@
 package org.apache.kylin.query.util;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
+import java.util.Set;
 
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
@@ -27,6 +30,8 @@ import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.util.SqlBasicVisitor;
 import org.apache.kylin.common.KylinConfig;
@@ -60,7 +65,7 @@ public class SchemaConverter implements IPushDownConverter {
             return originSql;
         }
         try {
-            String transformedSql = transform(originSql, project, config);
+            String transformedSql = transform(originSql, project, defaultSchema, config);
             QueryContext.current().setPushdownEngine("GLUTEN");
             return transformedSql;
         } catch (Exception e) {
@@ -74,21 +79,29 @@ public class SchemaConverter implements IPushDownConverter {
 
     }
 
-    public String transform(String originSql, String project, KylinConfig config) throws SqlParseException {
+    public String transform(String originSql, String project, String defaultSchema, KylinConfig config)
+            throws SqlParseException {
         SqlNode node = CalciteParser.parse(originSql, project);
         TableNameVisitor visitor = new TableNameVisitor(originSql);
         node.accept(visitor);
         List<Pair<SqlIdentifier, Pair<Integer, Integer>>> tableNamesWithPos = visitor.getTableNamesWithPos();
-        return replaceDbNameAndAddCatalog(tableNamesWithPos, originSql, config, project);
+        return replaceDbNameAndAddCatalog(tableNamesWithPos, originSql, defaultSchema, config, project);
     }
 
     private String replaceDbNameAndAddCatalog(List<Pair<SqlIdentifier, Pair<Integer, Integer>>> positions,
-            String originSql, KylinConfig config, String project) {
+            String originSql, String defaultSchema, KylinConfig config, String project) {
         positions.sort(((o1, o2) -> o2.getSecond().getFirst() - o1.getSecond().getFirst()));
         String sql = originSql + " ";
         InternalTableManager manager = InternalTableManager.getInstance(config, project);
         for (Pair<SqlIdentifier, Pair<Integer, Integer>> pos : positions) {
-            String tableIdentity = pos.getFirst().toString();
+            SqlIdentifier identifier = pos.getFirst();
+            String tableIdentity = identifier.toString();
+            if (identifier.names.size() == 1) {
+                tableIdentity = defaultSchema + '.' + tableIdentity;
+            }
+            if (!config.getSourceNameCaseSensitiveEnabled()) {
+                tableIdentity = tableIdentity.toUpperCase(Locale.ROOT);
+            }
             InternalTableDesc table = manager.getInternalTableDesc(tableIdentity);
             if (table == null) {
                 throw new IllegalStateException("Table " + tableIdentity + " is not an internal table.");
@@ -103,6 +116,7 @@ public class SchemaConverter implements IPushDownConverter {
         @Getter
         private final List<Pair<SqlIdentifier, Pair<Integer, Integer>>> tableNamesWithPos = new ArrayList<>();
         private final String originSql;
+        private final Set<String> namesOfWithItems = new HashSet<>();
 
         public TableNameVisitor(String originSql) {
             this.originSql = originSql;
@@ -117,16 +131,37 @@ public class SchemaConverter implements IPushDownConverter {
                 SqlJoin join = (SqlJoin) call;
                 checkIdentifier(join.getLeft());
                 checkIdentifier(join.getRight());
+            } else if (call.getKind() == SqlKind.WITH) {
+                SqlWith sqlWith = (SqlWith) call;
+                for (SqlNode withNode : sqlWith.withList) {
+                    visitWith(withNode);
+                }
+                sqlWith.body.accept(this);
+                return null;
             }
             return super.visit(call);
         }
 
+        private void visitWith(SqlNode withNode) {
+            if (withNode instanceof SqlWithItem) {
+                SqlWithItem withItem = (SqlWithItem) withNode;
+                withItem.query.accept(this);
+                namesOfWithItems.add(withItem.name.toString());
+            } else if (withNode instanceof SqlCall) {
+                withNode.accept(this);
+            }
+        }
+
         private void checkIdentifier(SqlNode node) {
             if (node instanceof SqlBasicCall && node.getKind() == SqlKind.AS) {
                 node = ((SqlBasicCall) node).operand(0);
             }
-            if (node instanceof SqlIdentifier && ((SqlIdentifier) node).names.size() == 2) {
-                tableIdentifierFound((SqlIdentifier) node);
+            if (node instanceof SqlIdentifier) {
+                SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
+                if (sqlIdentifier.names.size() == 2 || (sqlIdentifier.names.size() == 1
+                        && !namesOfWithItems.contains(sqlIdentifier.names.get(0)))) {
+                    tableIdentifierFound((SqlIdentifier) node);
+                }
             }
         }
 
diff --git a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
index 647f33244a..612233e66e 100644
--- a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.table.InternalTableDesc;
 import org.apache.kylin.metadata.table.InternalTableManager;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -33,6 +34,11 @@ public class SchemaConverterTest {
 
     SchemaConverter converter = new SchemaConverter();
 
+    @AfterEach
+    public void teardown() {
+        QueryContext.current().getQueryTagInfo().setPushdown(false);
+    }
+
     @Test
     void testCatalogConvert() {
         String sql = "select t1.id t_id from SSB.P_LINEORDER t1 left join \"SSB\".\"PART\" on t1.PARTKEY = PART.PARTKEY "
@@ -46,11 +52,7 @@ public class SchemaConverterTest {
 
         getTestConfig().setProperty("kylin.internal-table-enabled", "true");
 
-        InternalTableManager innerTableMgr = InternalTableManager.getInstance(getTestConfig(), "default");
-        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "default");
-        for (TableDesc tableDesc : tableMgr.listAllTables()) {
-            innerTableMgr.createInternalTable(new InternalTableDesc(tableDesc));
-        }
+        prepareInternalTable();
 
         Assertions.assertEquals(sql, converter.convert(sql, "default", null));
 
@@ -71,4 +73,51 @@ public class SchemaConverterTest {
 
         Assertions.assertEquals(expectedSql, converter.convert(sql, "default", null));
     }
+
+    @Test
+    void testConvertWithDefaultSchema() {
+        getTestConfig().setProperty("kylin.internal-table-enabled", "true");
+
+        prepareInternalTable();
+
+        QueryContext.current().getQueryTagInfo().setPushdown(true);
+        String sql1 = "select TRANS_ID from test_kylin_fact";
+        String result1 = converter.convert(sql1, "default", "default");
+        String expectedSql1 = "select TRANS_ID "
+                + "from \"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_KYLIN_FACT\"";
+        Assertions.assertEquals(expectedSql1, result1);
+
+        String sql2 = "with t1 as (select TRANS_ID from test_kylin_fact) select * from t1";
+        String result2 = converter.convert(sql2, "default", "default");
+        String expectedSql2 = "with t1 as (select TRANS_ID "
+                + "from \"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_KYLIN_FACT\")" + " select * from t1";
+        Assertions.assertEquals(expectedSql2, result2);
+
+        getTestConfig().setProperty("kylin.source.name-case-sensitive-enabled", "true");
+        String sql3 = "with t1 as (select TRANS_ID from test_kylin_fact) select * from t1";
+        String result3 = converter.convert(sql3, "default", "default");
+        String expectedSql3 = "with t1 as (select TRANS_ID from test_kylin_fact) select * from t1";
+        Assertions.assertEquals(expectedSql3, result3);
+        Assertions.assertTrue(QueryContext.current().getQueryTagInfo().isErrInterrupted());
+        Assertions.assertEquals("Table default.test_kylin_fact is not an internal table.",
+                QueryContext.current().getQueryTagInfo().getInterruptReason());
+
+        getTestConfig().setProperty("kylin.source.name-case-sensitive-enabled", "false");
+        String sql4 = "with t1 as (select TRANS_ID from test_kylin_fact), t2 as (select TRANS_ID from t1) "
+                + "select * from t2";
+        String result4 = converter.convert(sql4, "default", "default");
+        String expectedSql4 = "with t1 as (select TRANS_ID "
+                + "from \"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_KYLIN_FACT\"), "
+                + "t2 as (select TRANS_ID from t1) select * from t2";
+        Assertions.assertEquals(expectedSql4, result4);
+        QueryContext.current().getQueryTagInfo().setPushdown(false);
+    }
+
+    private void prepareInternalTable() {
+        InternalTableManager innerTableMgr = InternalTableManager.getInstance(getTestConfig(), "default");
+        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "default");
+        for (TableDesc tableDesc : tableMgr.listAllTables()) {
+            innerTableMgr.createInternalTable(new InternalTableDesc(tableDesc));
+        }
+    }
 }

Reply via email to