ignite-946: introduced VersionedEntry, supported versioned entry for Cache.invoke/randomEntry/localEntries methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/01c02465 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/01c02465 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/01c02465 Branch: refs/heads/ignite-264 Commit: 01c02465ff6924842644bf5b3447d324966cc5f9 Parents: 7ed4d15 Author: Denis Magda <dma...@gridgain.com> Authored: Thu Jul 30 13:50:40 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Jul 31 15:49:10 2015 +0300 ---------------------------------------------------------------------- .../ignite/cache/version/VersionedEntry.java | 73 +++++++++++++ .../ignite/cache/version/package-info.java | 21 ++++ .../processors/cache/CacheEntryImpl.java | 20 ++++ .../processors/cache/CacheInvokeEntry.java | 24 ++++- .../cache/CacheVersionedEntryImpl.java | 80 --------------- .../processors/cache/GridCacheAdapter.java | 13 ++- .../processors/cache/GridCacheMapEntry.java | 42 +++++--- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 3 +- .../local/atomic/GridLocalAtomicCache.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxEntry.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 3 +- .../cache/version/CacheVersionedEntryImpl.java | 102 +++++++++++++++++++ .../resources/META-INF/classnames.properties | 2 +- 15 files changed, 287 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java new file mode 
100644 index 0000000..6f9d8f6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/VersionedEntry.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.version; + +import javax.cache.*; +import java.util.*; + +/** + * Cache entry along with version information. + */ +public interface VersionedEntry<K, V> extends Cache.Entry<K, V> { + /** + * Versions comparator. + */ + public static final Comparator<VersionedEntry> VERSIONS_COMPARATOR = new Comparator<VersionedEntry>() { + @Override public int compare(VersionedEntry o1, VersionedEntry o2) { + int res = Integer.compare(o1.topologyVersion(), o2.topologyVersion()); + + if (res != 0) + return res; + + res = Long.compare(o1.order(), o2.order()); + + if (res != 0) + return res; + + return Integer.compare(o1.nodeOrder(), o2.nodeOrder()); + } + }; + + /** + * Gets entry's topology version. + * + * @return Topology version plus number of seconds from the start time of the first grid node. + */ + public int topologyVersion(); + + /** + * Gets entry's order. + * + * @return Version order. + */ + public long order(); + + /** + * Gets entry's node order. 
+ * + * @return Node order on which this version was assigned. + */ + public int nodeOrder(); + + /** + * Gets entry's global time. + * + * @return Adjusted time. + */ + public long globalTime(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java b/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java new file mode 100644 index 0000000..9aeaba2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/version/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains the public API for versioned cache entries.
+ */ +package org.apache.ignite.cache.version; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java index 3bd7ef4..98f3616 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.version.*; + import javax.cache.*; import java.io.*; @@ -33,6 +36,9 @@ public class CacheEntryImpl<K, V> implements Cache.Entry<K, V>, Externalizable { /** */ private V val; + /** Entry version. */ + private GridCacheVersion ver; + /** * Required by {@link Externalizable}. */ @@ -49,6 +55,17 @@ public class CacheEntryImpl<K, V> implements Cache.Entry<K, V>, Externalizable { this.val = val; } + /** + * @param key Key. + * @param val Value. + * @param ver Entry version. 
+ */ + public CacheEntryImpl(K key, V val, GridCacheVersion ver) { + this.key = key; + this.val = val; + this.ver = ver; + } + /** {@inheritDoc} */ @Override public K getKey() { return key; @@ -65,6 +82,9 @@ public class CacheEntryImpl<K, V> implements Cache.Entry<K, V>, Externalizable { if(cls.isAssignableFrom(getClass())) return cls.cast(this); + if (ver != null && cls.isAssignableFrom(VersionedEntry.class)) + return (T)new CacheVersionedEntryImpl<>(key, val, ver); + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index 2817748..e6f8d4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -35,17 +37,23 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta /** */ private V oldVal; + /** Entry version. */ + private GridCacheVersion ver; + /** * @param cctx Cache context. * @param keyObj Key cache object. * @param valObj Cache object value. + * @param ver Entry version. 
*/ public CacheInvokeEntry(GridCacheContext cctx, KeyCacheObject keyObj, - @Nullable CacheObject valObj) { + @Nullable CacheObject valObj, + GridCacheVersion ver) { super(cctx, keyObj, valObj); this.hadVal = valObj != null; + this.ver = ver; } /** @@ -54,15 +62,18 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta * @param key Key value. * @param valObj Value cache object. * @param val Value. + * @param ver Entry version. */ public CacheInvokeEntry(GridCacheContext<K, V> ctx, KeyCacheObject keyObj, @Nullable K key, @Nullable CacheObject valObj, - @Nullable V val) { + @Nullable V val, + GridCacheVersion ver) { super(ctx, keyObj, key, valObj, val); this.hadVal = valObj != null || val != null; + this.ver = ver; } /** {@inheritDoc} */ @@ -108,6 +119,15 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> cls) { + if (cls.isAssignableFrom(VersionedEntry.class)) + return (T)new CacheVersionedEntryImpl<>(getKey(), getValue(), ver); + + return super.unwrap(cls); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheInvokeEntry.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java deleted file mode 100644 index 59394f5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryImpl.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * - */ -public class CacheVersionedEntryImpl<K, V> extends CacheEntryImpl<K, V> { - /** */ - private static final long serialVersionUID = 0L; - - /** Version. */ - private Object ver; - - /** - * Required by {@link Externalizable}. - */ - public CacheVersionedEntryImpl() { - // No-op. - } - - /** - * @param key Key. - * @param val Value (always null). - * @param ver Version. - */ - public CacheVersionedEntryImpl(K key, V val, Object ver) { - super(key, val); - - assert val == null; - - this.ver = ver; - } - - /** - * @return Version. 
- */ - @Nullable public Object version() { - return ver; - } - - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeObject(ver); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - ver = in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", ver=" + ver + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 94bcc93..d125382 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3691,7 +3691,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc); - return new CacheEntryImpl<>(lazyEntry.getKey(), val); + GridCacheVersion ver = null; + + try { + ver = lazyEntry.unwrap(GridCacheVersion.class); + } + catch (IllegalArgumentException e) { + log.error("Failed to unwrap entry version information", e); + } + + return new CacheEntryImpl<>(lazyEntry.getKey(), val, ver); } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -4614,7 +4623,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V val0 = ctx.unwrapPortableIfNeeded(val0, true); } - return new CacheEntryImpl<>((K)key0, (V)val0); 
+ return new CacheEntryImpl<>((K)key0, (V)val0, entry.version()); } catch (GridCacheFilterFailedException ignore) { assert false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f85a18b..45ff619 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -609,16 +609,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteCacheExpiryPolicy expirePlc) throws IgniteCheckedException, GridCacheEntryRemovedException { return innerGet0(tx, - readSwap, - readThrough, - evt, - unmarshal, - updateMetrics, - tmp, - subjId, - transformClo, - taskName, - expirePlc); + readSwap, + readThrough, + evt, + unmarshal, + updateMetrics, + tmp, + subjId, + transformClo, + taskName, + expirePlc); } /** {@inheritDoc} */ @@ -1385,7 +1385,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert entryProcessor != null; - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key, old); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key, old, this.ver); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1653,7 +1653,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme oldVal = rawGetOrUnmarshalUnlocked(true); - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); 
try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1878,7 +1878,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -3531,7 +3531,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = rawGetOrUnmarshal(false); return new CacheEntryImpl<>(key.<K>value(cctx.cacheObjectContext(), false), - CU.<V>value(val, cctx, false)); + CU.<V>value(val, cctx, false), ver); } catch (GridCacheFilterFailedException ignored) { throw new IgniteException("Should never happen."); @@ -3593,6 +3593,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return new CacheVersionedEntryImpl<>(key.<K>value(cctx.cacheObjectContext(), false), null, ver); } + /** + * @return Entry which holds key, value and version. + */ + private synchronized <K, V> CacheVersionedEntryImpl<K, V> wrapVersionedWithValue() { + V val = this.val == null ? null : this.val.<V>value(cctx.cacheObjectContext(), false); + + return new CacheVersionedEntryImpl<>(key.<K>value(cctx.cacheObjectContext(), false), val, ver); + } + /** {@inheritDoc} */ @Override public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, @Nullable CacheEntryPredicate[] filter) throws IgniteCheckedException { @@ -4020,7 +4029,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return (T)wrapEviction(); if (cls.isAssignableFrom(CacheVersionedEntryImpl.class)) - return (T)wrapVersioned(); + return cls == CacheVersionedEntryImpl.class ? 
(T)wrapVersioned() : (T)wrapVersionedWithValue(); + + if (cls.isAssignableFrom(GridCacheVersion.class)) + return (T)ver; if (cls.isAssignableFrom(GridCacheMapEntry.this.getClass())) return (T)GridCacheMapEntry.this; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index fbc8c84..9bd5de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -324,7 +324,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { try { CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry<>(txEntry.context(), key, val); + new CacheInvokeEntry<>(txEntry.context(), key, val, txEntry.cached().version()); EntryProcessor<Object, Object, Object> processor = t.get1(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0a21979..5dff4ea 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1313,7 +1313,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Object oldVal = null; Object updatedVal = null; - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old, + entry.version()); CacheObject updated; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index bcbdec4..8dd3276 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1057,7 +1057,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { Object oldVal = null; - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old, + entry.version()); CacheObject updated; Object updatedVal = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 7190249..0d14012 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1230,7 +1230,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val); + txEntry.key(), key, cacheVal, val, txEntry.cached().version()); try { EntryProcessor<Object, Object, Object> processor = t.get1(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 247d350..7f06380 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -573,7 +573,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) { try { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, 
cacheVal, val, + entry.version()); EntryProcessor processor = t.get1(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 0a61b1a..d8797fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2522,7 +2522,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0); + new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, + txEntry.cached().version()); EntryProcessor<Object, Object, ?> entryProcessor = t.get1(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java new file mode 100644 index 0000000..6d1e0c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionedEntryImpl.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.version; + +import org.apache.ignite.cache.version.*; +import org.apache.ignite.internal.processors.cache.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public class CacheVersionedEntryImpl<K, V> extends CacheEntryImpl<K, V> implements VersionedEntry<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** Version. */ + private GridCacheVersion ver; + + /** + * Required by {@link Externalizable}. + */ + public CacheVersionedEntryImpl() { + // No-op. + } + + /** + * @param key Key. + * @param val Value (always null). + * @param ver Version. + */ + public CacheVersionedEntryImpl(K key, V val, GridCacheVersion ver) { + super(key, val); + + assert val == null; + + this.ver = ver; + } + + /** + * @return Version. 
+ */ + @Nullable public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public int topologyVersion() { + return ver.topologyVersion(); + } + + /** {@inheritDoc} */ + @Override public int nodeOrder() { + return ver.nodeOrder(); + } + + /** {@inheritDoc} */ + @Override public long order() { + return ver.order(); + } + + /** {@inheritDoc} */ + @Override public long globalTime() { + return ver.globalTime(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + ver = (GridCacheVersion)in.readObject(); + } + + /** {@inheritDoc} */ + public String toString() { + return "VersionedEntry [key=" + getKey() + ", val=" + getValue() + ", topVer=" + ver.topologyVersion() + + ", nodeOrder=" + ver.nodeOrder() + ", order=" + ver.order() + ", globalTime=" + ver.globalTime() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/01c02465/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index b3eed46..ff75b02 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -289,7 +289,7 @@ org.apache.ignite.internal.processors.cache.CacheOperationContext org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException org.apache.ignite.internal.processors.cache.CacheType -org.apache.ignite.internal.processors.cache.CacheVersionedEntryImpl 
+org.apache.ignite.internal.processors.cache.version.CacheVersionedEntryImpl org.apache.ignite.internal.processors.cache.CacheWeakQueryIteratorsHolder$WeakQueryFutureIterator org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest