This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 636f93a  KYLIN-3716 FastThreadLocal replaces ThreadLocal
636f93a is described below

commit 636f93a0a3fb412b352e1518f1658b7c94368a0d
Author: Nicholas Jiang <programg...@163.com>
AuthorDate: Sat Jan 19 09:09:00 2019 +0800

    KYLIN-3716 FastThreadLocal replaces ThreadLocal
---
 .../java/org/apache/kylin/common/KylinConfig.java  |   5 +-
 .../apache/kylin/common/QueryContextFacade.java    |   3 +-
 .../apache/kylin/common/debug/BackdoorToggles.java |   3 +-
 .../common/metrics/metrics2/CodahaleMetrics.java   |   3 +-
 .../common/metrics/perflog/PerfLoggerFactory.java  |   3 +-
 .../kylin/common/persistence/ResourceStore.java    |   3 +-
 .../kylin/common/threadlocal/InternalThread.java   |  74 ++++++++
 .../common/threadlocal/InternalThreadLocal.java    | 200 +++++++++++++++++++++
 .../common/threadlocal/InternalThreadLocalMap.java | 169 +++++++++++++++++
 .../org/apache/kylin/common/util/HadoopUtil.java   |   3 +-
 .../apache/kylin/dict/Number2BytesConverter.java   |   3 +-
 .../org/apache/kylin/engine/EngineFactory.java     |   3 +-
 .../org/apache/kylin/job/SchedulerFactory.java     |   3 +-
 .../measure/percentile/PercentileSerializer.java   |   5 +-
 .../kylin/measure/topn/DoubleDeltaSerializer.java  |   6 +-
 .../metadata/datatype/DataTypeSerializer.java      |   5 +-
 .../metadata/filter/function/BuiltInMethod.java    |   3 +-
 .../main/java/org/apache/kylin/util/KryoUtils.java |   3 +-
 .../apache/kylin/metrics/lib/impl/RecordEvent.java |   3 +-
 .../metrics/lib/impl/RecordEventTimeDetail.java    |   5 +-
 .../org/apache/kylin/storage/StorageFactory.java   |   3 +-
 .../apache/kylin/query/relnode/OLAPContext.java    |   5 +-
 .../java/org/apache/kylin/rest/msg/MsgPicker.java  |   4 +-
 .../kylin/storage/hbase/HBaseConnection.java       |   3 +-
 24 files changed, 493 insertions(+), 27 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 464f9cd..9aa54d6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.OrderedProperties;
 import org.slf4j.Logger;
@@ -67,7 +68,7 @@ public class KylinConfig extends KylinConfigBase {
     private static OrderedProperties defaultOrderedProperties = new 
OrderedProperties();
 
     // thread-local instances, will override SYS_ENV_INSTANCE
-    private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = 
new ThreadLocal<>();
+    private static transient InternalThreadLocal<KylinConfig> 
THREAD_ENV_INSTANCE = new InternalThreadLocal<>();
 
     static {
         /*
@@ -151,7 +152,7 @@ public class KylinConfig extends KylinConfigBase {
             logger.info("Destroy KylinConfig");
             dumpStackTrace();
             SYS_ENV_INSTANCE = null;
-            THREAD_ENV_INSTANCE = new ThreadLocal<>();
+            THREAD_ENV_INSTANCE = new InternalThreadLocal<>();
         }
     }
 
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java 
b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
index e1cf54b..bd22f68 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java
@@ -23,6 +23,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@ public class QueryContextFacade {
     private static final Logger logger = 
LoggerFactory.getLogger(QueryContextFacade.class);
 
     private static final ConcurrentMap<String, QueryContext> RUNNING_CTX_MAP = 
Maps.newConcurrentMap();
-    private static final ThreadLocal<QueryContext> CURRENT_CTX = new 
ThreadLocal<QueryContext>() {
+    private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new 
InternalThreadLocal<QueryContext>() {
         @Override
         protected QueryContext initialValue() {
             QueryContext queryContext = new QueryContext();
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java 
b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index be0f7a6..210c1f8 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.Pair;
 
 import com.google.common.collect.Maps;
@@ -39,7 +40,7 @@ import com.google.common.collect.Maps;
  */
 public class BackdoorToggles {
 
-    private static final ThreadLocal<Map<String, String>> _backdoorToggles = 
new ThreadLocal<Map<String, String>>();
+    private static final InternalThreadLocal<Map<String, String>> 
_backdoorToggles = new InternalThreadLocal<Map<String, String>>();
 
     public static void setToggles(Map<String, String> toggles) {
         _backdoorToggles.set(toggles);
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java
 
b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java
index 7487930..53aaf8d 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java
@@ -39,6 +39,7 @@ import org.apache.kylin.common.metrics.common.Metrics;
 import org.apache.kylin.common.metrics.common.MetricsConstant;
 import org.apache.kylin.common.metrics.common.MetricsScope;
 import org.apache.kylin.common.metrics.common.MetricsVariable;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,7 @@ public class CodahaleMetrics implements Metrics {
     private final Lock metersLock = new ReentrantLock();
     private final Lock histogramLock = new ReentrantLock();
     private final Set<Closeable> reporters = new HashSet<Closeable>();
-    private final ThreadLocal<HashMap<String, CodahaleMetricsScope>> 
threadLocalScopes = new ThreadLocal<HashMap<String, CodahaleMetricsScope>>() {
+    private final InternalThreadLocal<HashMap<String, CodahaleMetricsScope>> 
threadLocalScopes = new InternalThreadLocal<HashMap<String, 
CodahaleMetricsScope>>() {
         @Override
         protected HashMap<String, CodahaleMetricsScope> initialValue() {
             return new HashMap<String, CodahaleMetricsScope>();
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java
 
b/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java
index 4fc43a8..647a650 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java
@@ -20,12 +20,13 @@ package org.apache.kylin.common.metrics.perflog;
 
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PerfLoggerFactory {
 
-    protected static final ThreadLocal<IPerfLogger> perfLogger = new 
ThreadLocal<IPerfLogger>();
+    protected static final InternalThreadLocal<IPerfLogger> perfLogger = new 
InternalThreadLocal<IPerfLogger>();
     static final private Logger LOG = 
LoggerFactory.getLogger(PerfLoggerFactory.class.getName());
 
     public static IPerfLogger getPerfLogger() {
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 275d95a..5b3f196 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.slf4j.Logger;
@@ -521,7 +522,7 @@ abstract public class ResourceStore {
 
     // 
============================================================================
 
-    ThreadLocal<Checkpoint> checkpointing = new ThreadLocal<>();
+    InternalThreadLocal<Checkpoint> checkpointing = new 
InternalThreadLocal<>();
 
     public Checkpoint checkpoint() {
         Checkpoint cp = checkpointing.get();
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThread.java
 
b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThread.java
new file mode 100644
index 0000000..912a12c
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThread.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.threadlocal;
+
+/**
+ * A {@link Thread} that keeps its {@link InternalThreadLocalMap} in a plain field.
+ * <p>
+ * {@link InternalThreadLocalMap} tests the current thread with {@code instanceof InternalThread} and,
+ * when it matches, reads and writes the map through {@link #threadLocalMap()} and
+ * {@link #setThreadLocalMap(InternalThreadLocalMap)} instead of going through a JDK {@link ThreadLocal}.
+ */
+public class InternalThread extends Thread {
+
+    // Thread-local storage of this thread. There is no initializer, so it stays null until
+    // setThreadLocalMap(...) assigns a map.
+    private InternalThreadLocalMap threadLocalMap;
+
+    // The constructors below mirror the java.lang.Thread constructors and only delegate to them.
+
+    public InternalThread() {
+    }
+
+    public InternalThread(Runnable target) {
+        super(target);
+    }
+
+    public InternalThread(ThreadGroup group, Runnable target) {
+        super(group, target);
+    }
+
+    public InternalThread(String name) {
+        super(name);
+    }
+
+    public InternalThread(ThreadGroup group, String name) {
+        super(group, name);
+    }
+
+    public InternalThread(Runnable target, String name) {
+        super(target, name);
+    }
+
+    public InternalThread(ThreadGroup group, Runnable target, String name) {
+        super(group, target, name);
+    }
+
+    public InternalThread(ThreadGroup group, Runnable target, String name, 
long stackSize) {
+        super(group, target, name, stackSize);
+    }
+
+    /**
+     * Returns the internal data structure that keeps the thread-local variables bound to this thread,
+     * or {@code null} if no map has been set.
+     * Note that this method is for internal use only, and thus is subject to change at any time.
+     */
+    public final InternalThreadLocalMap threadLocalMap() {
+        return threadLocalMap;
+    }
+
+    /**
+     * Sets the internal data structure that keeps the thread-local variables bound to this thread.
+     * Passing {@code null} detaches the current map.
+     * Note that this method is for internal use only, and thus is subject to change at any time.
+     */
+    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) 
{
+        this.threadLocalMap = threadLocalMap;
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocal.java
 
b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocal.java
new file mode 100644
index 0000000..7719ab7
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocal.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.threadlocal;
+
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
+
+/**
+ * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from an
+ * {@link InternalThread}.
+ * <p>
+ * Internally, an {@link InternalThreadLocal} uses a constant index in an array, instead of using hash code
+ * and hash table, to look for a variable. Although seemingly very subtle, it yields slight performance
+ * advantage over using a hash table, and it is useful when accessed frequently.
+ * <p>
+ * This design is borrowed from Netty's {@code io.netty.util.concurrent.FastThreadLocal}.
+ */
+public class InternalThreadLocal<V> {
+
+    // Slot that, in every thread's map, holds the Set of InternalThreadLocal instances currently bound
+    // on that thread. Maintained by addToVariablesToRemove/removeFromVariablesToRemove and read by
+    // removeAll().
+    private static final int variablesToRemoveIndex = 
InternalThreadLocalMap.nextVariableIndex();
+
+    // Slot of this variable in each thread's map; allocated once, at construction.
+    private final int index;
+
+    public InternalThreadLocal() {
+        index = InternalThreadLocalMap.nextVariableIndex();
+    }
+
+    /**
+     * Removes all {@link InternalThreadLocal} variables bound to the current thread. This operation is
+     * useful when you are in a container environment, and you don't want to leave the thread local
+     * variables in the threads you do not manage.
+     * <p>
+     * Does nothing when the current thread has no map. Otherwise every registered variable is removed
+     * and, finally, the map itself is detached from the thread.
+     */
+    @SuppressWarnings("unchecked")
+    public static void removeAll() {
+        InternalThreadLocalMap threadLocalMap = 
InternalThreadLocalMap.getIfSet();
+        if (threadLocalMap == null) {
+            return;
+        }
+
+        try {
+            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
+            if (v != null && v != InternalThreadLocalMap.UNSET) {
+                Set<InternalThreadLocal<?>> variablesToRemove = 
(Set<InternalThreadLocal<?>>) v;
+                // Iterate over a snapshot: remove(threadLocalMap) deletes the variable from this very set.
+                InternalThreadLocal<?>[] variablesToRemoveArray =
+                        variablesToRemove.toArray(new 
InternalThreadLocal[variablesToRemove.size()]);
+                for (InternalThreadLocal<?> tlv : variablesToRemoveArray) {
+                    tlv.remove(threadLocalMap);
+                }
+            }
+        } finally {
+            // Detach the map even if an onRemoval callback threw.
+            InternalThreadLocalMap.remove();
+        }
+    }
+
+    /**
+     * Returns the number of thread local variables bound to the current thread, as reported by
+     * {@link InternalThreadLocalMap#size()}, or {@code 0} when the thread has no map.
+     */
+    public static int size() {
+        InternalThreadLocalMap threadLocalMap = 
InternalThreadLocalMap.getIfSet();
+        if (threadLocalMap == null) {
+            return 0;
+        } else {
+            return threadLocalMap.size();
+        }
+    }
+
+    /**
+     * Delegates to {@link InternalThreadLocalMap#destroy()}.
+     */
+    public static void destroy() {
+        InternalThreadLocalMap.destroy();
+    }
+
+    // Registers the variable in the per-thread bookkeeping set, creating the set on first use.
+    // The set is identity-based (IdentityHashMap), so equals/hashCode overrides are not consulted.
+    @SuppressWarnings("unchecked")
+    private static void addToVariablesToRemove(InternalThreadLocalMap 
threadLocalMap, InternalThreadLocal<?> variable) {
+        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
+        Set<InternalThreadLocal<?>> variablesToRemove;
+        if (v == InternalThreadLocalMap.UNSET || v == null) {
+            variablesToRemove = Collections.newSetFromMap(new 
IdentityHashMap<InternalThreadLocal<?>, Boolean>());
+            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, 
variablesToRemove);
+        } else {
+            variablesToRemove = (Set<InternalThreadLocal<?>>) v;
+        }
+
+        variablesToRemove.add(variable);
+    }
+
+    // Drops the variable from the per-thread bookkeeping set; a no-op when the set does not exist yet.
+    @SuppressWarnings("unchecked")
+    private static void removeFromVariablesToRemove(InternalThreadLocalMap 
threadLocalMap, InternalThreadLocal<?> variable) {
+
+        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
+
+        if (v == InternalThreadLocalMap.UNSET || v == null) {
+            return;
+        }
+
+        Set<InternalThreadLocal<?>> variablesToRemove = 
(Set<InternalThreadLocal<?>>) v;
+        variablesToRemove.remove(variable);
+    }
+
+    /**
+     * Returns the current value for the current thread. When the slot is still unset,
+     * {@link #initialValue()} is called and its result is stored and returned.
+     */
+    @SuppressWarnings("unchecked")
+    public final V get() {
+        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
+        Object v = threadLocalMap.indexedVariable(index);
+        // Only UNSET triggers initialization; a stored null is returned as is.
+        if (v != InternalThreadLocalMap.UNSET) {
+            return (V) v;
+        }
+
+        return initialize(threadLocalMap);
+    }
+
+    // Computes the initial value, stores it in this variable's slot and registers the variable for
+    // removeAll(). Any Exception thrown by initialValue() is rethrown wrapped in a RuntimeException.
+    private V initialize(InternalThreadLocalMap threadLocalMap) {
+        V v = null;
+        try {
+            v = initialValue();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        threadLocalMap.setIndexedVariable(index, v);
+        addToVariablesToRemove(threadLocalMap, this);
+        return v;
+    }
+
+    /**
+     * Sets the value for the current thread. A {@code null} value (or the {@code UNSET} marker) is
+     * treated as {@link #remove()}.
+     */
+    public final void set(V value) {
+        if (value == null || value == InternalThreadLocalMap.UNSET) {
+            remove();
+        } else {
+            InternalThreadLocalMap threadLocalMap = 
InternalThreadLocalMap.get();
+            // setIndexedVariable reports true only when the slot was previously unset, so the
+            // variable is registered once rather than on every overwrite.
+            if (threadLocalMap.setIndexedVariable(index, value)) {
+                addToVariablesToRemove(threadLocalMap, this);
+            }
+        }
+    }
+
+    /**
+     * Sets the value to uninitialized; a subsequent call to get() will trigger a call to initialValue().
+     * Does nothing when the current thread has no map.
+     */
+    @SuppressWarnings("unchecked")
+    public final void remove() {
+        remove(InternalThreadLocalMap.getIfSet());
+    }
+
+    /**
+     * Sets the value to uninitialized for the specified thread local map;
+     * a subsequent call to get() will trigger a call to initialValue().
+     * The specified thread local map must be for the current thread.
+     * A {@code null} map is ignored. {@link #onRemoval(Object)} is invoked only when a value was
+     * actually present, and any Exception it throws is rethrown wrapped in a RuntimeException.
+     */
+    @SuppressWarnings("unchecked")
+    public final void remove(InternalThreadLocalMap threadLocalMap) {
+        if (threadLocalMap == null) {
+            return;
+        }
+
+        Object v = threadLocalMap.removeIndexedVariable(index);
+        removeFromVariablesToRemove(threadLocalMap, this);
+
+        if (v != InternalThreadLocalMap.UNSET) {
+            try {
+                onRemoval((V) v);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Returns the initial value for this thread-local variable. The default implementation
+     * returns {@code null}.
+     */
+    protected V initialValue() throws Exception {
+        return null;
+    }
+
+    /**
+     * Invoked when this thread local variable is removed by {@link #remove()}. The default
+     * implementation does nothing.
+     */
+    protected void onRemoval(@SuppressWarnings("unused") V value) throws 
Exception {
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocalMap.java
 
b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocalMap.java
new file mode 100644
index 0000000..60edf96
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocalMap.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.threadlocal;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The internal data structure that stores the thread-local variables of {@link InternalThreadLocal}.
+ * <p>
+ * An {@link InternalThread} holds its map in a field; every other thread reaches its map through a
+ * JDK {@link ThreadLocal}. Values live in an {@code Object[]} indexed by the slot handed out by
+ * {@link #nextVariableIndex()}; slots without a value hold {@link #UNSET}.
+ * <p>
+ * Note that this class is for internal use only. Use {@link InternalThreadLocal}
+ * unless you know what you are doing.
+ */
+public final class InternalThreadLocalMap {
+
+    // Slot table of the owning thread; unused slots contain UNSET. Swapped for a larger copy on growth.
+    private Object[] indexedVariables;
+
+    // Holder of the map for threads that are not InternalThread instances.
+    // Not final because destroy() sets it to null.
+    private static ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
+
+    // Source of slot indexes, shared by all threads.
+    private static final AtomicInteger nextIndex = new AtomicInteger();
+
+    /** Marker stored in a slot that holds no value; always compared by identity. */
+    public static final Object UNSET = new Object();
+
+    /**
+     * Returns the map of the current thread without creating one.
+     *
+     * @return the current thread's map, or {@code null} if it has none
+     */
+    public static InternalThreadLocalMap getIfSet() {
+        Thread thread = Thread.currentThread();
+        if (thread instanceof InternalThread) {
+            return ((InternalThread) thread).threadLocalMap();
+        }
+        return slowThreadLocalMap.get();
+    }
+
+    /**
+     * Returns the map of the current thread, creating and attaching it on first use.
+     */
+    public static InternalThreadLocalMap get() {
+        Thread thread = Thread.currentThread();
+        if (thread instanceof InternalThread) {
+            return fastGet((InternalThread) thread);
+        }
+        return slowGet();
+    }
+
+    /**
+     * Detaches the map from the current thread.
+     */
+    public static void remove() {
+        Thread thread = Thread.currentThread();
+        if (thread instanceof InternalThread) {
+            ((InternalThread) thread).setThreadLocalMap(null);
+        } else {
+            slowThreadLocalMap.remove();
+        }
+    }
+
+    /**
+     * Drops the static {@link ThreadLocal} used by threads that are not {@link InternalThread}s.
+     * <p>
+     * NOTE(review): the field is left {@code null}, so a later {@link #getIfSet()}, {@link #get()} or
+     * {@link #remove()} on such a thread dereferences {@code null}. Call this only when no further
+     * access is expected.
+     */
+    public static void destroy() {
+        slowThreadLocalMap = null;
+    }
+
+    /**
+     * Allocates the next free slot index.
+     *
+     * @return a non-negative index that has not been handed out before
+     * @throws IllegalStateException if the index counter has overflowed
+     */
+    public static int nextVariableIndex() {
+        int index = nextIndex.getAndIncrement();
+        if (index < 0) {
+            // Undo the increment so the counter stays pinned at the overflow boundary.
+            nextIndex.decrementAndGet();
+            throw new IllegalStateException("Too many thread-local indexed variables");
+        }
+        return index;
+    }
+
+    /**
+     * Returns the most recently allocated slot index, or {@code -1} if none has been allocated.
+     */
+    public static int lastVariableIndex() {
+        return nextIndex.get() - 1;
+    }
+
+    private InternalThreadLocalMap() {
+        indexedVariables = newIndexedVariableTable();
+    }
+
+    /**
+     * Returns the value stored in the given slot, or {@link #UNSET} when the slot is empty or lies
+     * beyond the current table.
+     */
+    public Object indexedVariable(int index) {
+        Object[] lookup = indexedVariables;
+        return index < lookup.length ? lookup[index] : UNSET;
+    }
+
+    /**
+     * Stores {@code value} in the given slot, growing the table when the slot lies beyond it.
+     *
+     * @return {@code true} if and only if a new thread-local variable has been created
+     */
+    public boolean setIndexedVariable(int index, Object value) {
+        Object[] lookup = indexedVariables;
+        if (index < lookup.length) {
+            Object oldValue = lookup[index];
+            lookup[index] = value;
+            return oldValue == UNSET;
+        } else {
+            expandIndexedVariableTableAndSet(index, value);
+            return true;
+        }
+    }
+
+    /**
+     * Resets the given slot to {@link #UNSET}.
+     *
+     * @return the value the slot held before, or {@link #UNSET} if it was empty or beyond the table
+     */
+    public Object removeIndexedVariable(int index) {
+        Object[] lookup = indexedVariables;
+        if (index < lookup.length) {
+            Object v = lookup[index];
+            lookup[index] = UNSET;
+            return v;
+        } else {
+            return UNSET;
+        }
+    }
+
+    /**
+     * Returns the number of occupied slots, not counting the bookkeeping set that
+     * {@code InternalThreadLocal#addToVariablesToRemove} keeps in its reserved slot.
+     *
+     * @return the number of thread-local values in this map; never negative
+     */
+    public int size() {
+        int count = 0;
+        for (Object o : indexedVariables) {
+            if (o != UNSET) {
+                ++count;
+            }
+        }
+
+        // One occupied slot is expected to be the set of InternalThreadLocals to remove (see
+        // InternalThreadLocal#addToVariablesToRemove). A map in which nothing was ever stored has no
+        // such set, so clamp at zero instead of reporting -1.
+        return Math.max(0, count - 1);
+    }
+
+    // Creates the initial 32-slot table with every slot marked UNSET.
+    private static Object[] newIndexedVariableTable() {
+        Object[] array = new Object[32];
+        Arrays.fill(array, UNSET);
+        return array;
+    }
+
+    // Map lookup for InternalThread: a field read, with lazy creation.
+    private static InternalThreadLocalMap fastGet(InternalThread thread) {
+        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
+        if (threadLocalMap == null) {
+            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
+        }
+        return threadLocalMap;
+    }
+
+    // Map lookup for any other thread: goes through the static JDK ThreadLocal, with lazy creation.
+    private static InternalThreadLocalMap slowGet() {
+        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = InternalThreadLocalMap.slowThreadLocalMap;
+        InternalThreadLocalMap ret = slowThreadLocalMap.get();
+        if (ret == null) {
+            ret = new InternalThreadLocalMap();
+            slowThreadLocalMap.set(ret);
+        }
+        return ret;
+    }
+
+    // Grows the table so that it contains `index`, then stores the value there.
+    private void expandIndexedVariableTableAndSet(int index, Object value) {
+        Object[] oldArray = indexedVariables;
+        final int oldCapacity = oldArray.length;
+        // Smear the highest set bit of index into all lower bits, then add one: the result is the
+        // smallest power of two strictly greater than index.
+        int newCapacity = index;
+        newCapacity |= newCapacity >>> 1;
+        newCapacity |= newCapacity >>> 2;
+        newCapacity |= newCapacity >>> 4;
+        newCapacity |= newCapacity >>> 8;
+        newCapacity |= newCapacity >>> 16;
+        newCapacity++;
+
+        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
+        // copyOf pads with null; the padding must read as "no value".
+        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
+        newArray[index] = value;
+        indexedVariables = newArray;
+    }
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 4afce58..5187361 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,7 @@ import com.google.common.collect.Maps;
 public class HadoopUtil {
     @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(HadoopUtil.class);
-    private static final transient ThreadLocal<Configuration> hadoopConfig = 
new ThreadLocal<>();
+    private static final transient InternalThreadLocal<Configuration> 
hadoopConfig = new InternalThreadLocal<>();
     private HadoopUtil() {
         throw new IllegalStateException("Class HadoopUtil is an utility class 
!");
     }
diff --git 
a/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java
 
b/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java
index d7de7d8..b7beba5 100644
--- 
a/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java
+++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.Map;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.Bytes;
 
 import com.google.common.collect.Maps;
@@ -38,7 +39,7 @@ public class Number2BytesConverter implements 
BytesConverter<String>, Serializab
 
     int maxDigitsBeforeDecimalPoint;
 
-    static final transient ThreadLocal<Map<Integer, NumberBytesCodec>> LOCAL = 
new ThreadLocal<Map<Integer, NumberBytesCodec>>();
+    static final transient InternalThreadLocal<Map<Integer, NumberBytesCodec>> 
LOCAL = new InternalThreadLocal<Map<Integer, NumberBytesCodec>>();
 
     static NumberBytesCodec getCodec(int maxDigitsBeforeDecimalPoint) {
         Map<Integer, NumberBytesCodec> codecMap = LOCAL.get();
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java 
b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 03d986b..734c470 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -29,7 +30,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 public class EngineFactory {
 
     // Use thread-local because KylinConfig can be thread-local and 
implementation might be different among multiple threads.
-    private static ThreadLocal<ImplementationSwitch<IBatchCubingEngine>> 
engines = new ThreadLocal<>();
+    private static 
InternalThreadLocal<ImplementationSwitch<IBatchCubingEngine>> engines = new 
InternalThreadLocal<>();
 
     public static IBatchCubingEngine batchEngine(IEngineAware aware) {
         ImplementationSwitch<IBatchCubingEngine> current = engines.get();
diff --git a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java 
b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
index 1bf8942..c84b397 100644
--- a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java
@@ -18,11 +18,12 @@
 package org.apache.kylin.job;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.ImplementationSwitch;
 
 public class SchedulerFactory {
     // Use thread-local because KylinConfig can be thread-local and 
implementation might be different among multiple threads.
-    private static ThreadLocal<ImplementationSwitch<Scheduler>> schedulers = 
new ThreadLocal<>();
+    private static InternalThreadLocal<ImplementationSwitch<Scheduler>> 
schedulers = new InternalThreadLocal<>();
 
     public static Scheduler scheduler(int schedulerType) {
         ImplementationSwitch<Scheduler> current = schedulers.get();
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
index d0ecba7..2f75e16 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
@@ -20,12 +20,13 @@ package org.apache.kylin.measure.percentile;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 public class PercentileSerializer extends 
DataTypeSerializer<PercentileCounter> {
     // be thread-safe and avoid repeated obj creation
-    private transient ThreadLocal<PercentileCounter> current = null;
+    private transient InternalThreadLocal<PercentileCounter> current = null;
 
     private double compression;
 
@@ -55,7 +56,7 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter>
 
     private PercentileCounter current() {
         if (current == null) {
-            current = new ThreadLocal<>();
+            current = new InternalThreadLocal<>();
         }
 
         PercentileCounter counter = current.get();
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
index 1cb63e4..564d930 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.kylin.measure.topn;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -41,7 +43,7 @@ public class DoubleDeltaSerializer implements java.io.Serializable {
     final private int precision;
     final private int multiplier;
 
-    transient ThreadLocal<long[]> deltasThreadLocal;
+    transient InternalThreadLocal<long[]> deltasThreadLocal;
 
     public DoubleDeltaSerializer() {
         this(2);
@@ -112,7 +114,7 @@ public class DoubleDeltaSerializer implements java.io.Serializable {
         len = Math.max(0, len);
 
         if (deltasThreadLocal == null) {
-            deltasThreadLocal = new ThreadLocal<>();
+            deltasThreadLocal = new InternalThreadLocal<>();
         }
 
         long[] deltas = deltasThreadLocal.get();
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index 7c1435d..96b2d88 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -23,6 +23,7 @@ import java.io.ObjectInputStream;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.BytesSerializer;
 
 import com.google.common.collect.Maps;
@@ -34,7 +35,7 @@ import com.google.common.collect.Maps;
 abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.io.Serializable {
 
     final static Map<String, Class<?>> implementations = Maps.newHashMap();
-    protected transient ThreadLocal current = new ThreadLocal();
+    protected transient InternalThreadLocal current = new InternalThreadLocal();
     static {
         implementations.put("char", StringSerializer.class);
         implementations.put("varchar", StringSerializer.class);
@@ -118,6 +119,6 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
         in.defaultReadObject();
-        current = new ThreadLocal();
+        current = new InternalThreadLocal();
     }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
index 95f7ea6..38683fb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java
@@ -28,13 +28,14 @@ import java.util.Map;
 import org.apache.commons.lang3.reflect.MethodUtils;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 
 public enum BuiltInMethod {
     UPPER(BuiltInMethod.class, "upper", String.class), LOWER(BuiltInMethod.class, "lower", String.class), SUBSTRING(BuiltInMethod.class, "substring", String.class, int.class, int.class), CHAR_LENGTH(BuiltInMethod.class, "charLength", String.class), LIKE(BuiltInMethod.class, "like", String.class, String.class), INITCAP(BuiltInMethod.class, "initcap", String.class), CONCAT(BuiltInMethod.class, "concat", String.class, String.class);
     public final Method method;
     public static final ImmutableMap<String, BuiltInMethod> MAP;
 
-    private static ThreadLocal<Map<String, LikeMatcher>> likePatterns = new ThreadLocal<Map<String, LikeMatcher>>() {
+    private static InternalThreadLocal<Map<String, LikeMatcher>> likePatterns = new InternalThreadLocal<Map<String, LikeMatcher>>() {
         @Override
         public Map<String, LikeMatcher> initialValue() {
             return new HashMap<>();
diff --git a/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java b/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java
index 675d49d..c419b32 100644
--- a/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java
+++ b/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java
@@ -20,6 +20,7 @@ package org.apache.kylin.util;
 
 import java.util.BitSet;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import com.esotericsoftware.kryo.Kryo;
@@ -28,7 +29,7 @@ import com.esotericsoftware.kryo.io.Output;
 
 public class KryoUtils {
 
-    private static ThreadLocal<Kryo> _Kryo = new ThreadLocal<>();
+    private static InternalThreadLocal<Kryo> _Kryo = new InternalThreadLocal<>();
 
     private KryoUtils() {
         throw new IllegalStateException("Class KryoUtils is an utility class !");
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
index 6452d78..5118fc9 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metrics.lib.Record;
 
@@ -35,7 +36,7 @@ import com.google.common.collect.Maps;
 
 public class RecordEvent implements Record, Map<String, Object>, Serializable {
 
-    private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<ByteArrayOutputStream>();
+    private static final InternalThreadLocal<ByteArrayOutputStream> _localBaos = new InternalThreadLocal<ByteArrayOutputStream>();
 
     static String localHostname;
     static {
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
index 827b5a1..aa1d307 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
@@ -24,11 +24,12 @@ import java.util.Locale;
 import java.util.TimeZone;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 
 public class RecordEventTimeDetail {
     private static final TimeZone timeZone;
-    private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
-    private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<SimpleDateFormat>();
+    private static final InternalThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>();
+    private static final InternalThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>();
 
     static {
         timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone());
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index 3505708..55bdb4b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -28,7 +29,7 @@ import org.apache.kylin.metadata.realization.IRealization;
 public class StorageFactory {
 
     // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
-    private static ThreadLocal<ImplementationSwitch<IStorage>> storages = new ThreadLocal<>();
+    private static InternalThreadLocal<ImplementationSwitch<IStorage>> storages = new InternalThreadLocal<>();
 
     public static IStorage storage(IStorageAware aware) {
         ImplementationSwitch<IStorage> current = storages.get();
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index b8ddd02..2087b22 100755
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -31,6 +31,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.expression.ExpressionColCollector;
@@ -65,9 +66,9 @@ public class OLAPContext {
     public static final String PRM_ACCEPT_PARTIAL_RESULT = "AcceptPartialResult";
     public static final String PRM_USER_AUTHEN_INFO = "UserAuthenInfo";
 
-    static final ThreadLocal<Map<String, String>> _localPrarameters = new ThreadLocal<Map<String, String>>();
+    static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<Map<String, String>>();
 
-    static final ThreadLocal<Map<Integer, OLAPContext>> _localContexts = new ThreadLocal<Map<Integer, OLAPContext>>();
+    static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<Map<Integer, OLAPContext>>();
 
     public static void setParameters(Map<String, String> parameters) {
         _localPrarameters.set(parameters);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java b/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java
index 58b3fb3..ee9644a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.rest.msg;
 
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
+
 /**
  * Created by luwei on 17-4-12.
  */
 public class MsgPicker {
-    private static ThreadLocal<Message> msg = new ThreadLocal<Message>();
+    private static InternalThreadLocal<Message> msg = new InternalThreadLocal<Message>();
 
     public static void setMsg(String lang) {
         if ("cn".equals(lang))
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 53e8a68..6678418 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.lock.DistributedLock;
+import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class HBaseConnection {
 
     private static final Map<StorageURL, Configuration> configCache = new ConcurrentHashMap<StorageURL, Configuration>();
     private static final Map<StorageURL, Connection> connPool = new ConcurrentHashMap<StorageURL, Connection>();
-    private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>();
+    private static final InternalThreadLocal<Configuration> configThreadLocal = new InternalThreadLocal<>();
 
     private static ExecutorService coprocessorPool = null;
 

Reply via email to