This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ebd2f3d6704a566896b60f7a3c67bf7226c67348 Author: haocheni <hao_...@qq.com> AuthorDate: Thu Sep 14 10:00:44 2023 +0800 KYLIN-5814 Support lettuce sentinel mode --- pom.xml | 7 + src/common-service/pom.xml | 4 + .../kylin/rest/cache/AbstractKylinCache.java | 78 +++++++ .../org/apache/kylin/rest/cache/CacheConstant.java | 29 +++ .../org/apache/kylin/rest/cache/RedisCache.java | 91 ++------- .../org/apache/kylin/rest/cache/RedisCacheV2.java | 224 +++++++++++++++++++++ .../apache/kylin/rest/cache/CacheConstantTest.java | 30 +++ .../apache/kylin/rest/cache/RedisCacheTest.java | 36 ++++ .../apache/kylin/rest/cache/RedisCacheV2Test.java | 195 ++++++++++++++++++ .../org/apache/kylin/common/KylinConfigBase.java | 8 + .../kylin/common/exception/ServerErrorCode.java | 1 + .../resources/kylin_errorcode_conf_en.properties | 1 + .../resources/kylin_errorcode_conf_zh.properties | 1 + .../kylin/metadata/query/QueryMetricsContext.java | 3 +- src/query-service/pom.xml | 5 + .../kylin/rest/service/QueryCacheManager.java | 113 +++++++++-- .../kylin/rest/service/QueryCacheManagerTest.java | 191 ++++++++++++++++++ 17 files changed, 923 insertions(+), 94 deletions(-) diff --git a/pom.xml b/pom.xml index 88261b4d28..07224bd0f1 100644 --- a/pom.xml +++ b/pom.xml @@ -188,6 +188,7 @@ <ehcache.version>2.10.9.2</ehcache.version> <net.spy.memcached.verion>2.12.3</net.spy.memcached.verion> <redis.version>3.8.0</redis.version> + <lettuce.version>6.1.8.RELEASE</lettuce.version> <apache-httpclient.version>4.5.13</apache-httpclient.version> <beanutils.version>1.9.4</beanutils.version> <xerces.version>2.12.2</xerces.version> @@ -797,6 +798,12 @@ <version>${redis.version}</version> </dependency> + <dependency> + <groupId>io.lettuce</groupId> + <artifactId>lettuce-core</artifactId> + <version>${lettuce.version}</version> + </dependency> + <!--KAP test scope--> <dependency> <groupId>org.apache.kylin</groupId> diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml index 747735c9ea..7a0d6d3403 100644 --- a/src/common-service/pom.xml +++ b/src/common-service/pom.xml @@ -75,6 +75,10 @@ <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> + <dependency> + <groupId>io.lettuce</groupId> + <artifactId>lettuce-core</artifactId> + </dependency> <dependency> <groupId>net.sf.ehcache</groupId> <artifactId>ehcache</artifactId> diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/AbstractKylinCache.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/AbstractKylinCache.java new file mode 100644 index 0000000000..9461a29f09 --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/AbstractKylinCache.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache; + +import static org.apache.kylin.rest.cache.CacheConstant.PREFIX; + +import java.nio.charset.StandardCharsets; +import java.util.Locale; + +import org.apache.kylin.common.util.CompressionUtils; +import org.apache.kylin.rest.service.CommonQueryCacheSupporter; +import org.apache.kylin.rest.util.SerializeUtil; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractKylinCache implements KylinCache { + protected boolean isExceptionQuery(String type) { + return type.equals(CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName); + } + + protected String getTypeProjectPrefix(String type, String project) { + return String.format(Locale.ROOT, "%s-%s", type, project); + } + + protected byte[] convertKeyToByte(String type, Object key) { + try { + String prefixAndType = PREFIX + type; + byte[] typeBytes = getBytesFromString(prefixAndType); + byte[] keyBytes = SerializeUtil.serialize(key); + byte[] trueKeyBytes = new byte[keyBytes.length + typeBytes.length]; + System.arraycopy(typeBytes, 0, trueKeyBytes, 0, typeBytes.length); + System.arraycopy(keyBytes, 0, trueKeyBytes, typeBytes.length, keyBytes.length); + return trueKeyBytes; + } catch (Exception e) { + log.error("serialize fail!", e); + return new byte[0]; + } + } + + protected byte[] convertValueToByte(Object value) { + try { + return CompressionUtils.compress(SerializeUtil.serialize(value)); + } catch (Exception e) { + log.error("serialize failed!", e); + return new byte[0]; + } + } + + protected byte[] getBytesFromString(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + protected Object convertByteToObject(byte[] bytes) { + try { + return SerializeUtil.deserialize(CompressionUtils.decompress(bytes)); + } catch (Exception e) { + log.error("deserialize fail!", e); + return null; + } + } +} diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/CacheConstant.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/CacheConstant.java new file mode 100644 index 0000000000..53544743d8 --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/CacheConstant.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache; + +public class CacheConstant { + public static final String NX = "NX"; + public static final String XX = "XX"; + public static final String PREFIX = "Kylin-"; + + private CacheConstant() { + throw new IllegalStateException("Utility class"); + } +} diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCache.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCache.java index cf49508eb3..5ef1d8568d 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCache.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCache.java @@ -18,22 +18,32 @@ package org.apache.kylin.rest.cache; -import org.apache.kylin.guava30.shaded.common.base.Joiner; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; +import static org.apache.kylin.rest.cache.CacheConstant.NX; +import static org.apache.kylin.rest.cache.CacheConstant.PREFIX; +import static org.apache.kylin.rest.cache.CacheConstant.XX; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.Singletons; -import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.EncryptUtil; -import org.apache.kylin.rest.service.CommonQueryCacheSupporter; -import org.apache.kylin.rest.util.SerializeUtil; +import org.apache.kylin.guava30.shaded.common.base.Joiner; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; @@ -48,24 +58,10 @@ import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.params.SetParams; import redis.clients.jedis.util.JedisClusterCRC16; -import java.io.UnsupportedEncodingException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -public class RedisCache implements KylinCache { +public class RedisCache extends AbstractKylinCache implements KylinCache { private static final Logger logger = LoggerFactory.getLogger(RedisCache.class); - private static final String NX = "NX"; - private static final String XX = "XX"; - private static final String PREFIX = "Kylin5-"; - private static final String CHARSET_NAME = "UTF-8"; private static final Charset CHARSET = StandardCharsets.UTF_8; private static final String SCAN_POINTER_START_STR = new String(ScanParams.SCAN_POINTER_START_BINARY, CHARSET); private static JedisPool jedisPool; @@ -234,14 +230,6 @@ public class RedisCache implements KylinCache { return cache; } - private String getTypeProjectPrefix(String type, String project) { - return String.format(Locale.ROOT, "%s-%s", type, project); - } - - private boolean isExceptionQuery(String type) { - return type.equals(CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName); - } - @Override public void put(String type, String project, Object key, Object value) { if (isRecovering.get()) { @@ -457,47 +445,6 @@ public class RedisCache implements KylinCache { return keys; } - private byte[] convertValueToByte(Object value) { - try { - return CompressionUtils.compress(SerializeUtil.serialize(value)); - } catch (Exception e) { - logger.error("serialize failed!", e); - return null; - } - } - - private byte[] getBytesFromString(String str) { - try { - return str.getBytes(CHARSET_NAME); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("unsupported encoding:" + CHARSET_NAME, e); - } - } - - private byte[] convertKeyToByte(String type, Object key) { - try { - String prefixAndType = PREFIX + type; - byte[] typeBytes = getBytesFromString(prefixAndType); - byte[] keyBytes = SerializeUtil.serialize(key); - byte[] trueKeyBytes = new byte[keyBytes.length + typeBytes.length]; - System.arraycopy(typeBytes, 0, trueKeyBytes, 0, typeBytes.length); - System.arraycopy(keyBytes, 0, trueKeyBytes, typeBytes.length, keyBytes.length); - return trueKeyBytes; - } catch (Exception e) { - logger.error("serialize fail!", e); - return null; - } - } - - private Object convertByteToObject(byte[] bytes) { - try { - return SerializeUtil.deserialize(CompressionUtils.decompress(bytes)); - } catch (Exception e) { - logger.error("deserialize fail!", e); - return null; - } - } - @Override public boolean remove(String type, String project, Object key) { if (isRecovering.get()) { diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCacheV2.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCacheV2.java new file mode 100644 index 0000000000..99fe1cd50e --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/RedisCacheV2.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache; + +import static org.apache.kylin.common.exception.ServerErrorCode.REDIS_INIT_FAILED; +import static org.apache.kylin.rest.cache.CacheConstant.NX; +import static org.apache.kylin.rest.cache.CacheConstant.PREFIX; +import static org.apache.kylin.rest.cache.CacheConstant.XX; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.Singletons; +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.util.EncryptUtil; + +import io.lettuce.core.KeyScanCursor; +import io.lettuce.core.ReadFrom; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.ScanArgs; +import io.lettuce.core.SetArgs; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.internal.HostAndPort; +import io.lettuce.core.masterreplica.MasterReplica; +import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RedisCacheV2 extends AbstractKylinCache implements KylinCache { + private static final AtomicBoolean isRecovering = new AtomicBoolean(false); + private String redisExpireTimeUnit; + private long redisExpireTime; + private long redisExpireTimeForException; + private StatefulRedisMasterReplicaConnection<byte[], byte[]> redisConnection; + private RedisCommands<byte[], byte[]> syncCommands; + + private RedisCacheV2() { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + loadConfigurations(kylinConfig); + createRedisClient(kylinConfig); + log.info("Redis init success."); + } + + public static KylinCache getInstance() { + try { + return Singletons.getInstance(RedisCacheV2.class); + } catch (Exception e) { + log.error("Redis init failed:{} ", ExceptionUtils.getStackTrace(e)); + } + return null; + } + + @Override + public void put(String type, String project, Object key, Object value) { + long expireTime = isExceptionQuery(type) ? redisExpireTimeForException : redisExpireTime; + put(getTypeProjectPrefix(type, project), key, value, NX, redisExpireTimeUnit, expireTime); + } + + public void put(String type, Object key, Object value, String ifExist, String expireTimeUnit, long expireTime) { + byte[] realKey = convertKeyToByte(type, key); + byte[] valueBytes = convertValueToByte(value); + syncCommands.set(realKey, valueBytes, createSetArgs(ifExist, expireTimeUnit, expireTime)); + } + + @Override + public void update(String type, String project, Object key, Object value) { + long expireTime = isExceptionQuery(type) ? redisExpireTimeForException : redisExpireTime; + put(getTypeProjectPrefix(type, project), key, value, XX, redisExpireTimeUnit, expireTime); + } + + @Override + public Object get(String type, String project, Object key) { + byte[] realKey = convertKeyToByte(getTypeProjectPrefix(type, project), key); + log.trace("redis get start"); + byte[] sqlResp = syncCommands.get(realKey); + log.trace("redis get done, size = {}bytes", sqlResp == null ? 0 : sqlResp.length); + + if (sqlResp != null) { + Object result = convertByteToObject(sqlResp); + log.trace("redis result deserialized"); + return result; + } + return null; + } + + @Override + public boolean remove(String type, String project, Object key) { + Long removedCount = syncCommands.del(convertKeyToByte(getTypeProjectPrefix(type, project), key)); + return removedCount != null && removedCount > 0; + } + + @Override + public void clearAll() { + clearByType("", ""); + } + + @Override + public void clearByType(String type, String project) { + String prefixAndType = PREFIX; + if (!type.isEmpty()) { + prefixAndType += getTypeProjectPrefix(type, project); + } + KeyScanCursor<byte[]> scanCursor = syncCommands.scan(ScanArgs.Builder.matches(prefixAndType + "*")); + + if (scanCursor.isFinished()) { + for (byte[] key : scanCursor.getKeys()) { + syncCommands.del(key); + } + } + while (!scanCursor.isFinished()) { + for (byte[] key : scanCursor.getKeys()) { + syncCommands.del(key); + } + scanCursor = syncCommands.scan(scanCursor); + } + } + + public KylinCache recoverInstance() { + KylinCache kylinCache = this; + if (isRecovering.compareAndSet(false, true)) { + try { + log.info("Destroy RedisCacheV2."); + if (redisConnection != null) { + redisConnection.close(); + } + Singletons.clearInstance(RedisCacheV2.class); + log.info("Initiate RedisCacheV2."); + kylinCache = Singletons.getInstance(RedisCacheV2.class); + } finally { + isRecovering.set(false); + } + } + return kylinCache; + } + + private void loadConfigurations(KylinConfig kylinConfig) { + redisExpireTimeUnit = kylinConfig.getRedisExpireTimeUnit(); + redisExpireTime = kylinConfig.getRedisExpireTime(); + redisExpireTimeForException = kylinConfig.getRedisExpireTimeForException(); + } + + private List<HostAndPort> parseHosts(KylinConfig kylinConfig) { + String redisHosts = kylinConfig.getRedisHosts().trim(); + String[] hostAndPorts = redisHosts.split(","); + if (Arrays.stream(hostAndPorts).anyMatch(StringUtils::isBlank)) { + throw new KylinException(REDIS_INIT_FAILED, "Redis client init failed because there are " + + "some errors in kylin.properties for 'kylin.cache.redis.hosts'"); + } + + return Arrays.stream(hostAndPorts).map(hostAndPort -> { + String host = hostAndPort.split(":")[0].trim(); + int port = Integer.parseInt(hostAndPort.split(":")[1]); + return HostAndPort.of(host, port); + }).collect(Collectors.toList()); + } + + private char[] parsePassword(KylinConfig kylinConfig) { + String redisPassword = kylinConfig.getRedisPassword(); + if (EncryptUtil.isEncrypted(redisPassword)) { + redisPassword = EncryptUtil.getDecryptedValue(redisPassword); + } + return redisPassword == null ? null : redisPassword.toCharArray(); + } + + private void createRedisClient(KylinConfig kylinConfig) { + List<HostAndPort> hostAndPorts = parseHosts(kylinConfig); + char[] redisPassword = parsePassword(kylinConfig); + + log.info("The 'kylin.cache.redis.sentinel-enabled' is {}", kylinConfig.isRedisSentinelEnabled()); + if (kylinConfig.isRedisSentinelEnabled()) { + log.info("kylin will use redis sentinel"); + RedisURI redisURI = buildRedisURI(kylinConfig, hostAndPorts, redisPassword); + RedisClient redisClient = RedisClient.create(); + redisConnection = MasterReplica.connect(redisClient, ByteArrayCodec.INSTANCE, redisURI); + redisConnection.setReadFrom(ReadFrom.REPLICA_PREFERRED); + syncCommands = redisConnection.sync(); + log.info("Lettuce sentinel ping:{}", syncCommands.ping()); + } + } + + private RedisURI buildRedisURI(KylinConfig kylinConfig, List<HostAndPort> hostAndPorts, char[] redisPassword) { + RedisURI.Builder redisUriBuilder = RedisURI.builder() + .withSentinelMasterId(kylinConfig.getRedisSentinelMasterId()).withPassword(redisPassword) + .withTimeout(Duration.ofMillis(kylinConfig.getRedisConnectionTimeout())); + for (HostAndPort hostAndPort : hostAndPorts) { + redisUriBuilder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort()); + } + RedisURI redisURI = redisUriBuilder.build(); + log.info("Redis uri:{}", redisURI); + return redisURI; + } + + private SetArgs createSetArgs(String ifExist, String expireTimeUnit, Long expireTime) { + SetArgs setArgs = new SetArgs(); + setArgs = expireTimeUnit.equals("EX") ? setArgs.ex(expireTime) : setArgs.px(expireTime); + setArgs = ifExist.equals(NX) ? setArgs.nx() : setArgs.xx(); + return setArgs; + } + +} diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/CacheConstantTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/cache/CacheConstantTest.java new file mode 100644 index 0000000000..89942597f7 --- /dev/null +++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/CacheConstantTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache; + +import org.apache.kylin.common.Singletons; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class CacheConstantTest { + @Test + void test() { + Assertions.assertThrows(RuntimeException.class, () -> Singletons.getInstance(CacheConstant.class)); + } +} diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/RedisCacheTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/cache/RedisCacheTest.java new file mode 100644 index 0000000000..dc29a70a44 --- /dev/null +++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/RedisCacheTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +@MetadataInfo +@ExtendWith(MockitoExtension.class) +class RedisCacheTest { + + @Test + void testRecoverInstance() { + assertDoesNotThrow(RedisCache::recoverInstance); + } +} diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/RedisCacheV2Test.java b/src/common-service/src/test/java/org/apache/kylin/rest/cache/RedisCacheV2Test.java new file mode 100644 index 0000000000..50b8cdb241 --- /dev/null +++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/RedisCacheV2Test.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache; + +import static org.apache.kylin.common.util.EncryptUtil.ENC_PREFIX; +import static org.apache.kylin.common.util.EncryptUtil.ENC_SUBFIX; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.zip.DataFormatException; + +import org.apache.kylin.common.util.CompressionUtils; +import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.apache.kylin.rest.util.SerializeUtil; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +import io.lettuce.core.KeyScanCursor; +import io.lettuce.core.ScanArgs; +import io.lettuce.core.api.sync.RedisCommands; + +@MetadataInfo +@ExtendWith(MockitoExtension.class) +class RedisCacheV2Test { + @Mock + private RedisCommands<byte[], byte[]> syncCommands; + + @InjectMocks + private RedisCacheV2 redisCacheV2; + + @Test + void testPut() { + String type = "type"; + String project = "project"; + Object key = new Object(); + Object value = new Object(); + + redisCacheV2.put(type, project, key, value); + + verify(syncCommands, times(1)).set(any(), any(), any()); + } + + @Test + void testUpdate() { + String type = "type"; + String project = "project"; + Object key = new Object(); + Object value = new Object(); + + redisCacheV2.update(type, project, key, value); + + verify(syncCommands, times(1)).set(any(), any(), any()); + } + + @Test + void testGetNull() { + String type = "type"; + String project = "project"; + Object key = new Object(); + + when(syncCommands.get(any())).thenReturn(null); + + Object result = redisCacheV2.get(type, project, key); + + assertNull(result); + verify(syncCommands, times(1)).get(any()); + } + + @Test + void testGet() throws DataFormatException, IOException { + String type = "type"; + String project = "project"; + Object key = new Object(); + byte[] valueBytes = { -84, -19, 0, 5, 116, 0, 5, 118, 97, 108, 117, 101 }; + when(syncCommands.get(any())).thenReturn(valueBytes); + Object expectedResult = SerializeUtil.deserialize(CompressionUtils.decompress(valueBytes)); + + Object result = redisCacheV2.get(type, project, key); + + Assertions.assertEquals(expectedResult, result); + } + + @Test + void testRemove() { + String type = "type"; + String project = "project"; + Object key = new Object(); + + when(syncCommands.del(any())).thenReturn(1L); + + boolean result = redisCacheV2.remove(type, project, key); + + assertTrue(result); + verify(syncCommands, times(1)).del(any()); + } + + @Test + void testClearAll() { + mockScanResult(); + + redisCacheV2.clearAll(); + + verify(syncCommands, atLeastOnce()).scan(any(ScanArgs.class)); + verify(syncCommands, atLeastOnce()).del(any()); + } + + @Test + void testClearByType() { + String type = "type"; + String project = "project"; + mockScanResult(); + + redisCacheV2.clearByType(type, project); + + verify(syncCommands, atLeastOnce()).scan(any(ScanArgs.class)); + verify(syncCommands, atLeastOnce()).del(any()); + } + + @Test + void testGetInstance() { + Unsafe.overwriteSystemProp(Maps.newHashMap(), "kylin.cache.redis.sentinel-enabled", "TRUE"); + Unsafe.overwriteSystemProp(Maps.newHashMap(), "kylin.cache.redis.sentinel-master", "default-master"); + + KylinCache kylinCache = RedisCacheV2.getInstance(); + assertNull(kylinCache); + + Unsafe.clearProperty("kylin.cache.redis.sentinel-enabled"); + Unsafe.clearProperty("kylin.cache.redis.sentinel-master"); + } + + @Test + void testRecoverInstance() { + KylinCache kylinCache = redisCacheV2.recoverInstance(); + Assertions.assertNotNull(kylinCache); + } + + @Test + void testEncryptedPassword() { + Unsafe.overwriteSystemProp(Maps.newHashMap(), "kylin.cache.redis.password", + ENC_PREFIX + "password" + ENC_SUBFIX); + try { + RedisCacheV2.getInstance(); + } catch (Exception e) { + Assertions.fail(); + } + } + + @Test + void testRedisHostsError() { + Unsafe.overwriteSystemProp(Maps.newHashMap(), "kylin.cache.redis.hosts", " "); + + KylinCache kylinCache = RedisCacheV2.getInstance(); + assertNull(kylinCache); + + Unsafe.clearProperty("kylin.cache.redis.hosts"); + } + + private void mockScanResult() { + KeyScanCursor<byte[]> scanCursor = new KeyScanCursor<>(); + scanCursor.setFinished(true); + byte[] delKeys = new byte[] { 1, 2, 3 }; + ReflectionTestUtils.setField(scanCursor, "keys", Lists.newArrayList(delKeys)); + when(syncCommands.scan(any(ScanArgs.class))).thenReturn(scanCursor); + } +} diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e991f4d990..a2518358a9 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1970,6 +1970,14 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.cache.redis.cluster-enabled", FALSE)); } + public boolean isRedisSentinelEnabled() { + return Boolean.parseBoolean(getOptional("kylin.cache.redis.sentinel-enabled", FALSE)); + } + + public String getRedisSentinelMasterId() { + return getOptional("kylin.cache.redis.sentinel-master", null); + } + public String getRedisHosts() { return getOptional("kylin.cache.redis.hosts", "localhost:6379"); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java index 68aa6bcd3f..9b869c01a8 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java @@ -289,6 +289,7 @@ public enum ServerErrorCode implements ErrorCodeSupplier { // 10040XXX cache REDIS_CLEAR_ERROR("KE-010040001"), // + REDIS_INIT_FAILED("KE-010040002"), // // 10050XXX SQL DDL DDL_CHECK_ERROR("KE-010050001"); diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties index 02e3f7f830..d6fd4bd8d5 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties @@ -278,5 +278,6 @@ KE-010039001=Invalid Connection Info # cache KE-010040001=Clear Redis Cache Failure +KE-010040002=Redis Init Failed # SQL DDL KE-010050001=DDL Operation Failure \ No newline at end of file diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties index d8fbfc42ed..fa09dca37e 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties @@ -276,5 +276,6 @@ KE-010039001=连接信息有误 # cache KE-010040001=清空Redis缓存失败 +KE-010040002=Redis初始化失败 # SQL DDL KE-010050001=DDL操作失败 \ No newline at end of file diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryMetricsContext.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryMetricsContext.java index 341f7ecb48..3bc911a44c 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryMetricsContext.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryMetricsContext.java @@ -133,8 +133,7 @@ public class QueryMetricsContext extends QueryMetrics { if (context.getQueryTagInfo().isHitExceptionCache() || context.getQueryTagInfo().isStorageCacheUsed()) { this.isCacheHit = true; - this.cacheType = KylinConfig.getInstanceFromEnv().isRedisEnabled() ? QueryHistory.CacheType.REDIS.name() - : QueryHistory.CacheType.EHCACHE.name(); + this.cacheType = context.getQueryTagInfo().getStorageCacheType(); } this.resultRowCount = context.getMetrics().getResultRowCount(); this.queryMsg = context.getMetrics().getQueryMsg(); diff --git a/src/query-service/pom.xml b/src/query-service/pom.xml index da04266338..03fb2c0072 100644 --- a/src/query-service/pom.xml +++ b/src/query-service/pom.xml @@ -135,6 +135,11 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java index 862a7e9b8f..b809e7f210 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java @@ -36,6 +36,7 @@ import org.apache.kylin.rest.cache.KylinCache; import org.apache.kylin.rest.cache.KylinEhCache; import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache; import org.apache.kylin.rest.cache.RedisCache; +import org.apache.kylin.rest.cache.RedisCacheV2; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.response.TableMetaCacheResult; @@ -73,10 +74,15 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { public void init() { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); if (kylinConfig.isRedisEnabled()) { - kylinCache = RedisCache.getInstance(); + if (kylinConfig.isRedisSentinelEnabled()) { + kylinCache = RedisCacheV2.getInstance(); + } else { + kylinCache = RedisCache.getInstance(); + } } else if (kylinConfig.isMemcachedEnabled()) { kylinCache = CompositeMemcachedCache.getInstance(); - } else { + } + if (kylinCache == null) { kylinCache = KylinEhCache.getInstance(); } if (kylinCache instanceof RedisCache && checkRedisClient()) { @@ -164,7 +170,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { } public SQLResponse doSearchQuery(QueryCacheManager.Type type, SQLRequest sqlRequest) { - Object response = kylinCache.get(type.rootCacheName, sqlRequest.getProject(), sqlRequest.getCacheKey()); + Object response = getCache(type.rootCacheName, sqlRequest.getProject(), sqlRequest.getCacheKey()); logger.debug("[query cache log] The cache key is: {}", sqlRequest.getCacheKey()); if (response == null) { return null; @@ -254,7 +260,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { } public TableMetaCacheResult doGetSchemaCache(String project, String userName) { - Object metaList = kylinCache.get(Type.SCHEMA_CACHE.rootCacheName, project, userName); + Object metaList = getCache(Type.SCHEMA_CACHE.rootCacheName, project, userName); if (metaList == null) { return null; } @@ -286,7 +292,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { if (modelName != null) { cacheKey = cacheKey + modelName; } - Object metaList = kylinCache.get(Type.SCHEMA_CACHE.rootCacheName, project, cacheKey); + Object metaList = getCache(Type.SCHEMA_CACHE.rootCacheName, project, cacheKey); if (metaList == null) { return null; } @@ -298,15 +304,15 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { if (modelName != null) { cacheKey = cacheKey + modelName; } - kylinCache.put(Type.SCHEMA_CACHE.rootCacheName, project, cacheKey, schemas); + putCache(Type.SCHEMA_CACHE.rootCacheName, project, cacheKey, schemas); } public void clearSchemaCacheV2(String project, String userName) { - kylinCache.remove(Type.SCHEMA_CACHE.rootCacheName, project, userName + "v2"); + removeCache(Type.SCHEMA_CACHE.rootCacheName, project, userName + "v2"); } public void clearSchemaCache(String project, String userName) { - kylinCache.remove(Type.SCHEMA_CACHE.rootCacheName, project, userName); + removeCache(Type.SCHEMA_CACHE.rootCacheName, project, userName); } public void onClearSchemaCache(String project) { @@ -314,12 +320,12 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { } public void clearSchemaCache(String project) { - kylinCache.clearByType(Type.SCHEMA_CACHE.rootCacheName, project); + clearCacheByType(Type.SCHEMA_CACHE.rootCacheName, project); } public void clearQueryCache(SQLRequest request) { - kylinCache.remove(Type.SUCCESS_QUERY_CACHE.rootCacheName, request.getProject(), request.getCacheKey()); - kylinCache.remove(Type.EXCEPTION_QUERY_CACHE.rootCacheName, request.getProject(), request.getCacheKey()); + removeCache(Type.SUCCESS_QUERY_CACHE.rootCacheName, request.getProject(), request.getCacheKey()); + removeCache(Type.EXCEPTION_QUERY_CACHE.rootCacheName, request.getProject(), request.getCacheKey()); } public void onClearProjectCache(String project) { @@ -329,20 +335,44 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { public void clearProjectCache(String project) { if (project == null) { logger.info("[query cache log] clear query cache for all projects."); - kylinCache.clearAll(); + clearAllCache(); } else { logger.info("[query cache log] clear query cache for {}", project); - kylinCache.clearByType(Type.SUCCESS_QUERY_CACHE.rootCacheName, project); - kylinCache.clearByType(Type.EXCEPTION_QUERY_CACHE.rootCacheName, project); - kylinCache.clearByType(Type.SCHEMA_CACHE.rootCacheName, project); + clearCacheByType(Type.SUCCESS_QUERY_CACHE.rootCacheName, project); + clearCacheByType(Type.EXCEPTION_QUERY_CACHE.rootCacheName, project); + clearCacheByType(Type.SCHEMA_CACHE.rootCacheName, project); } } - public void recoverCache() { - boolean isRedisEnabled = KylinConfig.getInstanceFromEnv().isRedisEnabled(); - if (isRedisEnabled) { - RedisCache.recoverInstance(); - logger.info("[query cache log] Redis client recover successfully."); + public boolean recoverCache() { + logger.info("Redis client recovery start."); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + // no need to recover + if (!kylinConfig.isRedisEnabled()) { + logger.info("No need to recover Redis client, isRedisEnabled:{}", kylinConfig.isRedisEnabled()); + return false; + } + + if (!kylinConfig.isRedisSentinelEnabled()) { + // cluster、standalone + kylinCache = RedisCache.recoverInstance(); + } else { + // sentinel + if (kylinCache instanceof RedisCacheV2) { + kylinCache = ((RedisCacheV2) kylinCache).recoverInstance(); + } else { + kylinCache = RedisCacheV2.getInstance(); + } + } + + // recover failed,use Ehcache + if (kylinCache == null) { + logger.info("Redis client recover failed, use ehcache instead."); + kylinCache = KylinEhCache.getInstance(); + return false; + } else { + logger.info("Redis client recover successfully."); + return true; } } @@ -350,4 +380,47 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { public KylinCache getCache() { return kylinCache; } + + public Object getCache(String type, String project, Object key) { + try { + return kylinCache.get(type, project, key); + } catch (Exception e) { + logger.error("Get cache failed, type:{}, project:{}, key:{}, exception:{}", type, project, key, e); + } + return null; + } + + public void putCache(String type, String project, Object key, Object value) { + try { + kylinCache.put(type, project, key, value); + } catch (Exception e) { + logger.error("Put cache failed, type:{}, project:{}, key:{}, value:{}, exception:{}", type, project, key, + value, e); + } + } + + public boolean removeCache(String type, String project, Object key) { + try { + return kylinCache.remove(type, project, key); + } catch (Exception e) { + logger.error("Remove cache failed, type:{}, project:{}, key:{}, exception:{}", type, project, key, e); + } + return false; + } + + public void clearAllCache() { + try { + kylinCache.clearAll(); + } catch (Exception e) { + logger.error("Clear all cache failed, exception:", e); + } + } + + public void clearCacheByType(String type, String project) { + try { + kylinCache.clearByType(type, project); + } catch (Exception e) { + logger.error("Clear cache by type failed, type:{}, project:{}, exception:{}", type, project, e); + } + } } diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCacheManagerTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCacheManagerTest.java new file mode 100644 index 0000000000..5067356099 --- /dev/null +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCacheManagerTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.service; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.kylin.common.SystemPropertiesCache; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.apache.kylin.rest.cache.KylinCache; +import org.apache.kylin.rest.cache.RedisCache; +import org.apache.kylin.rest.cache.RedisCacheV2; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +@MetadataInfo +@ExtendWith(MockitoExtension.class) +class QueryCacheManagerTest { + @Mock + private KylinCache kylinCache; + @InjectMocks + private QueryCacheManager queryCacheManager; + + @Test + void testInitIsRedisEnabled() { + System.setProperty("kylin.cache.redis.enabled", "true"); + try { + queryCacheManager.init(); + } catch (Exception e) { + Assertions.fail(); + } + System.clearProperty("kylin.cache.redis.enabled"); + } + + @Test + void testInitIsNotRedisEnabled() { + System.setProperty("kylin.cache.redis.enabled", "false"); + try { + queryCacheManager.init(); + } catch (Exception e) { + Assertions.fail(); + } + System.clearProperty("kylin.cache.redis.enabled"); + } + + @Test + void testInitIsRedisSentinelEnabled() { + System.setProperty("kylin.cache.redis.enabled", "true"); + System.setProperty("kylin.cache.redis.sentinel-enabled", "true"); + System.setProperty("kylin.cache.redis.sentinel-master", "master"); + + assertDoesNotThrow(() -> queryCacheManager.init()); + + System.clearProperty("kylin.cache.redis.enabled"); + System.clearProperty("kylin.cache.redis.sentinel-enabled"); + System.clearProperty("kylin.cache.redis.sentinel-master"); + } + + @Test + void testRecoverCacheWhenNoNeedToRecover() { + System.setProperty("kylin.cache.redis.enabled", "false"); + + boolean recoverResult = queryCacheManager.recoverCache(); + assertFalse(recoverResult); + + System.clearProperty("kylin.cache.redis.enabled"); + } + + @Test + void testRecoverCacheWhenCluster() { + System.setProperty("kylin.cache.redis.enabled", "true"); + kylinCache = Mockito.mock(RedisCache.class); + ReflectionTestUtils.setField(queryCacheManager, "kylinCache", kylinCache); + + try { + queryCacheManager.recoverCache(); + } catch (Exception e) { + Assertions.fail(); + } + + System.clearProperty("kylin.cache.redis.enabled"); + } + + @Test + void testRecoverCacheWhenSentinelRecoverTrue() { + SystemPropertiesCache.setProperty("kylin.cache.redis.enabled", "true"); + SystemPropertiesCache.setProperty("kylin.cache.redis.sentinel-enabled", "true"); + kylinCache = Mockito.mock(RedisCacheV2.class); + ReflectionTestUtils.setField(queryCacheManager, "kylinCache", kylinCache); + Mockito.when(((RedisCacheV2) kylinCache).recoverInstance()).thenReturn(kylinCache); + + boolean recoverResult = queryCacheManager.recoverCache(); + assertTrue(recoverResult); + + SystemPropertiesCache.clearProperty("kylin.cache.redis.enabled"); + SystemPropertiesCache.clearProperty("kylin.cache.redis.sentinel-enabled"); + } + + @Test + void testRecoverCacheWhenSentinelRecoverFailed() { + System.setProperty("kylin.cache.redis.enabled", "true"); + System.setProperty("kylin.cache.redis.sentinel-enabled", "true"); + kylinCache = Mockito.mock(RedisCacheV2.class); + ReflectionTestUtils.setField(queryCacheManager, "kylinCache", kylinCache); + + boolean recoverResult = queryCacheManager.recoverCache(); + assertFalse(recoverResult); + + System.clearProperty("kylin.cache.redis.enabled"); + System.clearProperty("kylin.cache.redis.sentinel-enabled"); + } + + @Test + void testGetCacheException() { + Mockito.when(kylinCache.get("type", "project", "key")).thenThrow(new RuntimeException()); + + Object value = queryCacheManager.getCache("type", "project", "key"); + + assertNull(value); + } + + @Test + void testPutCacheException() { + Mockito.doThrow(new RuntimeException()).when(kylinCache).put("type", "project", "key", "value"); + + assertDoesNotThrow(() -> queryCacheManager.putCache("type", "project", "key", "value")); + } + + @Test + void testRemoveCacheException() { + Mockito.doThrow(new RuntimeException()).when(kylinCache).remove("type", "project", "key"); + + boolean value = queryCacheManager.removeCache("type", "project", "key"); + + assertFalse(value); + } + + @Test + void testClearAllCacheException() { + Mockito.doThrow(new RuntimeException()).when(kylinCache).clearAll(); + + assertDoesNotThrow(() -> queryCacheManager.clearAllCache()); + } + + @Test + void testClearCacheByTypeException() { + Mockito.doThrow(new RuntimeException()).when(kylinCache).clearByType("type", "project"); + + assertDoesNotThrow(() -> queryCacheManager.clearCacheByType("type", "project")); + } + + @Test + void testClearSchemaCacheV2() { + assertDoesNotThrow(() -> queryCacheManager.clearSchemaCacheV2("project", "userName")); + } + + @Test + void testClearSchemaCache() { + assertDoesNotThrow(() -> queryCacheManager.clearSchemaCache("project", "userName")); + } + + @Test + void testOnClearSchemaCache() { + assertDoesNotThrow(() -> queryCacheManager.onClearProjectCache("project")); + } + +}