IGNITE-45 - WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c083c91d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c083c91d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c083c91d

Branch: refs/heads/ignite-45
Commit: c083c91deaa8b7061c1a0a43f700c349dce72023
Parents: 501bd5c
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Mar 3 15:59:26 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Mar 3 15:59:26 2015 -0800

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  6 ++
 .../affinity/AffinityTopologyVersion.java       | 53 ++++++++++--
 .../GridCachePartitionExchangeManager.java      | 86 ++++++++++++--------
 .../processors/cache/GridCacheProcessor.java    |  7 +-
 4 files changed, 105 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6109d74..a984142 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.checkpoint.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -492,6 +493,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 89:
+                msg = new AffinityTopologyVersion();
+
+                break;
+
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index be6fae5..12e3f8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -17,23 +17,28 @@
 
 package org.apache.ignite.internal.processors.affinity;
 
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 
 import java.io.*;
+import java.nio.*;
 
 /**
  *
  */
-public class AffinityTopologyVersion implements 
Comparable<AffinityTopologyVersion>, Externalizable {
+public class AffinityTopologyVersion implements 
Comparable<AffinityTopologyVersion>, Externalizable, Message {
     /** */
-    public static final AffinityTopologyVersion NONE = new 
AffinityTopologyVersion(-1);
+    public static final AffinityTopologyVersion NONE = new 
AffinityTopologyVersion(-1, 0);
 
     /** */
-    public static final AffinityTopologyVersion ZERO = new 
AffinityTopologyVersion(0);
+    public static final AffinityTopologyVersion ZERO = new 
AffinityTopologyVersion(0, 0);
 
     /** */
     private long topVer;
 
+    /** */
+    private int minorTopVer;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -42,10 +47,14 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     }
 
     /**
-     * @param ver Version.
+     * @param topVer Version.
      */
-    public AffinityTopologyVersion(long ver) {
-        topVer = ver;
+    public AffinityTopologyVersion(
+        long topVer,
+        int minorTopVer
+    ) {
+        this.topVer = topVer;
+        this.minorTopVer = minorTopVer;
     }
 
     /**
@@ -66,7 +75,8 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
      *
      */
     public AffinityTopologyVersion previous() {
-        return new AffinityTopologyVersion(topVer - 1);
+        // TODO IGNITE-45.
+        return new AffinityTopologyVersion(topVer - 1, 0);
     }
 
     /** {@inheritDoc} */
@@ -87,11 +97,36 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeLong(topVer);
+        out.writeInt(minorTopVer);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
         topVer = in.readLong();
+        minorTopVer = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        // TODO: implement.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        // TODO: implement.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 89;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        // TODO: implement.
+        return 0;
     }
 
     /**
@@ -107,11 +142,11 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi
     public static AffinityTopologyVersion readFrom(MessageReader msgReader) {
         long topVer = msgReader.readLong("topVer.idx");
 
-        return new AffinityTopologyVersion(topVer);
+        return new AffinityTopologyVersion(topVer, 0);
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return String.valueOf(topVer);
+        return S.toString(AffinityTopologyVersion.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 09edf52..76ecea4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -46,6 +46,7 @@ import java.util.concurrent.locks.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.IgniteSystemProperties.*;
 import static org.apache.ignite.events.EventType.*;
+import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
 
@@ -82,6 +83,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @GridToStringExclude
     private final ConcurrentMap<Integer, GridClientPartitionTopology<K, V>> 
clientTops = new ConcurrentHashMap8<>();
 
+    /** Minor topology version incremented each time a new dynamic cache is 
started. */
+    private volatile int minorTopVer;
+
     /** */
     private volatile GridDhtPartitionsExchangeFuture<K, V> 
lastInitializedFuture;
 
@@ -105,52 +109,61 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             try {
                 ClusterNode loc = cctx.localNode();
 
-                assert e.type() == EVT_NODE_JOINED || e.type() == 
EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
+                assert e.type() == EVT_NODE_JOINED || e.type() == 
EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED ||
+                    e.type() == EVT_DISCOVERY_CUSTOM_EVT;
 
                 final ClusterNode n = e.eventNode();
 
-                assert !loc.id().equals(n.id());
+                if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
+                    assert !loc.id().equals(n.id());
 
-                if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
-                    assert cctx.discovery().node(n.id()) == null;
+                    if (e.type() == EVT_NODE_LEFT || e.type() == 
EVT_NODE_FAILED) {
+                        assert cctx.discovery().node(n.id()) == null;
 
-                    for (GridDhtPartitionsExchangeFuture<K, V> f : 
exchFuts.values())
-                        f.onNodeLeft(n.id());
-                }
+                        for (GridDhtPartitionsExchangeFuture<K, V> f : 
exchFuts.values())
+                            f.onNodeLeft(n.id());
+                    }
 
-                assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() 
: "Node joined with smaller-than-local " +
-                    "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ']';
+                    assert
+                        e.type() != EVT_NODE_JOINED || n.order() > loc.order() 
:
+                        "Node joined with smaller-than-local " +
+                            "order [newOrder=" + n.order() + ", locOrder=" + 
loc.order() + ']';
 
-                GridDhtPartitionExchangeId exchId = exchangeId(n.id(), new 
AffinityTopologyVersion(e.topologyVersion()),
-                    e.type());
+                    GridDhtPartitionExchangeId exchId = exchangeId(n.id(),
+                        new AffinityTopologyVersion(e.topologyVersion(), 
minorTopVer = 0),
+                        e.type());
 
-                GridDhtPartitionsExchangeFuture<K, V> exchFut = 
exchangeFuture(exchId, e);
+                    GridDhtPartitionsExchangeFuture<K, V> exchFut = 
exchangeFuture(exchId, e);
 
-                // Start exchange process.
-                pendingExchangeFuts.add(exchFut);
+                    // Start exchange process.
+                    pendingExchangeFuts.add(exchFut);
 
-                // Event callback - without this callback future will never 
complete.
-                exchFut.onEvent(exchId, e);
+                    // Event callback - without this callback future will 
never complete.
+                    exchFut.onEvent(exchId, e);
 
-                if (log.isDebugEnabled())
-                    log.debug("Discovery event (will start exchange): " + 
exchId);
-
-                locExchFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> t) {
-                        if (!enterBusy())
-                            return;
-
-                        try {
-                            // Unwind in the order of discovery events.
-                            for (GridDhtPartitionsExchangeFuture<K, V> f = 
pendingExchangeFuts.poll(); f != null;
-                                f = pendingExchangeFuts.poll())
-                                addFuture(f);
-                        }
-                        finally {
-                            leaveBusy();
+                    if (log.isDebugEnabled())
+                        log.debug("Discovery event (will start exchange): " + 
exchId);
+
+                    locExchFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) 
{
+                            if (!enterBusy())
+                                return;
+
+                            try {
+                                // Unwind in the order of discovery events.
+                                for (GridDhtPartitionsExchangeFuture<K, V> f = 
pendingExchangeFuts.poll(); f != null;
+                                    f = pendingExchangeFuts.poll())
+                                    addFuture(f);
+                            }
+                            finally {
+                                leaveBusy();
+                            }
                         }
-                    }
-                });
+                    });
+                }
+                else {
+                    // TODO.
+                }
             }
             finally {
                 leaveBusy();
@@ -166,7 +179,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         exchWorker = new ExchangeWorker();
 
-        cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, 
EVT_NODE_LEFT, EVT_NODE_FAILED);
+        cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, 
EVT_NODE_LEFT, EVT_NODE_FAILED,
+            EVT_DISCOVERY_CUSTOM_EVT);
 
         cctx.io().addHandler(0, GridDhtPartitionsSingleMessage.class,
             new MessageHandler<GridDhtPartitionsSingleMessage<K, V>>() {
@@ -200,7 +214,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert startTime > 0;
 
-        final AffinityTopologyVersion startTopVer = new 
AffinityTopologyVersion(loc.order());
+        final AffinityTopologyVersion startTopVer = new 
AffinityTopologyVersion(loc.order(), minorTopVer);
 
         GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, 
EVT_NODE_JOINED);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c083c91d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 260cab0..069930e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -25,7 +25,10 @@ import org.apache.ignite.cache.affinity.rendezvous.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.events.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -565,7 +568,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         ctx.discovery().setCustomEventListener(new 
GridPlainInClosure<Serializable>() {
             @Override public void apply(Serializable evt) {
                 if (evt instanceof DynamicCacheDescriptor)
-                    onCacheDeploymentRequested((DynamicCacheDescriptor)evt);
+                    onCacheStartRequested((DynamicCacheDescriptor)evt);
             }
         });
 
@@ -1290,7 +1293,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param startDesc Cache start descriptor.
      */
-    private void onCacheDeploymentRequested(DynamicCacheDescriptor startDesc) {
+    private void onCacheStartRequested(DynamicCacheDescriptor startDesc) {
         CacheConfiguration ccfg = startDesc.cacheConfiguration();
 
         // Check if cache with the same name was concurrently started form 
different node.

Reply via email to