This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4d3418c237d4a839a85573712d0111d81086a477 Author: Yaguang Jia <jiayagu...@foxmail.com> AuthorDate: Wed Aug 30 14:11:32 2023 +0800 KYLIN-5798 Add user-table access cache for table-loading --- .../org/apache/kylin/common/KylinConfigBase.java | 13 +++++++++++ .../apache/kylin/common/KylinConfigBaseTest.java | 22 ++++++++++++++++++ .../apache/kylin/rest/service/SparkDDLTest.java | 26 ++++++++++------------ .../spark/source/NSparkMetadataExplorer.java | 22 +++++++++++++++--- .../spark/source/NSparkMetadataExplorerTest.java | 14 +++++++++++- 5 files changed, 79 insertions(+), 18 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3de55bf84e..8b9a0308f7 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2800,6 +2800,19 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.source.hive.table-access-filter-enabled", FALSE)); } + public boolean getTableAccessCacheEnable() { + return Boolean.parseBoolean(getOptional("kylin.source.hive.table-access-cache-enabled", TRUE)); + } + + public long getTableAccessCacheSize() { + return Long.parseLong(getOptional("kylin.source.hive.table-access-cache-size", ONE_HUNDRED_THOUSAND)); + } + + public long getTableAccessCacheTTL() { + return TimeUtil.timeStringAs(getOptional("kylin.source.hive.table-access-cache-ttl", "7d"), + TimeUnit.MINUTES); + } + public String[] getHiveDatabases() { return getOptionalStringArray("kylin.source.hive.databases", new String[0]); } diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java index 6be4411004..59e4a3fc9b 100644 --- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java +++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java @@ -36,6 +36,7 @@ package org.apache.kylin.common; +import static org.apache.kylin.common.KylinConfigBase.FALSE; import static org.apache.kylin.common.KylinConfigBase.PATH_DELIMITER; import static org.apache.kylin.common.KylinConfigBase.WRITING_CLUSTER_WORKING_DIR; import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_ENABLE_KEY; @@ -1499,6 +1500,27 @@ class KylinConfigBaseTest { config.setProperty("kylin.metadata.audit-log.max-size", "3000000"); assertEquals(3000000, config.getMetadataAuditLogMaxSize()); } + + @Test + void testGetTableAccessCache() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + + assertTrue(config.getTableAccessCacheEnable()); + config.setProperty("kylin.source.hive.table-access-cache-enabled", FALSE); + assertFalse(config.getTableAccessCacheEnable()); + + assertEquals(100000, config.getTableAccessCacheSize()); + config.setProperty("kylin.source.hive.table-access-cache-size", "200000"); + assertEquals(200000, config.getTableAccessCacheSize()); + + assertEquals(10080, config.getTableAccessCacheTTL()); + config.setProperty("kylin.source.hive.table-access-cache-ttl", "1m"); + assertEquals(1, config.getTableAccessCacheTTL()); + config.setProperty("kylin.source.hive.table-access-cache-ttl", "1h"); + assertEquals(60, config.getTableAccessCacheTTL()); + config.setProperty("kylin.source.hive.table-access-cache-ttl", "1d"); + assertEquals(1440, config.getTableAccessCacheTTL()); + } } class EnvironmentUpdateUtils { diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java index cf23d159aa..dc293e2b0c 100644 --- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java +++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java @@ -32,6 +32,7 @@ import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; @@ -41,36 +42,27 @@ import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.ViewRequest; import org.apache.kylin.rest.response.LoadTableResponse; import org.apache.kylin.rest.response.LogicalViewResponse; - import org.apache.spark.sql.LogicalViewLoader; import org.apache.spark.sql.SparderEnv; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.common.SparkDDLTestUtils; - import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.test.util.ReflectionTestUtils; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - public class SparkDDLTest extends NLocalFileMetadataTestCase { - @Autowired - private final SparkDDLService ddlService = Mockito.spy(new SparkDDLService()); - @Autowired - private final TableExtService tableExtService = Mockito.spy(new TableExtService()); - @Autowired - private final TableService tableService = Mockito.spy(new TableService()); - @Autowired - private final IUserGroupService userGroupService = Mockito.spy(NUserGroupService.class); - private final Integer LOGICAL_VIEW_CATCHUP_INTERVAL = 3; + private SparkDDLService ddlService; + private TableExtService tableExtService; + private TableService tableService; + private IUserGroupService userGroupService; + private Integer LOGICAL_VIEW_CATCHUP_INTERVAL = 3; // Hive View private static final String CREATEVIEW_SQL1 = @@ -122,6 +114,12 @@ public class SparkDDLTest extends NLocalFileMetadataTestCase { @Before public void setup() { createTestMetadata(); + + ddlService = Mockito.spy(new SparkDDLService()); + tableExtService = Mockito.spy(new TableExtService()); + tableService = Mockito.spy(new TableService()); + userGroupService = Mockito.spy(NUserGroupService.class); + Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN); SecurityContextHolder.getContext().setAuthentication(authentication); diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java index 3ade78d63a..6f3b1cf5ed 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -41,6 +43,8 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.guava30.shaded.common.cache.Cache; +import org.apache.kylin.guava30.shaded.common.cache.CacheBuilder; import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.ISourceAware; @@ -67,6 +71,9 @@ import lombok.val; public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable { private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class); + private static Cache<String, Boolean> tableAccessCache = CacheBuilder.newBuilder() + .maximumSize(KylinConfig.getInstanceFromEnv().getTableAccessCacheSize()) + .expireAfterWrite(KylinConfig.getInstanceFromEnv().getTableAccessCacheTTL(), TimeUnit.MINUTES).build(); public static String generateCreateSchemaSql(String schemaName) { return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", schemaName); @@ -122,7 +129,6 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD public List<String> listTables(String database) throws Exception { val ugi = UserGroupInformation.getCurrentUser(); val config = KylinConfig.getInstanceFromEnv(); - val spark = SparderEnv.getSparkSession(); List<String> tables = Lists.newArrayList(); try { @@ -136,9 +142,10 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable() && UserGroupInformation.isSecurityEnabled()) { List<String> accessTables = Lists.newArrayList(); + boolean cacheEnabled = config.getTableAccessCacheEnable(); for (String table : tables) { val tableName = database + "." + table; - if (checkTableAccess(tableName)) { + if (checkTableWithCache(cacheEnabled, ugi.getUserName(), tableName)) { accessTables.add(table); } } @@ -151,6 +158,15 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD return tables; } + public boolean checkTableWithCache(boolean cacheEnabled, String user, String tableName) throws ExecutionException { + if (cacheEnabled) { + String cacheKey = user + "#tableAccess#" + tableName; + return tableAccessCache.get(cacheKey, () -> checkTableAccess(tableName)); + } else { + return checkTableAccess(tableName); + } + } + public boolean checkTableAccess(String tableName) { boolean isAccess = true; try { @@ -257,7 +273,7 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD .orElseGet(Collections::emptyList).stream().map(field -> field.name) // .collect(Collectors.toSet()); int columnNumber = tableMeta.allColumns.size(); - List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber); + List<ColumnDesc> columns = new ArrayList<>(columnNumber); for (int i = 0; i < columnNumber; i++) { NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i); ColumnDesc cdesc = new ColumnDesc(); diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java index fddfa930a5..4ed485541b 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorerTest.java @@ -47,9 +47,21 @@ public class NSparkMetadataExplorerTest extends NLocalWithSparkSessionTest { @Test public void testListTables() throws Exception { + String testProject = "default"; + String testTableName = "DEFAULT.TEST_KYLIN_FACT"; + NSparkMetadataExplorer sparkMetadataExplorer = new NSparkMetadataExplorer(); - List<String> tables = sparkMetadataExplorer.listTables(""); + NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), testProject); + TableDesc fact = tableMgr.getTableDesc(testTableName); + sparkMetadataExplorer.createSampleTable(fact); + + List<String> tables = sparkMetadataExplorer.listTables(testProject); Assert.assertTrue(tables != null && tables.size() > 0); + + // test tableAccessCache, use only under Project Kerberos + boolean access = sparkMetadataExplorer.checkTableWithCache(false, "testUser", testTableName); + boolean cacheAccess = sparkMetadataExplorer.checkTableWithCache(true, "testUser", testTableName); + Assert.assertEquals(access, cacheAccess); } @Test