ignite-484-1 - improved retry
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94060c9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94060c9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94060c9e Branch: refs/heads/ignite-843 Commit: 94060c9ef41161c7262a28044ddb176f86814b01 Parents: 10febf2 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed Jun 17 19:46:42 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed Jun 17 19:46:42 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 26 ++++-- .../h2/twostep/GridReduceQueryExecutor.java | 86 ++++++++++++++------ ...lientQueryReplicatedNodeRestartSelfTest.java | 50 ++++++++++-- 3 files changed, 125 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index aaf64ee..2503a87 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -48,6 +48,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*; @@ -230,6 +231,15 @@ public class GridMapQueryExecutor { } /** + * @param cctx Cache context. + * @param p Partition ID. + * @return Partition. + */ + private GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int p) { + return cctx.topology().localPartition(p, NONE, false); + } + + /** * @param cacheNames Cache names. * @param topVer Topology version. * @param explicitParts Explicit partitions list. @@ -263,10 +273,12 @@ public class GridMapQueryExecutor { GridReservable r = reservations.get(grpKey); if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (!r.reserve()) - return false; // We need explicit partitions here -> retry. + if (r != ReplicatedReservation.INSTANCE) { + if (!r.reserve()) + return false; // We need explicit partitions here -> retry. - reserved.add(r); + reserved.add(r); + } } else { // Try to reserve partitions one by one. int partsCnt = cctx.affinity().partitions(); @@ -274,7 +286,7 @@ public class GridMapQueryExecutor { if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. if (r == null) { // Check only once. for (int p = 0; p < partsCnt; p++) { - GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false); + GridDhtLocalPartition part = partition(cctx, p); // We don't need to reserve partitions because they will not be evicted in replicated caches. if (part == null || part.state() != OWNING) @@ -290,7 +302,7 @@ public class GridMapQueryExecutor { partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); for (int partId : partIds) { - GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); + GridDhtLocalPartition part = partition(cctx, partId); if (part == null || part.state() != OWNING || !part.reserve()) return false; @@ -806,12 +818,12 @@ public class GridMapQueryExecutor { /** {@inheritDoc} */ @Override public boolean reserve() { - return true; + throw new IllegalStateException(); } /** {@inheritDoc} */ @Override public void release() { - // No-op. + throw new IllegalStateException(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index c570d24..6635dde 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -335,7 +335,7 @@ public class GridReduceQueryExecutor { ) { String space = cctx.name(); - Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer)); + Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer)); if (F.isEmpty(nodes)) throw new CacheException("No data nodes found for cache: " + space); @@ -351,7 +351,7 @@ public class GridReduceQueryExecutor { throw new CacheException("Queries running on replicated cache should not contain JOINs " + "with partitioned tables."); - Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer); + Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer); if (F.isEmpty(extraNodes)) throw new CacheException("No data nodes found for cache: " + extraSpace); @@ -398,7 +398,18 @@ public class GridReduceQueryExecutor { * @return Cursor. */ public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) { - for (;;) { + for (int attempt = 0;; attempt++) { + if (attempt != 0) { + try { + Thread.sleep(attempt * 10); // Wait for exchange. + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new CacheException("Query was interrupted.", e); + } + } + long qryReqId = reqIdGen.incrementAndGet(); QueryRun r = new QueryRun(); @@ -422,9 +433,9 @@ public class GridReduceQueryExecutor { if (isPreloadingActive(cctx, extraSpaces)) { if (cctx.isReplicated()) - nodes = replicatedDataNodes(cctx, extraSpaces); + nodes = replicatedUnstableDataNodes(cctx, extraSpaces); else { - partsMap = partitionLocations(cctx, extraSpaces); + partsMap = partitionedUnstableDataNodes(cctx, extraSpaces); nodes = partsMap == null ? null : partsMap.keySet(); } @@ -538,9 +549,6 @@ public class GridReduceQueryExecutor { catch (IgniteCheckedException | RuntimeException e) { U.closeQuiet(r.conn); - if (e instanceof CacheException) - throw (CacheException)e; - throw new CacheException("Failed to run reduce query locally.", e); } finally { @@ -559,10 +567,14 @@ public class GridReduceQueryExecutor { * @param extraSpaces Extra spaces. * @return Collection of all data nodes owning all the caches or {@code null} for retry. */ - private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { + private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx, + List<String> extraSpaces) { assert cctx.isReplicated() : cctx.name() + " must be replicated"; - Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx); + Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx); + + if (F.isEmpty(nodes)) + return null; // Retry. if (!F.isEmpty(extraSpaces)) { for (String extraSpace : extraSpaces) { @@ -575,7 +587,12 @@ public class GridReduceQueryExecutor { throw new CacheException("Queries running on replicated cache should not contain JOINs " + "with partitioned tables."); - nodes.retainAll(owningReplicatedDataNodes(extraCctx)); + Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx); + + if (F.isEmpty(extraOwners)) + return null; // Retry. + + nodes.retainAll(extraOwners); if (nodes.isEmpty()) return null; // Retry. @@ -586,34 +603,43 @@ public class GridReduceQueryExecutor { } /** + * @param space Cache name. + * @param topVer Topology version. + * @return Collection of data nodes. + */ + private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) { + Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(space, topVer); + + return res != null ? res : Collections.<ClusterNode>emptySet(); + } + + /** * Collects all the nodes owning all the partitions for the given replicated cache. * * @param cctx Cache context. - * @return Owning nodes. + * @return Owning nodes or {@code null} if we can't find owners for some partitions. */ - private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) { + private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) { assert cctx.isReplicated() : cctx.name() + " must be replicated"; String space = cctx.name(); - Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE)); + Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(space, NONE)); if (dataNodes.isEmpty()) throw new CacheException("No data nodes found for cache '" + space + "'"); // Find all the nodes owning all the partitions for replicated cache. - for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) { + for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = cctx.topology().owners(p); - if (owners.isEmpty()) - throw new CacheException("No data nodes found for cache '" + space + - "' for partition " + p); + if (F.isEmpty(owners)) + return null; // Retry. dataNodes.retainAll(owners); if (dataNodes.isEmpty()) - throw new CacheException("No data nodes found for cache '" + space + - "' owning all the partitions."); + return null; // Retry. } return dataNodes; @@ -627,7 +653,8 @@ public class GridReduceQueryExecutor { * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. */ @SuppressWarnings("unchecked") - private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { + private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx, + List<String> extraSpaces) { assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; final int partsCnt = cctx.affinity().partitions(); @@ -653,8 +680,12 @@ public class GridReduceQueryExecutor { for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = cctx.topology().owners(p); - if (F.isEmpty(owners)) + if (F.isEmpty(owners)) { + if (!F.isEmpty(dataNodes(cctx.name(), NONE))) + return null; // Retry. + throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p); + } partLocs[p] = new HashSet<>(owners); } @@ -671,9 +702,13 @@ public class GridReduceQueryExecutor { for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = extraCctx.topology().owners(p); - if (F.isEmpty(owners)) + if (F.isEmpty(owners)) { + if (!F.isEmpty(dataNodes(extraSpace, NONE))) + return null; // Retry. + throw new CacheException("No data nodes found for cache '" + extraSpace + "' for partition " + p); + } if (partLocs[p] == null) partLocs[p] = new HashSet<>(owners); @@ -693,7 +728,10 @@ public class GridReduceQueryExecutor { if (!extraCctx.isReplicated()) continue; - Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx); + Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx); + + if (F.isEmpty(dataNodes)) + return null; // Retry. for (Set<ClusterNode> partLoc : partLocs) { partLoc.retainAll(dataNodes); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94060c9e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java index 23f44c0..3f23005 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java @@ -64,6 +64,9 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom }; /** */ + private static final List<List<?>> FAKE = new LinkedList<>(); + + /** */ private static final int GRID_CNT = 5; /** */ @@ -191,7 +194,7 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom public void testRestarts() throws Exception { int duration = 90 * 1000; int qryThreadNum = 5; - int restartThreadsNum = 2; // 2 of 4 data nodes + int restartThreadsNum = 3; // 3 of 4 data nodes final int nodeLifeTime = 2 * 1000; final int logFreq = 10; @@ -212,13 +215,32 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom final AtomicInteger qryCnt = new AtomicInteger(); final AtomicBoolean qrysDone = new AtomicBoolean(); + final List<Integer> cacheSize = new ArrayList<>(4); + for (int i = 0; i < GRID_CNT - 1; i++) { - for (String cacheName : F.asList("co", "pr", "pe", "pu")) - assertClient(grid(i).cache(cacheName), false); + int j = 0; + + for (String cacheName : F.asList("co", "pr", "pe", "pu")) { + IgniteCache<?,?> cache = grid(i).cache(cacheName); + + assertClient(cache, false); + + if (i == 0) + cacheSize.add(cache.size()); + else + assertEquals(cacheSize.get(j++).intValue(), cache.size()); + } } - for (String cacheName : F.asList("co", "pr", "pe", "pu")) - assertClient(grid(GRID_CNT - 1).cache(cacheName), true); + int j = 0; + + for (String cacheName : F.asList("co", "pr", "pe", "pu")) { + IgniteCache<?,?> cache = grid(GRID_CNT - 1).cache(cacheName); + + assertClient(cache, true); + + assertEquals(cacheSize.get(j++).intValue(), cache.size()); + } final IgniteCache<?,?> clientCache = grid(GRID_CNT - 1).cache("pu"); @@ -234,8 +256,10 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom if (smallPageSize) qry.setPageSize(3); + List<List<?>> res; + try { - assertEquals(pRes, clientCache.query(qry).getAll()); + res = clientCache.query(qry).getAll(); } catch (CacheException e) { assertTrue("On large page size must retry.", smallPageSize); @@ -259,6 +283,20 @@ public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCom fail("Must fail inside of GridResultPage.fetchNextPage or subclass."); } + + res = FAKE; + } + + if (res != FAKE && !res.equals(pRes)) { + int j = 0; + + // Check for data loss. + for (String cacheName : F.asList("co", "pr", "pe", "pu")) { + assertEquals(cacheName, cacheSize.get(j++).intValue(), + grid(GRID_CNT - 1).cache(cacheName).size()); + } + + assertEquals(pRes, res); // Fail with nice message. } int c = qryCnt.incrementAndGet();