IGNITE-45 - WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c083c91d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c083c91d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c083c91d Branch: refs/heads/ignite-45 Commit: c083c91deaa8b7061c1a0a43f700c349dce72023 Parents: 501bd5c Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Mar 3 15:59:26 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Mar 3 15:59:26 2015 -0800 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 6 ++ .../affinity/AffinityTopologyVersion.java | 53 ++++++++++-- .../GridCachePartitionExchangeManager.java | 86 ++++++++++++-------- .../processors/cache/GridCacheProcessor.java | 7 +- 4 files changed, 105 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6109d74..a984142 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.checkpoint.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -492,6 +493,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 89: + msg = new AffinityTopologyVersion(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java index be6fae5..12e3f8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java @@ -17,23 +17,28 @@ package org.apache.ignite.internal.processors.affinity; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; import java.io.*; +import java.nio.*; /** * */ -public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersion>, Externalizable { +public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersion>, Externalizable, Message { /** */ - public static final AffinityTopologyVersion NONE = new AffinityTopologyVersion(-1); + public static final AffinityTopologyVersion NONE = new AffinityTopologyVersion(-1, 0); /** */ - public static final AffinityTopologyVersion ZERO = new AffinityTopologyVersion(0); + public static final AffinityTopologyVersion ZERO = new AffinityTopologyVersion(0, 0); /** */ private long topVer; + /** */ + private int minorTopVer; + /** * Empty constructor required by {@link Externalizable}. */ @@ -42,10 +47,14 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi } /** - * @param ver Version. + * @param topVer Version. */ - public AffinityTopologyVersion(long ver) { - topVer = ver; + public AffinityTopologyVersion( + long topVer, + int minorTopVer + ) { + this.topVer = topVer; + this.minorTopVer = minorTopVer; } /** @@ -66,7 +75,8 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi * */ public AffinityTopologyVersion previous() { - return new AffinityTopologyVersion(topVer - 1); + // TODO IGNITE-45. + return new AffinityTopologyVersion(topVer - 1, 0); } /** {@inheritDoc} */ @@ -87,11 +97,36 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeLong(topVer); + out.writeInt(minorTopVer); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { topVer = in.readLong(); + minorTopVer = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + // TODO: implement. + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + // TODO: implement. + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 89; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + // TODO: implement. + return 0; } /** @@ -107,11 +142,11 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi public static AffinityTopologyVersion readFrom(MessageReader msgReader) { long topVer = msgReader.readLong("topVer.idx"); - return new AffinityTopologyVersion(topVer); + return new AffinityTopologyVersion(topVer, 0); } /** {@inheritDoc} */ @Override public String toString() { - return String.valueOf(topVer); + return S.toString(AffinityTopologyVersion.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 09edf52..76ecea4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -46,6 +46,7 @@ import java.util.concurrent.locks.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*; @@ -82,6 +83,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @GridToStringExclude private final ConcurrentMap<Integer, GridClientPartitionTopology<K, V>> clientTops = new ConcurrentHashMap8<>(); + /** Minor topology version incremented each time a new dynamic cache is started. */ + private volatile int minorTopVer; + /** */ private volatile GridDhtPartitionsExchangeFuture<K, V> lastInitializedFuture; @@ -105,52 +109,61 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana try { ClusterNode loc = cctx.localNode(); - assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED; + assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED || + e.type() == EVT_DISCOVERY_CUSTOM_EVT; final ClusterNode n = e.eventNode(); - assert !loc.id().equals(n.id()); + if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) { + assert !loc.id().equals(n.id()); - if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { - assert cctx.discovery().node(n.id()) == null; + if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { + assert cctx.discovery().node(n.id()) == null; - for (GridDhtPartitionsExchangeFuture<K, V> f : exchFuts.values()) - f.onNodeLeft(n.id()); - } + for (GridDhtPartitionsExchangeFuture<K, V> f : exchFuts.values()) + f.onNodeLeft(n.id()); + } - assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + - "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; + assert + e.type() != EVT_NODE_JOINED || n.order() > loc.order() : + "Node joined with smaller-than-local " + + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - GridDhtPartitionExchangeId exchId = exchangeId(n.id(), new AffinityTopologyVersion(e.topologyVersion()), - e.type()); + GridDhtPartitionExchangeId exchId = exchangeId(n.id(), + new AffinityTopologyVersion(e.topologyVersion(), minorTopVer = 0), + e.type()); - GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e); + GridDhtPartitionsExchangeFuture<K, V> exchFut = exchangeFuture(exchId, e); - // Start exchange process. - pendingExchangeFuts.add(exchFut); + // Start exchange process. + pendingExchangeFuts.add(exchFut); - // Event callback - without this callback future will never complete. - exchFut.onEvent(exchId, e); + // Event callback - without this callback future will never complete. + exchFut.onEvent(exchId, e); - if (log.isDebugEnabled()) - log.debug("Discovery event (will start exchange): " + exchId); - - locExchFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - if (!enterBusy()) - return; - - try { - // Unwind in the order of discovery events. - for (GridDhtPartitionsExchangeFuture<K, V> f = pendingExchangeFuts.poll(); f != null; - f = pendingExchangeFuts.poll()) - addFuture(f); - } - finally { - leaveBusy(); + if (log.isDebugEnabled()) + log.debug("Discovery event (will start exchange): " + exchId); + + locExchFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + if (!enterBusy()) + return; + + try { + // Unwind in the order of discovery events. + for (GridDhtPartitionsExchangeFuture<K, V> f = pendingExchangeFuts.poll(); f != null; + f = pendingExchangeFuts.poll()) + addFuture(f); + } + finally { + leaveBusy(); + } } - } - }); + }); + } + else { + // TODO. + } } finally { leaveBusy(); @@ -166,7 +179,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchWorker = new ExchangeWorker(); - cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, + EVT_DISCOVERY_CUSTOM_EVT); cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class, new MessageHandler<GridDhtPartitionsSingleMessage<K, V>>() { @@ -200,7 +214,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert startTime > 0; - final AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(loc.order()); + final AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(loc.order(), minorTopVer); GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, EVT_NODE_JOINED); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 260cab0..069930e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -25,7 +25,10 @@ import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.events.*; +import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.datastructures.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -565,7 +568,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() { @Override public void apply(Serializable evt) { if (evt instanceof DynamicCacheDescriptor) - onCacheDeploymentRequested((DynamicCacheDescriptor)evt); + onCacheStartRequested((DynamicCacheDescriptor)evt); } }); @@ -1290,7 +1293,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param startDesc Cache start descriptor. */ - private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) { + private void onCacheStartRequested(DynamicCacheDescriptor startDesc) { CacheConfiguration ccfg = startDesc.cacheConfiguration(); // Check if cache with the same name was concurrently started form different node.