ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d0b7d9fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d0b7d9fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d0b7d9fc

Branch: refs/heads/ignite-1093
Commit: d0b7d9fca8713aefeb6e6477679efb9d7a8db9e0
Parents: 76ba5d9
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Tue Aug 11 18:09:00 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Tue Aug 11 18:09:00 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCachePreloader.java    |   2 +-
 .../cache/GridCachePreloaderAdapter.java        |   2 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 941 +++++++------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  21 +-
 .../dht/preloader/GridDhtPreloader.java         |   2 +-
 5 files changed, 328 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index b8bb08e..1e915eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -91,7 +91,7 @@ public interface GridCachePreloader {
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
      */
-    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
+    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException;
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 0adf510..68deb2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -142,7 +142,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         // No-op.
} } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index fdd101e..e177dae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -27,30 +27,25 @@ import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.timeout.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; -import org.jetbrains.annotations.*; import org.jsr166.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; -import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridTopic.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.dr.GridDrType.*; /** - * Thread pool for requesting partitions from other nodes - * and populating local cache. + * Thread pool for requesting partitions from other nodes and populating local cache. */ @SuppressWarnings("NonConstantFieldWithUpperCaseName") public class GridDhtPartitionDemander { @@ -63,35 +58,25 @@ public class GridDhtPartitionDemander { /** */ private final ReadWriteLock busyLock; - /** */ - @GridToStringInclude - private final Collection<DemandWorker> dmdWorkers; - /** Preload predicate. */ private IgnitePredicate<GridCacheEntryInfo> preloadPred; /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */ @GridToStringInclude - private SyncFuture syncFut; - - /** Preload timeout. */ - private final AtomicLong timeout; - - /** Allows demand threads to synchronize their step. */ - private CyclicBarrier barrier; + private volatile SyncFuture syncFut; /** Demand lock. */ private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); - /** */ - private int poolSize; - /** Last timeout object. */ private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>(); /** Last exchange future. */ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; + /** Assignments. */ + private volatile GridDhtPreloaderAssignments assigns; + /** * @param cctx Cache context. * @param busyLock Shutdown lock. @@ -107,53 +92,47 @@ public class GridDhtPartitionDemander { boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - poolSize = enabled ? 
cctx.config().getRebalanceThreadPoolSize() : 0; - if (enabled) { - barrier = new CyclicBarrier(poolSize); - dmdWorkers = new ArrayList<>(poolSize); + for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) { + final int idx = cnt; - for (int i = 0; i < poolSize; i++) - dmdWorkers.add(new DemandWorker(i)); + cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessage>() { + @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessage m) { + enterBusy(); - syncFut = new SyncFuture(dmdWorkers); + try { + handleSupplyMessage(idx, id, m); + } + finally { + leaveBusy(); + } + } + }); + } } - else { - dmdWorkers = Collections.emptyList(); - syncFut = new SyncFuture(dmdWorkers); + syncFut = new SyncFuture(); + if (!enabled) // Calling onDone() immediately since preloading is disabled. syncFut.onDone(); - } - - timeout = new AtomicLong(cctx.config().getRebalanceTimeout()); } /** * */ void start() { - if (poolSize > 0) { - for (DemandWorker w : dmdWorkers) - new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start(); - } } /** * */ void stop() { - U.cancel(dmdWorkers); - - if (log.isDebugEnabled()) - log.debug("Before joining on demand workers: " + dmdWorkers); - - U.join(dmdWorkers, log); - - if (log.isDebugEnabled()) - log.debug("After joining on demand workers: " + dmdWorkers); + if (cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode()) { + for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) + cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId())); + } lastExchangeFut = null; @@ -177,13 +156,6 @@ public class GridDhtPartitionDemander { } /** - * @return Size of this thread pool. - */ - int poolSize() { - return poolSize; - } - - /** * Force preload. */ void forcePreload() { @@ -225,23 +197,22 @@ public class GridDhtPartitionDemander { * @param idx * @return topic */ - static Object topic(int idx, int cacheId, UUID nodeId) { - return TOPIC_CACHE.topic("DemandPool", nodeId, cacheId, idx);//Todo: remove nodeId + static Object topic(int idx, int cacheId) { + return TOPIC_CACHE.topic("Demander", cacheId, idx); } /** - * + * @return {@code True} if topology changed. */ - private void leaveBusy() { - busyLock.readLock().unlock(); + private boolean topologyChanged(AffinityTopologyVersion topVer) { + return !cctx.affinity().affinityTopologyVersion().equals(topVer); } /** - * @param type Type. - * @param discoEvt Discovery event. + * */ - private void preloadEvent(int type, DiscoveryEvent discoEvt) { - preloadEvent(-1, type, discoEvt); + private void leaveBusy() { + busyLock.readLock().unlock(); } /** @@ -256,28 +227,6 @@ public class GridDhtPartitionDemander { } /** - * @param deque Deque to poll from. - * @param time Time to wait. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. 
- if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(time, MILLISECONDS); - } - - /** * @param p Partition. * @param topVer Topology version. * @return Picked owners. @@ -316,7 +265,7 @@ public class GridDhtPartitionDemander { * @param assigns Assignments. * @param force {@code True} if dummy reassign. */ - void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { + void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -325,341 +274,194 @@ public class GridDhtPartitionDemander { if (delay == 0 || force) { assert assigns != null; - synchronized (dmdWorkers) { - for (DemandWorker w : dmdWorkers) - w.addAssignments(assigns); - } - } - else if (delay > 0) { - assert !force; + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - GridTimeoutObject obj = lastTimeoutObj.get(); + if (this.assigns != null) { + syncFut.get(); - if (obj != null) - cctx.time().removeTimeoutObject(obj); - - final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; - - assert exchFut != null : "Delaying rebalance process without topology event."; - - obj = new GridTimeoutObjectAdapter(delay) { - @Override public void onTimeout() { - exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { - cctx.shared().exchange().forcePreloadExchange(exchFut); - } - }); - } - }; - - lastTimeoutObj.set(obj); + syncFut = new SyncFuture(); + } - cctx.time().addTimeoutObject(obj); - } - } + if (assigns.isEmpty() || topologyChanged(topVer)) { + syncFut.onDone(); - /** - * - */ - void unwindUndeploys() { - demandLock.writeLock().lock(); + return; + } - try { - cctx.deploy().unwind(cctx); - } - finally { - demandLock.writeLock().unlock(); - } - } + this.assigns = assigns; - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtPartitionDemander.class, this); - } + for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { + GridDhtPartitionDemandMessage d = e.getValue(); - /** - * - */ - private class DemandWorker extends GridWorker { - /** Worker ID. */ - private int id; + d.timeout(cctx.config().getRebalanceTimeout()); + d.workerId(0);//old api support. - /** Partition-to-node assignments. */ - private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>(); + ClusterNode node = e.getKey(); - /** Hide worker logger and use cache logger instead. */ - private IgniteLogger log = GridDhtPartitionDemander.this.log; + GridConcurrentHashSet<Integer> remainings = new GridConcurrentHashSet<>(); - /** - * @param id Worker ID. - */ - private DemandWorker(int id) { - super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemander.this.log); + remainings.addAll(d.partitions()); - assert id >= 0; + syncFut.append(node.id(), remainings); - this.id = id; - } + int lsnrCnt = cctx.config().getRebalanceThreadPoolSize(); - /** - * @param assigns Assignments. 
- */ - void addAssignments(GridDhtPreloaderAssignments assigns) { - assert assigns != null; + List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt); - assignQ.offer(assigns); + for (int cnt = 0; cnt < lsnrCnt; cnt++) + sParts.add(new HashSet<Integer>()); - if (log.isDebugEnabled()) - log.debug("Added assignments to worker: " + this); - } + Iterator<Integer> it = d.partitions().iterator(); - /** - * @return {@code True} if topology changed. - */ - private boolean topologyChanged() { - return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged(); - } + int cnt = 0; - /** - * @param pick Node picked for preloading. - * @param p Partition. - * @param entry Preloaded entry. - * @param topVer Topology version. - * @return {@code False} if partition has become invalid during preloading. - * @throws IgniteInterruptedCheckedException If interrupted. - */ - private boolean preloadEntry( - ClusterNode pick, - int p, - GridCacheEntryInfo entry, - AffinityTopologyVersion topVer - ) throws IgniteCheckedException { - try { - GridCacheEntryEx cached = null; + while (it.hasNext()) + sParts.get(cnt++ % lsnrCnt).add(it.next()); - try { - cached = cctx.dht().entryEx(entry.key()); + for (cnt = 0; cnt < lsnrCnt; cnt++) { - if (log.isDebugEnabled()) - log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); + if (!sParts.get(cnt).isEmpty()) { - if (cctx.dht().isIgfsDataCache() && - cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) { - LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " + - "value, will ignore rebalance entries): " + name()); + // Create copy. + GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); - if (cached.markObsoleteIfEmpty(null)) - cached.context().cache().removeIfObsolete(cached.key()); + initD.topic(topic(cnt, cctx.cacheId())); - return true; - } - - if (preloadPred == null || preloadPred.apply(entry)) { - if (cached.initialValue( - entry.value(), - entry.version(), - entry.ttl(), - entry.expireTime(), - true, - topVer, - cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE - )) { - cctx.evicts().touch(cached, topVer); // Start tracking. 
- - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, - false, null, null, null); + try { + cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout()); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to send partition demand message to local node", ex); } - else if (log.isDebugEnabled()) - log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + - ", part=" + p + ']'); } - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" + - cached.key() + ", part=" + p + ']'); - } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } - } - catch (IgniteInterruptedCheckedException e) { - throw e; - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); - } - - return true; - } - - /** - * @param node Node to demand from. - * @param topVer Topology version. - * @param d Demand message. - * @param exchFut Exchange future. - * @return Missed partitions. - * @throws InterruptedException If interrupted. - * @throws ClusterTopologyCheckedException If node left. - * @throws IgniteCheckedException If failed to send message. 
- */ - private Set<Integer> demandFromNode( - final ClusterNode node, - final AffinityTopologyVersion topVer, - final GridDhtPartitionDemandMessage d, - final GridDhtPartitionsExchangeFuture exchFut - ) throws InterruptedException, IgniteCheckedException { - final GridDhtPartitionTopology top = cctx.dht().topology(); - long timeout = GridDhtPartitionDemander.this.timeout.get(); + if (log.isInfoEnabled() && !d.partitions().isEmpty()) { + LinkedList<Integer> s = new LinkedList<>(d.partitions()); - d.timeout(timeout); - d.workerId(id); + Collections.sort(s); - final Set<Integer> missed = new HashSet<>(); + StringBuilder sb = new StringBuilder(); - final ConcurrentHashMap8<Integer, Boolean> remaining = new ConcurrentHashMap8<>(); + int start = -1; - for (int p : d.partitions()) - remaining.put(p, false); + int prev = -1; - if (isCancelled() || topologyChanged()) - return missed; + Iterator<Integer> sit = s.iterator(); - int threadCnt = cctx.config().getRebalanceThreadPoolSize(); //todo = getRebalanceThreadPoolSize / assigns.count + while (sit.hasNext()) { + int p = sit.next(); + if (start == -1) { + start = p; + prev = p; + } - List<Set<Integer>> sParts = new ArrayList<>(threadCnt); + if (prev < p - 1) { + sb.append(start); - int cnt = 0; + if (start != prev) + sb.append("-").append(prev); - while (cnt < threadCnt) { - sParts.add(new HashSet<Integer>()); + sb.append(", "); - final int idx = cnt; + start = p; + } - cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId(), node.id()), new CI2<UUID, GridDhtPartitionSupplyMessage>() { - @Override public void apply(UUID id, GridDhtPartitionSupplyMessage m) { - enterBusy(); + if (!sit.hasNext()) { + sb.append(start); - try { - handleSupplyMessage(idx, new SupplyMessage(id, m), node, topVer, top, - exchFut, missed, d, remaining); - }finally{ - leaveBusy(); + if (start != p) + sb.append("-").append(p); } + + prev = p; } - }); - cnt++; + log.info("Requested rebalancing [from node=" + node.id() + ", partitions=" + s.size() + " (" + sb.toString() + ")]"); + } } + } + else if (delay > 0) { + GridTimeoutObject obj = lastTimeoutObj.get(); - Iterator<Integer> it = d.partitions().iterator(); - - cnt = 0; + if (obj != null) + cctx.time().removeTimeoutObject(obj); - while (it.hasNext()) { - sParts.get(cnt % threadCnt).add(it.next()); + final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; - cnt++; - } + assert exchFut != null : "Delaying rebalance process without topology event."; - try { - cnt = 0; + obj = new GridTimeoutObjectAdapter(delay) { + @Override public void onTimeout() { + exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { + cctx.shared().exchange().forcePreloadExchange(exchFut); + } + }); + } + }; - while (cnt < threadCnt) { + lastTimeoutObj.set(obj); - // Create copy. 
- GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); + cctx.time().addTimeoutObject(obj); + } + } - initD.topic(topic(cnt, cctx.cacheId(),node.id())); + /** + * + */ + void unwindUndeploys() { + demandLock.writeLock().lock(); - try { - if (logg && cctx.name().equals("cache")) - System.out.println("D "+cnt + " initial Demand "+" "+cctx.localNode().id()); + try { + cctx.deploy().unwind(cctx); + } + finally { + demandLock.writeLock().unlock(); + } + } - cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(cnt, cctx.cacheId()), initD, cctx.ioPolicy(), d.timeout()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition demand message to local node", e); - } + /** + * @param idx Index. + * @param id Node id. + * @param supply Supply. + */ + private void handleSupplyMessage( + int idx, + final UUID id, + final GridDhtPartitionSupplyMessage supply) { + ClusterNode node = cctx.node(id); - cnt++; - } + assert node != null; - do { - U.sleep(1000);//Todo: improve - } - while (!isCancelled() && !topologyChanged() && !remaining.isEmpty()); + GridDhtPartitionDemandMessage d = assigns.get(node); - return missed; - } - finally { - cnt = 0; + AffinityTopologyVersion topVer = d.topologyVersion(); - while (cnt < threadCnt) { - cctx.io().removeOrderedHandler(topic(cnt,cctx.cacheId(), node.id())); + if (topologyChanged(topVer)) { + syncFut.cancel(id); - cnt++; - } - } + return; } - boolean logg = false; - - /** - * @param s Supply message. - * @param node Node. - * @param topVer Topology version. - * @param top Topology. - * @param exchFut Exchange future. - * @param missed Missed. - * @param d initial DemandMessage. - */ - private void handleSupplyMessage( - int idx, - SupplyMessage s, - ClusterNode node, - AffinityTopologyVersion topVer, - GridDhtPartitionTopology top, - GridDhtPartitionsExchangeFuture exchFut, - Set<Integer> missed, - GridDhtPartitionDemandMessage d, - ConcurrentHashMap8 remaining) { - - if (logg && cctx.name().equals("cache")) - System.out.println("D "+idx + " handled supply message "+ cctx.localNode().id()); - - // Check that message was received from expected node. - if (!s.senderId().equals(node.id())) { - U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() + - ", rcvdId=" + s.senderId() + ", msg=" + s + ']'); + if (log.isDebugEnabled()) + log.debug("Received supply message: " + supply); - return; - } + // Check whether there were class loading errors on unmarshal + if (supply.classError() != null) { + if (log.isDebugEnabled()) + log.debug("Class got undeployed during preloading: " + supply.classError()); - if (topologyChanged()) - return; + syncFut.cancel(id); - if (log.isDebugEnabled()) - log.debug("Received supply message: " + s); + return; + } - GridDhtPartitionSupplyMessage supply = s.supply(); + final GridDhtPartitionTopology top = cctx.dht().topology(); - // Check whether there were class loading errors on unmarshal - if (supply.classError() != null) { - if (log.isDebugEnabled()) - log.debug("Class got undeployed during preloading: " + supply.classError()); + GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture(); - return; - } + try { // Preload. 
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) { @@ -689,19 +491,12 @@ public class GridDhtPartitionDemander { continue; } - try { - if (!preloadEntry(node, p, entry, topVer)) { - if (log.isDebugEnabled()) - log.debug("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - - break; - } - } - catch (IgniteCheckedException ex) { - cancel(); + if (!preloadEntry(node, p, entry, topVer)) { + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - return; + break; } } @@ -710,12 +505,9 @@ public class GridDhtPartitionDemander { // If message was last for this partition, // then we take ownership. if (last) { - top.own(part);//todo: close future? - -// if (logg && cctx.name().equals("cache")) -// System.out.println("D "+idx + " last "+ p +" "+ cctx.localNode().id()); + top.own(part); - remaining.remove(p); + syncFut.onPartitionDone(id, p); if (log.isDebugEnabled()) log.debug("Finished rebalancing partition: " + part); @@ -731,218 +523,139 @@ public class GridDhtPartitionDemander { } } else { - remaining.remove(p); + syncFut.onPartitionDone(id, p); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } else { - remaining.remove(p); + syncFut.onPartitionDone(id, p); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); } } - for (Integer miss : s.supply().missed()) - remaining.remove(miss); - // Only request partitions based on latest topology version. - for (Integer miss : s.supply().missed()) + for (Integer miss : supply.missed()) if (cctx.affinity().localNode(miss, topVer)) - missed.add(miss); + syncFut.onMissedPartition(id, miss); - if (!remaining.isEmpty()) { - try { - // Create copy. - GridDhtPartitionDemandMessage nextD = - new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet()); + for (Integer miss : supply.missed()) + syncFut.onPartitionDone(id, miss); - nextD.topic(topic(idx, cctx.cacheId(), node.id())); + if (!syncFut.isDone()) { - // Send demand message. - cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()), - nextD, cctx.ioPolicy(), d.timeout()); + // Create copy. + GridDhtPartitionDemandMessage nextD = + new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet()); - if (logg && cctx.name().equals("cache")) - System.out.println("D " + idx + " ack " + cctx.localNode().id()); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to receive partitions from node (rebalancing will not " + - "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex); + nextD.topic(topic(idx, cctx.cacheId())); - cancel(); - } + // Send demand message. 
+ cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()), + nextD, cctx.ioPolicy(), d.timeout()); } } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Node left during rebalancing (will retry) [node=" + node.id() + + ", msg=" + e.getMessage() + ']'); + syncFut.cancel(id); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to receive partitions from node (rebalancing will not " + + "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex); - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - try { - int rebalanceOrder = cctx.config().getRebalanceOrder(); - - if (!CU.isMarshallerCache(cctx.name())) { - if (log.isDebugEnabled()) - log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); + syncFut.cancel(id); + } + } - try { - cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get(); - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " + - "[cacheName=" + cctx.name() + ']'); + /** + * @param pick Node picked for preloading. + * @param p Partition. + * @param entry Preloaded entry. + * @param topVer Topology version. + * @return {@code False} if partition has become invalid during preloading. + * @throws IgniteInterruptedCheckedException If interrupted. + */ + private boolean preloadEntry( + ClusterNode pick, + int p, + GridCacheEntryInfo entry, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + try { + GridCacheEntryEx cached = null; - return; - } - catch (IgniteCheckedException e) { - throw new Error("Ordered preload future should never fail: " + e.getMessage(), e); - } - } + try { + cached = cctx.dht().entryEx(entry.key()); - if (rebalanceOrder > 0) { - IgniteInternalFuture<?> fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder); + if (log.isDebugEnabled()) + log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); - try { - if (fut != null) { - if (log.isDebugEnabled()) - log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() + - ", rebalanceOrder=" + rebalanceOrder + ']'); + if (cctx.dht().isIgfsDataCache() && + cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) { + LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " + + "value, will ignore rebalance entries)"); - fut.get(); - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to wait for ordered rebalance future (grid is stopping): " + - "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']'); + if (cached.markObsoleteIfEmpty(null)) + cached.context().cache().removeIfObsolete(cached.key()); - return; - } - catch (IgniteCheckedException e) { - throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e); - } + return true; } - GridDhtPartitionsExchangeFuture exchFut = null; - - boolean stopEvtFired = false; - - while (!isCancelled()) { - try { - barrier.await(); - - if (id == 0 && exchFut != null && !exchFut.dummy() && - cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) { - - if (!cctx.isReplicated() || !stopEvtFired) { - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); - - stopEvtFired = true; - } - } - } - catch (BrokenBarrierException ignore) { 
- throw new InterruptedException("Demand worker stopped."); - } - - // Sync up all demand threads at this step. - GridDhtPreloaderAssignments assigns = null; - - while (assigns == null) - assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); - - demandLock.readLock().lock(); - - try { - exchFut = assigns.exchangeFuture(); - - // Assignments are empty if preloading is disabled. - if (assigns.isEmpty()) - continue; - - boolean resync = false; - - // While. - // ===== - while (!isCancelled() && !topologyChanged() && !resync) { - Collection<Integer> missed = new HashSet<>(); - - // For. - // === - for (ClusterNode node : assigns.keySet()) { - if (topologyChanged() || isCancelled()) - break; // For. - - GridDhtPartitionDemandMessage d = assigns.remove(node); - - // If another thread is already processing this message, - // move to the next node. - if (d == null) - continue; // For. - - try { - Set<Integer> set = demandFromNode(node, assigns.topologyVersion(), d, exchFut); - - if (!set.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" + - set + ']'); - - missed.addAll(set); - } - } - catch (IgniteInterruptedCheckedException e) { - throw e; - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Node left during rebalancing (will retry) [node=" + node.id() + - ", msg=" + e.getMessage() + ']'); - - resync = true; - - break; // For. - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to receive partitions from node (rebalancing will not " + - "fully finish) [node=" + node.id() + ", msg=" + d + ']', e); - } - } - - // Processed missed entries. - if (!missed.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Reassigning partitions that were missed: " + missed); - - assert exchFut.exchangeId() != null; - - cctx.shared().exchange().forceDummyExchange(true, exchFut); - } - else - break; // While. - } - } - finally { - demandLock.readLock().unlock(); - - syncFut.onWorkerDone(this); + if (preloadPred == null || preloadPred.apply(entry)) { + if (cached.initialValue( + entry.value(), + entry.version(), + entry.ttl(), + entry.expireTime(), + true, + topVer, + cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE + )) { + cctx.evicts().touch(cached, topVer); // Start tracking. + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, + false, null, null, null); } - - cctx.shared().exchange().scheduleResendPartitions(); + else if (log.isDebugEnabled()) + log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + + ", part=" + p + ']'); } + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); } - finally { - // Safety. 
- syncFut.onWorkerDone(this); + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" + + cached.key() + ", part=" + p + ']'); } - } + catch (GridDhtInvalidPartitionException ignored) { + if (log.isDebugEnabled()) + log.debug("Partition became invalid during rebalancing (will ignore): " + p); - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DemandWorker.class, this, "assignQ", assignQ, "super", super.toString()); + return false; + } + } + catch (IgniteInterruptedCheckedException e) { + throw e; } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + + cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionDemander.class, this); } /** @@ -1035,88 +748,80 @@ public class GridDhtPartitionDemander { return assigns; } - /** - * - */ - private class SyncFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; +/** + * + */ +private class SyncFuture extends GridFutureAdapter<Object> { + /** */ + private static final long serialVersionUID = 1L; - /** Remaining workers. */ - private Collection<DemandWorker> remaining; + private ConcurrentHashMap8<UUID, Collection<Integer>> remaining = new ConcurrentHashMap8<>(); - /** - * @param workers List of workers. - */ - private SyncFuture(Collection<DemandWorker> workers) { - assert workers.size() == poolSize(); + private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>(); - remaining = Collections.synchronizedList(new LinkedList<>(workers)); - } + public void append(UUID nodeId, Collection<Integer> parts) { + remaining.put(nodeId, parts); - /** - * @param w Worker who iterated through all partitions. - */ - void onWorkerDone(DemandWorker w) { - if (isDone()) - return; + missed.put(nodeId, new GridConcurrentHashSet<Integer>()); + } - if (remaining.remove(w)) - if (log.isDebugEnabled()) - log.debug("Completed full partition iteration for worker [worker=" + w + ']'); + void cancel(UUID nodeId) { + if (isDone()) + return; - if (remaining.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Completed sync future."); + remaining.remove(nodeId); - onDone(); - } - } + checkIsDone(); } - /** - * Supply message wrapper. - */ - private static class SupplyMessage { - /** Sender ID. */ - private UUID sndId; - - /** Supply message. */ - private GridDhtPartitionSupplyMessage supply; - - /** - * Dummy constructor. - */ - private SupplyMessage() { - // No-op. - } + void onMissedPartition(UUID nodeId, int p) { + if (missed.get(nodeId) == null) + missed.put(nodeId, new GridConcurrentHashSet<Integer>()); - /** - * @param sndId Sender ID. - * @param supply Supply message. - */ - SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) { - this.sndId = sndId; - this.supply = supply; - } + missed.get(nodeId).add(p); + } - /** - * @return Sender ID. - */ - UUID senderId() { - return sndId; - } + void onPartitionDone(UUID nodeId, int p) { + if (isDone()) + return; + + Collection<Integer> parts = remaining.get(nodeId); + + parts.remove(p); - /** - * @return Message. 
- */ - GridDhtPartitionSupplyMessage supply() { - return supply; + if (parts.isEmpty()) { + remaining.remove(nodeId); + + if (log.isDebugEnabled()) + log.debug("Completed full partition iteration for node [nodeId=" + nodeId + ']'); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SupplyMessage.class, this); + checkIsDone(); + } + + private void checkIsDone() { + if (remaining.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Completed sync future."); + + Collection<Integer> m = new HashSet<>(); + + for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) { + if (e.getValue() != null && !e.getValue().isEmpty()) + m.addAll(e.getValue()); + } + + if (!m.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Reassigning partitions that were missed: " + m); + + cctx.shared().exchange().forceDummyExchange(true, assigns.exchangeFuture()); + } + + missed.clear(); + + onDone(); } } } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 920d10d..b948fbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -108,7 +108,7 @@ class GridDhtPartitionSupplier { * @return topic */ static Object topic(int idx, int id) { - return TOPIC_CACHE.topic("SupplyPool", idx, id); + return TOPIC_CACHE.topic("Supplier", idx, id); } /** @@ -138,8 +138,6 @@ class GridDhtPartitionSupplier { this.preloadPred = preloadPred; } - boolean logg = false; - /** * @return {@code true} if entered to busy state. */ @@ -172,9 +170,6 @@ class GridDhtPartitionSupplier { if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) return; - if (logg && cctx.name().equals("cache")) - System.out.println("S " + idx + " process message " + cctx.localNode().id()); - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId()); @@ -191,12 +186,8 @@ class GridDhtPartitionSupplier { doneMap.remove(scId); } - if (doneMap.get(scId) != null) { - if (logg && cctx.name().equals("cache")) - System.out.println("S " + idx + " exit " + cctx.localNode().id()); - + if (doneMap.get(scId) != null) return; - } long bCnt = 0; @@ -282,9 +273,6 @@ class GridDhtPartitionSupplier { return; } else { - if (logg && cctx.name().equals("cache")) - System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id()); - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), cctx.cacheId()); } @@ -473,9 +461,6 @@ class GridDhtPartitionSupplier { // Mark as last supply message. 
                        s.last(part);
-//                        if (logg && cctx.name().equals("cache"))
-//                            System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
-
                        phase = 0;

                        sctx = null;
@@ -508,8 +493,6 @@
      */
     private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
         throws IgniteCheckedException {
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S sent "+ cctx.localNode().id());

         try {
             if (log.isDebugEnabled())


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d0b7d9fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index a22f281..8a097ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -254,7 +254,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) throws IgniteCheckedException {
         demander.addAssignments(assignments, forcePreload);
     }
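
The patch above replaces the per-cache DemandWorker thread pool with ordered message handlers registered per rebalance-thread index (TOPIC_CACHE.topic("Demander", cacheId, idx)), splits each node's demanded partitions round-robin across those indices, and tracks completion in a SyncFuture keyed by supplier node id and its remaining partitions. The standalone Java sketch below models only that striping and completion-tracking pattern; the class and method names (RebalanceSketch, SyncTracker, stripe) are illustrative stand-ins and are not part of this commit or of Ignite's API.

// Simplified model of the demander refactoring: partitions are split
// round-robin across N striped topics (one ordered handler per index), and a
// single future completes once every supplier node has delivered all of the
// partitions demanded from it. Illustrative names only; not Ignite API.
import java.util.*;
import java.util.concurrent.*;

public class RebalanceSketch {
    /** Tracks remaining partitions per supplier node, similar to the new SyncFuture. */
    static class SyncTracker {
        private final Map<UUID, Set<Integer>> remaining = new ConcurrentHashMap<>();
        private final CompletableFuture<Void> done = new CompletableFuture<>();

        /** Registers the partitions expected from a supplier node. */
        void append(UUID nodeId, Collection<Integer> parts) {
            Set<Integer> set = ConcurrentHashMap.newKeySet();

            set.addAll(parts);

            remaining.put(nodeId, set);
        }

        /** Marks a partition as fully rebalanced from the given node. */
        void onPartitionDone(UUID nodeId, int p) {
            Set<Integer> parts = remaining.get(nodeId);

            if (parts != null && parts.remove(p) && parts.isEmpty())
                remaining.remove(nodeId);

            if (remaining.isEmpty())
                done.complete(null);
        }

        CompletableFuture<Void> future() {
            return done;
        }
    }

    /** Splits demanded partitions round-robin across the given number of striped topics. */
    static List<Set<Integer>> stripe(Collection<Integer> parts, int threadCnt) {
        List<Set<Integer>> striped = new ArrayList<>(threadCnt);

        for (int i = 0; i < threadCnt; i++)
            striped.add(new HashSet<Integer>());

        int idx = 0;

        for (Integer p : parts)
            striped.get(idx++ % threadCnt).add(p);

        return striped;
    }

    public static void main(String[] args) {
        UUID supplier = UUID.randomUUID();
        List<Integer> demanded = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);

        SyncTracker fut = new SyncTracker();

        fut.append(supplier, demanded);

        // One demand message per non-empty stripe, mirroring topic(idx, cacheId).
        List<Set<Integer>> stripes = stripe(demanded, 3);

        for (int idx = 0; idx < stripes.size(); idx++)
            System.out.println("Topic " + idx + " demands partitions " + stripes.get(idx));

        // Supply handlers report partitions as they finish; the future completes at the end.
        for (int p : demanded)
            fut.onPartitionDone(supplier, p);

        System.out.println("Rebalancing finished: " + fut.future().isDone());
    }
}

A real handler additionally has to cancel the future when the topology version changes and reassign missed partitions through a dummy exchange, which is what handleSupplyMessage() and SyncFuture.checkIsDone() do in the patch.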