This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new b0169af73c KYLIN-5280 Introduce memcached as a distributed cache for queries for Kylin5 (#2096) b0169af73c is described below commit b0169af73c77f32908d3dda18ca22c7699b663dd Author: zznlime <68555081+zznl...@users.noreply.github.com> AuthorDate: Fri Mar 3 13:50:26 2023 +0800 KYLIN-5280 Introduce memcached as a distributed cache for queries for Kylin5 (#2096) * [EBAY-KYLIN-3526] Introduce memcached as a distributed cache for queries for Kylin5 (#33) * [EBAY-KYLIN-3526] Introduce memcached as a distributed cache for queries for Kylin5 Co-authored-by: zhennzhang <zhennzh...@ebay.com> fix implicit conversion from Array to String [EBAY-KYLIN-3614] Add UTs for memcached on Kylin5.0 KYLIN-5280 add UT with a embedded memcached server refine sonar issues KYLIN-5280 add ut for memcached cache status enable rename with tomcat log file rotate add UT for KeyHooklookup.KeyHook interface add UT for KeyHooklookup.KeyHook interface add UT for KeyHooklookup.KeyHook interface * KYLIN-5388 Fix ut coverage * KYLIN-5388 Fix ut case * KYLIN-5388 Add ut case for KeyHook and MemcachedConnectionFactory. 
* KYLIN-5388 Add exception thrown ut cases for CompositeMemcachedCache, add default settings ut case for memcachedConnectionFactoryBuilder * KYLIN-5388 Add ut case for RefinedKetamaModeLocator --- pom.xml | 12 + src/common-service/pom.xml | 4 + .../spy/memcached/RefinedKetamaNodeLocator.java | 283 +++++++++ .../memcached/protocol/TCPMemcachedNodeImpl.java | 663 +++++++++++++++++++++ .../kylin/rest/cache/memcached/CacheStats.java | 105 ++++ .../cache/memcached/CompositeMemcachedCache.java | 270 +++++++++ .../kylin/rest/cache/memcached/KeyHookLookup.java | 105 ++++ .../kylin/rest/cache/memcached/MemcachedCache.java | 390 ++++++++++++ .../rest/cache/memcached/MemcachedCacheConfig.java | 122 ++++ .../cache/memcached/MemcachedChunkingCache.java | 266 +++++++++ .../memcached/MemcachedConnectionFactory.java | 179 ++++++ .../MemcachedConnectionFactoryBuilder.java | 176 ++++++ .../memcached/MemcacheConnectionFactoryTest.java | 143 +++++ .../rest/cache/memcached/MemcachedCacheTest.java | 109 ++++ .../memcached/MemcachedChunkingCacheTest.java | 189 ++++++ .../org/apache/kylin/common/KylinConfigBase.java | 25 + src/query-service/pom.xml | 5 + .../kylin/rest/service/QueryCacheManager.java | 15 +- .../service/QueryCompositeMemcachedCacheTest.java | 316 ++++++++++ 19 files changed, 3376 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e1a852cf3c..516eae722b 100644 --- a/pom.xml +++ b/pom.xml @@ -160,6 +160,7 @@ <assertj-core.version>3.11.1</assertj-core.version> <awaitility.version>3.0.0</awaitility.version> <embedded-redis.version>0.6</embedded-redis.version> + <embedded-memcached.version>4.1.3</embedded-memcached.version> <!-- Commons --> <commons-lang3.version>3.10</commons-lang3.version> @@ -176,6 +177,7 @@ <slf4j.version>1.7.30</slf4j.version> <apache-log4j.version>2.12.1</apache-log4j.version> <ehcache.version>2.10.9.2</ehcache.version> + <net.spy.memcached.verion>2.12.3</net.spy.memcached.verion> <redis.version>3.8.0</redis.version> 
<apache-httpclient.version>4.5.13</apache-httpclient.version> <beanutils.version>1.9.4</beanutils.version> @@ -2245,6 +2247,11 @@ <artifactId>ehcache</artifactId> <version>${ehcache.version}</version> </dependency> + <dependency> + <groupId>net.spy</groupId> + <artifactId>spymemcached</artifactId> + <version>${net.spy.memcached.verion}</version> + </dependency> <dependency> <groupId>org.opensaml</groupId> <artifactId>opensaml</artifactId> @@ -2256,6 +2263,11 @@ <version>${embedded-redis.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.code.simple-spring-memcached</groupId> + <artifactId>jmemcached-maven-plugin</artifactId> + <version>${embedded-memcached.version}</version> + </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml index db687b8f80..081ad89602 100644 --- a/src/common-service/pom.xml +++ b/src/common-service/pom.xml @@ -77,6 +77,10 @@ <groupId>net.sf.ehcache</groupId> <artifactId>ehcache</artifactId> </dependency> + <dependency> + <groupId>net.spy</groupId> + <artifactId>spymemcached</artifactId> + </dependency> <!-- Spring Boot --> diff --git a/src/common-service/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java b/src/common-service/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java new file mode 100644 index 0000000000..55ba0565c0 --- /dev/null +++ b/src/common-service/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.spy.memcached; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; + +import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration; +import net.spy.memcached.util.KetamaNodeLocatorConfiguration; + +/** + * Copyright (C) 2006-2009 Dustin Sallings + * Copyright (C) 2009-2011 Couchbase, Inc. + * + * This is a modified version of the Ketama consistent hash strategy from + * last.fm. This implementation may not be compatible with libketama as hashing + * is considered separate from node location. + * + * The only modified method is the getSequence(). + * The previous 7 may be too small to reduce the chance to get all down nodes. + * + * Note that this implementation does not currently supported weighted nodes. 
+ * + * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's + * blog post</a> + */ +public final class RefinedKetamaNodeLocator extends SpyObject implements NodeLocator { + + private final HashAlgorithm hashAlg; + private final Map<InetSocketAddress, Integer> weights; + private final boolean isWeightedKetama; + private final KetamaNodeLocatorConfiguration config; + private AtomicReference<TreeMap<Long, MemcachedNode>> ketamaNodes = new AtomicReference<>(); + private AtomicReference<Collection<MemcachedNode>> allNodes = new AtomicReference<>(); + + /** + * Create a new KetamaNodeLocator using specified nodes and the specifed hash + * algorithm. + * + * @param nodes The List of nodes to use in the Ketama consistent hash + * continuum + * @param alg The hash algorithm to use when choosing a node in the Ketama + * consistent hash continuum + */ + public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) { + this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<>()); + } + + /** + * Create a new KetamaNodeLocator with specific nodes, hash, node key format, + * and weight + * + * @param nodes The List of nodes to use in the Ketama consistent hash + * continuum + * @param alg The hash algorithm to use when choosing a node in the Ketama + * consistent hash continuum + * @param nodeKeyFormat the format used to name the nodes in Ketama, either + * SPYMEMCACHED or LIBMEMCACHED + * @param weights node weights for ketama, a map from InetSocketAddress to + * weight as Integer + */ + public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, + KetamaNodeKeyFormatter.Format nodeKeyFormat, Map<InetSocketAddress, Integer> weights) { + this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat))); + } + + /** + * Create a new KetamaNodeLocator using specified nodes and the specifed hash + * algorithm and configuration. 
+ * + * @param nodes The List of nodes to use in the Ketama consistent hash + * continuum + * @param alg The hash algorithm to use when choosing a node in the Ketama + * consistent hash continuum + * @param conf + */ + public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) { + this(nodes, alg, new HashMap<>(), conf); + } + + /** + * Create a new KetamaNodeLocator with specific nodes, hash, node key format, + * and weight + * + * @param nodes The List of nodes to use in the Ketama consistent hash + * continuum + * @param alg The hash algorithm to use when choosing a node in the Ketama + * consistent hash continuum + * @param nodeWeights node weights for ketama, a map from InetSocketAddress to + * weight as Integer + * @param configuration node locator configuration + */ + public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, + Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration configuration) { + super(); + allNodes.set(nodes); + hashAlg = alg; + config = configuration; + weights = nodeWeights; + isWeightedKetama = !weights.isEmpty(); + setKetamaNodes(nodes); + } + + private RefinedKetamaNodeLocator(TreeMap<Long, MemcachedNode> smn, Collection<MemcachedNode> an, HashAlgorithm alg, + Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration conf) { + super(); + ketamaNodes.set(smn); + allNodes.set(an); + hashAlg = alg; + config = conf; + weights = nodeWeights; + isWeightedKetama = !weights.isEmpty(); + } + + public Collection<MemcachedNode> getAll() { + return allNodes.get(); + } + + public MemcachedNode getPrimary(final String k) { + MemcachedNode rv = getNodeForKey(hashAlg.hash(k)); + if (null == rv) { + throw new IllegalArgumentException("Found no node for key" + k); + } + return rv; + } + + long getMaxKey() { + return getKetamaNodes().lastKey(); + } + + MemcachedNode getNodeForKey(long hash) { + final MemcachedNode rv; + if 
(!ketamaNodes.get().containsKey(hash)) { + // Java 1.6 adds a ceilingKey method, but I'm still stuck in 1.5 + // in a lot of places, so I'm doing this myself. + SortedMap<Long, MemcachedNode> tailMap = getKetamaNodes().tailMap(hash); + if (tailMap.isEmpty()) { + hash = getKetamaNodes().firstKey(); + } else { + hash = tailMap.firstKey(); + } + } + rv = getKetamaNodes().get(hash); + return rv; + } + + /** + * the previous 7 may be too small to reduce the chance to get all down nodes + * @param k + * @return + */ + public Iterator<MemcachedNode> getSequence(String k) { + // Seven searches gives us a 1 in 2^maxTry chance of hitting the + // same dead node all of the time. + int maxTry = config.getNodeRepetitions() + 1; + if (maxTry < 20) { + maxTry = 20; + } + return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg); + } + + public NodeLocator getReadonlyCopy() { + TreeMap<Long, MemcachedNode> smn = new TreeMap<>(getKetamaNodes()); + Collection<MemcachedNode> an = new ArrayList<>(allNodes.get().size()); + + // Rewrite the values a copy of the map. + for (Map.Entry<Long, MemcachedNode> me : smn.entrySet()) { + smn.put(me.getKey(), new MemcachedNodeROImpl(me.getValue())); + } + + // Copy the allNodes collection. + for (MemcachedNode n : allNodes.get()) { + an.add(new MemcachedNodeROImpl(n)); + } + + return new RefinedKetamaNodeLocator(smn, an, hashAlg, weights, config); + } + + @Override + public void updateLocator(List<MemcachedNode> nodes) { + allNodes.set(nodes); + setKetamaNodes(nodes); + } + + /** + * @return the ketamaNodes + */ + protected TreeMap<Long, MemcachedNode> getKetamaNodes() { + return ketamaNodes.get(); + } + + /** + * Setup the KetamaNodeLocator with the list of nodes it should use. 
+ * + * @param nodes a List of MemcachedNodes for this KetamaNodeLocator to use in + * its continuum + */ + @SuppressWarnings({"squid:S3776"}) + protected void setKetamaNodes(List<MemcachedNode> nodes) { + TreeMap<Long, MemcachedNode> newNodeMap = new TreeMap<>(); + int numReps = config.getNodeRepetitions(); + int nodeCount = nodes.size(); + int totalWeight = 0; + + if (isWeightedKetama) { + for (MemcachedNode node : nodes) { + totalWeight += weights.get(node.getSocketAddress()); + } + } + + for (MemcachedNode node : nodes) { + if (isWeightedKetama) { + + int thisWeight = weights.get(node.getSocketAddress()); + float percent = (totalWeight == 0 ? 0f : (float) thisWeight / (float) totalWeight); + int pointerPerServer = (int) ((Math.floor( + (float) (percent * config.getNodeRepetitions() / 4 * nodeCount + 0.0000000001))) + * 4); + for (int i = 0; i < pointerPerServer / 4; i++) { + for (long position : ketamaNodePositionsAtIteration(node, i)) { + newNodeMap.put(position, node); + getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position); + } + } + } else { + // Ketama does some special work with md5 where it reuses chunks. 
+ // Check to be backwards compatible, the hash algorithm does not + // matter for Ketama, just the placement should always be done using + // MD5 + if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) { + for (int i = 0; i < numReps / 4; i++) { + for (long position : ketamaNodePositionsAtIteration(node, i)) { + newNodeMap.put(position, node); + getLogger().debug("Adding node %s in position %d", node, position); + } + } + } else { + for (int i = 0; i < numReps; i++) { + newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node); + } + } + } + } + assert newNodeMap.size() == numReps * nodes.size(); + ketamaNodes.set(newNodeMap); + } + + private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) { + List<Long> positions = new ArrayList<>(); + byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration)); + for (int h = 0; h < 4; h++) { + long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16); + k |= ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF); + positions.add(k); + } + return positions; + } +} \ No newline at end of file diff --git a/src/common-service/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/common-service/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java new file mode 100644 index 0000000000..4bae811cfa --- /dev/null +++ b/src/common-service/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.spy.memcached.protocol; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.channels.UnsupportedAddressTypeException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import net.spy.memcached.ConnectionFactory; +import net.spy.memcached.FailureMode; +import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.compat.SpyObject; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.protocol.binary.TapAckOperationImpl; + +/** + * Represents a node with the net.spy.memcached cluster, along with buffering and + * operation queues. 
+ * + * This is a modified version of the net.spy.memcached.protocol.TCPMemcachedNodeImpl + * Override the final method getSocketAddress() to refresh SocketAddress to achieve same hostname with ip changing + */ +public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode { + + protected final BlockingQueue<Operation> writeQ; + private final SocketAddress socketAddress; + private final ByteBuffer rbuf; + private final ByteBuffer wbuf; + private final BlockingQueue<Operation> readQ; + private final BlockingQueue<Operation> inputQueue; + private final long opQueueMaxBlockTime; + private final long authWaitTime; + private final ConnectionFactory connectionFactory; + // operation Future.get timeout counter + private final AtomicInteger continuousTimeout = new AtomicInteger(0); + protected Operation optimizedOp = null; + private AtomicInteger reconnectAttempt = new AtomicInteger(1); + private SocketChannel channel; + private int toWrite = 0; + private SelectionKey sk = null; + private boolean shouldAuth = false; + private CountDownLatch authLatch; + private ArrayList<Operation> reconnectBlocked; + private long defaultOpTimeout; + private volatile long lastReadTimestamp = System.nanoTime(); + private MemcachedConnection connection; + + @SuppressWarnings({"squid:S107", "squid:S5993"}) + public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize, BlockingQueue<Operation> rq, + BlockingQueue<Operation> wq, BlockingQueue<Operation> iq, long opQueueMaxBlockTime, + boolean waitForAuth, long dt, long authWaitTime, ConnectionFactory fact) { + super(); + if (sa == null) { + throw new IllegalArgumentException("No SocketAddress"); + } + if (c == null) { + throw new IllegalArgumentException("No SocketChannel"); + } + if (bufSize <= 0) { + String msg = String.format("Invalid buffer size: %d", bufSize); + throw new IllegalArgumentException(msg); + } + if (rq == null) { + throw new IllegalArgumentException("No operation read queue"); + } + if 
(wq == null) { + throw new IllegalArgumentException("No operation write queue"); + } + if (iq == null) { + throw new IllegalArgumentException("No input queue"); + } + socketAddress = sa; + connectionFactory = fact; + this.authWaitTime = authWaitTime; + setChannel(c); + // Since these buffers are allocated rarely (only on client creation + // or reconfigure), and are passed to Channel.read() and Channel.write(), + // use direct buffers to avoid + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6214569 + rbuf = ByteBuffer.allocateDirect(bufSize); + wbuf = ByteBuffer.allocateDirect(bufSize); + getWbuf().clear(); + readQ = rq; + writeQ = wq; + inputQueue = iq; + this.opQueueMaxBlockTime = opQueueMaxBlockTime; + shouldAuth = waitForAuth; + defaultOpTimeout = dt; + setupForAuth(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#copyInputQueue() + */ + public final void copyInputQueue() { + Collection<Operation> tmp = new ArrayList<>(); + + // don't drain more than we have space to place + inputQueue.drainTo(tmp, writeQ.remainingCapacity()); + writeQ.addAll(tmp); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#destroyInputQueue() + */ + public Collection<Operation> destroyInputQueue() { + Collection<Operation> rv = new ArrayList<>(); + inputQueue.drainTo(rv); + return rv; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#setupResend() + */ + public final void setupResend() { + // First, reset the current write op, or cancel it if we should + // be authenticating + Operation op = getCurrentWriteOp(); + if (shouldAuth && op != null) { + op.cancel(); + } else if (op != null) { + ByteBuffer buf = op.getBuffer(); + if (buf != null) { + buf.reset(); + } else { + getLogger().info("No buffer for current write op, removing"); + removeCurrentWriteOp(); + } + } + // Now cancel all the pending read operations. Might be better to + // to requeue them. 
+ while (hasReadOp()) { + op = removeCurrentReadOp(); + if (op != getCurrentWriteOp()) { + getLogger().warn("Discarding partially completed op: %s", op); + op.cancel(); + } + } + + while (shouldAuth && hasWriteOp()) { + op = removeCurrentWriteOp(); + getLogger().warn("Discarding partially completed op: %s", op); + op.cancel(); + } + + getWbuf().clear(); + getRbuf().clear(); + toWrite = 0; + } + + // Prepare the pending operations. Return true if there are any pending + // ops + private boolean preparePending() { + // Copy the input queue into the write queue. + copyInputQueue(); + + // Now check the ops + Operation nextOp = getCurrentWriteOp(); + while (nextOp != null && nextOp.isCancelled()) { + getLogger().info("Removing cancelled operation: %s", nextOp); + removeCurrentWriteOp(); + nextOp = getCurrentWriteOp(); + } + return nextOp != null; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean) + */ + public final void fillWriteBuffer(boolean shouldOptimize) { + if (toWrite == 0 && readQ.remainingCapacity() > 0) { + getWbuf().clear(); + Operation o = getNextWritableOp(); + + while (o != null && toWrite < getWbuf().capacity()) { + synchronized (o) { + assert o.getState() == OperationState.WRITING; + + ByteBuffer obuf = o.getBuffer(); + assert obuf != null : "Didn't get a write buffer from " + o; + int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining()); + byte[] b = new byte[bytesToCopy]; + obuf.get(b); + getWbuf().put(b); + getLogger().debug("After copying stuff from %s: %s", o, getWbuf()); + if (!o.getBuffer().hasRemaining()) { + o.writeComplete(); + transitionWriteItem(); + + preparePending(); + if (shouldOptimize) { + optimize(); + } + + o = getNextWritableOp(); + } + toWrite += bytesToCopy; + } + } + getWbuf().flip(); + assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this; + assert toWrite == getWbuf().remaining() : "Expected " + toWrite + " remaining, got " + + 
getWbuf().remaining(); + } else { + getLogger().debug("Buffer is full, skipping"); + } + } + + private Operation getNextWritableOp() { + Operation o = getCurrentWriteOp(); + while (o != null && o.getState() == OperationState.WRITE_QUEUED) { + synchronized (o) { + if (o.isCancelled()) { + getLogger().debug("Not writing cancelled op."); + Operation cancelledOp = removeCurrentWriteOp(); + assert o == cancelledOp; + } else if (o.isTimedOut(defaultOpTimeout)) { + getLogger().debug("Not writing timed out op."); + Operation timedOutOp = removeCurrentWriteOp(); + assert o == timedOutOp; + } else { + o.writing(); + if (!(o instanceof TapAckOperationImpl)) { + readQ.add(o); + } + return o; + } + o = getCurrentWriteOp(); + } + } + return o; + } + + /* (non-Javadoc) + * @see net.spy.net.spy.memcached.MemcachedNode#transitionWriteItem() + */ + public final void transitionWriteItem() { + Operation op = removeCurrentWriteOp(); + assert op != null : "There is no write item to transition"; + getLogger().debug("Finished writing %s", op); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#optimize() + */ + protected abstract void optimize(); + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getCurrentReadOp() + */ + public final Operation getCurrentReadOp() { + return readQ.peek(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#removeCurrentReadOp() + */ + public final Operation removeCurrentReadOp() { + return readQ.remove(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getCurrentWriteOp() + */ + public final Operation getCurrentWriteOp() { + return optimizedOp == null ? 
writeQ.peek() : optimizedOp; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#removeCurrentWriteOp() + */ + public final Operation removeCurrentWriteOp() { + Operation rv = optimizedOp; + if (rv == null) { + rv = writeQ.remove(); + } else { + optimizedOp = null; + } + return rv; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#hasReadOp() + */ + public final boolean hasReadOp() { + return !readQ.isEmpty(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#hasWriteOp() + */ + public final boolean hasWriteOp() { + return !(optimizedOp == null && writeQ.isEmpty()); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#addOp(net.spy.net.spy.memcached.ops.Operation) + */ + public final void addOp(Operation op) { + try { + if (!authLatch.await(authWaitTime, TimeUnit.MILLISECONDS)) { + FailureMode mode = connectionFactory.getFailureMode(); + if (mode == FailureMode.Redistribute || mode == FailureMode.Retry) { + getLogger().debug("Redistributing Operation " + op + " because auth " + "latch taken longer than " + + authWaitTime + " milliseconds to " + "complete on node " + getSocketAddress()); + connection.retryOperation(op); + } else { + op.cancel(); + getLogger().warn("Operation canceled because authentication " + + "or reconnection and authentication has " + "taken more than " + authWaitTime + + " milliseconds to " + "complete on node " + this); + getLogger().debug("Canceled operation %s", op.toString()); + } + return; + } + if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException( + "Timed out waiting to add " + op + "(max wait=" + opQueueMaxBlockTime + "ms)"); + } + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while waiting to add " + op); + } + } + + /* + * (non-Javadoc) + * + * @see + * 
net.spy.net.spy.memcached.MemcachedNode#insertOp(net.spy.net.spy.memcached.ops.Operation) + */ + public final void insertOp(Operation op) { + ArrayList<Operation> tmp = new ArrayList<>(inputQueue.size() + 1); + tmp.add(op); + inputQueue.drainTo(tmp); + inputQueue.addAll(tmp); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getSelectionOps() + */ + public final int getSelectionOps() { + int rv = 0; + if (getChannel().isConnected()) { + if (hasReadOp()) { + rv |= SelectionKey.OP_READ; + } + if (toWrite > 0 || hasWriteOp()) { + rv |= SelectionKey.OP_WRITE; + } + } else { + rv = SelectionKey.OP_CONNECT; + } + return rv; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getRbuf() + */ + public final ByteBuffer getRbuf() { + return rbuf; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getWbuf() + */ + public final ByteBuffer getWbuf() { + return wbuf; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getSocketAddress() + */ + public final SocketAddress getSocketAddress() { + if (!(socketAddress instanceof InetSocketAddress)) { + throw new UnsupportedAddressTypeException(); + } + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + return new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort()); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#isActive() + */ + public final boolean isActive() { + return reconnectAttempt.get() == 0 && getChannel() != null && getChannel().isConnected(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#isAuthenticated() + */ + public boolean isAuthenticated() { + return (0 == authLatch.getCount()); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#reconnecting() + */ + public final void reconnecting() { + reconnectAttempt.incrementAndGet(); + continuousTimeout.set(0); 
+ } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#connected() + */ + public final void connected() { + reconnectAttempt.set(0); + continuousTimeout.set(0); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getReconnectCount() + */ + public final int getReconnectCount() { + return reconnectAttempt.get(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#toString() + */ + @Override + public final String toString() { + int sops = 0; + if (getSk() != null && getSk().isValid()) { + sops = getSk().interestOps(); + } + int rsize = readQ.size() + (optimizedOp == null ? 0 : 1); + int wsize = writeQ.size(); + int isize = inputQueue.size(); + return "{QA sa=" + getSocketAddress() + ", #Rops=" + rsize + ", #Wops=" + wsize + ", #iq=" + isize + ", topRop=" + + getCurrentReadOp() + ", topWop=" + getCurrentWriteOp() + ", toWrite=" + toWrite + ", interested=" + + sops + "}"; + } + + /* + * (non-Javadoc) + * + * @see + * net.spy.net.spy.memcached.MemcachedNode#registerChannel + * (java.nio.channels.SocketChannel, java.nio.channels.SelectionKey) + */ + public final void registerChannel(SocketChannel ch, SelectionKey skey) { + setChannel(ch); + setSk(skey); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getChannel() + */ + public final SocketChannel getChannel() { + return channel; + } + + /* + * (non-Javadoc) + * + * @see + * net.spy.net.spy.memcached.MemcachedNode#setChannel(java.nio.channels.SocketChannel) + */ + public final void setChannel(SocketChannel to) { + assert channel == null || !channel.isOpen() : "Attempting to overwrite channel"; + channel = to; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getSk() + */ + public final SelectionKey getSk() { + return sk; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#setSk(java.nio.channels.SelectionKey) + */ + public final void 
setSk(SelectionKey to) { + sk = to; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getBytesRemainingInBuffer() + */ + public final int getBytesRemainingToWrite() { + return toWrite; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#writeSome() + */ + public final int writeSome() throws IOException { + int wrote = channel.write(wbuf); + assert wrote >= 0 : "Wrote negative bytes?"; + toWrite -= wrote; + assert toWrite >= 0 : "toWrite went negative after writing " + wrote + " bytes for " + this; + getLogger().debug("Wrote %d bytes", wrote); + return wrote; + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#getContinuousTimeout + */ + public int getContinuousTimeout() { + return continuousTimeout.get(); + } + + /* + * (non-Javadoc) + * + * @see net.spy.net.spy.memcached.MemcachedNode#setContinuousTimeout + */ + public void setContinuousTimeout(boolean timedOut) { + if (timedOut && isActive()) { + continuousTimeout.incrementAndGet(); + } else { + continuousTimeout.set(0); + } + } + + public final void fixupOps() { + // As the selection key can be changed at any point due to node + // failure, we'll grab the current volatile value and configure it. 
+ SelectionKey s = sk; + if (s != null && s.isValid()) { + int iops = getSelectionOps(); + getLogger().debug("Setting interested opts to %d", iops); + s.interestOps(iops); + } else { + getLogger().debug("Selection key is not valid."); + } + } + + public final void authComplete() { + if (reconnectBlocked != null && !reconnectBlocked.isEmpty()) { // re-queue the operations that were parked while authenticating + inputQueue.addAll(reconnectBlocked); + } + authLatch.countDown(); + } + + public final void setupForAuth() { + if (shouldAuth) { + authLatch = new CountDownLatch(1); + if (!inputQueue.isEmpty()) { // park already-queued operations until authComplete() runs + reconnectBlocked = new ArrayList<>(inputQueue.size() + 1); + inputQueue.drainTo(reconnectBlocked); + } + assert (inputQueue.isEmpty()); + setupResend(); + } else { + authLatch = new CountDownLatch(0); + } + } + + /** + * Number of milliseconds since the last read of this node completed. + * + * @return milliseconds since last read. + */ + public long lastReadDelta() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastReadTimestamp); + } + + /** + * Mark this node as having just completed a read. + */ + public void completedRead() { + lastReadTimestamp = System.nanoTime(); + } + + @Override + public MemcachedConnection getConnection() { + return connection; + } + + @Override + public void setConnection(MemcachedConnection connection) { + this.connection = connection; + } +} \ No newline at end of file diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CacheStats.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CacheStats.java new file mode 100644 index 0000000000..60499c85dc --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CacheStats.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache.memcached; + +public class CacheStats { + private final long getBytes; + private final long getTime; + private final long putBytes; + private final CacheStatsCounter cacheStatsCounter; + + public CacheStats(long getBytes, long getTime, long putBytes, CacheStatsCounter cacheStatsCounter) { + this.getBytes = getBytes; + this.getTime = getTime; + this.putBytes = putBytes; + this.cacheStatsCounter = cacheStatsCounter; + } + + static class CacheStatsCounter { + final long numHits; + final long numMisses; + final long numPut; + final long numEvictions; + final long numTimeouts; + final long numErrors; + + CacheStatsCounter(long numPut, long numHits, long numMisses, + long numEvictions, long numTimeouts, long numErrors) { + this.numPut = numPut; + this.numHits = numHits; + this.numMisses = numMisses; + this.numEvictions = numEvictions; + this.numTimeouts = numTimeouts; + this.numErrors = numErrors; + } + } + + public long getNumHits() { + return cacheStatsCounter.numHits; + } + + public long getNumMisses() { + return cacheStatsCounter.numMisses; + } + + public long getNumGet() { + return cacheStatsCounter.numHits + cacheStatsCounter.numMisses; + } + + public long getNumGetBytes() { + return getBytes; + } + + public long getNumPutBytes() { + return putBytes; + } + + public long getNumPut() { + return cacheStatsCounter.numPut; + } + + public long 
getNumEvictions() { + return cacheStatsCounter.numEvictions; + } + + public long getNumTimeouts() { + return cacheStatsCounter.numTimeouts; + } + + public long getNumErrors() { + return cacheStatsCounter.numErrors; + } + + public long numLookups() { + return cacheStatsCounter.numHits + cacheStatsCounter.numMisses; + } + + public double hitRate() { + long lookups = numLookups(); + return lookups == 0 ? 0 : cacheStatsCounter.numHits / (double) lookups; + } + + public long avgGetBytes() { + return getBytes == 0 ? 0 : getBytes / numLookups(); + } + + public long getAvgGetTime() { + return getTime / numLookups(); + } +} \ No newline at end of file diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CompositeMemcachedCache.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CompositeMemcachedCache.java new file mode 100644 index 0000000000..67dcf62041 --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CompositeMemcachedCache.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.kylin.rest.cache.memcached;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
import org.slf4j.Logger;
import org.apache.kylin.rest.cache.KylinCache;
import org.slf4j.LoggerFactory;
import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleValueWrapper;

import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * This is a cache manager for memcached which implements KylinCache.
 * It holds a map of memcached-backed caches, one per query cache type
 * (exception, schema and success query caches).
 */
public class CompositeMemcachedCache implements KylinCache {

    private static final Logger logger = LoggerFactory.getLogger(CompositeMemcachedCache.class);

    private static final String PREFIX = "Kylin";

    // Time-to-live, in seconds, used for the exception and schema caches.
    private static final int ONE_DAY_IN_SECONDS = 86400;

    private static final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<>(16);

    private static final MemcachedCacheConfig memcachedCacheConfig = Singletons.getInstance(MemcachedCacheConfig.class);

    private static final Cache exceptionCache = new MemCachedCacheAdaptor(
            new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig,
                    CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName, ONE_DAY_IN_SECONDS)));

    private static final Cache schemaCache = new MemCachedCacheAdaptor(
            new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig,
                    CommonQueryCacheSupporter.Type.SCHEMA_CACHE.rootCacheName, ONE_DAY_IN_SECONDS)));

    // Uses the default time-to-live of MemcachedCache.create(config, prefix).
    private static final Cache successCache = new MemCachedCacheAdaptor(
            new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig,
                    CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName)));

    static {
        cacheMap.put(CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName, exceptionCache);
        cacheMap.put(CommonQueryCacheSupporter.Type.SCHEMA_CACHE.rootCacheName, schemaCache);
        cacheMap.put(CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName, successCache);
    }

    /**
     * Returns the singleton instance.
     *
     * @return the instance, or {@code null} when obtaining it raised a RuntimeException
     *         (the failure is logged)
     */
    public static KylinCache getInstance() {
        try {
            return Singletons.getInstance(CompositeMemcachedCache.class);
        } catch (RuntimeException e) {
            logger.error("Memcached init failed", e);
        }
        return null;
    }

    /**
     * Validates a cache type name.
     *
     * @throws NullPointerException     if {@code type} is null
     * @throws IllegalArgumentException if no cache is registered under {@code type}
     */
    private void checkCacheType(String type) {
        if (type == null) {
            throw new NullPointerException("type can't be null");
        }

        if (!cacheMap.containsKey(type)) {
            throw new IllegalArgumentException("unsupported rootCacheName: " + type);
        }
    }

    private String getTypeProjectPrefix(String type, String project) {
        return String.format(Locale.ROOT, "%s-%s-%s-", PREFIX, type, project);
    }

    /**
     * Serializes the key object to its JSON string form.
     *
     * @return the JSON string, or {@code null} when the key cannot be serialized
     */
    protected String serializeKey(Object key) {
        try {
            return JsonUtil.writeValueAsString(key);
        } catch (JsonProcessingException e) {
            logger.warn("Can not convert key to String.", e);
        }
        return null;
    }

    /**
     * Stores {@code value} under the type/project-prefixed JSON form of {@code key}.
     * Nothing is written when the key cannot be serialized.
     */
    @Override
    public void put(String type, String project, Object key, Object value) {
        checkCacheType(type);
        String keyS = serializeKey(key);
        if (keyS == null) {
            logger.warn("write to cache failed for key can not convert to String");
            return;
        }
        keyS = getTypeProjectPrefix(type, project) + keyS;
        cacheMap.get(type).put(keyS, value);
    }

    /** Same as {@link #put(String, String, Object, Object)}: an update simply overwrites the entry. */
    @Override
    public void update(String type, String project, Object key, Object value) {
        put(type, project, key, value);
    }

    /**
     * @return the cached value, or {@code null} when there is no entry or the key cannot be serialized
     */
    @Override
    public Object get(String type, String project, Object key) {
        checkCacheType(type);
        String keyS = serializeKey(key);
        if (keyS == null) {
            logger.warn("read from cache failed for key can not convert to String");
            return null;
        }
        keyS = getTypeProjectPrefix(type, project) + keyS;
        // Depend on the Cache contract only; a cast to SimpleValueWrapper is not needed.
        Cache.ValueWrapper valueWrapper = cacheMap.get(type).get(keyS);
        return valueWrapper == null ? null : valueWrapper.get();
    }

    /**
     * @return {@code true} when an entry existed and its eviction was requested,
     *         {@code false} when there was no entry or the key cannot be serialized
     */
    @Override
    public boolean remove(String type, String project, Object key) {
        checkCacheType(type);
        String keyS = serializeKey(key);
        if (keyS == null) {
            logger.warn("evict cache failed for key can not convert to String");
            return false;
        }
        keyS = getTypeProjectPrefix(type, project) + keyS;
        Cache cache = cacheMap.get(type);
        if (cache.get(keyS) == null) {
            return false;
        }
        cache.evict(keyS);
        return true;
    }

    @Override
    public void clearAll() {
        for (Cache cache : cacheMap.values()) {
            cache.clear();
        }
    }

    /**
     * Clears the cache of the given type. The adaptor's {@code clearByType} is used when available;
     * any other Cache implementation is cleared as a whole.
     */
    @Override
    public void clearByType(String type, String project) {
        checkCacheType(type);
        String pattern = getTypeProjectPrefix(type, project);
        Cache cache = cacheMap.get(type);
        if (cache instanceof MemCachedCacheAdaptor) {
            ((MemCachedCacheAdaptor) cache).clearByType(pattern);
        } else {
            logger.warn("cache do not support clear by project");
            // Clear the cache content. Clearing cacheMap would drop the cache registry itself
            // and make every later call fail in checkCacheType.
            cache.clear();
        }
    }

    public String getName(String type) {
        checkCacheType(type);
        return cacheMap.get(type).getName();
    }

    /**
     * @return the statistics of the cache of the given type, or {@code null} when that cache
     *         is not a {@link MemCachedCacheAdaptor}
     */
    public CacheStats getCacheStats(String type) {
        checkCacheType(type);
        Cache cache = cacheMap.get(type);
        if (cache instanceof MemCachedCacheAdaptor) {
            return ((MemCachedCacheAdaptor) cache).getCacheStats();
        } else {
            logger.warn("only support get cache stats with memcached adaptor, otherwise will return null");
            return null;
        }
    }

    /**
     * Adapts a {@link MemcachedCache} to the Spring {@link Cache} interface. Values are read back
     * as bytes and turned into objects with Java deserialization.
     * NOTE(review): Java-native deserialization of data read from an external memcached server is
     * only safe if that server is trusted - confirm the deployment assumption.
     */
    public static class MemCachedCacheAdaptor implements Cache {
        private final MemcachedCache memcachedCache;

        public MemCachedCacheAdaptor(MemcachedCache memcachedCache) {
            this.memcachedCache = memcachedCache;
        }

        @Override
        public String getName() {
            return memcachedCache.getName();
        }

        @Override
        public Object getNativeCache() {
            return memcachedCache.getNativeCache();
        }

        public CacheStats getCacheStats() {
            return memcachedCache.getStats();
        }

        /** @return the wrapped value, or {@code null} when no (non-empty) bytes are stored for the key */
        @Override
        public ValueWrapper get(Object key) {
            byte[] value = memcachedCache.get(key);
            if (value == null || value.length == 0) {
                return null;
            }
            return new SimpleValueWrapper(SerializationUtils.deserialize(value));
        }

        @Override
        public void put(Object key, Object value) {
            memcachedCache.put(key, value);
        }

        @Override
        public void evict(Object key) {
            memcachedCache.evict(key);
        }

        @Override
        public void clear() {
            memcachedCache.clear();
        }

        public void clearByType(String pattern) {
            memcachedCache.clearByType(pattern);
        }

        /**
         * @throws IllegalStateException if a value is cached but is not an instance of {@code type}
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T> T get(Object key, Class<T> type) {
            byte[] value = memcachedCache.get(key);
            if (value == null || value.length == 0) {
                return null;
            }
            Object obj = SerializationUtils.deserialize(value);
            // The type check must look at the deserialized object, not at the raw byte[].
            if (obj != null && type != null && !type.isInstance(obj)) {
                throw new IllegalStateException(
                        "Cached value is not of required type [" + type.getName() + "]: " + obj);
            }
            return (T) obj;
        }

        @Override
        public <T> T get(Object key, Callable<T> valueLoader) {
            throw new UnsupportedOperationException();
        }

        @Override
        //Without atomicity, this method should not be invoked
        public ValueWrapper putIfAbsent(Object key, Object value) {
            //implementation here doesn't guarantee the atomicity.
            byte[] existing = memcachedCache.get(key);
            if (existing == null || existing.length == 0) {
                memcachedCache.put(key, value);
                return null;
            } else {
                return new SimpleValueWrapper(SerializationUtils.deserialize(existing));
            }
        }
    }
}

// diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/KeyHookLookup.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/KeyHookLookup.java
// new file mode 100644
// index 0000000000..844939e4e1
// --- /dev/null
// +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/KeyHookLookup.java
// @@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.rest.cache.memcached;

import java.io.Serializable;
import java.util.Arrays;

/**
 * A Class implement this interface indicates that the key information need to be calculated from a first lookup from cache itself to get
 * a hook.
 */
+ */ +public interface KeyHookLookup { + KeyHook lookupKeyHook(String key); + + public static class KeyHook implements Serializable { + private static final long serialVersionUID = 2400159460862757991L; + + private String[] chunkskey; + private byte[] values; + + /** + * For de-serialization + */ + public KeyHook() { + } + + public KeyHook(String[] chunkskey, byte[] values) { + super(); + this.chunkskey = chunkskey; + this.values = values; + } + + public String[] getChunkskey() { + return chunkskey; + } + + public void setChunkskey(String[] chunkskey) { + this.chunkskey = chunkskey; + } + + public byte[] getValues() { + return values; + } + + public void setValues(byte[] values) { + this.values = values; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(chunkskey); + result = prime * result + Arrays.hashCode(values); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + KeyHook other = (KeyHook) obj; + if (!Arrays.equals(chunkskey, other.chunkskey)) + return false; + return Arrays.equals(values, other.values); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + if (chunkskey != null) { + builder.append("chunkskey_length:" + chunkskey.length); + } else { + builder.append("chunkskey_is_null"); + } + builder.append("|"); + if (values != null) { + builder.append("value_length:" + values.length); + } else { + builder.append("value_is_null"); + } + return builder.toString(); + } + } +} \ No newline at end of file diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCache.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCache.java new file mode 100644 index 0000000000..b54e4afe76 --- /dev/null +++ 
// b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCache.java
// @@ -0,0 +1,390 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.rest.cache.memcached;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.DataFormatException;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.CompressionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Shorts;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.ops.ArrayOperationQueueFactory;
import net.spy.memcached.ops.LinkedOperationQueueFactory;
import net.spy.memcached.ops.OperationQueueFactory;
import net.spy.memcached.transcoders.SerializingTranscoder;

/**
 * This is a cache backend of Memcached. The implementation leverages spymemcached client to talk to the servers.
 * Memcached itself has a limitation to the size of the key. So the real key for cache lookup is hashed from the original key.
 * The implementation provides a way for hash collision detection. It can also compress/decompress the value bytes based on
 * the preconfigured compression threshold to save network bandwidth and storage space.
 */
public class MemcachedCache {
    public static final int MAX_PREFIX_LENGTH = MemcachedClientIF.MAX_KEY_LENGTH - 40 // length of namespace hash
            - 40 // length of key hash
            - 2; // length of separators
    private static final Logger logger = LoggerFactory.getLogger(MemcachedCache.class);
    private static final int DEFAULT_TTL = 7 * 24 * 3600;

    private static final String UNABLE_TO_QUEUE_CACHE_OPERATION = "Unable to queue cache operation.";

    protected final MemcachedCacheConfig config;
    protected final MemcachedClientIF client;
    protected final String memcachedPrefix;
    protected final int compressThreshold;
    protected final AtomicLong hitCount = new AtomicLong(0);
    protected final AtomicLong missCount = new AtomicLong(0);
    protected final AtomicLong readBytes = new AtomicLong(0);
    protected final AtomicLong timeoutCount = new AtomicLong(0);
    protected final AtomicLong errorCount = new AtomicLong(0);
    protected final AtomicLong putCount = new AtomicLong(0);
    protected final AtomicLong putBytes = new AtomicLong(0);
    private final int timeToLiveSeconds;
    protected AtomicLong cacheGetTime = new AtomicLong(0);

    /**
     * @param client            the memcached client used for all operations
     * @param config            cache configuration; half of its max object size becomes the compression threshold
     * @param memcachedPrefix   prefix that becomes part of every stored key; at most {@link #MAX_PREFIX_LENGTH} characters
     * @param timeToLiveSeconds expiration passed along with every stored entry
     * @throws IllegalArgumentException if the prefix is longer than {@link #MAX_PREFIX_LENGTH}
     */
    public MemcachedCache(final MemcachedClientIF client, final MemcachedCacheConfig config,
            final String memcachedPrefix, int timeToLiveSeconds) {
        // Guava's Preconditions only substitutes %s placeholders; %d would be printed literally.
        Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
                "memcachedPrefix length [%s] exceeds maximum length [%s]", memcachedPrefix.length(), MAX_PREFIX_LENGTH);
        this.memcachedPrefix = memcachedPrefix;
        this.client = client;
        this.config = config;
        this.compressThreshold = config.getMaxObjectSize() / 2;
        this.timeToLiveSeconds = timeToLiveSeconds;
    }

    /** Creates a cache that shares client, config, prefix and time-to-live with {@code cache}. */
    public MemcachedCache(MemcachedCache cache) {
        this(cache.client, cache.config, cache.memcachedPrefix, cache.timeToLiveSeconds);
    }

    /**
     * Create and return the MemcachedCache. Each time call this method will create a new instance.
     * The entries use the default time-to-live of 7 days.
     *
     * @param config The MemcachedCache configuration to control the cache behavior.
     * @param memcachedPrefix prefix that becomes part of every stored key
     * @return a new cache instance backed by a new memcached client
     */
    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix) {
        return create(config, memcachedPrefix, DEFAULT_TTL);
    }

    /**
     * Create and return the MemcachedCache. Each time call this method will create a new instance
     * together with a new memcached client connected to the resolvable hosts of the configuration.
     *
     * @param config The MemcachedCache configuration to control the cache behavior.
     * @param memcachedPrefix prefix that becomes part of every stored key
     * @param timeToLive expiration, in seconds, of the stored entries
     * @return a new cache instance
     */
    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix, int timeToLive) {
        try {
            SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
            // always no compression inside, we compress/decompress outside
            transcoder.setCompressionThreshold(Integer.MAX_VALUE);

            OperationQueueFactory opQueueFactory;
            int maxQueueSize = config.getMaxOperationQueueSize();
            if (maxQueueSize > 0) {
                opQueueFactory = new ArrayOperationQueueFactory(maxQueueSize);
            } else {
                opQueueFactory = new LinkedOperationQueueFactory();
            }
            String hostsStr = config.getHosts();
            ConnectionFactory connectionFactory = new MemcachedConnectionFactoryBuilder()
                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
                    .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
                    .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
                    .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
                    .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
            return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
                    getResolvedAddrList(hostsStr)), config, memcachedPrefix, timeToLive);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Parses the host list and drops every address that could not be resolved.
     *
     * @param hostsStr the host list in the format accepted by {@code AddrUtil.getAddresses}
     * @return the resolved addresses only
     */
    public static List<InetSocketAddress> getResolvedAddrList(String hostsStr) {
        List<InetSocketAddress> addrs = AddrUtil.getAddresses(hostsStr);
        addrs.removeIf(InetSocketAddress::isUnresolved);
        return addrs;
    }

    public String getName() {
        return memcachedPrefix;
    }

    public Object getNativeCache() {
        return client;
    }

    /** Java-serializes the value; the value therefore has to implement {@link Serializable}. */
    protected byte[] serializeValue(Object value) {
        return SerializationUtils.serialize((Serializable) value);
    }

    @VisibleForTesting
    byte[] encodeValue(String keyS, Object value) {
        if (keyS == null) {
            return new byte[0];
        }
        return encodeValue(keyS.getBytes(StandardCharsets.UTF_8), serializeValue(value));
    }

    /**
     * This method is used to get the value bytes based on key from the Cache. The key should have been converted into
     * json string in previous procession. It calls getBinary() to compute the hashed key from the original key
     * string, uses this as the real key to do the lookup, and decodes the real value bytes from the lookup result.
     * The returned bytes are still serialized; turning them into an object is left to the caller.
     */
    public byte[] get(Object key) {
        return get((String) key);
    }

    /**
     * @param keyS should be the serialized string
     */
    public byte[] get(String keyS) {
        return getBinary(keyS);
    }

    /**
     * This method is used to put key/value objects to the Cache. The key should have been converted into
     * json string in previous procession. It will call putBinary() to compute the hashed key from
     * the original key string and encode the original key bytes into value bytes for hash conflicts detection.
     */
    public void put(Object key, Object value) {
        put((String) key, value);
    }

    /**
     * @param keyS should be the serialized string
     */
    public void put(String keyS, Object value) {
        if (keyS != null) {
            putBinary(keyS, serializeValue(value), timeToLiveSeconds);
        }
    }

    public void evict(Object key) {
        if (key == null)
            return;
        evict((String) key);
    }

    /**
     * @param keyS should be the serialized string
     */
    public void evict(String keyS) {
        if (keyS == null)
            return;
        client.delete(computeKeyHash(keyS));
    }

    //currently memcached not support fuzzy matching. this method will clear remote cache of all project.
    public void clearByType(String pattern) {
        logger.debug("clear by pattern: {} will caused clear all method here", pattern);
        this.clear();
    }

    /** Flushes the remote cache through the client and waits for the result; failures are only logged. */
    public void clear() {
        logger.warn("Clear Remote Cache!");
        Future<Boolean> resultFuture = client.flush();
        try {
            boolean result = resultFuture.get();
            logger.warn("Clear Remote Cache returned with result: {}", result);
        } catch (ExecutionException e) {
            // A failed flush is not an interruption: do not touch the thread's interrupt flag.
            logger.warn("Can't clear Remote Cache.", e);
        } catch (InterruptedException e) {
            logger.warn("Can't clear Remote Cache.", e);
            // Restore interrupted state...
            Thread.currentThread().interrupt();
        }
    }

    /** @return a snapshot of the counters; the eviction count is always reported as 0 */
    public CacheStats getStats() {
        return new CacheStats(readBytes.get(), cacheGetTime.get(), putBytes.get(),
                new CacheStats.CacheStatsCounter(putCount.get(), hitCount.get(),
                        missCount.get(), 0, timeoutCount.get(), errorCount.get()));
    }

    /**
     * @param keyS should be the serialized string
     * @return the serialized value, or an empty array when the key is null/empty or nothing usable was found
     */
    protected byte[] getBinary(String keyS) {
        if (Strings.isNullOrEmpty(keyS)) {
            return new byte[0];
        }
        byte[] bytes = internalGet(computeKeyHash(keyS));
        return decodeValue(keyS.getBytes(StandardCharsets.UTF_8), bytes);
    }

    /**
     * @param keyS should be the serialized string
     * @param valueB should be the serialized value
     */
    protected void putBinary(String keyS, byte[] valueB, int expiration) {
        if (Strings.isNullOrEmpty(keyS)) {
            return;
        }
        internalPut(computeKeyHash(keyS), encodeValue(keyS.getBytes(StandardCharsets.UTF_8), valueB), expiration);
    }

    /**
     * Fetches the raw (encoded) bytes stored under the hashed key and updates the hit/miss/timeout/error counters.
     *
     * @return the stored bytes; {@code null} on a miss; an empty array when the operation could not be
     *         queued, timed out or failed
     */
    protected byte[] internalGet(String hashedKey) {
        Future<Object> future;
        long start = System.currentTimeMillis();
        try {
            future = client.asyncGet(hashedKey);
        } catch (IllegalStateException e) {
            // operation did not get queued in time (queue is full)
            errorCount.incrementAndGet();
            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, e);
            return new byte[0];
        } catch (Throwable t) {
            errorCount.incrementAndGet();
            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, t);
            return new byte[0];
        }

        try {
            byte[] result;
            if (future != null) {
                result = (byte[]) future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
            } else {
                result = null;
            }
            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
            if (result != null) {
                hitCount.incrementAndGet();
                readBytes.addAndGet(result.length);
            } else {
                missCount.incrementAndGet();
            }
            return result;
        } catch (TimeoutException e) {
            timeoutCount.incrementAndGet();
            future.cancel(false);
            return new byte[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException e) {
            errorCount.incrementAndGet();
            logger.error("ExecutionException when pulling key meta from cache.", e);
            return new byte[0];
        }
    }

    /** Stores the encoded bytes under the hashed key; failures to queue the operation are counted and logged. */
    private void internalPut(String hashedKey, byte[] encodedValue, int expiration) {
        try {
            client.set(hashedKey, expiration, encodedValue);
            putCount.incrementAndGet();
            putBytes.addAndGet(encodedValue.length);
        } catch (IllegalStateException e) {
            // operation did not get queued in time (queue is full)
            errorCount.incrementAndGet();
            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, e);
        } catch (Throwable t) {
            errorCount.incrementAndGet();
            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, t);
        }
    }

    /**
     * Builds the stored payload: a 2-byte flag (1 = compressed, 0 = plain) followed by
     * [4-byte key length][key bytes][value bytes]; when the flag is 1 that second part is compressed as a whole.
     * Compression is attempted only if enabled and the payload exceeds the compression threshold; if it
     * fails the plain layout is used.
     */
    protected byte[] encodeValue(byte[] key, byte[] valueB) {
        byte[] compressed = null;
        if (config.isEnableCompression() && (valueB.length + Ints.BYTES + key.length > compressThreshold)) {
            try {
                compressed = CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + valueB.length)
                        .putInt(key.length).put(key).put(valueB).array());
            } catch (IOException e) {
                compressed = null;
                logger.warn("Compressing value bytes error.", e);
            }
        }
        if (compressed != null) {
            return ByteBuffer.allocate(Shorts.BYTES + compressed.length).putShort((short) 1).put(compressed).array();
        } else {
            return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length + valueB.length).putShort((short) 0)
                    .putInt(key.length).put(key).put(valueB).array();
        }
    }

    /**
     * Reverses {@link #encodeValue(byte[], byte[])} and verifies that the key embedded in the payload
     * equals the requested key.
     *
     * @return the value bytes; an empty array when the payload is null/empty, cannot be decompressed,
     *         is truncated, or belongs to a different key (hash collision)
     */
    protected byte[] decodeValue(byte[] key, byte[] valueE) {
        if (valueE == null || valueE.length == 0)
            return new byte[0];
        if (valueE.length < Shorts.BYTES) {
            logger.error("Cached value is too short to be decoded.");
            return new byte[0];
        }
        ByteBuffer buf = ByteBuffer.wrap(valueE);
        short enableCompression = buf.getShort();
        byte[] uncompressed = null;
        if (enableCompression == 1) {
            byte[] value = new byte[buf.remaining()];
            buf.get(value);
            try {
                uncompressed = CompressionUtils.decompress(value);
            } catch (IOException | DataFormatException e) {
                logger.error("Decompressing value bytes error.", e);
                return new byte[0];
            }
        }
        if (uncompressed != null) {
            buf = ByteBuffer.wrap(uncompressed);
        }
        // Treat a truncated or foreign payload as a miss instead of letting
        // BufferUnderflowException / NegativeArraySizeException escape to the caller.
        if (buf.remaining() < Ints.BYTES) {
            logger.error("Cached value is too short to be decoded.");
            return new byte[0];
        }
        final int keyLength = buf.getInt();
        if (keyLength < 0 || keyLength > buf.remaining()) {
            logger.error("Cached value is too short to be decoded.");
            return new byte[0];
        }
        byte[] keyBytes = new byte[keyLength];
        buf.get(keyBytes);
        if (!Arrays.equals(keyBytes, key)) {
            logger.error("Keys do not match, possible hash collision!");
            return new byte[0];
        }
        byte[] value = new byte[buf.remaining()];
        buf.get(value);
        return value;
    }

    /**
     * Builds the key actually sent to memcached: deploy environment, prefix and the SHA-1 hex digest of
     * the original key, joined with ':' (null parts are skipped).
     */
    @SuppressWarnings({"squid:S4790"})
    protected String computeKeyHash(String key) {
        // hash keys to keep things under 250 characters for net.spy.memcached
        return Joiner.on(":").skipNulls().join(KylinConfig.getInstanceFromEnv().getDeployEnv(), this.memcachedPrefix,
                DigestUtils.sha1Hex(key));

    }

}

// \ No newline at end of file
// diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheConfig.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheConfig.java
// new file mode 100644
// index 0000000000..76465b4a76
// --- /dev/null
// +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheConfig.java
// @@ -0,0 +1,122 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.cache.memcached; + +import org.apache.kylin.common.KylinConfig; + +import net.spy.memcached.DefaultConnectionFactory; + +public class MemcachedCacheConfig { + + public MemcachedCacheConfig() { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + hosts = kylinConfig.getMemcachedHosts(); + timeout = kylinConfig.getMemcachedOpTimeout(); + maxChunkSize = kylinConfig.getMaxChunkSize(); + maxObjectSize = kylinConfig.getMaxObjectSize(); + enableCompression = kylinConfig.isEnableCompression(); + } + + private long timeout; + + // comma delimited list of net.spy.memcached servers, given as host:port combination + private String hosts; + + private int maxChunkSize; + + private int maxObjectSize; + + // net.spy.memcached client read buffer size, -1 uses the spymemcached library default + private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE; + + // maximum operation queue size. 
0 means unbounded + private int maxOperationQueueSize = 0; + + // whether enable compress the value data or not + private boolean enableCompression; + + // only for test + private boolean enableDebugLog = false; + + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public int getMaxChunkSize() { + return maxChunkSize; + } + + public void setMaxChunkSize(int maxChunkSize) { + this.maxChunkSize = maxChunkSize; + } + + public int getMaxObjectSize() { + return maxObjectSize; + } + + public void setMaxObjectSize(int maxObjectSize) { + this.maxObjectSize = maxObjectSize; + } + + public int getMaxOperationQueueSize() { + return maxOperationQueueSize; + } + + public void setMaxOperationQueueSize(int maxOperationQueueSize) { + this.maxOperationQueueSize = maxOperationQueueSize; + } + + public int getReadBufferSize() { + return readBufferSize; + } + + // only for test + public void setEnableDebugLog() { + this.enableDebugLog = true; + } + + // only for test + public boolean isEnableDebugLog() { + return enableDebugLog; + } + + public void setReadBufferSize(int readBufferSize) { + this.readBufferSize = readBufferSize; + } + + public boolean isEnableCompression() { + return enableCompression; + } + + public void setEnableCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + } +} \ No newline at end of file diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCache.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCache.java new file mode 100644 index 0000000000..94c810eeb2 --- /dev/null +++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCache.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.internal.BulkFuture;
+
+/**
+ * Subclass of MemcachedCache. It supports storing large objects. Memcached itself limits the value size, with a default of 1M.
+ * This implementation extends the limit to 1G and can split huge byte arrays into multiple chunks. It takes care of
+ * data integrity if some of the chunks are lost (due to a server restart or other reasons).
+ *
+ * Every logical key is stored as a serialized {@link KeyHook}: the hook either carries the value bytes directly
+ * (no chunking) or the list of chunk keys under which the pieces of the value were stored.
+ *
+ * @author mingmwang
+ */
+public class MemcachedChunkingCache extends MemcachedCache implements KeyHookLookup {
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedChunkingCache.class);
+
+    /**
+     * Wraps an existing cache and validates its chunking limits.
+     *
+     * @param cache the cache whose client and configuration are reused
+     * @throws IllegalArgumentException if maxChunkSize is not greater than 1 or maxObjectSize is not greater than 261
+     */
+    public MemcachedChunkingCache(MemcachedCache cache) {
+        super(cache);
+        // Guava's Preconditions templates only substitute %s; any other specifier is left in the message verbatim.
+        Preconditions.checkArgument(config.getMaxChunkSize() > 1, "maxChunkSize [%s] must be greater than 1",
+                config.getMaxChunkSize());
+        Preconditions.checkArgument(config.getMaxObjectSize() > 261, "maxObjectSize [%s] must be greater than 261",
+                config.getMaxObjectSize());
+    }
+
+    /**
+     * Splits {@code data} into {@code nSplit} consecutive pieces. All pieces but the last have the same
+     * size (the length divided by {@code nSplit}, rounded up); the last piece holds the remainder.
+     *
+     * @param data   the bytes to split
+     * @param nSplit the number of pieces to produce
+     * @return an array of {@code nSplit} byte arrays that concatenate back to {@code data}
+     */
+    protected static byte[][] splitBytes(final byte[] data, final int nSplit) {
+        byte[][] dest = new byte[nSplit][];
+
+        // ceil(data.length / nSplit)
+        final int splitSize = (data.length - 1) / nSplit + 1;
+        for (int i = 0; i < nSplit - 1; i++) {
+            dest[i] = Arrays.copyOfRange(data, i * splitSize, (i + 1) * splitSize);
+        }
+        dest[nSplit - 1] = Arrays.copyOfRange(data, (nSplit - 1) * splitSize, data.length);
+
+        return dest;
+    }
+
+    /**
+     * Computes into how many chunks a value of {@code valueBLen} bytes has to be split.
+     *
+     * @param config    supplies maxObjectSize and maxChunkSize
+     * @param keyS      the logical key; its UTF-8 length is deducted from the per-chunk payload budget
+     * @param valueBLen the length of the serialized value in bytes
+     * @return the number of chunks needed (1 means the value fits without chunking)
+     * @throws IllegalArgumentException if the value is larger than maxChunkSize chunks can hold
+     */
+    protected static int getValueSplit(MemcachedCacheConfig config, String keyS, int valueBLen) {
+        // the number 6 means the chunk number size never exceeds 6 bytes
+        final int valueSize = config.getMaxObjectSize() - Shorts.BYTES - Ints.BYTES
+                - keyS.getBytes(StandardCharsets.UTF_8).length - 6;
+        final int maxValueSize = config.getMaxChunkSize() * valueSize;
+        // %s, not %d: Guava does not interpret any other format specifier
+        Preconditions.checkArgument(valueBLen <= maxValueSize,
+                "the value bytes length [%s] exceeds maximum value size [%s]", valueBLen, maxValueSize);
+        // ceil(valueBLen / valueSize)
+        return (valueBLen - 1) / valueSize + 1;
+    }
+
+    /**
+     * Builds the {@link KeyHook} for a value and, when chunking is needed, the split value bytes.
+     *
+     * @param nSplit the number of chunks, as returned by {@link #getValueSplit}
+     * @param keyS   the logical key; chunk keys are this key with the chunk index appended
+     * @param valueB the serialized value
+     * @return a pair of the key hook and the chunk bytes; the second element is {@code null} when {@code nSplit <= 1}
+     */
+    protected static Pair<KeyHook, byte[][]> getKeyValuePair(int nSplit, String keyS, byte[] valueB) {
+        KeyHook keyHook;
+        byte[][] splitValueB = null;
+        if (nSplit > 1) {
+            String[] chunkKeySs = new String[nSplit];
+            for (int i = 0; i < nSplit; i++) {
+                chunkKeySs[i] = keyS + i;
+            }
+            keyHook = new KeyHook(chunkKeySs, null);
+            splitValueB = splitBytes(valueB, nSplit);
+        } else {
+            keyHook = new KeyHook(null, valueB);
+        }
+
+        return new Pair<>(keyHook, splitValueB);
+    }
+
+    /**
+     * This method overrides the parent getBinary(). It gets the KeyHook from the cache first and checks on the KeyHook
+     * whether chunking is enabled or not. Without chunking the value bytes stored in the hook are returned; with
+     * chunking all chunks are fetched in one bulk request and concatenated in chunk-index order.
+     *
+     * @param keyS the logical key
+     * @return the value bytes, or an empty array when the key is empty, missing, incomplete, timed out or failed
+     */
+    @Override
+    @SuppressWarnings({"squid:S3776"})
+    public byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return new byte[0];
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return new byte[0];
+        }
+
+        if (keyHook.getChunkskey() == null || keyHook.getChunkskey().length == 0) {
+            if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+                logger.debug("Chunking not enabled, return the value bytes in the keyhook directly, value bytes size = {}",
+                        keyHook.getValues().length);
+            }
+            return keyHook.getValues();
+        }
+
+        BulkFuture<Map<String, Object>> bulkFuture;
+        long start = System.currentTimeMillis();
+
+        if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+            logger.debug("Chunking enabled, chunk size = {}", keyHook.getChunkskey().length);
+        }
+
+        // hashed key -> original chunk key, so results can be mapped back to their chunk index
+        Map<String, String> keyLookup = computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+        try {
+            bulkFuture = client.asyncGetBulk(keyLookup.keySet());
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return new byte[0];
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return new byte[0];
+        }
+
+        try {
+            Map<String, Object> bulkResult = bulkFuture.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (bulkResult.size() != keyHook.getChunkskey().length) {
+                missCount.incrementAndGet();
+                logger.warn("Some paritial chunks missing for query key: {}", keyS);
+                // remove all the partial chunks here, then the hook itself, so the entry is rebuilt from scratch
+                for (String partialKey : bulkResult.keySet()) {
+                    client.delete(partialKey);
+                }
+                deleteKeyHook(keyS);
+                return new byte[0];
+            }
+            hitCount.getAndAdd(keyHook.getChunkskey().length);
+            byte[][] bytesArray = new byte[keyHook.getChunkskey().length][];
+            for (Map.Entry<String, Object> entry : bulkResult.entrySet()) {
+                byte[] bytes = (byte[]) entry.getValue();
+                readBytes.addAndGet(bytes.length);
+                String originalKeyS = keyLookup.get(entry.getKey());
+                // the chunk index is the suffix appended to the logical key by getKeyValuePair()
+                int idx = Integer.parseInt(originalKeyS.substring(keyS.length()));
+                bytesArray[idx] = decodeValue(originalKeyS.getBytes(StandardCharsets.UTF_8), bytes);
+            }
+            return concatBytes(bytesArray);
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            bulkFuture.cancel(false);
+            return new byte[0];
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling item from cache.", e);
+            return new byte[0];
+        }
+    }
+
+    /**
+     * This method overrides the parent putBinary() method. It splits large value bytes into multiple chunks that fit
+     * into the internal cache and stores a KeyHook under the logical key that lists the chunk keys.
+     *
+     * @param keyS       the logical key; nothing is stored when it is null or empty
+     * @param valueB     the serialized value
+     * @param expiration the expiration passed through to the parent putBinary() for the hook and every chunk
+     */
+    @Override
+    public void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        int nSplit = getValueSplit(config, keyS, valueB.length);
+        Pair<KeyHook, byte[][]> keyValuePair = getKeyValuePair(nSplit, keyS, valueB);
+        KeyHook keyHook = keyValuePair.getFirst();
+        byte[][] splitValueB = keyValuePair.getSecond();
+
+        if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+            logger.debug("put key hook:{} to cache for hash key", keyHook);
+        }
+        super.putBinary(keyS, serializeValue(keyHook), expiration);
+        if (nSplit > 1) {
+            for (int i = 0; i < nSplit; i++) {
+                if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+                    logger.debug("Chunk[ {} ] bytes size before encoding = {}", i, splitValueB[i].length);
+                }
+                super.putBinary(keyHook.getChunkskey()[i], splitValueB[i], expiration);
+            }
+        }
+    }
+
+    /**
+     * Evicts the logical key together with all chunk keys listed in its KeyHook.
+     * Does nothing when the key is empty or no KeyHook is found for it.
+     *
+     * @param keyS the logical key
+     */
+    @Override
+    public void evict(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return;
+        }
+
+        if (keyHook.getChunkskey() != null && keyHook.getChunkskey().length > 0) {
+            String[] chunkKeys = keyHook.getChunkskey();
+            for (String chunkKey : chunkKeys) {
+                super.evict(chunkKey);
+            }
+        }
+        super.evict(keyS);
+    }
+
+    /**
+     * Hashes every key in the list.
+     *
+     * @param keySList the original keys
+     * @return a map from hashed key to original key; Maps.uniqueIndex rejects two keys with the same hash
+     */
+    protected Map<String, String> computeKeyHash(List<String> keySList) {
+        return Maps.uniqueIndex(keySList, this::computeKeyHash);
+    }
+
+    // Best-effort removal of the KeyHook entry; a full operation queue is counted and logged, not rethrown.
+    private void deleteKeyHook(String keyS) {
+        try {
+            super.evict(keyS);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation: ", e);
+        }
+    }
+
+    // Concatenates the given arrays in order into one newly allocated array.
+    private byte[] concatBytes(byte[]... bytesArray) {
+        int length = 0;
+        for (byte[] bytes : bytesArray) {
+            length += bytes.length;
+        }
+        byte[] result = new byte[length];
+        int destPos = 0;
+        for (byte[] bytes : bytesArray) {
+            System.arraycopy(bytes, 0, result, destPos, bytes.length);
+            destPos += bytes.length;
+        }
+        if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+            logger.debug("Original value bytes size for all chunks = {}", result.length);
+        }
+
+        return result;
+    }
+
+    /**
+     * Loads and deserializes the KeyHook stored under the logical key.
+     *
+     * @param keyS the logical key
+     * @return the KeyHook, or {@code null} when nothing is stored under the key
+     */
+    @Override
+    public KeyHook lookupKeyHook(String keyS) {
+        byte[] bytes = super.getBinary(keyS);
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        // NOTE(review): this is native Java deserialization of bytes read back from memcached; it is only safe
+        // while the memcached servers are trusted. Consider an ObjectInputFilter or a non-native format.
+        return (KeyHook) SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactory.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactory.java
new file mode 100644
index 0000000000..a164bc9954
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+/**
+ * A {@link ConnectionFactory} that forwards every call to a wrapped factory, except for the two
+ * metrics-related methods, which are answered locally: {@link #enableMetrics()} always reports
+ * {@link DefaultConnectionFactory#DEFAULT_METRIC_TYPE} and {@link #getMetricCollector()} always
+ * hands out a fresh {@link NoopMetricCollector}.
+ */
+public class MemcachedConnectionFactory extends SpyObject implements ConnectionFactory {
+    // the factory that answers everything except the metrics methods
+    private final ConnectionFactory underlying;
+
+    public MemcachedConnectionFactory(ConnectionFactory underlying) {
+        this.underlying = underlying;
+    }
+
+    // ---------------------------------------------------------------- metrics (answered locally)
+
+    @Override
+    public MetricType enableMetrics() {
+        //Turn off metric collection by default.
+        return DefaultConnectionFactory.DEFAULT_METRIC_TYPE;
+    }
+
+    @Override
+    public MetricCollector getMetricCollector() {
+        return new NoopMetricCollector();
+    }
+
+    // ---------------------------------------------------------------- connections, nodes, locator
+
+    @Override
+    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
+        return underlying.createConnection(addrs);
+    }
+
+    @Override
+    public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) {
+        return underlying.createMemcachedNode(sa, c, bufSize);
+    }
+
+    @Override
+    public NodeLocator createLocator(List<MemcachedNode> nodes) {
+        return underlying.createLocator(nodes);
+    }
+
+    @Override
+    public Collection<ConnectionObserver> getInitialObservers() {
+        return underlying.getInitialObservers();
+    }
+
+    // ---------------------------------------------------------------- operation queues
+
+    @Override
+    public BlockingQueue<Operation> createOperationQueue() {
+        return underlying.createOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createReadOperationQueue() {
+        return underlying.createReadOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createWriteOperationQueue() {
+        return underlying.createWriteOperationQueue();
+    }
+
+    @Override
+    public long getOpQueueMaxBlockTime() {
+        return underlying.getOpQueueMaxBlockTime();
+    }
+
+    // ---------------------------------------------------------------- operations, hashing, transcoding
+
+    @Override
+    public OperationFactory getOperationFactory() {
+        return underlying.getOperationFactory();
+    }
+
+    @Override
+    public Transcoder<Object> getDefaultTranscoder() {
+        return underlying.getDefaultTranscoder();
+    }
+
+    @Override
+    public HashAlgorithm getHashAlg() {
+        return underlying.getHashAlg();
+    }
+
+    @Override
+    public FailureMode getFailureMode() {
+        return underlying.getFailureMode();
+    }
+
+    @Override
+    public boolean shouldOptimize() {
+        return underlying.shouldOptimize();
+    }
+
+    // ---------------------------------------------------------------- timing and sizing
+
+    @Override
+    public long getOperationTimeout() {
+        return underlying.getOperationTimeout();
+    }
+
+    @Override
+    public int getTimeoutExceptionThreshold() {
+        return underlying.getTimeoutExceptionThreshold();
+    }
+
+    @Override
+    public long getMaxReconnectDelay() {
+        return underlying.getMaxReconnectDelay();
+    }
+
+    @Override
+    public int getReadBufSize() {
+        return underlying.getReadBufSize();
+    }
+
+    // ---------------------------------------------------------------- threading and socket options
+
+    @Override
+    public ExecutorService getListenerExecutorService() {
+        return underlying.getListenerExecutorService();
+    }
+
+    @Override
+    public boolean isDefaultExecutorService() {
+        return underlying.isDefaultExecutorService();
+    }
+
+    @Override
+    public boolean isDaemon() {
+        return underlying.isDaemon();
+    }
+
+    @Override
+    public boolean useNagleAlgorithm() {
+        return underlying.useNagleAlgorithm();
+    }
+
+    // ---------------------------------------------------------------- authentication
+
+    @Override
+    public AuthDescriptor getAuthDescriptor() {
+        return underlying.getAuthDescriptor();
+    }
+
+    @Override
+    public long getAuthWaitTime() {
+        return underlying.getAuthWaitTime();
+    }
+}
\ No newline at end of file
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactoryBuilder.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactoryBuilder.java
new file mode 100644
index 0000000000..480313f91c
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactoryBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.RefinedKetamaNodeLocator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+/**
+ * A {@link ConnectionFactoryBuilder} whose {@link #build()} produces a {@link DefaultConnectionFactory}
+ * that answers from the values set on this builder and uses {@link RefinedKetamaNodeLocator} for the
+ * CONSISTENT locator type.
+ */
+public class MemcachedConnectionFactoryBuilder extends ConnectionFactoryBuilder {
+    /**
+     * Get the ConnectionFactory set up with the provided parameters.
+     *
+     * Settings that were left unset on the builder (a {@code null} reference, or {@code -1} for the
+     * timeout-like and size-like numbers checked below) fall back to the {@link DefaultConnectionFactory}
+     * value. The returned factory reads the builder fields on every call, so it reflects later changes
+     * made to this builder.
+     *
+     * @return a new connection factory backed by this builder's settings
+     */
+    @Override
+    @SuppressWarnings({"squid:S3776"})
+    public ConnectionFactory build() {
+        return new DefaultConnectionFactory() {
+
+            @Override
+            public BlockingQueue<Operation> createOperationQueue() {
+                return opQueueFactory == null ? super.createOperationQueue() : opQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createReadOperationQueue() {
+                return readQueueFactory == null ? super.createReadOperationQueue() : readQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createWriteOperationQueue() {
+                // fall back to the default *write* queue, not the read queue
+                return writeQueueFactory == null ? super.createWriteOperationQueue() : writeQueueFactory.create();
+            }
+
+            @Override
+            public NodeLocator createLocator(List<MemcachedNode> nodes) {
+                switch (locator) {
+                case ARRAY_MOD:
+                    return new ArrayModNodeLocator(nodes, getHashAlg());
+                case CONSISTENT:
+                    return new RefinedKetamaNodeLocator(nodes, getHashAlg());
+                default:
+                    throw new IllegalStateException("Unhandled locator type: " + locator);
+                }
+            }
+
+            @Override
+            public Transcoder<Object> getDefaultTranscoder() {
+                return transcoder == null ? super.getDefaultTranscoder() : transcoder;
+            }
+
+            @Override
+            public FailureMode getFailureMode() {
+                return failureMode == null ? super.getFailureMode() : failureMode;
+            }
+
+            @Override
+            public HashAlgorithm getHashAlg() {
+                return hashAlg == null ? super.getHashAlg() : hashAlg;
+            }
+
+            @Override
+            public Collection<ConnectionObserver> getInitialObservers() {
+                return initialObservers;
+            }
+
+            @Override
+            public OperationFactory getOperationFactory() {
+                return opFact == null ? super.getOperationFactory() : opFact;
+            }
+
+            @Override
+            public long getOperationTimeout() {
+                return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
+            }
+
+            @Override
+            public int getReadBufSize() {
+                return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
+            }
+
+            @Override
+            public boolean isDaemon() {
+                return isDaemon;
+            }
+
+            @Override
+            public boolean shouldOptimize() {
+                return shouldOptimize;
+            }
+
+            @Override
+            public boolean useNagleAlgorithm() {
+                return useNagle;
+            }
+
+            @Override
+            public long getMaxReconnectDelay() {
+                return maxReconnectDelay;
+            }
+
+            @Override
+            public AuthDescriptor getAuthDescriptor() {
+                return authDescriptor;
+            }
+
+            @Override
+            public long getOpQueueMaxBlockTime() {
+                return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime : super.getOpQueueMaxBlockTime();
+            }
+
+            @Override
+            public int getTimeoutExceptionThreshold() {
+                return timeoutExceptionThreshold;
+            }
+
+            @Override
+            public MetricType enableMetrics() {
+                return metricType == null ? super.enableMetrics() : metricType;
+            }
+
+            @Override
+            public MetricCollector getMetricCollector() {
+                return collector == null ? super.getMetricCollector() : collector;
+            }
+
+            @Override
+            public ExecutorService getListenerExecutorService() {
+                return executorService == null ? super.getListenerExecutorService() : executorService;
+            }
+
+            @Override
+            public boolean isDefaultExecutorService() {
+                return executorService == null;
+            }
+
+            @Override
+            public long getAuthWaitTime() {
+                return authWaitTime;
+            }
+        };
+
+    }
+}
\ No newline at end of file
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcacheConnectionFactoryTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcacheConnectionFactoryTest.java
new file mode 100644
index 0000000000..c1bd949b87
--- /dev/null
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcacheConnectionFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.protocol.binary.BinaryOperationFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Checks that a {@link MemcachedConnectionFactory} wrapped around the factory produced by
+ * {@link MemcachedConnectionFactoryBuilder} reports both the explicitly configured settings and
+ * the defaults for everything left unset.
+ */
+public class MemcacheConnectionFactoryTest extends NLocalFileMetadataTestCase {
+
+    @Before
+    public void setup() {
+        overwriteSystemProp("kylin.cache.memcached.enabled", "true");
+        createTestMetadata();
+    }
+
+    @After
+    public void destroy() {
+        cleanupTestMetadata();
+    }
+
+    /** Timeouts, hash algorithm and read buffer size set on the builder must be visible on the factory. */
+    @Test
+    public void testMemcachedConnectionFactory() {
+        SerializingTranscoder transcoder = newUncompressedTranscoder();
+        OperationQueueFactory opQueueFactory = new LinkedOperationQueueFactory();
+
+        MemcachedConnectionFactoryBuilder builder = new MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
+                .setDaemon(true)
+                .setFailureMode(FailureMode.Redistribute)
+                .setTranscoder(transcoder)
+                .setShouldOptimize(true)
+                .setOpQueueMaxBlockTime(500)
+                .setOpTimeout(500)
+                .setReadBufferSize(DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE)
+                .setOpQueueFactory(opQueueFactory)
+                .build();
+
+        MemcachedConnectionFactory factory = new MemcachedConnectionFactory(connectionFactory);
+
+        factory.createOperationQueue();
+        factory.createReadOperationQueue();
+        factory.createWriteOperationQueue();
+        Assert.assertEquals(transcoder, factory.getDefaultTranscoder());
+        Assert.assertEquals(FailureMode.Redistribute, factory.getFailureMode());
+        Assert.assertEquals(DefaultHashAlgorithm.FNV1A_64_HASH, factory.getHashAlg());
+        Assert.assertNotNull(factory.getOperationFactory());
+        Assert.assertNotNull(factory.getMetricCollector());
+        Assert.assertNotNull(factory.getListenerExecutorService());
+        Assert.assertNotNull(factory.getInitialObservers());
+        Assert.assertEquals(500, factory.getOperationTimeout());
+        Assert.assertEquals(16384, factory.getReadBufSize());
+        Assert.assertEquals(500, factory.getOpQueueMaxBlockTime());
+        Assert.assertEquals(MetricType.OFF, factory.enableMetrics());
+        Assert.assertTrue(factory.isDefaultExecutorService());
+        Assert.assertTrue(factory.shouldOptimize());
+        Assert.assertFalse(factory.useNagleAlgorithm());
+        Assert.assertEquals(30, factory.getMaxReconnectDelay());
+        Assert.assertEquals(1000, factory.getAuthWaitTime());
+        Assert.assertEquals(500, factory.getOperationTimeout());
+    }
+
+    /** With no timeouts or hash algorithm configured, the factory must report the library defaults. */
+    @Test
+    public void testMemcachedConnectionConfig() {
+        SerializingTranscoder transcoder = newUncompressedTranscoder();
+        OperationQueueFactory opQueueFactory = new LinkedOperationQueueFactory();
+        OperationFactory operationFactory = new BinaryOperationFactory();
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        MemcachedConnectionFactoryBuilder builder = new MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
+                .setDaemon(true)
+                .setFailureMode(FailureMode.Redistribute)
+                .setTranscoder(transcoder)
+                .setShouldOptimize(true)
+                .setListenerExecutorService(executorService)
+                .setOpFact(operationFactory)
+                .setEnableMetrics(MetricType.OFF)
+                .setMetricCollector(new NoopMetricCollector())
+                .setOpQueueFactory(opQueueFactory)
+                .build();
+
+        MemcachedConnectionFactory factory = new MemcachedConnectionFactory(connectionFactory);
+
+        factory.createOperationQueue();
+        factory.createReadOperationQueue();
+        factory.createWriteOperationQueue();
+        Assert.assertEquals(transcoder, factory.getDefaultTranscoder());
+        Assert.assertEquals(FailureMode.Redistribute, factory.getFailureMode());
+        Assert.assertEquals(DefaultHashAlgorithm.NATIVE_HASH, factory.getHashAlg());
+        Assert.assertNotNull(factory.getOperationFactory());
+        Assert.assertNotNull(factory.getMetricCollector());
+        Assert.assertNotNull(factory.getListenerExecutorService());
+        Assert.assertNotNull(factory.getInitialObservers());
+        Assert.assertEquals(2500, factory.getOperationTimeout());
+        Assert.assertEquals(16384, factory.getReadBufSize());
+        Assert.assertEquals(10000, factory.getOpQueueMaxBlockTime());
+        Assert.assertEquals(MetricType.OFF, factory.enableMetrics());
+        Assert.assertFalse(factory.isDefaultExecutorService());
+        Assert.assertTrue(factory.shouldOptimize());
+        Assert.assertFalse(factory.useNagleAlgorithm());
+        Assert.assertEquals(30, factory.getMaxReconnectDelay());
+        Assert.assertEquals(1000, factory.getAuthWaitTime());
+        Assert.assertEquals(2500, factory.getOperationTimeout());
+    }
+
+    // Builds the transcoder both tests share.
+    private static SerializingTranscoder newUncompressedTranscoder() {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        // always no compression inside, we compress/decompress outside
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        return transcoder;
+    }
+}
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheTest.java
b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheTest.java
new file mode 100644
index 0000000000..28c3895086
--- /dev/null
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import net.spy.memcached.DefaultConnectionFactory;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache.MemCachedCacheAdaptor;
+import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.GetFuture;
+
+/**
+ * Tests {@link MemcachedCache} through its adaptor against a mocked {@link MemcachedClient},
+ * plus the host-list resolution helper and the {@link MemcachedCacheConfig} accessors.
+ */
+public class MemcachedCacheTest extends NLocalFileMetadataTestCase {
+
+    private Map<String, String> keyValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+    private MemcachedCacheConfig cacheConfig;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+
+        keyValueMap = Maps.newHashMap();
+        keyValueMap.put("sql1", "value1");
+        keyValueMap.put("sql11", "value11");
+
+        cacheConfig = new MemcachedCacheConfig();
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig,
+                CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName, 7 * 24 * 3600);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedCache);
+
+        // Mock put to cache: an async get on the hashed key yields the encoded value
+        for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
+            String key = entry.getKey();
+            String hashedKey = memcachedCache.computeKeyHash(key);
+            byte[] valueE = memcachedCache.encodeValue(key, entry.getValue());
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    /** Every mocked entry must come back unchanged through the adaptor. */
+    @Test
+    public void testGet() {
+        for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
+            Assert.assertEquals("The value should not change", entry.getValue(),
+                    memCachedAdaptor.get(entry.getKey()).get());
+        }
+    }
+
+    /** Of the three configured hosts only one is expected in the resolved address list. */
+    @Test
+    public void testGetResolvedAddrList() {
+        String hostsStr = "localhost:11211,unresolvedHost1:11211,unresolvedHost2:11211";
+        List<InetSocketAddress> addrList = MemcachedCache.getResolvedAddrList(hostsStr);
+        Assert.assertEquals(1, addrList.size());
+    }
+
+    /** Values written through the setters must be returned by the matching getters. */
+    @Test
+    public void testMemcachedConfig() {
+        cacheConfig.setTimeout(1000);
+        cacheConfig.setHosts("localhost:11211");
+        cacheConfig.setMaxChunkSize(1024);
+        cacheConfig.setMaxOperationQueueSize(1);
+        cacheConfig.setReadBufferSize(DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE);
+        cacheConfig.setEnableCompression(false);
+
+        Assert.assertEquals(1000, cacheConfig.getTimeout());
+        Assert.assertEquals("localhost:11211", cacheConfig.getHosts());
+        Assert.assertEquals(1024, cacheConfig.getMaxChunkSize());
+        Assert.assertEquals(1, cacheConfig.getMaxOperationQueueSize());
+        Assert.assertEquals(16384, cacheConfig.getReadBufferSize());
+        Assert.assertFalse(cacheConfig.isEnableCompression());
+    }
+}
\ No newline at end of file
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCacheTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCacheTest.java
new file mode 100644
index 0000000000..4239c814b5
--- /dev/null
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCacheTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache.MemCachedCacheAdaptor;
+import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.BulkFuture;
+import net.spy.memcached.internal.GetFuture;
+
+/**
+ * Tests {@link MemcachedChunkingCache} against a Mockito-mocked {@link MemcachedClient}.
+ *
+ * <p>{@link #setUp()} stubs the client for two kinds of entries: a small value whose
+ * {@link KeyHookLookup.KeyHook} carries the serialized bytes directly, and a large value that
+ * {@link MemcachedChunkingCache#getKeyValuePair} splits into several chunks fetched in bulk.
+ */
+public class MemcachedChunkingCacheTest extends NLocalFileMetadataTestCase {
+
+    private Map<String, String> smallValueMap;
+    private Map<String, String> largeValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+    /** Hook produced for the large value in {@link #setUp()}; reused by the evict and KeyHook tests. */
+    private KeyHookLookup.KeyHook keyHook;
+    private MemcachedChunkingCache memcachedChunkingCache;
+
+    /** Creates the test metadata, the cache under test, and all client stubs. */
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        final int maxObjectSize = 300;
+
+        smallValueMap = Maps.newHashMap();
+        smallValueMap.put("sql1", "value1");
+
+        largeValueMap = Maps.newHashMap();
+        largeValueMap.put("sql2", Strings.repeat("value2", maxObjectSize));
+
+        MemcachedCacheConfig cacheConfig = new MemcachedCacheConfig();
+        cacheConfig.setMaxObjectSize(maxObjectSize);
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig,
+                CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName, 7 * 24 * 3600);
+        memcachedChunkingCache = new MemcachedChunkingCache(memcachedCache);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedChunkingCache);
+
+        // Mock put to cache: the hook has no chunk keys and holds the serialized value itself.
+        for (Map.Entry<String, String> entry : smallValueMap.entrySet()) {
+            String key = entry.getKey();
+            String hashedKey = memcachedCache.computeKeyHash(key);
+
+            String value = entry.getValue();
+            byte[] valueB = memcachedCache.serializeValue(value);
+            // Deliberately a local with its own name so the keyHook field is not shadowed.
+            KeyHookLookup.KeyHook smallHook = new KeyHookLookup.KeyHook(null, valueB);
+            byte[] valueE = memcachedCache.encodeValue(key, smallHook);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+        }
+
+        // Mock put large value to cache: the hook lists the chunk keys, the chunks come from a bulk get.
+        for (Map.Entry<String, String> entry : largeValueMap.entrySet()) {
+            String key = entry.getKey();
+            String hashedKey = memcachedCache.computeKeyHash(key);
+
+            String value = entry.getValue();
+            byte[] valueB = memcachedCache.serializeValue(value);
+            int nSplit = MemcachedChunkingCache.getValueSplit(cacheConfig, key, valueB.length);
+            Pair<KeyHookLookup.KeyHook, byte[][]> keyValuePair = MemcachedChunkingCache.getKeyValuePair(nSplit, key,
+                    valueB);
+            // Stored in the field because testEvict and testKeyHookFunc read it later.
+            keyHook = keyValuePair.getFirst();
+            byte[][] splitValueB = keyValuePair.getSecond();
+
+            //For key
+            byte[] valueE = memcachedCache.encodeValue(key, keyHook);
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+
+            //For splits
+            Map<String, String> keyLookup = memcachedChunkingCache
+                    .computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+            Map<String, Object> bulkResult = Maps.newHashMap();
+            for (int i = 0; i < nSplit; i++) {
+                String splitKeyS = keyHook.getChunkskey()[i];
+                bulkResult.put(memcachedCache.computeKeyHash(splitKeyS),
+                        memcachedCache.encodeValue(splitKeyS.getBytes(Charsets.UTF_8), splitValueB[i]));
+            }
+
+            BulkFuture<Map<String, Object>> bulkFuture = mock(BulkFuture.class);
+            when(memcachedClient.asyncGetBulk(keyLookup.keySet())).thenReturn(bulkFuture);
+            when(bulkFuture.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(bulkResult);
+        }
+    }
+
+    /** Removes the metadata created in {@link #setUp()}. */
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    /** Both the small and the chunked large value must be read back unchanged. */
+    @Test
+    public void testGet() {
+        for (Map.Entry<String, String> entry : smallValueMap.entrySet()) {
+            Assert.assertEquals("The value should not change", entry.getValue(),
+                    memCachedAdaptor.get(entry.getKey()).get());
+        }
+        for (Map.Entry<String, String> entry : largeValueMap.entrySet()) {
+            Assert.assertEquals("The value should not change", entry.getValue(),
+                    memCachedAdaptor.get(entry.getKey()).get());
+        }
+    }
+
+    /** Evicting empty, null and ordinary keys must not throw; a later get of "first" yields no bytes. */
+    @Test
+    public void testEvict() {
+        memcachedChunkingCache.evict("");
+        memcachedChunkingCache.evict(null);
+        memcachedChunkingCache.put("first", "first_value");
+        keyHook.setChunkskey(new String[]{"first", "second"});
+        for (String key : keyHook.getChunkskey()) {
+            memcachedChunkingCache.evict(key);
+        }
+        byte[] value = memcachedChunkingCache.get("first");
+        Assert.assertEquals(0, value.length);
+    }
+
+    /** {@code splitBytes} must keep byte order and produce the expected chunk sizes for 2 and 3 splits. */
+    @Test
+    public void testSplitBytes() {
+        byte[] data = new byte[8];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) i;
+        }
+
+        int nSplit;
+        byte[][] dataSplits;
+
+        // assertArrayEquals takes (expected, actual); keeping that order makes failure messages correct.
+        nSplit = 2;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(new byte[] { 0, 1, 2, 3 }, dataSplits[0]);
+        Assert.assertArrayEquals(new byte[] { 4, 5, 6, 7 }, dataSplits[1]);
+
+        nSplit = 3;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(new byte[] { 0, 1, 2 }, dataSplits[0]);
+        Assert.assertArrayEquals(new byte[] { 3, 4, 5 }, dataSplits[1]);
+        Assert.assertArrayEquals(new byte[] { 6, 7 }, dataSplits[2]);
+    }
+
+    /** Covers {@code equals}, {@code hashCode} and {@code toString} of {@link KeyHookLookup.KeyHook}. */
+    @Test
+    public void testKeyHookFunc() {
+        // A copy with the same chunk keys and values must be equal in every respect.
+        KeyHookLookup.KeyHook keyHookEq = new KeyHookLookup.KeyHook();
+        keyHookEq.setChunkskey(keyHook.getChunkskey());
+        keyHookEq.setValues(keyHook.getValues());
+
+        Assert.assertEquals(keyHook, keyHookEq);
+        Assert.assertEquals(keyHook.hashCode(), keyHookEq.hashCode());
+        Assert.assertTrue(keyHook.equals(keyHookEq) && (keyHook.toString().equals(keyHookEq.toString()) && keyHook.equals(keyHook)));
+
+        // Once both parts are cleared the copy must no longer match, nor must null.
+        keyHookEq.setChunkskey(null);
+        keyHookEq.setValues(null);
+
+        Assert.assertNotEquals(keyHook, keyHookEq);
+        Assert.assertFalse(keyHook.equals(keyHookEq) || keyHook.toString().equals(keyHookEq.toString()) || keyHook.equals(null));
+    }
+}
\ No newline at end of file
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f6b9622033..d76736609f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1954,6 +1954,31 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.cache.redis.batch-count", ONE_HUNDRED_THOUSAND));
     }
 
+    // Memcached
+    public boolean isMemcachedEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.cache.memcached.enabled", FALSE));
+    }
+
+    public String getMemcachedHosts() {
+        return getOptional("kylin.cache.memcached.hosts", "localhost:11211");
+    }
+
+    public long getMemcachedOpTimeout() {
+        return Long.parseLong(getOptional("kylin.cache.memcached.option.timeout", "500"));
+    }
+
+    public int getMaxChunkSize() {
+        return
Integer.parseInt(getOptional("kylin.cache.memcached.max-chunk-size", "1024"));
+    }
+
+    public int getMaxObjectSize() { // int value of kylin.cache.memcached.max-object-size; "1048576" is the default passed to getOptional
+        return Integer.parseInt(getOptional("kylin.cache.memcached.max-object-size", "1048576"));
+    }
+
+    public boolean isEnableCompression() { // boolean value of kylin.cache.memcached.is-enable-compression; TRUE is the default passed to getOptional
+        return Boolean.parseBoolean((getOptional("kylin.cache.memcached.is-enable-compression", TRUE)));
+    }
+
     public long getMaxWaitMillis() {
         return Long.parseLong(getOptional("kylin.cache.redis.max-wait", "300000"));
     }
diff --git a/src/query-service/pom.xml b/src/query-service/pom.xml
index a69eeb9ebc..2d289bf34e 100644
--- a/src/query-service/pom.xml
+++ b/src/query-service/pom.xml
@@ -161,6 +161,11 @@
             <artifactId>embedded-redis</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.simple-spring-memcached</groupId>
+            <artifactId>jmemcached-maven-plugin</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index 5f68795339..862a7e9b8f 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.querymeta.TableMetaWithType;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.cache.KylinCache;
 import org.apache.kylin.rest.cache.KylinEhCache;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache;
 import org.apache.kylin.rest.cache.RedisCache;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
@@ -73,6 +74,8 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         if (kylinConfig.isRedisEnabled()) {
             kylinCache = RedisCache.getInstance();
+        } else if (kylinConfig.isMemcachedEnabled()) { // Memcached is consulted only when Redis is not enabled
+            kylinCache = CompositeMemcachedCache.getInstance();
         } else {
             kylinCache = KylinEhCache.getInstance();
         }
@@ -185,7 +188,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
             cached.setStorageCacheUsed(true);
             QueryContext.current().getQueryTagInfo().setStorageCacheUsed(true);
 
-            String cacheType = KylinConfig.getInstanceFromEnv().isRedisEnabled() ? "Redis" : "Ehcache";
+            String cacheType = getCacheType(); // the label now also covers Memcached, see getCacheType()
             cached.setStorageCacheType(cacheType);
             QueryContext.current().getQueryTagInfo().setStorageCacheType(cacheType);
 
@@ -200,6 +203,16 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
         return cached;
     }
 
+    private String getCacheType() { // label for the storage cache type: Redis is checked first, then Memcached, otherwise Ehcache
+        if (KylinConfig.getInstanceFromEnv().isRedisEnabled()) {
+            return "Redis";
+        } else if (KylinConfig.getInstanceFromEnv().isMemcachedEnabled()) {
+            return "Memcached";
+        } else {
+            return "Ehcache";
+        }
+    }
+
     public SQLResponse searchFailedCache(SQLRequest sqlRequest) {
         SQLResponse cached = doSearchQuery(Type.EXCEPTION_QUERY_CACHE, sqlRequest);
         if (cached == null) {
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCompositeMemcachedCacheTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCompositeMemcachedCacheTest.java
new file mode 100644
index 0000000000..2c97ac37e6
--- /dev/null
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCompositeMemcachedCacheTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.code.ssm.jmemcached.plugin.AbstractJmemcachedMojo;
+import com.google.code.ssm.jmemcached.plugin.JmemcachedStartMojo;
+import com.google.code.ssm.jmemcached.plugin.Server;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.kylin.rest.cache.KylinCache;
+import org.apache.kylin.rest.cache.memcached.CacheStats;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache;
+import org.apache.kylin.rest.cache.memcached.MemcachedCache;
+import org.apache.kylin.rest.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.rest.cache.memcached.MemcachedConnectionFactory;
+import org.apache.kylin.rest.cache.memcached.MemcachedConnectionFactoryBuilder;
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the memcached-backed query cache ({@link CompositeMemcachedCache}, {@link MemcachedCache}
+ * and {@link QueryCacheManager}) against an embedded jmemcached server started once per class load.
+ */
+public class QueryCompositeMemcachedCacheTest extends LocalFileMetadataTestCase {
+
+    /** Port the embedded server listens on and the test clients connect to. */
+    private static final int MEMCACHED_PORT = 11211;
+    private static final String MEMCACHED_HOSTS = "localhost:" + MEMCACHED_PORT;
+
+    static {
+        Server server = new Server();
+        server.setPort(MEMCACHED_PORT);
+        server.setBinary(true);
+        List<Server> serverList = new ArrayList<>();
+        serverList.add(server);
+        AbstractJmemcachedMojo embeddedMemcachedServerStarter = new JmemcachedStartMojo();
+        embeddedMemcachedServerStarter.setServers(serverList);
+        try {
+            embeddedMemcachedServerStarter.execute();
+        } catch (MojoExecutionException | MojoFailureException e) {
+            // Kept best-effort as in the original: the failure is reported but class loading continues.
+            // NOTE(review): confirm whether a failed start should abort the test class instead.
+            e.printStackTrace();
+        }
+    }
+
+    private static final String PROJECT = "test_project";
+    private static final String TYPE = "test_type";
+    private static final String CACHE_KEY = "test_key";
+    private static final String CACHE_VAL = "test_val";
+
+    @Spy
+    private KylinCache memcachedCache = Mockito.spy(CompositeMemcachedCache.getInstance());
+
+    @InjectMocks
+    private QueryCacheManager queryCacheManager;
+
+    @BeforeClass
+    public static void setupResource() {
+        staticCreateTestMetadata();
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        staticCleanupTestMetadata();
+    }
+
+    /** Enables the memcached cache, creates the test metadata and initialises the Mockito annotations. */
+    @Before
+    public void setup() {
+        overwriteSystemProp("kylin.cache.memcached.enabled", "true");
+        createTestMetadata();
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @After
+    public void destroy() {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Builds a client for {@link #MEMCACHED_HOSTS} with the connection settings shared by the
+     * connection tests; only the hash algorithm differs between them.
+     *
+     * @param config  supplies the operation timeout and the read buffer size
+     * @param hashAlg hash algorithm handed to the connection factory builder
+     * @return a new client; the caller is responsible for shutting it down
+     * @throws IOException if the client cannot be created
+     */
+    private static MemcachedClient newMemcachedClient(MemcachedCacheConfig config, DefaultHashAlgorithm hashAlg)
+            throws IOException {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        OperationQueueFactory opQueueFactory = new LinkedOperationQueueFactory();
+
+        ConnectionFactory connectionFactory = new MemcachedConnectionFactoryBuilder()
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setHashAlg(hashAlg)
+                .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+
+        MemcachedConnectionFactory factory = new MemcachedConnectionFactory(connectionFactory);
+        return new MemcachedClient(factory, MemcachedCache.getResolvedAddrList(MEMCACHED_HOSTS));
+    }
+
+    /** Releases the client's connections, giving queued operations up to the configured timeout to finish. */
+    private static void shutdownQuietly(MemcachedClient client, MemcachedCacheConfig config) {
+        client.shutdown(config.getTimeout(), TimeUnit.MILLISECONDS);
+    }
+
+    /** Puts, evicts and reads through a real client; empty, null and evicted keys must all read as empty. */
+    @Test
+    public void testMemcachedConnection() throws IOException {
+        MemcachedCacheConfig config = new MemcachedCacheConfig();
+
+        String memcachedPrefix = "testCli";
+        String cacheKey = "test_cache_key";
+        String cacheVal = "test_cache_val";
+        int timeToLive = 7 * 24 * 3600;
+
+        MemcachedClient memcachedClient = newMemcachedClient(config, DefaultHashAlgorithm.KETAMA_HASH);
+        try {
+            MemcachedCache cache = new MemcachedCache(memcachedClient, config, memcachedPrefix, timeToLive);
+            cache.put(cacheKey, cacheVal);
+            cache.put("", cacheVal);
+            cache.put(null, cacheVal);
+            cache.evict(cacheKey);
+            Assert.assertEquals(0, cache.get("").length);
+            Assert.assertEquals(0, cache.get(null).length);
+            Assert.assertEquals(0, cache.get(cacheKey).length);
+            cache.clear();
+        } finally {
+            // Previously the client was never closed and leaked its connection.
+            shutdownQuietly(memcachedClient, config);
+        }
+    }
+
+    /** Checks the client's operation timeout and that each getNodeLocator() call returns a distinct object. */
+    @Test
+    public void testMemcachedClient() throws IOException {
+        MemcachedCacheConfig config = new MemcachedCacheConfig();
+
+        MemcachedClient memcachedClient = newMemcachedClient(config, DefaultHashAlgorithm.FNV1A_64_HASH);
+        try {
+            List<MemcachedNode> nodeList = new ArrayList<>(
+                    memcachedClient.getNodeLocator().getReadonlyCopy().getAll());
+            NodeLocator nodeLocator = memcachedClient.getNodeLocator();
+            memcachedClient.getNodeLocator().updateLocator(nodeList);
+            Assert.assertEquals(500, memcachedClient.getOperationTimeout());
+            Assert.assertNotEquals(nodeLocator, memcachedClient.getNodeLocator());
+        } finally {
+            shutdownQuietly(memcachedClient, config);
+        }
+    }
+
+    /** Nodes taken from the read-only locator copy must reject every mutating call. */
+    @Test
+    public void testMemcachedNode() throws IOException {
+        MemcachedCacheConfig config = new MemcachedCacheConfig();
+
+        MemcachedClient memcachedClient = newMemcachedClient(config, DefaultHashAlgorithm.FNV1A_64_HASH);
+        try {
+            List<MemcachedNode> nodeList = new ArrayList<>(
+                    memcachedClient.getNodeLocator().getReadonlyCopy().getAll());
+            nodeList.forEach(node -> {
+                Assert.assertThrows(UnsupportedOperationException.class, node::setupResend);
+                Assert.assertThrows(UnsupportedOperationException.class, node::setupForAuth);
+                Assert.assertThrows(UnsupportedOperationException.class, node::destroyInputQueue);
+                Assert.assertThrows(UnsupportedOperationException.class, node::reconnecting);
+                Operation op = Mockito.spy(Operation.class);
+                Assert.assertThrows(UnsupportedOperationException.class, () -> node.insertOp(op));
+                Assert.assertThrows(UnsupportedOperationException.class, () -> node.addOp(op));
+            });
+        } finally {
+            shutdownQuietly(memcachedClient, config);
+        }
+    }
+
+    /** A null cache type must be rejected with a NullPointerException. */
+    @Test
+    public void testTypeAsNull() {
+        CompositeMemcachedCache compositeMemcachedCache = (CompositeMemcachedCache) CompositeMemcachedCache.getInstance();
+        Assert.assertNotNull(compositeMemcachedCache);
+        Assert.assertThrows(NullPointerException.class, () -> compositeMemcachedCache.put(null, PROJECT, CACHE_KEY, CACHE_VAL));
+    }
+
+    /** An unknown cache type must be rejected with an IllegalArgumentException. */
+    @Test
+    public void testUnsupportedCacheType() {
+        CompositeMemcachedCache compositeMemcachedCache = (CompositeMemcachedCache) CompositeMemcachedCache.getInstance();
+        Assert.assertNotNull(compositeMemcachedCache);
+        Assert.assertThrows(IllegalArgumentException.class, () -> compositeMemcachedCache.put(TYPE, PROJECT, CACHE_KEY, CACHE_VAL));
+    }
+
+    /** Covers put, update, get and remove with a mocked (unusable) key and with a plain String key. */
+    @Test
+    public void testCompositeMemcachedCache() {
+        CompositeMemcachedCache compositeMemcachedCache = (CompositeMemcachedCache) CompositeMemcachedCache.getInstance();
+        Assert.assertNotNull(compositeMemcachedCache);
+        Object mockItem = mock(Object.class);
+        when(mockItem.toString()).thenReturn(mockItem.getClass().getName());
+        // write bad case
+        String type = CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName;
+        compositeMemcachedCache.put(type, PROJECT, mockItem, CACHE_VAL);
+        compositeMemcachedCache.update(type, PROJECT, mockItem, CACHE_VAL);
+        Object result1 = compositeMemcachedCache.get(type, PROJECT, mockItem);
+        Assert.assertNull(result1);
+        Assert.assertFalse(compositeMemcachedCache.remove(type, PROJECT, mockItem));
+
+        compositeMemcachedCache.put(type, PROJECT, CACHE_KEY, CACHE_VAL);
+        Object result2 = compositeMemcachedCache.get(type, PROJECT, CACHE_KEY);
+        Assert.assertEquals("test_val", (String) result2);
+
+        compositeMemcachedCache.update(type, PROJECT, CACHE_KEY, "update_val");
+        Object result3 = compositeMemcachedCache.get(type, PROJECT, CACHE_KEY);
+        Assert.assertEquals("update_val", (String) result3);
+
+        compositeMemcachedCache.clearAll();
+    }
+
+    /** Runs the success/failed query cache round trip through {@link QueryCacheManager}, then checks the stats. */
+    @Test
+    public void testProjectCompositeMemcachedCacheQuery() {
+        overwriteSystemProp("kylin.cache.memcached.enabled", "true");
+        final String project = "default";
+        final SQLRequest req1 = new SQLRequest();
+        req1.setProject(project);
+        req1.setSql("select a from b");
+        final SQLResponse resp1 = new SQLResponse();
+        List<List<String>> results = new ArrayList<>();
+        resp1.setResults(results);
+        resp1.setResultRowCount(1);
+        //Single Node mode
+        testHelper(req1, resp1, project);
+        //TODO: Cluster mode
+        testCacheStatus();
+    }
+
+    /** Caches, searches and clears {@code req1}/{@code resp1} in both the success and the failed query cache. */
+    private void testHelper(SQLRequest req1, SQLResponse resp1, String project) {
+        queryCacheManager.cacheSuccessQuery(req1, resp1);
+
+        queryCacheManager.doCacheSuccessQuery(req1, resp1);
+        Assert.assertEquals(resp1.getResultRowCount(),
+                queryCacheManager.doSearchQuery(QueryCacheManager.Type.SUCCESS_QUERY_CACHE, req1).getResultRowCount());
+        Assert.assertNull(queryCacheManager.searchQuery(req1));
+        queryCacheManager.clearQueryCache(req1);
+        Assert.assertNull(queryCacheManager.doSearchQuery(QueryCacheManager.Type.SUCCESS_QUERY_CACHE, req1));
+
+        queryCacheManager.cacheFailedQuery(req1, resp1);
+        Assert.assertEquals(resp1.getResultRowCount(), queryCacheManager.searchQuery(req1).getResultRowCount());
+        queryCacheManager.clearProjectCache(project);
+        Assert.assertNull(queryCacheManager.searchQuery(req1));
+        queryCacheManager.clearProjectCache(null);
+
+        queryCacheManager.recoverCache();
+    }
+
+    /** Prints the "StorageCache" statistics and asserts that no errors, evictions or timeouts were counted. */
+    private void testCacheStatus() {
+        CompositeMemcachedCache compositeCache = (CompositeMemcachedCache) queryCacheManager.getCache();
+        CacheStats cacheStats = compositeCache.getCacheStats("StorageCache");
+        String name = compositeCache.getName("StorageCache");
+        System.out.println("Cache name is: " + name);
+        System.out.println("AvgGetTime is: " + cacheStats.getAvgGetTime());
+        System.out.println("HitRate is: " + cacheStats.hitRate());
+        System.out.println("AvgGetBytes is :" + cacheStats.avgGetBytes());
+        System.out.println("NumErrors is :" + cacheStats.getNumErrors());
+        System.out.println("Get number is :" + cacheStats.getNumGet());
+        System.out.println("NumEvictions is :" + cacheStats.getNumEvictions());
+        System.out.println("NumGetBytes is :" + cacheStats.getNumGetBytes());
+        System.out.println("Hit number is :" + cacheStats.getNumHits());
+        System.out.println("Miss number is :" + cacheStats.getNumMisses());
+        System.out.println("Put number is :" + cacheStats.getNumPut());
+        System.out.println("Put bytes is :" + cacheStats.getNumPutBytes());
+        System.out.println("Timeout number is :" + cacheStats.getNumTimeouts());
+        System.out.println("Lookup number is :" + cacheStats.numLookups());
+
+        Assert.assertEquals(0, cacheStats.getNumErrors());
+        Assert.assertEquals(0, cacheStats.getNumEvictions());
+        Assert.assertEquals(0, cacheStats.getNumTimeouts());
+    }
+}