ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db72f531 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db72f531 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db72f531 Branch: refs/heads/ignite-1093 Commit: db72f531342231cf45f49e395056c12cce3a79e5 Parents: d0b7d9f Author: Anton Vinogradov <vinogradov.an...@gmail.com> Authored: Tue Aug 11 18:58:05 2015 +0300 Committer: Anton Vinogradov <vinogradov.an...@gmail.com> Committed: Tue Aug 11 18:58:05 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 135 +------------------ .../dht/preloader/GridDhtPreloader.java | 123 ++++++++++++++++- 2 files changed, 122 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index e177dae..fca9f53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -65,9 +65,6 @@ public class GridDhtPartitionDemander { @GridToStringInclude private volatile SyncFuture syncFut; - /** Demand lock. */ - private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); - /** Last timeout object. */ private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>(); @@ -227,44 +224,10 @@ public class GridDhtPartitionDemander { } /** - * @param p Partition. - * @param topVer Topology version. - * @return Picked owners. - */ - private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { - Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); - - int affCnt = affNodes.size(); - - Collection<ClusterNode> rmts = remoteOwners(p, topVer); - - int rmtCnt = rmts.size(); - - if (rmtCnt <= affCnt) - return rmts; - - List<ClusterNode> sorted = new ArrayList<>(rmts); - - // Sort in descending order, so nodes with higher order will be first. - Collections.sort(sorted, CU.nodeComparator(false)); - - // Pick newest nodes. - return sorted.subList(0, affCnt); - } - - /** - * @param p Partition. - * @param topVer Topology version. - * @return Nodes owning this partition. - */ - private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { - return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); - } - - /** * @param assigns Assignments. * @param force {@code True} if dummy reassign. */ + void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -408,20 +371,6 @@ public class GridDhtPartitionDemander { } /** - * - */ - void unwindUndeploys() { - demandLock.writeLock().lock(); - - try { - cctx.deploy().unwind(cctx); - } - finally { - demandLock.writeLock().unlock(); - } - } - - /** * @param idx Index. * @param id Node id. * @param supply Supply. @@ -666,88 +615,6 @@ public class GridDhtPartitionDemander { void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { lastExchangeFut = lastFut; } - - /** - * @param exchFut Exchange future. - * @return Assignments of partitions to nodes. - */ - GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { - // No assignments for disabled preloader. - GridDhtPartitionTopology top = cctx.dht().topology(); - - if (!cctx.rebalanceEnabled()) - return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - - int partCnt = cctx.affinity().partitions(); - - assert exchFut.forcePreload() || exchFut.dummyReassign() || - exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : - "Topology version mismatch [exchId=" + exchFut.exchangeId() + - ", topVer=" + top.topologyVersion() + ']'; - - GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - - AffinityTopologyVersion topVer = assigns.topologyVersion(); - - for (int p = 0; p < partCnt; p++) { - if (cctx.shared().exchange().hasPendingExchange()) { - if (log.isDebugEnabled()) - log.debug("Skipping assignments creation, exchange worker has pending assignments: " + - exchFut.exchangeId()); - - break; - } - - // If partition belongs to local node. - if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); - - assert part != null; - assert part.id() == p; - - if (part.state() != MOVING) { - if (log.isDebugEnabled()) - log.debug("Skipping partition assignment (state is not MOVING): " + part); - - continue; // For. - } - - Collection<ClusterNode> picked = pickedOwners(p, topVer); - - if (picked.isEmpty()) { - top.own(part); - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); - } - - if (log.isDebugEnabled()) - log.debug("Owning partition as there are no other owners: " + part); - } - else { - ClusterNode n = F.first(picked); - - GridDhtPartitionDemandMessage msg = assigns.get(n); - - if (msg == null) { - assigns.put(n, msg = new GridDhtPartitionDemandMessage( - top.updateSequence(), - exchFut.exchangeId().topologyVersion(), - cctx.cacheId())); - } - - msg.addPartition(p); - } - } - } - - return assigns; - } - /** * */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db72f531/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 8a097ed..d994a19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -42,6 +42,7 @@ import java.util.concurrent.locks.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.util.GridConcurrentFactory.*; /** @@ -76,6 +77,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts = new ConcurrentHashMap8<>(); + /** Demand lock. */ + private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -250,7 +254,115 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { - return demander.assign(exchFut); + // No assignments for disabled preloader. + GridDhtPartitionTopology top = cctx.dht().topology(); + + if (!cctx.rebalanceEnabled()) + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); + + int partCnt = cctx.affinity().partitions(); + + assert exchFut.forcePreload() || exchFut.dummyReassign() || + exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : + "Topology version mismatch [exchId=" + exchFut.exchangeId() + + ", topVer=" + top.topologyVersion() + ']'; + + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); + + AffinityTopologyVersion topVer = assigns.topologyVersion(); + + for (int p = 0; p < partCnt; p++) { + if (cctx.shared().exchange().hasPendingExchange()) { + if (log.isDebugEnabled()) + log.debug("Skipping assignments creation, exchange worker has pending assignments: " + + exchFut.exchangeId()); + + break; + } + + // If partition belongs to local node. + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); + + assert part != null; + assert part.id() == p; + + if (part.state() != MOVING) { + if (log.isDebugEnabled()) + log.debug("Skipping partition assignment (state is not MOVING): " + part); + + continue; // For. + } + + Collection<ClusterNode> picked = pickedOwners(p, topVer); + + if (picked.isEmpty()) { + top.own(part); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + DiscoveryEvent discoEvt = exchFut.discoveryEvent(); + + cctx.events().addPreloadEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), + discoEvt.type(), discoEvt.timestamp()); + } + + if (log.isDebugEnabled()) + log.debug("Owning partition as there are no other owners: " + part); + } + else { + ClusterNode n = F.first(picked); + + GridDhtPartitionDemandMessage msg = assigns.get(n); + + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage( + top.updateSequence(), + exchFut.exchangeId().topologyVersion(), + cctx.cacheId())); + } + + msg.addPartition(p); + } + } + } + + return assigns; + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return Picked owners. + */ + private Collection<ClusterNode> pickedOwners(int p, AffinityTopologyVersion topVer) { + Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer); + + int affCnt = affNodes.size(); + + Collection<ClusterNode> rmts = remoteOwners(p, topVer); + + int rmtCnt = rmts.size(); + + if (rmtCnt <= affCnt) + return rmts; + + List<ClusterNode> sorted = new ArrayList<>(rmts); + + // Sort in descending order, so nodes with higher order will be first. + Collections.sort(sorted, CU.nodeComparator(false)); + + // Pick newest nodes. + return sorted.subList(0, affCnt); + } + + /** + * @param p Partition. + * @param topVer Topology version. + * @return Nodes owning this partition. + */ + private Collection<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) { + return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); } /** {@inheritDoc} */ @@ -531,7 +643,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void unwindUndeploys() { - demander.unwindUndeploys(); + demandLock.writeLock().lock(); + + try { + cctx.deploy().unwind(cctx); + } + finally { + demandLock.writeLock().unlock(); + } } /**