# ignite-63

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b77f2a59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b77f2a59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b77f2a59

Branch: refs/heads/ignite-63
Commit: b77f2a59dbf9e379384c2ac8f9999d235474f74e
Parents: ef78d14
Author: sboikov <semen.boi...@inria.fr>
Authored: Fri Jan 23 00:49:19 2015 +0300
Committer: sboikov <semen.boi...@inria.fr>
Committed: Fri Jan 23 00:49:34 2015 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  816 ++++++++++
 .../dht/GridDhtAffinityAssignmentRequest.java   |  139 ++
 .../dht/GridDhtAffinityAssignmentResponse.java  |  196 +++
 .../dht/GridDhtAssignmentFetchFuture.java       |  183 +++
 .../cache/distributed/dht/GridDhtCache.java     |  103 ++
 .../distributed/dht/GridDhtCacheAdapter.java    | 1017 ++++++++++++
 .../distributed/dht/GridDhtCacheEntry.java      |  760 +++++++++
 .../distributed/dht/GridDhtCacheEntryImpl.java  |  172 ++
 .../distributed/dht/GridDhtEmbeddedFuture.java  |   92 ++
 .../distributed/dht/GridDhtFinishedFuture.java  |   66 +
 .../cache/distributed/dht/GridDhtFuture.java    |   36 +
 .../cache/distributed/dht/GridDhtGetFuture.java |  451 ++++++
 .../dht/GridDhtInvalidPartitionException.java   |   51 +
 .../distributed/dht/GridDhtLocalPartition.java  |  594 +++++++
 .../distributed/dht/GridDhtLockFuture.java      | 1235 +++++++++++++++
 .../distributed/dht/GridDhtLockRequest.java     |  596 +++++++
 .../distributed/dht/GridDhtLockResponse.java    |  453 ++++++
 .../distributed/dht/GridDhtPartitionState.java  |   55 +
 .../dht/GridDhtPartitionTopology.java           |  207 +++
 .../dht/GridDhtPartitionTopologyImpl.java       | 1195 ++++++++++++++
 .../distributed/dht/GridDhtTopologyFuture.java  |   44 +
 .../dht/GridDhtTransactionalCacheAdapter.java   | 1492 ++++++++++++++++++
 .../distributed/dht/GridDhtTxFinishFuture.java  |  532 +++++++
 .../distributed/dht/GridDhtTxFinishRequest.java |  702 ++++++++
 .../dht/GridDhtTxFinishResponse.java            |  145 ++
 .../cache/distributed/dht/GridDhtTxLocal.java   |  656 ++++++++
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  831 ++++++++++
 .../cache/distributed/dht/GridDhtTxMapping.java |  169 ++
 .../distributed/dht/GridDhtTxPrepareFuture.java | 1074 +++++++++++++
 .../dht/GridDhtTxPrepareRequest.java            |  613 +++++++
 .../dht/GridDhtTxPrepareResponse.java           |  471 ++++++
 .../cache/distributed/dht/GridDhtTxRemote.java  |  332 ++++
 .../distributed/dht/GridDhtUnlockRequest.java   |  221 +++
 .../distributed/dht/GridNoStorageCacheMap.java  |  109 ++
 .../dht/GridPartitionedGetFuture.java           |  715 +++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |    2 +-
 .../dht/atomic/GridDhtAtomicCacheEntry.java     |    2 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |    2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |    2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |    2 +-
 .../colocated/GridDhtColocatedCacheEntry.java   |    2 +-
 .../colocated/GridDhtColocatedLockFuture.java   |    2 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |    4 +-
 .../preloader/GridDhtPartitionDemandPool.java   |    4 +-
 .../dht/preloader/GridDhtPartitionMap.java      |    2 +-
 .../preloader/GridDhtPartitionSupplyPool.java   |    4 +-
 .../GridDhtPartitionsExchangeFuture.java        |    2 +-
 .../dht/preloader/GridDhtPreloader.java         |    2 +-
 .../distributed/near/GridNearAtomicCache.java   |    2 +-
 .../distributed/near/GridNearCacheAdapter.java  |    2 +-
 .../distributed/near/GridNearCacheEntry.java    |    2 +-
 .../distributed/near/GridNearGetFuture.java     |    2 +-
 .../distributed/near/GridNearLockFuture.java    |    2 +-
 .../near/GridNearTransactionalCache.java        |    2 +-
 .../cache/distributed/near/GridNearTxLocal.java |    2 +-
 .../near/GridNearTxPrepareFuture.java           |    2 +-
 .../cache/query/GridCacheQueryManager.java      |    2 +-
 .../cache/transactions/IgniteTxHandler.java     |    2 +-
 .../cache/transactions/IgniteTxManager.java     |    2 +-
 .../GridTcpCommunicationMessageFactory.java     |    2 +-
 .../ignite/internal/visor/cache/VisorCache.java |    2 +-
 .../processors/cache/GridCacheAdapter.java      |    2 +-
 .../processors/cache/GridCacheContext.java      |    2 +-
 .../processors/cache/GridCacheEntryImpl.java    |    2 +-
 .../cache/GridCacheEvictionManager.java         |    4 +-
 .../processors/cache/GridCacheMapEntry.java     |    2 +-
 .../GridCachePartitionExchangeManager.java      |    2 +-
 .../processors/cache/GridCacheProcessor.java    |    2 +-
 .../kernal/processors/cache/GridCacheUtils.java |    2 +-
 ...dCachePessimisticCheckCommittedTxFuture.java |    2 +-
 .../GridPartitionedCacheEntryImpl.java          |    2 +-
 .../dht/GridClientPartitionTopology.java        |  816 ----------
 .../dht/GridDhtAffinityAssignmentRequest.java   |  139 --
 .../dht/GridDhtAffinityAssignmentResponse.java  |  196 ---
 .../dht/GridDhtAssignmentFetchFuture.java       |  183 ---
 .../cache/distributed/dht/GridDhtCache.java     |  103 --
 .../distributed/dht/GridDhtCacheAdapter.java    | 1017 ------------
 .../distributed/dht/GridDhtCacheEntry.java      |  760 ---------
 .../distributed/dht/GridDhtCacheEntryImpl.java  |  172 --
 .../distributed/dht/GridDhtEmbeddedFuture.java  |   92 --
 .../distributed/dht/GridDhtFinishedFuture.java  |   66 -
 .../cache/distributed/dht/GridDhtFuture.java    |   36 -
 .../cache/distributed/dht/GridDhtGetFuture.java |  451 ------
 .../dht/GridDhtInvalidPartitionException.java   |   51 -
 .../distributed/dht/GridDhtLocalPartition.java  |  594 -------
 .../distributed/dht/GridDhtLockFuture.java      | 1235 ---------------
 .../distributed/dht/GridDhtLockRequest.java     |  596 -------
 .../distributed/dht/GridDhtLockResponse.java    |  453 ------
 .../distributed/dht/GridDhtPartitionState.java  |   55 -
 .../dht/GridDhtPartitionTopology.java           |  207 ---
 .../dht/GridDhtPartitionTopologyImpl.java       | 1195 --------------
 .../distributed/dht/GridDhtTopologyFuture.java  |   44 -
 .../dht/GridDhtTransactionalCacheAdapter.java   | 1492 ------------------
 .../distributed/dht/GridDhtTxFinishFuture.java  |  532 -------
 .../distributed/dht/GridDhtTxFinishRequest.java |  702 --------
 .../dht/GridDhtTxFinishResponse.java            |  145 --
 .../cache/distributed/dht/GridDhtTxLocal.java   |  656 --------
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  831 ----------
 .../cache/distributed/dht/GridDhtTxMapping.java |  169 --
 .../distributed/dht/GridDhtTxPrepareFuture.java | 1074 -------------
 .../dht/GridDhtTxPrepareRequest.java            |  613 -------
 .../dht/GridDhtTxPrepareResponse.java           |  471 ------
 .../cache/distributed/dht/GridDhtTxRemote.java  |  332 ----
 .../distributed/dht/GridDhtUnlockRequest.java   |  221 ---
 .../distributed/dht/GridNoStorageCacheMap.java  |  109 --
 .../dht/GridPartitionedGetFuture.java           |  715 ---------
 ...tPartitionedOnlyByteArrayValuesSelfTest.java |  168 ++
 ...heAbstractTransformWriteThroughSelfTest.java |  336 ++++
 ...acheAtomicExpiredEntriesPreloadSelfTest.java |   45 +
 .../dht/GridCacheAtomicFullApiSelfTest.java     |  100 ++
 .../dht/GridCacheAtomicNearCacheSelfTest.java   |  820 ++++++++++
 ...idCacheAtomicNearEnabledFullApiSelfTest.java |   32 +
 ...EnabledPrimaryWriteOrderFullApiSelfTest.java |   33 +
 ...eAtomicPrimaryWriteOrderFullApiSelfTest.java |   32 +
 ...tomicPrimaryWriteOrderReloadAllSelfTest.java |   32 +
 .../dht/GridCacheAtomicReloadAllSelfTest.java   |   38 +
 .../dht/GridCacheClientOnlySelfTest.java        |   38 +
 .../dht/GridCacheColocatedDebugTest.java        |  977 ++++++++++++
 .../dht/GridCacheColocatedFailoverSelfTest.java |   39 +
 ...eColocatedOptimisticTransactionSelfTest.java |  156 ++
 ...ridCacheColocatedPreloadRestartSelfTest.java |   30 +
 .../GridCacheColocatedPrimarySyncSelfTest.java  |   33 +
 .../GridCacheColocatedReloadAllSelfTest.java    |   30 +
 .../GridCacheColocatedTxExceptionSelfTest.java  |   39 +
 ...ssimisticOriginatingNodeFailureSelfTest.java |   49 +
 ...dCacheColocatedTxSingleThreadedSelfTest.java |  107 ++
 .../GridCacheDaemonNodePartitionedSelfTest.java |   31 +
 ...cheDhtAtomicEvictionNearReadersSelfTest.java |   32 +
 .../GridCacheDhtAtomicRemoveFailureTest.java    |   50 +
 .../dht/GridCacheDhtEntrySelfTest.java          |  314 ++++
 .../dht/GridCacheDhtEntrySetSelfTest.java       |   45 +
 ...GridCacheDhtEvictionNearReadersSelfTest.java |  297 ++++
 .../dht/GridCacheDhtEvictionSelfTest.java       |  363 +++++
 .../GridCacheDhtEvictionsDisabledSelfTest.java  |  125 ++
 ...idCacheDhtExpiredEntriesPreloadSelfTest.java |   39 +
 .../dht/GridCacheDhtInternalEntrySelfTest.java  |  225 +++
 .../dht/GridCacheDhtMappingSelfTest.java        |  105 ++
 .../dht/GridCacheDhtMultiBackupTest.java        |  135 ++
 .../dht/GridCacheDhtPreloadBigDataSelfTest.java |  228 +++
 .../dht/GridCacheDhtPreloadDelayedSelfTest.java |  459 ++++++
 .../GridCacheDhtPreloadDisabledSelfTest.java    |  278 ++++
 .../GridCacheDhtPreloadMessageCountTest.java    |  175 ++
 ...ridCacheDhtPreloadMultiThreadedSelfTest.java |  175 ++
 .../dht/GridCacheDhtPreloadOffHeapSelfTest.java |   39 +
 .../dht/GridCacheDhtPreloadPutGetSelfTest.java  |  275 ++++
 .../dht/GridCacheDhtPreloadSelfTest.java        |  674 ++++++++
 .../GridCacheDhtPreloadStartStopSelfTest.java   |  269 ++++
 .../dht/GridCacheDhtPreloadUnloadSelfTest.java  |  321 ++++
 .../dht/GridCacheDhtRemoveFailureTest.java      |   44 +
 .../distributed/dht/GridCacheDhtTestUtils.java  |  231 +++
 .../dht/GridCacheDhtTxPreloadSelfTest.java      |   44 +
 .../GridCacheExColocatedFullApiSelfTest.java    |   33 +
 .../dht/GridCacheGlobalLoadTest.java            |  171 ++
 .../GridCacheGroupLockColocatedSelfTest.java    |   38 +
 ...acheGroupLockMultiNodeColocatedSelfTest.java |   29 +
 ...cheGroupLockPartitionedAbstractSelfTest.java |  136 ++
 ...ockPartitionedMultiNodeAbstractSelfTest.java |  173 ++
 ...ePartitionedNearDisabledFullApiSelfTest.java |   33 +
 ...ePartitionedNearDisabledMetricsSelfTest.java |  119 ++
 ...nedNearDisabledMultiNodeFullApiSelfTest.java |   34 +
 ...bledMultiNodeP2PDisabledFullApiSelfTest.java |   34 +
 ...ionedNearDisabledOffHeapFullApiSelfTest.java |   29 +
 ...DisabledOffHeapMultiNodeFullApiSelfTest.java |   29 +
 ...isabledTxOriginatingNodeFailureSelfTest.java |   31 +
 ...dOnlyP2PDisabledByteArrayValuesSelfTest.java |   29 +
 ...edOnlyP2PEnabledByteArrayValuesSelfTest.java |   29 +
 ...dCachePartitionedOnlyProjectionSelfTest.java |   32 +
 ...idCachePartitionedPreloadEventsSelfTest.java |  124 ++
 ...dCachePartitionedTopologyChangeSelfTest.java |  596 +++++++
 ...ransformWriteThroughBatchUpdateSelfTest.java |   29 +
 ...itionedTxOriginatingNodeFailureSelfTest.java |  161 ++
 ...ridCachePartitionedUnloadEventsSelfTest.java |  151 ++
 ...teTxConsistencyColocatedRestartSelfTest.java |   36 +
 .../dht/IgniteTxReentryColocatedSelfTest.java   |   79 +
 ...GridCacheGroupLockMultiNodeNearSelfTest.java |    2 +-
 .../near/GridCacheNearMultiNodeSelfTest.java    |    2 +-
 .../near/GridCacheNearReadersSelfTest.java      |    2 +-
 .../near/GridCacheNearTxMultiNodeSelfTest.java  |    2 +-
 ...ssimisticOriginatingNodeFailureSelfTest.java |    2 +-
 .../GridCachePartitionedEvictionSelfTest.java   |    2 +-
 ...achePartitionedMultiNodeCounterSelfTest.java |    2 +-
 .../GridCacheConcurrentTxMultiNodeTest.java     |    2 +-
 .../cache/GridCacheEntryMemorySizeSelfTest.java |    2 +-
 .../cache/GridCacheMultiUpdateLockSelfTest.java |    2 +-
 .../IgniteTxConcurrentGetAbstractTest.java      |    2 +-
 .../cache/IgniteTxMultiNodeAbstractTest.java    |    2 +-
 .../cache/IgniteTxReentryAbstractSelfTest.java  |    2 +-
 .../GridCacheMultiNodeLockAbstractTest.java     |    2 +-
 ...dCacheMultithreadedFailoverAbstractTest.java |    2 +-
 ...tPartitionedOnlyByteArrayValuesSelfTest.java |  168 --
 ...heAbstractTransformWriteThroughSelfTest.java |  336 ----
 ...acheAtomicExpiredEntriesPreloadSelfTest.java |   45 -
 .../dht/GridCacheAtomicFullApiSelfTest.java     |  100 --
 .../dht/GridCacheAtomicNearCacheSelfTest.java   |  820 ----------
 ...idCacheAtomicNearEnabledFullApiSelfTest.java |   32 -
 ...EnabledPrimaryWriteOrderFullApiSelfTest.java |   33 -
 ...eAtomicPrimaryWriteOrderFullApiSelfTest.java |   32 -
 ...tomicPrimaryWriteOrderReloadAllSelfTest.java |   32 -
 .../dht/GridCacheAtomicReloadAllSelfTest.java   |   38 -
 .../dht/GridCacheClientOnlySelfTest.java        |   38 -
 .../dht/GridCacheColocatedDebugTest.java        |  977 ------------
 .../dht/GridCacheColocatedFailoverSelfTest.java |   39 -
 ...eColocatedOptimisticTransactionSelfTest.java |  156 --
 ...ridCacheColocatedPreloadRestartSelfTest.java |   30 -
 .../GridCacheColocatedPrimarySyncSelfTest.java  |   33 -
 .../GridCacheColocatedReloadAllSelfTest.java    |   30 -
 .../GridCacheColocatedTxExceptionSelfTest.java  |   39 -
 ...ssimisticOriginatingNodeFailureSelfTest.java |   49 -
 ...dCacheColocatedTxSingleThreadedSelfTest.java |  107 --
 .../GridCacheDaemonNodePartitionedSelfTest.java |   31 -
 ...cheDhtAtomicEvictionNearReadersSelfTest.java |   32 -
 .../GridCacheDhtAtomicRemoveFailureTest.java    |   51 -
 .../dht/GridCacheDhtEntrySelfTest.java          |  314 ----
 .../dht/GridCacheDhtEntrySetSelfTest.java       |   46 -
 ...GridCacheDhtEvictionNearReadersSelfTest.java |  297 ----
 .../dht/GridCacheDhtEvictionSelfTest.java       |  363 -----
 .../GridCacheDhtEvictionsDisabledSelfTest.java  |  126 --
 ...idCacheDhtExpiredEntriesPreloadSelfTest.java |   39 -
 .../dht/GridCacheDhtInternalEntrySelfTest.java  |  225 ---
 .../dht/GridCacheDhtMappingSelfTest.java        |  105 --
 .../dht/GridCacheDhtMultiBackupTest.java        |  135 --
 .../dht/GridCacheDhtPreloadBigDataSelfTest.java |  229 ---
 .../dht/GridCacheDhtPreloadDelayedSelfTest.java |  459 ------
 .../GridCacheDhtPreloadDisabledSelfTest.java    |  278 ----
 .../GridCacheDhtPreloadMessageCountTest.java    |  175 --
 ...ridCacheDhtPreloadMultiThreadedSelfTest.java |  175 --
 .../dht/GridCacheDhtPreloadOffHeapSelfTest.java |   39 -
 .../dht/GridCacheDhtPreloadPutGetSelfTest.java  |  275 ----
 .../dht/GridCacheDhtPreloadSelfTest.java        |  674 --------
 .../GridCacheDhtPreloadStartStopSelfTest.java   |  269 ----
 .../dht/GridCacheDhtPreloadUnloadSelfTest.java  |  322 ----
 .../dht/GridCacheDhtRemoveFailureTest.java      |   44 -
 .../distributed/dht/GridCacheDhtTestUtils.java  |  231 ---
 .../dht/GridCacheDhtTxPreloadSelfTest.java      |   45 -
 .../GridCacheExColocatedFullApiSelfTest.java    |   33 -
 .../dht/GridCacheGlobalLoadTest.java            |  171 --
 .../GridCacheGroupLockColocatedSelfTest.java    |   38 -
 ...acheGroupLockMultiNodeColocatedSelfTest.java |   29 -
 ...cheGroupLockPartitionedAbstractSelfTest.java |  136 --
 ...ockPartitionedMultiNodeAbstractSelfTest.java |  173 --
 ...ePartitionedNearDisabledFullApiSelfTest.java |   33 -
 ...ePartitionedNearDisabledMetricsSelfTest.java |  120 --
 ...nedNearDisabledMultiNodeFullApiSelfTest.java |   34 -
 ...bledMultiNodeP2PDisabledFullApiSelfTest.java |   34 -
 ...ionedNearDisabledOffHeapFullApiSelfTest.java |   29 -
 ...DisabledOffHeapMultiNodeFullApiSelfTest.java |   29 -
 ...isabledTxOriginatingNodeFailureSelfTest.java |   31 -
 ...dOnlyP2PDisabledByteArrayValuesSelfTest.java |   29 -
 ...edOnlyP2PEnabledByteArrayValuesSelfTest.java |   29 -
 ...dCachePartitionedOnlyProjectionSelfTest.java |   32 -
 ...idCachePartitionedPreloadEventsSelfTest.java |  124 --
 ...dCachePartitionedTopologyChangeSelfTest.java |  596 -------
 ...ransformWriteThroughBatchUpdateSelfTest.java |   29 -
 ...itionedTxOriginatingNodeFailureSelfTest.java |  161 --
 ...ridCachePartitionedUnloadEventsSelfTest.java |  151 --
 ...teTxConsistencyColocatedRestartSelfTest.java |   36 -
 .../dht/IgniteTxReentryColocatedSelfTest.java   |   79 -
 .../GridCacheReplicatedClientOnlySelfTest.java  |    3 +-
 .../gridgain/loadtests/dsi/GridDsiPerfJob.java  |    2 +-
 .../gridgain/testframework/GridTestUtils.java   |    2 +-
 .../junits/common/GridCommonAbstractTest.java   |    2 +-
 .../GridCacheFullApiSelfTestSuite.java          |    2 +-
 .../GridCacheGroupLockSelfTestSuite.java        |    2 +-
 .../GridCacheNearOnlySelfTestSuite.java         |    2 +-
 .../GridCacheTxRecoverySelfTestSuite.java       |    2 +-
 .../bamboo/GridCacheFailoverTestSuite.java      |    2 +-
 .../bamboo/GridDataGridTestSuite.java           |    2 +-
 267 files changed, 26850 insertions(+), 26858 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
new file mode 100644
index 0000000..586f4ef
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Partition topology for node which does not have any local partitions.
+ */
+@GridToStringExclude
+public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopology<K, V> {
+    /** If true, then check consistency. */
+    private static final boolean CONSISTENCY_CHECK = false;
+
+    /** Flag to control amount of output for full map. */
+    private static final boolean FULL_MAP_DEBUG = false;
+
+    /** Cache shared context. */
+    private GridCacheSharedContext<K, V> cctx;
+
+    /** Cache ID. */
+    private int cacheId;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Node to partition map. */
+    private GridDhtPartitionFullMap node2part;
+
+    /** Partition to node map. */
+    private Map<Integer, Set<UUID>> part2node = new HashMap<>();
+
+    /** */
+    private GridDhtPartitionExchangeId lastExchangeId;
+
+    /** */
+    private long topVer = -1;
+
+    /** A future that will be completed when topology with version topVer will be ready to use. */
+    private GridDhtTopologyFuture topReadyFut;
+
+    /** */
+    private final GridAtomicLong updateSeq = new GridAtomicLong(1);
+
+    /** Lock. */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * @param cctx Context.
+     * @param cacheId Cache ID.
+     * @param exchId Exchange ID.
+     */
+    public GridClientPartitionTopology(GridCacheSharedContext<K, V> cctx, int cacheId,
+        GridDhtPartitionExchangeId exchId) {
+        this.cctx = cctx;
+        this.cacheId = cacheId;
+
+        topVer = exchId.topologyVersion();
+
+        log = cctx.logger(getClass());
+
+        beforeExchange(exchId);
+    }
+
+    /**
+     * @return Full map string representation.
+     */
+    @SuppressWarnings( {"ConstantConditions"})
+    private String fullMapString() {
+        return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
+    }
+
+    /**
+     * @param map Map to get string for.
+     * @return Full map string representation.
+     */
+    @SuppressWarnings( {"ConstantConditions"})
+    private String mapString(GridDhtPartitionMap map) {
+        return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    public int cacheId() {
+        return cacheId;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"})
+    @Override public void readLock() {
+        lock.readLock().lock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsExchangeFuture<K, V> exchFut) {
+        lock.writeLock().lock();
+
+        try {
+            assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer +
+                ", exchId=" + exchId + ']';
+
+            topVer = exchId.topologyVersion();
+
+            topReadyFut = exchFut;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long topologyVersion() {
+        lock.readLock().lock();
+
+        try {
+            assert topVer > 0;
+
+            return topVer;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtTopologyFuture topologyVersionFuture() {
+        lock.readLock().lock();
+
+        try {
+            assert topReadyFut != null;
+
+            return topReadyFut;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) {
+        ClusterNode loc = cctx.localNode();
+
+        lock.writeLock().lock();
+
+        try {
+            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
+                topVer + ", exchId=" + exchId + ']';
+
+            if (!exchId.isJoined())
+                removeNode(exchId.nodeId());
+
+            // In case if node joins, get topology at the time of joining node.
+            ClusterNode oldest = CU.oldest(cctx, topVer);
+
+            if (log.isDebugEnabled())
+                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+
+            long updateSeq = this.updateSeq.incrementAndGet();
+
+            // If this is the oldest node.
+            if (oldest.id().equals(loc.id())) {
+                if (node2part == null) {
+                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Created brand new full topology map on oldest node [exchId=" +
+                            exchId + ", fullMap=" + fullMapString() + ']');
+                }
+                else if (!node2part.valid()) {
+                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+                            node2part + ']');
+                }
+                else if (!node2part.nodeId().equals(loc.id())) {
+                    node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+                            exchId + ", fullMap=" + fullMapString() + ']');
+                }
+            }
+
+            consistencyCheck();
+
+            if (log.isDebugEnabled())
+                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+                    fullMapString() + ']');
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException {
+        long topVer = exchId.topologyVersion();
+
+        lock.writeLock().lock();
+
+        try {
+            assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" +
+                topVer + ", exchId=" + exchId + ']';
+
+            if (log.isDebugEnabled())
+                log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" +
+                    fullMapString() + ']');
+
+            updateSeq.incrementAndGet();
+
+            consistencyCheck();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create)
+        throws GridDhtInvalidPartitionException {
+        if (!create)
+            return null;
+
+        throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition [part=" + p +
+            ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) {
+        return localPartition(1, -1, create);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<GridDhtLocalPartition<K, V>> localPartitions() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) {
+        assert false : "Entry should not be added to client topology: " + e;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onRemoved(GridDhtCacheEntry<K, V> e) {
+        assert false : "Entry should not be removed from client topology: " + e;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionMap localPartitionMap() {
+        lock.readLock().lock();
+
+        try {
+            return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(),
+                Collections.<Integer, GridDhtPartitionState>emptyMap(), true);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> nodes(int p, long topVer) {
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
+                ", node2part=" + node2part + ']';
+
+            Collection<ClusterNode> nodes = null;
+
+            Collection<UUID> nodeIds = part2node.get(p);
+
+            if (!F.isEmpty(nodeIds)) {
+                for (UUID nodeId : nodeIds) {
+                    ClusterNode n = cctx.discovery().node(nodeId);
+
+                    if (n != null && (topVer < 0 || n.order() <= topVer)) {
+                        if (nodes == null)
+                            nodes = new ArrayList<>();
+
+                        nodes.add(n);
+                    }
+                }
+            }
+
+            return nodes;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version ({@code -1} for all nodes).
+     * @param state Partition state.
+     * @param states Additional partition states.
+     * @return List of nodes for the partition.
+     */
+    private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+        Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
+                ", allIds=" + allIds + ", node2part=" + node2part + ']';
+
+            Collection<UUID> nodeIds = part2node.get(p);
+
+            // Node IDs can be null if both, primary and backup, nodes disappear.
+            int size = nodeIds == null ? 0 : nodeIds.size();
+
+            if (size == 0)
+                return Collections.emptyList();
+
+            List<ClusterNode> nodes = new ArrayList<>(size);
+
+            for (UUID id : nodeIds) {
+                if (topVer > 0 && !allIds.contains(id))
+                    continue;
+
+                if (hasState(p, id, state, states)) {
+                    ClusterNode n = cctx.discovery().node(id);
+
+                    if (n != null && (topVer < 0 || n.order() <= topVer))
+                        nodes.add(n);
+                }
+            }
+
+            return nodes;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> owners(int p, long topVer) {
+        return nodes(p, topVer, OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> owners(int p) {
+        return owners(p, -1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> moving(int p) {
+        return nodes(p, -1, MOVING);
+    }
+
+    /**
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return List of nodes in state OWNING or MOVING.
+     */
+    private List<ClusterNode> ownersAndMoving(int p, long topVer) {
+        return nodes(p, topVer, OWNING, MOVING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long updateSequence() {
+        return updateSeq.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
+        lock.readLock().lock();
+
+        try {
+            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+                ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']';
+
+            GridDhtPartitionFullMap m = node2part;
+
+            return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+    @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionFullMap partMap) {
+        if (log.isDebugEnabled())
+            log.debug("Updating full partition map [exchId=" + exchId + ", 
parts=" + fullMapString() + ']');
+
+        lock.writeLock().lock();
+
+        try {
+            if (exchId != null && lastExchangeId != null && 
lastExchangeId.compareTo(exchId) >= 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Stale exchange id for full partition map update 
(will ignore) [lastExchId=" +
+                        lastExchangeId + ", exchId=" + exchId + ']');
+
+                return null;
+            }
+
+            if (node2part != null && node2part.compareTo(partMap) >= 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Stale partition map for full partition map 
update (will ignore) [lastExchId=" +
+                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + 
node2part + ", newMap=" + partMap + ']');
+
+                return null;
+            }
+
+            updateSeq.incrementAndGet();
+
+            if (exchId != null)
+                lastExchangeId = exchId;
+
+            if (node2part != null) {
+                for (GridDhtPartitionMap part : node2part.values()) {
+                    GridDhtPartitionMap newPart = partMap.get(part.nodeId());
+
+                    // If for some nodes current partition has a newer map,
+                    // then we keep the newer value.
+                    if (newPart != null && newPart.updateSequence() < 
part.updateSequence()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Overriding partition map in full update 
map [exchId=" + exchId + ", curPart=" +
+                                mapString(part) + ", newPart=" + 
mapString(newPart) + ']');
+
+                        partMap.put(part.nodeId(), part);
+                    }
+                }
+
+                for (Iterator<UUID> it = partMap.keySet().iterator(); 
it.hasNext();) {
+                    UUID nodeId = it.next();
+
+                    if (!cctx.discovery().alive(nodeId)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Removing left node from full map update 
[nodeId=" + nodeId + ", partMap=" +
+                                partMap + ']');
+
+                        it.remove();
+                    }
+                }
+            }
+
+            node2part = partMap;
+
+            Map<Integer, Set<UUID>> p2n = new HashMap<>();
+
+            for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+                for (Integer p : e.getValue().keySet()) {
+                    Set<UUID> ids = p2n.get(p);
+
+                    if (ids == null)
+                        // Initialize HashSet to size 3 in anticipation that 
there won't be
+                        // more than 3 nodes per partitions.
+                        p2n.put(p, ids = U.newHashSet(3));
+
+                    ids.add(e.getKey());
+                }
+            }
+
+            part2node = p2n;
+
+            consistencyCheck();
+
+            if (log.isDebugEnabled())
+                log.debug("Partition map after full update: " + 
fullMapString());
+
+            return null;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Applies the partition map of a single node. The update is ignored (and {@code null}
+     * returned) when:
+     * <ul>
+     * <li>discovery does not report the sending node as alive (checked before locking);</li>
+     * <li>{@code lastExchangeId} compares strictly greater than {@code exchId};</li>
+     * <li>the map currently stored for that node has an update sequence greater than or
+     * equal to the incoming one.</li>
+     * </ul>
+     * Otherwise, under the write lock, the global update counter is incremented,
+     * {@code node2part} is replaced by a copy carrying the new sequence with {@code parts}
+     * put into it, and {@code part2node} is replaced by a new {@code HashMap} in which the
+     * node is added to every partition listed in {@code parts} and removed from partitions
+     * that were only present in its previous map.
+     *
+     * @param exchId Exchange ID, may be {@code null}.
+     * @param parts Partition map of a single node.
+     * @return Result of {@code localPartitionMap()} if anything changed, {@code null} otherwise.
+     */
+    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+    @Nullable @Override public GridDhtPartitionMap update(@Nullable 
GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionMap parts) {
+        if (log.isDebugEnabled())
+            log.debug("Updating single partition map [exchId=" + exchId + ", 
parts=" + mapString(parts) + ']');
+
+        if (!cctx.discovery().alive(parts.nodeId())) {
+            if (log.isDebugEnabled())
+                log.debug("Received partition update for non-existing node 
(will ignore) [exchId=" + exchId +
+                    ", parts=" + parts + ']');
+
+            return null;
+        }
+
+        lock.writeLock().lock();
+
+        try {
+            if (lastExchangeId != null && exchId != null && 
lastExchangeId.compareTo(exchId) > 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Stale exchange id for single partition map 
update (will ignore) [lastExchId=" +
+                        lastExchangeId + ", exchId=" + exchId + ']');
+
+                return null;
+            }
+
+            if (exchId != null)
+                lastExchangeId = exchId;
+
+            if (node2part == null) {
+                U.dumpStack(log, "Created invalid: " + node2part); // NOTE(review): node2part is null on this branch, so the message always ends with "null".
+
+                // Create invalid partition map.
+                node2part = new GridDhtPartitionFullMap();
+            }
+
+            GridDhtPartitionMap cur = node2part.get(parts.nodeId());
+
+            if (cur != null && cur.updateSequence() >= parts.updateSequence()) 
{
+                if (log.isDebugEnabled())
+                    log.debug("Stale update sequence for single partition map 
update (will ignore) [exchId=" + exchId +
+                        ", curSeq=" + cur.updateSequence() + ", newSeq=" + 
parts.updateSequence() + ']');
+
+                return null;
+            }
+
+            long updateSeq = this.updateSeq.incrementAndGet(); // Local shadows the field of the same name.
+
+            node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
+
+            boolean changed = false;
+
+            if (cur == null || !cur.equals(parts))
+                changed = true;
+
+            node2part.put(parts.nodeId(), parts);
+
+            part2node = new HashMap<>(part2node); // Shallow copy: the Set values are shared with the previous map.
+
+            // Add new mappings.
+            for (Integer p : parts.keySet()) {
+                Set<UUID> ids = part2node.get(p);
+
+                if (ids == null)
+                    // Initialize HashSet to size 3 in anticipation that there 
won't be
+                    // more than 3 nodes per partition.
+                    part2node.put(p, ids = U.newHashSet(3));
+
+                changed |= ids.add(parts.nodeId());
+            }
+
+            // Remove obsolete mappings.
+            if (cur != null) {
+                for (Integer p : F.view(cur.keySet(), 
F0.notIn(parts.keySet()))) {
+                    Set<UUID> ids = part2node.get(p);
+
+                    if (ids != null)
+                        changed |= ids.remove(parts.nodeId());
+                }
+            }
+
+            consistencyCheck();
+
+            if (log.isDebugEnabled())
+                log.debug("Partition map after single update: " + 
fullMapString());
+
+            return changed ? localPartitionMap() : null;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Updates value for single partition.
+     * <p>
+     * Records {@code state} for partition {@code p} in the local node's entry of
+     * {@code node2part} (creating that entry if missing) and adds the node to the
+     * partition's set in {@code part2node}. The write lock must be held by the caller and
+     * {@code nodeId} must be the local node ID; both are asserted.
+     * <p>
+     * If the local node is the oldest node for {@code topVer} and the full map's update
+     * sequence differs from {@code updateSeq}, the two are reconciled first: when the map's
+     * sequence is higher, either the global counter is moved to {@code seq + 1} (if it was
+     * behind) or {@code updateSeq} is raised to {@code seq}; the resulting value is then
+     * stored in the full map.
+     * <p>
+     * NOTE(review): {@code node2part} is dereferenced without a null check here although
+     * other methods treat it as nullable - presumably callers guarantee it is set; confirm.
+     *
+     * @param p Partition.
+     * @param nodeId Node ID.
+     * @param state State.
+     * @param updateSeq Update sequence.
+     */
+    @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
+    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, 
long updateSeq) {
+        assert lock.isWriteLockedByCurrentThread();
+        assert nodeId.equals(cctx.localNodeId());
+
+        // In case if node joins, get topology at the time of joining node.
+        ClusterNode oldest = CU.oldest(cctx, topVer);
+
+        // If this node became the oldest node.
+        if (oldest.id().equals(cctx.localNodeId())) {
+            long seq = node2part.updateSequence();
+
+            if (seq != updateSeq) {
+                if (seq > updateSeq) {
+                    if (this.updateSeq.get() < seq) {
+                        // Update global counter if necessary.
+                        boolean b = 
this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
+
+                        assert b : "Invalid update sequence [updateSeq=" + 
updateSeq + ", seq=" + seq +
+                            ", curUpdateSeq=" + this.updateSeq.get() + ", 
node2part=" + node2part.toFullString() + ']';
+
+                        updateSeq = seq + 1;
+                    }
+                    else
+                        updateSeq = seq;
+                }
+
+                node2part.updateSequence(updateSeq);
+            }
+        }
+
+        GridDhtPartitionMap map = node2part.get(nodeId);
+
+        if (map == null)
+            node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, 
updateSeq,
+                Collections.<Integer, GridDhtPartitionState>emptyMap(), 
false));
+
+        map.updateSequence(updateSeq);
+
+        map.put(p, state);
+
+        Set<UUID> ids = part2node.get(p);
+
+        if (ids == null)
+            part2node.put(p, ids = U.newHashSet(3));
+
+        ids.add(nodeId);
+    }
+
+    /**
+     * Removes a node's partition map from {@code node2part} and its ID from every
+     * partition set in {@code part2node}; sets that become empty are removed from the map.
+     * Does nothing when {@code node2part} is {@code null}. The write lock must be held by
+     * the caller (asserted).
+     * <p>
+     * Before the removal {@code node2part} is replaced by a copy. If the local node is the
+     * oldest node for {@code topVer} and the current full map carries a different node ID,
+     * the copy is created under the local node's ID and order with an advanced update
+     * sequence; otherwise the copy keeps the current update sequence.
+     * <p>
+     * {@code part2node} is copied with the {@code HashMap} copy constructor, so the
+     * {@code Set} values are shared with the previous map instance and are modified in place.
+     *
+     * @param nodeId Node to remove.
+     */
+    private void removeNode(UUID nodeId) {
+        assert nodeId != null;
+        assert lock.writeLock().isHeldByCurrentThread();
+
+        ClusterNode oldest = CU.oldest(cctx, topVer);
+
+        ClusterNode loc = cctx.localNode();
+
+        if (node2part != null) {
+            if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
+                updateSeq.setIfGreater(node2part.updateSequence());
+
+                node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), 
updateSeq.incrementAndGet(),
+                    node2part, false);
+            }
+            else
+                node2part = new GridDhtPartitionFullMap(node2part, 
node2part.updateSequence());
+
+            part2node = new HashMap<>(part2node);
+
+            GridDhtPartitionMap parts = node2part.remove(nodeId);
+
+            if (parts != null) {
+                for (Integer p : parts.keySet()) {
+                    Set<UUID> nodeIds = part2node.get(p);
+
+                    if (nodeIds != null) {
+                        nodeIds.remove(nodeId);
+
+                        if (nodeIds.isEmpty())
+                            part2node.remove(p);
+                    }
+                }
+            }
+
+            consistencyCheck();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not expected to be called on this topology: trips an assertion when assertions are
+     * enabled, and otherwise reports that the partition was not owned.
+     *
+     * @return Always {@code false}.
+     */
+    @Override public boolean own(GridDhtLocalPartition<K, V> part) {
+        assert false : "Client topology should never own a partition: " + part;
+
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Records the evicted state of the partition for the local node. When
+     * {@code updateSeq} is {@code false} the caller must already hold the write lock
+     * (asserted); the lock is taken here in either case.
+     */
+    @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) {
+        assert updateSeq || lock.isWriteLockedByCurrentThread();
+
+        lock.writeLock().lock();
+
+        try {
+            assert part.state() == EVICTED;
+
+            // Either advance the global counter or reuse its current value.
+            final long newSeq;
+
+            if (updateSeq)
+                newSeq = this.updateSeq.incrementAndGet();
+            else
+                newSeq = this.updateSeq.get();
+
+            updateLocal(part.id(), cctx.localNodeId(), part.state(), newSeq);
+
+            consistencyCheck();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Looks the node up in the current full map under the read lock.
+     *
+     * @param nodeId Node ID.
+     * @return Partition map stored for the node, or {@code null} if there is no entry for
+     *      it or no full map has been set yet.
+     */
+    @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionFullMap full = node2part;
+
+            // The full map is treated as nullable by the update methods, so guard the lookup.
+            return full == null ? null : full.get(nodeId);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats(int threshold) {
+        X.println(">>>  Cache partition topology stats [grid=" + 
cctx.gridName() + ", cacheId=" + cacheId + ']');
+    }
+
+    /**
+     * Tells whether the given node holds partition {@code p} in one of the given states,
+     * according to {@code node2part}.
+     *
+     * @param p Partition.
+     * @param nodeId Node ID.
+     * @param match State to match.
+     * @param matches Additional states.
+     * @return {@code true} if the state recorded for the node and partition is {@code match}
+     *      or one of {@code matches}; {@code false} otherwise, including when {@code nodeId}
+     *      is {@code null} or the node has no partition map.
+     */
+    private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match,
+        final GridDhtPartitionState... matches) {
+        GridDhtPartitionMap nodeParts = nodeId == null ? null : node2part.get(nodeId);
+
+        // Map can be null if node has been removed.
+        if (nodeParts == null)
+            return false;
+
+        GridDhtPartitionState actual = nodeParts.get(p);
+
+        if (actual == match)
+            return true;
+
+        if (matches == null)
+            return false;
+
+        for (int i = 0; i < matches.length; i++) {
+            if (actual == matches[i])
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Verifies, when {@code CONSISTENCY_CHECK} is on, that {@code node2part} and
+     * {@code part2node} describe the same node-to-partition relation. Violations are
+     * reported through assertions. Requires the write lock to be held.
+     */
+    private void consistencyCheck() {
+        if (!CONSISTENCY_CHECK)
+            return;
+
+        assert lock.writeLock().isHeldByCurrentThread();
+
+        if (node2part == null)
+            return;
+
+        // Every (node, partition) pair of the full map must be present in the reverse index.
+        for (Map.Entry<UUID, GridDhtPartitionMap> nodeEntry : node2part.entrySet()) {
+            UUID id = nodeEntry.getKey();
+
+            for (Integer p : nodeEntry.getValue().keySet()) {
+                Set<UUID> nodeIds = part2node.get(p);
+
+                assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + id + ']';
+                assert nodeIds.contains(id) : "Failed consistency check [part=" + p + ", nodeId=" +
+                    id + ", nodeIds=" + nodeIds + ']';
+            }
+        }
+
+        // Every (partition, node) pair of the reverse index must be present in the full map.
+        for (Map.Entry<Integer, Set<UUID>> partEntry : part2node.entrySet()) {
+            Integer part = partEntry.getKey();
+
+            for (UUID nodeId : partEntry.getValue()) {
+                GridDhtPartitionMap map = node2part.get(nodeId);
+
+                assert map != null : "Failed consistency check [part=" + part + ", nodeId=" + nodeId + ']';
+                assert map.containsKey(part) : "Failed consistency check [part=" + part +
+                    ", nodeId=" + nodeId + ']';
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
new file mode 100644
index 0000000..d947ecf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.nio.*;
+
+/**
+ * Affinity assignment request.
+ * <p>
+ * Carries the cache ID and the topology version for which the assignment is requested.
+ * The only field serialized by this class itself is {@code topVer}.
+ */
+public class GridDhtAffinityAssignmentRequest<K, V> extends 
GridCacheMessage<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version being queried. */
+    private long topVer;
+
+    /**
+     * Empty constructor.
+     */
+    public GridDhtAffinityAssignmentRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param topVer Topology version.
+     */
+    public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) {
+        this.cacheId = cacheId;
+        this.topVer = topVer;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code true} for this message.
+     */
+    @Override public boolean allowForStartup() {
+        return true;
+    }
+
+    /**
+     * @return Requested topology version.
+     */
+    @Override public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Type code {@code 79}.
+     */
+    @Override public byte directType() {
+        return 79;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new instance through the empty constructor and copies state via
+     * {@link #clone0(GridTcpCommunicationMessageAdapter)}.
+     */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDhtAffinityAssignmentRequest _clone = new 
GridDhtAffinityAssignmentRequest();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Copies the superclass state and then {@code topVer}. The argument is cast to this
+     * class without a type check.
+     */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDhtAffinityAssignmentRequest _clone = 
(GridDhtAffinityAssignmentRequest)_msg;
+
+        _clone.topVer = topVer;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass part and the one-off type byte, writes {@code topVer} as the
+     * field with state index 3. Returns {@code false} as soon as any step reports that it
+     * could not be completed (presumably because the buffer has no room left - confirm
+     * against {@code commState}); the switch on {@code commState.idx} suggests a repeated
+     * call resumes at the unfinished field.
+     */
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 3:
+                if (!commState.putLong(topVer))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass part, reads {@code topVer} at state index 3. Returns
+     * {@code false} without consuming anything for that field when fewer than 8 bytes
+     * (the size of a {@code long}) remain in the buffer.
+     */
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 3:
+                if (buf.remaining() < 8)
+                    return false;
+
+                topVer = commState.getLong();
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAffinityAssignmentRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
new file mode 100644
index 0000000..b476667
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Affinity assignment response.
+ * <p>
+ * Carries the topology version and the affinity assignment. The assignment itself is
+ * marked {@code @GridDirectTransient}; it travels as {@code affAssignmentBytes}, which is
+ * produced in {@code prepareMarshal} and turned back into a list in {@code finishUnmarshal}.
+ */
+public class GridDhtAffinityAssignmentResponse<K, V> extends 
GridCacheMessage<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology version. */
+    private long topVer;
+
+    /** Affinity assignment. */
+    @GridDirectTransient
+    @GridToStringInclude
+    private List<List<ClusterNode>> affAssignment;
+
+    /** Affinity assignment bytes. */
+    private byte[] affAssignmentBytes;
+
+    /**
+     * Empty constructor.
+     */
+    public GridDhtAffinityAssignmentResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param topVer Topology version.
+     * @param affAssignment Affinity assignment.
+     */
+    public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, 
List<List<ClusterNode>> affAssignment) {
+        this.cacheId = cacheId;
+        this.topVer = topVer;
+        this.affAssignment = affAssignment;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Always {@code true} for this message.
+     */
+    @Override public boolean allowForStartup() {
+        return true;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> affinityAssignment() {
+        return affAssignment;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Type code {@code 80}.
+     */
+    @Override public byte directType() {
+        return 80;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates a new instance through the empty constructor and copies state via
+     * {@link #clone0(GridTcpCommunicationMessageAdapter)}.
+     */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDhtAffinityAssignmentResponse _clone = new 
GridDhtAffinityAssignmentResponse();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Copies the superclass state, then {@code topVer} and the references to the
+     * assignment list and its byte form (the list and the array are shared, not copied).
+     */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        super.clone0(_msg);
+
+        GridDhtAffinityAssignmentResponse _clone = 
(GridDhtAffinityAssignmentResponse)_msg;
+
+        _clone.topVer = topVer;
+        _clone.affAssignment = affAssignment;
+        _clone.affAssignmentBytes = affAssignmentBytes;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Marshals the assignment into {@code affAssignmentBytes} when an assignment is set.
+     *
+     * @param ctx Shared cache context providing the marshaller.
+     */
+    @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        if (affAssignment != null)
+            affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Restores the assignment from {@code affAssignmentBytes} when the bytes are present.
+     */
+    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (affAssignmentBytes != null)
+            affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, 
ldr);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass part and the one-off type byte, writes
+     * {@code affAssignmentBytes} (state index 3) and then {@code topVer} (state index 4);
+     * the cases fall through on purpose. Returns {@code false} as soon as any step reports
+     * that it could not be completed (presumably because the buffer has no room left -
+     * confirm against {@code commState}).
+     */
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.writeTo(buf))
+            return false;
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+
+        switch (commState.idx) {
+            case 3:
+                if (!commState.putByteArray(affAssignmentBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 4:
+                if (!commState.putLong(topVer))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass part, reads {@code affAssignmentBytes} (state index 3) and then
+     * {@code topVer} (state index 4); the cases fall through on purpose. Returns
+     * {@code false} when the byte array comes back as {@code BYTE_ARR_NOT_READ} or when
+     * fewer than 8 bytes (the size of a {@code long}) remain for {@code topVer}.
+     */
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!super.readFrom(buf))
+            return false;
+
+        switch (commState.idx) {
+            case 3:
+                byte[] affAssignmentBytes0 = commState.getByteArray();
+
+                if (affAssignmentBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                affAssignmentBytes = affAssignmentBytes0;
+
+                commState.idx++;
+
+            case 4:
+                if (buf.remaining() < 8)
+                    return false;
+
+                topVer = commState.getLong();
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAffinityAssignmentResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
new file mode 100644
index 0000000..ecc9419
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Future that fetches affinity assignment from remote cache nodes.
+ * <p>
+ * Candidate nodes are sorted with {@link GridNodeOrderComparator} and asked one at a time.
+ * The future completes with the first assignment received from the node currently being
+ * awaited, or with {@code null} once every candidate has been tried without success.
+ */
+public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<List<ClusterNode>>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Nodes order comparator. */
+    private static final Comparator<ClusterNode> CMP = new GridNodeOrderComparator();
+
+    /** Cache context. */
+    private final GridCacheContext<K, V> ctx;
+
+    /** Nodes that have not been asked yet, in {@link #CMP} order. Guarded by {@code this}. */
+    private final Queue<ClusterNode> availableNodes;
+
+    /** Topology version. */
+    private final long topVer;
+
+    /** Pending node from which response is being awaited. Guarded by {@code this}. */
+    private ClusterNode pendingNode;
+
+    /**
+     * @param ctx Cache context.
+     * @param topVer Topology version the assignment is requested for.
+     * @param availableNodes Available nodes.
+     */
+    public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, long topVer, Collection<ClusterNode> availableNodes) {
+        super(ctx.kernalContext());
+
+        this.ctx = ctx;
+
+        this.topVer = topVer;
+
+        LinkedList<ClusterNode> tmp = new LinkedList<>(availableNodes);
+
+        Collections.sort(tmp, CMP);
+
+        this.availableNodes = tmp;
+    }
+
+    /**
+     * Initializes fetch future: registers it with the preloader and sends the first request.
+     */
+    public void init() {
+        ((GridDhtPreloader<K, V>)ctx.preloader()).addDhtAssignmentFetchFuture(topVer, this);
+
+        requestFromNextNode();
+    }
+
+    /**
+     * Handles an affinity assignment response. Responses for another topology version or
+     * from a node other than the one currently awaited are ignored. If the awaited node
+     * answers without an assignment, the next candidate is asked instead of leaving the
+     * future incomplete.
+     *
+     * @param node Node.
+     * @param res Response.
+     */
+    public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) {
+        if (res.topologyVersion() != topVer) {
+            if (log.isDebugEnabled())
+                log.debug("Received affinity assignment for wrong topolgy version (will ignore) " +
+                    "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']');
+
+            return;
+        }
+
+        List<List<ClusterNode>> assignment = null;
+
+        boolean fromPending = false;
+
+        synchronized (this) {
+            if (pendingNode != null && pendingNode.equals(node)) {
+                fromPending = true;
+
+                assignment = res.affinityAssignment();
+
+                // Nothing more will arrive from this node; release it so another one can be asked.
+                if (assignment == null)
+                    pendingNode = null;
+            }
+        }
+
+        if (assignment != null)
+            onDone(assignment);
+        else if (fromPending)
+            requestFromNextNode();
+    }
+
+    /**
+     * Handles a node leaving the grid. The node is dropped from the list of candidates;
+     * a request to the next candidate is sent only if the left node is the one whose
+     * response was being awaited.
+     *
+     * @param leftNodeId Left node ID.
+     */
+    public void onNodeLeft(UUID leftNodeId) {
+        boolean pendingLeft = false;
+
+        synchronized (this) {
+            // Do not bother sending requests to a node that is already gone.
+            for (Iterator<ClusterNode> it = availableNodes.iterator(); it.hasNext();) {
+                if (it.next().id().equals(leftNodeId))
+                    it.remove();
+            }
+
+            if (pendingNode != null && pendingNode.id().equals(leftNodeId)) {
+                pendingNode = null;
+
+                pendingLeft = true;
+            }
+        }
+
+        // An unrelated node leaving must not replace the node we are still waiting for.
+        if (pendingLeft)
+            requestFromNextNode();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On the first successful completion the future is unregistered from the preloader.
+     */
+    @Override public boolean onDone(@Nullable List<List<ClusterNode>> res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            ((GridDhtPreloader<K, V>)ctx.preloader()).removeDhtAssignmentFetchFuture(topVer, this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Requests affinity from next node in the list. Nodes that cannot be reached are
+     * skipped. If no node is being awaited after the list is exhausted, the future is
+     * completed with {@code null}.
+     */
+    private void requestFromNextNode() {
+        boolean complete;
+
+        // Avoid 'protected field is accessed in synchronized context' warning.
+        IgniteLogger log0 = log;
+
+        synchronized (this) {
+            while (!availableNodes.isEmpty()) {
+                ClusterNode node = availableNodes.poll();
+
+                try {
+                    if (log0.isDebugEnabled())
+                        log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() +
+                            ", node=" + node + ']');
+
+                    ctx.io().send(node, new GridDhtAffinityAssignmentRequest<K, V>(ctx.cacheId(), topVer),
+                        AFFINITY_POOL);
+
+                    // Close window for listener notification.
+                    if (ctx.discovery().node(node.id()) == null) {
+                        U.warn(log0, "Failed to request affinity assignment from remote node (node left grid, will " +
+                            "continue to another node): " + node);
+
+                        continue;
+                    }
+
+                    pendingNode = node;
+
+                    break;
+                }
+                catch (ClusterTopologyException ignored) {
+                    U.warn(log0, "Failed to request affinity assignment from remote node (node left grid, will " +
+                        "continue to another node): " + node);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log0, "Failed to request affinity assignment from remote node (will " +
+                        "continue to another node): " + node, e);
+                }
+            }
+
+            complete = pendingNode == null;
+        }
+
+        // No more nodes left, complete future with null outside of synchronization.
+        // Affinity should be calculated from scratch.
+        if (complete)
+            onDone((List<List<ClusterNode>>)null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
new file mode 100644
index 0000000..b789e99
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.io.*;
+
+/**
+ * DHT cache: a transactional DHT cache adapter that also keeps a reference to its near cache.
+ */
+public class GridDhtCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> 
{
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Near cache, assigned through {@link #near(GridNearTransactionalCache)}. */
+    @GridToStringExclude
+    private GridNearTransactionalCache<K, V> near;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtCache() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Cache context, passed to the superclass constructor.
+     */
+    public GridDhtCache(GridCacheContext<K, V> ctx) {
+        super(ctx);
+    }
+
+    /**
+     * @param ctx Cache context, passed to the superclass constructor.
+     * @param map Cache map, passed to the superclass constructor.
+     */
+    public GridDhtCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, 
V> map) {
+        super(ctx, map);
+    }
+
+    /** {@inheritDoc} Always {@code true} for this class. */
+    @Override public boolean isDht() {
+        return true;
+    }
+
+    /** {@inheritDoc} Yields "defaultDhtCache" when the parent name is null, otherwise the parent name plus "Dht". */
+    @Override public String name() {
+        String name = super.name();
+
+        return name == null ? "defaultDhtCache" : name + "Dht";
+    }
+
+    /** {@inheritDoc} Calls {@link #resetMetrics()} before the superclass start. */
+    @Override public void start() throws IgniteCheckedException {
+        resetMetrics();
+
+        super.start();
+    }
+
+    /** {@inheritDoc} Installs a new adapter delegating to {@code ctx.dht().near().metrics0()}, then resets {@code ctx.dr()} metrics. */
+    @Override public void resetMetrics() {
+        GridCacheMetricsAdapter m = new GridCacheMetricsAdapter();
+
+        m.delegate(ctx.dht().near().metrics0());
+
+        metrics = m;
+
+        ctx.dr().resetMetrics();
+    }
+
+    /**
+     * @return Near cache, or {@code null} if it has not been set yet.
+     */
+    @Override public GridNearTransactionalCache<K, V> near() {
+        return near;
+    }
+
+    /**
+     * @param near Near cache to associate with this DHT cache.
+     */
+    public void near(GridNearTransactionalCache<K, V> near) {
+        this.near = near;
+    }
+}

Reply via email to