This is an automated email from the ASF dual-hosted git repository.
pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new c5d9939366 KYLIN-6050 fix use default schema when query internal table
directly (#2300)
c5d9939366 is described below
commit c5d9939366c081994880af10e07d60390904a48c
Author: Guoliang Sun <[email protected]>
AuthorDate: Tue Feb 25 16:23:25 2025 +0800
KYLIN-6050 fix use default schema when query internal table directly (#2300)
---------
Co-authored-by: Xuecheng Shan <[email protected]>
Co-authored-by: Xuecheng Shan <[email protected]>
---
.../org/apache/kylin/common/msg/MsgPicker.java | 1 -
.../common/persistence/transaction/UnitOfWork.java | 4 +-
.../apache/kylin/common/KylinConfigBaseTest.java | 3 +-
.../config/initialize/OpsAppInitializerTest.java | 7 ++-
.../apache/kylin/query/util/SchemaConverter.java | 49 +++++++++++++++---
.../kylin/query/util/SchemaConverterTest.java | 59 ++++++++++++++++++++--
6 files changed, 106 insertions(+), 17 deletions(-)
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
index 5cf19447ac..0b7ce72e0a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/MsgPicker.java
@@ -18,7 +18,6 @@
package org.apache.kylin.common.msg;
-import com.alibaba.ttl.TransmittableThreadLocal;
public class MsgPicker {
private static final ThreadLocal<Message> msg = new ThreadLocal<>();
private static final String CHINESE_LANGUAGE_CODE = "cn";
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
index c7d9319ca6..805515a19d 100644
---
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
+++
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/transaction/UnitOfWork.java
@@ -46,6 +46,7 @@ import
org.apache.kylin.common.persistence.lock.DeadLockException;
import org.apache.kylin.common.persistence.lock.LockInterruptException;
import org.apache.kylin.common.persistence.lock.TransactionLock;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.persistence.resources.SystemRawResource;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
@@ -240,7 +241,8 @@ public class UnitOfWork {
Set<String> copyForWriteResources =
UnitOfWork.get().getCopyForWriteItems();
val eventList = data.stream().map(x -> {
String resPath = x.generateKeyWithType();
- if (x.getContent() != null &&
!copyForWriteResources.contains(resPath)) {
+ if (x.getContent() != null && !(x instanceof SystemRawResource)
+ && !copyForWriteResources.contains(resPath)) {
throw new IllegalStateException(
"Transaction try to modify a resource without
copyForWrite: " + x.getMetaKey());
}
diff --git
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 7017a3fa06..295ced8a66 100644
---
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1563,7 +1563,8 @@ class KylinConfigBaseTest {
Assertions.assertEquals(withoutExpected, withoutGluten);
}
- void testGetServerAddress() {
+ @Test
+ public void testGetServerAddress() {
KylinConfig config = KylinConfig.getInstanceFromEnv();
config.setProperty("kylin.server.address", "127.0.0.1");
Assertions.assertEquals("127.0.0.1", config.getServerAddress());
diff --git
a/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
b/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
index ba5722b6e6..5562a80c21 100644
---
a/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
+++
b/src/ops-booter/src/test/java/org/apache/kylin/rest/config/initialize/OpsAppInitializerTest.java
@@ -40,8 +40,10 @@ import org.apache.kylin.tool.MetadataTool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -52,6 +54,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
@PowerMockIgnore({ "com.sun.security.*", "org.w3c.*", "javax.xml.*",
"org.xml.*", "org.apache.cxf.*",
"javax.management.*", "javax.script.*", "org.apache.hadoop.*",
"javax.security.*", "java.security.*",
"javax.crypto.*", "javax.net.ssl.*",
"org.apache.kylin.common.asyncprofiler.AsyncProfiler" })
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class OpsAppInitializerTest extends NLocalFileMetadataTestCase {
OpsService opsService;
@@ -72,7 +75,7 @@ public class OpsAppInitializerTest extends
NLocalFileMetadataTestCase {
}
@Test
- public void testMetadataBackupInterrupted() throws IOException {
+ public void testMetadataBackupInterruptedA() throws IOException {
String path = opsService.backupMetadata(null);
JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv());
OpsService.MetadataBackup.getRunningTask()
@@ -85,7 +88,7 @@ public class OpsAppInitializerTest extends
NLocalFileMetadataTestCase {
}
@Test
- public void testMetadataRestoreInterrupted() throws IOException {
+ public void testMetadataRestoreInterruptedB() throws IOException {
String path = opsService.backupMetadata(null);
JobContextUtil.getJobContext(KylinConfig.getInstanceFromEnv());
await().atMost(2, TimeUnit.MINUTES).until(() -> {
diff --git
a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
index f2f429850b..0df7bc20b0 100644
---
a/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
+++
b/src/query-common/src/main/java/org/apache/kylin/query/util/SchemaConverter.java
@@ -18,7 +18,10 @@
package org.apache.kylin.query.util;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
+import java.util.Set;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
@@ -27,6 +30,8 @@ import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.util.SqlBasicVisitor;
import org.apache.kylin.common.KylinConfig;
@@ -60,7 +65,7 @@ public class SchemaConverter implements IPushDownConverter {
return originSql;
}
try {
- String transformedSql = transform(originSql, project, config);
+ String transformedSql = transform(originSql, project,
defaultSchema, config);
QueryContext.current().setPushdownEngine("GLUTEN");
return transformedSql;
} catch (Exception e) {
@@ -74,21 +79,29 @@ public class SchemaConverter implements IPushDownConverter {
}
- public String transform(String originSql, String project, KylinConfig
config) throws SqlParseException {
+ public String transform(String originSql, String project, String
defaultSchema, KylinConfig config)
+ throws SqlParseException {
SqlNode node = CalciteParser.parse(originSql, project);
TableNameVisitor visitor = new TableNameVisitor(originSql);
node.accept(visitor);
List<Pair<SqlIdentifier, Pair<Integer, Integer>>> tableNamesWithPos =
visitor.getTableNamesWithPos();
- return replaceDbNameAndAddCatalog(tableNamesWithPos, originSql,
config, project);
+ return replaceDbNameAndAddCatalog(tableNamesWithPos, originSql,
defaultSchema, config, project);
}
private String replaceDbNameAndAddCatalog(List<Pair<SqlIdentifier,
Pair<Integer, Integer>>> positions,
- String originSql, KylinConfig config, String project) {
+ String originSql, String defaultSchema, KylinConfig config, String
project) {
positions.sort(((o1, o2) -> o2.getSecond().getFirst() -
o1.getSecond().getFirst()));
String sql = originSql + " ";
InternalTableManager manager =
InternalTableManager.getInstance(config, project);
for (Pair<SqlIdentifier, Pair<Integer, Integer>> pos : positions) {
- String tableIdentity = pos.getFirst().toString();
+ SqlIdentifier identifier = pos.getFirst();
+ String tableIdentity = identifier.toString();
+ if (identifier.names.size() == 1) {
+ tableIdentity = defaultSchema + '.' + tableIdentity;
+ }
+ if (!config.getSourceNameCaseSensitiveEnabled()) {
+ tableIdentity = tableIdentity.toUpperCase(Locale.ROOT);
+ }
InternalTableDesc table =
manager.getInternalTableDesc(tableIdentity);
if (table == null) {
throw new IllegalStateException("Table " + tableIdentity + "
is not an internal table.");
@@ -103,6 +116,7 @@ public class SchemaConverter implements IPushDownConverter {
@Getter
private final List<Pair<SqlIdentifier, Pair<Integer, Integer>>>
tableNamesWithPos = new ArrayList<>();
private final String originSql;
+ private final Set<String> namesOfWithItems = new HashSet<>();
public TableNameVisitor(String originSql) {
this.originSql = originSql;
@@ -117,16 +131,37 @@ public class SchemaConverter implements
IPushDownConverter {
SqlJoin join = (SqlJoin) call;
checkIdentifier(join.getLeft());
checkIdentifier(join.getRight());
+ } else if (call.getKind() == SqlKind.WITH) {
+ SqlWith sqlWith = (SqlWith) call;
+ for (SqlNode withNode : sqlWith.withList) {
+ visitWith(withNode);
+ }
+ sqlWith.body.accept(this);
+ return null;
}
return super.visit(call);
}
+ private void visitWith(SqlNode withNode) {
+ if (withNode instanceof SqlWithItem) {
+ SqlWithItem withItem = (SqlWithItem) withNode;
+ withItem.query.accept(this);
+ namesOfWithItems.add(withItem.name.toString());
+ } else if (withNode instanceof SqlCall) {
+ withNode.accept(this);
+ }
+ }
+
private void checkIdentifier(SqlNode node) {
if (node instanceof SqlBasicCall && node.getKind() == SqlKind.AS) {
node = ((SqlBasicCall) node).operand(0);
}
- if (node instanceof SqlIdentifier && ((SqlIdentifier)
node).names.size() == 2) {
- tableIdentifierFound((SqlIdentifier) node);
+ if (node instanceof SqlIdentifier) {
+ SqlIdentifier sqlIdentifier = (SqlIdentifier) node;
+ if (sqlIdentifier.names.size() == 2 ||
(sqlIdentifier.names.size() == 1
+ &&
!namesOfWithItems.contains(sqlIdentifier.names.get(0)))) {
+ tableIdentifierFound((SqlIdentifier) node);
+ }
}
}
diff --git
a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
index 647f33244a..612233e66e 100644
---
a/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
+++
b/src/query/src/test/java/org/apache/kylin/query/util/SchemaConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.kylin.metadata.model.NTableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.table.InternalTableDesc;
import org.apache.kylin.metadata.table.InternalTableManager;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -33,6 +34,11 @@ public class SchemaConverterTest {
SchemaConverter converter = new SchemaConverter();
+ @AfterEach
+ public void teardown() {
+ QueryContext.current().getQueryTagInfo().setPushdown(false);
+ }
+
@Test
void testCatalogConvert() {
String sql = "select t1.id t_id from SSB.P_LINEORDER t1 left join
\"SSB\".\"PART\" on t1.PARTKEY = PART.PARTKEY "
@@ -46,11 +52,7 @@ public class SchemaConverterTest {
getTestConfig().setProperty("kylin.internal-table-enabled", "true");
- InternalTableManager innerTableMgr =
InternalTableManager.getInstance(getTestConfig(), "default");
- NTableMetadataManager tableMgr =
NTableMetadataManager.getInstance(getTestConfig(), "default");
- for (TableDesc tableDesc : tableMgr.listAllTables()) {
- innerTableMgr.createInternalTable(new
InternalTableDesc(tableDesc));
- }
+ prepareInternalTable();
Assertions.assertEquals(sql, converter.convert(sql, "default", null));
@@ -71,4 +73,51 @@ public class SchemaConverterTest {
Assertions.assertEquals(expectedSql, converter.convert(sql, "default",
null));
}
+
+ @Test
+ void testConvertWithDefaultSchema() {
+ getTestConfig().setProperty("kylin.internal-table-enabled", "true");
+
+ prepareInternalTable();
+
+ QueryContext.current().getQueryTagInfo().setPushdown(true);
+ String sql1 = "select TRANS_ID from test_kylin_fact";
+ String result1 = converter.convert(sql1, "default", "default");
+ String expectedSql1 = "select TRANS_ID "
+ + "from
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_KYLIN_FACT\"";
+ Assertions.assertEquals(expectedSql1, result1);
+
+ String sql2 = "with t1 as (select TRANS_ID from test_kylin_fact)
select * from t1";
+ String result2 = converter.convert(sql2, "default", "default");
+ String expectedSql2 = "with t1 as (select TRANS_ID "
+ + "from
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_KYLIN_FACT\")" + " select *
from t1";
+ Assertions.assertEquals(expectedSql2, result2);
+
+
getTestConfig().setProperty("kylin.source.name-case-sensitive-enabled", "true");
+ String sql3 = "with t1 as (select TRANS_ID from test_kylin_fact)
select * from t1";
+ String result3 = converter.convert(sql3, "default", "default");
+ String expectedSql3 = "with t1 as (select TRANS_ID from
test_kylin_fact) select * from t1";
+ Assertions.assertEquals(expectedSql3, result3);
+
Assertions.assertTrue(QueryContext.current().getQueryTagInfo().isErrInterrupted());
+ Assertions.assertEquals("Table default.test_kylin_fact is not an
internal table.",
+ QueryContext.current().getQueryTagInfo().getInterruptReason());
+
+
getTestConfig().setProperty("kylin.source.name-case-sensitive-enabled",
"false");
+ String sql4 = "with t1 as (select TRANS_ID from test_kylin_fact), t2
as (select TRANS_ID from t1) "
+ + "select * from t2";
+ String result4 = converter.convert(sql4, "default", "default");
+ String expectedSql4 = "with t1 as (select TRANS_ID "
+ + "from
\"INTERNAL_CATALOG\".\"default\".\"DEFAULT\".\"TEST_KYLIN_FACT\"), "
+ + "t2 as (select TRANS_ID from t1) select * from t2";
+ Assertions.assertEquals(expectedSql4, result4);
+ QueryContext.current().getQueryTagInfo().setPushdown(false);
+ }
+
+ private void prepareInternalTable() {
+ InternalTableManager innerTableMgr =
InternalTableManager.getInstance(getTestConfig(), "default");
+ NTableMetadataManager tableMgr =
NTableMetadataManager.getInstance(getTestConfig(), "default");
+ for (TableDesc tableDesc : tableMgr.listAllTables()) {
+ innerTableMgr.createInternalTable(new
InternalTableDesc(tableDesc));
+ }
+ }
}