# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b77f2a59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b77f2a59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b77f2a59 Branch: refs/heads/ignite-63 Commit: b77f2a59dbf9e379384c2ac8f9999d235474f74e Parents: ef78d14 Author: sboikov <semen.boi...@inria.fr> Authored: Fri Jan 23 00:49:19 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Fri Jan 23 00:49:34 2015 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 816 ++++++++++ .../dht/GridDhtAffinityAssignmentRequest.java | 139 ++ .../dht/GridDhtAffinityAssignmentResponse.java | 196 +++ .../dht/GridDhtAssignmentFetchFuture.java | 183 +++ .../cache/distributed/dht/GridDhtCache.java | 103 ++ .../distributed/dht/GridDhtCacheAdapter.java | 1017 ++++++++++++ .../distributed/dht/GridDhtCacheEntry.java | 760 +++++++++ .../distributed/dht/GridDhtCacheEntryImpl.java | 172 ++ .../distributed/dht/GridDhtEmbeddedFuture.java | 92 ++ .../distributed/dht/GridDhtFinishedFuture.java | 66 + .../cache/distributed/dht/GridDhtFuture.java | 36 + .../cache/distributed/dht/GridDhtGetFuture.java | 451 ++++++ .../dht/GridDhtInvalidPartitionException.java | 51 + .../distributed/dht/GridDhtLocalPartition.java | 594 +++++++ .../distributed/dht/GridDhtLockFuture.java | 1235 +++++++++++++++ .../distributed/dht/GridDhtLockRequest.java | 596 +++++++ .../distributed/dht/GridDhtLockResponse.java | 453 ++++++ .../distributed/dht/GridDhtPartitionState.java | 55 + .../dht/GridDhtPartitionTopology.java | 207 +++ .../dht/GridDhtPartitionTopologyImpl.java | 1195 ++++++++++++++ .../distributed/dht/GridDhtTopologyFuture.java | 44 + .../dht/GridDhtTransactionalCacheAdapter.java | 1492 ++++++++++++++++++ .../distributed/dht/GridDhtTxFinishFuture.java | 532 +++++++ .../distributed/dht/GridDhtTxFinishRequest.java | 702 ++++++++ .../dht/GridDhtTxFinishResponse.java | 145 ++ .../cache/distributed/dht/GridDhtTxLocal.java | 656 ++++++++ .../distributed/dht/GridDhtTxLocalAdapter.java | 831 ++++++++++ .../cache/distributed/dht/GridDhtTxMapping.java | 169 ++ .../distributed/dht/GridDhtTxPrepareFuture.java | 1074 +++++++++++++ .../dht/GridDhtTxPrepareRequest.java | 613 +++++++ .../dht/GridDhtTxPrepareResponse.java | 471 ++++++ .../cache/distributed/dht/GridDhtTxRemote.java | 332 ++++ .../distributed/dht/GridDhtUnlockRequest.java | 221 +++ .../distributed/dht/GridNoStorageCacheMap.java | 109 ++ .../dht/GridPartitionedGetFuture.java | 715 +++++++++ .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/atomic/GridDhtAtomicCacheEntry.java | 2 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../colocated/GridDhtColocatedCacheEntry.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 2 +- .../dht/preloader/GridDhtForceKeysFuture.java | 4 +- .../preloader/GridDhtPartitionDemandPool.java | 4 +- .../dht/preloader/GridDhtPartitionMap.java | 2 +- .../preloader/GridDhtPartitionSupplyPool.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../dht/preloader/GridDhtPreloader.java | 2 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 2 +- .../distributed/near/GridNearGetFuture.java | 2 +- .../distributed/near/GridNearLockFuture.java | 2 +- .../near/GridNearTransactionalCache.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../near/GridNearTxPrepareFuture.java | 2 +- .../cache/query/GridCacheQueryManager.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 2 +- .../cache/transactions/IgniteTxManager.java | 2 +- .../GridTcpCommunicationMessageFactory.java | 2 +- .../ignite/internal/visor/cache/VisorCache.java | 2 +- .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/GridCacheContext.java | 2 +- .../processors/cache/GridCacheEntryImpl.java | 2 +- .../cache/GridCacheEvictionManager.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 2 +- .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../kernal/processors/cache/GridCacheUtils.java | 2 +- ...dCachePessimisticCheckCommittedTxFuture.java | 2 +- .../GridPartitionedCacheEntryImpl.java | 2 +- .../dht/GridClientPartitionTopology.java | 816 ---------- .../dht/GridDhtAffinityAssignmentRequest.java | 139 -- .../dht/GridDhtAffinityAssignmentResponse.java | 196 --- .../dht/GridDhtAssignmentFetchFuture.java | 183 --- .../cache/distributed/dht/GridDhtCache.java | 103 -- .../distributed/dht/GridDhtCacheAdapter.java | 1017 ------------ .../distributed/dht/GridDhtCacheEntry.java | 760 --------- .../distributed/dht/GridDhtCacheEntryImpl.java | 172 -- .../distributed/dht/GridDhtEmbeddedFuture.java | 92 -- .../distributed/dht/GridDhtFinishedFuture.java | 66 - .../cache/distributed/dht/GridDhtFuture.java | 36 - .../cache/distributed/dht/GridDhtGetFuture.java | 451 ------ .../dht/GridDhtInvalidPartitionException.java | 51 - .../distributed/dht/GridDhtLocalPartition.java | 594 ------- .../distributed/dht/GridDhtLockFuture.java | 1235 --------------- .../distributed/dht/GridDhtLockRequest.java | 596 ------- .../distributed/dht/GridDhtLockResponse.java | 453 ------ .../distributed/dht/GridDhtPartitionState.java | 55 - .../dht/GridDhtPartitionTopology.java | 207 --- .../dht/GridDhtPartitionTopologyImpl.java | 1195 -------------- .../distributed/dht/GridDhtTopologyFuture.java | 44 - .../dht/GridDhtTransactionalCacheAdapter.java | 1492 ------------------ .../distributed/dht/GridDhtTxFinishFuture.java | 532 ------- .../distributed/dht/GridDhtTxFinishRequest.java | 702 -------- .../dht/GridDhtTxFinishResponse.java | 145 -- .../cache/distributed/dht/GridDhtTxLocal.java | 656 -------- .../distributed/dht/GridDhtTxLocalAdapter.java | 831 ---------- .../cache/distributed/dht/GridDhtTxMapping.java | 169 -- .../distributed/dht/GridDhtTxPrepareFuture.java | 1074 ------------- .../dht/GridDhtTxPrepareRequest.java | 613 ------- .../dht/GridDhtTxPrepareResponse.java | 471 ------ .../cache/distributed/dht/GridDhtTxRemote.java | 332 ---- .../distributed/dht/GridDhtUnlockRequest.java | 221 --- .../distributed/dht/GridNoStorageCacheMap.java | 109 -- .../dht/GridPartitionedGetFuture.java | 715 --------- ...tPartitionedOnlyByteArrayValuesSelfTest.java | 168 ++ ...heAbstractTransformWriteThroughSelfTest.java | 336 ++++ ...acheAtomicExpiredEntriesPreloadSelfTest.java | 45 + .../dht/GridCacheAtomicFullApiSelfTest.java | 100 ++ .../dht/GridCacheAtomicNearCacheSelfTest.java | 820 ++++++++++ ...idCacheAtomicNearEnabledFullApiSelfTest.java | 32 + ...EnabledPrimaryWriteOrderFullApiSelfTest.java | 33 + ...eAtomicPrimaryWriteOrderFullApiSelfTest.java | 32 + ...tomicPrimaryWriteOrderReloadAllSelfTest.java | 32 + .../dht/GridCacheAtomicReloadAllSelfTest.java | 38 + .../dht/GridCacheClientOnlySelfTest.java | 38 + .../dht/GridCacheColocatedDebugTest.java | 977 ++++++++++++ .../dht/GridCacheColocatedFailoverSelfTest.java | 39 + ...eColocatedOptimisticTransactionSelfTest.java | 156 ++ ...ridCacheColocatedPreloadRestartSelfTest.java | 30 + .../GridCacheColocatedPrimarySyncSelfTest.java | 33 + .../GridCacheColocatedReloadAllSelfTest.java | 30 + .../GridCacheColocatedTxExceptionSelfTest.java | 39 + ...ssimisticOriginatingNodeFailureSelfTest.java | 49 + ...dCacheColocatedTxSingleThreadedSelfTest.java | 107 ++ .../GridCacheDaemonNodePartitionedSelfTest.java | 31 + ...cheDhtAtomicEvictionNearReadersSelfTest.java | 32 + .../GridCacheDhtAtomicRemoveFailureTest.java | 50 + .../dht/GridCacheDhtEntrySelfTest.java | 314 ++++ .../dht/GridCacheDhtEntrySetSelfTest.java | 45 + ...GridCacheDhtEvictionNearReadersSelfTest.java | 297 ++++ .../dht/GridCacheDhtEvictionSelfTest.java | 363 +++++ .../GridCacheDhtEvictionsDisabledSelfTest.java | 125 ++ ...idCacheDhtExpiredEntriesPreloadSelfTest.java | 39 + .../dht/GridCacheDhtInternalEntrySelfTest.java | 225 +++ .../dht/GridCacheDhtMappingSelfTest.java | 105 ++ .../dht/GridCacheDhtMultiBackupTest.java | 135 ++ .../dht/GridCacheDhtPreloadBigDataSelfTest.java | 228 +++ .../dht/GridCacheDhtPreloadDelayedSelfTest.java | 459 ++++++ .../GridCacheDhtPreloadDisabledSelfTest.java | 278 ++++ .../GridCacheDhtPreloadMessageCountTest.java | 175 ++ ...ridCacheDhtPreloadMultiThreadedSelfTest.java | 175 ++ .../dht/GridCacheDhtPreloadOffHeapSelfTest.java | 39 + .../dht/GridCacheDhtPreloadPutGetSelfTest.java | 275 ++++ .../dht/GridCacheDhtPreloadSelfTest.java | 674 ++++++++ .../GridCacheDhtPreloadStartStopSelfTest.java | 269 ++++ .../dht/GridCacheDhtPreloadUnloadSelfTest.java | 321 ++++ .../dht/GridCacheDhtRemoveFailureTest.java | 44 + .../distributed/dht/GridCacheDhtTestUtils.java | 231 +++ .../dht/GridCacheDhtTxPreloadSelfTest.java | 44 + .../GridCacheExColocatedFullApiSelfTest.java | 33 + .../dht/GridCacheGlobalLoadTest.java | 171 ++ .../GridCacheGroupLockColocatedSelfTest.java | 38 + ...acheGroupLockMultiNodeColocatedSelfTest.java | 29 + ...cheGroupLockPartitionedAbstractSelfTest.java | 136 ++ ...ockPartitionedMultiNodeAbstractSelfTest.java | 173 ++ ...ePartitionedNearDisabledFullApiSelfTest.java | 33 + ...ePartitionedNearDisabledMetricsSelfTest.java | 119 ++ ...nedNearDisabledMultiNodeFullApiSelfTest.java | 34 + ...bledMultiNodeP2PDisabledFullApiSelfTest.java | 34 + ...ionedNearDisabledOffHeapFullApiSelfTest.java | 29 + ...DisabledOffHeapMultiNodeFullApiSelfTest.java | 29 + ...isabledTxOriginatingNodeFailureSelfTest.java | 31 + ...dOnlyP2PDisabledByteArrayValuesSelfTest.java | 29 + ...edOnlyP2PEnabledByteArrayValuesSelfTest.java | 29 + ...dCachePartitionedOnlyProjectionSelfTest.java | 32 + ...idCachePartitionedPreloadEventsSelfTest.java | 124 ++ ...dCachePartitionedTopologyChangeSelfTest.java | 596 +++++++ ...ransformWriteThroughBatchUpdateSelfTest.java | 29 + ...itionedTxOriginatingNodeFailureSelfTest.java | 161 ++ ...ridCachePartitionedUnloadEventsSelfTest.java | 151 ++ ...teTxConsistencyColocatedRestartSelfTest.java | 36 + .../dht/IgniteTxReentryColocatedSelfTest.java | 79 + ...GridCacheGroupLockMultiNodeNearSelfTest.java | 2 +- .../near/GridCacheNearMultiNodeSelfTest.java | 2 +- .../near/GridCacheNearReadersSelfTest.java | 2 +- .../near/GridCacheNearTxMultiNodeSelfTest.java | 2 +- ...ssimisticOriginatingNodeFailureSelfTest.java | 2 +- .../GridCachePartitionedEvictionSelfTest.java | 2 +- ...achePartitionedMultiNodeCounterSelfTest.java | 2 +- .../GridCacheConcurrentTxMultiNodeTest.java | 2 +- .../cache/GridCacheEntryMemorySizeSelfTest.java | 2 +- .../cache/GridCacheMultiUpdateLockSelfTest.java | 2 +- .../IgniteTxConcurrentGetAbstractTest.java | 2 +- .../cache/IgniteTxMultiNodeAbstractTest.java | 2 +- .../cache/IgniteTxReentryAbstractSelfTest.java | 2 +- .../GridCacheMultiNodeLockAbstractTest.java | 2 +- ...dCacheMultithreadedFailoverAbstractTest.java | 2 +- ...tPartitionedOnlyByteArrayValuesSelfTest.java | 168 -- ...heAbstractTransformWriteThroughSelfTest.java | 336 ---- ...acheAtomicExpiredEntriesPreloadSelfTest.java | 45 - .../dht/GridCacheAtomicFullApiSelfTest.java | 100 -- .../dht/GridCacheAtomicNearCacheSelfTest.java | 820 ---------- ...idCacheAtomicNearEnabledFullApiSelfTest.java | 32 - ...EnabledPrimaryWriteOrderFullApiSelfTest.java | 33 - ...eAtomicPrimaryWriteOrderFullApiSelfTest.java | 32 - ...tomicPrimaryWriteOrderReloadAllSelfTest.java | 32 - .../dht/GridCacheAtomicReloadAllSelfTest.java | 38 - .../dht/GridCacheClientOnlySelfTest.java | 38 - .../dht/GridCacheColocatedDebugTest.java | 977 ------------ .../dht/GridCacheColocatedFailoverSelfTest.java | 39 - ...eColocatedOptimisticTransactionSelfTest.java | 156 -- ...ridCacheColocatedPreloadRestartSelfTest.java | 30 - .../GridCacheColocatedPrimarySyncSelfTest.java | 33 - .../GridCacheColocatedReloadAllSelfTest.java | 30 - .../GridCacheColocatedTxExceptionSelfTest.java | 39 - ...ssimisticOriginatingNodeFailureSelfTest.java | 49 - ...dCacheColocatedTxSingleThreadedSelfTest.java | 107 -- .../GridCacheDaemonNodePartitionedSelfTest.java | 31 - ...cheDhtAtomicEvictionNearReadersSelfTest.java | 32 - .../GridCacheDhtAtomicRemoveFailureTest.java | 51 - .../dht/GridCacheDhtEntrySelfTest.java | 314 ---- .../dht/GridCacheDhtEntrySetSelfTest.java | 46 - ...GridCacheDhtEvictionNearReadersSelfTest.java | 297 ---- .../dht/GridCacheDhtEvictionSelfTest.java | 363 ----- .../GridCacheDhtEvictionsDisabledSelfTest.java | 126 -- ...idCacheDhtExpiredEntriesPreloadSelfTest.java | 39 - .../dht/GridCacheDhtInternalEntrySelfTest.java | 225 --- .../dht/GridCacheDhtMappingSelfTest.java | 105 -- .../dht/GridCacheDhtMultiBackupTest.java | 135 -- .../dht/GridCacheDhtPreloadBigDataSelfTest.java | 229 --- .../dht/GridCacheDhtPreloadDelayedSelfTest.java | 459 ------ .../GridCacheDhtPreloadDisabledSelfTest.java | 278 ---- .../GridCacheDhtPreloadMessageCountTest.java | 175 -- ...ridCacheDhtPreloadMultiThreadedSelfTest.java | 175 -- .../dht/GridCacheDhtPreloadOffHeapSelfTest.java | 39 - .../dht/GridCacheDhtPreloadPutGetSelfTest.java | 275 ---- .../dht/GridCacheDhtPreloadSelfTest.java | 674 -------- .../GridCacheDhtPreloadStartStopSelfTest.java | 269 ---- .../dht/GridCacheDhtPreloadUnloadSelfTest.java | 322 ---- .../dht/GridCacheDhtRemoveFailureTest.java | 44 - .../distributed/dht/GridCacheDhtTestUtils.java | 231 --- .../dht/GridCacheDhtTxPreloadSelfTest.java | 45 - .../GridCacheExColocatedFullApiSelfTest.java | 33 - .../dht/GridCacheGlobalLoadTest.java | 171 -- .../GridCacheGroupLockColocatedSelfTest.java | 38 - ...acheGroupLockMultiNodeColocatedSelfTest.java | 29 - ...cheGroupLockPartitionedAbstractSelfTest.java | 136 -- ...ockPartitionedMultiNodeAbstractSelfTest.java | 173 -- ...ePartitionedNearDisabledFullApiSelfTest.java | 33 - ...ePartitionedNearDisabledMetricsSelfTest.java | 120 -- ...nedNearDisabledMultiNodeFullApiSelfTest.java | 34 - ...bledMultiNodeP2PDisabledFullApiSelfTest.java | 34 - ...ionedNearDisabledOffHeapFullApiSelfTest.java | 29 - ...DisabledOffHeapMultiNodeFullApiSelfTest.java | 29 - ...isabledTxOriginatingNodeFailureSelfTest.java | 31 - ...dOnlyP2PDisabledByteArrayValuesSelfTest.java | 29 - ...edOnlyP2PEnabledByteArrayValuesSelfTest.java | 29 - ...dCachePartitionedOnlyProjectionSelfTest.java | 32 - ...idCachePartitionedPreloadEventsSelfTest.java | 124 -- ...dCachePartitionedTopologyChangeSelfTest.java | 596 ------- ...ransformWriteThroughBatchUpdateSelfTest.java | 29 - ...itionedTxOriginatingNodeFailureSelfTest.java | 161 -- ...ridCachePartitionedUnloadEventsSelfTest.java | 151 -- ...teTxConsistencyColocatedRestartSelfTest.java | 36 - .../dht/IgniteTxReentryColocatedSelfTest.java | 79 - .../GridCacheReplicatedClientOnlySelfTest.java | 3 +- .../gridgain/loadtests/dsi/GridDsiPerfJob.java | 2 +- .../gridgain/testframework/GridTestUtils.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 2 +- .../GridCacheFullApiSelfTestSuite.java | 2 +- .../GridCacheGroupLockSelfTestSuite.java | 2 +- .../GridCacheNearOnlySelfTestSuite.java | 2 +- .../GridCacheTxRecoverySelfTestSuite.java | 2 +- .../bamboo/GridCacheFailoverTestSuite.java | 2 +- .../bamboo/GridDataGridTestSuite.java | 2 +- 267 files changed, 26850 insertions(+), 26858 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java new file mode 100644 index 0000000..586f4ef --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -0,0 +1,816 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Partition topology for node which does not have any local partitions. + */ +@GridToStringExclude +public class GridClientPartitionTopology<K, V> implements GridDhtPartitionTopology<K, V> { + /** If true, then check consistency. */ + private static final boolean CONSISTENCY_CHECK = false; + + /** Flag to control amount of output for full map. */ + private static final boolean FULL_MAP_DEBUG = false; + + /** Cache shared context. */ + private GridCacheSharedContext<K, V> cctx; + + /** Cache ID. */ + private int cacheId; + + /** Logger. */ + private final IgniteLogger log; + + /** Node to partition map. */ + private GridDhtPartitionFullMap node2part; + + /** Partition to node map. */ + private Map<Integer, Set<UUID>> part2node = new HashMap<>(); + + /** */ + private GridDhtPartitionExchangeId lastExchangeId; + + /** */ + private long topVer = -1; + + /** A future that will be completed when topology with version topVer will be ready to use. */ + private GridDhtTopologyFuture topReadyFut; + + /** */ + private final GridAtomicLong updateSeq = new GridAtomicLong(1); + + /** Lock. */ + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * @param cctx Context. + * @param cacheId Cache ID. + * @param exchId Exchange ID. + */ + public GridClientPartitionTopology(GridCacheSharedContext<K, V> cctx, int cacheId, + GridDhtPartitionExchangeId exchId) { + this.cctx = cctx; + this.cacheId = cacheId; + + topVer = exchId.topologyVersion(); + + log = cctx.logger(getClass()); + + beforeExchange(exchId); + } + + /** + * @return Full map string representation. + */ + @SuppressWarnings( {"ConstantConditions"}) + private String fullMapString() { + return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); + } + + /** + * @param map Map to get string for. + * @return Full map string representation. + */ + @SuppressWarnings( {"ConstantConditions"}) + private String mapString(GridDhtPartitionMap map) { + return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); + } + + /** + * @return Cache ID. + */ + public int cacheId() { + return cacheId; + } + + /** {@inheritDoc} */ + @SuppressWarnings( {"LockAcquiredButNotSafelyReleased"}) + @Override public void readLock() { + lock.readLock().lock(); + } + + /** {@inheritDoc} */ + @Override public void readUnlock() { + lock.readLock().unlock(); + } + + /** {@inheritDoc} */ + @Override public void updateTopologyVersion(GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture<K, V> exchFut) { + lock.writeLock().lock(); + + try { + assert exchId.topologyVersion() > topVer : "Invalid topology version [topVer=" + topVer + + ", exchId=" + exchId + ']'; + + topVer = exchId.topologyVersion(); + + topReadyFut = exchFut; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public long topologyVersion() { + lock.readLock().lock(); + + try { + assert topVer > 0; + + return topVer; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyVersionFuture() { + lock.readLock().lock(); + + try { + assert topReadyFut != null; + + return topReadyFut; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void beforeExchange(GridDhtPartitionExchangeId exchId) { + ClusterNode loc = cctx.localNode(); + + lock.writeLock().lock(); + + try { + assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; + + if (!exchId.isJoined()) + removeNode(exchId.nodeId()); + + // In case if node joins, get topology at the time of joining node. + ClusterNode oldest = CU.oldest(cctx, topVer); + + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + + long updateSeq = this.updateSeq.incrementAndGet(); + + // If this is the oldest node. + if (oldest.id().equals(loc.id())) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq); + + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, node2part, false); + + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + } + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean afterExchange(GridDhtPartitionExchangeId exchId) throws IgniteCheckedException { + long topVer = exchId.topologyVersion(); + + lock.writeLock().lock(); + + try { + assert topVer == exchId.topologyVersion() : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; + + if (log.isDebugEnabled()) + log.debug("Partition map before afterExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + + updateSeq.incrementAndGet(); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtLocalPartition<K, V> localPartition(int p, long topVer, boolean create) + throws GridDhtInvalidPartitionException { + if (!create) + return null; + + throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition [part=" + p + + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition<K, V> localPartition(K key, boolean create) { + return localPartition(1, -1, create); + } + + /** {@inheritDoc} */ + @Override public List<GridDhtLocalPartition<K, V>> localPartitions() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public Collection<GridDhtLocalPartition<K, V>> currentLocalPartitions() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition<K, V> onAdded(long topVer, GridDhtCacheEntry<K, V> e) { + assert false : "Entry should not be added to client topology: " + e; + + return null; + } + + /** {@inheritDoc} */ + @Override public void onRemoved(GridDhtCacheEntry<K, V> e) { + assert false : "Entry should not be removed from client topology: " + e; + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionMap localPartitionMap() { + lock.readLock().lock(); + + try { + return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(), + Collections.<Integer, GridDhtPartitionState>emptyMap(), true); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> nodes(int p, long topVer) { + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", node2part=" + node2part + ']'; + + Collection<ClusterNode> nodes = null; + + Collection<UUID> nodeIds = part2node.get(p); + + if (!F.isEmpty(nodeIds)) { + for (UUID nodeId : nodeIds) { + ClusterNode n = cctx.discovery().node(nodeId); + + if (n != null && (topVer < 0 || n.order() <= topVer)) { + if (nodes == null) + nodes = new ArrayList<>(); + + nodes.add(n); + } + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @param p Partition. + * @param topVer Topology version ({@code -1} for all nodes). + * @param state Partition state. + * @param states Additional partition states. + * @return List of nodes for the partition. + */ + private List<ClusterNode> nodes(int p, long topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + Collection<UUID> allIds = topVer > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", allIds=" + allIds + ", node2part=" + node2part + ']'; + + Collection<UUID> nodeIds = part2node.get(p); + + // Node IDs can be null if both, primary and backup, nodes disappear. + int size = nodeIds == null ? 0 : nodeIds.size(); + + if (size == 0) + return Collections.emptyList(); + + List<ClusterNode> nodes = new ArrayList<>(size); + + for (UUID id : nodeIds) { + if (topVer > 0 && !allIds.contains(id)) + continue; + + if (hasState(p, id, state, states)) { + ClusterNode n = cctx.discovery().node(id); + + if (n != null && (topVer < 0 || n.order() <= topVer)) + nodes.add(n); + } + } + + return nodes; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p, long topVer) { + return nodes(p, topVer, OWNING); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> owners(int p) { + return owners(p, -1); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> moving(int p) { + return nodes(p, -1, MOVING); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return List of nodes in state OWNING or MOVING. + */ + private List<ClusterNode> ownersAndMoving(int p, long topVer) { + return nodes(p, topVer, OWNING, MOVING); + } + + /** {@inheritDoc} */ + @Override public long updateSequence() { + return updateSeq.get(); + } + + /** {@inheritDoc} */ + @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { + lock.readLock().lock(); + + try { + assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + + ", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.gridName() + ']'; + + GridDhtPartitionFullMap m = node2part; + + return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionFullMap partMap) { + if (log.isDebugEnabled()) + log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + + lock.writeLock().lock(); + + try { + if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { + if (log.isDebugEnabled()) + log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + + lastExchangeId + ", exchId=" + exchId + ']'); + + return null; + } + + if (node2part != null && node2part.compareTo(partMap) >= 0) { + if (log.isDebugEnabled()) + log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" + + lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']'); + + return null; + } + + updateSeq.incrementAndGet(); + + if (exchId != null) + lastExchangeId = exchId; + + if (node2part != null) { + for (GridDhtPartitionMap part : node2part.values()) { + GridDhtPartitionMap newPart = partMap.get(part.nodeId()); + + // If for some nodes current partition has a newer map, + // then we keep the newer value. + if (newPart != null && newPart.updateSequence() < part.updateSequence()) { + if (log.isDebugEnabled()) + log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + + mapString(part) + ", newPart=" + mapString(newPart) + ']'); + + partMap.put(part.nodeId(), part); + } + } + + for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext();) { + UUID nodeId = it.next(); + + if (!cctx.discovery().alive(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + + partMap + ']'); + + it.remove(); + } + } + } + + node2part = partMap; + + Map<Integer, Set<UUID>> p2n = new HashMap<>(); + + for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { + for (Integer p : e.getValue().keySet()) { + Set<UUID> ids = p2n.get(p); + + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partitions. + p2n.put(p, ids = U.newHashSet(3)); + + ids.add(e.getKey()); + } + } + + part2node = p2n; + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after full update: " + fullMapString()); + + return null; + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionMap parts) { + if (log.isDebugEnabled()) + log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); + + if (!cctx.discovery().alive(parts.nodeId())) { + if (log.isDebugEnabled()) + log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + + ", parts=" + parts + ']'); + + return null; + } + + lock.writeLock().lock(); + + try { + if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { + if (log.isDebugEnabled()) + log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + + lastExchangeId + ", exchId=" + exchId + ']'); + + return null; + } + + if (exchId != null) + lastExchangeId = exchId; + + if (node2part == null) { + U.dumpStack(log, "Created invalid: " + node2part); + + // Create invalid partition map. + node2part = new GridDhtPartitionFullMap(); + } + + GridDhtPartitionMap cur = node2part.get(parts.nodeId()); + + if (cur != null && cur.updateSequence() >= parts.updateSequence()) { + if (log.isDebugEnabled()) + log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId + + ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']'); + + return null; + } + + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part = new GridDhtPartitionFullMap(node2part, updateSeq); + + boolean changed = false; + + if (cur == null || !cur.equals(parts)) + changed = true; + + node2part.put(parts.nodeId(), parts); + + part2node = new HashMap<>(part2node); + + // Add new mappings. + for (Integer p : parts.keySet()) { + Set<UUID> ids = part2node.get(p); + + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); + + changed |= ids.add(parts.nodeId()); + } + + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { + Set<UUID> ids = part2node.get(p); + + if (ids != null) + changed |= ids.remove(parts.nodeId()); + } + } + + consistencyCheck(); + + if (log.isDebugEnabled()) + log.debug("Partition map after single update: " + fullMapString()); + + return changed ? localPartitionMap() : null; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Updates value for single partition. + * + * @param p Partition. + * @param nodeId Node ID. + * @param state State. + * @param updateSeq Update sequence. + */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) { + assert lock.isWriteLockedByCurrentThread(); + assert nodeId.equals(cctx.localNodeId()); + + // In case if node joins, get topology at the time of joining node. + ClusterNode oldest = CU.oldest(cctx, topVer); + + // If this node became the oldest node. + if (oldest.id().equals(cctx.localNodeId())) { + long seq = node2part.updateSequence(); + + if (seq != updateSeq) { + if (seq > updateSeq) { + if (this.updateSeq.get() < seq) { + // Update global counter if necessary. + boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1); + + assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq + + ", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']'; + + updateSeq = seq + 1; + } + else + updateSeq = seq; + } + + node2part.updateSequence(updateSeq); + } + } + + GridDhtPartitionMap map = node2part.get(nodeId); + + if (map == null) + node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, + Collections.<Integer, GridDhtPartitionState>emptyMap(), false)); + + map.updateSequence(updateSeq); + + map.put(p, state); + + Set<UUID> ids = part2node.get(p); + + if (ids == null) + part2node.put(p, ids = U.newHashSet(3)); + + ids.add(nodeId); + } + + /** + * @param nodeId Node to remove. + */ + private void removeNode(UUID nodeId) { + assert nodeId != null; + assert lock.writeLock().isHeldByCurrentThread(); + + ClusterNode oldest = CU.oldest(cctx, topVer); + + ClusterNode loc = cctx.localNode(); + + if (node2part != null) { + if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) { + updateSeq.setIfGreater(node2part.updateSequence()); + + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(), + node2part, false); + } + else + node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); + + part2node = new HashMap<>(part2node); + + GridDhtPartitionMap parts = node2part.remove(nodeId); + + if (parts != null) { + for (Integer p : parts.keySet()) { + Set<UUID> nodeIds = part2node.get(p); + + if (nodeIds != null) { + nodeIds.remove(nodeId); + + if (nodeIds.isEmpty()) + part2node.remove(p); + } + } + } + + consistencyCheck(); + } + } + + /** {@inheritDoc} */ + @Override public boolean own(GridDhtLocalPartition<K, V> part) { + assert false : "Client topology should never own a partition: " + part; + + return false; + } + + /** {@inheritDoc} */ + @Override public void onEvicted(GridDhtLocalPartition<K, V> part, boolean updateSeq) { + assert updateSeq || lock.isWriteLockedByCurrentThread(); + + lock.writeLock().lock(); + + try { + assert part.state() == EVICTED; + + long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); + + updateLocal(part.id(), cctx.localNodeId(), part.state(), seq); + + consistencyCheck(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) { + lock.readLock().lock(); + + try { + return node2part.get(nodeId); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats(int threshold) { + X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']'); + } + + /** + * @param p Partition. + * @param nodeId Node ID. + * @param match State to match. + * @param matches Additional states. + * @return Filter for owners of this partition. + */ + private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match, + final GridDhtPartitionState... matches) { + if (nodeId == null) + return false; + + GridDhtPartitionMap parts = node2part.get(nodeId); + + // Set can be null if node has been removed. + if (parts != null) { + GridDhtPartitionState state = parts.get(p); + + if (state == match) + return true; + + if (matches != null && matches.length > 0) + for (GridDhtPartitionState s : matches) + if (state == s) + return true; + } + + return false; + } + + /** + * Checks consistency after all operations. + */ + private void consistencyCheck() { + if (CONSISTENCY_CHECK) { + assert lock.writeLock().isHeldByCurrentThread(); + + if (node2part == null) + return; + + for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { + for (Integer p : e.getValue().keySet()) { + Set<UUID> nodeIds = part2node.get(p); + + assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']'; + assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" + + e.getKey() + ", nodeIds=" + nodeIds + ']'; + } + } + + for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) { + for (UUID nodeId : e.getValue()) { + GridDhtPartitionMap map = node2part.get(nodeId); + + assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']'; + assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() + + ", nodeId=" + nodeId + ']'; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java new file mode 100644 index 0000000..d947ecf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.nio.*; + +/** + * Affinity assignment request. + */ +public class GridDhtAffinityAssignmentRequest<K, V> extends GridCacheMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Topology version being queried. */ + private long topVer; + + /** + * Empty constructor. + */ + public GridDhtAffinityAssignmentRequest() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param topVer Topology version. + */ + public GridDhtAffinityAssignmentRequest(int cacheId, long topVer) { + this.cacheId = cacheId; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** + * @return Requested topology version. + */ + @Override public long topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 79; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtAffinityAssignmentRequest _clone = new GridDhtAffinityAssignmentRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtAffinityAssignmentRequest _clone = (GridDhtAffinityAssignmentRequest)_msg; + + _clone.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: + if (!commState.putLong(topVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: + if (buf.remaining() < 8) + return false; + + topVer = commState.getLong(); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAffinityAssignmentRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java new file mode 100644 index 0000000..b476667 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.nio.*; +import java.util.*; + +/** + * Affinity assignment response. + */ +public class GridDhtAffinityAssignmentResponse<K, V> extends GridCacheMessage<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Topology version. */ + private long topVer; + + /** Affinity assignment. */ + @GridDirectTransient + @GridToStringInclude + private List<List<ClusterNode>> affAssignment; + + /** Affinity assignment bytes. */ + private byte[] affAssignmentBytes; + + /** + * Empty constructor. + */ + public GridDhtAffinityAssignmentResponse() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param topVer Topology version. + * @param affAssignment Affinity assignment. + */ + public GridDhtAffinityAssignmentResponse(int cacheId, long topVer, List<List<ClusterNode>> affAssignment) { + this.cacheId = cacheId; + this.topVer = topVer; + this.affAssignment = affAssignment; + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** + * @return Topology version. + */ + @Override public long topologyVersion() { + return topVer; + } + + /** + * @return Affinity assignment. + */ + public List<List<ClusterNode>> affinityAssignment() { + return affAssignment; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 80; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridDhtAffinityAssignmentResponse _clone = new GridDhtAffinityAssignmentResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + super.clone0(_msg); + + GridDhtAffinityAssignmentResponse _clone = (GridDhtAffinityAssignmentResponse)_msg; + + _clone.topVer = topVer; + _clone.affAssignment = affAssignment; + _clone.affAssignmentBytes = affAssignmentBytes; + } + + /** {@inheritDoc} + * @param ctx*/ + @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (affAssignment != null) + affAssignmentBytes = ctx.marshaller().marshal(affAssignment); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (affAssignmentBytes != null) + affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 3: + if (!commState.putByteArray(affAssignmentBytes)) + return false; + + commState.idx++; + + case 4: + if (!commState.putLong(topVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (commState.idx) { + case 3: + byte[] affAssignmentBytes0 = commState.getByteArray(); + + if (affAssignmentBytes0 == BYTE_ARR_NOT_READ) + return false; + + affAssignmentBytes = affAssignmentBytes0; + + commState.idx++; + + case 4: + if (buf.remaining() < 8) + return false; + + topVer = commState.getLong(); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAffinityAssignmentResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java new file mode 100644 index 0000000..ecc9419 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * Future that fetches affinity assignment from remote cache nodes. + */ +public class GridDhtAssignmentFetchFuture<K, V> extends GridFutureAdapter<List<List<ClusterNode>>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Nodes order comparator. */ + private static final Comparator<ClusterNode> CMP = new GridNodeOrderComparator(); + + /** Cache context. */ + private final GridCacheContext<K, V> ctx; + + /** List of available nodes this future can fetch data from. */ + private Queue<ClusterNode> availableNodes; + + /** Topology version. */ + private final long topVer; + + /** Pending node from which response is being awaited. */ + private ClusterNode pendingNode; + + /** + * @param ctx Cache context. + * @param availableNodes Available nodes. + */ + public GridDhtAssignmentFetchFuture(GridCacheContext<K, V> ctx, long topVer, Collection<ClusterNode> availableNodes) { + super(ctx.kernalContext()); + + this.ctx = ctx; + + this.topVer = topVer; + + LinkedList<ClusterNode> tmp = new LinkedList<>(); + tmp.addAll(availableNodes); + Collections.sort(tmp, CMP); + + this.availableNodes = tmp; + } + + /** + * Initializes fetch future. + */ + public void init() { + ((GridDhtPreloader<K, V>)ctx.preloader()).addDhtAssignmentFetchFuture(topVer, this); + + requestFromNextNode(); + } + + /** + * @param node Node. + * @param res Reponse. + */ + public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse<K, V> res) { + if (res.topologyVersion() != topVer) { + if (log.isDebugEnabled()) + log.debug("Received affinity assignment for wrong topolgy version (will ignore) " + + "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']'); + + return; + } + + List<List<ClusterNode>> assignment = null; + + synchronized (this) { + if (pendingNode != null && pendingNode.equals(node)) + assignment = res.affinityAssignment(); + } + + if (assignment != null) + onDone(assignment); + } + + /** + * @param leftNodeId Left node ID. + */ + public void onNodeLeft(UUID leftNodeId) { + synchronized (this) { + if (pendingNode != null && pendingNode.id().equals(leftNodeId)) { + availableNodes.remove(pendingNode); + + pendingNode = null; + } + } + + requestFromNextNode(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable List<List<ClusterNode>> res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + ((GridDhtPreloader<K, V>)ctx.preloader()).removeDhtAssignmentFetchFuture(topVer, this); + + return true; + } + + return false; + } + + /** + * Requests affinity from next node in the list. + */ + private void requestFromNextNode() { + boolean complete; + + // Avoid 'protected field is accessed in synchronized context' warning. + IgniteLogger log0 = log; + + synchronized (this) { + while (!availableNodes.isEmpty()) { + ClusterNode node = availableNodes.poll(); + + try { + if (log0.isDebugEnabled()) + log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() + + ", node=" + node + ']'); + + ctx.io().send(node, new GridDhtAffinityAssignmentRequest<K, V>(ctx.cacheId(), topVer), + AFFINITY_POOL); + + // Close window for listener notification. + if (ctx.discovery().node(node.id()) == null) { + U.warn(log0, "Failed to request affinity assignment from remote node (node left grid, will " + + "continue to another node): " + node); + + continue; + } + + pendingNode = node; + + break; + } + catch (ClusterTopologyException ignored) { + U.warn(log0, "Failed to request affinity assignment from remote node (node left grid, will " + + "continue to another node): " + node); + } + catch (IgniteCheckedException e) { + U.error(log0, "Failed to request affinity assignment from remote node (will " + + "continue to another node): " + node, e); + } + } + + complete = pendingNode == null; + } + + // No more nodes left, complete future with null outside of synchronization. + // Affinity should be calculated from scratch. + if (complete) + onDone((List<List<ClusterNode>>)null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java new file mode 100644 index 0000000..b789e99 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCache.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.tostring.*; + +import java.io.*; + +/** + * DHT cache. + */ +public class GridDhtCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Near cache. */ + @GridToStringExclude + private GridNearTransactionalCache<K, V> near; + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtCache() { + // No-op. + } + + /** + * @param ctx Context. + */ + public GridDhtCache(GridCacheContext<K, V> ctx) { + super(ctx); + } + + /** + * @param ctx Cache context. + * @param map Cache map. + */ + public GridDhtCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) { + super(ctx, map); + } + + /** {@inheritDoc} */ + @Override public boolean isDht() { + return true; + } + + /** {@inheritDoc} */ + @Override public String name() { + String name = super.name(); + + return name == null ? "defaultDhtCache" : name + "Dht"; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + resetMetrics(); + + super.start(); + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + GridCacheMetricsAdapter m = new GridCacheMetricsAdapter(); + + m.delegate(ctx.dht().near().metrics0()); + + metrics = m; + + ctx.dr().resetMetrics(); + } + + /** + * @return Near cache. + */ + @Override public GridNearTransactionalCache<K, V> near() { + return near; + } + + /** + * @param near Near cache. + */ + public void near(GridNearTransactionalCache<K, V> near) { + this.near = near; + } +}