http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..920d10d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Supplier of partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final ReadWriteLock busyLock;
+
+    /** */
+    private GridDhtPartitionTopology top;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Supply context map. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+    /**
+     * @param cctx Cache context.
+     * @param busyLock Shutdown lock.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+        assert cctx != null;
+        assert busyLock != null;
+
+        this.cctx = cctx;
+        this.busyLock = busyLock;
+
+        log = cctx.logger(getClass());
+
+        top = cctx.dht().topology();
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            processMessage(m, id, idx);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
+        depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     * @param idx Index.
+     * @param id Cache ID.
+     * @return Topic.
+     */
+    static Object topic(int idx, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", idx, id);
+    }
+
+    /**
+     *
+     */
+    void start() {
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        top = null;
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
+                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+        }
+    }
+
+    /**
+     * Sets preload predicate for supplier.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    boolean logg = false;
+
+    /**
+     * @return {@code true} if entered busy state.
+     */
+    private boolean enterBusy() {
+        if (busyLock.readLock().tryLock())
+            return true;
+
+        if (log.isDebugEnabled())
+            log.debug("Failed to enter busy state (supplier is stopping): " + cctx.nodeId());
+
+        return false;
+    }
+
+    /**
+     *
+     */
+    private void leaveBusy() {
+        busyLock.readLock().unlock();
+    }
+
+    /**
+     * @param d Demand message.
+     * @param id Node UUID.
+     * @param idx Index.
+     */
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
+        assert d != null;
+        assert id != null;
+
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        T2<UUID, Object> scId = new T2<>(id, d.topic());
+
+        try {
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (!d.partitions().isEmpty()) { // Only the initial request contains partitions.
+                doneMap.remove(scId);
+            }
+
+            if (doneMap.get(scId) != null) {
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
+                return;
+            }
+
+            long bCnt = 0;
+
+            int phase = 0;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = 3; // TODO: make configurable.
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == 0)
+                        phase = 1;
+
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+                                    swapLsnr = null;
+
+                                    return;
+                                }
+                                else {
+                                    if (logg && cctx.name().equals("cache"))
+                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == 1)
+                        phase = 2;
+
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // Leave the swap-entry loop.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        if (!reply(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+
+                                            swapLsnr = null;
+
+                                            return;
+                                        }
+                                        else {
+                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId());
+                                        }
+                                    }
+
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == 2)
+                        phase = 3;
+
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not send cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+//                    if (logg && cctx.name().equals("cache"))
+//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
+
+                    phase = 0;
+
+                    sctx = null;
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s);
+
+            doneMap.put(scId, true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d Demand message.
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S sent " + cctx.localNode().id());
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+    }
+
+    /**
+     * @param t Tuple.
+     * @param phase Phase.
+     * @param partIt Partition iterator.
+     * @param part Partition.
+     * @param entryIt Entry iterator.
+     * @param swapLsnr Swap listener.
+     */
+    private void saveSupplyContext(
+        T2 t,
+        int phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr) {
+        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+    }
+
+    /**
+     * Supply context.
+     */
+    private static class SupplyContext {
+        /** Phase. */
+        private int phase;
+
+        /** Partition iterator. */
+        private Iterator<Integer> partIt;
+
+        /** Entry iterator. */
+        private Iterator<?> entryIt;
+
+        /** Swap listener. */
+        private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        int part;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         */
+        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+        }
+    }
+}
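Editorial note: the key behavioral change in the new GridDhtPartitionSupplier above is incremental supply. Instead of draining every requested partition in one pass, processMessage() stops after maxBatchesCnt batches, parks its phase and iterators in scMap via saveSupplyContext(), and resumes from that SupplyContext when the next demand message arrives (a resumed context gets maxBatchesCnt = 1). The self-contained sketch below shows just that save/resume mechanic; ResumableSupplier and everything in it are hypothetical illustrations of the pattern, not Ignite API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Minimal sketch of the save/resume pattern used by the supplier above: when the
 * batch budget for one demand message is exhausted, the current iterator is parked
 * in a context map (the analog of scMap) and picked up by the next demand message.
 */
public class ResumableSupplier {
    /** Saved iteration state per demander, keyed like scMap's T2(nodeId, topic). */
    private final Map<String, Iterator<Integer>> contexts = new ConcurrentHashMap<>();

    /**
     * Supplies up to {@code maxBatches} batches of {@code batchSize} entries each.
     *
     * @return Entries sent in this round.
     */
    public List<Integer> supply(String demanderId, List<Integer> entries, int maxBatches, int batchSize) {
        // Resume from a parked iterator, or start fresh on an initial request.
        Iterator<Integer> it = contexts.remove(demanderId);

        if (it == null)
            it = entries.iterator();

        List<Integer> sent = new ArrayList<>();

        int batches = 0;

        while (it.hasNext()) {
            sent.add(it.next());

            // At each batch boundary, check whether the per-message budget is spent.
            if (sent.size() % batchSize == 0 && ++batches >= maxBatches && it.hasNext()) {
                // Park the iterator; the next demand message continues from here.
                contexts.put(demanderId, it);

                return sent;
            }
        }

        return sent; // Iterator drained: nothing saved, this demander is done.
    }

    public static void main(String[] args) {
        ResumableSupplier supplier = new ResumableSupplier();

        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7);

        // Initial request: three batches of two are sent, then state is parked.
        System.out.println(supplier.supply("node-1", data, 3, 2)); // [1, 2, 3, 4, 5, 6]

        // Follow-up request resumes from the parked iterator.
        System.out.println(supplier.supply("node-1", data, 3, 2)); // [7]
    }
}

The trade-off is the same one the patch makes: parking the iterator bounds the work done per message (and so per handler thread), at the cost of one extra demand round-trip per resumed batch, which is what the "maxBatchesCnt = 3; // TODO: make configurable." comment is about.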
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java deleted file mode 100644 index c1c9941..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ /dev/null @@ -1,576 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jsr166.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; - -/** - * Thread pool for supplying partitions to demanding nodes. - */ -class GridDhtPartitionSupplyPool { - /** */ - private final GridCacheContext<?, ?> cctx; - - /** */ - private final IgniteLogger log; - - /** */ - private GridDhtPartitionTopology top; - - /** */ - private final boolean depEnabled; - - /** Preload predicate. */ - private IgnitePredicate<GridCacheEntryInfo> preloadPred; - - /** Supply context map. */ - private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>(); - - /** Done map. */ - private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor - - /** - * @param cctx Cache context. - * @param busyLock Shutdown lock. 
- */ - GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) { - assert cctx != null; - assert busyLock != null; - - this.cctx = cctx; - - log = cctx.logger(getClass()); - - top = cctx.dht().topology(); - - int cnt = 0; - - if (!cctx.kernalContext().clientNode()) { - while (cnt < cctx.config().getRebalanceThreadPoolSize()) { - final int idx = cnt; - - cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processMessage(m, id, idx); - } - }); - - cnt++; - } - } - - depEnabled = cctx.gridDeploy().enabled(); - } - - /** - * @param idx Index. - * @param id Node id. - * @return topic - */ - static Object topic(int idx, int id) { - return TOPIC_CACHE.topic("SupplyPool", idx, id); - } - - /** - * - */ - void start() { - } - - /** - * - */ - void stop() { - top = null; - } - - /** - * Sets preload predicate for supply pool. - * - * @param preloadPred Preload predicate. - */ - void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { - this.preloadPred = preloadPred; - } - - boolean logg = false; - - /** - * @param d Demand message. - * @param id Node uuid. - */ - private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) { - assert d != null; - assert id != null; - - if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) - return; - - if (logg && cctx.name().equals("cache")) - System.out.println("S " + idx + " process message " + cctx.localNode().id()); - - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId()); - - long preloadThrottle = cctx.config().getRebalanceThrottle(); - - ClusterNode node = cctx.discovery().node(id); - - T2<UUID, Object> scId = new T2<>(id, d.topic()); - - try { - SupplyContext sctx = scMap.remove(scId); - - if (!d.partitions().isEmpty()) {//Only first request contains partitions. - doneMap.remove(scId); - } - - if (doneMap.get(scId) != null) { - if (logg && cctx.name().equals("cache")) - System.out.println("S " + idx + " exit " + cctx.localNode().id()); - - return; - } - - long bCnt = 0; - - int phase = 0; - - boolean newReq = true; - - long maxBatchesCnt = 3;//Todo: param - - if (sctx != null) { - phase = sctx.phase; - - maxBatchesCnt = 1; - } - - Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator(); - - while ((sctx != null && newReq) || partIt.hasNext()) { - int part = sctx != null && newReq ? sctx.part : partIt.next(); - - newReq = false; - - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - - if (loc == null || loc.state() != OWNING || !loc.reserve()) { - // Reply with partition of "-1" to let sender know that - // this node is no longer an owner. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Requested partition is not owned by local node [part=" + part + - ", demander=" + id + ']'); - - continue; - } - - GridCacheEntryInfoCollectSwapListener swapLsnr = null; - - try { - if (phase == 0 && cctx.isSwapOrOffheapEnabled()) { - swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); - - cctx.swap().addOffHeapListener(part, swapLsnr); - cctx.swap().addSwapListener(part, swapLsnr); - } - - boolean partMissing = false; - - if (phase == 0) - phase = 1; - - if (phase == 1) { - Iterator<GridDhtCacheEntry> entIt = sctx != null ? 
- (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator(); - - while (entIt.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition [part=" + part + - ", nodeId=" + id + ']'); - - partMissing = true; - - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr); - - swapLsnr = null; - - return; - } - else { - if (logg && cctx.name().equals("cache")) - System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id()); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); - } - } - - GridCacheEntryEx e = entIt.next(); - - GridCacheEntryInfo info = e.info(); - - if (info != null && !info.isNew()) { - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - if (partMissing) - continue; - - } - - if (phase == 1) - phase = 2; - - if (phase == 2 && cctx.isSwapOrOffheapEnabled()) { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ? - (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt : - cctx.swap().iterator(part); - - // Iterator may be null if space does not exist. - if (iter != null) { - try { - boolean prepared = false; - - while (iter.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + id + ']'); - - partMissing = true; - - break; // For. - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr); - - swapLsnr = null; - - return; - } - else { - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); - } - } - - Map.Entry<byte[], GridCacheSwapEntry> e = iter.next(); - - GridCacheSwapEntry swapEntry = e.getValue(); - - GridCacheEntryInfo info = new GridCacheEntryInfo(); - - info.keyBytes(e.getKey()); - info.ttl(swapEntry.ttl()); - info.expireTime(swapEntry.expireTime()); - info.version(swapEntry.version()); - info.value(swapEntry.value()); - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); - else { - if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not send " + - "cache entry): " + info); - - continue; - } - - // Need to manually prepare cache message. - if (depEnabled && !prepared) { - ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : - swapEntry.valueClassLoaderId() != null ? 
- cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : - null; - - if (ldr == null) - continue; - - if (ldr instanceof GridDeploymentInfo) { - s.prepare((GridDeploymentInfo)ldr); - - prepared = true; - } - } - } - - if (partMissing) - continue; - } - finally { - iter.close(); - } - } - } - - if (swapLsnr == null && sctx != null) - swapLsnr = sctx.swapLsnr; - - // Stop receiving promote notifications. - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - - if (phase == 2) - phase = 3; - - if (phase == 3 && swapLsnr != null) { - Collection<GridCacheEntryInfo> entries = swapLsnr.entries(); - - swapLsnr = null; - - Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ? - (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator(); - - while (lsnrIt.hasNext()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + id + ']'); - - // No need to continue iteration over swap entries. - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - if (++bCnt >= maxBatchesCnt) { - saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr); - - return; - } - else { - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); - } - } - - GridCacheEntryInfo info = lsnrIt.next(); - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - // Mark as last supply message. - s.last(part); - -// if (logg && cctx.name().equals("cache")) -// System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id()); - - phase = 0; - - sctx = null; - } - finally { - loc.release(); - - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - } - } - - reply(node, d, s); - - doneMap.put(scId, true); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition supply message to node: " + id, e); - } - } - - /** - * @param n Node. - * @param s Supply message. - * @return {@code True} if message was sent, {@code false} if recipient left grid. - * @throws IgniteCheckedException If failed. - */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) - throws IgniteCheckedException { - if (logg && cctx.name().equals("cache")) - System.out.println("S sent "+ cctx.localNode().id()); - - try { - if (log.isDebugEnabled()) - log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); - - return true; - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition supply message because node left grid: " + n.id()); - - return false; - } - } - - - /** - * Demand message wrapper. 
- */ - private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param sndId Sender ID. - * @param msg Message. - */ - DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) { - super(sndId, msg); - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public DemandMessage() { - // No-op. - } - - /** - * @return Sender ID. - */ - UUID senderId() { - return get1(); - } - - /** - * @return Message. - */ - public GridDhtPartitionDemandMessage message() { - return get2(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']'; - } - } - - - /** - * @param t T. - * @param phase Phase. - * @param partIt Partition it. - * @param entryIt Entry it. - * @param swapLsnr Swap listener. - */ - private void saveSupplyContext( - T2 t, - int phase, - Iterator<Integer> partIt, - int part, - Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){ - scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part)); - } - - private static class SupplyContext{ - private int phase; - - private Iterator<Integer> partIt; - - private Iterator<?> entryIt; - - private GridCacheEntryInfoCollectSwapListener swapLsnr; - - int part; - - public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt, - GridCacheEntryInfoCollectSwapListener swapLsnr, int part) { - this.phase = phase; - this.partIt = partIt; - this.entryIt = entryIt; - this.swapLsnr = swapLsnr; - this.part = part; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index fbcbc37..a22f281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -61,10 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); /** Partition suppliers. */ - private GridDhtPartitionSupplyPool supplyPool; + private GridDhtPartitionSupplier supplier; /** Partition demanders. */ - private GridDhtPartitionDemandPool demandPool; + private GridDhtPartitionDemander demander; /** Start future. 
*/ private GridFutureAdapter<Object> startFut; @@ -159,8 +159,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } }); - supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock); - demandPool = new GridDhtPartitionDemandPool(cctx, busyLock); + supplier = new GridDhtPartitionSupplier(cctx, busyLock); + demander = new GridDhtPartitionDemander(cctx, busyLock); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -180,18 +180,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { topVer.setIfGreater(startTopVer); - supplyPool.start(); - demandPool.start(); + supplier.start(); + demander.start(); } /** {@inheritDoc} */ @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { super.preloadPredicate(preloadPred); - assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()"; + assert supplier != null && demander != null : "preloadPredicate may be called only after start()"; - supplyPool.preloadPredicate(preloadPred); - demandPool.preloadPredicate(preloadPred); + supplier.preloadPredicate(preloadPred); + demander.preloadPredicate(preloadPred); } /** {@inheritDoc} */ @@ -205,11 +205,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { // Acquire write busy lock. busyLock.writeLock().lock(); - if (supplyPool != null) - supplyPool.stop(); + if (supplier != null) + supplier.stop(); - if (demandPool != null) - demandPool.stop(); + if (demander != null) + demander.stop(); top = null; } @@ -226,7 +226,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); - demandPool.syncFuture().listen(new CI1<Object>() { + demander.syncFuture().listen(new CI1<Object>() { @Override public void apply(Object t) { U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); @@ -245,17 +245,17 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { - demandPool.updateLastExchangeFuture(lastFut); + demander.updateLastExchangeFuture(lastFut); } /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { - return demandPool.assign(exchFut); + return demander.assign(exchFut); } /** {@inheritDoc} */ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { - demandPool.addAssignments(assignments, forcePreload); + demander.addAssignments(assignments, forcePreload); } /** @@ -267,7 +267,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> syncFuture() { - return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? 
startFut : demander.syncFuture(); } /** @@ -526,12 +526,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public void forcePreload() { - demandPool.forcePreload(); + demander.forcePreload(); } /** {@inheritDoc} */ @Override public void unwindUndeploys() { - demandPool.unwindUndeploys(); + demander.unwindUndeploys(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java index 4992d19..5148753 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java @@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - private static int TEST_SIZE = 10_024_000; + private static int TEST_SIZE = 1_024_000; /** cache name. */ protected static String CACHE_NAME_DHT = "cache";
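Editorial note: one more detail worth calling out from the supplier's constructor is the per-thread message ordering. It registers one ordered handler per rebalance thread on topic("SupplyPool", idx, cacheId), so messages on a single (index, cache) topic are processed strictly in order while different indexes proceed in parallel. The sketch below models that striping with one single-threaded executor per index; OrderedTopicDispatcher and its names are hypothetical stand-ins for Ignite's addOrderedHandler() machinery, not its actual API.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the ordering model behind topic("SupplyPool", idx, cacheId):
 * one single-threaded executor per rebalance-thread index keeps messages on
 * a given topic strictly ordered, while distinct indexes run concurrently.
 */
public class OrderedTopicDispatcher {
    /** One stripe per rebalance thread, mirroring getRebalanceThreadPoolSize(). */
    private final ExecutorService[] stripes;

    public OrderedTopicDispatcher(int rebalanceThreadPoolSize) {
        stripes = new ExecutorService[rebalanceThreadPoolSize];

        for (int i = 0; i < stripes.length; i++)
            stripes[i] = Executors.newSingleThreadExecutor();
    }

    /** Handlers with the same index run sequentially; different indexes run in parallel. */
    public void dispatch(int idx, Runnable handler) {
        stripes[idx % stripes.length].execute(handler);
    }

    public void shutdown() throws InterruptedException {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
            stripe.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        OrderedTopicDispatcher dispatcher = new OrderedTopicDispatcher(2);

        for (int i = 0; i < 4; i++) {
            final int msg = i;

            // Messages 0 and 2 share stripe 0; messages 1 and 3 share stripe 1.
            dispatcher.dispatch(msg, new Runnable() {
                @Override public void run() {
                    System.out.println("stripe=" + (msg % 2) + " msg=" + msg);
                }
            });
        }

        dispatcher.shutdown();
    }
}

With two stripes, messages 0/2 and 1/3 form two independent, internally ordered streams; that is what lets supplier and demander scale across getRebalanceThreadPoolSize() threads without ever reordering a single rebalance conversation.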