IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aa11f644 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aa11f644 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aa11f644 Branch: refs/heads/ignite-104 Commit: aa11f6446f14174e0ca4e67b85b1403ec6ed7016 Parents: 112c567 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Jul 29 18:01:36 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Jul 29 18:01:36 2015 -0700 ---------------------------------------------------------------------- .../dht/atomic/GridAtomicMappingKey.java | 86 ------------------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 87 +++++++++++++++++--- 2 files changed, 75 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa11f644/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java deleted file mode 100644 index 52e3c7f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Mapping Key. - */ -class GridAtomicMappingKey { - /** Node ID. */ - private final UUID nodeId; - - /** Partition. */ - private final int part; - - /** - * @param nodeId Node ID. - * @param part Partition. - */ - GridAtomicMappingKey(UUID nodeId, int part) { - assert nodeId != null; - assert part >= -1 : part; - - this.nodeId = nodeId; - this.part = part; - } - - /** - * @return Node ID. - */ - UUID nodeId() { - return nodeId; - } - - /** - * @return Partition. - */ - int partition() { - return part; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridAtomicMappingKey key = (GridAtomicMappingKey)o; - - return nodeId.equals(key.nodeId) && part == key.part; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + part; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAtomicMappingKey.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa11f644/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 8595dc7..93c20da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Mappings. */ @GridToStringInclude - private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); + private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; @@ -142,8 +142,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public Collection<? extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() { - @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) { + return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() { + @Override public ClusterNode apply(MappingKey mappingKey) { return cctx.kernalContext().discovery().node(mappingKey.nodeId()); } }), F.notNull()); @@ -154,15 +154,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (log.isDebugEnabled()) log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); - Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size()); + Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size()); - for (GridAtomicMappingKey mappingKey : mappings.keySet()) { + for (MappingKey mappingKey : mappings.keySet()) { if (mappingKey.nodeId().equals(nodeId)) mappingKeys.add(mappingKey); } if (!mappingKeys.isEmpty()) { - for (GridAtomicMappingKey mappingKey : mappingKeys) + for (MappingKey mappingKey : mappingKeys) mappings.remove(mappingKey); checkComplete(); @@ -234,7 +234,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); - GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); + MappingKey mappingKey = new MappingKey(nodeId, part); if (!nodeId.equals(cctx.localNodeId())) { GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); @@ -287,7 +287,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1; for (UUID nodeId : readers) { - GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part); + MappingKey mappingKey = new MappingKey(nodeId, part); GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey); @@ -345,8 +345,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public void map() { if (!mappings.isEmpty()) { - for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { - GridAtomicMappingKey mappingKey = e.getKey(); + for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) { + MappingKey mappingKey = e.getKey(); GridDhtAtomicUpdateRequest req = e.getValue(); UUID nodeId = mappingKey.nodeId(); @@ -429,7 +429,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } - mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition())); + mappings.remove(new MappingKey(nodeId, updateRes.partition())); checkComplete(); } @@ -445,7 +445,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); for (Integer part : res.partitions()) - mappings.remove(new GridAtomicMappingKey(nodeId, part)); + mappings.remove(new MappingKey(nodeId, part)); checkComplete(); } @@ -468,4 +468,67 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return S.toString(GridDhtAtomicUpdateFuture.class, this); } + /** + * Mapping Key. + */ + private static class MappingKey { + /** Node ID. */ + private final UUID nodeId; + + /** Partition. */ + private final int part; + + /** + * @param nodeId Node ID. + * @param part Partition. + */ + MappingKey(UUID nodeId, int part) { + assert nodeId != null; + assert part >= -1 : part; + + this.nodeId = nodeId; + this.part = part; + } + + /** + * @return Node ID. + */ + UUID nodeId() { + return nodeId; + } + + /** + * @return Partition. + */ + int partition() { + return part; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MappingKey key = (MappingKey)o; + + return nodeId.equals(key.nodeId) && part == key.part; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + part; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MappingKey.class, this); + } + } }