This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 78b17f5a70fb2e5d66e9cce4def79e5adb377878 Author: Zhong <nju_y...@apache.org> AuthorDate: Wed Jan 31 19:08:58 2018 +0800 KYLIN-2723 fix potential concurrent issue when add rpc statistics --- .../java/org/apache/kylin/common/QueryContext.java | 54 ++++++++++------------ 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 1a961ec..a065a13 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -179,7 +180,7 @@ public class QueryContext { } public void addContext(int ctxId, String type, boolean ifCube) { - Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null; + ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null; if (ifCube) { cubeSegmentStatisticsMap = Maps.newConcurrentMap(); } @@ -210,13 +211,13 @@ public class QueryContext { logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId); return null; } - Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; + ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; if (cubeSegmentStatisticsMap == null) { logger.warn( "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType); return null; } - Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); + ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); if (segmentStatisticsMap == null) { logger.warn( "cubeSegmentStatistic should be initialized for cube {}", cubeName); @@ -237,18 +238,16 @@ public class QueryContext { logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId); return; } - Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; + ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; if (cubeSegmentStatisticsMap == null) { logger.warn( "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType); return; } String cubeName = cubeSegmentStatistics.cubeName; - Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); - if (segmentStatisticsMap == null) { - segmentStatisticsMap = Maps.newConcurrentMap(); - cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap); - } + cubeSegmentStatisticsMap.putIfAbsent(cubeName, Maps.<String, CubeSegmentStatistics> newConcurrentMap()); + ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); + segmentStatisticsMap.put(cubeSegmentStatistics.getSegmentName(), cubeSegmentStatistics); } @@ -263,28 +262,25 @@ public class QueryContext { CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId); if (cubeSegmentStatisticsResult == null) { - logger.warn("CubeSegmentStatisticsResult should be initialized for context " + ctxId); + logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId); return; } - Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; + ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; if (cubeSegmentStatisticsMap == null) { logger.warn( - "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type " - + cubeSegmentStatisticsResult.queryType); + "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", + cubeSegmentStatisticsResult.queryType); return; } - Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); - if (segmentStatisticsMap == null) { - segmentStatisticsMap = Maps.newConcurrentMap(); - cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap); - } + cubeSegmentStatisticsMap.putIfAbsent(cubeName, Maps.<String, CubeSegmentStatistics> newConcurrentMap()); + ConcurrentMap<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); + + CubeSegmentStatistics old = segmentStatisticsMap.putIfAbsent(segmentName, new CubeSegmentStatistics()); CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName); - if (segmentStatistics == null) { - segmentStatistics = new CubeSegmentStatistics(); - segmentStatisticsMap.put(segmentName, segmentStatistics); + if (old == null) { segmentStatistics.setWrapper(cubeName, segmentName, sourceCuboidId, targetCuboidId, filterMask); - } - if (segmentStatistics.sourceCuboidId != sourceCuboidId || segmentStatistics.targetCuboidId != targetCuboidId + } else if (segmentStatistics.sourceCuboidId != sourceCuboidId + || segmentStatistics.targetCuboidId != targetCuboidId || segmentStatistics.filterMask != filterMask) { StringBuilder inconsistency = new StringBuilder(); if (segmentStatistics.sourceCuboidId != sourceCuboidId) { @@ -445,8 +441,8 @@ public class QueryContext { this.filterMask = filterMask; } - public void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, long aggrCount, - long scanBytes, boolean ifSuccess) { + public synchronized void addRPCStats(long callTimeMs, long skipCount, long scanCount, long returnCount, + long aggrCount, long scanBytes, boolean ifSuccess) { this.callCount++; this.callTimeSum += callTimeMs; if (this.callTimeMax < callTimeMs) { @@ -584,7 +580,7 @@ public class QueryContext { protected static final long serialVersionUID = 1L; private String queryType; - private Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap; + private ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap; private String realization; private int realizationType; @@ -592,7 +588,7 @@ public class QueryContext { } public CubeSegmentStatisticsResult(String queryType, - Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) { + ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) { this.queryType = queryType; this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap; } @@ -618,7 +614,7 @@ public class QueryContext { } public void setCubeSegmentStatisticsMap( - Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) { + ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap) { this.cubeSegmentStatisticsMap = cubeSegmentStatisticsMap; } @@ -627,7 +623,7 @@ public class QueryContext { } - public Map<String, Map<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() { + public ConcurrentMap<String, ConcurrentMap<String, CubeSegmentStatistics>> getCubeSegmentStatisticsMap() { return cubeSegmentStatisticsMap; }