This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4d3418c237d4a839a85573712d0111d81086a477
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Wed Aug 30 14:11:32 2023 +0800

    KYLIN-5798 Add user-table access cache for table-loading
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 13 +++++++++++
 .../apache/kylin/common/KylinConfigBaseTest.java   | 22 ++++++++++++++++++
 .../apache/kylin/rest/service/SparkDDLTest.java    | 26 ++++++++++------------
 .../spark/source/NSparkMetadataExplorer.java       | 22 +++++++++++++++---
 .../spark/source/NSparkMetadataExplorerTest.java   | 14 +++++++++++-
 5 files changed, 79 insertions(+), 18 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3de55bf84e..8b9a0308f7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2800,6 +2800,19 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Boolean.parseBoolean(getOptional("kylin.source.hive.table-access-filter-enabled",
 FALSE));
     }
 
+    public boolean getTableAccessCacheEnable() {
+        return 
Boolean.parseBoolean(getOptional("kylin.source.hive.table-access-cache-enabled",
 TRUE));
+    }
+
+    public long getTableAccessCacheSize() {
+        return 
Long.parseLong(getOptional("kylin.source.hive.table-access-cache-size", 
ONE_HUNDRED_THOUSAND));
+    }
+
+    public long getTableAccessCacheTTL() {
+        return 
TimeUtil.timeStringAs(getOptional("kylin.source.hive.table-access-cache-ttl", 
"7d"),
+                TimeUnit.MINUTES);
+    }
+
     public String[] getHiveDatabases() {
         return getOptionalStringArray("kylin.source.hive.databases", new 
String[0]);
     }
diff --git 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index 6be4411004..59e4a3fc9b 100644
--- 
a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ 
b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -36,6 +36,7 @@
 
 package org.apache.kylin.common;
 
+import static org.apache.kylin.common.KylinConfigBase.FALSE;
 import static org.apache.kylin.common.KylinConfigBase.PATH_DELIMITER;
 import static 
org.apache.kylin.common.KylinConfigBase.WRITING_CLUSTER_WORKING_DIR;
 import static 
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_ENABLE_KEY;
@@ -1499,6 +1500,27 @@ class KylinConfigBaseTest {
         config.setProperty("kylin.metadata.audit-log.max-size", "3000000");
         assertEquals(3000000, config.getMetadataAuditLogMaxSize());
     }
+
+    @Test
+    void testGetTableAccessCache() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+
+        assertTrue(config.getTableAccessCacheEnable());
+        config.setProperty("kylin.source.hive.table-access-cache-enabled", 
FALSE);
+        assertFalse(config.getTableAccessCacheEnable());
+
+        assertEquals(100000, config.getTableAccessCacheSize());
+        config.setProperty("kylin.source.hive.table-access-cache-size", 
"200000");
+        assertEquals(200000, config.getTableAccessCacheSize());
+
+        assertEquals(10080, config.getTableAccessCacheTTL());
+        config.setProperty("kylin.source.hive.table-access-cache-ttl", "1m");
+        assertEquals(1, config.getTableAccessCacheTTL());
+        config.setProperty("kylin.source.hive.table-access-cache-ttl", "1h");
+        assertEquals(60, config.getTableAccessCacheTTL());
+        config.setProperty("kylin.source.hive.table-access-cache-ttl", "1d");
+        assertEquals(1440, config.getTableAccessCacheTTL());
+    }
 }
 
 class EnvironmentUpdateUtils {
diff --git 
a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
 
b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
index cf23d159aa..dc293e2b0c 100644
--- 
a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
+++ 
b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -41,36 +42,27 @@ import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.ViewRequest;
 import org.apache.kylin.rest.response.LoadTableResponse;
 import org.apache.kylin.rest.response.LogicalViewResponse;
-
 import org.apache.spark.sql.LogicalViewLoader;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.common.SparkDDLTestUtils;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.authentication.TestingAuthenticationToken;
 import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 public class SparkDDLTest extends NLocalFileMetadataTestCase {
-  @Autowired
-  private final SparkDDLService ddlService = Mockito.spy(new 
SparkDDLService());
-  @Autowired
-  private final TableExtService tableExtService = Mockito.spy(new 
TableExtService());
-  @Autowired
-  private final TableService tableService = Mockito.spy(new TableService());
-  @Autowired
-  private final IUserGroupService userGroupService = 
Mockito.spy(NUserGroupService.class);
-  private final Integer LOGICAL_VIEW_CATCHUP_INTERVAL = 3;
+  private SparkDDLService ddlService;
+  private TableExtService tableExtService;
+  private TableService tableService;
+  private IUserGroupService userGroupService;
+  private Integer LOGICAL_VIEW_CATCHUP_INTERVAL = 3;
 
   // Hive View
   private static final String CREATEVIEW_SQL1 =
@@ -122,6 +114,12 @@ public class SparkDDLTest extends 
NLocalFileMetadataTestCase {
   @Before
   public void setup() {
     createTestMetadata();
+
+    ddlService = Mockito.spy(new SparkDDLService());
+    tableExtService = Mockito.spy(new TableExtService());
+    tableService = Mockito.spy(new TableService());
+    userGroupService = Mockito.spy(NUserGroupService.class);
+
     Authentication authentication = new TestingAuthenticationToken("ADMIN",
         "ADMIN", Constant.ROLE_ADMIN);
     SecurityContextHolder.getContext().setAuthentication(authentication);
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index 3ade78d63a..6f3b1cf5ed 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
@@ -41,6 +43,8 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.guava30.shaded.common.cache.Cache;
+import org.apache.kylin.guava30.shaded.common.cache.CacheBuilder;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -67,6 +71,9 @@ import lombok.val;
 public class NSparkMetadataExplorer implements ISourceMetadataExplorer, 
ISampleDataDeployer, Serializable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(NSparkMetadataExplorer.class);
+    private static Cache<String, Boolean> tableAccessCache = 
CacheBuilder.newBuilder()
+            
.maximumSize(KylinConfig.getInstanceFromEnv().getTableAccessCacheSize())
+            
.expireAfterWrite(KylinConfig.getInstanceFromEnv().getTableAccessCacheTTL(), 
TimeUnit.MINUTES).build();
 
     public static String generateCreateSchemaSql(String schemaName) {
         return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", 
schemaName);
@@ -122,7 +129,6 @@ public class NSparkMetadataExplorer implements 
ISourceMetadataExplorer, ISampleD
     public List<String> listTables(String database) throws Exception {
         val ugi = UserGroupInformation.getCurrentUser();
         val config = KylinConfig.getInstanceFromEnv();
-        val spark = SparderEnv.getSparkSession();
 
         List<String> tables = Lists.newArrayList();
         try {
@@ -136,9 +142,10 @@ public class NSparkMetadataExplorer implements 
ISourceMetadataExplorer, ISampleD
             if (config.getTableAccessFilterEnable() && 
config.getKerberosProjectLevelEnable()
                     && UserGroupInformation.isSecurityEnabled()) {
                 List<String> accessTables = Lists.newArrayList();
+                boolean cacheEnabled = config.getTableAccessCacheEnable();
                 for (String table : tables) {
                     val tableName = database + "." + table;
-                    if (checkTableAccess(tableName)) {
+                    if (checkTableWithCache(cacheEnabled, ugi.getUserName(), 
tableName)) {
                         accessTables.add(table);
                     }
                 }
@@ -151,6 +158,15 @@ public class NSparkMetadataExplorer implements 
ISourceMetadataExplorer, ISampleD
         return tables;
     }
 
+    public boolean checkTableWithCache(boolean cacheEnabled, String user, 
String tableName) throws ExecutionException {
+        if (cacheEnabled) {
+            String cacheKey = user + "#tableAccess#" + tableName;
+            return tableAccessCache.get(cacheKey, () -> 
checkTableAccess(tableName));
+        } else {
+            return checkTableAccess(tableName);
+        }
+    }
+
     public boolean checkTableAccess(String tableName) {
         boolean isAccess = true;
         try {
@@ -257,7 +273,7 @@ public class NSparkMetadataExplorer implements 
ISourceMetadataExplorer, ISampleD
                 .orElseGet(Collections::emptyList).stream().map(field -> 
field.name) //
                 .collect(Collectors.toSet());
         int columnNumber = tableMeta.allColumns.size();
-        List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+        List<ColumnDesc> columns = new ArrayList<>(columnNumber);
         for (int i = 0; i < columnNumber; i++) {
             NSparkTableMeta.SparkTableColumnMeta field = 
tableMeta.allColumns.get(i);
             ColumnDesc cdesc = new ColumnDesc();
diff --git 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java
 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java
index fddfa930a5..4ed485541b 100644
--- 
a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java
+++ 
b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java
@@ -47,9 +47,21 @@ public class NSparkMetadataExplorerTest extends 
NLocalWithSparkSessionTest {
 
     @Test
     public void testListTables() throws Exception {
+        String testProject = "default";
+        String testTableName = "DEFAULT.TEST_KYLIN_FACT";
+
         NSparkMetadataExplorer sparkMetadataExplorer = new 
NSparkMetadataExplorer();
-        List<String> tables = sparkMetadataExplorer.listTables("");
+        NTableMetadataManager tableMgr = 
NTableMetadataManager.getInstance(getTestConfig(), testProject);
+        TableDesc fact = tableMgr.getTableDesc(testTableName);
+        sparkMetadataExplorer.createSampleTable(fact);
+
+        List<String> tables = sparkMetadataExplorer.listTables(testProject);
         Assert.assertTrue(tables != null && tables.size() > 0);
+
+        // test tableAccessCache, use only under Project Kerberos
+        boolean access = sparkMetadataExplorer.checkTableWithCache(false, 
"testUser", testTableName);
+        boolean cacheAccess = sparkMetadataExplorer.checkTableWithCache(true, 
"testUser", testTableName);
+        Assert.assertEquals(access, cacheAccess);
     }
 
     @Test

Reply via email to