This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cd952ffd23e27b8468db03c38758d552a00c7486
Author: Wang Ken <mingmw...@ebay.com>
AuthorDate: Thu Oct 18 20:44:30 2018 +0800

    KYLIN-2898 Introduce memcached as a distributed cache for queries
---
 cache/pom.xml                                      |  94 ++++++
 .../spy/memcached/RefinedKetamaNodeLocator.java    | 279 ++++++++++++++++
 .../kylin/cache/cachemanager/CacheConstants.java   |  23 ++
 .../InstrumentedEhCacheCacheManager.java           | 101 ++++++
 .../cache/cachemanager/MemcachedCacheManager.java  | 181 ++++++++++
 .../RemoteLocalFailOverCacheManager.java           |  71 ++++
 .../cache/ehcache/InstrumentedEhCacheCache.java    | 205 ++++++++++++
 .../apache/kylin/cache/memcached/CacheStats.java   |  97 ++++++
 .../kylin/cache/memcached/KeyHookLookup.java       | 139 ++++++++
 .../kylin/cache/memcached/MemcachedCache.java      | 371 +++++++++++++++++++++
 .../cache/memcached/MemcachedCacheConfig.java      |  97 ++++++
 .../cache/memcached/MemcachedChunkingCache.java    | 279 ++++++++++++++++
 .../memcached/MemcachedConnectionFactory.java      | 193 +++++++++++
 .../MemcachedConnectionFactoryBuilder.java         | 173 ++++++++++
 .../kylin/cache/memcached/MemcachedMetrics.java    | 139 ++++++++
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 pom.xml                                            |  17 +
 17 files changed, 2463 insertions(+)
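
For orientation, a minimal sketch of how the pieces added below fit together:
a memcached-backed cache is created from a MemcachedCacheConfig, wrapped by
MemcachedChunkingCache, and exposed through Spring's Cache API via the adaptor
in MemcachedCacheManager. This is illustrative only; constructing the config
directly and the key/value used here are assumptions, not part of this commit.

    import org.apache.kylin.cache.cachemanager.CacheConstants;
    import org.apache.kylin.cache.cachemanager.MemcachedCacheManager.MemCachedCacheAdaptor;
    import org.apache.kylin.cache.memcached.MemcachedCache;
    import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
    import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
    import org.springframework.cache.Cache;

    public class QueryCacheSketch {
        public static void main(String[] args) {
            MemcachedCacheConfig config = new MemcachedCacheConfig(); // hosts/timeouts assumed to be set elsewhere
            MemcachedCache backing = MemcachedCache.create(config, CacheConstants.QUERY_CACHE);
            Cache cache = new MemCachedCacheAdaptor(new MemcachedChunkingCache(backing));
            cache.put("query-digest", "query result"); // values must be Serializable
            Cache.ValueWrapper hit = cache.get("query-digest");
            System.out.println(hit == null ? "miss" : hit.get());
        }
    }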

diff --git a/cache/pom.xml b/cache/pom.xml
new file mode 100644
index 0000000..8e31435
--- /dev/null
+++ b/cache/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-cache</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Cache</name>
+    <description>Apache Kylin - Cache</description>
+
+    <parent>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metrics</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-jvm</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.ehcache</groupId>
+            <artifactId>ehcache</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.spy</groupId>
+            <artifactId>spymemcached</artifactId>
+        </dependency>
+
+        <!-- Test & Env -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+            <!--MRUnit relies on an older version of mockito, so it cannot be managed globally-->
+            <version>${mockito.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
new file mode 100644
index 0000000..95298d7
--- /dev/null
+++ b/cache/src/main/java/net/spy/memcached/RefinedKetamaNodeLocator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
+import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
+
+/**
+ * Copyright (C) 2006-2009 Dustin Sallings
+ * Copyright (C) 2009-2011 Couchbase, Inc.
+ *
+ * This is a modified version of the Ketama consistent hash strategy from
+ * last.fm. This implementation may not be compatible with libketama as hashing
+ * is considered separate from node location.
+ *
+ * The only modified method is getSequence(): the previous limit of 7 probes
+ * may be too small to avoid repeatedly hitting nodes that are all down.
+ *
+ * Note that this implementation does not currently support weighted nodes.
+ *
+ * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's
+ *      blog post</a>
+ */
+public final class RefinedKetamaNodeLocator extends SpyObject implements NodeLocator {
+
+    private final HashAlgorithm hashAlg;
+    private final Map<InetSocketAddress, Integer> weights;
+    private final boolean isWeightedKetama;
+    private final KetamaNodeLocatorConfiguration config;
+    private volatile TreeMap<Long, MemcachedNode> ketamaNodes;
+    private volatile Collection<MemcachedNode> allNodes;
+
+    /**
+     * Create a new KetamaNodeLocator using the specified nodes and the specified hash
+     * algorithm.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
+        this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<InetSocketAddress, Integer>());
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, node key format,
+     * and weight.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param nodeKeyFormat the format used to name the nodes in Ketama, either
+     *          SPYMEMCACHED or LIBMEMCACHED
+     * @param weights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            KetamaNodeKeyFormatter.Format nodeKeyFormat, Map<InetSocketAddress, Integer> weights) {
+        this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
+    }
+
+    /**
+     * Create a new KetamaNodeLocator using the specified nodes and the specified hash
+     * algorithm and configuration.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param conf the node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) {
+        this(nodes, alg, new HashMap<InetSocketAddress, Integer>(), conf);
+    }
+
+    /**
+     * Create a new KetamaNodeLocator with specific nodes, hash, weights,
+     * and node locator configuration.
+     *
+     * @param nodes The List of nodes to use in the Ketama consistent hash
+     *          continuum
+     * @param alg The hash algorithm to use when choosing a node in the Ketama
+     *          consistent hash continuum
+     * @param weights node weights for ketama, a map from InetSocketAddress to
+     *          weight as Integer
+     * @param configuration node locator configuration
+     */
+    public RefinedKetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration configuration) {
+        super();
+        allNodes = nodes;
+        hashAlg = alg;
+        config = configuration;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+        setKetamaNodes(nodes);
+    }
+
+    private RefinedKetamaNodeLocator(TreeMap<Long, MemcachedNode> smn, Collection<MemcachedNode> an, HashAlgorithm alg,
+            Map<InetSocketAddress, Integer> nodeWeights, KetamaNodeLocatorConfiguration conf) {
+        super();
+        ketamaNodes = smn;
+        allNodes = an;
+        hashAlg = alg;
+        config = conf;
+        weights = nodeWeights;
+        isWeightedKetama = !weights.isEmpty();
+    }
+
+    public Collection<MemcachedNode> getAll() {
+        return allNodes;
+    }
+
+    public MemcachedNode getPrimary(final String k) {
+        MemcachedNode rv = getNodeForKey(hashAlg.hash(k));
+        assert rv != null : "Found no node for key " + k;
+        return rv;
+    }
+
+    long getMaxKey() {
+        return getKetamaNodes().lastKey();
+    }
+
+    MemcachedNode getNodeForKey(long hash) {
+        final MemcachedNode rv;
+        if (!ketamaNodes.containsKey(hash)) {
+            // Java 1.6 adds a ceilingKey method, but I'm still stuck in 1.5
+            // in a lot of places, so I'm doing this myself.
+            SortedMap<Long, MemcachedNode> tailMap = getKetamaNodes().tailMap(hash);
+            if (tailMap.isEmpty()) {
+                hash = getKetamaNodes().firstKey();
+            } else {
+                hash = tailMap.firstKey();
+            }
+        }
+        rv = getKetamaNodes().get(hash);
+        return rv;
+    }
+
+    /**
+     * The previous repetition count of 7 may be too small to avoid the case
+     * where every probed node is down, so at least 20 probes are attempted.
+     * @param k the key whose candidate nodes are wanted
+     * @return an iterator over candidate nodes for the key
+     */
+    public Iterator<MemcachedNode> getSequence(String k) {
+        // Seven searches gives us a 1 in 2^maxTry chance of hitting the
+        // same dead node all of the time.
+        int maxTry = config.getNodeRepetitions() + 1;
+        if (maxTry < 20) {
+            maxTry = 20;
+        }
+        return new KetamaIterator(k, maxTry, getKetamaNodes(), hashAlg);
+    }
+
+    public NodeLocator getReadonlyCopy() {
+        TreeMap<Long, MemcachedNode> smn = new TreeMap<Long, MemcachedNode>(getKetamaNodes());
+        Collection<MemcachedNode> an = new ArrayList<MemcachedNode>(allNodes.size());
+
+        // Rewrite the values in a copy of the map.
+        for (Map.Entry<Long, MemcachedNode> me : smn.entrySet()) {
+            smn.put(me.getKey(), new MemcachedNodeROImpl(me.getValue()));
+        }
+
+        // Copy the allNodes collection.
+        for (MemcachedNode n : allNodes) {
+            an.add(new MemcachedNodeROImpl(n));
+        }
+
+        return new RefinedKetamaNodeLocator(smn, an, hashAlg, weights, config);
+    }
+
+    @Override
+    public void updateLocator(List<MemcachedNode> nodes) {
+        allNodes = nodes;
+        setKetamaNodes(nodes);
+    }
+
+    /**
+     * @return the ketamaNodes
+     */
+    protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
+        return ketamaNodes;
+    }
+
+    /**
+     * Setup the KetamaNodeLocator with the list of nodes it should use.
+     *
+     * @param nodes a List of MemcachedNodes for this KetamaNodeLocator to use in
+     *          its continuum
+     */
+    protected void setKetamaNodes(List<MemcachedNode> nodes) {
+        TreeMap<Long, MemcachedNode> newNodeMap = new TreeMap<Long, MemcachedNode>();
+        int numReps = config.getNodeRepetitions();
+        int nodeCount = nodes.size();
+        int totalWeight = 0;
+
+        if (isWeightedKetama) {
+            for (MemcachedNode node : nodes) {
+                totalWeight += weights.get(node.getSocketAddress());
+            }
+        }
+
+        for (MemcachedNode node : nodes) {
+            if (isWeightedKetama) {
+
+                int thisWeight = weights.get(node.getSocketAddress());
+                float percent = (float) thisWeight / (float) totalWeight;
+                int pointerPerServer = (int) ((Math.floor(
+                        (float) (percent * (float) config.getNodeRepetitions() / 4 * (float) nodeCount + 0.0000000001)))
+                        * 4);
+                for (int i = 0; i < pointerPerServer / 4; i++) {
+                    for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                        newNodeMap.put(position, node);
+                        getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
+                    }
+                }
+            } else {
+                // Ketama does some special work with md5 where it reuses chunks.
+                // To stay backwards compatible, the hash algorithm does not
+                // matter for Ketama; the placement should always be done using MD5.
+                if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
+                    for (int i = 0; i < numReps / 4; i++) {
+                        for (long position : ketamaNodePositionsAtIteration(node, i)) {
+                            newNodeMap.put(position, node);
+                            getLogger().debug("Adding node %s in position %d", node, position);
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < numReps; i++) {
+                        newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
+                    }
+                }
+            }
+        }
+        assert newNodeMap.size() == numReps * nodes.size();
+        ketamaNodes = newNodeMap;
+    }
+
+    private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
+        List<Long> positions = new ArrayList<Long>();
+        byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
+        for (int h = 0; h < 4; h++) {
+            Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16);
+            k |= ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF);
+            positions.add(k);
+        }
+        return positions;
+    }
+}
\ No newline at end of file
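
The four ring positions per MD5 digest computed in ketamaNodePositionsAtIteration
above can be reproduced in isolation. A standalone sketch of the same byte
arithmetic, using only JDK types (the node key string is a made-up example):

    import java.security.MessageDigest;
    import java.util.ArrayList;
    import java.util.List;

    public class KetamaPositionsSketch {
        // Mirrors ketamaNodePositionsAtIteration: 4 ring positions per 16-byte MD5 digest.
        static List<Long> positions(String nodeKey) throws Exception {
            byte[] digest = MessageDigest.getInstance("MD5").digest(nodeKey.getBytes("UTF-8"));
            List<Long> positions = new ArrayList<Long>();
            for (int h = 0; h < 4; h++) {
                long k = ((long) (digest[3 + h * 4] & 0xFF) << 24) | ((long) (digest[2 + h * 4] & 0xFF) << 16)
                        | ((long) (digest[1 + h * 4] & 0xFF) << 8) | (digest[h * 4] & 0xFF);
                positions.add(k);
            }
            return positions;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(positions("10.0.0.1:11211-0")); // hypothetical node key
        }
    }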
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
new file mode 100644
index 0000000..07b15a5
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/CacheConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+public class CacheConstants {
+    public static final String QUERY_CACHE = "StorageCache";
+}
\ No newline at end of file
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
new file mode 100644
index 0000000..4f0911f
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/InstrumentedEhCacheCacheManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.cache.ehcache.InstrumentedEhCacheCache;
+import org.apache.kylin.common.KylinConfig;
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.util.Assert;
+
+import com.google.common.collect.Sets;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Status;
+
+/**
+ * CacheManager backed by an EhCache {@link net.sf.ehcache.CacheManager}.
+ *
+ */
+public class InstrumentedEhCacheCacheManager extends AbstractCacheManager {
+
+    private net.sf.ehcache.CacheManager cacheManager;
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+    private boolean enableMetrics = false;
+
+    /**
+     * Return the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     */
+    public net.sf.ehcache.CacheManager getCacheManager() {
+        return this.cacheManager;
+    }
+
+    /**
+     * Set the backing EhCache {@link net.sf.ehcache.CacheManager}.
+     */
+    public void setCacheManager(net.sf.ehcache.CacheManager cacheManager) {
+        this.cacheManager = cacheManager;
+        if ("true".equalsIgnoreCase(metricsConfig.get("ehcache.enabled"))) {
+            enableMetrics = true;
+        }
+    }
+
+    @Override
+    protected Collection<Cache> loadCaches() {
+        Assert.notNull(this.cacheManager, "A backing EhCache CacheManager is required");
+        Status status = this.cacheManager.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' EhCache CacheManager is required - current cache 
is " + status.toString());
+
+        String[] names = this.cacheManager.getCacheNames();
+        Collection<Cache> caches = Sets.newLinkedHashSetWithExpectedSize(names.length);
+        for (String name : names) {
+            if (enableMetrics) {
+                caches.add(new InstrumentedEhCacheCache(this.cacheManager.getEhcache(name)));
+            } else {
+                caches.add(new EhCacheCache(this.cacheManager.getEhcache(name)));
+            }
+        }
+        return caches;
+    }
+
+    @Override
+    public Cache getCache(String name) {
+        Cache cache = super.getCache(name);
+        if (cache == null) {
+            // check the EhCache cache again
+            // (in case the cache was added at runtime)
+            Ehcache ehcache = this.cacheManager.getEhcache(name);
+            if (ehcache != null) {
+                if (enableMetrics) {
+                    cache = new InstrumentedEhCacheCache(ehcache);
+                } else {
+                    cache = new EhCacheCache(ehcache);
+                }
+                addCache(cache);
+            }
+        }
+        return cache;
+    }
+
+}
\ No newline at end of file
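
A short usage sketch for the manager above, assuming a default ehcache.xml is
on the classpath and that the kylin metrics config carries ehcache.enabled=true
(the flag read in setCacheManager):

    import org.springframework.cache.Cache;

    public class EhCacheManagerSketch {
        public static void main(String[] args) throws Exception {
            net.sf.ehcache.CacheManager ehcache = net.sf.ehcache.CacheManager.create(); // default ehcache.xml
            InstrumentedEhCacheCacheManager manager = new InstrumentedEhCacheCacheManager();
            manager.setCacheManager(ehcache);
            manager.afterPropertiesSet(); // AbstractCacheManager hook; triggers loadCaches()
            Cache cache = manager.getCache(CacheConstants.QUERY_CACHE); // null if no such region is configured
            System.out.println(cache);
            ehcache.shutdown();
        }
    }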
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
new file mode 100644
index 0000000..a4e1ffe
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MemcachedCacheManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.cache.support.SimpleValueWrapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import net.spy.memcached.MemcachedClientIF;
+
+public class MemcachedCacheManager extends AbstractCacheManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCacheManager.class);
+    private static final Long ONE_MINUTE = 60 * 1000L;
+
+    @Autowired
+    private MemcachedCacheConfig memcachedCacheConfig;
+
+    private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setNameFormat("Memcached-HealthChecker").build());
+    private AtomicBoolean clusterHealth = new AtomicBoolean(true);
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        Cache successCache = new MemCachedCacheAdaptor(
+                new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, CacheConstants.QUERY_CACHE)));
+
+        addCache(successCache);
+
+        Collection<String> names = getCacheNames();
+        Collection<Cache> caches = Lists.newArrayList();
+        for (String name : names) {
+            caches.add(getCache(name));
+        }
+
+        timer.scheduleWithFixedDelay(new MemcachedClusterHealthChecker(), ONE_MINUTE, ONE_MINUTE,
+                TimeUnit.MILLISECONDS);
+        return caches;
+    }
+
+    public boolean isClusterDown() {
+        return !clusterHealth.get();
+    }
+
+    @VisibleForTesting
+    void setClusterHealth(boolean ifHealth) {
+        clusterHealth.set(ifHealth);
+    }
+
+    public static class MemCachedCacheAdaptor implements Cache {
+        private MemcachedCache memcachedCache;
+
+        public MemCachedCacheAdaptor(MemcachedCache memcachedCache) {
+            this.memcachedCache = memcachedCache;
+        }
+
+        @Override
+        public String getName() {
+            return memcachedCache.getName();
+        }
+
+        @Override
+        public Object getNativeCache() {
+            return memcachedCache.getNativeCache();
+        }
+
+        @Override
+        public ValueWrapper get(Object key) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            return new SimpleValueWrapper(SerializationUtils.deserialize(value));
+        }
+
+        @Override
+        public void put(Object key, Object value) {
+            memcachedCache.put(key, value);
+        }
+
+        @Override
+        public void evict(Object key) {
+            memcachedCache.evict(key);
+        }
+
+        @Override
+        public void clear() {
+            memcachedCache.clear();
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> T get(Object key, Class<T> type) {
+            byte[] value = memcachedCache.get(key);
+            if (value == null) {
+                return null;
+            }
+            Object obj = SerializationUtils.deserialize(value);
+            // the type check must apply to the deserialized object, not the raw byte[]
+            if (obj != null && type != null && !type.isInstance(obj)) {
+                throw new IllegalStateException(
+                        "Cached value is not of required type [" + type.getName() + "]: " + obj);
+            }
+            return (T) obj;
+        }
+
+        @Override
+        //TODO
+        public <T> T get(Object key, Callable<T> valueLoader) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        //TODO: this implementation doesn't guarantee atomicity;
+        //without atomicity, this method should not be invoked
+        public ValueWrapper putIfAbsent(Object key, Object value) {
+            byte[] existing = memcachedCache.get(key);
+            if (existing == null) {
+                memcachedCache.put(key, value);
+                return null;
+            } else {
+                return new SimpleValueWrapper(SerializationUtils.deserialize(existing));
+            }
+        }
+
+    }
+
+    private class MemcachedClusterHealthChecker implements Runnable {
+        @Override
+        public void run() {
+            Cache cache = getCache(CacheConstants.QUERY_CACHE);
+            MemcachedClientIF cacheClient = (MemcachedClientIF) cache.getNativeCache();
+            Collection<SocketAddress> liveServers = cacheClient.getAvailableServers();
+            Collection<SocketAddress> deadServers = cacheClient.getUnavailableServers();
+            if (liveServers.size() == 0) {
+                clusterHealth.set(false);
+                logger.error("All the servers in MemcachedCluster are down, UnavailableServers: " + deadServers);
+            } else {
+                clusterHealth.set(true);
+                if (deadServers.size() > liveServers.size()) {
+                    logger.warn("More than half of the servers in MemcachedCluster are down, LiveServers: " + liveServers
+                            + ", UnavailableServers: " + deadServers);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
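
The atomicity caveat flagged on putIfAbsent above is easy to reproduce. A
self-contained sketch of the same check-then-act race, with a plain in-memory
map standing in for the remote cache:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PutIfAbsentRaceSketch {
        public static void main(String[] args) throws Exception {
            final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<String, String>();
            ExecutorService pool = Executors.newFixedThreadPool(2);
            Callable<Boolean> task = new Callable<Boolean>() {
                public Boolean call() {
                    // non-atomic get-then-put, like the adaptor's putIfAbsent
                    if (store.get("k") == null) {
                        store.put("k", Thread.currentThread().getName());
                        return true; // this caller believes it won
                    }
                    return false;
                }
            };
            Future<Boolean> a = pool.submit(task);
            Future<Boolean> b = pool.submit(task);
            System.out.println("winners: " + a.get() + ", " + b.get()); // may print "true, true"
            pool.shutdown();
        }
    }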
diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
new file mode 100644
index 0000000..f9b7ef6
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.cachemanager;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
+import org.springframework.cache.support.AbstractCacheManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class RemoteLocalFailOverCacheManager extends AbstractCacheManager {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteLocalFailOverCacheManager.class);
+
+    @Autowired
+    private MemcachedCacheManager remoteCacheManager;
+
+    @Autowired
+    private CacheManager localCacheManager;
+
+    @Override
+    public void afterPropertiesSet() {
+        Preconditions.checkNotNull(localCacheManager, "localCacheManager is not injected yet");
+    }
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        return null;
+    }
+
+    @Override
+    public Cache getCache(String name) {
+        if (remoteCacheManager == null || remoteCacheManager.isClusterDown()) {
+            logger.info("use local cache, because remote cache is not 
configured or down");
+            return localCacheManager.getCache(name);
+        } else {
+            return remoteCacheManager.getCache(name);
+        }
+    }
+
+    @VisibleForTesting
+    void disableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(false);
+    }
+
+    @VisibleForTesting
+    void enableRemoteCacheManager() {
+        remoteCacheManager.setClusterHealth(true);
+    }
+}
\ No newline at end of file
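
A test-style sketch of the failover behavior above. The @VisibleForTesting
toggles are package-private, so this lives in the same package; a real test
would bootstrap a Spring context (e.g. with spring-test, already a dependency
of this module) so the @Autowired field is populated:

    package org.apache.kylin.cache.cachemanager;

    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cache.Cache;

    public class RemoteLocalFailOverSketch {

        @Autowired
        private RemoteLocalFailOverCacheManager cacheManager;

        @Test
        public void remoteFirstLocalWhenDown() {
            Cache viaRemote = cacheManager.getCache(CacheConstants.QUERY_CACHE); // remote while healthy

            cacheManager.disableRemoteCacheManager(); // simulate the health checker marking the cluster down
            Cache viaLocal = cacheManager.getCache(CacheConstants.QUERY_CACHE);  // now served by localCacheManager

            cacheManager.enableRemoteCacheManager();
        }
    }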
diff --git a/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
new file mode 100644
index 0000000..7a9b585
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/ehcache/InstrumentedEhCacheCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.ehcache;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.name;
+
+import java.util.concurrent.Callable;
+
+import org.springframework.cache.Cache;
+import org.springframework.cache.ehcache.EhCacheCache;
+import org.springframework.cache.support.SimpleValueWrapper;
+import org.springframework.util.Assert;
+
+import com.codahale.metrics.Gauge;
+
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.Status;
+
+/**
+ * {@link Cache} implementation on top of an {@link Ehcache} instance.
+ *
+ */
+public class InstrumentedEhCacheCache implements Cache {
+
+    private final Ehcache cache;
+
+    /**
+     * Create an {@link EhCacheCache} instance.
+     * @param ehcache backing Ehcache instance
+     */
+    public InstrumentedEhCacheCache(Ehcache ehcache) {
+        Assert.notNull(ehcache, "Ehcache must not be null");
+        Status status = ehcache.getStatus();
+        Assert.isTrue(Status.STATUS_ALIVE.equals(status),
+                "An 'alive' Ehcache is required - current cache is " + 
status.toString());
+        this.cache = ehcache;
+
+        final String prefix = name(cache.getClass(), cache.getName());
+        Metrics.register(name(prefix, "hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-hits"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapHitCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-misses"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().localHeapMissCount();
+            }
+        });
+
+        Metrics.register(name(prefix, "objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "in-memory-objects"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getLocalHeapSize();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-get-time"), new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheGetOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "mean-search-time"), new Gauge<Double>() 
{
+            @Override
+            public Double getValue() {
+                return cache.getStatistics().cacheSearchOperation().latency().average().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "eviction-count"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().cacheEvictionOperation().count().value();
+            }
+        });
+
+        Metrics.register(name(prefix, "writer-queue-size"), new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return cache.getStatistics().getWriterQueueLength();
+            }
+        });
+    }
+
+    public String getName() {
+        return this.cache.getName();
+    }
+
+    public Ehcache getNativeCache() {
+        return this.cache;
+    }
+
+    public ValueWrapper get(Object key) {
+        Element element = this.cache.get(key);
+        return (element != null ? new SimpleValueWrapper(element.getObjectValue()) : null);
+    }
+
+    public void put(Object key, Object value) {
+        this.cache.put(new Element(key, value));
+    }
+
+    public void evict(Object key) {
+        this.cache.remove(key);
+    }
+
+    public void clear() {
+        this.cache.removeAll();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Class<T> type) {
+        Element element = lookup(key);
+        Object value = (element != null ? element.getObjectValue() : null);
+        if (value != null && type != null && !type.isInstance(value)) {
+            throw new IllegalStateException("Cached value is not of required 
type [" + type.getName() + "]: " + value);
+        }
+        return (T) value;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(Object key, Callable<T> valueLoader) {
+        Element element = lookup(key);
+        if (element != null) {
+            return (T) element.getObjectValue();
+        } else {
+            this.cache.acquireWriteLockOnKey(key);
+            try {
+                element = lookup(key); // One more attempt with the write lock
+                if (element != null) {
+                    return (T) element.getObjectValue();
+                } else {
+                    return loadValue(key, valueLoader);
+                }
+            } finally {
+                this.cache.releaseWriteLockOnKey(key);
+            }
+        }
+    }
+
+    @Override
+    public ValueWrapper putIfAbsent(Object key, Object value) {
+        Element existingElement = this.cache.putIfAbsent(new Element(key, value));
+        return (existingElement != null ? new SimpleValueWrapper(existingElement.getObjectValue()) : null);
+    }
+
+    private Element lookup(Object key) {
+        return this.cache.get(key);
+    }
+
+    private <T> T loadValue(Object key, Callable<T> valueLoader) {
+        T value;
+        try {
+            value = valueLoader.call();
+        } catch (Throwable ex) {
+            throw new ValueRetrievalException(key, valueLoader, ex);
+        }
+        put(key, value);
+        return value;
+    }
+}
\ No newline at end of file
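
A usage sketch for the loader-style get above: on a miss, the Callable runs
once under the key's write lock. This assumes an ehcache region can be added
on the fly and that Kylin's MetricsSystem is initialized so the gauge
registration in the constructor succeeds:

    import java.util.concurrent.Callable;

    public class LoaderGetSketch {
        public static void main(String[] args) {
            net.sf.ehcache.CacheManager ehm = net.sf.ehcache.CacheManager.create();
            ehm.addCacheIfAbsent("demo"); // hypothetical region name
            InstrumentedEhCacheCache cache = new InstrumentedEhCacheCache(ehm.getEhcache("demo"));
            String v = cache.get("answer", new Callable<String>() {
                public String call() {
                    return "42"; // computed only on a miss
                }
            });
            System.out.println(v);
            ehm.shutdown();
        }
    }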
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
new file mode 100644
index 0000000..c91ba45
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/CacheStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+public class CacheStats {
+    private final long numHits;
+    private final long numMisses;
+    private final long getBytes;
+    private final long getTime;
+    private final long numPut;
+    private final long putBytes;
+    private final long numEvictions;
+    private final long numTimeouts;
+    private final long numErrors;
+
+    public CacheStats(long getBytes, long getTime, long numPut, long putBytes, long numHits, long numMisses,
+            long numEvictions, long numTimeouts, long numErrors) {
+        this.getBytes = getBytes;
+        this.getTime = getTime;
+        this.numPut = numPut;
+        this.putBytes = putBytes;
+        this.numHits = numHits;
+        this.numMisses = numMisses;
+        this.numEvictions = numEvictions;
+        this.numTimeouts = numTimeouts;
+        this.numErrors = numErrors;
+    }
+
+    public long getNumHits() {
+        return numHits;
+    }
+
+    public long getNumMisses() {
+        return numMisses;
+    }
+
+    public long getNumGet() {
+        return numHits + numMisses;
+    }
+
+    public long getNumGetBytes() {
+        return getBytes;
+    }
+
+    public long getNumPutBytes() {
+        return putBytes;
+    }
+
+    public long getNumPut() {
+        return numPut;
+    }
+
+    public long getNumEvictions() {
+        return numEvictions;
+    }
+
+    public long getNumTimeouts() {
+        return numTimeouts;
+    }
+
+    public long getNumErrors() {
+        return numErrors;
+    }
+
+    public long numLookups() {
+        return numHits + numMisses;
+    }
+
+    public double hitRate() {
+        long lookups = numLookups();
+        return lookups == 0 ? 0 : numHits / (double) lookups;
+    }
+
+    public long avgGetBytes() {
+        return getBytes == 0 ? 0 : getBytes / numLookups();
+    }
+
+    public long getAvgGetTime() {
+        long lookups = numLookups();
+        // guard against division by zero before any lookups have happened
+        return lookups == 0 ? 0 : getTime / lookups;
+    }
+}
\ No newline at end of file
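
A quick worked example of the derived statistics, using made-up counter values
(constructor order: getBytes, getTime, numPut, putBytes, numHits, numMisses,
numEvictions, numTimeouts, numErrors):

    public class CacheStatsSketch {
        public static void main(String[] args) {
            CacheStats stats = new CacheStats(2048, 120, 10, 4096, 75, 25, 0, 1, 0);
            System.out.println(stats.numLookups());    // 100 (75 hits + 25 misses)
            System.out.println(stats.hitRate());       // 0.75
            System.out.println(stats.avgGetBytes());   // 20 (2048 / 100, long division)
            System.out.println(stats.getAvgGetTime()); // 1 (120 / 100, long division)
        }
    }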
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
new file mode 100644
index 0000000..b9bdf5c
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/KeyHookLookup.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A class implementing this interface indicates that the key information needs to
+ * be obtained by a first lookup from the cache itself to get a hook.
+ *
+ */
+public interface KeyHookLookup {
+    KeyHook lookupKeyHook(String key);
+
+    public static class KeyHook implements Serializable {
+        private static final long serialVersionUID = 2400159460862757991L;
+
+        private String[] chunkskey;
+        private byte[] values;
+
+        /**
+         * For de-serialization
+         */
+        public KeyHook() {
+        }
+
+        public KeyHook(String[] chunkskey, byte[] values) {
+            super();
+            this.chunkskey = chunkskey;
+            this.values = values;
+        }
+
+        public String[] getChunkskey() {
+            return chunkskey;
+        }
+
+        public void setChunkskey(String[] chunkskey) {
+            this.chunkskey = chunkskey;
+        }
+
+        public byte[] getValues() {
+            return values;
+        }
+
+        public void setValues(byte[] values) {
+            this.values = values;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + Arrays.hashCode(chunkskey);
+            result = prime * result + Arrays.hashCode(values);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            KeyHook other = (KeyHook) obj;
+            if (!Arrays.equals(chunkskey, other.chunkskey))
+                return false;
+            if (!Arrays.equals(values, other.values))
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            if (chunkskey != null) {
+                builder.append("chunkskey_length:" + chunkskey.length);
+            } else {
+                builder.append("chunkskey_is_null");
+            }
+            builder.append("|");
+            if (values != null) {
+                builder.append("value_length:" + values.length);
+            } else {
+                builder.append("value_is_null");
+            }
+            return builder.toString();
+        }
+
+        //        @Override
+        //        public void writeExternal(ObjectOutput out) throws IOException {
+        //            if(chunkskey == null){
+        //                out.writeInt(0);
+        //            }else{
+        //                out.writeInt(chunkskey.length);
+        //                for (String chunkKey : chunkskey) {
+        //                    out.writeUTF(chunkKey);
+        //                }
+        //            }
+        //            if(values != null){
+        //                out.write(values);
+        //            }
+        //        }
+        //        
+        //        @Override
+        //        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        //            int keySize = in.readInt();
+        //            if(keySize > 0){
+        //                chunkskey = new String[keySize];
+        //                for (int i = 0; i < keySize; i++){
+        //                    chunkskey[i] = in.readUTF();
+        //                }
+        //            }
+        //            int available = in.available();
+        //            if(available > 0){
+        //                values = new byte[available];
+        //                in.read(values);
+        //            }
+        //        }
+    }
+}
\ No newline at end of file
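
A hedged sketch of the lookup protocol this interface implies: the first get
returns a KeyHook, which either carries a small value inline or names the
chunk keys to fetch and reassemble. The real logic lives in
MemcachedChunkingCache (later in this commit), which is assumed here to
implement KeyHookLookup, and may differ in detail:

    import java.io.ByteArrayOutputStream;

    public class KeyHookResolveSketch {
        static byte[] resolve(MemcachedChunkingCache cache, String keyS) {
            KeyHookLookup.KeyHook hook = cache.lookupKeyHook(keyS); // first lookup returns the hook, not the value
            if (hook == null) {
                return null; // miss
            }
            if (hook.getChunkskey() == null) {
                return hook.getValues(); // small value stored inline in the hook
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (String chunkKey : hook.getChunkskey()) { // fetch each chunk and concatenate
                byte[] chunk = cache.get(chunkKey);
                out.write(chunk, 0, chunk.length);
            }
            return out.toByteArray();
        }
    }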
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
new file mode 100644
index 0000000..a2e69a7
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCache.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.DataFormatException;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.AddrUtil;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.DefaultHashAlgorithm;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.ops.ArrayOperationQueueFactory;
+import net.spy.memcached.ops.LinkedOperationQueueFactory;
+import net.spy.memcached.ops.OperationQueueFactory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+/**
+ * Cache backed by Memcached. The implementation leverages the spymemcached client to talk to the servers.
+ * Memcached itself has a limitation on the size of the key, so the real key for a cache lookup is hashed from the original key.
+ * The implementation provides a way for hash collision detection. It can also compress/decompress the value bytes based on the preconfigured compression threshold to save network bandwidth and storage space.
+ *
+ * @author mingmwang
+ *
+ */
+public class MemcachedCache {
+    public static final int MAX_PREFIX_LENGTH = MemcachedClientIF.MAX_KEY_LENGTH - 40 // length of namespace hash
+            - 40 // length of key hash
+            - 2; // length of separators
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedCache.class);
+    private static final int DEFAULT_TTL = 7 * 24 * 3600;
+    protected final MemcachedCacheConfig config;
+    protected final MemcachedClientIF client;
+    protected final String memcachedPrefix;
+    protected final int compressThreshold;
+    protected final AtomicLong hitCount = new AtomicLong(0);
+    protected final AtomicLong missCount = new AtomicLong(0);
+    protected final AtomicLong readBytes = new AtomicLong(0);
+    protected final AtomicLong timeoutCount = new AtomicLong(0);
+    protected final AtomicLong errorCount = new AtomicLong(0);
+    protected final AtomicLong putCount = new AtomicLong(0);
+    protected final AtomicLong putBytes = new AtomicLong(0);
+    private final int timeToLiveSeconds;
+    protected AtomicLong cacheGetTime = new AtomicLong(0);
+
+    public MemcachedCache(final MemcachedClientIF client, final MemcachedCacheConfig config,
+            final String memcachedPrefix, int timeToLiveSeconds) {
+        // Guava's Preconditions only substitutes %s placeholders, so %d would not be expanded
+        Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
+                "memcachedPrefix length [%s] exceeds maximum length [%s]", memcachedPrefix.length(), MAX_PREFIX_LENGTH);
+        this.memcachedPrefix = memcachedPrefix;
+        this.client = client;
+        this.config = config;
+        this.compressThreshold = config.getMaxObjectSize() / 2;
+        this.timeToLiveSeconds = timeToLiveSeconds;
+    }
+
+    public MemcachedCache(MemcachedCache cache) {
+        this(cache.client, cache.config, cache.memcachedPrefix, cache.timeToLiveSeconds);
+    }
+
+    /**
+     * Create and return the MemcachedCache. Each call to this method creates a new instance.
+     * @param config            the MemcachedCache configuration to control the cache behavior
+     * @return a new MemcachedCache instance
+     */
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix) {
+        return create(config, memcachedPrefix, DEFAULT_TTL);
+    }
+
+    public static MemcachedCache create(final MemcachedCacheConfig config, String memcachedPrefix, int timeToLive) {
+        try {
+            SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
+            // always no compression inside, we compress/decompress outside
+            transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+            OperationQueueFactory opQueueFactory;
+            int maxQueueSize = config.getMaxOperationQueueSize();
+            if (maxQueueSize > 0) {
+                opQueueFactory = new ArrayOperationQueueFactory(maxQueueSize);
+            } else {
+                opQueueFactory = new LinkedOperationQueueFactory();
+            }
+            String hostsStr = config.getHosts();
+            ConnectionFactory connectionFactory = new MemcachedConnectionFactoryBuilder()
+                    .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY)
+                    .setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
+                    .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT).setDaemon(true)
+                    .setFailureMode(FailureMode.Redistribute).setTranscoder(transcoder).setShouldOptimize(true)
+                    .setOpQueueMaxBlockTime(config.getTimeout()).setOpTimeout(config.getTimeout())
+                    .setReadBufferSize(config.getReadBufferSize()).setOpQueueFactory(opQueueFactory).build();
+            return new MemcachedCache(new MemcachedClient(new MemcachedConnectionFactory(connectionFactory),
+                    AddrUtil.getAddresses(hostsStr)), config, memcachedPrefix, timeToLive);
+        } catch (IOException e) {
+            logger.error("Unable to create MemcachedCache instance.", e);
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public String getName() {
+        return memcachedPrefix;
+    }
+
+    public Object getNativeCache() {
+        return client;
+    }
+
+    protected String serializeKey(Object key) {
+        try {
+            return JsonUtil.writeValueAsString(key);
+        } catch (JsonProcessingException e) {
+            logger.warn("Can not convert key to String.", e);
+        }
+        return null;
+    }
+
+    protected byte[] serializeValue(Object value) {
+        return SerializationUtils.serialize((Serializable) value);
+    }
+
+    @VisibleForTesting
+    byte[] encodeValue(String keyS, Object value) {
+        if (keyS == null) {
+            return null;
+        }
+        return encodeValue(keyS.getBytes(Charsets.UTF_8), serializeValue(value));
+    }
+
+    /**
+     * This method is used to get the value object based on the key from the Cache. It converts the key to a json string first,
+     * then calls getBinary() to compute a hashed key from the original key string, using that as the real key for the lookup in the internal Cache.
+     * It then decodes the real value bytes from the cache lookup result; the object serializer can convert the value bytes back to an object.
+     */
+    public byte[] get(Object key) {
+        return get(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public byte[] get(String keyS) {
+        return getBinary(keyS);
+    }
+
+    /**
+     * This method is used to put key/value objects into the Cache. It converts the key to a json string and leverages the object serializer to convert the value object to bytes.
+     * It then calls putBinary() to compute a hashed key from the original key string and encodes the original key bytes into the value bytes for hash collision detection.
+     */
+    public void put(Object key, Object value) {
+        put(serializeKey(key), value);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void put(String keyS, Object value) {
+        if (keyS != null) {
+            putBinary(keyS, serializeValue(value), timeToLiveSeconds);
+        }
+    }
+
+    public void evict(Object key) {
+        if (key == null)
+            return;
+        evict(serializeKey(key));
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     */
+    public void evict(String keyS) {
+        if (keyS == null)
+            return;
+        client.delete(computeKeyHash(keyS));
+    }
+
+    public void clear() {
+        logger.warn("Clear Remote Cache!");
+        Future<Boolean> resultFuture = client.flush();
+        try {
+            boolean result = resultFuture.get();
+            logger.warn("Clear Remote Cache returned with result: " + result);
+        } catch (Exception e) {
+            logger.warn("Can't clear Remote Cache.", e);
+        }
+    }
+
+    public CacheStats getStats() {
+        return new CacheStats(readBytes.get(), cacheGetTime.get(), putCount.get(), putBytes.get(), hitCount.get(),
+                missCount.get(), 0, timeoutCount.get(), errorCount.get());
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @return the serialized value
+     */
+    protected byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        byte[] bytes = internalGet(computeKeyHash(keyS));
+        return decodeValue(keyS.getBytes(Charsets.UTF_8), bytes);
+    }
+
+    /**
+     * @param keyS should be the serialized string
+     * @param valueB should be the serialized value
+     */
+    protected void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        internalPut(computeKeyHash(keyS), encodeValue(keyS.getBytes(Charsets.UTF_8), valueB), expiration);
+    }
+
+    protected byte[] internalGet(String hashedKey) {
+        Future<Object> future;
+        long start = System.currentTimeMillis();
+        try {
+            future = client.asyncGet(hashedKey);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            byte[] result = (byte[]) future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (result != null) {
+                hitCount.incrementAndGet();
+                readBytes.addAndGet(result.length);
+            } else {
+                missCount.incrementAndGet();
+            }
+            return result;
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            future.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling key meta from 
cache.", e);
+            return null;
+        }
+    }
+
+    private void internalPut(String hashedKey, byte[] encodedValue, int expiration) {
+        try {
+            client.set(hashedKey, expiration, encodedValue);
+            putCount.incrementAndGet();
+            putBytes.addAndGet(encodedValue.length);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+        }
+    }
+
+    protected byte[] encodeValue(byte[] key, byte[] valueB) {
+        byte[] compressed = null;
+        if (config.isEnableCompression() && (valueB.length + Ints.BYTES + key.length > compressThreshold)) {
+            try {
+                compressed = CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + valueB.length)
+                        .putInt(key.length).put(key).put(valueB).array());
+            } catch (IOException e) {
+                compressed = null;
+                logger.warn("Compressing value bytes error.", e);
+            }
+        }
+        if (compressed != null) {
+            return ByteBuffer.allocate(Shorts.BYTES + compressed.length).putShort((short) 1).put(compressed).array();
+        } else {
+            return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length + valueB.length).putShort((short) 0)
+                    .putInt(key.length).put(key).put(valueB).array();
+        }
+    }
+
+    protected byte[] decodeValue(byte[] key, byte[] valueE) {
+        if (valueE == null)
+            return null;
+        ByteBuffer buf = ByteBuffer.wrap(valueE);
+        short enableCompression = buf.getShort();
+        byte[] uncompressed = null;
+        if (enableCompression == 1) {
+            byte[] value = new byte[buf.remaining()];
+            buf.get(value);
+            try {
+                uncompressed = CompressionUtils.decompress(value);
+            } catch (IOException | DataFormatException e) {
+                logger.error("Decompressing value bytes error.", e);
+                return null;
+            }
+        }
+        if (uncompressed != null) {
+            buf = ByteBuffer.wrap(uncompressed);
+        }
+        final int keyLength = buf.getInt();
+        byte[] keyBytes = new byte[keyLength];
+        buf.get(keyBytes);
+        if (!Arrays.equals(keyBytes, key)) {
+            logger.error("Keys do not match, possible hash collision!");
+            return null;
+        }
+        byte[] value = new byte[buf.remaining()];
+        buf.get(value);
+        return value;
+    }
+
+    protected String computeKeyHash(String key) {
+        // hash keys to keep things under 250 characters for memcached
+        return Joiner.on(":").skipNulls().join(KylinConfig.getInstanceFromEnv().getDeployEnv(), this.memcachedPrefix,
+                DigestUtils.shaHex(key));
+    }
+
+}
\ No newline at end of file
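[Editor's illustration, not part of the patch] The value layout that encodeValue()/decodeValue() agree
on is a 2-byte compression flag, then (on the uncompressed path) the key length, the key bytes, and the
value bytes; the embedded key is compared on read to detect hash collisions. A minimal, self-contained
sketch of that round trip using only JDK classes; the class and variable names here are hypothetical:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class WireFormatSketch {
        public static void main(String[] args) {
            byte[] key = "query-key".getBytes(StandardCharsets.UTF_8);
            byte[] value = new byte[] { 1, 2, 3 };

            // encode, uncompressed path: [short 0][int keyLen][key][value]
            byte[] encoded = ByteBuffer.allocate(2 + 4 + key.length + value.length)
                    .putShort((short) 0).putInt(key.length).put(key).put(value).array();

            // decode: read the flag, verify the embedded key, then read the value
            ByteBuffer buf = ByteBuffer.wrap(encoded);
            short compressFlag = buf.getShort(); // 0 = not compressed
            byte[] embeddedKey = new byte[buf.getInt()];
            buf.get(embeddedKey);
            if (!Arrays.equals(embeddedKey, key)) {
                throw new IllegalStateException("keys do not match, possible hash collision");
            }
            byte[] decoded = new byte[buf.remaining()];
            buf.get(decoded);
            System.out.println(compressFlag + " " + Arrays.equals(decoded, value)); // prints "0 true"
        }
    }
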
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
new file mode 100644
index 0000000..d71c279
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedCacheConfig.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import net.spy.memcached.DefaultConnectionFactory;
+
+public class MemcachedCacheConfig {
+    private long timeout = 500L;
+
+    // comma-delimited list of memcached servers, given as host:port combinations
+    private String hosts;
+
+    private int maxChunkSize = 1024;
+
+    private int maxObjectSize = 1024 * 1024;
+
+    // memcached client read buffer size, -1 uses the spymemcached library default
+    private int readBufferSize = DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;
+
+    // maximum operation queue size. 0 means unbounded
+    private int maxOperationQueueSize = 0;
+
+    // whether to compress the value data or not
+    private boolean enableCompression = true;
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public String getHosts() {
+        return hosts;
+    }
+
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    public int getMaxChunkSize() {
+        return maxChunkSize;
+    }
+
+    public void setMaxChunkSize(int maxChunkSize) {
+        this.maxChunkSize = maxChunkSize;
+    }
+
+    public int getMaxObjectSize() {
+        return maxObjectSize;
+    }
+
+    public void setMaxObjectSize(int maxObjectSize) {
+        this.maxObjectSize = maxObjectSize;
+    }
+
+    public int getMaxOperationQueueSize() {
+        return maxOperationQueueSize;
+    }
+
+    public void setMaxOperationQueueSize(int maxOperationQueueSize) {
+        this.maxOperationQueueSize = maxOperationQueueSize;
+    }
+
+    public int getReadBufferSize() {
+        return readBufferSize;
+    }
+
+    public void setReadBufferSize(int readBufferSize) {
+        this.readBufferSize = readBufferSize;
+    }
+
+    public boolean isEnableCompression() {
+        return enableCompression;
+    }
+
+    public void setEnableCompression(boolean enableCompression) {
+        this.enableCompression = enableCompression;
+    }
+}
\ No newline at end of file
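[Editor's illustration, not part of the patch] A minimal usage sketch of the two classes above,
assuming a memcached server reachable at localhost:11211; the prefix, TTL, key, and value are
arbitrary example values, and the value only needs to be Serializable:

    import org.apache.kylin.cache.memcached.MemcachedCache;
    import org.apache.kylin.cache.memcached.MemcachedCacheConfig;

    public class CacheUsageSketch {
        public static void main(String[] args) {
            MemcachedCacheConfig config = new MemcachedCacheConfig();
            config.setHosts("localhost:11211");  // comma-delimited host:port list
            config.setTimeout(500L);             // per-operation timeout in milliseconds

            // prefix "query" and a 7-day TTL are arbitrary example values
            MemcachedCache cache = MemcachedCache.create(config, "query", 7 * 24 * 3600);
            cache.put("sql-digest-1", "cached query result");
            byte[] cached = cache.get("sql-digest-1");  // serialized value bytes; null on miss/timeout/error
            System.out.println(cached == null ? "miss" : cached.length + " bytes");
        }
    }
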
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
new file mode 100644
index 0000000..e79e717
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedChunkingCache.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
+
+import net.spy.memcached.internal.BulkFuture;
+
+/**
+ * Subclass of MemcachedCache that supports storing large objects. Memcached itself limits the value size,
+ * by default to 1M. This implementation extends the limit to 1G by splitting huge byte arrays into multiple
+ * chunks, and it takes care of data integrity when some of the chunks are lost (due to server restart or other reasons).
+ *
+ * @author mingmwang
+ */
+public class MemcachedChunkingCache extends MemcachedCache implements KeyHookLookup {
+    private static final Logger logger = LoggerFactory.getLogger(MemcachedChunkingCache.class);
+
+    public MemcachedChunkingCache(MemcachedCache cache) {
+        super(cache);
+        Preconditions.checkArgument(config.getMaxChunkSize() > 1, "maxChunkSize [%s] must be greater than 1",
+                config.getMaxChunkSize());
+        Preconditions.checkArgument(config.getMaxObjectSize() > 261, "maxObjectSize [%s] must be greater than 261",
+                config.getMaxObjectSize());
+    }
+
+    protected static byte[][] splitBytes(final byte[] data, final int nSplit) {
+        byte[][] dest = new byte[nSplit][];
+
+        final int splitSize = (data.length - 1) / nSplit + 1;
+        for (int i = 0; i < nSplit - 1; i++) {
+            dest[i] = Arrays.copyOfRange(data, i * splitSize, (i + 1) * splitSize);
+        }
+        dest[nSplit - 1] = Arrays.copyOfRange(data, (nSplit - 1) * splitSize, data.length);
+
+        return dest;
+    }
+
+    protected static int getValueSplit(MemcachedCacheConfig config, String keyS, int valueBLen) {
+        // reserve 6 bytes for the chunk index appended to the key; the index never exceeds 6 digits
+        final int valueSize = config.getMaxObjectSize() - Shorts.BYTES - Ints.BYTES
+                - keyS.getBytes(Charsets.UTF_8).length - 6;
+        final int maxValueSize = config.getMaxChunkSize() * valueSize;
+        Preconditions.checkArgument(valueBLen <= maxValueSize,
+                "the value bytes length [%s] exceeds maximum value size [%s]", valueBLen, maxValueSize);
+        return (valueBLen - 1) / valueSize + 1;
+    }
+
+    protected static Pair<KeyHook, byte[][]> getKeyValuePair(int nSplit, String keyS, byte[] valueB) {
+        KeyHook keyHook;
+        byte[][] splitValueB = null;
+        if (nSplit > 1) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Enable chunking for putting large cached object 
values, chunk size = " + nSplit
+                        + ", original value bytes size = " + valueB.length);
+            }
+            String[] chunkKeySs = new String[nSplit];
+            for (int i = 0; i < nSplit; i++) {
+                chunkKeySs[i] = keyS + i;
+            }
+            keyHook = new KeyHook(chunkKeySs, null);
+            splitValueB = splitBytes(valueB, nSplit);
+        } else {
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                        "Chunking not enabled, put the original value bytes to keyhook directly, original value bytes size = "
+                                + valueB.length);
+            }
+            keyHook = new KeyHook(null, valueB);
+        }
+
+        return new Pair<>(keyHook, splitValueB);
+    }
+
+    /**
+     * Overrides the parent getBinary(): it first fetches the KeyHook for the key and checks whether
+     * chunking was enabled when the value was stored.
+     */
+    @Override
+    public byte[] getBinary(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return null;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return null;
+        }
+
+        if (keyHook.getChunkskey() == null || keyHook.getChunkskey().length == 0) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Chunking not enabled, return the value bytes in the keyhook directly, value bytes size = "
+                        + keyHook.getValues().length);
+            }
+            return keyHook.getValues();
+        }
+
+        BulkFuture<Map<String, Object>> bulkFuture;
+        long start = System.currentTimeMillis();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Chunking enabled, chunk size = " + 
keyHook.getChunkskey().length);
+        }
+
+        Map<String, String> keyLookup = computeKeyHash(Arrays.asList(keyHook.getChunkskey()));
+        try {
+            bulkFuture = client.asyncGetBulk(keyLookup.keySet());
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", e);
+            return null;
+        } catch (Throwable t) {
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation.", t);
+            return null;
+        }
+
+        try {
+            Map<String, Object> bulkResult = bulkFuture.get(config.getTimeout(), TimeUnit.MILLISECONDS);
+            cacheGetTime.addAndGet(System.currentTimeMillis() - start);
+            if (bulkResult.size() != keyHook.getChunkskey().length) {
+                missCount.incrementAndGet();
+                logger.warn("Some paritial chunks missing for query key:" + 
keyS);
+                //remove all the partital chunks here.
+                for (String partitalKey : bulkResult.keySet()) {
+                    client.delete(partitalKey);
+                }
+                deleteKeyHook(keyS);
+                return null;
+            }
+            hitCount.getAndAdd(keyHook.getChunkskey().length);
+            byte[][] bytesArray = new byte[keyHook.getChunkskey().length][];
+            for (Map.Entry<String, Object> entry : bulkResult.entrySet()) {
+                byte[] bytes = (byte[]) entry.getValue();
+                readBytes.addAndGet(bytes.length);
+                String originalKeyS = keyLookup.get(entry.getKey());
+                int idx = Integer.parseInt(originalKeyS.substring(keyS.length()));
+                bytesArray[idx] = decodeValue(originalKeyS.getBytes(Charsets.UTF_8), bytes);
+            }
+            return concatBytes(bytesArray);
+        } catch (TimeoutException e) {
+            timeoutCount.incrementAndGet();
+            bulkFuture.cancel(false);
+            return null;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw Throwables.propagate(e);
+        } catch (ExecutionException e) {
+            errorCount.incrementAndGet();
+            logger.error("ExecutionException when pulling item from cache.", 
e);
+            return null;
+        }
+    }
+
+    /**
+     * Overrides the parent putBinary(): large value bytes are split into multiple chunks that fit into the
+     * internal cache, and a KeyHook recording the split chunk keys is stored under the original key.
+     */
+    @Override
+    public void putBinary(String keyS, byte[] valueB, int expiration) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        int nSplit = getValueSplit(config, keyS, valueB.length);
+        Pair<KeyHook, byte[][]> keyValuePair = getKeyValuePair(nSplit, keyS, valueB);
+        KeyHook keyHook = keyValuePair.getFirst();
+        byte[][] splitValueB = keyValuePair.getSecond();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("put key hook:{} to cache for hash key", keyHook);
+        }
+        super.putBinary(keyS, serializeValue(keyHook), expiration);
+        if (nSplit > 1) {
+            for (int i = 0; i < nSplit; i++) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Chunk[" + i + "] bytes size before encoding  
= " + splitValueB[i].length);
+                }
+                super.putBinary(keyHook.getChunkskey()[i], splitValueB[i], 
expiration);
+            }
+        }
+    }
+
+    @Override
+    public void evict(String keyS) {
+        if (Strings.isNullOrEmpty(keyS)) {
+            return;
+        }
+        KeyHook keyHook = lookupKeyHook(keyS);
+        if (keyHook == null) {
+            return;
+        }
+
+        if (keyHook.getChunkskey() != null && keyHook.getChunkskey().length > 0) {
+            String[] chunkKeys = keyHook.getChunkskey();
+            for (String chunkKey : chunkKeys) {
+                super.evict(chunkKey);
+            }
+        }
+        super.evict(keyS);
+    }
+
+    protected Map<String, String> computeKeyHash(List<String> keySList) {
+        return Maps.uniqueIndex(keySList, new Function<String, String>() {
+            @Override
+            public String apply(String keyS) {
+                return computeKeyHash(keyS);
+            }
+        });
+    }
+
+    private void deleteKeyHook(String keyS) {
+        try {
+            super.evict(keyS);
+        } catch (IllegalStateException e) {
+            // operation did not get queued in time (queue is full)
+            errorCount.incrementAndGet();
+            logger.error("Unable to queue cache operation: ", e);
+        }
+    }
+
+    private byte[] concatBytes(byte[]... bytesArray) {
+        int length = 0;
+        for (byte[] bytes : bytesArray) {
+            length += bytes.length;
+        }
+        byte[] result = new byte[length];
+        int destPos = 0;
+        for (byte[] bytes : bytesArray) {
+            System.arraycopy(bytes, 0, result, destPos, bytes.length);
+            destPos += bytes.length;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Original value bytes size for all chunks  = " + 
result.length);
+        }
+
+        return result;
+    }
+
+    @Override
+    public KeyHook lookupKeyHook(String keyS) {
+        byte[] bytes = super.getBinary(keyS);
+        if (bytes == null) {
+            return null;
+        }
+        return (KeyHook) SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file
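[Editor's illustration, not part of the patch] The chunking arithmetic above, replayed standalone:
per chunk, getValueSplit() budgets maxObjectSize minus the 2-byte flag, the 4-byte key length, the key
itself, and the 6 bytes reserved for the chunk index appended to the key; splitBytes() then performs a
ceiling division. A self-contained sketch with an assumed 40-byte key:

    import java.util.Arrays;

    public class ChunkMathSketch {
        public static void main(String[] args) {
            int maxObjectSize = 1024 * 1024; // MemcachedCacheConfig default
            int maxChunkSize = 1024;         // MemcachedCacheConfig default
            int keyLen = 40;                 // e.g. a SHA-1 hex digest

            int valueSize = maxObjectSize - 2 - 4 - keyLen - 6;  // payload bytes per chunk
            long maxValue = (long) maxChunkSize * valueSize;     // ~1G overall cap
            System.out.println(valueSize + " bytes per chunk, " + maxValue + " bytes max");

            // ceiling-division split, mirroring splitBytes(): 10 bytes at 4 bytes/chunk -> 3 chunks
            byte[] data = new byte[10];
            int nSplit = (data.length - 1) / 4 + 1;
            int splitSize = (data.length - 1) / nSplit + 1;
            byte[][] chunks = new byte[nSplit][];
            for (int i = 0; i < nSplit - 1; i++) {
                chunks[i] = Arrays.copyOfRange(data, i * splitSize, (i + 1) * splitSize);
            }
            chunks[nSplit - 1] = Arrays.copyOfRange(data, (nSplit - 1) * splitSize, data.length);
            System.out.println(nSplit + " chunks, last chunk " + chunks[nSplit - 1].length + " bytes");
        }
    }
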
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
new file mode 100644
index 0000000..fe48d3e
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactory.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.kylin.common.KylinConfig;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.metrics.NoopMetricCollector;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactory extends SpyObject implements ConnectionFactory {
+    private ConnectionFactory underlying;
+    private Map<String, String> metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
+
+    public MemcachedConnectionFactory(ConnectionFactory underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public MetricType enableMetrics() {
+        String metricType = metricsConfig.get("memcached.metricstype");
+        return metricType == null ? DefaultConnectionFactory.DEFAULT_METRIC_TYPE
+                : MetricType.valueOf(metricType.toUpperCase(Locale.ROOT));
+    }
+
+    @Override
+    public MetricCollector getMetricCollector() {
+        String enableMetrics = metricsConfig.get("memcached.enabled");
+        if (enableMetrics().equals(MetricType.OFF) || enableMetrics == null
+                || "false".equalsIgnoreCase(enableMetrics)) {
+            getLogger().debug("Memcached metrics collection disabled.");
+            return new NoopMetricCollector();
+        } else {
+            getLogger().info("Memcached metrics collection enabled (Profile " 
+ enableMetrics() + ").");
+            return new MemcachedMetrics();
+        }
+    }
+
+    @Override
+    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
+        return underlying.createConnection(addrs);
+    }
+
+    @Override
+    public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) {
+        return underlying.createMemcachedNode(sa, c, bufSize);
+    }
+
+    @Override
+    public BlockingQueue<Operation> createOperationQueue() {
+        return underlying.createOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createReadOperationQueue() {
+        return underlying.createReadOperationQueue();
+    }
+
+    @Override
+    public BlockingQueue<Operation> createWriteOperationQueue() {
+        return underlying.createWriteOperationQueue();
+    }
+
+    @Override
+    public long getOpQueueMaxBlockTime() {
+        return underlying.getOpQueueMaxBlockTime();
+    }
+
+    @Override
+    public ExecutorService getListenerExecutorService() {
+        return underlying.getListenerExecutorService();
+    }
+
+    @Override
+    public boolean isDefaultExecutorService() {
+        return underlying.isDefaultExecutorService();
+    }
+
+    @Override
+    public NodeLocator createLocator(List<MemcachedNode> nodes) {
+        return underlying.createLocator(nodes);
+    }
+
+    @Override
+    public OperationFactory getOperationFactory() {
+        return underlying.getOperationFactory();
+    }
+
+    @Override
+    public long getOperationTimeout() {
+        return underlying.getOperationTimeout();
+    }
+
+    @Override
+    public boolean isDaemon() {
+        return underlying.isDaemon();
+    }
+
+    @Override
+    public boolean useNagleAlgorithm() {
+        return underlying.useNagleAlgorithm();
+    }
+
+    @Override
+    public Collection<ConnectionObserver> getInitialObservers() {
+        return underlying.getInitialObservers();
+    }
+
+    @Override
+    public FailureMode getFailureMode() {
+        return underlying.getFailureMode();
+    }
+
+    @Override
+    public Transcoder<Object> getDefaultTranscoder() {
+        return underlying.getDefaultTranscoder();
+    }
+
+    @Override
+    public boolean shouldOptimize() {
+        return underlying.shouldOptimize();
+    }
+
+    @Override
+    public int getReadBufSize() {
+        return underlying.getReadBufSize();
+    }
+
+    @Override
+    public HashAlgorithm getHashAlg() {
+        return underlying.getHashAlg();
+    }
+
+    @Override
+    public long getMaxReconnectDelay() {
+        return underlying.getMaxReconnectDelay();
+    }
+
+    @Override
+    public AuthDescriptor getAuthDescriptor() {
+        return underlying.getAuthDescriptor();
+    }
+
+    @Override
+    public int getTimeoutExceptionThreshold() {
+        return underlying.getTimeoutExceptionThreshold();
+    }
+
+    @Override
+    public long getAuthWaitTime() {
+        return underlying.getAuthWaitTime();
+    }
+}
\ No newline at end of file
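[Editor's illustration, not part of the patch] The two metrics switches above read Kylin's
"kylin.metrics." properties with the prefix already stripped by getKylinMetricsConf(). A sketch of the
lookup; the kylin.properties entry names are inferred from that prefix stripping and are an assumption,
not confirmed by the patch:

    import java.util.Map;
    import org.apache.kylin.common.KylinConfig;

    public class MetricsConfSketch {
        public static void main(String[] args) {
            // assumed kylin.properties entries:
            //   kylin.metrics.memcached.enabled=true
            //   kylin.metrics.memcached.metricstype=performance
            Map<String, String> conf = KylinConfig.getInstanceFromEnv().getKylinMetricsConf();
            String enabled = conf.get("memcached.enabled");   // "true" -> collect metrics
            String type = conf.get("memcached.metricstype");  // "performance" -> MetricType.PERFORMANCE
            System.out.println(enabled + " / " + type);
        }
    }
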
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
new file mode 100644
index 0000000..97af4a6
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedConnectionFactoryBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+
+import net.spy.memcached.ArrayModNodeLocator;
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.ConnectionObserver;
+import net.spy.memcached.DefaultConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.HashAlgorithm;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.NodeLocator;
+import net.spy.memcached.OperationFactory;
+import net.spy.memcached.RefinedKetamaNodeLocator;
+import net.spy.memcached.auth.AuthDescriptor;
+import net.spy.memcached.metrics.MetricCollector;
+import net.spy.memcached.metrics.MetricType;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.transcoders.Transcoder;
+
+public class MemcachedConnectionFactoryBuilder extends ConnectionFactoryBuilder {
+    /**
+     * Get the ConnectionFactory set up with the provided parameters.
+     */
+    public ConnectionFactory build() {
+        return new DefaultConnectionFactory() {
+
+            @Override
+            public BlockingQueue<Operation> createOperationQueue() {
+                return opQueueFactory == null ? super.createOperationQueue() : opQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createReadOperationQueue() {
+                return readQueueFactory == null ? super.createReadOperationQueue() : readQueueFactory.create();
+            }
+
+            @Override
+            public BlockingQueue<Operation> createWriteOperationQueue() {
+                return writeQueueFactory == null ? super.createWriteOperationQueue() : writeQueueFactory.create();
+            }
+
+            @Override
+            public NodeLocator createLocator(List<MemcachedNode> nodes) {
+                switch (locator) {
+                case ARRAY_MOD:
+                    return new ArrayModNodeLocator(nodes, getHashAlg());
+                case CONSISTENT:
+                    return new RefinedKetamaNodeLocator(nodes, getHashAlg());
+                default:
+                    throw new IllegalStateException("Unhandled locator type: " + locator);
+                }
+            }
+
+            @Override
+            public Transcoder<Object> getDefaultTranscoder() {
+                return transcoder == null ? super.getDefaultTranscoder() : transcoder;
+            }
+
+            @Override
+            public FailureMode getFailureMode() {
+                return failureMode == null ? super.getFailureMode() : failureMode;
+            }
+
+            @Override
+            public HashAlgorithm getHashAlg() {
+                return hashAlg == null ? super.getHashAlg() : hashAlg;
+            }
+
+            @Override
+            public Collection<ConnectionObserver> getInitialObservers() {
+                return initialObservers;
+            }
+
+            @Override
+            public OperationFactory getOperationFactory() {
+                return opFact == null ? super.getOperationFactory() : opFact;
+            }
+
+            @Override
+            public long getOperationTimeout() {
+                return opTimeout == -1 ? super.getOperationTimeout() : opTimeout;
+            }
+
+            @Override
+            public int getReadBufSize() {
+                return readBufSize == -1 ? super.getReadBufSize() : readBufSize;
+            }
+
+            @Override
+            public boolean isDaemon() {
+                return isDaemon;
+            }
+
+            @Override
+            public boolean shouldOptimize() {
+                return shouldOptimize;
+            }
+
+            @Override
+            public boolean useNagleAlgorithm() {
+                return useNagle;
+            }
+
+            @Override
+            public long getMaxReconnectDelay() {
+                return maxReconnectDelay;
+            }
+
+            @Override
+            public AuthDescriptor getAuthDescriptor() {
+                return authDescriptor;
+            }
+
+            @Override
+            public long getOpQueueMaxBlockTime() {
+                return opQueueMaxBlockTime > -1 ? opQueueMaxBlockTime : super.getOpQueueMaxBlockTime();
+            }
+
+            @Override
+            public int getTimeoutExceptionThreshold() {
+                return timeoutExceptionThreshold;
+            }
+
+            @Override
+            public MetricType enableMetrics() {
+                return metricType == null ? super.enableMetrics() : metricType;
+            }
+
+            @Override
+            public MetricCollector getMetricCollector() {
+                return collector == null ? super.getMetricCollector() : collector;
+            }
+
+            @Override
+            public ExecutorService getListenerExecutorService() {
+                return executorService == null ? super.getListenerExecutorService() : executorService;
+            }
+
+            @Override
+            public boolean isDefaultExecutorService() {
+                return executorService == null;
+            }
+
+            @Override
+            public long getAuthWaitTime() {
+                return authWaitTime;
+            }
+        };
+    }
+}
\ No newline at end of file
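[Editor's illustration, not part of the patch] Each override in the anonymous factory above falls back
to the DefaultConnectionFactory behavior when the corresponding builder setter was never called. A
small sketch of that null-coalescing pattern; the 30-second reconnect default is spymemcached's, stated
here as an assumption:

    import net.spy.memcached.ConnectionFactory;
    import org.apache.kylin.cache.memcached.MemcachedConnectionFactoryBuilder;

    public class BuilderFallbackSketch {
        public static void main(String[] args) {
            ConnectionFactory cf = new MemcachedConnectionFactoryBuilder()
                    .setOpTimeout(500)  // explicitly set, so getOperationTimeout() returns 500
                    .build();
            System.out.println(cf.getOperationTimeout());   // 500
            System.out.println(cf.getMaxReconnectDelay());  // spymemcached default (30, assumption)
        }
    }
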
diff --git a/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
new file mode 100644
index 0000000..ada9144
--- /dev/null
+++ b/cache/src/main/java/org/apache/kylin/cache/memcached/MemcachedMetrics.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.memcached;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+
+import java.util.Map;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Maps;
+
+import net.spy.memcached.metrics.AbstractMetricCollector;
+import net.spy.memcached.metrics.DefaultMetricCollector;
+import net.spy.memcached.metrics.MetricCollector;
+
+/**
+ * A {@link MetricCollector} that uses the Codahale Metrics library, registering all
+ * counters, meters and histograms with Kylin's MetricsSystem.
+ */
+public final class MemcachedMetrics extends AbstractMetricCollector {
+
+    /**
+     * Contains all registered {@link Counter}s.
+     */
+    private Map<String, Counter> counters;
+
+    /**
+     * Contains all registered {@link Meter}s.
+     */
+    private Map<String, Meter> meters;
+
+    /**
+     * Contains all registered {@link Histogram}s.
+     */
+    private Map<String, Histogram> histograms;
+
+    /**
+     * Create a new {@link MemcachedMetrics} collector with empty metric registries.
+     */
+    public MemcachedMetrics() {
+        counters = Maps.newConcurrentMap();
+        meters = Maps.newConcurrentMap();
+        histograms = Maps.newConcurrentMap();
+    }
+
+    @Override
+    public void addCounter(String name) {
+        if (!counters.containsKey(name)) {
+            counters.put(name, Metrics.counter(name));
+        }
+    }
+
+    @Override
+    public void removeCounter(String name) {
+        if (counters.containsKey(name)) {
+            Metrics.remove(name);
+            counters.remove(name);
+        }
+    }
+
+    @Override
+    public void incrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).inc(amount);
+        }
+    }
+
+    @Override
+    public void decrementCounter(String name, int amount) {
+        if (counters.containsKey(name)) {
+            counters.get(name).dec(amount);
+        }
+    }
+
+    @Override
+    public void addMeter(String name) {
+        if (!meters.containsKey(name)) {
+            meters.put(name, Metrics.meter(name));
+        }
+    }
+
+    @Override
+    public void removeMeter(String name) {
+        if (meters.containsKey(name)) {
+            Metrics.remove(name);
+            meters.remove(name);
+        }
+    }
+
+    @Override
+    public void markMeter(String name) {
+        if (meters.containsKey(name)) {
+            meters.get(name).mark();
+        }
+    }
+
+    @Override
+    public void addHistogram(String name) {
+        if (!histograms.containsKey(name)) {
+            histograms.put(name, Metrics.histogram(name));
+        }
+    }
+
+    @Override
+    public void removeHistogram(String name) {
+        if (histograms.containsKey(name)) {
+            Metrics.remove(name);
+            histograms.remove(name);
+        }
+    }
+
+    @Override
+    public void updateHistogram(String name, int amount) {
+        if (histograms.containsKey(name)) {
+            histograms.get(name).update(amount);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 778b5bf..159137b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1770,6 +1770,10 @@ abstract public class KylinConfigBase implements Serializable {
                 + getKylinMetricsSubjectSuffix();
     }
 
+    public Map<String, String> getKylinMetricsConf() {
+        return getPropertiesByPrefix("kylin.metrics.");
+    }
+
     // ============================================================================
     // tool
     // ============================================================================
diff --git a/pom.xml b/pom.xml
index 264583d..6cb6716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
     <xerces.version>2.11.0</xerces.version>
     <xalan.version>2.7.2</xalan.version>
     <ehcache.version>2.10.2.2.21</ehcache.version>
+    <memcached.version>2.12.3</memcached.version>
     <apache-httpclient.version>4.2.5</apache-httpclient.version>
     <roaring.version>0.6.18</roaring.version>
     <cglib.version>3.2.4</cglib.version>
@@ -261,6 +262,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-cache</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-engine-mr</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -731,6 +737,11 @@
         <artifactId>metrics-core</artifactId>
         <version>${dropwizard.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-jvm</artifactId>
+        <version>${dropwizard.version}</version>
+      </dependency>
 
       <!-- Test -->
       <dependency>
@@ -817,6 +828,11 @@
         <version>${ehcache.version}</version>
       </dependency>
       <dependency>
+        <groupId>net.spy</groupId>
+        <artifactId>spymemcached</artifactId>
+        <version>${memcached.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.opensaml</groupId>
         <artifactId>opensaml</artifactId>
         <version>${opensaml.version}</version>
@@ -1246,6 +1262,7 @@
     <module>core-metrics</module>
     <module>metrics-reporter-hive</module>
     <module>metrics-reporter-kafka</module>
+    <module>cache</module>
   </modules>
 
   <reporting>
