ignite-484-1 - refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4a534059 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4a534059 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4a534059 Branch: refs/heads/ignite-484-1 Commit: 4a534059e4bf46b061e1272d432fd00c6f87acb0 Parents: 2b7dc3b Author: S.Vladykin <svlady...@gridgain.com> Authored: Fri Jun 12 17:21:14 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Fri Jun 12 17:21:14 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 10 +- .../dht/GridDhtPartitionsReservation.java | 169 ++++++++++++++----- .../query/h2/twostep/GridMapQueryExecutor.java | 24 +-- 3 files changed, 141 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 3670b8e..018ffd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -143,19 +143,15 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, assert state.getReference() != EVICTED : "we can reserve only active partitions"; assert state.getStamp() != 0 : "partition must be already reserved before adding group reservation"; - if (!reservations.addIfAbsent(r)) - return false; - - r.register(this); - - return true; + return reservations.addIfAbsent(r); } /** * @param r Reservation. */ public void removeReservation(GridDhtPartitionsReservation r) { - reservations.remove(r); + if (!reservations.remove(r)) + throw new IllegalStateException("Reservation was already removed."); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java index fcd6088..71a1859 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java @@ -19,70 +19,126 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; import java.util.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; /** * Reservation mechanism for multiple partitions allowing to do a reservation in one operation. */ public class GridDhtPartitionsReservation implements GridReservable { /** */ + private static final GridDhtLocalPartition[] EMPTY = {}; + + /** */ + private final Object appKey; + + /** */ private final GridCacheContext<?,?> cctx; /** */ private final AffinityTopologyVersion topVer; /** */ - private final List<GridDhtLocalPartition> parts = new ArrayList<>(); + private final AtomicReference<GridDhtLocalPartition[]> parts = new AtomicReference<>(); /** */ private final AtomicInteger reservations = new AtomicInteger(); /** */ - private final IgniteInClosure<GridDhtPartitionsReservation> finalize; + private volatile CI1<GridDhtPartitionsReservation> unpublish; /** * @param topVer AffinityTopologyVersion version. * @param cctx Cache context. - * @param finalize Finalizing closure. + * @param appKey Application key for reservation. */ - public GridDhtPartitionsReservation( - AffinityTopologyVersion topVer, - GridCacheContext<?,?> cctx, - IgniteInClosure<GridDhtPartitionsReservation> finalize) { + public GridDhtPartitionsReservation(AffinityTopologyVersion topVer, GridCacheContext<?,?> cctx, Object appKey) { assert topVer != null; assert cctx != null; + assert appKey != null; this.topVer = topVer; this.cctx = cctx; - this.finalize = finalize; + this.appKey = appKey; } /** - * @return Topology version. + * Registers all the given partitions for this reservation. + * + * @param parts Partitions. + * @return {@code true} If registration succeeded and this reservation can be published. */ - public AffinityTopologyVersion topologyVersion() { - return topVer; - } + public boolean register(Collection<? extends GridReservable> parts) { + assert !F.isEmpty(parts) : "empty partitions list"; - /** - * @return Cache context. - */ - public GridCacheContext<?,?> cacheContext() { - return cctx; + GridDhtLocalPartition[] arr = new GridDhtLocalPartition[parts.size()]; + + int i = 0; + int prevPart = -1; + boolean sorted = true; // Most probably it is a sorted list. + + for (GridReservable part : parts) { + arr[i] = (GridDhtLocalPartition)part; + + if (sorted) { // Make sure it will be a sorted array. + int id = arr[i].id(); + + if (id <= prevPart) + sorted = false; + + prevPart = id; + } + + i++; + } + + if (!sorted) + Arrays.sort(arr); + + i = 0; + prevPart = -1; + + // Register in correct sort order. + for (GridDhtLocalPartition part : arr) { + if (prevPart == part.id()) + throw new IllegalStateException("Duplicated partitions."); + + prevPart = part.id(); + + if (!part.addReservation(this)) { + if (i != 0) + throw new IllegalStateException( + "Trying to reserve different sets of partitions for the same topology version."); + + return false; + } + + i++; + } + + if (!this.parts.compareAndSet(null, arr)) + throw new IllegalStateException("Partitions can be registered only once."); + + return true; } /** - * Registers partition for this group reservation. + * Must be called when this reservation is published. * - * @param part Partition. + * @param unpublish Closure to unpublish this reservation when it will become invalid. */ - public void register(GridDhtLocalPartition part) { - parts.add(part); + public void onPublish(CI1<GridDhtPartitionsReservation> unpublish) { + assert unpublish != null; + assert this.unpublish == null; + + this.unpublish = unpublish; + + if (reservations.get() == -1) + unregister(); } /** @@ -91,6 +147,8 @@ public class GridDhtPartitionsReservation implements GridReservable { * @return {@code true} If succeeded. */ @Override public boolean reserve() { + assert parts.get() != null : "partitions must be registered before the first reserve attempt"; + for (;;) { int r = reservations.get(); @@ -105,6 +163,25 @@ public class GridDhtPartitionsReservation implements GridReservable { } /** + * @param parts Partitions. + */ + private static void tryEvict(GridDhtLocalPartition[] parts) { + if (parts == null) // Can be not initialized yet. + return ; + + for (GridDhtLocalPartition part : parts) + tryEvict(part); + } + + /** + * @param part Partition. + */ + private static void tryEvict(GridDhtLocalPartition part) { + if (part.state() == RENTING && part.reservations() == 0) + part.tryEvictAsync(true); + } + + /** * Releases all the registered partitions. */ @Override public void release() { @@ -116,12 +193,8 @@ public class GridDhtPartitionsReservation implements GridReservable { if (reservations.compareAndSet(r, r - 1)) { // If it was the last reservation and topology version changed -> attempt to evict partitions. - if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) { - for (GridDhtLocalPartition part : parts) { - if (part.state() == RENTING) - part.tryEvictAsync(true); - } - } + if (r == 1 && !topVer.equals(cctx.topology().topologyVersion())) + tryEvict(parts.get()); return; } @@ -129,6 +202,26 @@ public class GridDhtPartitionsReservation implements GridReservable { } /** + * Unregisters this reservation from all the partitions. + */ + private void unregister() { + GridDhtLocalPartition[] arr = parts.get(); + + if (!F.isEmpty(arr) && unpublish != null && parts.compareAndSet(arr, EMPTY)) { + // Reverse order makes sure that addReservation on the same topVer reservation will fail on the first partition. + for (int i = arr.length - 1; i >= 0; i--) { + GridDhtLocalPartition part = arr[i]; + + part.removeReservation(this); + + tryEvict(part); + } + + unpublish.apply(this); + } + } + + /** * Must be checked in {@link GridDhtLocalPartition#tryEvict(boolean)}. * If returns {@code true} then probably partition will be evicted (or at least cleared), * so this reservation object becomes invalid and must be dropped from the partition. @@ -146,12 +239,7 @@ public class GridDhtPartitionsReservation implements GridReservable { return r == -1; if (reservations.compareAndSet(0, -1)) { - // Remove our self. - for (GridDhtLocalPartition part : parts) - part.removeReservation(this); - - if (finalize != null) - finalize.apply(this); + unregister(); return true; } @@ -169,13 +257,18 @@ public class GridDhtPartitionsReservation implements GridReservable { GridDhtPartitionsReservation that = (GridDhtPartitionsReservation)o; - return topVer.equals(that.topVer) && cctx == that.cctx; + return cctx == that.cctx && topVer.equals(that.topVer) && appKey.equals(that.appKey); } /** {@inheritDoc} */ @Override public int hashCode() { - String cache = cctx.name(); + String name = cctx.name(); + + int result = name == null ? 0 : name.hashCode(); + + result = 31 * result + appKey.hashCode(); + result = 31 * result + topVer.hashCode(); - return 31 * topVer.hashCode() + cache == null ? 0 : cache.hashCode(); + return result; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4a534059/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index a8bc6e0..42f01cb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -290,27 +290,17 @@ public class GridMapQueryExecutor { if (explicitParts == null) { // We reserved all the primary partitions for cache, attempt to add group reservation. - GridDhtPartitionsReservation reservation = new GridDhtPartitionsReservation(topVer, cctx, - new CI1<GridDhtPartitionsReservation>() { + GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); + + if (grp.register(reserved.subList(reserved.size() - partIds.size(), reserved.size()))) { + if (reservations.putIfAbsent(grpKey, grp) != null) + throw new IllegalStateException("Reservation already exists."); + + grp.onPublish(new CI1<GridDhtPartitionsReservation>() { @Override public void apply(GridDhtPartitionsReservation r) { reservations.remove(grpKey, r); } }); - - for (int p = reserved.size() - partIds.size(); p < reserved.size(); p++) { - if (!((GridDhtLocalPartition)reserved.get(p)).addReservation(reservation)) { - // Can fail to add only on the first partition because of the same order of partitions. - assert p == reserved.size() - partIds.size() : p; - - reservation = null; - - break; - } - } - - if (reservation != null) { // If we were able to add reservation to all partitions, publish it. - if (reservations.putIfAbsent(grpKey, reservation) != null) - throw new IllegalStateException(); } } }