# IGNITE-289 (Need to get rid of locPart reference in DHT cache entry) Store local partitions in an array.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/12fa8894 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/12fa8894 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/12fa8894 Branch: refs/heads/ignite-289 Commit: 12fa88943e133ad54f7a70b654952482f2f267cf Parents: 8b61e1c Author: sevdokimov <sevdoki...@gridgain.com> Authored: Thu May 7 19:27:50 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Thu May 7 19:27:50 2015 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 35 +++-- .../distributed/near/GridNearCacheAdapter.java | 64 ++++----- .../internal/util/RarefiedConcurrentIntMap.java | 139 +++++++++++++++++++ .../ignite/internal/util/lang/GridFunc.java | 19 +++ 5 files changed, 214 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index c551fb3..bad5efb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -121,7 +121,7 @@ public interface GridDhtPartitionTopology { * * @return All current local partitions. 
*/ - public Collection<GridDhtLocalPartition> currentLocalPartitions(); + public Iterable<GridDhtLocalPartition> currentLocalPartitions(); /** * @return Local IDs. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 073e0e7..e2f77de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -28,10 +28,8 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.locks.*; import static org.apache.ignite.events.EventType.*; @@ -55,8 +53,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { private final IgniteLogger log; /** */ - private final ConcurrentMap<Integer, GridDhtLocalPartition> locParts = - new ConcurrentHashMap8<>(); + private final RarefiedConcurrentIntMap<GridDhtLocalPartition> locParts; /** Node to partition map. 
*/ private GridDhtPartitionFullMap node2part; @@ -90,6 +87,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { this.cctx = cctx; + locParts = new RarefiedConcurrentIntMap<>(cctx.affinity().partitions()); + log = cctx.logger(getClass()); } @@ -120,7 +119,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { boolean changed = false; // Synchronously wait for all renting partitions to complete. - for (Iterator<GridDhtLocalPartition> it = locParts.values().iterator(); it.hasNext();) { + for (Iterator<GridDhtLocalPartition> it = locParts.iterator(); it.hasNext();) { GridDhtLocalPartition p = it.next(); GridDhtPartitionState state = p.state(); @@ -557,12 +556,16 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List<GridDhtLocalPartition> localPartitions() { - return new LinkedList<>(locParts.values()); + List<GridDhtLocalPartition> res = new LinkedList<>(); + + locParts.addAllTo(res); + + return res; } /** {@inheritDoc} */ - @Override public Collection<GridDhtLocalPartition> currentLocalPartitions() { - return locParts.values(); + @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() { + return locParts; } /** {@inheritDoc} */ @@ -603,8 +606,16 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { lock.readLock().lock(); try { - return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(), - F.viewReadOnly(locParts, CU.<K, V>part2state()), true); + GridDhtPartitionMap res = new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get()); + + for (int i = 0; i < locParts.maxIndex(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part.state().active()) + res.put(i, part.state()); + } + + return res; } finally { lock.readLock().unlock(); @@ -950,7 +961,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { UUID locId = cctx.nodeId(); - for 
(GridDhtLocalPartition part : locParts.values()) { + for (GridDhtLocalPartition part : locParts) { GridDhtPartitionState state = part.state(); if (state.active()) { @@ -1172,7 +1183,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology { @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); - for (GridDhtLocalPartition part : locParts.values()) { + for (GridDhtLocalPartition part : locParts) { int size = part.size(); if (size >= threshold) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 29c1d45..339649e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -329,42 +330,41 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Nullable final CacheEntryPredicate... 
filter) { final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - Collection<Cache.Entry<K, V>> entries = - F.flatCollections( - F.viewReadOnly( - dht().topology().currentLocalPartitions(), - new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() { - @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) { - Collection<GridDhtCacheEntry> entries0 = p.entries(); - - if (!F.isEmpty(filter)) - entries0 = F.view(entries0, new CacheEntryPredicateAdapter() { - @Override public boolean apply(GridCacheEntryEx e) { - return F.isAll(e, filter); - } - }); - - return F.viewReadOnly( - entries0, - new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() { - @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) { - return e.wrapLazyValue(); - } - }, - new P1<GridDhtCacheEntry>() { - @Override public boolean apply(GridDhtCacheEntry e) { - return !e.obsoleteOrDeleted(); - } - }); + Iterable<GridDhtLocalPartition> primaryOnly = IgniteIterables.filter(dht().topology().currentLocalPartitions(), + new P1<GridDhtLocalPartition>() { + @Override public boolean apply(GridDhtLocalPartition p) { + return p.primary(topVer); + } + }); + + Iterable<Collection<Cache.Entry<K, V>>> entriesCol = IgniteIterables.transform(primaryOnly, + new C1<GridDhtLocalPartition, Collection<Cache.Entry<K, V>>>() { + @Override public Collection<Cache.Entry<K, V>> apply(GridDhtLocalPartition p) { + Collection<GridDhtCacheEntry> entries0 = p.entries(); + + if (!F.isEmpty(filter)) + entries0 = F.view(entries0, new CacheEntryPredicateAdapter() { + @Override public boolean apply(GridCacheEntryEx e) { + return F.isAll(e, filter); + } + }); + + return F.viewReadOnly( + entries0, + new C1<GridDhtCacheEntry, Cache.Entry<K, V>>() { + @Override public Cache.Entry<K, V> apply(GridDhtCacheEntry e) { + return e.wrapLazyValue(); } }, - new P1<GridDhtLocalPartition>() { - @Override public boolean apply(GridDhtLocalPartition p) { - return p.primary(topVer); + new 
P1<GridDhtCacheEntry>() { + @Override public boolean apply(GridDhtCacheEntry e) { + return !e.obsoleteOrDeleted(); } - })); + }); + } + }); - return new GridCacheEntrySet<>(ctx, entries, null); + return new GridCacheEntrySet<>(ctx, F.flatCollections(entriesCol), null); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java new file mode 100644 index 0000000..0e7c0a1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/RarefiedConcurrentIntMap.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class RarefiedConcurrentIntMap<T> implements Iterable<T> { + /** */ + private final AtomicReferenceArray<T> arr; + + /** */ + private final int maxIdx; + + /** + * @param maxIdx Max element index. + */ + public RarefiedConcurrentIntMap(int maxIdx) { + arr = new AtomicReferenceArray<T>(maxIdx); + + this.maxIdx = maxIdx; + } + + /** {@inheritDoc} */ + @Override public Iterator<T> iterator() { + return new Iterator<T>() { + + private int idx; + + private T next; + + private T lastReturned; + + private int lastReturnedIdx; + + private void advance() { + while (next == null && idx < maxIdx) + next = arr.get(idx++); + } + + @Override public boolean hasNext() { + advance(); + + return next != null; + } + + @Override public T next() { + advance(); + + if (next == null) + throw new NoSuchElementException(); + + lastReturned = next; + lastReturnedIdx = idx - 1; + + next = null; + + return lastReturned; + } + + @Override public void remove() { + if (lastReturned == null) + throw new IllegalStateException(); + + arr.compareAndSet(lastReturnedIdx, lastReturned, null); + + lastReturned = null; + } + }; + } + + /** + * @param idx Index. + */ + public T get(int idx) { + return arr.get(idx); + } + + /** + * + */ + public int maxIndex() { + return maxIdx; + } + + /** + * @param idx Index. + * @param expVal Expected value. + */ + public boolean remove(int idx, T expVal) { + return arr.compareAndSet(idx, expVal, null); + } + + /** + * @param idx Index. + * @param val Value. + */ + public T putIfAbsent(int idx, T val) { + while (true) { + if (arr.compareAndSet(idx, null, val)) + return null; + + T res = arr.get(idx); + + if (res != null) + return res; + } + } + + /** + * @param c Closure. + */ + public void addAllTo(Collection<? 
super T> c) { + for (int i = 0; i < maxIdx; i++) { + T e = arr.get(i); + + if (e != null) + c.add(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/12fa8894/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index c86c5a4..c6137b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2884,6 +2884,25 @@ public class GridFunc { if (F.isEmpty(c)) return Collections.emptyList(); + return flatCollections((Iterable<? extends Collection<T>>)c); + } + + /** + * Flattens collection-of-collections and returns collection over the + * elements of the inner collections. This method doesn't create any + * new collections or copy any elements. + * <p> + * Note that due to non-copying nature of implementation, the + * {@link Collection#size() size()} method of resulting collection will have to + * iterate over all elements to produce size. Method {@link Collection#isEmpty() isEmpty()}, + * however, is constant time and is much more preferable to use instead + * of {@code 'size()'} method when checking if list is not empty. + * + * @param c Input iterable of collections. + * @param <T> Type of the elements of the inner collections. + * @return Collection over the elements of the inner collections. + */ + public static <T> Collection<T> flatCollections(@Nullable final Iterable<? extends Collection<T>> c) { return new GridSerializableCollection<T>() { @NotNull @Override public Iterator<T> iterator() {