ignite-1093
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64319443 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64319443 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64319443 Branch: refs/heads/ignite-1093 Commit: 64319443ab55aa4a0fc4c56182c774dec8446d48 Parents: 50d32b3 Author: Anton Vinogradov <vinogradov.an...@gmail.com> Authored: Fri Aug 14 16:29:31 2015 +0300 Committer: Anton Vinogradov <vinogradov.an...@gmail.com> Committed: Fri Aug 14 16:29:31 2015 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 27 ++ .../communication/GridIoMessageFactory.java | 7 +- .../processors/cache/GridCacheIoManager.java | 8 + .../dht/preloader/GridDhtPartitionDemander.java | 156 ++++--- .../dht/preloader/GridDhtPartitionSupplier.java | 25 +- .../GridDhtPartitionSupplyMessageV2.java | 423 +++++++++++++++++++ .../GridCacheMassiveRebalancingSelfTest.java | 210 --------- ...ridCacheMassiveRebalancingAsyncSelfTest.java | 37 ++ ...GridCacheMassiveRebalancingSyncSelfTest.java | 252 +++++++++++ 9 files changed, 864 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 3ad0f01..a19e136 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -57,6 +57,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default rebalance timeout (ms).*/ public static final long DFLT_REBALANCE_TIMEOUT = 10000; + /** Default rebalance batches count. */ + public static final long DFLT_REBALANCE_BATCHES_COUNT = 3; + /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU. */ public static final long DFLT_REBALANCE_THROTTLE = 0; @@ -240,6 +243,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Off-heap memory size. */ private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY; + /** Rebalance batches count. */ + private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT; + /** */ private boolean swapEnabled = DFLT_SWAP_ENABLED; @@ -1751,6 +1757,27 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * To gain better rebalancing performance supplier node can provide mode than one batch at start and provide + * one new to each next demand request. + * + * Gets number of batches generated by supply node at rebalancing start. + * + * @return + */ + public long getRebalanceBatchesCount() { + return rebalanceBatchesCount; + } + + /** + * Sets number of batches generated by supply node at rebalancing start. + * + * @param rebalanceBatchesCnt batches count. + */ + public void setRebalanceBatchesCount(long rebalanceBatchesCnt) { + this.rebalanceBatchesCount = rebalanceBatchesCnt; + } + + /** * Gets cache store session listener factories. * * @return Cache store session listener factories. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 7fe8da8..7ddbfb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -600,7 +600,12 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..112] - this + case 113: + msg = new GridDhtPartitionSupplyMessageV2(); + + break; + + // [-3..113] - this // [120..123] - DR // [-4..-22] - SQL default: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 84e4dc2..da55f7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -503,6 +503,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 113: { + GridDhtPartitionSupplyMessageV2 req = (GridDhtPartitionSupplyMessageV2)msg; + + U.error(log, "Supply message v2 cannot be unmarshalled.", req.classError()); + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 16f7a61..262ccb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -72,9 +72,6 @@ public class GridDhtPartitionDemander { /** Last exchange future. */ private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; - /** Assignments. */ - private volatile GridDhtPreloaderAssignments assigns; - /** * @param cctx Cache context. * @param busyLock Shutdown lock. @@ -95,8 +92,8 @@ public class GridDhtPartitionDemander { for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) { final int idx = cnt; - cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessage>() { - @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessage m) { + cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionSupplyMessageV2>() { + @Override public void apply(final UUID id, final GridDhtPartitionSupplyMessageV2 m) { enterBusy(); try { @@ -110,7 +107,7 @@ public class GridDhtPartitionDemander { } } - syncFut = new SyncFuture(); + syncFut = new SyncFuture(null); if (!enabled) // Calling onDone() immediately since preloading is disabled. @@ -282,13 +279,15 @@ public class GridDhtPartitionDemander { if (delay == 0 || force) { assert assigns != null; - AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = assigns.topologyVersion(); - if (this.assigns != null) { + if (syncFut.isInited()) { syncFut.get(); - syncFut = new SyncFuture(); + syncFut = new SyncFuture(assigns); } + else + syncFut.init(assigns); if (assigns.isEmpty() || topologyChanged(topVer)) { syncFut.onDone(); @@ -296,28 +295,30 @@ public class GridDhtPartitionDemander { return; } - this.assigns = assigns; - for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) { GridDhtPartitionDemandMessage d = e.getValue(); d.timeout(cctx.config().getRebalanceTimeout()); d.workerId(0);//old api support. - ClusterNode node = e.getKey(); + final ClusterNode node = e.getKey(); final long start = U.currentTimeMillis(); final CacheConfiguration cfg = cctx.config(); + final AffinityTopologyVersion top = d.topologyVersion(); + if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + - ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + "]"); - - syncFut.listen(new CI1<Object>() { - @Override public void apply(Object t) { - U.log(log, "Completed rebalancing [cache=" + cctx.name() + ", mode=" - + cfg.getRebalanceMode() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); + ", from node=" + node.id() + ", partitions count=" + d.partitions().size() + ", topology=" + d.topologyVersion() + "]"); + + syncFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> t) { + Boolean cancelled = ((SyncFuture)t).cancelled(); + U.log(log, (cancelled ? "Cancelled" : "Completed") + " rebalancing [cache=" + cctx.name() + ", mode=" + + cfg.getRebalanceMode() + ", from node=" + node.id() + ", topology=" + top + + ", time=" + (U.currentTimeMillis() - start) + " ms]"); } }); } @@ -394,7 +395,7 @@ public class GridDhtPartitionDemander { * @param c Partitions. * @return String representation of partitions list. */ - private String partitionsList(Collection<Integer> c){ + private String partitionsList(Collection<Integer> c) { LinkedList<Integer> s = new LinkedList<>(c); Collections.sort(s); @@ -446,21 +447,19 @@ public class GridDhtPartitionDemander { private void handleSupplyMessage( int idx, final UUID id, - final GridDhtPartitionSupplyMessage supply) { - ClusterNode node = cctx.node(id); - - assert node != null; - - GridDhtPartitionDemandMessage d = assigns.get(node); - - AffinityTopologyVersion topVer = d.topologyVersion(); + final GridDhtPartitionSupplyMessageV2 supply) { + AffinityTopologyVersion topVer = supply.topologyVersion(); if (topologyChanged(topVer)) { - syncFut.cancel(id); + syncFut.onCancel(id, topVer); return; } + ClusterNode node = cctx.node(id); + + assert node != null; + if (log.isDebugEnabled()) log.debug("Received supply message: " + supply); @@ -469,15 +468,13 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Class got undeployed during preloading: " + supply.classError()); - syncFut.cancel(id); + syncFut.onCancel(id, topVer); return; } final GridDhtPartitionTopology top = cctx.dht().topology(); - GridDhtPartitionsExchangeFuture exchFut = assigns.exchangeFuture(); - try { // Preload. @@ -524,14 +521,10 @@ public class GridDhtPartitionDemander { if (last) { top.own(part); - syncFut.onPartitionDone(id, p); + syncFut.onPartitionDone(id, p, topVer); if (log.isDebugEnabled()) log.debug("Finished rebalancing partition: " + part); - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - exchFut.discoveryEvent()); } } finally { @@ -540,14 +533,14 @@ public class GridDhtPartitionDemander { } } else { - syncFut.onPartitionDone(id, p); + syncFut.onPartitionDone(id, p, topVer); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + part); } } else { - syncFut.onPartitionDone(id, p); + syncFut.onPartitionDone(id, p, topVer); if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); @@ -557,35 +550,40 @@ public class GridDhtPartitionDemander { // Only request partitions based on latest topology version. for (Integer miss : supply.missed()) if (cctx.affinity().localNode(miss, topVer)) - syncFut.onMissedPartition(id, miss); + syncFut.onMissedPartition(id, miss, topVer); for (Integer miss : supply.missed()) - syncFut.onPartitionDone(id, miss); + syncFut.onPartitionDone(id, miss, topVer); if (!syncFut.isDone()) { - // Create copy. - GridDhtPartitionDemandMessage nextD = - new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet()); + GridDhtPartitionDemandMessage d = syncFut.getDemandMessage(topVer, node); + + if (d != null) { + + // Create copy. + GridDhtPartitionDemandMessage nextD = + new GridDhtPartitionDemandMessage(d, Collections.<Integer>emptySet()); - nextD.topic(topic(idx, cctx.cacheId())); + nextD.topic(topic(idx, cctx.cacheId())); - // Send demand message. - cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()), - nextD, cctx.ioPolicy(), d.timeout()); + // Send demand message. + cctx.io().sendOrderedMessage(node, GridDhtPartitionSupplier.topic(idx, cctx.cacheId()), + nextD, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + } } } catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) log.debug("Node left during rebalancing (will retry) [node=" + node.id() + ", msg=" + e.getMessage() + ']'); - syncFut.cancel(id); + syncFut.onCancel(id, topVer); } catch (IgniteCheckedException ex) { U.error(log, "Failed to receive partitions from node (rebalancing will not " + - "fully finish) [node=" + node.id() + ", msg=" + d + ']', ex); + "fully finish) [node=" + node.id() + ", msg=" + supply + ']', ex); - syncFut.cancel(id); + syncFut.onCancel(id, topVer); } } @@ -687,7 +685,7 @@ public class GridDhtPartitionDemander { /** * */ - private class SyncFuture extends GridFutureAdapter<Object> { + public class SyncFuture extends GridFutureAdapter<Boolean> { /** */ private static final long serialVersionUID = 1L; @@ -695,32 +693,74 @@ public class GridDhtPartitionDemander { private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>(); - public void append(UUID nodeId, Collection<Integer> parts) { + /** Assignments. */ + private volatile GridDhtPreloaderAssignments assigns; + + private volatile boolean cancelled = false; + + SyncFuture(GridDhtPreloaderAssignments assigns) { + this.assigns = assigns; + } + + public AffinityTopologyVersion topologyVersion() { + return assigns != null ? assigns.topologyVersion() : null; + } + + void init( + GridDhtPreloaderAssignments assigns) { + this.assigns = assigns; + } + + boolean isInited() { + return assigns != null; + } + + void append(UUID nodeId, Collection<Integer> parts) { remaining.put(nodeId, parts); missed.put(nodeId, new GridConcurrentHashSet<Integer>()); } - void cancel(UUID nodeId) { - if (isDone()) + GridDhtPartitionDemandMessage getDemandMessage(AffinityTopologyVersion topVer, ClusterNode node) { + if (!topVer.equals(assigns.topologyVersion())) + return null; + + return assigns.get(node); + } + + void onCancel(UUID nodeId, AffinityTopologyVersion topVer) { + if (isDone() || !topVer.equals(assigns.topologyVersion())) return; remaining.remove(nodeId); + cancelled = true; + checkIsDone(); } - void onMissedPartition(UUID nodeId, int p) { + boolean cancelled() { + return cancelled; + } + + void onMissedPartition(UUID nodeId, int p, AffinityTopologyVersion topVer) { + if (isDone() || !topVer.equals(assigns.topologyVersion())) + return; + if (missed.get(nodeId) == null) missed.put(nodeId, new GridConcurrentHashSet<Integer>()); missed.get(nodeId).add(p); } - void onPartitionDone(UUID nodeId, int p) { - if (isDone()) + void onPartitionDone(UUID nodeId, int p, AffinityTopologyVersion topVer) { + if (isDone() || !topVer.equals(assigns.topologyVersion())) return; + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, + assigns.exchangeFuture().discoveryEvent()); + Collection<Integer> parts = remaining.get(nodeId); parts.remove(p); @@ -758,7 +798,7 @@ public class GridDhtPartitionDemander { cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary? - onDone(); + onDone(cancelled); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index b948fbd..c496f8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -170,8 +170,8 @@ class GridDhtPartitionSupplier { if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion())) return; - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId()); + GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(), + d.updateSequence(), cctx.cacheId(), d.topologyVersion()); long preloadThrottle = cctx.config().getRebalanceThrottle(); @@ -180,12 +180,13 @@ class GridDhtPartitionSupplier { T2<UUID, Object> scId = new T2<>(id, d.topic()); try { - SupplyContext sctx = scMap.remove(scId); - if (!d.partitions().isEmpty()) {//Only initial request contains partitions. doneMap.remove(scId); + scMap.remove(scId); } + SupplyContext sctx = scMap.remove(scId); + if (doneMap.get(scId) != null) return; @@ -195,7 +196,7 @@ class GridDhtPartitionSupplier { boolean newReq = true; - long maxBatchesCnt = 3;//Todo: param + long maxBatchesCnt = cctx.config().getRebalanceBatchesCount(); if (sctx != null) { phase = sctx.phase; @@ -273,8 +274,8 @@ class GridDhtPartitionSupplier { return; } else { - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); + s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(), + cctx.cacheId(), d.topologyVersion()); } } @@ -340,8 +341,8 @@ class GridDhtPartitionSupplier { return; } else { - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); + s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(), + cctx.cacheId(), d.topologyVersion()); } } @@ -443,8 +444,8 @@ class GridDhtPartitionSupplier { return; } else { - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); + s = new GridDhtPartitionSupplyMessageV2(d.workerId(), d.updateSequence(), + cctx.cacheId(), d.topologyVersion()); } } @@ -491,7 +492,7 @@ class GridDhtPartitionSupplier { * @return {@code True} if message was sent, {@code false} if recipient left grid. * @throws IgniteCheckedException If failed. */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) + private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessageV2 s) throws IgniteCheckedException { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java new file mode 100644 index 0000000..93d0db6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Partition supply message. + */ +public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable { + /** */ + private static final long serialVersionUID = 0L; + + /** Worker ID. */ + private int workerId = -1; + + /** Update sequence. */ + private long updateSeq; + + /** Acknowledgement flag. */ + private boolean ack; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Partitions that have been fully sent. */ + @GridDirectCollection(int.class) + private Collection<Integer> last; + + /** Partitions which were not found. */ + @GridToStringInclude + @GridDirectCollection(int.class) + private Collection<Integer> missed; + + /** Entries. */ + @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class) + private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>(); + + /** Message size. */ + @GridDirectTransient + private int msgSize; + + /** + * @param workerId Worker ID. + * @param updateSeq Update sequence for this node. + * @param cacheId Cache ID. + */ + GridDhtPartitionSupplyMessageV2(int workerId, long updateSeq, int cacheId, AffinityTopologyVersion topVer) { + assert workerId >= 0; + assert updateSeq > 0; + + this.cacheId = cacheId; + this.updateSeq = updateSeq; + this.workerId = workerId; + this.topVer = topVer; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionSupplyMessageV2() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean ignoreClassErrors() { + return true; + } + + /** + * @return Worker ID. + */ + int workerId() { + return workerId; + } + + /** + * @return Update sequence. + */ + long updateSequence() { + return updateSeq; + } + + /** + * Marks this message for acknowledgment. + */ + void markAck() { + ack = true; + } + + /** + * @return Acknowledgement flag. + */ + boolean ack() { + return ack; + } + + /** + * @return Topology version for which demand message is sent. + */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Flag to indicate last message for partition. + */ + Collection<Integer> last() { + return last == null ? Collections.<Integer>emptySet() : last; + } + + /** + * @param p Partition which was fully sent. + */ + void last(int p) { + if (last == null) + last = new HashSet<>(); + + if (last.add(p)) { + msgSize += 4; + + // If partition is empty, we need to add it. + if (!infos.containsKey(p)) { + CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection(); + + infoCol.init(); + + infos.put(p, infoCol); + } + } + } + + /** + * @param p Missed partition. + */ + void missed(int p) { + if (missed == null) + missed = new HashSet<>(); + + if (missed.add(p)) + msgSize += 4; + } + + /** + * @return Missed partitions. + */ + Collection<Integer> missed() { + return missed == null ? Collections.<Integer>emptySet() : missed; + } + + /** + * @return Entries. + */ + Map<Integer, CacheEntryInfoCollection> infos() { + return infos; + } + + /** + * @return Message size. + */ + int messageSize() { + return msgSize; + } + + /** + * @param p Partition. + * @param info Entry to add. + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + assert info != null; + + marshalInfo(info, ctx); + + msgSize += info.marshalledSize(ctx); + + CacheEntryInfoCollection infoCol = infos.get(p); + + if (infoCol == null) { + msgSize += 4; + + infos.put(p, infoCol = new CacheEntryInfoCollection()); + + infoCol.init(); + } + + infoCol.add(info); + } + + /** + * @param p Partition. + * @param info Entry to add. + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + assert info != null; + assert (info.key() != null || info.keyBytes() != null); + assert info.value() != null; + + // Need to call this method to initialize info properly. + marshalInfo(info, ctx); + + msgSize += info.marshalledSize(ctx); + + CacheEntryInfoCollection infoCol = infos.get(p); + + if (infoCol == null) { + msgSize += 4; + + infos.put(p, infoCol = new CacheEntryInfoCollection()); + + infoCol.init(); + } + + infoCol.add(info); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cacheCtx = ctx.cacheContext(cacheId); + + for (CacheEntryInfoCollection col : infos().values()) { + List<GridCacheEntryInfo> entries = col.infos(); + + for (int i = 0; i < entries.size(); i++) + entries.get(i).unmarshal(cacheCtx, ldr); + } + } + + /** + * @return Number of entries in message. + */ + public int size() { + return infos.size(); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeBoolean("ack", ack)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeLong("updateSeq", updateSeq)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeInt("workerId", workerId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + ack = reader.readBoolean("ack"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + last = reader.readCollection("last", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + missed = reader.readCollection("missed", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + updateSeq = reader.readLong("updateSeq"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + workerId = reader.readInt("workerId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 113; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 10; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionSupplyMessageV2.class, this, + "size", size(), + "parts", infos.keySet(), + "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java deleted file mode 100644 index 0771509..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - private static int TEST_SIZE = 1_024_000; - - /** cache name. */ - protected static String CACHE_NAME_DHT = "cache"; - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return Long.MAX_VALUE; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration iCfg = super.getConfiguration(gridName); - - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); - - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); - - if (getTestGridName(3).equals(gridName)) - iCfg.setClientMode(true); - - cacheCfg.setName(CACHE_NAME_DHT); - cacheCfg.setCacheMode(CacheMode.PARTITIONED); - //cacheCfg.setRebalanceBatchSize(1024); - cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); - cacheCfg.setRebalanceThreadPoolSize(4); - //cacheCfg.setRebalanceTimeout(1000000); - cacheCfg.setBackups(1); - - iCfg.setCacheConfiguration(cacheCfg); - return iCfg; - } - - /** - * @param ignite Ignite. - */ - private void generateData(Ignite ignite) { - try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Prepared " + i / 1_000_000 + "m entries."); - - stmr.addData(i, i); - } - } - } - - /** - * @param ignite Ignite. - * @throws IgniteCheckedException - */ - private void checkData(Ignite ignite) throws IgniteCheckedException { - for (int i = 0; i < TEST_SIZE; i++) { - if (i % 1_000_000 == 0) - log.info("Checked " + i / 1_000_000 + "m entries."); - - assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match"; - } - } - - /** - * @throws Exception - */ - public void testMassiveRebalancing() throws Exception { - Ignite ignite = startGrid(0); - - generateData(ignite); - - log.info("Preloading started."); - - long start = System.currentTimeMillis(); - - startGrid(1); - - startGrid(2); - - long spend = (System.currentTimeMillis() - start) / 1000; - - IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - - stopGrid(0); - - //TODO: refactor to get futures by topology - while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() || - f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) - U.sleep(100); - - ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); - ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); - - f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - - stopGrid(1); - - while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) - U.sleep(100); - - ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); - - checkData(grid(2)); - - log.info("Spend " + spend + " seconds to preload entries."); - - stopAllGrids(); - } - - /** - * @throws Exception - */ - public void testOpPerSecRebalancingTest() throws Exception { - startGrid(0); - - final AtomicBoolean cancelled = new AtomicBoolean(false); - - generateData(grid(0)); - - startGrid(1); - startGrid(2); - startGrid(3); - - Thread t = new Thread(new Runnable() { - @Override public void run() { - - long spend = 0; - - long ops = 0; - - while (!cancelled.get()) { - try { - long start = System.currentTimeMillis(); - - int size = 1000; - - for (int i = 0; i < size; i++) - grid(3).cachex(CACHE_NAME_DHT).remove(i); - - for (int i = 0; i < size; i++) - grid(3).cachex(CACHE_NAME_DHT).put(i, i); - - spend += System.currentTimeMillis() - start; - - ops += size * 2; - } - catch (IgniteCheckedException e) { - e.printStackTrace(); - } - - log.info("Ops. per ms: " + ops / spend); - } - } - }); - t.start(); - - stopGrid(0); - startGrid(0); - - stopGrid(0); - startGrid(0); - - stopGrid(0); - startGrid(0); - - cancelled.set(true); - t.join(); - - checkData(grid(3)); - - //stopAllGrids(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java new file mode 100644 index 0000000..8bcd6d1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +/** + * + */ +public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRebalancingSyncSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = iCfg.getCacheConfiguration()[0]; + + cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + + return iCfg; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64319443/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java new file mode 100644 index 0000000..cd12954 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.rebalancing; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.atomic.*; + +/** + * + */ +public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + private static int TEST_SIZE = 1_024_000; + + /** cache name. */ + protected static String CACHE_NAME_DHT = "cache"; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration iCfg = super.getConfiguration(gridName); + + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); + + if (getTestGridName(3).equals(gridName)) + iCfg.setClientMode(true); + + cacheCfg.setName(CACHE_NAME_DHT); + cacheCfg.setCacheMode(CacheMode.PARTITIONED); + //cacheCfg.setRebalanceBatchSize(1024); + //cacheCfg.setRebalanceBatchesCount(1); + cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg.setRebalanceThreadPoolSize(4); + //cacheCfg.setRebalanceTimeout(1000000); + cacheCfg.setBackups(1); + + iCfg.setCacheConfiguration(cacheCfg); + return iCfg; + } + + /** + * @param ignite Ignite. + */ + protected void generateData(Ignite ignite) { + try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_NAME_DHT)) { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Prepared " + i / 1_000_000 + "m entries."); + + stmr.addData(i, i); + } + } + } + + /** + * @param ignite Ignite. + * @throws IgniteCheckedException + */ + protected void checkData(Ignite ignite) throws IgniteCheckedException { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Checked " + i / 1_000_000 + "m entries."); + + assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match"; + } + } + + /** + * @throws Exception + */ + public void testSimpleRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + startGrid(1); + + IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + + f1.get(); + + long spend = (System.currentTimeMillis() - start) / 1000; + + stopGrid(0); + + checkData(grid(1)); + + log.info("Spend " + spend + " seconds to preload entries."); + + stopAllGrids(); + } + + /** + * @throws Exception + */ + public void testComplexRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + long start = System.currentTimeMillis(); + + startGrid(1); + startGrid(2); + + IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + f2.get(); + + IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion())) { + U.sleep(100); + + f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + } + f1.get(); + + long spend = (System.currentTimeMillis() - start) / 1000; + + f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + + stopGrid(0); + + //TODO: refactor to get futures by topology + while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() || + f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) + U.sleep(100); + + ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + + f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + + stopGrid(1); + + while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) + U.sleep(100); + + ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + + checkData(grid(2)); + + log.info("Spend " + spend + " seconds to preload entries."); + + stopAllGrids(); + } + + /** + * @throws Exception + */ + public void _testOpPerSecRebalancingTest() throws Exception { + startGrid(0); + + final AtomicBoolean cancelled = new AtomicBoolean(false); + + generateData(grid(0)); + + startGrid(1); + startGrid(2); + startGrid(3); + + Thread t = new Thread(new Runnable() { + @Override public void run() { + + long spend = 0; + + long ops = 0; + + while (!cancelled.get()) { + try { + long start = System.currentTimeMillis(); + + int size = 1000; + + for (int i = 0; i < size; i++) + grid(3).cachex(CACHE_NAME_DHT).remove(i); + + for (int i = 0; i < size; i++) + grid(3).cachex(CACHE_NAME_DHT).put(i, i); + + spend += System.currentTimeMillis() - start; + + ops += size * 2; + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + + log.info("Ops. per ms: " + ops / spend); + } + } + }); + t.start(); + + stopGrid(0); + startGrid(0); + + stopGrid(0); + startGrid(0); + + stopGrid(0); + startGrid(0); + + cancelled.set(true); + t.join(); + + checkData(grid(3)); + + //stopAllGrids(); + } +} \ No newline at end of file