#ignite-710: DataStreamer message is processed before cache started

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f13b63d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f13b63d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f13b63d7

Branch: refs/heads/ignite-794
Commit: f13b63d73606e73f2f5d45c0b9291252444b7437
Parents: 0a173f4
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Fri Apr 24 10:29:58 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Fri Apr 24 10:29:58 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../datastreamer/DataStreamProcessor.java       |  28 ++++-
 .../datastreamer/DataStreamerImpl.java          |   3 +-
 .../datastreamer/DataStreamerRequest.java       |  38 ++++++-
 .../DataStreamerMultiThreadedSelfTest.java      | 101 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   1 +
 6 files changed, 166 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 851fc44..5f82ae2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -409,7 +409,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
-    @Nullable IgniteInternalFuture<?> 
affinityReadyFuture(AffinityTopologyVersion ver) {
+    public @Nullable IgniteInternalFuture<?> 
affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = 
lastInitializedFut;
 
         if (lastInitializedFut0 != null && 
lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 3a2936f..9e53bb5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -173,7 +174,7 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
      * @param nodeId Sender ID.
      * @param req Request.
      */
-    private void processRequest(UUID nodeId, DataStreamerRequest req) {
+    private void processRequest(final UUID nodeId, final DataStreamerRequest 
req) {
         if (!busyLock.enterBusy()) {
             if (log.isDebugEnabled())
                 log.debug("Ignoring data load request (node is stopping): " + 
req);
@@ -185,6 +186,31 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
             if (log.isDebugEnabled())
                 log.debug("Processing data load request: " + req);
 
+            AffinityTopologyVersion locAffVer = 
ctx.cache().context().exchange().readyAffinityVersion();
+            AffinityTopologyVersion rmtAffVer = req.topologyVersion();
+
+            if (locAffVer.compareTo(rmtAffVer) < 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Received request has higher affinity topology 
version [request=" + req +
+                        ", locTopVer=" + locAffVer + ", rmtTopVer=" + 
rmtAffVer + ']');
+
+                IgniteInternalFuture<?> fut = 
ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
+
+                if (fut != null && !fut.isDone()) {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) 
{
+                            ctx.closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    processRequest(nodeId, req);
+                                }
+                            }, false);
+                        }
+                    });
+
+                    return;
+                }
+            }
+
             Object topic;
 
             try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 002831c..a69e033 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1173,7 +1173,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.userVersion() : null,
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
-                    dep == null);
+                    dep == null,
+                    ctx.cache().context().exchange().readyAffinityVersion());
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index a216ffe..0d24ee0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.datastreamer;
 
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -77,6 +78,9 @@ public class DataStreamerRequest implements Message {
     /** */
     private boolean forceLocDep;
 
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
     /**
      * {@code Externalizable} support.
      */
@@ -98,6 +102,7 @@ public class DataStreamerRequest implements Message {
      * @param ldrParticipants Loader participants.
      * @param clsLdrId Class loader ID.
      * @param forceLocDep Force local deployment.
+     * @param topVer Topology version.
      */
     public DataStreamerRequest(long reqId,
         byte[] resTopicBytes,
@@ -111,7 +116,10 @@ public class DataStreamerRequest implements Message {
         String userVer,
         Map<UUID, IgniteUuid> ldrParticipants,
         IgniteUuid clsLdrId,
-        boolean forceLocDep) {
+        boolean forceLocDep,
+        @NotNull AffinityTopologyVersion topVer) {
+        assert topVer != null;
+
         this.reqId = reqId;
         this.resTopicBytes = resTopicBytes;
         this.cacheName = cacheName;
@@ -125,6 +133,7 @@ public class DataStreamerRequest implements Message {
         this.ldrParticipants = ldrParticipants;
         this.clsLdrId = clsLdrId;
         this.forceLocDep = forceLocDep;
+        this.topVer = topVer;
     }
 
     /**
@@ -218,6 +227,13 @@ public class DataStreamerRequest implements Message {
         return forceLocDep;
     }
 
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataStreamerRequest.class, this);
@@ -302,12 +318,18 @@ public class DataStreamerRequest implements Message {
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeString("userVer", userVer))
                     return false;
 
@@ -419,7 +441,7 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 11:
-                updaterBytes = reader.readByteArray("updaterBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -427,6 +449,14 @@ public class DataStreamerRequest implements Message {
                 reader.incrementState();
 
             case 12:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 userVer = reader.readString("userVer");
 
                 if (!reader.isLastRead())
@@ -446,6 +476,6 @@ public class DataStreamerRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
new file mode 100644
index 0000000..5eedd8d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests for DataStreamer.
+ */
+public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopIgnites() throws Exception {
+        for (int attempt = 0; attempt < 3; ++attempt) {
+            final Ignite ignite = startGrid(0);
+
+            Set<IgniteFuture> futs = new HashSet<>();
+
+            try (final DataStreamerImpl dataLdr = 
(DataStreamerImpl)ignite.dataStreamer(null)) {
+                dataLdr.maxRemapCount(0);
+
+                final AtomicInteger igniteId = new AtomicInteger(1);
+
+                IgniteInternalFuture<?> fut = 
GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 1; i < 5; ++i)
+                            startGrid(igniteId.incrementAndGet());
+
+                        return true;
+                    }
+                }, 2, "startedGridThread");
+
+               Random random = new Random();
+
+                while (!fut.isDone())
+                    futs.add(dataLdr.addData(random.nextInt(100000), 
random.nextInt(100000)));
+            }
+
+            for (IgniteFuture f : futs)
+                f.get();
+
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 49fbcbb..9a4451c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -114,6 +114,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAffinityApiSelfTest.class);
         suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
         suite.addTestSuite(DataStreamProcessorSelfTest.class);
+        suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
         suite.addTestSuite(DataStreamerImplSelfTest.class);
         suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class);
         suite.addTestSuite(GridCacheClearAllSelfTest.class);

Reply via email to