This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 636f93a KYLIN-3716 FastThreadLocal replaces ThreadLocal 636f93a is described below commit 636f93a0a3fb412b352e1518f1658b7c94368a0d Author: Nicholas Jiang <programg...@163.com> AuthorDate: Sat Jan 19 09:09:00 2019 +0800 KYLIN-3716 FastThreadLocal replaces ThreadLocal --- .../java/org/apache/kylin/common/KylinConfig.java | 5 +- .../apache/kylin/common/QueryContextFacade.java | 3 +- .../apache/kylin/common/debug/BackdoorToggles.java | 3 +- .../common/metrics/metrics2/CodahaleMetrics.java | 3 +- .../common/metrics/perflog/PerfLoggerFactory.java | 3 +- .../kylin/common/persistence/ResourceStore.java | 3 +- .../kylin/common/threadlocal/InternalThread.java | 74 ++++++++ .../common/threadlocal/InternalThreadLocal.java | 200 +++++++++++++++++++++ .../common/threadlocal/InternalThreadLocalMap.java | 169 +++++++++++++++++ .../org/apache/kylin/common/util/HadoopUtil.java | 3 +- .../apache/kylin/dict/Number2BytesConverter.java | 3 +- .../org/apache/kylin/engine/EngineFactory.java | 3 +- .../org/apache/kylin/job/SchedulerFactory.java | 3 +- .../measure/percentile/PercentileSerializer.java | 5 +- .../kylin/measure/topn/DoubleDeltaSerializer.java | 6 +- .../metadata/datatype/DataTypeSerializer.java | 5 +- .../metadata/filter/function/BuiltInMethod.java | 3 +- .../main/java/org/apache/kylin/util/KryoUtils.java | 3 +- .../apache/kylin/metrics/lib/impl/RecordEvent.java | 3 +- .../metrics/lib/impl/RecordEventTimeDetail.java | 5 +- .../org/apache/kylin/storage/StorageFactory.java | 3 +- .../apache/kylin/query/relnode/OLAPContext.java | 5 +- .../java/org/apache/kylin/rest/msg/MsgPicker.java | 4 +- .../kylin/storage/hbase/HBaseConnection.java | 3 +- 24 files changed, 493 insertions(+), 27 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index 464f9cd..9aa54d6 100644 --- 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.restclient.RestClient; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.OrderedProperties; import org.slf4j.Logger; @@ -67,7 +68,7 @@ public class KylinConfig extends KylinConfigBase { private static OrderedProperties defaultOrderedProperties = new OrderedProperties(); // thread-local instances, will override SYS_ENV_INSTANCE - private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>(); + private static transient InternalThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new InternalThreadLocal<>(); static { /* @@ -151,7 +152,7 @@ public class KylinConfig extends KylinConfigBase { logger.info("Destroy KylinConfig"); dumpStackTrace(); SYS_ENV_INSTANCE = null; - THREAD_ENV_INSTANCE = new ThreadLocal<>(); + THREAD_ENV_INSTANCE = new InternalThreadLocal<>(); } } diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java index e1cf54b..bd22f68 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java @@ -23,6 +23,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,7 @@ public class QueryContextFacade { private static final Logger logger = LoggerFactory.getLogger(QueryContextFacade.class); private static final ConcurrentMap<String, 
QueryContext> RUNNING_CTX_MAP = Maps.newConcurrentMap(); - private static final ThreadLocal<QueryContext> CURRENT_CTX = new ThreadLocal<QueryContext>() { + private static final InternalThreadLocal<QueryContext> CURRENT_CTX = new InternalThreadLocal<QueryContext>() { @Override protected QueryContext initialValue() { QueryContext queryContext = new QueryContext(); diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java index be0f7a6..210c1f8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java +++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java @@ -26,6 +26,7 @@ import java.util.Set; import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.Pair; import com.google.common.collect.Maps; @@ -39,7 +40,7 @@ import com.google.common.collect.Maps; */ public class BackdoorToggles { - private static final ThreadLocal<Map<String, String>> _backdoorToggles = new ThreadLocal<Map<String, String>>(); + private static final InternalThreadLocal<Map<String, String>> _backdoorToggles = new InternalThreadLocal<Map<String, String>>(); public static void setToggles(Map<String, String> toggles) { _backdoorToggles.set(toggles); diff --git a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java index 7487930..53aaf8d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java +++ b/core-common/src/main/java/org/apache/kylin/common/metrics/metrics2/CodahaleMetrics.java @@ -39,6 +39,7 @@ import org.apache.kylin.common.metrics.common.Metrics; import org.apache.kylin.common.metrics.common.MetricsConstant; import 
org.apache.kylin.common.metrics.common.MetricsScope; import org.apache.kylin.common.metrics.common.MetricsVariable; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +76,7 @@ public class CodahaleMetrics implements Metrics { private final Lock metersLock = new ReentrantLock(); private final Lock histogramLock = new ReentrantLock(); private final Set<Closeable> reporters = new HashSet<Closeable>(); - private final ThreadLocal<HashMap<String, CodahaleMetricsScope>> threadLocalScopes = new ThreadLocal<HashMap<String, CodahaleMetricsScope>>() { + private final InternalThreadLocal<HashMap<String, CodahaleMetricsScope>> threadLocalScopes = new InternalThreadLocal<HashMap<String, CodahaleMetricsScope>>() { @Override protected HashMap<String, CodahaleMetricsScope> initialValue() { return new HashMap<String, CodahaleMetricsScope>(); diff --git a/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java b/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java index 4fc43a8..647a650 100644 --- a/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java +++ b/core-common/src/main/java/org/apache/kylin/common/metrics/perflog/PerfLoggerFactory.java @@ -20,12 +20,13 @@ package org.apache.kylin.common.metrics.perflog; import org.apache.commons.lang3.ClassUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PerfLoggerFactory { - protected static final ThreadLocal<IPerfLogger> perfLogger = new ThreadLocal<IPerfLogger>(); + protected static final InternalThreadLocal<IPerfLogger> perfLogger = new InternalThreadLocal<IPerfLogger>(); static final private Logger LOG = LoggerFactory.getLogger(PerfLoggerFactory.class.getName()); public static IPerfLogger getPerfLogger() { diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 275d95a..5b3f196 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -40,6 +40,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.OptionsHelper; import org.slf4j.Logger; @@ -521,7 +522,7 @@ abstract public class ResourceStore { // ============================================================================ - ThreadLocal<Checkpoint> checkpointing = new ThreadLocal<>(); + InternalThreadLocal<Checkpoint> checkpointing = new InternalThreadLocal<>(); public Checkpoint checkpoint() { Checkpoint cp = checkpointing.get(); diff --git a/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThread.java b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThread.java new file mode 100644 index 0000000..912a12c --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThread.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.threadlocal; + +/** + * InternalThread + */ +public class InternalThread extends Thread { + + private InternalThreadLocalMap threadLocalMap; + + public InternalThread() { + } + + public InternalThread(Runnable target) { + super(target); + } + + public InternalThread(ThreadGroup group, Runnable target) { + super(group, target); + } + + public InternalThread(String name) { + super(name); + } + + public InternalThread(ThreadGroup group, String name) { + super(group, name); + } + + public InternalThread(Runnable target, String name) { + super(target, name); + } + + public InternalThread(ThreadGroup group, Runnable target, String name) { + super(group, target, name); + } + + public InternalThread(ThreadGroup group, Runnable target, String name, long stackSize) { + super(group, target, name, stackSize); + } + + /** + * Returns the internal data structure that keeps the threadLocal variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + public final InternalThreadLocalMap threadLocalMap() { + return threadLocalMap; + } + + /** + * Sets the internal data structure that keeps the threadLocal variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. 
+ */ + public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { + this.threadLocalMap = threadLocalMap; + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocal.java b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocal.java new file mode 100644 index 0000000..7719ab7 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocal.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.threadlocal; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; + +/** + * InternalThreadLocal + * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a + * {@link InternalThread}. + * <p></p> + * Internally, a {@link InternalThread} uses a constant index in an array, instead of using hash code and hash table, + * to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash + * table, and it is useful when accessed frequently. 
+ * <p></p> + * This design is borrowed from Netty's {@code io.netty.util.concurrent.FastThreadLocal}. + */ +public class InternalThreadLocal<V> { + + private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); + + private final int index; + + public InternalThreadLocal() { + index = InternalThreadLocalMap.nextVariableIndex(); + } + + /** + * Removes all {@link InternalThreadLocal} variables bound to the current thread. This operation is useful when you + * are in a container environment, and you don't want to leave the thread local variables in the threads you do not + * manage. + */ + @SuppressWarnings("unchecked") + public static void removeAll() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap == null) { + return; + } + + try { + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + if (v != null && v != InternalThreadLocalMap.UNSET) { + Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v; + InternalThreadLocal<?>[] variablesToRemoveArray = + variablesToRemove.toArray(new InternalThreadLocal[variablesToRemove.size()]); + for (InternalThreadLocal<?> tlv : variablesToRemoveArray) { + tlv.remove(threadLocalMap); + } + } + } finally { + InternalThreadLocalMap.remove(); + } + } + + /** + * Returns the number of thread local variables bound to the current thread. 
+ */ + public static int size() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap == null) { + return 0; + } else { + return threadLocalMap.size(); + } + } + + public static void destroy() { + InternalThreadLocalMap.destroy(); + } + + @SuppressWarnings("unchecked") + private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) { + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + Set<InternalThreadLocal<?>> variablesToRemove; + if (v == InternalThreadLocalMap.UNSET || v == null) { + variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<InternalThreadLocal<?>, Boolean>()); + threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); + } else { + variablesToRemove = (Set<InternalThreadLocal<?>>) v; + } + + variablesToRemove.add(variable); + } + + @SuppressWarnings("unchecked") + private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, InternalThreadLocal<?> variable) { + + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + + if (v == InternalThreadLocalMap.UNSET || v == null) { + return; + } + + Set<InternalThreadLocal<?>> variablesToRemove = (Set<InternalThreadLocal<?>>) v; + variablesToRemove.remove(variable); + } + + /** + * Returns the current value for the current thread + */ + @SuppressWarnings("unchecked") + public final V get() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + Object v = threadLocalMap.indexedVariable(index); + if (v != InternalThreadLocalMap.UNSET) { + return (V) v; + } + + return initialize(threadLocalMap); + } + + private V initialize(InternalThreadLocalMap threadLocalMap) { + V v = null; + try { + v = initialValue(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + threadLocalMap.setIndexedVariable(index, v); + addToVariablesToRemove(threadLocalMap, this); + return v; + } + + /** + * Sets the 
value for the current thread. + */ + public final void set(V value) { + if (value == null || value == InternalThreadLocalMap.UNSET) { + remove(); + } else { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); + if (threadLocalMap.setIndexedVariable(index, value)) { + addToVariablesToRemove(threadLocalMap, this); + } + } + } + + /** + * Sets the value to uninitialized; a subsequent call to get() will trigger a call to initialValue(). + */ + @SuppressWarnings("unchecked") + public final void remove() { + remove(InternalThreadLocalMap.getIfSet()); + } + + /** + * Sets the value to uninitialized for the specified thread local map; + * a subsequent call to get() will trigger a call to initialValue(). + * The specified thread local map must be for the current thread. + */ + @SuppressWarnings("unchecked") + public final void remove(InternalThreadLocalMap threadLocalMap) { + if (threadLocalMap == null) { + return; + } + + Object v = threadLocalMap.removeIndexedVariable(index); + removeFromVariablesToRemove(threadLocalMap, this); + + if (v != InternalThreadLocalMap.UNSET) { + try { + onRemoval((V) v); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Returns the initial value for this thread-local variable. + */ + protected V initialValue() throws Exception { + return null; + } + + /** + * Invoked when this thread local variable is removed by {@link #remove()}. 
+ */ + protected void onRemoval(@SuppressWarnings("unused") V value) throws Exception { + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocalMap.java b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocalMap.java new file mode 100644 index 0000000..60edf96 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/threadlocal/InternalThreadLocalMap.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.threadlocal; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The internal data structure that stores the threadLocal variables for Netty and all {@link InternalThread}s. + * Note that this class is for internal use only. Use {@link InternalThread} + * unless you know what you are doing. 
+ */ +public final class InternalThreadLocalMap { + + private Object[] indexedVariables; + + private static ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); + + private static final AtomicInteger nextIndex = new AtomicInteger(); + + public static final Object UNSET = new Object(); + + public static InternalThreadLocalMap getIfSet() { + Thread thread = Thread.currentThread(); + if (thread instanceof InternalThread) { + return ((InternalThread) thread).threadLocalMap(); + } + return slowThreadLocalMap.get(); + } + + public static InternalThreadLocalMap get() { + Thread thread = Thread.currentThread(); + if (thread instanceof InternalThread) { + return fastGet((InternalThread) thread); + } + return slowGet(); + } + + public static void remove() { + Thread thread = Thread.currentThread(); + if (thread instanceof InternalThread) { + ((InternalThread) thread).setThreadLocalMap(null); + } else { + slowThreadLocalMap.remove(); + } + } + + public static void destroy() { + slowThreadLocalMap = null; + } + + public static int nextVariableIndex() { + int index = nextIndex.getAndIncrement(); + if (index < 0) { + nextIndex.decrementAndGet(); + throw new IllegalStateException("Too many thread-local indexed variables"); + } + return index; + } + + public static int lastVariableIndex() { + return nextIndex.get() - 1; + } + + private InternalThreadLocalMap() { + indexedVariables = newIndexedVariableTable(); + } + + public Object indexedVariable(int index) { + Object[] lookup = indexedVariables; + return index < lookup.length ? 
lookup[index] : UNSET; + } + + /** + * @return {@code true} if and only if a new thread-local variable has been created + */ + public boolean setIndexedVariable(int index, Object value) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object oldValue = lookup[index]; + lookup[index] = value; + return oldValue == UNSET; + } else { + expandIndexedVariableTableAndSet(index, value); + return true; + } + } + + public Object removeIndexedVariable(int index) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object v = lookup[index]; + lookup[index] = UNSET; + return v; + } else { + return UNSET; + } + } + + public int size() { + int count = 0; + for (Object o : indexedVariables) { + if (o != UNSET) { + ++count; + } + } + + //the first element in `indexedVariables` is a set to keep all the InternalThreadLocal to remove + //look at method `addToVariablesToRemove` + return count - 1; + } + + private static Object[] newIndexedVariableTable() { + Object[] array = new Object[32]; + Arrays.fill(array, UNSET); + return array; + } + + private static InternalThreadLocalMap fastGet(InternalThread thread) { + InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); + if (threadLocalMap == null) { + thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); + } + return threadLocalMap; + } + + private static InternalThreadLocalMap slowGet() { + ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = InternalThreadLocalMap.slowThreadLocalMap; + InternalThreadLocalMap ret = slowThreadLocalMap.get(); + if (ret == null) { + ret = new InternalThreadLocalMap(); + slowThreadLocalMap.set(ret); + } + return ret; + } + + private void expandIndexedVariableTableAndSet(int index, Object value) { + Object[] oldArray = indexedVariables; + final int oldCapacity = oldArray.length; + int newCapacity = index; + newCapacity |= newCapacity >>> 1; + newCapacity |= newCapacity >>> 2; + newCapacity |= newCapacity >>> 4; + newCapacity |= 
newCapacity >>> 8; + newCapacity |= newCapacity >>> 16; + newCapacity++; + + Object[] newArray = Arrays.copyOf(oldArray, newCapacity); + Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); + newArray[index] = value; + indexedVariables = newArray; + } +} diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java index 4afce58..5187361 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -37,6 +37,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +46,7 @@ import com.google.common.collect.Maps; public class HadoopUtil { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class); - private static final transient ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>(); + private static final transient InternalThreadLocal<Configuration> hadoopConfig = new InternalThreadLocal<>(); private HadoopUtil() { throw new IllegalStateException("Class HadoopUtil is an utility class !"); } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java b/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java index d7de7d8..b7beba5 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/Number2BytesConverter.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.math.BigDecimal; import java.util.Map; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.Bytes; import 
com.google.common.collect.Maps; @@ -38,7 +39,7 @@ public class Number2BytesConverter implements BytesConverter<String>, Serializab int maxDigitsBeforeDecimalPoint; - static final transient ThreadLocal<Map<Integer, NumberBytesCodec>> LOCAL = new ThreadLocal<Map<Integer, NumberBytesCodec>>(); + static final transient InternalThreadLocal<Map<Integer, NumberBytesCodec>> LOCAL = new InternalThreadLocal<Map<Integer, NumberBytesCodec>>(); static NumberBytesCodec getCodec(int maxDigitsBeforeDecimalPoint) { Map<Integer, NumberBytesCodec> codecMap = LOCAL.get(); diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 03d986b..734c470 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; @@ -29,7 +30,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public class EngineFactory { // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. 
- private static ThreadLocal<ImplementationSwitch<IBatchCubingEngine>> engines = new ThreadLocal<>(); + private static InternalThreadLocal<ImplementationSwitch<IBatchCubingEngine>> engines = new InternalThreadLocal<>(); public static IBatchCubingEngine batchEngine(IEngineAware aware) { ImplementationSwitch<IBatchCubingEngine> current = engines.get(); diff --git a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java index 1bf8942..c84b397 100644 --- a/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java +++ b/core-job/src/main/java/org/apache/kylin/job/SchedulerFactory.java @@ -18,11 +18,12 @@ package org.apache.kylin.job; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.ImplementationSwitch; public class SchedulerFactory { // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. 
- private static ThreadLocal<ImplementationSwitch<Scheduler>> schedulers = new ThreadLocal<>(); + private static InternalThreadLocal<ImplementationSwitch<Scheduler>> schedulers = new InternalThreadLocal<>(); public static Scheduler scheduler(int schedulerType) { ImplementationSwitch<Scheduler> current = schedulers.get(); diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java index d0ecba7..2f75e16 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java @@ -20,12 +20,13 @@ package org.apache.kylin.measure.percentile; import java.nio.ByteBuffer; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; public class PercentileSerializer extends DataTypeSerializer<PercentileCounter> { // be thread-safe and avoid repeated obj creation - private transient ThreadLocal<PercentileCounter> current = null; + private transient InternalThreadLocal<PercentileCounter> current = null; private double compression; @@ -55,7 +56,7 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter> private PercentileCounter current() { if (current == null) { - current = new ThreadLocal<>(); + current = new InternalThreadLocal<>(); } PercentileCounter counter = current.get(); diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java index 1cb63e4..564d930 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoubleDeltaSerializer.java @@ -17,6 
+17,8 @@ */ package org.apache.kylin.measure.topn; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; + import java.nio.ByteBuffer; /** @@ -41,7 +43,7 @@ public class DoubleDeltaSerializer implements java.io.Serializable { final private int precision; final private int multiplier; - transient ThreadLocal<long[]> deltasThreadLocal; + transient InternalThreadLocal<long[]> deltasThreadLocal; public DoubleDeltaSerializer() { this(2); @@ -112,7 +114,7 @@ public class DoubleDeltaSerializer implements java.io.Serializable { len = Math.max(0, len); if (deltasThreadLocal == null) { - deltasThreadLocal = new ThreadLocal<>(); + deltasThreadLocal = new InternalThreadLocal<>(); } long[] deltas = deltasThreadLocal.get(); diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java index 7c1435d..96b2d88 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java @@ -23,6 +23,7 @@ import java.io.ObjectInputStream; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.BytesSerializer; import com.google.common.collect.Maps; @@ -34,7 +35,7 @@ import com.google.common.collect.Maps; abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.io.Serializable { final static Map<String, Class<?>> implementations = Maps.newHashMap(); - protected transient ThreadLocal current = new ThreadLocal(); + protected transient InternalThreadLocal current = new InternalThreadLocal(); static { implementations.put("char", StringSerializer.class); implementations.put("varchar", StringSerializer.class); @@ -118,6 +119,6 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java. 
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - current = new ThreadLocal(); + current = new InternalThreadLocal(); } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java index 95f7ea6..38683fb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/function/BuiltInMethod.java @@ -28,13 +28,14 @@ import java.util.Map; import org.apache.commons.lang3.reflect.MethodUtils; import com.google.common.collect.ImmutableMap; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; public enum BuiltInMethod { UPPER(BuiltInMethod.class, "upper", String.class), LOWER(BuiltInMethod.class, "lower", String.class), SUBSTRING(BuiltInMethod.class, "substring", String.class, int.class, int.class), CHAR_LENGTH(BuiltInMethod.class, "charLength", String.class), LIKE(BuiltInMethod.class, "like", String.class, String.class), INITCAP(BuiltInMethod.class, "initcap", String.class), CONCAT(BuiltInMethod.class, "concat", String.class, String.class); public final Method method; public static final ImmutableMap<String, BuiltInMethod> MAP; - private static ThreadLocal<Map<String, LikeMatcher>> likePatterns = new ThreadLocal<Map<String, LikeMatcher>>() { + private static InternalThreadLocal<Map<String, LikeMatcher>> likePatterns = new InternalThreadLocal<Map<String, LikeMatcher>>() { @Override public Map<String, LikeMatcher> initialValue() { return new HashMap<>(); diff --git a/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java b/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java index 675d49d..c419b32 100644 --- a/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java +++ 
b/core-metadata/src/main/java/org/apache/kylin/util/KryoUtils.java @@ -20,6 +20,7 @@ package org.apache.kylin.util; import java.util.BitSet; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.objenesis.strategy.StdInstantiatorStrategy; import com.esotericsoftware.kryo.Kryo; @@ -28,7 +29,7 @@ import com.esotericsoftware.kryo.io.Output; public class KryoUtils { - private static ThreadLocal<Kryo> _Kryo = new ThreadLocal<>(); + private static InternalThreadLocal<Kryo> _Kryo = new InternalThreadLocal<>(); private KryoUtils() { throw new IllegalStateException("Class KryoUtils is an utility class !"); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java index 6452d78..5118fc9 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metrics.lib.Record; @@ -35,7 +36,7 @@ import com.google.common.collect.Maps; public class RecordEvent implements Record, Map<String, Object>, Serializable { - private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<ByteArrayOutputStream>(); + private static final InternalThreadLocal<ByteArrayOutputStream> _localBaos = new InternalThreadLocal<ByteArrayOutputStream>(); static String localHostname; static { diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java index 827b5a1..aa1d307 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java +++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java @@ -24,11 +24,12 @@ import java.util.Locale; import java.util.TimeZone; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; public class RecordEventTimeDetail { private static final TimeZone timeZone; - private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>(); - private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<SimpleDateFormat>(); + private static final InternalThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>(); + private static final InternalThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>(); static { timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()); diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java index 3505708..55bdb4b 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java @@ -19,6 +19,7 @@ package org.apache.kylin.storage; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.realization.IRealization; @@ -28,7 +29,7 @@ import org.apache.kylin.metadata.realization.IRealization; public class StorageFactory { // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads. 
- private static ThreadLocal<ImplementationSwitch<IStorage>> storages = new ThreadLocal<>(); + private static InternalThreadLocal<ImplementationSwitch<IStorage>> storages = new InternalThreadLocal<>(); public static IStorage storage(IStorageAware aware) { ImplementationSwitch<IStorage> current = storages.get(); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index b8ddd02..2087b22 100755 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -31,6 +31,7 @@ import org.apache.calcite.DataContext; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.expression.ExpressionColCollector; @@ -65,9 +66,9 @@ public class OLAPContext { public static final String PRM_ACCEPT_PARTIAL_RESULT = "AcceptPartialResult"; public static final String PRM_USER_AUTHEN_INFO = "UserAuthenInfo"; - static final ThreadLocal<Map<String, String>> _localPrarameters = new ThreadLocal<Map<String, String>>(); + static final InternalThreadLocal<Map<String, String>> _localPrarameters = new InternalThreadLocal<Map<String, String>>(); - static final ThreadLocal<Map<Integer, OLAPContext>> _localContexts = new ThreadLocal<Map<Integer, OLAPContext>>(); + static final InternalThreadLocal<Map<Integer, OLAPContext>> _localContexts = new InternalThreadLocal<Map<Integer, OLAPContext>>(); public static void setParameters(Map<String, String> parameters) { _localPrarameters.set(parameters); diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java b/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java index 
58b3fb3..ee9644a 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/MsgPicker.java @@ -18,11 +18,13 @@ package org.apache.kylin.rest.msg; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; + /** * Created by luwei on 17-4-12. */ public class MsgPicker { - private static ThreadLocal<Message> msg = new ThreadLocal<Message>(); + private static InternalThreadLocal<Message> msg = new InternalThreadLocal<Message>(); public static void setMsg(String lang) { if ("cn".equals(lang)) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java index 53e8a68..6678418 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.lock.DistributedLock; +import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +68,7 @@ public class HBaseConnection { private static final Map<StorageURL, Configuration> configCache = new ConcurrentHashMap<StorageURL, Configuration>(); private static final Map<StorageURL, Connection> connPool = new ConcurrentHashMap<StorageURL, Connection>(); - private static final ThreadLocal<Configuration> configThreadLocal = new ThreadLocal<>(); + private static final InternalThreadLocal<Configuration> configThreadLocal = new InternalThreadLocal<>(); private static ExecutorService coprocessorPool = null;