Repository: incubator-ignite Updated Branches: refs/heads/ignite-51 736158dac -> 90172c7c8
# ignite-51 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/90172c7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/90172c7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/90172c7c Branch: refs/heads/ignite-51 Commit: 90172c7c863b9f710c7d047c1dd8ee7342ec3e31 Parents: 736158d Author: sboikov <sboi...@gridgain.com> Authored: Tue Mar 3 12:15:38 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Mar 3 14:08:11 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 5 + .../processors/cache/CacheEvictionEntry.java | 179 +++++++++++++++++++ .../cache/GridCacheEvictionManager.java | 45 +++-- .../cache/GridCacheEvictionRequest.java | 36 ++-- .../processors/cache/GridCacheMessage.java | 75 ++------ ...actQueueFailoverDataConsistencySelfTest.java | 2 +- .../cache/spring/SpringDynamicCacheManager.java | 26 ++- 7 files changed, 264 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 57b5ac4..0100b2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -534,6 +534,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 97: + msg = new 
CacheEvictionEntry(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionEntry.java new file mode 100644 index 0000000..a5576f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictionEntry.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; + +/** + * {@link Message} (direct type 97) holding one eviction entry: a key, its version and a flag telling whether the key should be evicted from near cache. + */ +public class CacheEvictionEntry implements Message { + /** Key of the entry. */ + @GridToStringInclude + private KeyCacheObject key; + + /** Version of the entry. */ + @GridToStringInclude + private GridCacheVersion ver; + + /** {@code True} if key should be evicted from near cache. */ + private boolean near; + + /** + * Required by {@link Message}. + */ + public CacheEvictionEntry() { + // No-op. + } + + /** + * @param key Key. + * @param ver Version. + * @param near {@code true} if key should be evicted from near cache. + */ + public CacheEvictionEntry(KeyCacheObject key, GridCacheVersion ver, boolean near) { + this.key = key; + this.ver = ver; + this.near = near; + } + + /** + * @return Key. + */ + public KeyCacheObject key() { + return key; + } + + /** + * @return Version. + */ + public GridCacheVersion version() { + return ver; + } + + /** + * @return {@code True} if key should be evicted from near cache. + */ + public boolean near() { + return near; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 97; + } + + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { + key.prepareMarshal(ctx.cacheObjectContext()); + } + + /** + * @param ctx Context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. 
+ */ + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + key.finishUnmarshal(ctx, ldr); + } + + /** {@inheritDoc} Writes {@code key}, {@code near} and {@code ver} in that order; the switch on {@code writer.state()} falls through, so writing starts at the case for the current state, and {@code false} is returned as soon as a write call returns {@code false}. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeBoolean("near", near)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("ver", ver)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Reads {@code key}, {@code near} and {@code ver} in the same order they are written, starting at the case for {@code reader.state()} and falling through; returns {@code false} when {@code reader.isLastRead()} is {@code false} after a read. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + near = reader.readBoolean("near"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + ver = reader.readMessage("ver"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index e3a9b1f..e34cb66 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -383,27 +383,25 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V // Partition -> {{Key, Version}, ...}. // Group DHT and replicated cache entries by their partitions. - Map<Integer, Collection<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>>> dhtEntries = - new HashMap<>(); + Map<Integer, Collection<CacheEvictionEntry>> dhtEntries = new HashMap<>(); - Collection<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>> nearEntries = - new LinkedList<>(); + Collection<CacheEvictionEntry> nearEntries = new LinkedList<>(); - for (GridTuple3<KeyCacheObject, GridCacheVersion, Boolean> t : req.entries()) { - Boolean near = t.get3(); + for (CacheEvictionEntry e : req.entries()) { + boolean near = e.near(); if (!near) { // Lock is required. - Collection<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>> col = - F.addIfAbsent(dhtEntries, cctx.affinity().partition(t.get1()), - new LinkedList<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>>()); + Collection<CacheEvictionEntry> col = + F.addIfAbsent(dhtEntries, cctx.affinity().partition(e.key()), + new LinkedList<CacheEvictionEntry>()); assert col != null; - col.add(t); + col.add(e); } else - nearEntries.add(t); + nearEntries.add(e); } GridCacheEvictionResponse res = new GridCacheEvictionResponse(cctx.cacheId(), req.futureId()); @@ -411,17 +409,16 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V GridCacheVersion obsoleteVer = cctx.versions().next(); // DHT and replicated cache entries. 
- for (Map.Entry<Integer, Collection<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>>> e : - dhtEntries.entrySet()) { + for (Map.Entry<Integer, Collection<CacheEvictionEntry>> e : dhtEntries.entrySet()) { int part = e.getKey(); boolean locked = lockPartition(part); // Will return false if preloading is disabled. try { - for (GridTuple3<KeyCacheObject, GridCacheVersion, Boolean> t : e.getValue()) { - KeyCacheObject key = t.get1(); - GridCacheVersion ver = t.get2(); - Boolean near = t.get3(); + for (CacheEvictionEntry t : e.getValue()) { + KeyCacheObject key = t.key(); + GridCacheVersion ver = t.version(); + boolean near = t.near(); assert !near; @@ -446,10 +443,10 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V } // Near entries. - for (GridTuple3<KeyCacheObject, GridCacheVersion, Boolean> t : nearEntries) { - KeyCacheObject key = t.get1(); - GridCacheVersion ver = t.get2(); - Boolean near = t.get3(); + for (CacheEvictionEntry t : nearEntries) { + KeyCacheObject key = t.key(); + GridCacheVersion ver = t.version(); + boolean near = t.near(); assert near; @@ -1830,12 +1827,12 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V GridCacheEvictionRequest req = reqMap.remove(nodeId); - for (GridTuple3<KeyCacheObject, GridCacheVersion, Boolean> t : req.entries()) { - EvictionInfo info = entries.get(t.get1()); + for (CacheEvictionEntry t : req.entries()) { + EvictionInfo info = entries.get(t.key()); assert info != null; - rejectedEntries.put(t.get1(), info); + rejectedEntries.put(t.key(), info); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java index 7807ee6..661bb06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionRequest.java @@ -42,12 +42,8 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa /** Entries to clear from near and backup nodes. */ @GridToStringInclude - @GridDirectTransient - private Collection<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>> entries; - - /** Serialized entries. */ - @GridToStringExclude - private byte[] entriesBytes; + @GridDirectCollection(CacheEvictionEntry.class) + private Collection<CacheEvictionEntry> entries; /** Topology version. */ private long topVer; @@ -84,10 +80,16 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa super.prepareMarshal(ctx); if (entries != null) { - if (ctx.deploymentEnabled()) - prepareObjects(entries, ctx); + boolean depEnabled = ctx.deploymentEnabled(); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + for (CacheEvictionEntry e : entries) { + e.prepareMarshal(cctx); - entriesBytes = ctx.marshaller().marshal(entries); + if (depEnabled) + prepareObject(e.key().value(cctx, false), ctx); + } } } @@ -95,8 +97,12 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (entriesBytes != null) - entries = ctx.marshaller().unmarshal(entriesBytes, ldr); + if (entries != null) { + GridCacheContext cctx = ctx.cacheContext(cacheId); + + for (CacheEvictionEntry e : entries) + e.finishUnmarshal(cctx, ldr); + } } /** @@ -109,7 +115,7 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa /** * @return Entries - 
{{Key, Version, Boolean (near or not)}, ...}. */ - Collection<GridTuple3<KeyCacheObject, GridCacheVersion, Boolean>> entries() { + Collection<CacheEvictionEntry> entries() { return entries; } @@ -131,7 +137,7 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa assert key != null; assert ver != null; - entries.add(F.t(key, ver, near)); + entries.add(new CacheEvictionEntry(key, ver, near)); } /** {@inheritDoc} */ @@ -155,7 +161,7 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa switch (writer.state()) { case 3: - if (!writer.writeByteArray("entriesBytes", entriesBytes)) + if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -189,7 +195,7 @@ public class GridCacheEvictionRequest extends GridCacheMessage implements GridCa switch (reader.state()) { case 3: - entriesBytes = reader.readByteArray("entriesBytes"); + entries = reader.readCollection("entries", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index fffa35f..2810e20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -278,8 +278,8 @@ public abstract class GridCacheMessage implements Message { info.marshal(ctx); if (ctx.deploymentEnabled()) { - prepareObject(info.key(), ctx.shared()); - prepareObject(info.value(), ctx.shared()); + prepareObject(info.key().value(ctx, false), 
ctx.shared()); + prepareObject(CU.value(info.value(), ctx, false), ctx.shared()); } } } @@ -495,61 +495,6 @@ public abstract class GridCacheMessage implements Message { } /** - * @param col Values collection to marshal. - * @param ctx Context. - * @return Marshaled collection. - * @throws IgniteCheckedException If failed. - */ - @Nullable protected List<GridCacheValueBytes> marshalValuesCollection(@Nullable Collection<?> col, - GridCacheSharedContext ctx) throws IgniteCheckedException { - assert ctx != null; - - if (col == null) - return null; - - List<GridCacheValueBytes> byteCol = new ArrayList<>(col.size()); - - for (Object o : col) { - if (ctx.deploymentEnabled()) - prepareObject(o, ctx); - - byteCol.add(o == null ? null : o instanceof byte[] ? GridCacheValueBytes.plain(o) : - GridCacheValueBytes.marshaled(CU.marshal(ctx, o))); - } - - return byteCol; - } - - /** - * @param byteCol Collection to unmarshal. - * @param ctx Context. - * @param ldr Loader. - * @return Unmarshalled collection. - * @throws IgniteCheckedException If failed. - */ - @Nullable protected <T> List<T> unmarshalValueBytesCollection(@Nullable Collection<GridCacheValueBytes> byteCol, - GridCacheSharedContext ctx, ClassLoader ldr) - throws IgniteCheckedException { - assert ldr != null; - assert ctx != null; - - if (byteCol == null) - return null; - - List<T> col = new ArrayList<>(byteCol.size()); - - Marshaller marsh = ctx.marshaller(); - - for (GridCacheValueBytes item : byteCol) { - assert item == null || item.get() != null; - - col.add(item != null ? item.isPlain() ? (T)item.get() : marsh.<T>unmarshal(item.get(), ldr) : null); - } - - return col; - } - - /** * @param col Collection to marshal. * @param ctx Context. * @return Marshalled collection. 
@@ -587,11 +532,17 @@ public abstract class GridCacheMessage implements Message { int size = col.size(); + boolean depEnabled = ctx.deploymentEnabled(); + for (int i = 0 ; i < size; i++) { CacheObject obj = col.get(i); - if (obj != null) + if (obj != null) { obj.prepareMarshal(ctx.cacheObjectContext()); + + if (depEnabled) + prepareObject(obj.value(ctx, false), ctx.shared()); + } } } @@ -605,9 +556,15 @@ public abstract class GridCacheMessage implements Message { if (col == null) return; + boolean depEnabled = ctx.deploymentEnabled(); + for (CacheObject obj : col) { - if (obj != null) + if (obj != null) { obj.prepareMarshal(ctx.cacheObjectContext()); + + if (depEnabled) + prepareObject(obj.value(ctx, false), ctx.shared()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java index 02f622f..8b240f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java @@ -357,7 +357,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte for (int i = 0; i < gridCount(); i++) { for (GridCacheEntryEx e : ((IgniteKernal)grid(i)).context().cache().internalCache(cctx.name()).map().allEntries0()) { - if (aff.primary(grid(i).localNode(), e.key(), -1) && e.key() instanceof 
GridCacheQueueHeaderKey) + if (aff.primary(grid(i).localNode(), e.key(), -1) && e.key().value(cctx, false) instanceof GridCacheQueueHeaderKey) return i; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90172c7c/modules/spring/src/main/java/org/apache/ignite/cache/spring/SpringDynamicCacheManager.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/spring/SpringDynamicCacheManager.java b/modules/spring/src/main/java/org/apache/ignite/cache/spring/SpringDynamicCacheManager.java index 635e794..fc37f09 100644 --- a/modules/spring/src/main/java/org/apache/ignite/cache/spring/SpringDynamicCacheManager.java +++ b/modules/spring/src/main/java/org/apache/ignite/cache/spring/SpringDynamicCacheManager.java @@ -121,11 +121,7 @@ public class SpringDynamicCacheManager extends SpringCacheManager { if (cache == null) { cache = new SpringCache(name, grid, dataCache.projection(new ProjectionFilter(name)), - new IgniteClosure<Object, Object>() { - @Override public Object apply(Object o) { - return new DataKey(name, o); - } - }); + new DataKeyFactory(name)); org.springframework.cache.Cache old = metaCache.putIfAbsent(key, cache); @@ -161,6 +157,26 @@ public class SpringDynamicCacheManager extends SpringCacheManager { } /** + * Closure that wraps the object it is applied to into a {@link DataKey} together with a fixed name; replaces the anonymous closure previously passed to {@code SpringCache}. + */ + private static class DataKeyFactory implements IgniteClosure<Object, Object> { + /** Name passed as the first argument to every created {@link DataKey}. */ + private String name; + + /** + * @param name Name to put into every {@link DataKey} produced by {@link #apply(Object)}. + */ + public DataKeyFactory(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public Object apply(Object o) { + return new DataKey(name, o); + } + } + + /** * Meta key. */ private static class MetaKey extends GridCacheUtilityKey<MetaKey> implements Externalizable {