Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 9699e1f8c -> 64602bba6
# ignite-901 WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/64602bba Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/64602bba Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/64602bba Branch: refs/heads/ignite-901 Commit: 64602bba63723d99bb88783c2c7e3a29d033a1dc Parents: 9699e1f Author: sboikov <sboi...@gridgain.com> Authored: Mon Jul 6 10:20:18 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jul 6 10:20:18 2015 +0300 ---------------------------------------------------------------------- .../CacheDataStructuresManager.java | 26 ++++++++++++++++++++ .../datastructures/DataStructuresProcessor.java | 3 +++ .../datastructures/GridCacheQueueAdapter.java | 11 +++++++++ .../datastructures/GridCacheSetImpl.java | 13 ++++++++++ .../datastructures/GridCacheSetProxy.java | 7 ++++++ .../processors/task/GridTaskProcessor.java | 15 +++++++++-- .../IgniteClientReconnectCacheTest.java | 6 ++--- .../IgniteClientReconnectCollectionsTest.java | 18 +++++++------- 8 files changed, 84 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index b5c5161..3691ee6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -106,6 +106,32 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** + * Client reconnect callback. + * @throws IgniteCheckedException If failed. + */ + public void onReconnected() throws IgniteCheckedException { + for (Map.Entry<IgniteUuid, GridCacheSetProxy> e : setsMap.entrySet()) { + GridCacheSetProxy set = e.getValue(); + + if (!set.delegate().checkHeader()) { + set.blockOnRemove(); + + setsMap.remove(e.getKey(), e.getValue()); + } + } + + for (Map.Entry<IgniteUuid, GridCacheQueueProxy> e : queuesMap.entrySet()) { + GridCacheQueueProxy queue = e.getValue(); + + if (!queue.delegate().checkHeader()) { + queue.delegate().onRemoved(false); + + queuesMap.remove(e.getKey(), e.getValue()); + } + } + } + + /** * @throws IgniteCheckedException If thread is interrupted or manager * was not successfully initialized. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 4637bd0..b2c3c04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -198,6 +198,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { e.getValue().onRemoved(); } } + + for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) + cctx.dataStructures().onReconnected(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 9fd7356..f62c4eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -106,6 +106,17 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp writeSem = bounded() ? new Semaphore(hdr.capacity() - hdr.size(), true) : null; } + /** + * @return {@code True} if queue header found in cache. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public boolean checkHeader() throws IgniteCheckedException { + GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey); + + return !queueRemoved(hdr, id); + } + /** {@inheritDoc} */ @Override public String name() { return queueName; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f74fe95..936662a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -101,6 +101,19 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite return rmvd; } + /** + * @return {@code True} if set header found in cache. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public boolean checkHeader() throws IgniteCheckedException { + IgniteInternalCache<GridCacheSetHeaderKey, GridCacheSetHeader> cache0 = ctx.cache(); + + GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name)); + + return hdr != null && hdr.id().equals(id); + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public int size() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index ba43da7..38c124a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -78,6 +78,13 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { } /** + * @return Set delegate. + */ + public GridCacheSetImpl delegate() { + return delegate; + } + + /** * Remove callback. */ public void blockOnRemove() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 39f6bd5..22add73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -76,8 +76,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { private final LongAdder8 execTasks = new LongAdder8(); /** */ - private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = - new ThreadLocal<>(); + private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = new ThreadLocal<>(); /** */ private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); @@ -119,6 +118,18 @@ public class GridTaskProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Client node disconnected."); + + for (GridTaskWorker<?, ?> worker : tasks.values()) { + worker.cancel(); + + worker.finishTask(null, err); + } + } + + /** {@inheritDoc} */ @SuppressWarnings("TooBroadScope") @Override public void onKernalStop(boolean cancel) { lock.writeLock(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 258eef9..f9e2a9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -252,8 +252,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final CountDownLatch reconnectLatch = new CountDownLatch(1); client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { + @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); @@ -521,8 +520,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac }); GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { return clientCache.get(1); } }, IllegalStateException.class, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/64602bba/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index 54e1329..98be9f2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -83,20 +83,20 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA /** * @throws Exception If failed. */ - public void testQueueReconnectInProg() throws Exception { + public void testQueueReconnectInProgress() throws Exception { CollectionConfiguration colCfg = new CollectionConfiguration(); colCfg.setCacheMode(PARTITIONED); colCfg.setAtomicityMode(TRANSACTIONAL); - queueReconnectInProg(colCfg); + queueReconnectInProgress(colCfg); colCfg = new CollectionConfiguration(); colCfg.setCacheMode(PARTITIONED); colCfg.setAtomicityMode(ATOMIC); - queueReconnectInProg(colCfg); + queueReconnectInProgress(colCfg); } /** @@ -121,7 +121,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA /** * @throws Exception If failed. */ - public void testSetReconnectRemove() throws Exception { + public void testSetReconnectRemoved() throws Exception { CollectionConfiguration colCfg = new CollectionConfiguration(); colCfg.setCacheMode(PARTITIONED); @@ -140,20 +140,20 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA /** * @throws Exception If failed. */ - public void testSetReconnectInProg() throws Exception { + public void testSetReconnectInProgress() throws Exception { CollectionConfiguration colCfg = new CollectionConfiguration(); colCfg.setCacheMode(PARTITIONED); colCfg.setAtomicityMode(ATOMIC); - setReconnectInProg(colCfg); + setReconnectInProgress(colCfg); colCfg = new CollectionConfiguration(); colCfg.setCacheMode(PARTITIONED); colCfg.setAtomicityMode(TRANSACTIONAL); - setReconnectInProg(colCfg); + setReconnectInProgress(colCfg); } /** @@ -230,7 +230,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA * @param colCfg Collection configuration. * @throws Exception If failed. */ - private void setReconnectInProg(final CollectionConfiguration colCfg) throws Exception { + private void setReconnectInProgress(final CollectionConfiguration colCfg) throws Exception { Ignite client = grid(serverCount()); assertTrue(client.cluster().localNode().isClient()); @@ -358,7 +358,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA * @param colCfg Collection configuration. * @throws Exception If failed. */ - private void queueReconnectInProg(final CollectionConfiguration colCfg) throws Exception { + private void queueReconnectInProgress(final CollectionConfiguration colCfg) throws Exception { Ignite client = grid(serverCount()); assertTrue(client.cluster().localNode().isClient());