This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new b0169af73c KYLIN-5280 Introduce memcached as a distributed cache for queries for Kylin5 (#2096)
b0169af73c is described below

commit b0169af73c77f32908d3dda18ca22c7699b663dd
Author: zznlime <68555081+zznl...@users.noreply.github.com>
AuthorDate: Fri Mar 3 13:50:26 2023 +0800

    KYLIN-5280 Introduce memcached as a distributed cache for queries for Kylin5 (#2096)
    
    * [EBAY-KYLIN-3526] Introduce memcached as a distributed cache for queries for Kylin5 (#33)
    
    * [EBAY-KYLIN-3526] Introduce memcached as a distributed cache for queries for Kylin5
    
    Co-authored-by: zhennzhang <zhennzh...@ebay.com>
    
    fix implicit conversion from Array to String
    
    [EBAY-KYLIN-3614] Add UTs for memcached on Kylin5.0
    
    KYLIN-5280 add UT with an embedded memcached server
    
    refine sonar issues
    
    KYLIN-5280 add ut for memcached cache status
    
    enable rename with tomcat log file rotate
    
    add UT for KeyHookLookup.KeyHook interface
    
    * KYLIN-5388 Fix ut coverage
    
    * KYLIN-5388 Fix ut case
    
    * KYLIN-5388 Add ut case for KeyHook and MemcachedConnectionFactory.
    
    * KYLIN-5388 Add exception thrown ut cases for CompositeMemcachedCache, add default settings ut case for MemcachedConnectionFactoryBuilder
    
    * KYLIN-5388 Add ut case for RefinedKetamaNodeLocator
---
 pom.xml                                            |  12 +
 src/common-service/pom.xml                         |   4 +
 .../spy/memcached/RefinedKetamaNodeLocator.java    | 283 +++++++++
 .../memcached/protocol/TCPMemcachedNodeImpl.java   | 663 +++++++++++++++++++++
 .../kylin/rest/cache/memcached/CacheStats.java     | 105 ++++
 .../cache/memcached/CompositeMemcachedCache.java   | 270 +++++++++
 .../kylin/rest/cache/memcached/KeyHookLookup.java  | 105 ++++
 .../kylin/rest/cache/memcached/MemcachedCache.java | 390 ++++++++++++
 .../rest/cache/memcached/MemcachedCacheConfig.java | 122 ++++
 .../cache/memcached/MemcachedChunkingCache.java    | 266 +++++++++
 .../memcached/MemcachedConnectionFactory.java      | 179 ++++++
 .../MemcachedConnectionFactoryBuilder.java         | 176 ++++++
 .../memcached/MemcacheConnectionFactoryTest.java   | 143 +++++
 .../rest/cache/memcached/MemcachedCacheTest.java   | 109 ++++
 .../memcached/MemcachedChunkingCacheTest.java      | 189 ++++++
 .../org/apache/kylin/common/KylinConfigBase.java   |  25 +
 src/query-service/pom.xml                          |   5 +
 .../kylin/rest/service/QueryCacheManager.java      |  15 +-
 .../service/QueryCompositeMemcachedCacheTest.java  | 316 ++++++++++
 19 files changed, 3376 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index e1a852cf3c..516eae722b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,6 +160,7 @@
         <assertj-core.version>3.11.1</assertj-core.version>
         <awaitility.version>3.0.0</awaitility.version>
         <embedded-redis.version>0.6</embedded-redis.version>
+        <embedded-memcached.version>4.1.3</embedded-memcached.version>
 
         <!-- Commons -->
         <commons-lang3.version>3.10</commons-lang3.version>
@@ -176,6 +177,7 @@
         <slf4j.version>1.7.30</slf4j.version>
         <apache-log4j.version>2.12.1</apache-log4j.version>
         <ehcache.version>2.10.9.2</ehcache.version>
+        <net.spy.memcached.verion>2.12.3</net.spy.memcached.verion>
         <redis.version>3.8.0</redis.version>
         <apache-httpclient.version>4.5.13</apache-httpclient.version>
         <beanutils.version>1.9.4</beanutils.version>
@@ -2245,6 +2247,11 @@
                 <artifactId>ehcache</artifactId>
                 <version>${ehcache.version}</version>
             </dependency>
+            <dependency>
+                <groupId>net.spy</groupId>
+                <artifactId>spymemcached</artifactId>
+                <version>${net.spy.memcached.verion}</version>
+            </dependency>
             <dependency>
                 <groupId>org.opensaml</groupId>
                 <artifactId>opensaml</artifactId>
@@ -2256,6 +2263,11 @@
                 <version>${embedded-redis.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>com.google.code.simple-spring-memcached</groupId>
+                <artifactId>jmemcached-maven-plugin</artifactId>
+                <version>${embedded-memcached.version}</version>
+            </dependency>
             <dependency>
                 <groupId>mysql</groupId>
                 <artifactId>mysql-connector-java</artifactId>
diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml
index db687b8f80..081ad89602 100644
--- a/src/common-service/pom.xml
+++ b/src/common-service/pom.xml
@@ -77,6 +77,10 @@
             <groupId>net.sf.ehcache</groupId>
             <artifactId>ehcache</artifactId>
         </dependency>
+        <dependency>
+            <groupId>net.spy</groupId>
+            <artifactId>spymemcached</artifactId>
+        </dependency>
 
 
         <!-- Spring Boot -->
diff --git a/src/common-service/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java b/src/common-service/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
new file mode 100644
index 0000000000..55ba0565c0
--- /dev/null
+++ b/src/common-service/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
+import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
+
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2011 Couchbase, Inc.
+ *
+ * This is a modified version of the Ketama consistent hash strategy from
+ * last.fm. This implementation may not be compatible with libketama as hashing
+ * is considered separate from node location.
+ *
+ * The only modified method is getSequence().
+ * The previous repetition limit of 7 may be too small to avoid returning only down nodes.
+ *
+ * Note that this implementation does not currently support weighted nodes.
+ *
+ * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's
+ *      blog post</a>
+ */
+public final class RefinedKetamaNodeLocator extends SpyObject implements NodeLocator {
+
+    private final HashAlgorithm hashAlg;
+    private final Map<InetSocketAddress, Integer> weights;
+    private final boolean isWeightedKetama;
+    private final KetamaNodeLocatorConfiguration config;
+    private AtomicReference<TreeMap<Long, MemcachedNode>> ketamaNodes = new AtomicReference<>();
+    private AtomicReference<Collection<MemcachedNode>> allNodes = new AtomicReference<>();
+
+    /**
+     * Create a new KetamaNodeLocator using specified nodes and the specified hash
+     * algorithm.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
+        this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<>());
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeKeyFormat the format used to name the nodes in Ketama, either
+     *          SPYMEMCACHED or LIBMEMCACHED
+     * @param weights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+                                    KetamaNodeKeyFormatter.Format nodeKeyFormat, Map<InetSocketAddress, Integer> weights) {
+        this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
+    }
+
+    /**
+     * Create a new KetamaNodeLocator using specified nodes and the specified hash
+     * algorithm and configuration.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param conf the node locator configuration to use
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) {
+        this(nodes, alg, new HashMap<>(), conf);
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeWeights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     * @param configuration node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+                                    Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration configuration) {
+        super();
+        allNodes.set(nodes);
+        hashAlg = alg;
+        config = configuration;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+        setKetamaNodes(nodes);
+    }
+
+    private RefinedKetamaNodeLocator(TreeMap<Long, MemcachedNode> smn, Collection<MemcachedNode> an, HashAlgorithm alg,
+                                     Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration conf) {
+        super();
+        ketamaNodes.set(smn);
+        allNodes.set(an);
+        hashAlg = alg;
+        config = conf;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+    }
+
+    public Collection<MemcachedNode> getAll() {
+        return allNodes.get();
+    }
+
+    public MemcachedNode getPrimary(final String k) {
+        MemcachedNode rv = getNodeForKey(hashAlg.hash(k));
+        if (null == rv) {
+            throw new IllegalArgumentException("Found no node for key" + k);
+        }
+        return rv;
+    }
+
+    long getMaxKey() {
+        return getKetamaNodes().lastKey();
+    }
+
+    MemcachedNode getNodeForKey(long hash) {
+        final MemcachedNode rv;
+        if (!ketamaNodes.get().containsKey(hash)) {
+            // Java 1.6 adds a ceilingKey method, but I'm still stuck in 1.5
+            // in a lot of places, so I'm doing this myself.
+            SortedMap<Long, MemcachedNode> tailMap = getKetamaNodes().tailMap(hash);
+            if (tailMap.isEmpty()) {
+                hash = getKetamaNodes().firstKey();
+            } else {
+                hash = tailMap.firstKey();
+            }
+        }
+        rv = getKetamaNodes().get(hash);
+        return rv;
+    }
+
+    /**
+     * The previous repetition limit of 7 may be too small to avoid returning only down nodes.
+     * @param k
+     * @return
+     */
+    public Iterator<MemcachedNode> getSequence(String k) {
+        // maxTry searches give us a 1 in 2^maxTry chance of hitting the
+        // same dead node all of the time.
+        int maxTry = config.getNodeRepetitions() + 1;
+        if (maxTry < 20) {
+            maxTry = 20;
+        }
+        return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
+    }
+
+    public NodeLocator getReadonlyCopy() {
+        TreeMap<Long, MemcachedNode> smn = new TreeMap<>(getKetamaNodes());
+        Collection<MemcachedNode> an = new ArrayList<>(allNodes.get().size());
+
+        // Rewrite the values of the copied map with read-only wrappers.
+        for (Map.Entry<Long, MemcachedNode> me : smn.entrySet()) {
+            smn.put(me.getKey(), new MemcachedNodeROImpl(me.getValue()));
+        }
+
+        // Copy the allNodes collection.
+        for (MemcachedNode n : allNodes.get()) {
+            an.add(new MemcachedNodeROImpl(n));
+        }
+
+        return new RefinedKetamaNodeLocator(smn, an, hashAlg, weights, config);
+    }
+
+    @Override
+    public void updateLocator(List<MemcachedNode> nodes) {
+        allNodes.set(nodes);
+        setKetamaNodes(nodes);
+    }
+
+    /**
+     * @return the ketamaNodes
+     */
+    protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
+        return ketamaNodes.get();
+    }
+
+    /**
+     * Setup the KetamaNodeLocator with the list of nodes it should use.
+     *
+     * @param nodes a List of MemcachedNodes for this KetamaNodeLocator to use in
+     *          its continuum
+     */
+    @SuppressWarnings({"squid:S3776"})
+    protected void setKetamaNodes(List<MemcachedNode> nodes) {
+        TreeMap<Long, MemcachedNode> newNodeMap = new TreeMap<>();
+        int numReps = config.getNodeRepetitions();
+        int nodeCount = nodes.size();
+        int totalWeight = 0;
+
+        if (isWeightedKetama) {
+            for (MemcachedNode node : nodes) {
+                totalWeight += weights.get(node.getSocketAddress());
+            }
+        }
+
+        for (MemcachedNode node : nodes) {
+            if (isWeightedKetama) {
+
+                int thisWeight = weights.get(node.getSocketAddress());
+                float percent = (totalWeight == 0 ? 0f : (float) thisWeight / (float) totalWeight);
+                int pointerPerServer = (int) ((Math.floor(
+                        (float) (percent * config.getNodeRepetitions() / 4 * nodeCount + 0.0000000001)))
+                        * 4);
+                for (int i = 0; i < pointerPerServer / 4; i++) {
+                    for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                        newNodeMap.put(position, node);
+                        getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
+                    }
+                }
+            } else {
+                // Ketama does some special work with md5 where it reuses chunks.
+                // To stay backwards compatible, the hash algorithm does not
+                // matter for Ketama; the placement should always be done using
+                // MD5.
+                if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
+                    for (int i = 0; i < numReps / 4; i++) {
+                        for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                            newNodeMap.put(position, node);
+                            getLogger().debug("Adding node %s in position %d", node, position);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < numReps; i++) {
+                        newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
+                    }
+                }
+            }
+        }
+        assert newNodeMap.size() == numReps * nodes.size();
+        ketamaNodes.set(newNodeMap);
+    }
+
+    private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
+        List<Long> positions = new ArrayList<>();
+        byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
+        for (int h = 0; h < 4; h++) {
+            long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16);
+            k |= ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF);
+            positions.add(k);
+        }
+        return positions;
+    }
+}
\ No newline at end of file
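
The getSequence() change above is easier to see with the comment's own arithmetic: if a fraction f of the continuum points belongs to down nodes, the chance that every probe returns a down node is roughly f^maxTry, so raising the minimum number of probes from 7 to 20 sharply shrinks the odds of an all-dead sequence. A minimal sketch (hypothetical class name, not part of this patch):

    public class KetamaRetryOdds {
        public static void main(String[] args) {
            double deadFraction = 0.5; // assumption: half of the continuum points belong to down nodes
            for (int maxTry : new int[] { 7, 20 }) {
                // Probability that all maxTry probes land on down nodes: deadFraction^maxTry
                System.out.printf("maxTry=%d -> P(all probes dead) = %.2e%n", maxTry, Math.pow(deadFraction, maxTry));
            }
        }
    }
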
diff --git a/src/common-service/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/common-service/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
new file mode 100644
index 0000000000..4bae811cfa
--- /dev/null
+++ b/src/common-service/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached.protocol;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.UnsupportedAddressTypeException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationState;
+import net.spy.memcached.protocol.binary.TapAckOperationImpl;
+
+/**
+ * Represents a node within the memcached cluster, along with buffering and
+ * operation queues.
+ *
+ * This is a modified version of net.spy.memcached.protocol.TCPMemcachedNodeImpl.
+ * It overrides the (formerly final) method getSocketAddress() to re-resolve the
+ * SocketAddress, so the same hostname keeps working when its IP changes.
+ */
+public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode {
+
+    protected final BlockingQueue<Operation> writeQ;
+    private final SocketAddress socketAddress;
+    private final ByteBuffer rbuf;
+    private final ByteBuffer wbuf;
+    private final BlockingQueue<Operation> readQ;
+    private final BlockingQueue<Operation> inputQueue;
+    private final long opQueueMaxBlockTime;
+    private final long authWaitTime;
+    private final ConnectionFactory connectionFactory;
+    // operation Future.get timeout counter
+    private final AtomicInteger continuousTimeout = new AtomicInteger(0);
+    protected Operation optimizedOp = null;
+    private AtomicInteger reconnectAttempt = new AtomicInteger(1);
+    private SocketChannel channel;
+    private int toWrite = 0;
+    private SelectionKey sk = null;
+    private boolean shouldAuth = false;
+    private CountDownLatch authLatch;
+    private ArrayList<Operation> reconnectBlocked;
+    private long defaultOpTimeout;
+    private volatile long lastReadTimestamp = System.nanoTime();
+    private MemcachedConnection connection;
+
+    @SuppressWarnings({"squid:S107", "squid:S5993"})
+    public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize, BlockingQueue<Operation> rq,
+                                   BlockingQueue<Operation> wq, BlockingQueue<Operation> iq, long opQueueMaxBlockTime,
+                                   boolean waitForAuth, long dt, long authWaitTime, ConnectionFactory fact) {
+        super();
+        if (sa == null) {
+            throw new IllegalArgumentException("No SocketAddress");
+        }
+        if (c == null) {
+            throw new IllegalArgumentException("No SocketChannel");
+        }
+        if (bufSize <= 0) {
+            String msg = String.format("Invalid buffer size: %d", bufSize);
+            throw new IllegalArgumentException(msg);
+        }
+        if (rq == null) {
+            throw new IllegalArgumentException("No operation read queue");
+        }
+        if (wq == null) {
+            throw new IllegalArgumentException("No operation write queue");
+        }
+        if (iq == null) {
+            throw new IllegalArgumentException("No input queue");
+        }
+        socketAddress = sa;
+        connectionFactory = fact;
+        this.authWaitTime = authWaitTime;
+        setChannel(c);
+        // Since these buffers are allocated rarely (only on client creation
+        // or reconfigure), and are passed to Channel.read() and Channel.write(),
+        // use direct buffers to avoid
+        //   http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6214569
+        rbuf = ByteBuffer.allocateDirect(bufSize);
+        wbuf = ByteBuffer.allocateDirect(bufSize);
+        getWbuf().clear();
+        readQ = rq;
+        writeQ = wq;
+        inputQueue = iq;
+        this.opQueueMaxBlockTime = opQueueMaxBlockTime;
+        shouldAuth = waitForAuth;
+        defaultOpTimeout = dt;
+        setupForAuth();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#copyInputQueue()
+     */
+    public final void copyInputQueue() {
+        Collection<Operation> tmp = new ArrayList<>();
+
+        // don't drain more than we have space to place
+        inputQueue.drainTo(tmp, writeQ.remainingCapacity());
+        writeQ.addAll(tmp);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#destroyInputQueue()
+     */
+    public Collection<Operation> destroyInputQueue() {
+        Collection<Operation> rv = new ArrayList<>();
+        inputQueue.drainTo(rv);
+        return rv;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#setupResend()
+     */
+    public final void setupResend() {
+        // First, reset the current write op, or cancel it if we should
+        // be authenticating
+        Operation op = getCurrentWriteOp();
+        if (shouldAuth && op != null) {
+            op.cancel();
+        } else if (op != null) {
+            ByteBuffer buf = op.getBuffer();
+            if (buf != null) {
+                buf.reset();
+            } else {
+                getLogger().info("No buffer for current write op, removing");
+                removeCurrentWriteOp();
+            }
+        }
+        // Now cancel all the pending read operations. Might be better to
+        // requeue them.
+        while (hasReadOp()) {
+            op = removeCurrentReadOp();
+            if (op != getCurrentWriteOp()) {
+                getLogger().warn("Discarding partially completed op: %s", op);
+                op.cancel();
+            }
+        }
+
+        while (shouldAuth && hasWriteOp()) {
+            op = removeCurrentWriteOp();
+            getLogger().warn("Discarding partially completed op: %s", op);
+            op.cancel();
+        }
+
+        getWbuf().clear();
+        getRbuf().clear();
+        toWrite = 0;
+    }
+
+    // Prepare the pending operations. Return true if there are any pending
+    // ops
+    private boolean preparePending() {
+        // Copy the input queue into the write queue.
+        copyInputQueue();
+
+        // Now check the ops
+        Operation nextOp = getCurrentWriteOp();
+        while (nextOp != null && nextOp.isCancelled()) {
+            getLogger().info("Removing cancelled operation: %s", nextOp);
+            removeCurrentWriteOp();
+            nextOp = getCurrentWriteOp();
+        }
+        return nextOp != null;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean)
+     */
+    public final void fillWriteBuffer(boolean shouldOptimize) {
+        if (toWrite == 0 && readQ.remainingCapacity() > 0) {
+            getWbuf().clear();
+            Operation o = getNextWritableOp();
+
+            while (o != null && toWrite < getWbuf().capacity()) {
+                synchronized (o) {
+                    assert o.getState() == OperationState.WRITING;
+
+                    ByteBuffer obuf = o.getBuffer();
+                    assert obuf != null : "Didn't get a write buffer from " + o;
+                    int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
+                    byte[] b = new byte[bytesToCopy];
+                    obuf.get(b);
+                    getWbuf().put(b);
+                    getLogger().debug("After copying stuff from %s: %s", o, getWbuf());
+                    if (!o.getBuffer().hasRemaining()) {
+                        o.writeComplete();
+                        transitionWriteItem();
+
+                        preparePending();
+                        if (shouldOptimize) {
+                            optimize();
+                        }
+
+                        o = getNextWritableOp();
+                    }
+                    toWrite += bytesToCopy;
+                }
+            }
+            getWbuf().flip();
+            assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this;
+            assert toWrite == getWbuf().remaining() : "Expected " + toWrite + " remaining, got "
+                    + getWbuf().remaining();
+        } else {
+            getLogger().debug("Buffer is full, skipping");
+        }
+    }
+
+    private Operation getNextWritableOp() {
+        Operation o = getCurrentWriteOp();
+        while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
+            synchronized (o) {
+                if (o.isCancelled()) {
+                    getLogger().debug("Not writing cancelled op.");
+                    Operation cancelledOp = removeCurrentWriteOp();
+                    assert o == cancelledOp;
+                } else if (o.isTimedOut(defaultOpTimeout)) {
+                    getLogger().debug("Not writing timed out op.");
+                    Operation timedOutOp = removeCurrentWriteOp();
+                    assert o == timedOutOp;
+                } else {
+                    o.writing();
+                    if (!(o instanceof TapAckOperationImpl)) {
+                        readQ.add(o);
+                    }
+                    return o;
+                }
+                o = getCurrentWriteOp();
+            }
+        }
+        return o;
+    }
+
+    /* (non-Javadoc)
+     * @see net.spy.memcached.MemcachedNode#transitionWriteItem()
+     */
+    public final void transitionWriteItem() {
+        Operation op = removeCurrentWriteOp();
+        assert op != null : "There is no write item to transition";
+        getLogger().debug("Finished writing %s", op);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#optimize()
+     */
+    protected abstract void optimize();
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getCurrentReadOp()
+     */
+    public final Operation getCurrentReadOp() {
+        return readQ.peek();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#removeCurrentReadOp()
+     */
+    public final Operation removeCurrentReadOp() {
+        return readQ.remove();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getCurrentWriteOp()
+     */
+    public final Operation getCurrentWriteOp() {
+        return optimizedOp == null ? writeQ.peek() : optimizedOp;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#removeCurrentWriteOp()
+     */
+    public final Operation removeCurrentWriteOp() {
+        Operation rv = optimizedOp;
+        if (rv == null) {
+            rv = writeQ.remove();
+        } else {
+            optimizedOp = null;
+        }
+        return rv;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#hasReadOp()
+     */
+    public final boolean hasReadOp() {
+        return !readQ.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#hasWriteOp()
+     */
+    public final boolean hasWriteOp() {
+        return !(optimizedOp == null && writeQ.isEmpty());
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#addOp(net.spy.memcached.ops.Operation)
+     */
+    public final void addOp(Operation op) {
+        try {
+            if (!authLatch.await(authWaitTime, TimeUnit.MILLISECONDS)) {
+                FailureMode mode = connectionFactory.getFailureMode();
+                if (mode == FailureMode.Redistribute || mode == FailureMode.Retry) {
+                    getLogger().debug("Redistributing Operation " + op + " because auth " + "latch taken longer than "
+                            + authWaitTime + " milliseconds to " + "complete on node " + getSocketAddress());
+                    connection.retryOperation(op);
+                } else {
+                    op.cancel();
+                    getLogger().warn("Operation canceled because authentication "
+                            + "or reconnection and authentication has " + "taken more than " + authWaitTime
+                            + " milliseconds to " + "complete on node " + this);
+                    getLogger().debug("Canceled operation %s", op.toString());
+                }
+                return;
+            }
+            if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) {
+                throw new IllegalStateException(
+                        "Timed out waiting to add " + op + "(max wait=" + opQueueMaxBlockTime + "ms)");
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("Interrupted while waiting to add " + op);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#insertOp(net.spy.memcached.ops.Operation)
+     */
+    public final void insertOp(Operation op) {
+        ArrayList<Operation> tmp = new ArrayList<>(inputQueue.size() + 1);
+        tmp.add(op);
+        inputQueue.drainTo(tmp);
+        inputQueue.addAll(tmp);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getSelectionOps()
+     */
+    public final int getSelectionOps() {
+        int rv = 0;
+        if (getChannel().isConnected()) {
+            if (hasReadOp()) {
+                rv |= SelectionKey.OP_READ;
+            }
+            if (toWrite > 0 || hasWriteOp()) {
+                rv |= SelectionKey.OP_WRITE;
+            }
+        } else {
+            rv = SelectionKey.OP_CONNECT;
+        }
+        return rv;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getRbuf()
+     */
+    public final ByteBuffer getRbuf() {
+        return rbuf;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getWbuf()
+     */
+    public final ByteBuffer getWbuf() {
+        return wbuf;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getSocketAddress()
+     */
+    public final SocketAddress getSocketAddress() {
+        if (!(socketAddress instanceof InetSocketAddress)) {
+            throw new UnsupportedAddressTypeException();
+        }
+        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+        return new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#isActive()
+     */
+    public final boolean isActive() {
+        return reconnectAttempt.get() == 0 && getChannel() != null && getChannel().isConnected();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#isAuthenticated()
+     */
+    public boolean isAuthenticated() {
+        return (0 == authLatch.getCount());
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#reconnecting()
+     */
+    public final void reconnecting() {
+        reconnectAttempt.incrementAndGet();
+        continuousTimeout.set(0);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#connected()
+     */
+    public final void connected() {
+        reconnectAttempt.set(0);
+        continuousTimeout.set(0);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getReconnectCount()
+     */
+    public final int getReconnectCount() {
+        return reconnectAttempt.get();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#toString()
+     */
+    @Override
+    public final String toString() {
+        int sops = 0;
+        if (getSk() != null && getSk().isValid()) {
+            sops = getSk().interestOps();
+        }
+        int rsize = readQ.size() + (optimizedOp == null ? 0 : 1);
+        int wsize = writeQ.size();
+        int isize = inputQueue.size();
+        return "{QA sa=" + getSocketAddress() + ", #Rops=" + rsize + ", 
#Wops=" + wsize + ", #iq=" + isize + ", topRop="
+                + getCurrentReadOp() + ", topWop=" + getCurrentWriteOp() + ", 
toWrite=" + toWrite + ", interested="
+                + sops + "}";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#registerChannel
+     * (java.nio.channels.SocketChannel, java.nio.channels.SelectionKey)
+     */
+    public final void registerChannel(SocketChannel ch, SelectionKey skey) {
+        setChannel(ch);
+        setSk(skey);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getChannel()
+     */
+    public final SocketChannel getChannel() {
+        return channel;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#setChannel(java.nio.channels.SocketChannel)
+     */
+    public final void setChannel(SocketChannel to) {
+        assert channel == null || !channel.isOpen() : "Attempting to overwrite channel";
+        channel = to;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getSk()
+     */
+    public final SelectionKey getSk() {
+        return sk;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#setSk(java.nio.channels.SelectionKey)
+     */
+    public final void setSk(SelectionKey to) {
+        sk = to;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getBytesRemainingInBuffer()
+     */
+    public final int getBytesRemainingToWrite() {
+        return toWrite;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#writeSome()
+     */
+    public final int writeSome() throws IOException {
+        int wrote = channel.write(wbuf);
+        assert wrote >= 0 : "Wrote negative bytes?";
+        toWrite -= wrote;
+        assert toWrite >= 0 : "toWrite went negative after writing " + wrote + " bytes for " + this;
+        getLogger().debug("Wrote %d bytes", wrote);
+        return wrote;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#getContinuousTimeout
+     */
+    public int getContinuousTimeout() {
+        return continuousTimeout.get();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see net.spy.memcached.MemcachedNode#setContinuousTimeout
+     */
+    public void setContinuousTimeout(boolean timedOut) {
+        if (timedOut && isActive()) {
+            continuousTimeout.incrementAndGet();
+        } else {
+            continuousTimeout.set(0);
+        }
+    }
+
+    public final void fixupOps() {
+        // As the selection key can be changed at any point due to node
+        // failure, we'll grab the current volatile value and configure it.
+        SelectionKey s = sk;
+        if (s != null && s.isValid()) {
+            int iops = getSelectionOps();
+            getLogger().debug("Setting interested opts to %d", iops);
+            s.interestOps(iops);
+        } else {
+            getLogger().debug("Selection key is not valid.");
+        }
+    }
+
+    public final void authComplete() {
+        if (reconnectBlocked != null && !reconnectBlocked.isEmpty()) {
+            inputQueue.addAll(reconnectBlocked);
+        }
+        authLatch.countDown();
+    }
+
+    public final void setupForAuth() {
+        if (shouldAuth) {
+            authLatch = new CountDownLatch(1);
+            if (!inputQueue.isEmpty()) {
+                reconnectBlocked = new ArrayList<>(inputQueue.size() + 1);
+                inputQueue.drainTo(reconnectBlocked);
+            }
+            assert (inputQueue.isEmpty());
+            setupResend();
+        } else {
+            authLatch = new CountDownLatch(0);
+        }
+    }
+
+    /**
+     * Number of milliseconds since the last read of this node completed.
+     *
+     * @return milliseconds since last read.
+     */
+    public long lastReadDelta() {
+        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastReadTimestamp);
+    }
+
+    /**
+     * Mark this node as having just completed a read.
+     */
+    public void completedRead() {
+        lastReadTimestamp = System.nanoTime();
+    }
+
+    @Override
+    public MemcachedConnection getConnection() {
+        return connection;
+    }
+
+    @Override
+    public void setConnection(MemcachedConnection connection) {
+        this.connection = connection;
+    }
+}
\ No newline at end of file
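
The getSocketAddress() override above relies on a standard JDK behavior: constructing a new InetSocketAddress from a host name resolves it again at call time, rather than reusing the address cached when the node was created. A minimal sketch of that behavior (hypothetical class name and host, JDK API only):

    import java.net.InetSocketAddress;

    public class ReresolveDemo {
        public static void main(String[] args) {
            InetSocketAddress original = new InetSocketAddress("example.org", 11211);
            // Mirrors the overridden method: a fresh DNS lookup happens here,
            // so a changed IP behind the same hostname is picked up.
            InetSocketAddress refreshed = new InetSocketAddress(original.getHostName(), original.getPort());
            System.out.println(refreshed.getAddress());
        }
    }
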
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CacheStats.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CacheStats.java
new file mode 100644
index 0000000000..60499c85dc
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CacheStats.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+public class CacheStats {
+    private final long getBytes;
+    private final long getTime;
+    private final long putBytes;
+    private final CacheStatsCounter cacheStatsCounter;
+
+    public CacheStats(long getBytes, long getTime, long putBytes, CacheStatsCounter cacheStatsCounter) {
+        this.getBytes = getBytes;
+        this.getTime = getTime;
+        this.putBytes = putBytes;
+        this.cacheStatsCounter = cacheStatsCounter;
+    }
+
+    static class CacheStatsCounter {
+        final long numHits;
+        final long numMisses;
+        final long numPut;
+        final long numEvictions;
+        final long numTimeouts;
+        final long numErrors;
+
+        CacheStatsCounter(long numPut, long numHits, long numMisses,
+                          long numEvictions, long numTimeouts, long numErrors) {
+            this.numPut = numPut;
+            this.numHits = numHits;
+            this.numMisses = numMisses;
+            this.numEvictions = numEvictions;
+            this.numTimeouts = numTimeouts;
+            this.numErrors = numErrors;
+        }
+    }
+
+    public long getNumHits() {
+        return cacheStatsCounter.numHits;
+    }
+
+    public long getNumMisses() {
+        return cacheStatsCounter.numMisses;
+    }
+
+    public long getNumGet() {
+        return cacheStatsCounter.numHits + cacheStatsCounter.numMisses;
+    }
+
+    public long getNumGetBytes() {
+        return getBytes;
+    }
+
+    public long getNumPutBytes() {
+        return putBytes;
+    }
+
+    public long getNumPut() {
+        return cacheStatsCounter.numPut;
+    }
+
+    public long getNumEvictions() {
+        return cacheStatsCounter.numEvictions;
+    }
+
+    public long getNumTimeouts() {
+        return cacheStatsCounter.numTimeouts;
+    }
+
+    public long getNumErrors() {
+        return cacheStatsCounter.numErrors;
+    }
+
+    public long numLookups() {
+        return cacheStatsCounter.numHits + cacheStatsCounter.numMisses;
+    }
+
+    public double hitRate() {
+        long lookups = numLookups();
+        return lookups == 0 ? 0 : cacheStatsCounter.numHits / (double) lookups;
+    }
+
+    public long avgGetBytes() {
+        return getBytes == 0 ? 0 : getBytes / numLookups();
+    }
+
+    public long getAvgGetTime() {
+        return numLookups() == 0 ? 0 : getTime / numLookups();
+    }
+}
\ No newline at end of file
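
The counters above combine in hitRate() as hits divided by total lookups (hits plus misses). A small usage sketch, placed in the same package because CacheStatsCounter and its constructor are package-private; the class name and all numbers are illustrative:

    package org.apache.kylin.rest.cache.memcached;

    public class CacheStatsExample {
        public static void main(String[] args) {
            // CacheStatsCounter(numPut, numHits, numMisses, numEvictions, numTimeouts, numErrors)
            CacheStats.CacheStatsCounter counter = new CacheStats.CacheStatsCounter(10, 90, 10, 0, 0, 0);
            CacheStats stats = new CacheStats(4096L, 200L, 8192L, counter);
            System.out.println(stats.numLookups()); // 100 (90 hits + 10 misses)
            System.out.println(stats.hitRate());    // 0.9 (90 / 100)
        }
    }
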
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CompositeMemcachedCache.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CompositeMemcachedCache.java
new file mode 100644
index 0000000000..67dcf62041
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/CompositeMemcachedCache.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.Singletons;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
+import org.apache.kylin.rest.cache.KylinCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cache.Cache;
+import org.springframework.cache.support.SimpleValueWrapper;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This is a cache manager for memcached which implements KylinCache.
+ * It holds a map of memcached clients, one for each supported cache type.
+ */
+public class CompositeMemcachedCache implements KylinCache {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompositeMemcachedCache.class);
+
+    private static final String PREFIX = "Kylin";
+
+    private static final ConcurrentMap<String, Cache> cacheMap = new ConcurrentHashMap<>(16);
+
+    private static final MemcachedCacheConfig memcachedCacheConfig = Singletons.getInstance(MemcachedCacheConfig.class);
+
+    private static final Cache exceptionCache = new MemCachedCacheAdaptor(
+            new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName, 86400)));
+
+    private static final Cache schemaCache = new MemCachedCacheAdaptor(
+            new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CommonQueryCacheSupporter.Type.SCHEMA_CACHE.rootCacheName, 86400)));
+
+    private static final Cache successCache = new MemCachedCacheAdaptor(
+            new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName)));
+
+    static {
+        cacheMap.put(CommonQueryCacheSupporter.Type.EXCEPTION_QUERY_CACHE.rootCacheName, exceptionCache);
+        cacheMap.put(CommonQueryCacheSupporter.Type.SCHEMA_CACHE.rootCacheName, schemaCache);
+        cacheMap.put(CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName, successCache);
+    }
+
+    public static KylinCache getInstance() {
+        try {
+            return Singletons.getInstance(CompositeMemcachedCache.class);
+        } catch (RuntimeException e) {
+            logger.error("Memcached init failed", e);
+        }
+        return null;
+    }
+
+    private void checkCacheType(String type) {
+        if (type == null) {
+            throw new NullPointerException("type can't be null");
+        }
+
+        if (!cacheMap.containsKey(type)) {
+            throw new IllegalArgumentException("unsupported rootCacheName: " + 
type);
+        }
+    }
+
+    private String getTypeProjectPrefix(String type, String project) {
+        return String.format(Locale.ROOT, "%s-%s-%s-", PREFIX, type, project);
+    }
+
+    protected String serializeKey(Object key) {
+        try {
+            return JsonUtil.writeValueAsString(key);
+        } catch (JsonProcessingException e) {
+            logger.warn("Can not convert key to String.", e);
+        }
+        return null;
+    }
+
+    @Override
+    public void put(String type, String project, Object key, Object value) {
+        checkCacheType(type);
+        String keyS = serializeKey(key);
+        if (keyS == null) {
+            logger.warn("write to cache failed because key can not be converted to String");
+            return;
+        }
+        keyS = getTypeProjectPrefix(type, project) + keyS;
+        cacheMap.get(type).put(keyS, value);
+    }
+
+    @Override
+    public void update(String type, String project, Object key, Object value) {
+        checkCacheType(type);
+        String keyS = serializeKey(key);
+        if (keyS == null) {
+            logger.warn("write to cache failed for key can not convert to 
String");
+            return;
+        }
+        keyS = getTypeProjectPrefix(type, project) + keyS;
+        cacheMap.get(type).put(keyS, value);
+    }
+
+    @Override
+    public Object get(String type, String project, Object key) {
+        checkCacheType(type);
+        String keyS = serializeKey(key);
+        if (keyS == null) {
+            logger.warn("read from cache failed because key can not be converted to String");
+            return null;
+        }
+        keyS = getTypeProjectPrefix(type, project) + keyS;
+        SimpleValueWrapper valueWrapper = (SimpleValueWrapper) cacheMap.get(type).get(keyS);
+        return valueWrapper == null ? null : valueWrapper.get();
+    }
+
+    @Override
+    public boolean remove(String type, String project, Object key) {
+        checkCacheType(type);
+        String keyS = serializeKey(key);
+        if (keyS == null) {
+            logger.warn("evict cache failed for key can not convert to 
String");
+            return false;
+        }
+        keyS = getTypeProjectPrefix(type, project) + keyS;
+        if (cacheMap.get(type).get(keyS) == null) {
+            return false;
+        }
+        cacheMap.get(type).evict(keyS);
+        return true;
+    }
+
+    @Override
+    public void clearAll() {
+        for (Cache cache : cacheMap.values()) {
+            cache.clear();
+        }
+    }
+
+    @Override
+    public void clearByType(String type, String project) {
+        checkCacheType(type);
+        String pattern = getTypeProjectPrefix(type, project);
+        Cache cache = cacheMap.get(type);
+        if (cache instanceof MemCachedCacheAdaptor) {
+            ((MemCachedCacheAdaptor) cache).clearByType(pattern);
+        } else {
+            logger.warn("cache do not support clear by project");
+            cacheMap.clear();
+        }
+    }
+
+    public String getName(String type) {
+        checkCacheType(type);
+        return cacheMap.get(type).getName();
+    }
+
+    public CacheStats getCacheStats(String type) {
+        checkCacheType(type);
+        Cache cache = cacheMap.get(type);
+        if (cache instanceof MemCachedCacheAdaptor) {
+            return ((MemCachedCacheAdaptor) cache).getCacheStats();
+        } else {
+            logger.warn("only support get cache stats with memcached adaptor, 
otherwise will return null");
+            return null;
+        }
+    }
+
+    public static class MemCachedCacheAdaptor implements Cache {
+        private MemcachedCache memcachedCache;
+
+        public MemCachedCacheAdaptor(MemcachedCache memcachedCache) {
+            this.memcachedCache = memcachedCache;
+        }
+
+        @Override
+        public String getName() {
+            return memcachedCache.getName();
+        }
+
+        @Override
+        public Object getNativeCache() {
+            return memcachedCache.getNativeCache();
+        }
+
+        public CacheStats getCacheStats() {
+            return memcachedCache.getStats();
+        }
+
+        @Override
+        public ValueWrapper get(Object key) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null || value.length == 0) {
+                return null;
+            }
+            return new SimpleValueWrapper(SerializationUtils.deserialize(value));
+        }
+
+        @Override
+        public void put(Object key, Object value) {
+            memcachedCache.put(key, value);
+        }
+
+        @Override
+        public void evict(Object key) {
+            memcachedCache.evict(key);
+        }
+
+        @Override
+        public void clear() {
+            memcachedCache.clear();
+        }
+
+        public void clearByType(String pattern) {
+            memcachedCache.clearByType(pattern);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> T get(Object key, Class<T> type) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null || value.length == 0) {
+                return null;
+            }
+            Object obj = SerializationUtils.deserialize(value);
+            if (obj != null && type != null && !type.isInstance(obj)) {
+                throw new IllegalStateException(
+                        "Cached value is not of required type [" + type.getName() + "]: " + Arrays.toString(value));
+            }
+            return (T) obj;
+        }
+
+        @Override
+        public <T> T get(Object key, Callable<T> valueLoader) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        // Not atomic: avoid invoking this method where atomicity is required.
+        public ValueWrapper putIfAbsent(Object key, Object value) {
+            // This implementation does not guarantee atomicity.
+            byte[] existing = memcachedCache.get(key);
+            if (existing == null || existing.length == 0) {
+                memcachedCache.put(key, value);
+                return null;
+            } else {
+                return new SimpleValueWrapper(SerializationUtils.deserialize(existing));
+            }
+        }
+    }
+}
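
With the adaptor in place, query results go through the KylinCache interface; keys are JSON-serialized and prefixed with "Kylin-<type>-<project>-" before hitting memcached. A hedged usage sketch, assuming a reachable memcached cluster behind MemcachedCacheConfig and a Serializable value (class name, project, key, and value are illustrative):

    import org.apache.kylin.rest.cache.KylinCache;
    import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache;
    import org.apache.kylin.rest.service.CommonQueryCacheSupporter;

    public class MemcachedCacheUsage {
        public static void main(String[] args) {
            KylinCache cache = CompositeMemcachedCache.getInstance();
            String type = CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName;
            // The value must be Serializable so the adaptor can (de)serialize it.
            cache.put(type, "demo_project", "select 1", "cached-result");
            Object hit = cache.get(type, "demo_project", "select 1");
            System.out.println(hit); // "cached-result" on a hit, null otherwise
        }
    }
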
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/KeyHookLookup.java b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/KeyHookLookup.java
new file mode 100644
index 0000000000..844939e4e1
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/KeyHookLookup.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A class implementing this interface indicates that the key information needs to
+ * be obtained by first looking up the cache itself to get a hook.
+ */
+public interface KeyHookLookup {
+    KeyHook lookupKeyHook(String key);
+
+    public static class KeyHook implements Serializable {
+        private static final long serialVersionUID = 2400159460862757991L;
+
+        private String[] chunkskey;
+        private byte[] values;
+
+        /**
+         * For de-serialization
+         */
+        public KeyHook() {
+        }
+
+        public KeyHook(String[] chunkskey, byte[] values) {
+            super();
+            this.chunkskey = chunkskey;
+            this.values = values;
+        }
+
+        public String[] getChunkskey() {
+            return chunkskey;
+        }
+
+        public void setChunkskey(String[] chunkskey) {
+            this.chunkskey = chunkskey;
+        }
+
+        public byte[] getValues() {
+            return values;
+        }
+
+        public void setValues(byte[] values) {
+            this.values = values;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + Arrays.hashCode(chunkskey);
+            result = prime * result + Arrays.hashCode(values);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            KeyHook other = (KeyHook) obj;
+            if (!Arrays.equals(chunkskey, other.chunkskey))
+                return false;
+            return Arrays.equals(values, other.values);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            if (chunkskey != null) {
+                builder.append("chunkskey_length:" + chunkskey.length);
+            } else {
+                builder.append("chunkskey_is_null");
+            }
+            builder.append("|");
+            if (values != null) {
+                builder.append("value_length:" + values.length);
+            } else {
+                builder.append("value_is_null");
+            }
+            return builder.toString();
+        }
+    }
+}
\ No newline at end of file
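
A KeyHook is what MemcachedChunkingCache (later in this patch) stores under the
primary key: either the value bytes inline, or the array of chunk keys when the
value was split. A minimal round-trip sketch using the class above (key names
arbitrary):

    import org.apache.commons.lang3.SerializationUtils;
    import org.apache.kylin.rest.cache.memcached.KeyHookLookup.KeyHook;

    class KeyHookRoundTripSketch {
        public static void main(String[] args) {
            // Chunked case: the hook carries only the chunk keys, no inline value.
            KeyHook chunked = new KeyHook(new String[] { "query0", "query1" }, null);
            byte[] bytes = SerializationUtils.serialize(chunked);
            KeyHook restored = SerializationUtils.deserialize(bytes);
            // equals() compares both arrays element-wise, so the copies match.
            System.out.println(chunked.equals(restored)); // prints: true
        }
    }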
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCache.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCache.java
new file mode 100644
index 0000000000..b54e4afe76
--- /dev/null
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCache.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.DataFormatException;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.ops.ArrayOperationQueueFactory;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+/**
+ * A cache backend based on Memcached. The implementation uses the spymemcached
+ * client to talk to the servers. Memcached limits the size of the key, so the
+ * real lookup key is a hash of the original key. The implementation provides a
+ * way to detect hash collisions. It can also compress/decompress the value
+ * bytes based on the preconfigured compression threshold to save network
+ * bandwidth and storage space.
+ */
+public class MemcachedCache {
+    public static final int MAX_PREFIX_LENGTH = 
MemcachedClientIF.MAX_KEY_LENGTH - 40 // length of namespace hash
+            - 40 // length of key hash
+            - 2; // length of separators
+    private static final Logger logger = 
LoggerFactory.getLogger(MemcachedCache.class);
+    private static final int DEFAULT_TTL = 7 * 24 * 3600;
+
+    private static final String UNABLE_TO_QUEUE_CACHE_OPERATION = "Unable to 
queue cache operation.";
+
+    protected final MemcachedCacheConfig config;
+    protected final MemcachedClientIF client;
+    protected final String memcachedPrefix;
+    protected final int compressThreshold;
+    protected final AtomicLong hitCount = new AtomicLong(0);
+    protected final AtomicLong missCount = new AtomicLong(0);
+    protected final AtomicLong readBytes = new AtomicLong(0);
+    protected final AtomicLong timeoutCount = new AtomicLong(0);
+    protected final AtomicLong errorCount = new AtomicLong(0);
+    protected final AtomicLong putCount = new AtomicLong(0);
+    protected final AtomicLong putBytes = new AtomicLong(0);
+    private final int timeToLiveSeconds;
+    protected AtomicLong cacheGetTime = new AtomicLong(0);
+
+    public MemcachedCache(final MemcachedClientIF client, final 
MemcachedCacheConfig config,
+                          final String memcachedPrefix, int timeToLiveSeconds) 
{
+        Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                "memcachedPrefix length [%s] exceeds maximum length [%s]", memcachedPrefix.length(), MAX_PREFIX_LENGTH);
+        this.memcachedPrefix = memcachedPrefix;
+        this.client = client;
+        this.config = config;
+        this.compressThreshold = config.getMaxObjectSize() / 2;
+        this.timeToLiveSeconds = timeToLiveSeconds;
+    }
+
+    public MemcachedCache(MemcachedCache cache) {
+        this(cache.client, cache.config, cache.memcachedPrefix, 
cache.timeToLiveSeconds);
+    }
+
+    /**
+     * Create and return a MemcachedCache. Each call to this method creates a
+     * new instance.
+     * @param config the MemcachedCacheConfig that controls the cache behavior
+     * @param memcachedPrefix the namespace prefix for keys stored by this cache
+     * @return a new MemcachedCache instance
+     */
+    public static MemcachedCache create(final MemcachedCacheConfig config, 
String memcachedPrefix) {
+        return create(config, memcachedPrefix, DEFAULT_TTL);
+    }
+
+    public static MemcachedCache create(final MemcachedCacheConfig config, 
String memcachedPrefix, int timeToLive) {
+        try {
+            SerializingTranscoder transcoder = new 
SerializingTranscoder(config.getMaxObjectSize());
+            // no compression inside the transcoder; we compress/decompress outside
+            transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+            OperationQueueFactory opQueueFactory;
+            int maxQueueSize = config.getMaxOperationQueueSize();
+            if (maxQueueSize > 0) {
+                opQueueFactory = new ArrayOperationQueueFactory(maxQueueSize);
+            } else {
+                opQueueFactory = new LinkedOperationQueueFactory();
+            }
+            String hostsStr = config.getHosts();
+            ConnectionFactory connectionFactory = new 
MemcachedConnectionFactoryBuilder()
+                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                    
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                    
.setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                    
.setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                    
.setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+            return new MemcachedCache(new MemcachedClient(new 
MemcachedConnectionFactory(connectionFactory),
+                    getResolvedAddrList(hostsStr)), config, memcachedPrefix, 
timeToLive);
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static List<InetSocketAddress> getResolvedAddrList(String hostsStr) 
{
+        List<InetSocketAddress> addrs = AddrUtil.getAddresses(hostsStr);
+        Iterator<InetSocketAddress> addrIterator = addrs.iterator();
+        while (addrIterator.hasNext()) {
+            if (addrIterator.next().isUnresolved()) {
+                addrIterator.remove();
+            }
+        }
+        return addrs;
+    }
+
+    public String getName() {
+        return memcachedPrefix;
+    }
+
+    public Object getNativeCache() {
+        return client;
+    }
+
+    protected byte[] serializeValue(Object value) {
+        return SerializationUtils.serialize((Serializable) value);
+    }
+
+    @VisibleForTesting
+    byte[] encodeValue(String keyS, Object value) {
+        if (keyS == null) {
+            return new byte[0];
+        }
+        return encodeValue(keyS.getBytes(StandardCharsets.UTF_8), 
serializeValue(value));
+    }
+
+    /**
+     * Gets the value bytes for the given key from the cache. The key should
+     * already have been converted into a JSON string by the caller. This calls
+     * getBinary(), which computes a hashed key from the original key string,
+     * uses it as the real lookup key, and then decodes the value bytes from
+     * the cache lookup result.
+     */
+    public byte[] get(Object key) {
+        return get((String) key);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public byte[] get(String keyS) {
+        return getBinary(keyS);
+    }
+
+    /**
+     * Puts a key/value pair into the cache. The key should already have been
+     * converted into a JSON string by the caller. This calls putBinary(), which
+     * computes a hashed key from the original key string and encodes the
+     * original key bytes into the value bytes for hash collision detection.
+     */
+    public void put(Object key, Object value) {
+        put((String) key, value);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void put(String keyS, Object value) {
+        if (keyS != null) {
+            putBinary(keyS, serializeValue(value), timeToLiveSeconds);
+        }
+    }
+
+    public void evict(Object key) {
+        if (key == null)
+            return;
+        evict((String) key);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void evict(String keyS) {
+        if (keyS == null)
+            return;
+        client.delete(computeKeyHash(keyS));
+    }
+
+    // Memcached does not support fuzzy matching, so this clears the remote
+    // cache across all projects.
+    public void clearByType(String pattern) {
+        logger.debug("clear by pattern: {} triggers a full clear of the remote cache", pattern);
+        this.clear();
+    }
+
+    public void clear() {
+        logger.warn("Clear Remote Cache!");
+        Future<Boolean> resultFuture = client.flush();
+        try {
+            boolean result = resultFuture.get();
+            logger.warn("Clear Remote Cache returned with result: {}", result);
+        } catch (ExecutionException | InterruptedException e) {
+            logger.warn("Can't clear Remote Cache.", e);
+            // Restore interrupted state...
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public CacheStats getStats() {
+        return new CacheStats(readBytes.get(), cacheGetTime.get(), 
putBytes.get(),
+                new CacheStats.CacheStatsCounter(putCount.get(), 
hitCount.get(),
+                missCount.get(), 0, timeoutCount.get(), errorCount.get()));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @return the serialized value
+     */
+    protected byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return new byte[0];
+        }
+        byte[] bytes = internalGet(computeKeyHash(keyS));
+        return decodeValue(keyS.getBytes(StandardCharsets.UTF_8), bytes);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @param valueB should be the serialized value
+     */
+    protected void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        internalPut(computeKeyHash(keyS), 
encodeValue(keyS.getBytes(StandardCharsets.UTF_8), valueB), expiration);
+    }
+
+    protected byte[] internalGet(String hashedKey) {
+        Future<Object> future;
+        long start = System.currentTimeMillis();
+        try {
+            future = client.asyncGet(hashedKey);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, e);
+            return new byte[0];
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, t);
+            return new byte[0];
+        }
+
+        try {
+            byte[] result;
+            if (future != null) {
+                result = (byte[]) future.get(config.getTimeout(), 
TimeUnit.MILLISECONDS);
+            } else {
+                result = null;
+            }
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (result != null) {
+                hitCount.incrementAndGet();
+                readBytes.addAndGet(result.length);
+            } else {
+                missCount.incrementAndGet();
+            }
+            return result;
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            future.cancel(false);
+            return new byte[0];
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling key meta from 
cache.", e);
+            return new byte[0];
+        }
+    }
+
+    private void internalPut(String hashedKey, byte[] encodedValue, int 
expiration) {
+        try {
+            client.set(hashedKey, expiration, encodedValue);
+            putCount.incrementAndGet();
+            putBytes.addAndGet(encodedValue.length);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, e);
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error(UNABLE_TO_QUEUE_CACHE_OPERATION, t);
+        }
+    }
+
+    protected byte[] encodeValue(byte[] key, byte[] valueB) {
+        byte[] compressed = null;
+        if (config.isEnableCompression() && (valueB.length + Ints.BYTES + 
key.length > compressThreshold)) {
+            try {
+                compressed = 
CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + 
valueB.length)
+                        .putInt(key.length).put(key).put(valueB).array());
+            } catch (IOException e) {
+                compressed = null;
+                logger.warn("Compressing value bytes error.", e);
+            }
+        }
+        if (compressed != null) {
+            return ByteBuffer.allocate(Shorts.BYTES + 
compressed.length).putShort((short) 1).put(compressed).array();
+        } else {
+            return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length 
+ valueB.length).putShort((short) 0)
+                    .putInt(key.length).put(key).put(valueB).array();
+        }
+    }
+
+    protected byte[] decodeValue(byte[] key, byte[] valueE) {
+        if (valueE == null || valueE.length == 0)
+            return new byte[0];
+        ByteBuffer buf = ByteBuffer.wrap(valueE);
+        short enableCompression = buf.getShort();
+        byte[] uncompressed = null;
+        if (enableCompression == 1) {
+            byte[] value = new byte[buf.remaining()];
+            buf.get(value);
+            try {
+                uncompressed = CompressionUtils.decompress(value);
+            } catch (IOException | DataFormatException e) {
+                logger.error("Decompressing value bytes error.", e);
+                return new byte[0];
+            }
+        }
+        if (uncompressed != null) {
+            buf = ByteBuffer.wrap(uncompressed);
+        }
+        final int keyLength = buf.getInt();
+        byte[] keyBytes = new byte[keyLength];
+        buf.get(keyBytes);
+        if (!Arrays.equals(keyBytes, key)) {
+            logger.error("Keys do not match, possible hash collision!");
+            return new byte[0];
+        }
+        byte[] value = new byte[buf.remaining()];
+        buf.get(value);
+        return value;
+    }
+
+    @SuppressWarnings({"squid:S4790"})
+    protected String computeKeyHash(String key) {
+        // hash keys to keep things under 250 characters for net.spy.memcached
+        return 
Joiner.on(":").skipNulls().join(KylinConfig.getInstanceFromEnv().getDeployEnv(),
 this.memcachedPrefix,
+                DigestUtils.sha1Hex(key));
+
+    }
+
+}
\ No newline at end of file
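
For reference, encodeValue() above writes [short flag = 0][int keyLen][key
bytes][value bytes] when the value is stored uncompressed, and [short flag = 1]
followed by the CompressionUtils-compressed form of [int keyLen][key
bytes][value bytes] otherwise. A minimal sketch of the uncompressed layout (key
and value arbitrary):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    class ValueLayoutSketch {
        public static void main(String[] args) {
            byte[] key = "sql1".getBytes(StandardCharsets.UTF_8);
            byte[] value = { 1, 2, 3 };
            // [flag][keyLen][key][value], exactly as encodeValue() lays it out.
            byte[] encoded = ByteBuffer.allocate(2 + 4 + key.length + value.length)
                    .putShort((short) 0).putInt(key.length).put(key).put(value).array();

            ByteBuffer buf = ByteBuffer.wrap(encoded);
            buf.getShort(); // flag: 0 = not compressed
            byte[] storedKey = new byte[buf.getInt()];
            buf.get(storedKey);
            // The embedded original key is what lets decodeValue() detect
            // collisions on the SHA-1 hashed lookup key.
            System.out.println(Arrays.equals(storedKey, key)); // prints: true
        }
    }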
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheConfig.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheConfig.java
new file mode 100644
index 0000000000..76465b4a76
--- /dev/null
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import org.apache.kylin.common.KylinConfig;
+
+import net.spy.memcached.DefaultConnectionFactory;
+
+public class MemcachedCacheConfig {
+
+    public MemcachedCacheConfig() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        hosts = kylinConfig.getMemcachedHosts();
+        timeout = kylinConfig.getMemcachedOpTimeout();
+        maxChunkSize = kylinConfig.getMaxChunkSize();
+        maxObjectSize = kylinConfig.getMaxObjectSize();
+        enableCompression = kylinConfig.isEnableCompression();
+    }
+
+    private long timeout;
+
+    // comma-delimited list of memcached servers, given as host:port combinations
+    private String hosts;
+
+    private int maxChunkSize;
+
+    private int maxObjectSize;
+
+    // net.spy.memcached client read buffer size, -1 uses the spymemcached 
library default
+    private int readBufferSize = 
DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
+
+    // maximum operation queue size. 0 means unbounded
+    private int maxOperationQueueSize = 0;
+
+    // whether to compress the value data
+    private boolean enableCompression;
+
+    // only for test
+    private boolean enableDebugLog = false;
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public int getMaxChunkSize() {
+        return maxChunkSize;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+        this.maxChunkSize = maxChunkSize;
+    }
+
+    public int getMaxObjectSize() {
+        return maxObjectSize;
+    }
+
+    public void setMaxObjectSize(int maxObjectSize) {
+        this.maxObjectSize = maxObjectSize;
+    }
+
+    public int getMaxOperationQueueSize() {
+        return maxOperationQueueSize;
+    }
+
+    public void setMaxOperationQueueSize(int maxOperationQueueSize) {
+        this.maxOperationQueueSize = maxOperationQueueSize;
+    }
+
+    public int getReadBufferSize() {
+        return readBufferSize;
+    }
+
+    // only for test
+    public void setEnableDebugLog() {
+        this.enableDebugLog = true;
+    }
+
+    // only for test
+    public boolean isEnableDebugLog() {
+        return enableDebugLog;
+    }
+
+    public void setReadBufferSize(int readBufferSize) {
+        this.readBufferSize = readBufferSize;
+    }
+
+    public boolean isEnableCompression() {
+        return enableCompression;
+    }
+
+    public void setEnableCompression(boolean enableCompression) {
+        this.enableCompression = enableCompression;
+    }
+}
\ No newline at end of file
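
MemcachedCacheConfig takes its defaults from KylinConfig, but every knob is
also settable programmatically. A minimal sketch, assuming an initialized
KylinConfig environment and, for create(), a reachable memcached server
(values arbitrary):

    import org.apache.kylin.rest.cache.memcached.MemcachedCache;
    import org.apache.kylin.rest.cache.memcached.MemcachedCacheConfig;

    class CacheConfigSketch {
        static MemcachedCache build() {
            MemcachedCacheConfig config = new MemcachedCacheConfig();
            config.setHosts("localhost:11211");     // comma-delimited host:port list
            config.setTimeout(500L);                // per-operation timeout, in ms
            config.setMaxOperationQueueSize(16384); // 0 keeps the queue unbounded
            config.setEnableCompression(true);      // compress values above threshold
            return MemcachedCache.create(config, "test-prefix");
        }
    }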
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCache.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCache.java
new file mode 100644
index 0000000000..94c810eeb2
--- /dev/null
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCache.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.internal.BulkFuture;
+
+/**
+ * Subclass of MemcachedCache that supports storing large objects. Memcached
+ * itself limits the value size, 1M by default. This implementation extends
+ * the limit to 1G by splitting large byte arrays into multiple chunks, and
+ * preserves data integrity when some of the chunks are lost (due to server
+ * restart or other reasons).
+ *
+ * @author mingmwang
+ */
+public class MemcachedChunkingCache extends MemcachedCache implements 
KeyHookLookup {
+    private static final Logger logger = 
LoggerFactory.getLogger(MemcachedChunkingCache.class);
+
+    public MemcachedChunkingCache(MemcachedCache cache) {
+        super(cache);
+        Preconditions.checkArgument(config.getMaxChunkSize() > 1, "maxChunkSize [%s] must be greater than 1",
+                config.getMaxChunkSize());
+        Preconditions.checkArgument(config.getMaxObjectSize() > 261, "maxObjectSize [%s] must be greater than 261",
+                config.getMaxObjectSize());
+    }
+
+    protected static byte[][] splitBytes(final byte[] data, final int nSplit) {
+        byte[][] dest = new byte[nSplit][];
+
+        final int splitSize = (data.length - 1) / nSplit + 1;
+        for (int i = 0; i < nSplit - 1; i++) {
+            dest[i] = Arrays.copyOfRange(data, i * splitSize, (i + 1) * 
splitSize);
+        }
+        dest[nSplit - 1] = Arrays.copyOfRange(data, (nSplit - 1) * splitSize, 
data.length);
+
+        return dest;
+    }
+
+    protected static int getValueSplit(MemcachedCacheConfig config, String 
keyS, int valueBLen) {
+        // reserve 6 bytes: the chunk index appended to the key never exceeds 6 digits
+        final int valueSize = config.getMaxObjectSize() - Shorts.BYTES - 
Ints.BYTES
+                - keyS.getBytes(StandardCharsets.UTF_8).length - 6;
+        final int maxValueSize = config.getMaxChunkSize() * valueSize;
+        Preconditions.checkArgument(valueBLen <= maxValueSize,
+                "the value bytes length [%s] exceeds maximum value size [%s]", valueBLen, maxValueSize);
+        return (valueBLen - 1) / valueSize + 1;
+    }
+
+    protected static Pair<KeyHook, byte[][]> getKeyValuePair(int nSplit, 
String keyS, byte[] valueB) {
+        KeyHook keyHook;
+        byte[][] splitValueB = null;
+        if (nSplit > 1) {
+            String[] chunkKeySs = new String[nSplit];
+            for (int i = 0; i < nSplit; i++) {
+                chunkKeySs[i] = keyS + i;
+            }
+            keyHook = new KeyHook(chunkKeySs, null);
+            splitValueB = splitBytes(valueB, nSplit);
+        } else {
+            keyHook = new KeyHook(null, valueB);
+        }
+
+        return new Pair<>(keyHook, splitValueB);
+    }
+
+    /**
+     * Overrides the parent getBinary(): it first fetches the KeyHook from the
+     * cache, then checks the KeyHook to see whether chunking is enabled.
+     */
+    @Override
+    @SuppressWarnings({"squid:S3776"})
+    public byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return new byte[0];
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return new byte[0];
+        }
+
+        if (keyHook.getChunkskey() == null || keyHook.getChunkskey().length == 
0) {
+            if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+                logger.debug("Chunking not enabled, return the value bytes in 
the keyhook directly, value bytes size = {}", 
+                        keyHook.getValues().length);
+            }
+            return keyHook.getValues();
+        }
+
+        BulkFuture<Map<String, Object>> bulkFuture;
+        long start = System.currentTimeMillis();
+
+        if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+            logger.debug("Chunking enabled, number of chunks = {}", keyHook.getChunkskey().length);
+        }
+
+        Map<String, String> keyLookup = 
computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+        try {
+            bulkFuture = client.asyncGetBulk(keyLookup.keySet());
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return new byte[0];
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return new byte[0];
+        }
+
+        try {
+            Map<String, Object> bulkResult = 
bulkFuture.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (bulkResult.size() != keyHook.getChunkskey().length) {
+                missCount.incrementAndGet();
+                logger.warn("Some partial chunks missing for query key: {}", keyS);
+                // remove all the partial chunks here.
+                for (String partialKey : bulkResult.keySet()) {
+                    client.delete(partialKey);
+                }
+                deleteKeyHook(keyS);
+                return new byte[0];
+            }
+            hitCount.getAndAdd(keyHook.getChunkskey().length);
+            byte[][] bytesArray = new byte[keyHook.getChunkskey().length][];
+            for (Map.Entry<String, Object> entry : bulkResult.entrySet()) {
+                byte[] bytes = (byte[]) entry.getValue();
+                readBytes.addAndGet(bytes.length);
+                String originalKeyS = keyLookup.get(entry.getKey());
+                int idx = 
Integer.parseInt(originalKeyS.substring(keyS.length()));
+                bytesArray[idx] = 
decodeValue(originalKeyS.getBytes(StandardCharsets.UTF_8), bytes);
+            }
+            return concatBytes(bytesArray);
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            bulkFuture.cancel(false);
+            return new byte[0];
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling item from cache.", 
e);
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Overrides the parent putBinary(): it splits large value bytes into
+     * multiple chunks that fit into the internal cache and generates a
+     * KeyHook that stores the split chunk keys.
+     */
+    @Override
+    public void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        int nSplit = getValueSplit(config, keyS, valueB.length);
+        Pair<KeyHook, byte[][]> keyValuePair = getKeyValuePair(nSplit, keyS, 
valueB);
+        KeyHook keyHook = keyValuePair.getFirst();
+        byte[][] splitValueB = keyValuePair.getSecond();
+
+        if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+            logger.debug("put key hook:{} to cache for hash key", keyHook);
+        }
+        super.putBinary(keyS, serializeValue(keyHook), expiration);
+        if (nSplit > 1) {
+            for (int i = 0; i < nSplit; i++) {
+                if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+                    logger.debug("Chunk[{}] bytes size before encoding = {}", i, splitValueB[i].length);
+                }
+                super.putBinary(keyHook.getChunkskey()[i], splitValueB[i], 
expiration);
+            }
+        }
+    }
+
+    @Override
+    public void evict(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return;
+        }
+
+        if (keyHook.getChunkskey() != null && keyHook.getChunkskey().length > 
0) {
+            String[] chunkKeys = keyHook.getChunkskey();
+            for (String chunkKey : chunkKeys) {
+                super.evict(chunkKey);
+            }
+        }
+        super.evict(keyS);
+    }
+
+    protected Map<String, String> computeKeyHash(List<String> keySList) {
+        return Maps.uniqueIndex(keySList, this::computeKeyHash);
+    }
+
+    private void deleteKeyHook(String keyS) {
+        try {
+            super.evict(keyS);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation: ", e);
+        }
+    }
+
+    private byte[] concatBytes(byte[]... bytesArray) {
+        int length = 0;
+        for (byte[] bytes : bytesArray) {
+            length += bytes.length;
+        }
+        byte[] result = new byte[length];
+        int destPos = 0;
+        for (byte[] bytes : bytesArray) {
+            System.arraycopy(bytes, 0, result, destPos, bytes.length);
+            destPos += bytes.length;
+        }
+        if (logger.isDebugEnabled() || config.isEnableDebugLog()) {
+            logger.debug("Original value bytes size for all chunks = {}", result.length);
+        }
+
+        return result;
+    }
+
+    @Override
+    public KeyHook lookupKeyHook(String keyS) {
+        byte[] bytes = super.getBinary(keyS);
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        return (KeyHook) SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file
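
The chunk count computed by getValueSplit() above reserves, per chunk, 2 bytes
for the compression flag, 4 bytes for the key length, the key bytes themselves,
and up to 6 digits of chunk index appended to the key. A worked example with
assumed numbers:

    class ChunkSplitSketch {
        public static void main(String[] args) {
            int maxObjectSize = 1048576;                        // 1M item limit
            int keyLen = 100;                                   // key byte length
            int valueSize = maxObjectSize - 2 - 4 - keyLen - 6; // = 1048464 per chunk
            int valueBLen = 3 * 1024 * 1024;                    // a 3 MB value
            int nSplit = (valueBLen - 1) / valueSize + 1;       // ceiling division
            System.out.println(nSplit);                         // prints: 4
        }
    }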
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactory.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactory.java
new file mode 100644
index 0000000000..a164bc9954
--- /dev/null
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactory extends SpyObject implements 
ConnectionFactory {
+    private ConnectionFactory underlying;
+
+    public MemcachedConnectionFactory(ConnectionFactory underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public MetricType enableMetrics() {
+        //Turn off metric collection by default.
+        return DefaultConnectionFactory.DEFAULT_METRIC_TYPE;
+    }
+
+    @Override
+    public MetricCollector getMetricCollector() {
+        return new NoopMetricCollector();
+    }
+
+    @Override
+    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) 
throws IOException {
+        return underlying.createConnection(addrs);
+    }
+
+    @Override
+    public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel 
c, int bufSize) {
+        return underlying.createMemcachedNode(sa, c, bufSize);
+    }
+
+    @Override
+    public BlockingQueue<Operation> createOperationQueue() {
+        return underlying.createOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createReadOperationQueue() {
+        return underlying.createReadOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createWriteOperationQueue() {
+        return underlying.createWriteOperationQueue();
+    }
+
+    @Override
+    public long getOpQueueMaxBlockTime() {
+        return underlying.getOpQueueMaxBlockTime();
+    }
+
+    @Override
+    public ExecutorService getListenerExecutorService() {
+        return underlying.getListenerExecutorService();
+    }
+
+    @Override
+    public boolean isDefaultExecutorService() {
+        return underlying.isDefaultExecutorService();
+    }
+
+    @Override
+    public NodeLocator createLocator(List<MemcachedNode> nodes) {
+        return underlying.createLocator(nodes);
+    }
+
+    @Override
+    public OperationFactory getOperationFactory() {
+        return underlying.getOperationFactory();
+    }
+
+    @Override
+    public long getOperationTimeout() {
+        return underlying.getOperationTimeout();
+    }
+
+    @Override
+    public boolean isDaemon() {
+        return underlying.isDaemon();
+    }
+
+    @Override
+    public boolean useNagleAlgorithm() {
+        return underlying.useNagleAlgorithm();
+    }
+
+    @Override
+    public Collection<ConnectionObserver> getInitialObservers() {
+        return underlying.getInitialObservers();
+    }
+
+    @Override
+    public FailureMode getFailureMode() {
+        return underlying.getFailureMode();
+    }
+
+    @Override
+    public Transcoder<Object> getDefaultTranscoder() {
+        return underlying.getDefaultTranscoder();
+    }
+
+    @Override
+    public boolean shouldOptimize() {
+        return underlying.shouldOptimize();
+    }
+
+    @Override
+    public int getReadBufSize() {
+        return underlying.getReadBufSize();
+    }
+
+    @Override
+    public HashAlgorithm getHashAlg() {
+        return underlying.getHashAlg();
+    }
+
+    @Override
+    public long getMaxReconnectDelay() {
+        return underlying.getMaxReconnectDelay();
+    }
+
+    @Override
+    public AuthDescriptor getAuthDescriptor() {
+        return underlying.getAuthDescriptor();
+    }
+
+    @Override
+    public int getTimeoutExceptionThreshold() {
+        return underlying.getTimeoutExceptionThreshold();
+    }
+
+    @Override
+    public long getAuthWaitTime() {
+        return underlying.getAuthWaitTime();
+    }
+}
\ No newline at end of file
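
MemcachedConnectionFactory is a plain decorator: every call delegates to the
wrapped factory except enableMetrics() and getMetricCollector(), which are
pinned so that metric collection stays off no matter how the underlying factory
was built. A minimal usage sketch:

    import net.spy.memcached.ConnectionFactory;
    import net.spy.memcached.ConnectionFactoryBuilder;

    class FactoryDecoratorSketch {
        static MemcachedConnectionFactory wrap() {
            ConnectionFactory base = new MemcachedConnectionFactoryBuilder()
                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
                    .build();
            // Metrics stay off in the wrapper even if base enabled them.
            return new MemcachedConnectionFactory(base);
        }
    }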
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactoryBuilder.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactoryBuilder.java
new file mode 100644
index 0000000000..480313f91c
--- /dev/null
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/cache/memcached/MemcachedConnectionFactoryBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.RefinedKetamaNodeLocator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactoryBuilder extends 
ConnectionFactoryBuilder {
+    /**
+     * Get the ConnectionFactory set up with the provided parameters.
+     */
+    @Override
+    @SuppressWarnings({"squid:S3776"})
+    public ConnectionFactory build() {
+        return new DefaultConnectionFactory() {
+
+            @Override
+            public BlockingQueue<Operation> createOperationQueue() {
+                return opQueueFactory == null ? super.createOperationQueue() : 
opQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createReadOperationQueue() {
+                return readQueueFactory == null ? 
super.createReadOperationQueue() : readQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createWriteOperationQueue() {
+                return writeQueueFactory == null ? super.createWriteOperationQueue() : writeQueueFactory.create();
+            }
+
+            @Override
+            public NodeLocator createLocator(List<MemcachedNode> nodes) {
+                switch (locator) {
+                    case ARRAY_MOD:
+                        return new ArrayModNodeLocator(nodes, getHashAlg());
+                    case CONSISTENT:
+                        return new RefinedKetamaNodeLocator(nodes, 
getHashAlg());
+                    default:
+                        throw new IllegalStateException("Unhandled locator 
type: " + locator);
+                }
+            }
+
+            @Override
+            public Transcoder<Object> getDefaultTranscoder() {
+                return transcoder == null ? super.getDefaultTranscoder() : 
transcoder;
+            }
+
+            @Override
+            public FailureMode getFailureMode() {
+                return failureMode == null ? super.getFailureMode() : 
failureMode;
+            }
+
+            @Override
+            public HashAlgorithm getHashAlg() {
+                return hashAlg == null ? super.getHashAlg() : hashAlg;
+            }
+
+            @Override
+            public Collection<ConnectionObserver> getInitialObservers() {
+                return initialObservers;
+            }
+
+            @Override
+            public OperationFactory getOperationFactory() {
+                return opFact == null ? super.getOperationFactory() : opFact;
+            }
+
+            @Override
+            public long getOperationTimeout() {
+                return opTimeout == -1 ? super.getOperationTimeout() : 
opTimeout;
+            }
+
+            @Override
+            public int getReadBufSize() {
+                return readBufSize == -1 ? super.getReadBufSize() : 
readBufSize;
+            }
+
+            @Override
+            public boolean isDaemon() {
+                return isDaemon;
+            }
+
+            @Override
+            public boolean shouldOptimize() {
+                return shouldOptimize;
+            }
+
+            @Override
+            public boolean useNagleAlgorithm() {
+                return useNagle;
+            }
+
+            @Override
+            public long getMaxReconnectDelay() {
+                return maxReconnectDelay;
+            }
+
+            @Override
+            public AuthDescriptor getAuthDescriptor() {
+                return authDescriptor;
+            }
+
+            @Override
+            public long getOpQueueMaxBlockTime() {
+                return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime : 
super.getOpQueueMaxBlockTime();
+            }
+
+            @Override
+            public int getTimeoutExceptionThreshold() {
+                return timeoutExceptionThreshold;
+            }
+
+            @Override
+            public MetricType enableMetrics() {
+                return metricType == null ? super.enableMetrics() : metricType;
+            }
+
+            @Override
+            public MetricCollector getMetricCollector() {
+                return collector == null ? super.getMetricCollector() : 
collector;
+            }
+
+            @Override
+            public ExecutorService getListenerExecutorService() {
+                return executorService == null ? 
super.getListenerExecutorService() : executorService;
+            }
+
+            @Override
+            public boolean isDefaultExecutorService() {
+                return executorService == null;
+            }
+
+            @Override
+            public long getAuthWaitTime() {
+                return authWaitTime;
+            }
+        };
+
+    }
+}
\ No newline at end of file
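
The anonymous factory returned by build() treats null (for object settings) and
-1 (for numeric settings) as unset and falls back to the DefaultConnectionFactory
defaults, which is why the tests that follow observe the library defaults for
any setter they never call. Note also that the CONSISTENT locator branch wires
in this commit's RefinedKetamaNodeLocator rather than the stock Ketama locator.
A minimal sketch:

    import net.spy.memcached.ConnectionFactory;
    import net.spy.memcached.ConnectionFactoryBuilder;
    import net.spy.memcached.DefaultHashAlgorithm;

    class BuilderDefaultsSketch {
        static ConnectionFactory build() {
            // Only the locator and hash algorithm are set; everything else
            // falls back to the spymemcached defaults.
            return new MemcachedConnectionFactoryBuilder()
                    .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
                    .build();
        }
    }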
diff --git 
a/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcacheConnectionFactoryTest.java
 
b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcacheConnectionFactoryTest.java
new file mode 100644
index 0000000000..c1bd949b87
--- /dev/null
+++ 
b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcacheConnectionFactoryTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.protocol.binary.BinaryOperationFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class MemcacheConnectionFactoryTest extends NLocalFileMetadataTestCase {
+
+    @Before
+    public void setup() {
+        overwriteSystemProp("kylin.cache.memcached.enabled", "true");
+        createTestMetadata();
+    }
+
+    @After
+    public void destroy() {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMemcachedConnectionFactory() {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        // no compression inside the transcoder; we compress/decompress outside
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        OperationQueueFactory opQueueFactory;
+        opQueueFactory = new LinkedOperationQueueFactory();
+
+        MemcachedConnectionFactoryBuilder builder = new 
MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                
.setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                .setOpQueueMaxBlockTime(500).setOpTimeout(500)
+                
.setReadBufferSize(DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE).setOpQueueFactory(opQueueFactory).build();
+
+        MemcachedConnectionFactory factory = new 
MemcachedConnectionFactory(connectionFactory);
+
+        factory.createOperationQueue();
+        factory.createReadOperationQueue();
+        factory.createWriteOperationQueue();
+        Assert.assertEquals(transcoder, factory.getDefaultTranscoder());
+        Assert.assertEquals(FailureMode.Redistribute, 
factory.getFailureMode());
+        Assert.assertEquals(DefaultHashAlgorithm.FNV1A_64_HASH, 
factory.getHashAlg());
+        Assert.assertNotNull(factory.getOperationFactory());
+        Assert.assertNotNull(factory.getMetricCollector());
+        Assert.assertNotNull(factory.getListenerExecutorService());
+        Assert.assertNotNull(factory.getInitialObservers());
+        Assert.assertEquals(500, factory.getOperationTimeout());
+        Assert.assertEquals(16384, factory.getReadBufSize());
+        Assert.assertEquals(500, factory.getOpQueueMaxBlockTime());
+        Assert.assertEquals(MetricType.OFF, factory.enableMetrics());
+        Assert.assertTrue(factory.isDefaultExecutorService());
+        Assert.assertTrue(factory.shouldOptimize());
+        Assert.assertFalse(factory.useNagleAlgorithm());
+        Assert.assertEquals(30, factory.getMaxReconnectDelay());
+        Assert.assertEquals(1000, factory.getAuthWaitTime());
+        Assert.assertEquals(500, factory.getOperationTimeout());
+    }
+
+    @Test
+    public void testMemcachedConnectionConfig() {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        // no compression inside the transcoder; we compress/decompress outside
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        OperationQueueFactory opQueueFactory;
+        opQueueFactory = new LinkedOperationQueueFactory();
+        OperationFactory operationFactory = new BinaryOperationFactory();
+        ExecutorService executorService;
+        executorService = Executors.newSingleThreadExecutor();
+
+        MemcachedConnectionFactoryBuilder builder = new 
MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                
.setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                .setListenerExecutorService(executorService)
+                .setOpFact(operationFactory)
+                .setEnableMetrics(MetricType.OFF)
+                .setMetricCollector(new NoopMetricCollector())
+                .setOpQueueFactory(opQueueFactory).build();
+
+        MemcachedConnectionFactory factory = new 
MemcachedConnectionFactory(connectionFactory);
+
+        factory.createOperationQueue();
+        factory.createReadOperationQueue();
+        factory.createWriteOperationQueue();
+        Assert.assertEquals(transcoder, factory.getDefaultTranscoder());
+        Assert.assertEquals(FailureMode.Redistribute, 
factory.getFailureMode());
+        Assert.assertEquals(DefaultHashAlgorithm.NATIVE_HASH, 
factory.getHashAlg());
+        Assert.assertNotNull(factory.getOperationFactory());
+        Assert.assertNotNull(factory.getMetricCollector());
+        Assert.assertNotNull(factory.getListenerExecutorService());
+        Assert.assertNotNull(factory.getInitialObservers());
+        Assert.assertEquals(2500, factory.getOperationTimeout());
+        Assert.assertEquals(16384, factory.getReadBufSize());
+        Assert.assertEquals(10000, factory.getOpQueueMaxBlockTime());
+        Assert.assertEquals(MetricType.OFF, factory.enableMetrics());
+        Assert.assertFalse(factory.isDefaultExecutorService());
+        Assert.assertTrue(factory.shouldOptimize());
+        Assert.assertFalse(factory.useNagleAlgorithm());
+        Assert.assertEquals(30, factory.getMaxReconnectDelay());
+        Assert.assertEquals(1000, factory.getAuthWaitTime());
+        Assert.assertEquals(2500, factory.getOperationTimeout());
+    }
+}
diff --git 
a/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheTest.java
 
b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheTest.java
new file mode 100644
index 0000000000..28c3895086
--- /dev/null
+++ 
b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedCacheTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import net.spy.memcached.DefaultConnectionFactory;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import 
org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache.MemCachedCacheAdaptor;
+import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.GetFuture;
+
+public class MemcachedCacheTest extends NLocalFileMetadataTestCase {
+
+    private Map<String, String> keyValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+    private MemcachedCacheConfig cacheConfig;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+
+        keyValueMap = Maps.newHashMap();
+        keyValueMap.put("sql1", "value1");
+        keyValueMap.put("sql11", "value11");
+
+        cacheConfig = new MemcachedCacheConfig();
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, 
cacheConfig, CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName,
+                7 * 24 * 3600);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedCache);
+
+        //Mock put to cache
+        for (String key : keyValueMap.keySet()) {
+            String hashedKey = memcachedCache.computeKeyHash(key);
+
+            String value = keyValueMap.get(key);
+            byte[] valueE = memcachedCache.encodeValue(key, value);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(future.get(cacheConfig.getTimeout(), 
TimeUnit.MILLISECONDS)).thenReturn(valueE);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGet() {
+        for (String key : keyValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", keyValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+    }
+
+    @Test
+    public void testGetResolvedAddrList() {
+        String hostsStr = "localhost:11211,unresolvedHost1:11211,unresolvedHost2:11211";
+        List<InetSocketAddress> addrList = MemcachedCache.getResolvedAddrList(hostsStr);
+        Assert.assertEquals(1, addrList.size());
+    }
+
+    @Test
+    public void testMemcachedConfig() {
+        cacheConfig.setTimeout(1000);
+        cacheConfig.setHosts("localhost:11211");
+        cacheConfig.setMaxChunkSize(1024);
+        cacheConfig.setMaxOperationQueueSize(1);
+        cacheConfig.setReadBufferSize(DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE);
+        cacheConfig.setEnableCompression(false);
+        Assert.assertEquals(1000, cacheConfig.getTimeout());
+        Assert.assertEquals("localhost:11211", cacheConfig.getHosts());
+        Assert.assertEquals(1024, cacheConfig.getMaxChunkSize());
+        Assert.assertEquals(1, cacheConfig.getMaxOperationQueueSize());
+        Assert.assertEquals(16384, cacheConfig.getReadBufferSize());
+        Assert.assertFalse(cacheConfig.isEnableCompression());
+    }
+}
\ No newline at end of file
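Note on the mock wiring in MemcachedCacheTest: a read through MemCachedCacheAdaptor follows roughly the path sketched below. This is a sketch reconstructed from the calls the test stubs (computeKeyHash, asyncGet, encodeValue), not a copy of the committed implementation:

    // Sketch (assumed internals): how a get(key) through the adaptor reaches memcached.
    String hashedKey = memcachedCache.computeKeyHash(key);           // SQL key -> memcached-safe key
    GetFuture<Object> future = memcachedClient.asyncGet(hashedKey);  // asynchronous fetch
    byte[] encoded = (byte[]) future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS);
    // The encoded bytes are then decoded back into the original value, which is why
    // the test stubs asyncGet() to return encodeValue(key, value).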
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCacheTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCacheTest.java
new file mode 100644
index 0000000000..4239c814b5
--- /dev/null
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/cache/memcached/MemcachedChunkingCacheTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.cache.memcached;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache.MemCachedCacheAdaptor;
+import org.apache.kylin.rest.service.CommonQueryCacheSupporter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.internal.BulkFuture;
+import net.spy.memcached.internal.GetFuture;
+
+public class MemcachedChunkingCacheTest extends NLocalFileMetadataTestCase {
+
+    private Map<String, String> smallValueMap;
+    private Map<String, String> largeValueMap;
+    private MemCachedCacheAdaptor memCachedAdaptor;
+    private KeyHookLookup.KeyHook keyHook;
+    private MemcachedChunkingCache memcachedChunkingCache;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        final int maxObjectSize = 300;
+
+        smallValueMap = Maps.newHashMap();
+        smallValueMap.put("sql1", "value1");
+
+        largeValueMap = Maps.newHashMap();
+        largeValueMap.put("sql2", Strings.repeat("value2", maxObjectSize));
+
+        MemcachedCacheConfig cacheConfig = new MemcachedCacheConfig();
+        cacheConfig.setMaxObjectSize(maxObjectSize);
+        MemcachedClient memcachedClient = mock(MemcachedClient.class);
+        MemcachedCache memcachedCache = new MemcachedCache(memcachedClient, cacheConfig, CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName,
+                7 * 24 * 3600);
+        memcachedChunkingCache = new MemcachedChunkingCache(memcachedCache);
+        memCachedAdaptor = new MemCachedCacheAdaptor(memcachedChunkingCache);
+
+        //Mock put to cache
+        for (String key : smallValueMap.keySet()) {
+            String hashedKey = memcachedCache.computeKeyHash(key);
+
+            String value = smallValueMap.get(key);
+            byte[] valueB = memcachedCache.serializeValue(value);
+            KeyHookLookup.KeyHook keyHook = new KeyHookLookup.KeyHook(null, valueB);
+            byte[] valueE = memcachedCache.encodeValue(key, keyHook);
+
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+        }
+
+        //Mock put large value to cache
+        for (String key : largeValueMap.keySet()) {
+            String hashedKey = memcachedCache.computeKeyHash(key);
+
+            String value = largeValueMap.get(key);
+            byte[] valueB = memcachedCache.serializeValue(value);
+            int nSplit = MemcachedChunkingCache.getValueSplit(cacheConfig, key, valueB.length);
+            Pair<KeyHookLookup.KeyHook, byte[][]> keyValuePair = MemcachedChunkingCache.getKeyValuePair(nSplit, key,
+                    valueB);
+            KeyHookLookup.KeyHook keyHook = keyValuePair.getFirst();
+            this.keyHook = keyHook;
+            byte[][] splitValueB = keyValuePair.getSecond();
+
+            //For key
+            byte[] valueE = memcachedCache.encodeValue(key, keyHook);
+            GetFuture<Object> future = mock(GetFuture.class);
+            when(memcachedClient.asyncGet(hashedKey)).thenReturn(future);
+            when(future.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(valueE);
+
+            //For splits
+            Map<String, String> keyLookup = memcachedChunkingCache
+                    .computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+            Map<String, Object> bulkResult = Maps.newHashMap();
+            for (int i = 0; i < nSplit; i++) {
+                String splitKeyS = keyHook.getChunkskey()[i];
+                bulkResult.put(memcachedCache.computeKeyHash(splitKeyS),
+                        memcachedCache.encodeValue(splitKeyS.getBytes(Charsets.UTF_8), splitValueB[i]));
+            }
+
+            BulkFuture<Map<String, Object>> bulkFuture = mock(BulkFuture.class);
+            when(memcachedClient.asyncGetBulk(keyLookup.keySet())).thenReturn(bulkFuture);
+            when(bulkFuture.get(cacheConfig.getTimeout(), TimeUnit.MILLISECONDS)).thenReturn(bulkResult);
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGet() {
+        for (String key : smallValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", smallValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+        for (String key : largeValueMap.keySet()) {
+            Assert.assertEquals("The value should not change", largeValueMap.get(key), memCachedAdaptor.get(key).get());
+        }
+    }
+
+    @Test
+    public void testEvict() {
+        memcachedChunkingCache.evict("");
+        memcachedChunkingCache.evict(null);
+        memcachedChunkingCache.put("first", "first_value");
+        keyHook.setChunkskey(new String[]{"first", "second"});
+        for (String key : keyHook.getChunkskey()) {
+            memcachedChunkingCache.evict(key);
+        }
+        byte[] value = memcachedChunkingCache.get("first");
+        Assert.assertEquals(0, value.length);
+    }
+
+    @Test
+    public void testSplitBytes() {
+        byte[] data = new byte[8];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) i;
+        }
+
+        int nSplit;
+        byte[][] dataSplits;
+
+        nSplit = 2;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(dataSplits[0], new byte[] { 0, 1, 2, 3 });
+        Assert.assertArrayEquals(dataSplits[1], new byte[] { 4, 5, 6, 7 });
+
+        nSplit = 3;
+        dataSplits = MemcachedChunkingCache.splitBytes(data, nSplit);
+        Assert.assertEquals(nSplit, dataSplits.length);
+        Assert.assertArrayEquals(dataSplits[0], new byte[] { 0, 1, 2 });
+        Assert.assertArrayEquals(dataSplits[1], new byte[] { 3, 4, 5 });
+        Assert.assertArrayEquals(dataSplits[2], new byte[] { 6, 7 });
+    }
+
+    @Test
+    public void testKeyHookFunc() {
+        KeyHookLookup.KeyHook keyHookEq = new KeyHookLookup.KeyHook();
+        keyHookEq.setChunkskey(keyHook.getChunkskey());
+        keyHookEq.setValues(keyHook.getValues());
+
+        Assert.assertEquals(keyHook, keyHookEq);
+        Assert.assertEquals(keyHook.hashCode(), keyHookEq.hashCode());
+        Assert.assertTrue(keyHook.equals(keyHookEq) && (keyHook.toString().equals(keyHookEq.toString()) && keyHook.equals(keyHook)));
+
+        keyHookEq.setChunkskey(null);
+        keyHookEq.setValues(null);
+
+        Assert.assertNotEquals(keyHook, keyHookEq);
+        Assert.assertFalse(keyHook.equals(keyHookEq) || keyHook.toString().equals(keyHookEq.toString()) || keyHook.equals(null));
+    }
+}
\ No newline at end of file
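Note on the chunking scheme exercised above: values larger than maxObjectSize are split into chunks, and a KeyHook listing the chunk keys is stored under the original key. A sketch of the write-side steps, using only the helpers the test itself calls (the surrounding persistence wiring is assumed):

    // Sketch (assumed flow): storing a value that exceeds maxObjectSize.
    byte[] valueB = memcachedCache.serializeValue(value);                                // serialize once
    int nSplit = MemcachedChunkingCache.getValueSplit(cacheConfig, key, valueB.length);  // number of chunks
    Pair<KeyHookLookup.KeyHook, byte[][]> pair = MemcachedChunkingCache.getKeyValuePair(nSplit, key, valueB);
    // pair.getFirst()  : KeyHook naming the chunk keys -- stored under the original key
    // pair.getSecond() : the nSplit chunks -- each stored under its own chunk key
    // A get() then reads the KeyHook first and bulk-fetches the chunks (asyncGetBulk)
    // to reassemble the value, as the mocks in setUp() simulate.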
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f6b9622033..d76736609f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1954,6 +1954,31 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.cache.redis.batch-count", ONE_HUNDRED_THOUSAND));
     }
 
+    // Memcached
+    public boolean isMemcachedEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.cache.memcached.enabled", FALSE));
+    }
+
+    public String getMemcachedHosts() {
+        return getOptional("kylin.cache.memcached.hosts", "localhost:11211");
+    }
+
+    public long getMemcachedOpTimeout() {
+        return Long.parseLong(getOptional("kylin.cache.memcached.option.timeout", "500"));
+    }
+
+    public int getMaxChunkSize() {
+        return Integer.parseInt(getOptional("kylin.cache.memcached.max-chunk-size", "1024"));
+    }
+
+    public int getMaxObjectSize() {
+        return Integer.parseInt(getOptional("kylin.cache.memcached.max-object-size", "1048576"));
+    }
+
+    public boolean isEnableCompression() {
+        return Boolean.parseBoolean(getOptional("kylin.cache.memcached.is-enable-compression", TRUE));
+    }
+
     public long getMaxWaitMillis() {
         return Long.parseLong(getOptional("kylin.cache.redis.max-wait", "300000"));
     }
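Note: the getters added above map one-to-one onto kylin.properties keys. A minimal usage sketch (the getters are the ones in this hunk; the surrounding wiring is illustrative only):

    KylinConfig conf = KylinConfig.getInstanceFromEnv();
    if (conf.isMemcachedEnabled()) {                      // kylin.cache.memcached.enabled, default false
        String hosts = conf.getMemcachedHosts();          // default "localhost:11211"; comma-separated for more nodes
        long opTimeoutMs = conf.getMemcachedOpTimeout();  // kylin.cache.memcached.option.timeout, default 500
        int maxChunkSize = conf.getMaxChunkSize();        // default 1024
        int maxObjectSize = conf.getMaxObjectSize();      // default 1048576 bytes; larger values get chunked
        boolean compress = conf.isEnableCompression();    // kylin.cache.memcached.is-enable-compression, default true
    }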
diff --git a/src/query-service/pom.xml b/src/query-service/pom.xml
index a69eeb9ebc..2d289bf34e 100644
--- a/src/query-service/pom.xml
+++ b/src/query-service/pom.xml
@@ -161,6 +161,11 @@
             <artifactId>embedded-redis</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.simple-spring-memcached</groupId>
+            <artifactId>jmemcached-maven-plugin</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index 5f68795339..862a7e9b8f 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.querymeta.TableMetaWithType;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.rest.cache.KylinCache;
 import org.apache.kylin.rest.cache.KylinEhCache;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache;
 import org.apache.kylin.rest.cache.RedisCache;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
@@ -73,6 +74,8 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         if (kylinConfig.isRedisEnabled()) {
             kylinCache = RedisCache.getInstance();
+        } else if (kylinConfig.isMemcachedEnabled()) {
+            kylinCache = CompositeMemcachedCache.getInstance();
         } else {
             kylinCache = KylinEhCache.getInstance();
         }
@@ -185,7 +188,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
 
         cached.setStorageCacheUsed(true);
         QueryContext.current().getQueryTagInfo().setStorageCacheUsed(true);
-        String cacheType = KylinConfig.getInstanceFromEnv().isRedisEnabled() ? "Redis" : "Ehcache";
+        String cacheType = getCacheType();
         cached.setStorageCacheType(cacheType);
         QueryContext.current().getQueryTagInfo().setStorageCacheType(cacheType);
 
@@ -200,6 +203,16 @@ public class QueryCacheManager implements CommonQueryCacheSupporter {
         return cached;
     }
 
+    private String getCacheType() {
+        if (KylinConfig.getInstanceFromEnv().isRedisEnabled()) {
+            return "Redis";
+        } else if (KylinConfig.getInstanceFromEnv().isMemcachedEnabled()) {
+            return "Memcached";
+        } else {
+            return "Ehcache";
+        }
+    }
+
     public SQLResponse searchFailedCache(SQLRequest sqlRequest) {
         SQLResponse cached = doSearchQuery(Type.EXCEPTION_QUERY_CACHE, sqlRequest);
         if (cached == null) {
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCompositeMemcachedCacheTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCompositeMemcachedCacheTest.java
new file mode 100644
index 0000000000..2c97ac37e6
--- /dev/null
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryCompositeMemcachedCacheTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.code.ssm.jmemcached.plugin.AbstractJmemcachedMojo;
+import com.google.code.ssm.jmemcached.plugin.JmemcachedStartMojo;
+import com.google.code.ssm.jmemcached.plugin.Server;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+import org.apache.kylin.rest.cache.KylinCache;
+import org.apache.kylin.rest.cache.memcached.CacheStats;
+import org.apache.kylin.rest.cache.memcached.CompositeMemcachedCache;
+import org.apache.kylin.rest.cache.memcached.MemcachedCache;
+import org.apache.kylin.rest.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.rest.cache.memcached.MemcachedConnectionFactory;
+import org.apache.kylin.rest.cache.memcached.MemcachedConnectionFactoryBuilder;
+import org.apache.kylin.rest.request.SQLRequest;
+import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class QueryCompositeMemcachedCacheTest extends LocalFileMetadataTestCase {
+
+    static {
+        Server server = new Server();
+        server.setPort(11211);
+        server.setBinary(true);
+        AbstractJmemcachedMojo embeddedMemcachedServerStarter = new JmemcachedStartMojo();
+        List<Server> serverList = new ArrayList<>();
+        serverList.add(server);
+        embeddedMemcachedServerStarter.setServers(serverList);
+        try {
+            embeddedMemcachedServerStarter.execute();
+        } catch (MojoExecutionException e) {
+            e.printStackTrace();
+        } catch (MojoFailureException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private static String PROJECT = "test_project";
+    private static String TYPE = "test_type";
+    private static String CACHE_KEY = "test_key";
+    private static String CACHE_VAL = "test_val";
+
+    @Spy
+    private KylinCache memcachedCache = Mockito.spy(CompositeMemcachedCache.getInstance());
+
+    @InjectMocks
+    private QueryCacheManager queryCacheManager;
+
+    @BeforeClass
+    public static void setupResource() {
+        staticCreateTestMetadata();
+    }
+
+    @AfterClass
+    public static void tearDownResource() {
+        staticCleanupTestMetadata();
+    }
+
+    @Before
+    public void setup() {
+        overwriteSystemProp("kylin.cache.memcached.enabled", "true");
+        createTestMetadata();
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @After
+    public void destroy() {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMemcachedConnection() throws IOException {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        OperationQueueFactory opQueueFactory;
+        opQueueFactory = new LinkedOperationQueueFactory();
+
+        MemcachedCacheConfig config = new MemcachedCacheConfig();
+
+        String memcachedPrefix = "testCli";
+        String hostStr = "localhost:11211";
+        String cacheKey = "test_cache_key";
+        String cacheVal = "test_cache_val";
+        int timeToLive = 7 * 24 * 3600;
+        MemcachedConnectionFactoryBuilder builder = new MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setHashAlg(DefaultHashAlgorithm.KETAMA_HASH)
+                .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+
+        MemcachedConnectionFactory factory = new MemcachedConnectionFactory(connectionFactory);
+        MemcachedClient memcachedClient = new MemcachedClient(factory,
+                MemcachedCache.getResolvedAddrList(hostStr));
+        MemcachedCache cache = new MemcachedCache(memcachedClient, config, memcachedPrefix, timeToLive);
+        cache.put(cacheKey, cacheVal);
+        cache.put("", cacheVal);
+        cache.put(null, cacheVal);
+        cache.evict(cacheKey);
+        Assert.assertEquals(0, cache.get("").length);
+        Assert.assertEquals(0, cache.get(null).length);
+        Assert.assertEquals(0, cache.get(cacheKey).length);
+        cache.clear();
+    }
+
+    @Test
+    public void testMemcachedClient() throws IOException {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        OperationQueueFactory opQueueFactory;
+        opQueueFactory = new LinkedOperationQueueFactory();
+
+        MemcachedCacheConfig config = new MemcachedCacheConfig();
+
+        String hostStr = "localhost:11211";
+        MemcachedConnectionFactoryBuilder builder = new MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+
+        MemcachedConnectionFactory factory = new MemcachedConnectionFactory(connectionFactory);
+        MemcachedClient memcachedClient = new MemcachedClient(factory,
+                MemcachedCache.getResolvedAddrList(hostStr));
+        List<MemcachedNode> nodeList = new ArrayList<>(memcachedClient.getNodeLocator().getReadonlyCopy().getAll());
+        NodeLocator nodeLocator = memcachedClient.getNodeLocator();
+        memcachedClient.getNodeLocator().updateLocator(nodeList);
+        Assert.assertEquals(500, memcachedClient.getOperationTimeout());
+        Assert.assertNotEquals(nodeLocator, memcachedClient.getNodeLocator());
+    }
+
+    @Test
+    public void testMemcachedNode() throws IOException {
+        SerializingTranscoder transcoder = new SerializingTranscoder(1048576);
+        transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+        OperationQueueFactory opQueueFactory;
+        opQueueFactory = new LinkedOperationQueueFactory();
+
+        MemcachedCacheConfig config = new MemcachedCacheConfig();
+
+        String hostStr = "localhost:11211";
+        MemcachedConnectionFactoryBuilder builder = new MemcachedConnectionFactoryBuilder();
+
+        ConnectionFactory connectionFactory = builder
+                .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+
+        MemcachedConnectionFactory factory = new MemcachedConnectionFactory(connectionFactory);
+        MemcachedClient memcachedClient = new MemcachedClient(factory,
+                MemcachedCache.getResolvedAddrList(hostStr));
+        List<MemcachedNode> nodeList = new ArrayList<>(memcachedClient.getNodeLocator().getReadonlyCopy().getAll());
+        nodeList.forEach(node -> {
+            Assert.assertThrows(UnsupportedOperationException.class, node::setupResend);
+            Assert.assertThrows(UnsupportedOperationException.class, node::setupForAuth);
+            Assert.assertThrows(UnsupportedOperationException.class, node::destroyInputQueue);
+            Assert.assertThrows(UnsupportedOperationException.class, node::reconnecting);
+            Operation op = Mockito.spy(Operation.class);
+            Assert.assertThrows(UnsupportedOperationException.class, () -> node.insertOp(op));
+            Assert.assertThrows(UnsupportedOperationException.class, () -> node.addOp(op));
+        });
+    }
+
+    @Test
+    public void testTypeAsNull() {
+        CompositeMemcachedCache compositeMemcachedCache = (CompositeMemcachedCache) CompositeMemcachedCache.getInstance();
+        Assert.assertNotNull(compositeMemcachedCache);
+        Assert.assertThrows(NullPointerException.class, () -> compositeMemcachedCache.put(null, PROJECT, CACHE_KEY, CACHE_VAL));
+    }
+
+    @Test
+    public void testUnsupportedCacheType() {
+        CompositeMemcachedCache compositeMemcachedCache = (CompositeMemcachedCache) CompositeMemcachedCache.getInstance();
+        Assert.assertNotNull(compositeMemcachedCache);
+        Assert.assertThrows(IllegalArgumentException.class, () -> compositeMemcachedCache.put(TYPE, PROJECT, CACHE_KEY, CACHE_VAL));
+    }
+
+    @Test
+    public void testCompositeMemcachedCache() {
+        CompositeMemcachedCache compositeMemcachedCache = (CompositeMemcachedCache) CompositeMemcachedCache.getInstance();
+        Assert.assertNotNull(compositeMemcachedCache);
+        Object mockItem = mock(Object.class);
+        when(mockItem.toString()).thenReturn(mockItem.getClass().getName());
+        // write bad case
+        String type = CommonQueryCacheSupporter.Type.SUCCESS_QUERY_CACHE.rootCacheName;
+        compositeMemcachedCache.put(type, PROJECT, mockItem, CACHE_VAL);
+        compositeMemcachedCache.update(type, PROJECT, mockItem, CACHE_VAL);
+        Object result1 = compositeMemcachedCache.get(type, PROJECT, mockItem);
+        Assert.assertNull(result1);
+        Assert.assertFalse(compositeMemcachedCache.remove(type, PROJECT, mockItem));
+
+        compositeMemcachedCache.put(type, PROJECT, CACHE_KEY, CACHE_VAL);
+        Object result2 = compositeMemcachedCache.get(type, PROJECT, CACHE_KEY);
+        Assert.assertEquals("test_val", (String)result2);
+
+        compositeMemcachedCache.update(type, PROJECT, CACHE_KEY, "update_val");
+        Object result3 = compositeMemcachedCache.get(type, PROJECT, CACHE_KEY);
+        Assert.assertEquals("update_val", (String)result3);
+
+        compositeMemcachedCache.clearAll();
+    }
+
+    @Test
+    public void testProjectCompositeMemcachedCacheQuery() {
+        overwriteSystemProp("kylin.cache.memcached.enabled", "true");
+        final String project = "default";
+        final SQLRequest req1 = new SQLRequest();
+        req1.setProject(project);
+        req1.setSql("select a from b");
+        final SQLResponse resp1 = new SQLResponse();
+        List<List<String>> results = new ArrayList<>();
+        resp1.setResults(results);
+        resp1.setResultRowCount(1);
+        //Single Node mode
+        testHelper(req1, resp1, project);
+        //TODO: Cluster mode
+        testCacheStatus();
+    }
+
+    private void testHelper(SQLRequest req1, SQLResponse resp1, String project) {
+        queryCacheManager.cacheSuccessQuery(req1, resp1);
+
+        queryCacheManager.doCacheSuccessQuery(req1, resp1);
+        Assert.assertEquals(resp1.getResultRowCount(),
+                queryCacheManager.doSearchQuery(QueryCacheManager.Type.SUCCESS_QUERY_CACHE, req1).getResultRowCount());
+        Assert.assertNull(queryCacheManager.searchQuery(req1));
+        queryCacheManager.clearQueryCache(req1);
+        Assert.assertNull(queryCacheManager.doSearchQuery(QueryCacheManager.Type.SUCCESS_QUERY_CACHE, req1));
+
+        queryCacheManager.cacheFailedQuery(req1, resp1);
+        Assert.assertEquals(resp1.getResultRowCount(), queryCacheManager.searchQuery(req1).getResultRowCount());
+        queryCacheManager.clearProjectCache(project);
+        Assert.assertNull(queryCacheManager.searchQuery(req1));
+        queryCacheManager.clearProjectCache(null);
+
+        queryCacheManager.recoverCache();
+    }
+
+    private void testCacheStatus() {
+        CacheStats cacheStats = ((CompositeMemcachedCache) queryCacheManager.getCache()).getCacheStats("StorageCache");
+        String name = ((CompositeMemcachedCache) queryCacheManager.getCache()).getName("StorageCache");
+        System.out.println("Cache name is: " + name);
+        System.out.println("AvgGetTime is: " + cacheStats.getAvgGetTime());
+        System.out.println("HitRate is: " + cacheStats.hitRate());
+        System.out.println("AvgGetBytes is :" + cacheStats.avgGetBytes());
+        System.out.println("NumErrors is :" + cacheStats.getNumErrors());
+        System.out.println("Get number is :" + cacheStats.getNumGet());
+        System.out.println("NumEvictions is :" + cacheStats.getNumEvictions());
+        System.out.println("NumGetBytes is :" + cacheStats.getNumGetBytes());
+        System.out.println("Hit number is :" + cacheStats.getNumHits());
+        System.out.println("Miss number is :" + cacheStats.getNumMisses());
+        System.out.println("Put number is :" + cacheStats.getNumPut());
+        System.out.println("Put bytes is :" + cacheStats.getNumPutBytes());
+        System.out.println("Timeout number is :" + cacheStats.getNumTimeouts());
+        System.out.println("Lookup number is :" + cacheStats.numLookups());
+
+        Assert.assertEquals(0, cacheStats.getNumErrors());
+        Assert.assertEquals(0, cacheStats.getNumEvictions());
+        Assert.assertEquals(0, cacheStats.getNumTimeouts());
+    }
+}

