# ignite-sprint-5 use consistent topology version in streamer
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc1d427f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc1d427f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc1d427f Branch: refs/heads/ignite-389 Commit: dc1d427fa2f5235a5779058a444e3845946a7e07 Parents: 20e5677 Author: sboikov <sboi...@gridgain.com> Authored: Fri Jun 5 09:54:37 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jun 5 11:21:36 2015 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityProcessor.java | 23 ++++- .../datastreamer/DataStreamerImpl.java | 92 ++++++++++++++------ .../DataStreamerMultiThreadedSelfTest.java | 59 +++++++++---- 3 files changed, 129 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index daa2bc2..aac63c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -164,14 +164,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * * @param cacheName Cache name. * @param key Key to map. + * @param topVer Topology version. * @return Affinity nodes, primary first. * @throws IgniteCheckedException If failed. */ - public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException { + public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, + K key, + AffinityTopologyVersion topVer) + throws IgniteCheckedException + { A.notNull(key, "key"); - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); - AffinityInfo affInfo = affinityCache(cacheName, topVer); if (affInfo == null) @@ -181,6 +184,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * Map single key to primary and backup nodes. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Affinity nodes, primary first. + * @throws IgniteCheckedException If failed. + */ + public <K> List<ClusterNode> mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) + throws IgniteCheckedException + { + return mapKeyToPrimaryAndBackups(cacheName, key, ctx.discovery().topologyVersionEx()); + } + + /** * Gets affinity key for cache key. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index d16167a..ed8e573 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -198,19 +198,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed // Remap regular mappings. final Buffer buf = bufMappings.remove(id); + // Only async notification is possible since + // discovery thread may be trapped otherwise. if (buf != null) { - // Only async notification is possible since - // discovery thread may be trapped otherwise. - ctx.closure().callLocalSafe( - new Callable<Object>() { - @Override public Object call() throws Exception { - buf.onNodeLeft(); - - return null; - } - }, - true /* system pool */ - ); + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf.onNodeLeft(); + } + }, discoEvt.topologyVersion(), true); } } }; @@ -248,6 +243,31 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * @param c Closure to run. + * @param topVer Topology version to wait for. + * @param async Async flag. + */ + private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) { + AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0); + + IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + ctx.closure().runLocalSafe(c, true); + } + }); + } + else { + if (async) + ctx.closure().runLocalSafe(c, true); + else + c.run(); + } + } + + /** * @return Cache object context. */ public CacheObjectContext cacheObjectContext() { @@ -527,6 +547,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed boolean initPda = ctx.deploy().enabled() && jobPda == null; + AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion(); + for (DataStreamerEntry entry : entries) { List<ClusterNode> nodes; @@ -543,7 +565,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed initPda = false; } - nodes = nodes(key); + nodes = nodes(key, topVer); } catch (IgniteCheckedException e) { resFut.onDone(e); @@ -621,10 +643,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } }; - GridFutureAdapter<?> f; + final GridFutureAdapter<?> f; try { - f = buf.update(entriesForNode, lsnr); + f = buf.update(entriesForNode, topVer, lsnr); } catch (IgniteInterruptedCheckedException e1) { resFut.onDone(e1); @@ -633,30 +655,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (ctx.discovery().node(nodeId) == null) { - if (bufMappings.remove(nodeId, buf)) - buf.onNodeLeft(); + if (bufMappings.remove(nodeId, buf)) { + final Buffer buf0 = buf; - if (f != null) - f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + - "(node has left): " + nodeId)); + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf0.onNodeLeft(); + + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + }, ctx.discovery().topologyVersion(), false); + } } } } /** * @param key Key to map. + * @param topVer Topology version. * @return Nodes to send requests to. * @throws IgniteCheckedException If failed. */ - private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException { + private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException { GridAffinityProcessor aff = ctx.affinity(); List<ClusterNode> res = null; if (!allowOverwrite()) - res = aff.mapKeyToPrimaryAndBackups(cacheName, key); + res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer); else { - ClusterNode node = aff.mapKeyToNode(cacheName, key); + ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer); if (node != null) res = Collections.singletonList(node); @@ -959,11 +989,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param newEntries Infos. + * @param topVer Topology version. * @param lsnr Listener for the operation future. * @throws IgniteInterruptedCheckedException If failed. * @return Future for operation. */ @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries, + AffinityTopologyVersion topVer, IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException { List<DataStreamerEntry> entries0 = null; GridFutureAdapter<Object> curFut0; @@ -986,7 +1018,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (entries0 != null) { - submit(entries0, curFut0); + submit(entries0, topVer, curFut0); if (cancelled) curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this)); @@ -1023,7 +1055,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (entries0 != null) - submit(entries0, curFut0); + submit(entries0, null, curFut0); // Create compound future for this flush. GridCompoundFuture<Object, Object> res = null; @@ -1068,10 +1100,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param entries Entries to submit. + * @param topVer Topology version. * @param curFut Current future. * @throws IgniteInterruptedCheckedException If interrupted. */ - private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut) + private void submit(final Collection<DataStreamerEntry> entries, + @Nullable AffinityTopologyVersion topVer, + final GridFutureAdapter<Object> curFut) throws IgniteInterruptedCheckedException { assert entries != null; assert !entries.isEmpty(); @@ -1160,6 +1195,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed reqs.put(reqId, (GridFutureAdapter<Object>)fut); + if (topVer == null) + topVer = ctx.cache().context().exchange().readyAffinityVersion(); + DataStreamerRequest req = new DataStreamerRequest( reqId, topicBytes, @@ -1174,7 +1212,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, dep == null, - ctx.cache().context().exchange().readyAffinityVersion()); + topVer); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java index 2382a66..e0092d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java @@ -41,6 +41,9 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** */ + private boolean dynamicCache; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -50,13 +53,22 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(discoSpi); + if (!dynamicCache) + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { CacheConfiguration ccfg = defaultCacheConfiguration(); ccfg.setCacheMode(PARTITIONED); ccfg.setBackups(1); - cfg.setCacheConfiguration(ccfg); - return cfg; + return ccfg; } /** {@inheritDoc} */ @@ -68,6 +80,22 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStopIgnites() throws Exception { + startStopIgnites(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStopIgnitesDynamicCache() throws Exception { + dynamicCache = true; + + startStopIgnites(); + } + + /** + * @throws Exception If failed. + */ + private void startStopIgnites() throws Exception { for (int attempt = 0; attempt < 3; ++attempt) { log.info("Iteration: " + attempt); @@ -75,28 +103,29 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { Set<IgniteFuture> futs = new HashSet<>(); - IgniteInternalFuture<?> fut; + final AtomicInteger igniteId = new AtomicInteger(1); - try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) { - dataLdr.maxRemapCount(0); + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 1; i < 5; ++i) + startGrid(igniteId.incrementAndGet()); - final AtomicInteger igniteId = new AtomicInteger(1); + return true; + } + }, 2, "start-node-thread"); - fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 1; i < 5; ++i) - startGrid(igniteId.incrementAndGet()); + if (dynamicCache) + ignite.getOrCreateCache(cacheConfiguration()); - return true; - } - }, 2, "start-node-thread"); + try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) { + dataLdr.maxRemapCount(0); - Random random = new Random(); + Random rnd = new Random(); long endTime = U.currentTimeMillis() + 15_000; while (!fut.isDone() && U.currentTimeMillis() < endTime) - futs.add(dataLdr.addData(random.nextInt(100_000), random.nextInt(100_000))); + futs.add(dataLdr.addData(rnd.nextInt(100_000), String.valueOf(rnd.nextInt(100_000)))); } for (IgniteFuture f : futs)