// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java deleted file mode 100644 index a6bfc49..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridPartitionedCacheEntryImpl.java +++ /dev/null @@ -1,423 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gridgain.grid.kernal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.cache.GridCachePeekMode.*;

/**
 * Partitioned cache entry public API.
 * <p>
 * Peek operations go to the near cache when {@code NEAR_ONLY} is requested on a near context;
 * otherwise they try the inherited peek (unless {@code PARTITIONED_ONLY} is requested) and then
 * the DHT cache. Metadata operations are applied to the DHT entry, if one exists, and to the near
 * entry when the context is a near context.
 */
public class GridPartitionedCacheEntryImpl<K, V> extends GridCacheEntryImpl<K, V> {
    /** */
    private static final long serialVersionUID = 0L;

    /**
     * Empty constructor required for {@link Externalizable}.
     */
    public GridPartitionedCacheEntryImpl() {
        // No-op.
    }

    /**
     * @param nearPrj Parent projection or {@code null} if entry belongs to default cache.
     * @param ctx Near cache context.
     * @param key key.
     * @param cached Cached entry (either from near or dht cache map).
     */
    public GridPartitionedCacheEntryImpl(GridCacheProjectionImpl<K, V> nearPrj, GridCacheContext<K, V> ctx, K key,
        @Nullable GridCacheEntryEx<K, V> cached) {
        super(nearPrj, ctx, key, cached);

        assert !this.ctx.isDht() || ctx.isColocated();
    }

    /**
     * Resolves the DHT cache for the context: the colocated cache for a colocated context, the
     * context's own DHT cache for an atomic DHT context, otherwise the DHT cache behind the near cache.
     *
     * @return Dht cache.
     */
    public GridDhtCacheAdapter<K, V> dht() {
        return ctx.isColocated() ? ctx.colocated() : ctx.isDhtAtomic() ? ctx.dht() : ctx.near().dht();
    }

    /**
     * @return Near cache.
     */
    public GridNearCacheAdapter<K, V> near() {
        return ctx.near();
    }

    /**
     * {@inheritDoc}
     * <p>
     * A {@code null} mode collection is accepted: it never matches {@code NEAR_ONLY} or
     * {@code PARTITIONED_ONLY}, so the inherited peek is tried first and the DHT cache second.
     */
    @Override public V peek(@Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
        // 'modes' is declared @Nullable, so it must not be dereferenced unguarded.
        if (modes != null && modes.contains(NEAR_ONLY) && ctx.isNear())
            return peekNear0(modes, CU.<K, V>empty());

        V val = null;

        if (modes == null || !modes.contains(PARTITIONED_ONLY))
            val = super.peek(modes);

        if (val == null)
            val = peekDht0(modes, CU.<K, V>empty());

        return val;
    }

    /**
     * Peeks the value from the DHT cache using {@code SMART} mode.
     *
     * @param filter Filter.
     * @return Peeked value.
     */
    @Nullable public V peekDht(@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
        try {
            return peekDht0(SMART, filter);
        }
        catch (IgniteCheckedException e) {
            // Should never happen.
            throw new IgniteException("Unable to perform entry peek() operation.", e);
        }
    }

    /**
     * Peeks the near cache with each mode in turn and returns the first non-null value.
     * An empty or {@code null} mode collection is treated as {@code SMART}.
     *
     * @param modes Peek modes.
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private V peekNear0(@Nullable Collection<GridCachePeekMode> modes,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (F.isEmpty(modes))
            return peekNear0(SMART, filter);

        assert modes != null;

        for (GridCachePeekMode mode : modes) {
            V val = peekNear0(mode, filter);

            if (val != null)
                return val;
        }

        return null;
    }

    /**
     * Peeks the near cache with a single mode, retrying while the entry is concurrently removed.
     *
     * @param mode Peek mode ({@code null} means {@code SMART}).
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings({"unchecked"})
    @Nullable private V peekNear0(@Nullable GridCachePeekMode mode,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (mode == null)
            mode = SMART;

        while (true) {
            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();

            // Build the effective filter in a fresh local on every attempt so that a retry does not
            // wrap the projection predicate around an already wrapped filter.
            IgnitePredicate<GridCacheEntry<K, V>>[] filter0 = filter;

            if (prjPerCall != null)
                filter0 = ctx.vararg(F0.and(ctx.vararg(proxy.predicate()), filter));

            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);

            try {
                GridCacheEntryEx<K, V> entry = near().peekEx(key);

                return entry == null ? null : ctx.cloneOnFlag(entry.peek(mode, filter0));
            }
            catch (GridCacheEntryRemovedException ignore) {
                // Entry was removed concurrently: loop and look it up again.
            }
            finally {
                ctx.gate().leave(prev);
            }
        }
    }

    /**
     * Peeks the DHT cache with each mode in turn and returns the first non-null value.
     * An empty or {@code null} mode collection is treated as {@code SMART}.
     *
     * @param modes Peek modes.
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @Nullable private V peekDht0(@Nullable Collection<GridCachePeekMode> modes,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (F.isEmpty(modes))
            return peekDht0(SMART, filter);

        assert modes != null;

        for (GridCachePeekMode mode : modes) {
            V val = peekDht0(mode, filter);

            if (val != null)
                return val;
        }

        return null;
    }

    /**
     * Peeks the DHT cache with a single mode, retrying while the entry is concurrently removed.
     *
     * @param mode Peek mode ({@code null} means {@code SMART}).
     * @param filter Optional entry filter.
     * @return Peeked value.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings({"unchecked"})
    @Nullable private V peekDht0(@Nullable GridCachePeekMode mode,
        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
        if (mode == null)
            mode = SMART;

        while (true) {
            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();

            // Fresh local per attempt: see peekNear0(GridCachePeekMode, IgnitePredicate[]).
            IgnitePredicate<GridCacheEntry<K, V>>[] filter0 = filter;

            if (prjPerCall != null)
                filter0 = ctx.vararg(F0.and(ctx.vararg(proxy.predicate()), filter));

            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);

            try {
                GridCacheEntryEx<K, V> entry = dht().peekEx(key);

                if (entry == null)
                    return null;

                GridTuple<V> peek = entry.peek0(false, mode, filter0, ctx.tm().localTxx());

                return peek != null ? ctx.cloneOnFlag(peek.get()) : null;
            }
            catch (GridCacheEntryRemovedException ignore) {
                // Entry was removed concurrently: loop and look it up again.
            }
            catch (GridCacheFilterFailedException e) {
                // NOTE(review): treated as unexpected here (presumably because peek0 is called with
                // its first flag set to false) - confirm against GridCacheEntryEx.peek0.
                e.printStackTrace();

                assert false;

                return null;
            }
            finally {
                ctx.gate().leave(prev);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Uses the DHT entry when the affinity function maps the key to the local node for the given
     * topology version, the near entry when the context is a near context, and a detached entry
     * otherwise. An invalid-partition failure falls back to the near or detached entry.
     */
    @Override protected GridCacheEntryEx<K, V> entryEx(boolean touch, long topVer) {
        try {
            return ctx.affinity().localNode(key, topVer) ? dht().entryEx(key, touch) :
                ctx.isNear() ? near().entryEx(key, touch) :
                    new GridDhtDetachedCacheEntry<>(ctx, key, 0, null, null, 0, 0);
        }
        catch (GridDhtInvalidPartitionException ignore) {
            return ctx.isNear() ? near().entryEx(key) :
                new GridDhtDetachedCacheEntry<>(ctx, key, 0, null, null, 0, 0);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Peeks the DHT entry when the key maps to the local node, otherwise the near entry
     * (near context only); returns {@code null} when neither applies.
     */
    @Override protected GridCacheEntryEx<K, V> peekEx(long topVer) {
        try {
            return ctx.affinity().localNode(key, topVer) ? dht().peekEx(key) :
                ctx.isNear() ? near().peekEx(key) : null;
        }
        catch (GridDhtInvalidPartitionException ignore) {
            return ctx.isNear() ? near().peekEx(key) : null;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The DHT result takes precedence; the near result is returned only when the DHT one is {@code null}.
     */
    @Override public <V1> V1 addMeta(String name, V1 val) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = de.addMeta(name, val);

        if (ctx.isNear()) {
            // Without a DHT entry the near entry is created so the metadata has somewhere to live.
            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
                near().entryExx(key, ctx.affinity().affinityTopologyVersion());

            if (ne != null) {
                V1 v1 = ne.addMeta(name, val);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /** {@inheritDoc} */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 meta(String name) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.meta(name);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null) {
                V1 v1 = (V1)ne.meta(name);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /** {@inheritDoc} */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 putMetaIfAbsent(String name, Callable<V1> c) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.putMetaIfAbsent(name, c);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
                near().entryExx(key, ctx.affinity().affinityTopologyVersion());

            if (ne != null) {
                V1 v1 = (V1)ne.putMetaIfAbsent(name, c);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /** {@inheritDoc} */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 putMetaIfAbsent(String name, V1 val) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.putMetaIfAbsent(name, val);

        // Touch the near cache only for a near context, like every other metadata method here.
        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = de != null ? near().peekExx(key) :
                near().entryExx(key, ctx.affinity().affinityTopologyVersion());

            if (ne != null) {
                V1 v1 = (V1)ne.putMetaIfAbsent(name, val);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /** {@inheritDoc} */
    @SuppressWarnings( {"RedundantCast"})
    @Override public <V1> V1 removeMeta(String name) {
        V1 v = null;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            v = (V1)de.removeMeta(name);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null) {
                V1 v1 = (V1)ne.removeMeta(name);

                if (v == null)
                    v = v1;
            }
        }

        return v;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} if the metadata was removed from the DHT entry or from the near entry.
     */
    @Override public <V1> boolean removeMeta(String name, V1 val) {
        boolean b = false;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            b = de.removeMeta(name, val);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null)
                b |= ne.removeMeta(name, val);
        }

        return b;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code true} if the metadata was replaced on the DHT entry or on the near entry.
     */
    @Override public <V1> boolean replaceMeta(String name, V1 curVal, V1 newVal) {
        boolean b = false;

        GridDhtCacheEntry<K, V> de = dht().peekExx(key);

        if (de != null)
            b = de.replaceMeta(name, curVal, newVal);

        if (ctx.isNear()) {
            GridNearCacheEntry<K, V> ne = near().peekExx(key);

            if (ne != null)
                b |= ne.replaceMeta(name, curVal, newVal);
        }

        return b;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridPartitionedCacheEntryImpl.class, this, super.toString());
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java deleted file mode 100644 index 1255e4f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/IgniteExternalizableExpiryPolicy.java +++ /dev/null @@ -1,174 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gridgain.grid.kernal.processors.cache.distributed;

import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import javax.cache.expiry.*;
import java.io.*;
import java.util.concurrent.*;

/**
 * Externalizable wrapper for {@link ExpiryPolicy}.
 * <p>
 * On the writing side the instance wraps a policy and serializes the three durations it reports.
 * On the reading side the wrapped policy is not restored; the decoded durations are kept in
 * fields and returned by the {@code getExpiryFor*} methods. Those getters read only the decoded
 * fields, so they return {@code null} on an instance that has not been deserialized.
 * <p>
 * Wire format: one flag byte telling which durations are present, followed by one {@code long}
 * per present duration (creation, update, access, in that order). A value of {@code 0} means
 * eternal; a zero-length duration is written as {@code 1}, because {@code 0} is already taken,
 * and is therefore read back as one millisecond.
 */
public class IgniteExternalizableExpiryPolicy implements ExpiryPolicy, Externalizable, IgniteOptimizedMarshallable {
    /** */
    private static final long serialVersionUID = 0L;

    /** */
    @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
    private static Object GG_CLASS_ID;

    /** Wrapped policy; {@code null} on instances created by deserialization. */
    private ExpiryPolicy plc;

    /** Flag bit: creation duration is present in the stream. */
    private static final byte CREATE_TTL_MASK = 0x01;

    /** Flag bit: update duration is present in the stream. */
    private static final byte UPDATE_TTL_MASK = 0x02;

    /** Flag bit: access duration is present in the stream. */
    private static final byte ACCESS_TTL_MASK = 0x04;

    /** Creation duration decoded by {@link #readExternal(ObjectInput)}. */
    private Duration forCreate;

    /** Update duration decoded by {@link #readExternal(ObjectInput)}. */
    private Duration forUpdate;

    /** Access duration decoded by {@link #readExternal(ObjectInput)}. */
    private Duration forAccess;

    /**
     * Required by {@link Externalizable}.
     */
    public IgniteExternalizableExpiryPolicy() {
        // No-op.
    }

    /**
     * @param plc Expiry policy.
     */
    public IgniteExternalizableExpiryPolicy(ExpiryPolicy plc) {
        assert plc != null;

        this.plc = plc;
    }

    /** {@inheritDoc} */
    @Override public Object ggClassId() {
        return GG_CLASS_ID;
    }

    /** {@inheritDoc} */
    @Override public Duration getExpiryForCreation() {
        return forCreate;
    }

    /** {@inheritDoc} */
    @Override public Duration getExpiryForAccess() {
        return forAccess;
    }

    /** {@inheritDoc} */
    @Override public Duration getExpiryForUpdate() {
        return forUpdate;
    }

    /**
     * Writes a duration as a single {@code long}; writes nothing for {@code null}
     * (absence is signalled by the flag byte).
     *
     * @param out Output stream.
     * @param duration Duration.
     * @throws IOException If failed.
     */
    private void writeDuration(ObjectOutput out, @Nullable Duration duration) throws IOException {
        if (duration != null) {
            if (duration.isEternal())
                out.writeLong(0);
            else if (duration.getDurationAmount() == 0)
                out.writeLong(1); // 0 is reserved for "eternal".
            else
                out.writeLong(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
        }
    }

    /**
     * Reads a duration written by {@link #writeDuration(ObjectOutput, Duration)}.
     *
     * @param in Input stream.
     * @return Duration: {@link Duration#ETERNAL} for {@code 0}, otherwise the value in milliseconds.
     * @throws IOException If failed.
     */
    private Duration readDuration(ObjectInput in) throws IOException {
        long ttl = in.readLong();

        assert ttl >= 0;

        if (ttl == 0)
            return Duration.ETERNAL;

        return new Duration(TimeUnit.MILLISECONDS, ttl);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Durations are taken from the wrapped policy when there is one. An instance that was itself
     * created by deserialization has no wrapped policy, so its decoded durations are written
     * instead, which allows such an instance to be serialized again.
     */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        Duration create = plc != null ? plc.getExpiryForCreation() : forCreate;
        Duration update = plc != null ? plc.getExpiryForUpdate() : forUpdate;
        Duration access = plc != null ? plc.getExpiryForAccess() : forAccess;

        byte flags = 0;

        if (create != null)
            flags |= CREATE_TTL_MASK;

        if (update != null)
            flags |= UPDATE_TTL_MASK;

        if (access != null)
            flags |= ACCESS_TTL_MASK;

        out.writeByte(flags);

        writeDuration(out, create);

        writeDuration(out, update);

        writeDuration(out, access);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        byte flags = in.readByte();

        if ((flags & CREATE_TTL_MASK) != 0)
            forCreate = readDuration(in);

        if ((flags & UPDATE_TTL_MASK) != 0)
            forUpdate = readDuration(in);

        if ((flags & ACCESS_TTL_MASK) != 0)
            forAccess = readDuration(in);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgniteExternalizableExpiryPolicy.class, this);
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java new file mode 100644 index 0000000..d0c0eb6 --- /dev/null +++
// b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java @@ -0,0 +1,389 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.apache.ignite.spi.swapspace.file.*;
import org.jetbrains.annotations.*;

import java.util.*;

import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
import static org.apache.ignite.transactions.IgniteTxIsolation.*;
import static org.junit.Assert.*;

/**
 * Tests for byte array values in distributed caches.
 * <p>
 * Every grid is configured with three caches (regular, offheap values and offheap tiered) and
 * each transactional scenario is run against one of these cache sets.
 */
public abstract class GridCacheAbstractDistributedByteArrayValuesSelfTest extends
    GridCacheAbstractByteArrayValuesSelfTest {
    /** Grids. */
    protected static Ignite[] ignites;

    /** Regular caches. */
    private static GridCache<Integer, Object>[] caches;

    /** Offheap values caches. */
    private static GridCache<Integer, Object>[] cachesOffheap;

    /** Offheap tiered caches. */
    private static GridCache<Integer, Object>[] cachesOffheapTiered;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration c = super.getConfiguration(gridName);

        c.setCacheConfiguration(cacheConfiguration(),
            offheapCacheConfiguration(),
            offheapTieredCacheConfiguration());

        c.setSwapSpaceSpi(new FileSwapSpaceSpi());

        c.setPeerClassLoadingEnabled(peerClassLoading());

        return c;
    }

    /**
     * @return Whether peer class loading is enabled.
     */
    protected abstract boolean peerClassLoading();

    /**
     * @return Whether portable mode is enabled.
     */
    protected boolean portableEnabled() {
        return false;
    }

    /**
     * @return How many grids to start.
     */
    protected int gridCount() {
        return 3;
    }

    /**
     * @return Cache configuration.
     */
    protected CacheConfiguration cacheConfiguration() {
        CacheConfiguration cfg = cacheConfiguration0();

        cfg.setName(CACHE_REGULAR);
        cfg.setPortableEnabled(portableEnabled());

        return cfg;
    }

    /**
     * @return Internal cache configuration.
     */
    protected abstract CacheConfiguration cacheConfiguration0();

    /**
     * @return Offheap cache configuration.
     */
    protected CacheConfiguration offheapCacheConfiguration() {
        CacheConfiguration cfg = offheapCacheConfiguration0();

        cfg.setName(CACHE_OFFHEAP);
        cfg.setPortableEnabled(portableEnabled());

        return cfg;
    }

    /**
     * @return Offheap tiered cache configuration.
     */
    protected CacheConfiguration offheapTieredCacheConfiguration() {
        CacheConfiguration cfg = offheapTieredCacheConfiguration0();

        cfg.setName(CACHE_OFFHEAP_TIERED);
        cfg.setPortableEnabled(portableEnabled());

        return cfg;
    }

    /**
     * @return Internal offheap cache configuration.
     */
    protected abstract CacheConfiguration offheapCacheConfiguration0();

    /**
     * @return Internal offheap tiered cache configuration.
     */
    protected abstract CacheConfiguration offheapTieredCacheConfiguration0();

    /**
     * {@inheritDoc}
     * <p>
     * Starts {@link #gridCount()} grids and remembers the three caches of each grid.
     */
    @SuppressWarnings("unchecked")
    @Override protected void beforeTestsStarted() throws Exception {
        int gridCnt = gridCount();

        // JUnit assertion rather than the 'assert' keyword, which is skipped without -ea.
        assertTrue(gridCnt > 0);

        ignites = new Ignite[gridCnt];

        caches = new GridCache[gridCnt];
        cachesOffheap = new GridCache[gridCnt];
        cachesOffheapTiered = new GridCache[gridCnt];

        for (int i = 0; i < gridCnt; i++) {
            ignites[i] = startGrid(i);

            caches[i] = ignites[i].cache(CACHE_REGULAR);
            cachesOffheap[i] = ignites[i].cache(CACHE_OFFHEAP);
            cachesOffheapTiered[i] = ignites[i].cache(CACHE_OFFHEAP_TIERED);
        }
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();

        // Drop static references so grids and caches do not outlive the test class.
        caches = null;
        cachesOffheap = null;
        cachesOffheapTiered = null;

        ignites = null;
    }

    /**
     * Check whether cache with byte array entry works correctly in PESSIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testPessimistic() throws Exception {
        testTransaction0(caches, PESSIMISTIC, KEY_1, wrap(1));
    }

    /**
     * Check whether cache with byte array and non-array entries works correctly in PESSIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testPessimisticMixed() throws Exception {
        testTransactionMixed0(caches, PESSIMISTIC, KEY_1, wrap(1), KEY_2, 1);
    }

    /**
     * Check whether offheap cache with byte array entry works correctly in PESSIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testPessimisticOffheap() throws Exception {
        testTransaction0(cachesOffheap, PESSIMISTIC, KEY_1, wrap(1));
    }

    /**
     * Check whether offheap tiered cache with byte array entry works correctly in PESSIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testPessimisticOffheapTiered() throws Exception {
        testTransaction0(cachesOffheapTiered, PESSIMISTIC, KEY_1, wrap(1));
    }

    /**
     * Check whether offheap cache with byte array and non-array entries works correctly in PESSIMISTIC
     * transaction.
     *
     * @throws Exception If failed.
     */
    public void testPessimisticOffheapMixed() throws Exception {
        testTransactionMixed0(cachesOffheap, PESSIMISTIC, KEY_1, wrap(1), KEY_2, 1);
    }

    /**
     * Check whether offheap tiered cache with byte array and non-array entries works correctly in
     * PESSIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testPessimisticOffheapTieredMixed() throws Exception {
        testTransactionMixed0(cachesOffheapTiered, PESSIMISTIC, KEY_1, wrap(1), KEY_2, 1);
    }

    /**
     * Check whether cache with byte array entry works correctly in OPTIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testOptimistic() throws Exception {
        testTransaction0(caches, OPTIMISTIC, KEY_1, wrap(1));
    }

    /**
     * Check whether cache with byte array and non-array entries works correctly in OPTIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testOptimisticMixed() throws Exception {
        testTransactionMixed0(caches, OPTIMISTIC, KEY_1, wrap(1), KEY_2, 1);
    }

    /**
     * Check whether offheap cache with byte array entry works correctly in OPTIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testOptimisticOffheap() throws Exception {
        testTransaction0(cachesOffheap, OPTIMISTIC, KEY_1, wrap(1));
    }

    /**
     * Check whether offheap tiered cache with byte array entry works correctly in OPTIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testOptimisticOffheapTiered() throws Exception {
        testTransaction0(cachesOffheapTiered, OPTIMISTIC, KEY_1, wrap(1));
    }

    /**
     * Check whether offheap cache with byte array and non-array entries works correctly in OPTIMISTIC
     * transaction.
     *
     * @throws Exception If failed.
     */
    public void testOptimisticOffheapMixed() throws Exception {
        testTransactionMixed0(cachesOffheap, OPTIMISTIC, KEY_1, wrap(1), KEY_2, 1);
    }

    /**
     * Check whether offheap tiered cache with byte array and non-array entries works correctly in
     * OPTIMISTIC transaction.
     *
     * @throws Exception If failed.
     */
    public void testOptimisticOffheapTieredMixed() throws Exception {
        testTransactionMixed0(cachesOffheapTiered, OPTIMISTIC, KEY_1, wrap(1), KEY_2, 1);
    }

    /**
     * Test swapping: a value put into the primary cache must survive eviction and be
     * returned by a subsequent promote.
     *
     * @throws Exception If failed.
     */
    public void testSwap() throws Exception {
        for (GridCache<Integer, Object> cache : caches)
            assertTrue(cache.configuration().isSwapEnabled());

        byte[] val1 = wrap(1);

        GridCache<Integer, Object> primaryCache = null;

        for (GridCache<Integer, Object> cache : caches) {
            if (cache.entry(SWAP_TEST_KEY).primary()) {
                primaryCache = cache;

                break;
            }
        }

        assertNotNull(primaryCache);

        primaryCache.put(SWAP_TEST_KEY, val1);

        assertArrayEquals(val1, (byte[])primaryCache.get(SWAP_TEST_KEY));

        assertTrue(primaryCache.evict(SWAP_TEST_KEY));

        assertNull(primaryCache.peek(SWAP_TEST_KEY));

        assertArrayEquals(val1, (byte[])primaryCache.promote(SWAP_TEST_KEY));
    }

    /**
     * Test transaction behavior.
     *
     * @param caches Caches.
     * @param concurrency Concurrency.
     * @param key Key.
     * @param val Value.
     * @throws Exception If failed.
     */
    private void testTransaction0(GridCache<Integer, Object>[] caches, IgniteTxConcurrency concurrency,
        Integer key, byte[] val) throws Exception {
        testTransactionMixed0(caches, concurrency, key, val, null, null);
    }

    /**
     * Test transaction behavior. For every cache: puts the values in a transaction, reads them back
     * transactionally through every cache, removes them in a transaction and checks that the first
     * key is gone.
     *
     * @param caches Caches.
     * @param concurrency Concurrency.
     * @param key1 Key 1.
     * @param val1 Value 1.
     * @param key2 Key 2 ({@code null} to test a single key).
     * @param val2 Value 2.
     * @throws Exception If failed.
     */
    private void testTransactionMixed0(GridCache<Integer, Object>[] caches, IgniteTxConcurrency concurrency,
        Integer key1, byte[] val1, @Nullable Integer key2, @Nullable Object val2) throws Exception {
        for (GridCache<Integer, Object> cache : caches) {
            try (IgniteTx tx = cache.txStart(concurrency, REPEATABLE_READ)) {
                cache.put(key1, val1);

                if (key2 != null)
                    cache.put(key2, val2);

                tx.commit();
            }

            for (GridCache<Integer, Object> cacheInner : caches) {
                try (IgniteTx tx = cacheInner.txStart(concurrency, REPEATABLE_READ)) {
                    assertArrayEquals(val1, (byte[])cacheInner.get(key1));

                    if (key2 != null) {
                        Object actual = cacheInner.get(key2);

                        assertEquals(val2, actual);
                    }

                    tx.commit();
                }
            }

            try (IgniteTx tx = cache.txStart(concurrency, REPEATABLE_READ)) {
                cache.remove(key1);

                if (key2 != null)
                    cache.remove(key2);

                tx.commit();
            }

            assertNull(cache.get(key1));
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java new file mode 100644 index 0000000..37b787d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.internal.util.typedef.*;
import org.gridgain.testframework.junits.common.*;

import java.util.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.cache.GridCacheFlag.*;
import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
import static org.apache.ignite.transactions.IgniteTxIsolation.*;

/**
 * Tests cache access from within jobs.
 * <p>
 * Each job runs a transaction that reads an {@code int[]} stored under a single key, marks its own
 * slot and writes the array back; afterwards every grid must see an array with all slots marked.
 */
public abstract class GridCacheAbstractJobExecutionTest extends GridCommonAbstractTest {
    /** */
    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Job counter. */
    private static final AtomicInteger cntr = new AtomicInteger(0);

    /** Number of grids started for the test. */
    private static final int GRID_CNT = 4;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(ipFinder);

        cfg.setDiscoverySpi(disco);

        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        startGrids(GRID_CNT);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopAllGrids();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Clears the default cache through a {@code SYNC_COMMIT} projection and verifies that it is
     * empty on every grid.
     */
    @Override protected void afterTest() throws Exception {
        GridCacheProjection<String, int[]> cache = grid(0).cache(null).flagsOn(SYNC_COMMIT).
            projection(String.class, int[].class);

        cache.removeAll();

        for (int i = 0; i < GRID_CNT; i++) {
            Ignite g = grid(i);

            GridCache<String, int[]> c = g.cache(null);

            assertEquals("Cache is not empty: " + c.entrySet(), 0, c.size());
        }
    }

    /**
     * @throws Exception If failed.
     */
    public void testPessimisticRepeatableRead() throws Exception {
        checkTransactions(PESSIMISTIC, REPEATABLE_READ, 1000);
    }

    /**
     * @throws Exception If failed.
     */
    public void testPessimisticSerializable() throws Exception {
        checkTransactions(PESSIMISTIC, SERIALIZABLE, 1000);
    }

    /**
     * Submits {@code jobCnt} jobs from grid 0, waits for all of them and then checks on every grid
     * that each slot of the shared array was set to {@code 1}.
     *
     * @param concur Concurrency.
     * @param isolation Isolation.
     * @param jobCnt Job count.
     * @throws Exception If fails.
     */
    private void checkTransactions(final IgniteTxConcurrency concur, final IgniteTxIsolation isolation,
        final int jobCnt) throws Exception {

        // Log every started grid; driven by GRID_CNT so the logging follows the grid count.
        for (int i = 0; i < GRID_CNT; i++)
            info("Grid " + i + ": " + grid(i).localNode().id());

        Ignite ignite = grid(0);

        Collection<IgniteFuture<?>> futs = new ArrayList<>(jobCnt);

        IgniteCompute comp = ignite.compute().enableAsync();

        for (int i = 0; i < jobCnt; i++) {
            comp.apply(new CX1<Integer, Void>() {
                @IgniteInstanceResource
                private Ignite ignite;

                @Override public Void applyx(final Integer i) throws IgniteCheckedException {
                    GridCache<String, int[]> cache = this.ignite.cache(null);

                    try (IgniteTx tx = cache.txStart(concur, isolation)) {
                        int[] arr = cache.get("TestKey");

                        if (arr == null)
                            arr = new int[jobCnt];

                        arr[i] = 1;

                        cache.put("TestKey", arr);

                        int c = cntr.getAndIncrement();

                        if (c % 50 == 0)
                            X.println("Executing transaction [i=" + i + ", c=" + c + ']');

                        tx.commit();
                    }

                    return null;
                }
            }, i);

            // The compute facade is in async mode: the future of the last call is taken from it.
            futs.add(comp.future());
        }

        for (IgniteFuture<?> fut : futs)
            fut.get(); // Wait for completion.

        for (int i = 0; i < GRID_CNT; i++) {
            GridCacheProjection<String, int[]> c = grid(i).cache(null).projection(String.class, int[].class);

            // Do within transaction to make sure that lock is acquired
            // which means that all previous transactions have committed.

            try (IgniteTx tx = c.txStart(concur, isolation)) {
                int[] arr = c.get("TestKey");

                assertNotNull(arr);
                assertEquals(jobCnt, arr.length);

                for (int j : arr)
                    assertEquals(1, j);

                tx.commit();
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java new file mode 100644 index 0000000..08a6cb9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java @@ -0,0 +1,886 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheConfiguration.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Test node restart. + */ +@SuppressWarnings({"PointlessArithmeticExpression"}) +public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbstractTest { + /** Cache name. */ + protected static final String CACHE_NAME = "TEST_CACHE"; + + /** */ + private static final long TEST_TIMEOUT = 5 * 60 * 1000; + + /** Default backups. */ + private static final int DFLT_BACKUPS = 1; + + /** Partitions. */ + private static final int DFLT_PARTITIONS = 521; + + /** Preload batch size. */ + private static final int DFLT_BATCH_SIZE = DFLT_PRELOAD_BATCH_SIZE; + + /** Number of key backups. Each test method can set this value as required. */ + protected int backups = DFLT_BACKUPS; + + /** */ + private static final int DFLT_NODE_CNT = 4; + + /** */ + private static final int DFLT_KEY_CNT = 100; + + /** */ + private static final int DFLT_RETRIES = 10; + + /** */ + private static final Random RAND = new Random(); + + /** */ + private static volatile int idx = -1; + + /** Preload mode. */ + protected GridCachePreloadMode preloadMode = ASYNC; + + /** */ + protected int preloadBatchSize = DFLT_BATCH_SIZE; + + /** Number of partitions. 
*/ + protected int partitions = DFLT_PARTITIONS; + + /** Node count. */ + protected int nodeCnt = DFLT_NODE_CNT; + + /** Key count. */ + protected int keyCnt = DFLT_KEY_CNT; + + /** Retries. */ + private int retries = DFLT_RETRIES; + + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + // Discovery. + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = DFLT_BACKUPS; + partitions = DFLT_PARTITIONS; + preloadMode = ASYNC; + preloadBatchSize = DFLT_BATCH_SIZE; + nodeCnt = DFLT_NODE_CNT; + keyCnt = DFLT_KEY_CNT; + retries = DFLT_RETRIES; + idx = -1; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** + * @throws Exception If failed. + */ + private void startGrids() throws Exception { + for (int i = 0; i < nodeCnt; i++) { + startGrid(i); + + if (idx < 0) + idx = i; + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testRestart() throws Exception { + preloadMode = SYNC; + partitions = 3; + nodeCnt = 2; + keyCnt = 10; + retries = 3; + + info("*** STARTING TEST ***"); + + startGrids(); + + try { + GridCache<Integer, String> c = grid(idx).cache(CACHE_NAME); + + for (int j = 0; j < retries; j++) { + for (int i = 0; i < keyCnt; i++) + c.putx(i, Integer.toString(i)); + + info("Stored items."); + + checkGet(c, j); + + info("Stopping node: " + idx); + + stopGrid(idx); + + info("Starting node: " + idx); + + Ignite ignite = startGrid(idx); + + c = ignite.cache(CACHE_NAME); + + checkGet(c, j); + } + } + finally { + stopAllGrids(true); + } + } + + /** + * @param c Cache. + * @param attempt Attempt. + * @throws Exception If failed. + */ + private void checkGet(GridCache<Integer, String> c, int attempt) throws Exception { + for (int i = 0; i < keyCnt; i++) { + String v = c.get(i); + + if (v == null) { + printFailureDetails(c, i, attempt); + + fail("Value is null [key=" + i + ", attempt=" + attempt + "]"); + } + + if (!Integer.toString(i).equals(v)) { + printFailureDetails(c, i, attempt); + + fail("Wrong value for key [key=" + + i + ", actual value=" + v + ", expected value=" + Integer.toString(i) + "]"); + } + } + + info("Read items."); + } + + /** + * @return Transaction concurrency to use in tests. + */ + protected IgniteTxConcurrency txConcurrency() { + return PESSIMISTIC; + } + + /** + * @throws Exception If failed. + */ + public void testRestartWithPutTwoNodesNoBackups() throws Throwable { + backups = 0; + nodeCnt = 2; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 30000; + + checkRestartWithPut(duration, 1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testRestartWithTxTwoNodesNoBackups() throws Throwable { + backups = 0; + nodeCnt = 2; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 30000; + + checkRestartWithTx(duration, 1, 1); + } + + /** + * @throws Exception If failed. 
+ */ + public void testRestartWithPutTwoNodesOneBackup() throws Throwable { + backups = 1; + nodeCnt = 2; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 30000; + + checkRestartWithPut(duration, 1, 1); + } + + /** + * Runs {@code checkRestartWithTx} for 30 seconds on 2 nodes with 1 backup (1 put thread, 1 restart thread). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxTwoNodesOneBackup() throws Throwable { + backups = 1; + nodeCnt = 2; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 30000; + + checkRestartWithTx(duration, 1, 1); + } + + /** + * Runs {@code checkRestartWithPut} for 60 seconds on 4 nodes without backups (2 put threads, 2 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithPutFourNodesNoBackups() throws Throwable { + backups = 0; + nodeCnt = 4; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 60000; + + checkRestartWithPut(duration, 2, 2); + } + + /** + * Runs {@code checkRestartWithTx} for 60 seconds on 4 nodes without backups (2 put threads, 2 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxFourNodesNoBackups() throws Throwable { + backups = 0; + nodeCnt = 4; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 60000; + + checkRestartWithTx(duration, 2, 2); + } + + /** + * Runs {@code checkRestartWithPut} for 60 seconds on 4 nodes with 1 backup (2 put threads, 2 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithPutFourNodesOneBackups() throws Throwable { + backups = 1; + nodeCnt = 4; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 60000; + + checkRestartWithPut(duration, 2, 2); + } + + /** + * Runs {@code checkRestartWithTx} for 60 seconds on 4 nodes with 1 backup (2 put threads, 2 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxFourNodesOneBackups() throws Throwable { + backups = 1; + nodeCnt = 4; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 60000; + + checkRestartWithTx(duration, 2, 2); + } + + /** + * Runs {@code checkRestartWithPut} for 90 seconds on 6 nodes with 2 backups (3 put threads, 3 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithPutSixNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 6; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithPut(duration, 3, 3); + } + + /** + * @throws Throwable If failed.
+ */ + public void testRestartWithTxSixNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 6; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithTx(duration, 3, 3); + } + + /** + * Runs {@code checkRestartWithPut} for 90 seconds on 8 nodes with 2 backups (4 put threads, 4 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithPutEightNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 8; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithPut(duration, 4, 4); + } + + /** + * Runs {@code checkRestartWithTx} for 90 seconds on 8 nodes with 2 backups (4 put threads, 4 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxEightNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 8; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithTx(duration, 4, 4); + } + + /** + * Runs {@code checkRestartWithPut} for 90 seconds on 10 nodes with 2 backups (5 put threads, 5 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithPutTenNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 10; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithPut(duration, 5, 5); + } + + /** + * Runs {@code checkRestartWithTx} for 90 seconds on 10 nodes with 2 backups (5 put threads, 5 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxTenNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 10; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithTx(duration, 5, 5); + } + + /** + * Runs {@code checkRestartWithTxPutAll} for 90 seconds on 10 nodes with 2 backups (5 put threads, 5 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxPutAllTenNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 10; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithTxPutAll(duration, 5, 5); + } + + /** + * Runs {@code checkRestartWithTxPutAll} for 90 seconds on 4 nodes with 2 backups (2 put threads, 2 restart threads). + * + * @throws Throwable If failed. + */ + public void testRestartWithTxPutAllFourNodesTwoBackups() throws Throwable { + backups = 2; + nodeCnt = 4; + keyCnt = 10; + partitions = 29; + preloadMode = ASYNC; + + long duration = 90000; + + checkRestartWithTxPutAll(duration, 2, 2); + } + + /** + * @param duration Test duration.
+ * @param putThreads Put threads count. + * @param restartThreads Restart threads count. + * @throws Exception If failed. + */ + public void checkRestartWithPut(long duration, int putThreads, int restartThreads) throws Throwable { + final long endTime = System.currentTimeMillis() + duration; + + final AtomicReference<Throwable> err = new AtomicReference<>(); + + startGrids(); + + Collection<Thread> threads = new LinkedList<>(); + + try { + final int logFreq = 20; + + final AtomicInteger putCntr = new AtomicInteger(); + + final CyclicBarrier barrier = new CyclicBarrier(putThreads + restartThreads); + + for (int i = 0; i < putThreads; i++) { + final int gridIdx = i; + + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + barrier.await(); + + info("Starting put thread..."); + + GridCache<Integer, String> cache = grid(gridIdx).cache(CACHE_NAME); + + while (System.currentTimeMillis() < endTime && err.get() == null) { + int key = RAND.nextInt(keyCnt); + + try { + cache.put(key, Integer.toString(key)); + } + catch (IgniteTxRollbackException | ClusterTopologyException ignored) { + // It is ok if primary node leaves grid. 
+ } + + int c = putCntr.incrementAndGet(); + + if (c % logFreq == 0) + info(">>> Put iteration [cnt=" + c + ", key=" + key + ']'); + } + } + catch (Exception e) { + err.compareAndSet(null, e); + + error("Failed to put value in cache.", e); + } + } + }, "put-worker-" + i); + + t.start(); + + threads.add(t); + } + + for (int i = 0; i < restartThreads; i++) { + final int gridIdx = i + putThreads; + + Thread t = new Thread(new Runnable() { + @Override public void run() { + try { + barrier.await(); + + info("Starting restart thread..."); + + int cnt = 0; + + while (System.currentTimeMillis() < endTime && err.get() == null) { + log.info(">>>>>>> Stopping grid " + gridIdx); + + stopGrid(gridIdx); + + log.info(">>>>>>> Starting grid " + gridIdx); + + startGrid(gridIdx); + + int c = ++cnt; + + if (c % logFreq == 0) + info(">>> Restart iteration: " + c); + } + } + catch (Exception e) { + err.compareAndSet(null, e); + + error("Failed to restart grid node.", e); + } + } + }, "restart-worker-" + i); + + t.start(); + + threads.add(t); + } + + for (Thread t : threads) + t.join(); + + if (err.get() != null) + throw err.get(); + } + finally { + stopAllGrids(); + } + } + + /** + * @param duration Test duration. + * @param putThreads Put threads count. + * @param restartThreads Restart threads count. + * @throws Exception If failed. 
+     */
+    public void checkRestartWithTx(long duration, int putThreads, int restartThreads) throws Throwable {
+        // Workers loop until this wall-clock deadline ('duration' is added to currentTimeMillis()).
+        final long endTime = System.currentTimeMillis() + duration;
+
+        // First failure seen by any worker; once set, the remaining workers leave their loops.
+        final AtomicReference<Throwable> err = new AtomicReference<>();
+
+        Collection<Thread> threads = new LinkedList<>();
+
+        try {
+            // Started inside 'try' so that a partially started topology is still stopped in 'finally'.
+            startGrids();
+
+            final int logFreq = 20;
+
+            final AtomicInteger txCntr = new AtomicInteger();
+
+            // Releases all transaction and restart workers together.
+            final CyclicBarrier barrier = new CyclicBarrier(putThreads + restartThreads);
+
+            final int txKeys = 3;
+
+            // Transaction workers use grids [0, putThreads).
+            for (int i = 0; i < putThreads; i++) {
+                final int gridIdx = i;
+
+                Thread t = new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            barrier.await();
+
+                            info("Starting put thread...");
+
+                            Ignite ignite = grid(gridIdx);
+
+                            UUID locNodeId = ignite.cluster().localNode().id();
+
+                            GridCache<Integer, String> cache = ignite.cache(CACHE_NAME);
+
+                            List<Integer> keys = new ArrayList<>(txKeys);
+
+                            while (System.currentTimeMillis() < endTime && err.get() == null) {
+                                keys.clear();
+
+                                for (int i = 0; i < txKeys; i++)
+                                    keys.add(RAND.nextInt(keyCnt));
+
+                                // Ensure lock order.
+                                Collections.sort(keys);
+
+                                int c = 0;
+
+                                // The catch clause of a try-with-resources statement also covers
+                                // tx.close(), so one handler is sufficient.
+                                try (IgniteTx tx = cache.txStart(txConcurrency(), REPEATABLE_READ)) {
+                                    c = txCntr.incrementAndGet();
+
+                                    if (c % logFreq == 0)
+                                        info(">>> Tx iteration started [cnt=" + c + ", keys=" + keys + ", " +
+                                            "locNodeId=" + locNodeId + ']');
+
+                                    for (int key : keys) {
+                                        // 1 - put, 2 - remove, anything else - get.
+                                        int op = cacheOp();
+
+                                        if (op == 1)
+                                            cache.put(key, Integer.toString(key));
+                                        else if (op == 2)
+                                            cache.remove(key);
+                                        else
+                                            cache.get(key);
+                                    }
+
+                                    tx.commit();
+                                }
+                                catch (ClusterTopologyException ignored) {
+                                    // It is ok if primary node leaves grid.
+                                }
+
+                                if (c % logFreq == 0)
+                                    info(">>> Tx iteration finished [cnt=" + c + ", keys=" + keys + ", " +
+                                        "locNodeId=" + locNodeId + ']');
+                            }
+
+                            info(">>> " + Thread.currentThread().getName() + " finished.");
+                        }
+                        catch (Throwable e) {
+                            // Throwable, not Exception: an Error (e.g. AssertionError) would otherwise
+                            // kill this thread silently and the test would still pass.
+                            err.compareAndSet(null, e);
+
+                            error("Failed to put value in cache.", e);
+                        }
+                    }
+                }, "put-worker-" + i);
+
+                t.start();
+
+                threads.add(t);
+            }
+
+            // Restart workers use grids [putThreads, putThreads + restartThreads).
+            for (int i = 0; i < restartThreads; i++) {
+                final int gridIdx = i + putThreads;
+
+                Thread t = new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            barrier.await();
+
+                            info("Starting restart thread...");
+
+                            int cnt = 0;
+
+                            while (System.currentTimeMillis() < endTime && err.get() == null) {
+                                stopGrid(gridIdx);
+                                startGrid(gridIdx);
+
+                                int c = ++cnt;
+
+                                if (c % logFreq == 0)
+                                    info(">>> Restart iteration: " + c);
+                            }
+
+                            info(">>> " + Thread.currentThread().getName() + " finished.");
+                        }
+                        catch (Throwable e) {
+                            // See the transaction worker: Errors must fail the test as well.
+                            err.compareAndSet(null, e);
+
+                            error("Failed to restart grid node.", e);
+                        }
+                    }
+                }, "restart-worker-" + i);
+
+                t.start();
+
+                threads.add(t);
+            }
+
+            for (Thread t : threads)
+                t.join();
+
+            if (err.get() != null)
+                throw err.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param duration Test duration.
+     * @param putThreads Put threads count.
+     * @param restartThreads Restart threads count.
+     * @throws Throwable If failed.
+     */
+    public void checkRestartWithTxPutAll(long duration, int putThreads, int restartThreads) throws Throwable {
+        // Workers loop until this wall-clock deadline ('duration' is added to currentTimeMillis()).
+        final long endTime = System.currentTimeMillis() + duration;
+
+        // First failure seen by any worker; once set, the remaining workers leave their loops.
+        final AtomicReference<Throwable> err = new AtomicReference<>();
+
+        Collection<Thread> threads = new LinkedList<>();
+
+        try {
+            // Started inside 'try' so that a partially started topology is still stopped in 'finally'.
+            startGrids();
+
+            final int logFreq = 20;
+
+            final AtomicInteger txCntr = new AtomicInteger();
+
+            // Releases all transaction and restart workers together.
+            final CyclicBarrier barrier = new CyclicBarrier(putThreads + restartThreads);
+
+            final int txKeys = 3;
+
+            // Transaction workers use grids [0, putThreads).
+            for (int i = 0; i < putThreads; i++) {
+                final int gridIdx = i;
+
+                Thread t = new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            barrier.await();
+
+                            info("Starting put thread...");
+
+                            Ignite ignite = grid(gridIdx);
+
+                            UUID locNodeId = ignite.cluster().localNode().id();
+
+                            GridCache<Integer, String> cache = ignite.cache(CACHE_NAME);
+
+                            List<Integer> keys = new ArrayList<>(txKeys);
+
+                            while (System.currentTimeMillis() < endTime && err.get() == null) {
+                                keys.clear();
+
+                                for (int i = 0; i < txKeys; i++)
+                                    keys.add(RAND.nextInt(keyCnt));
+
+                                // Ensure lock order.
+                                Collections.sort(keys);
+
+                                int c = 0;
+
+                                // NOTE(review): concurrency is fixed to PESSIMISTIC here, whereas
+                                // checkRestartWithTx() asks txConcurrency() - confirm this is intended.
+                                try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                    c = txCntr.incrementAndGet();
+
+                                    if (c % logFreq == 0)
+                                        info(">>> Tx iteration started [cnt=" + c + ", keys=" + keys + ", " +
+                                            "locNodeId=" + locNodeId + ']');
+
+                                    // LinkedHashMap keeps the sorted key order for putAll().
+                                    Map<Integer, String> batch = new LinkedHashMap<>();
+
+                                    for (int key : keys)
+                                        batch.put(key, String.valueOf(key));
+
+                                    cache.putAll(batch);
+
+                                    tx.commit();
+                                }
+                                catch (ClusterTopologyException ignored) {
+                                    // It is ok if primary node leaves grid.
+                                }
+
+                                if (c % logFreq == 0)
+                                    info(">>> Tx iteration finished [cnt=" + c + ", keys=" + keys + ", " +
+                                        "locNodeId=" + locNodeId + ']');
+                            }
+                        }
+                        catch (Throwable e) {
+                            // Throwable, not Exception: an Error (e.g. AssertionError) would otherwise
+                            // kill this thread silently and the test would still pass.
+                            err.compareAndSet(null, e);
+
+                            error("Failed to put value in cache.", e);
+                        }
+                    }
+                }, "put-worker-" + i);
+
+                t.start();
+
+                threads.add(t);
+            }
+
+            // Restart workers use grids [putThreads, putThreads + restartThreads).
+            for (int i = 0; i < restartThreads; i++) {
+                final int gridIdx = i + putThreads;
+
+                Thread t = new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            barrier.await();
+
+                            info("Starting restart thread...");
+
+                            int cnt = 0;
+
+                            while (System.currentTimeMillis() < endTime && err.get() == null) {
+                                stopGrid(gridIdx);
+                                startGrid(gridIdx);
+
+                                int c = ++cnt;
+
+                                if (c % logFreq == 0)
+                                    info(">>> Restart iteration: " + c);
+                            }
+                        }
+                        catch (Throwable e) {
+                            // See the transaction worker: Errors must fail the test as well.
+                            err.compareAndSet(null, e);
+
+                            error("Failed to restart grid node.", e);
+                        }
+                    }
+                }, "restart-worker-" + i);
+
+                t.start();
+
+                threads.add(t);
+            }
+
+            for (Thread t : threads)
+                t.join();
+
+            if (err.get() != null)
+                throw err.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Picks a random cache operation code.
+     *
+     * @return Cache operation: a random value in the range {@code [1, 3]}.
+     */
+    private int cacheOp() {
+        return RAND.nextInt(3) + 1;
+    }
+
+    /**
+     * @param c Cache projection.
+     * @param key Key.
+     * @param attempt Attempt.
+ */ + private void printFailureDetails(GridCache<Integer, String> c, int key, int attempt) { + error("*** Failure details ***"); + error("Key: " + key); + error("Partition: " + c.configuration().getAffinity().partition(key)); + error("Attempt: " + attempt); + error("Node: " + c.gridProjection().ignite().cluster().localNode().id()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPartitionedByteArrayValuesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPartitionedByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPartitionedByteArrayValuesSelfTest.java new file mode 100644 index 0000000..d01c8f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPartitionedByteArrayValuesSelfTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests for byte array values in PARTITIONED caches. + */ +public abstract class GridCacheAbstractPartitionedByteArrayValuesSelfTest extends + GridCacheAbstractDistributedByteArrayValuesSelfTest { + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TransactionsConfiguration tCfg = new TransactionsConfiguration(); + + tCfg.setTxSerializableEnabled(true); + + cfg.setTransactionsConfiguration(tCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration0() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setDistributionMode(distributionMode()); + cfg.setBackups(1); + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setSwapEnabled(true); + cfg.setEvictSynchronized(false); + cfg.setEvictNearSynchronized(false); + cfg.setPortableEnabled(portableEnabled()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration offheapCacheConfiguration0() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setDistributionMode(distributionMode()); + cfg.setBackups(1); + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setMemoryMode(OFFHEAP_VALUES); + cfg.setOffHeapMaxMemory(100 * 1024 * 1024); + cfg.setQueryIndexEnabled(false); + cfg.setPortableEnabled(portableEnabled()); + + return cfg; + } + + /** {@inheritDoc} */ + 
@Override protected CacheConfiguration offheapTieredCacheConfiguration0() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setDistributionMode(distributionMode()); + cfg.setBackups(1); + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setMemoryMode(OFFHEAP_TIERED); + cfg.setOffHeapMaxMemory(100 * 1024 * 1024); + cfg.setQueryIndexEnabled(false); + cfg.setPortableEnabled(portableEnabled()); + + return cfg; + } + + /** + * @return Distribution mode. + */ + protected abstract GridCacheDistributionMode distributionMode(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java new file mode 100644 index 0000000..5bfc0ae --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test ensuring that PRIMARY_SYNC mode works correctly. + */ +public abstract class GridCacheAbstractPrimarySyncSelfTest extends GridCommonAbstractTest { + /** Grids count. */ + private static final int GRID_CNT = 3; + + /** IP_FINDER. 
*/ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(PRIMARY_SYNC); + ccfg.setBackups(1); + ccfg.setPreloadMode(SYNC); + ccfg.setDistributionMode(distributionMode()); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + assert GRID_CNT > 1; + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @return Distribution mode. + */ + protected abstract GridCacheDistributionMode distributionMode(); + + /** + * @throws Exception If failed. 
+ */ + public void testPrimarySync() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + for (int j = 0; j < GRID_CNT; j++) { + GridCache<Integer, Integer> cache = grid(j).cache(null); + + if (cache.entry(i).primary()) { + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(i, i); + + tx.commit(); + } + + assert cache.get(i) == i; + + break; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b89b472d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java new file mode 100644 index 0000000..971ee60 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests timeout exception when message gets lost. + */ +public class GridCacheAtomicTimeoutSelfTest extends GridCommonAbstractTest { + /** Grid count. 
*/ + public static final int GRID_CNT = 3; + + /** + * {@inheritDoc} + * <p> + * Installs a {@code TestCommunicationSpi} and a PARTITIONED, ATOMIC, FULL_SYNC cache with one backup, + * and sets the network timeout to 3000. + */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestCommunicationSpi commSpi = new TestCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(1); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setDistributionMode(PARTITIONED_ONLY); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + cfg.setNetworkTimeout(3000); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * {@inheritDoc} + * <p> + * For every grid, switches off the skip flags that individual tests turn on and then checks + * (with retries) that no atomic futures are left. + */ + @Override protected void afterTest() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + final GridKernal grid = (GridKernal)grid(i); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)grid.configuration().getCommunicationSpi(); + + commSpi.skipNearRequest = false; + commSpi.skipNearResponse = false; + commSpi.skipDhtRequest = false; + commSpi.skipDhtResponse = false; + + GridTestUtils.retryAssert(log, 10, 100, new CA() { + @Override public void apply() { + assertTrue(grid.internalCache().context().mvcc().atomicFutures().isEmpty()); + } + }); + } + } + + /** + * @throws Exception If failed.
+ */ + public void testNearUpdateRequestLost() throws Exception { + Ignite ignite = grid(0); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi(); + + GridCache<Object, Object> cache = ignite.cache(null); + + int key = keyForTest(); + + cache.put(key, 0); + + commSpi.skipNearRequest = true; + + IgniteFuture<Object> fut = cache.putAsync(key, 1); + + Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients"); + + GridTcpNioCommunicationClient client = (GridTcpNioCommunicationClient)clients.get(grid(1).localNode().id()); + + client.session().close().get(); + + try { + fut.get(); + + fail(); + } + catch (GridCacheAtomicUpdateTimeoutException ignore) { + // Expected exception. + } + } + + /** + * @throws Exception If failed. + */ + public void testNearUpdateResponseLost() throws Exception { + Ignite ignite = grid(0); + + GridCache<Object, Object> cache = ignite.cache(null); + + int key = keyForTest(); + + cache.put(key, 0); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(1).configuration().getCommunicationSpi(); + + commSpi.skipNearResponse = true; + + IgniteFuture<Object> fut = cache.putAsync(key, 1); + + Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients"); + + GridTcpNioCommunicationClient client = (GridTcpNioCommunicationClient)clients.get(grid(0).localNode().id()); + + client.session().close().get(); + + try { + fut.get(); + + fail(); + } + catch (GridCacheAtomicUpdateTimeoutException ignore) { + // Expected exception. + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testDhtUpdateRequestLost() throws Exception { + Ignite ignite = grid(0); + + GridCache<Object, Object> cache = ignite.cache(null); + + int key = keyForTest(); + + cache.put(key, 0); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(1).configuration().getCommunicationSpi(); + + commSpi.skipDhtRequest = true; + + IgniteFuture<Object> fut = cache.putAsync(key, 1); + + Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients"); + + GridTcpNioCommunicationClient client = (GridTcpNioCommunicationClient)clients.get(grid(2).localNode().id()); + + client.session().close().get(); + + try { + fut.get(); + + fail(); + } + catch (IgniteCheckedException e) { + assertTrue("Invalid exception thrown: " + e, X.hasCause(e, GridCacheAtomicUpdateTimeoutException.class) + || X.hasSuppressed(e, GridCacheAtomicUpdateTimeoutException.class)); + } + } + + /** + * @throws Exception If failed. + */ + public void testDhtUpdateResponseLost() throws Exception { + Ignite ignite = grid(0); + + GridCache<Object, Object> cache = ignite.cache(null); + + int key = keyForTest(); + + cache.put(key, 0); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(2).configuration().getCommunicationSpi(); + + commSpi.skipDhtResponse = true; + + IgniteFuture<Object> fut = cache.putAsync(key, 1); + + Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients"); + + GridTcpNioCommunicationClient client = (GridTcpNioCommunicationClient)clients.get(grid(1).localNode().id()); + + client.session().close().get(); + + try { + fut.get(); + + fail(); + } + catch (IgniteCheckedException e) { + assertTrue("Invalid exception thrown: " + e, X.hasCause(e, GridCacheAtomicUpdateTimeoutException.class) + || X.hasSuppressed(e, GridCacheAtomicUpdateTimeoutException.class)); + } + } + + /** + * @return Key for test; + */ + private int keyForTest() { + int i = 0; + + GridCacheAffinity<Object> aff = grid(0).cache(null).affinity(); + + while 
(!aff.isPrimary(grid(1).localNode(), i) || !aff.isBackup(grid(2).localNode(), i)) + i++; + + return i; + } + + /** + * Communication SPI that will count single partition update messages. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + private boolean skipNearRequest; + + /** */ + private boolean skipNearResponse; + + /** */ + private boolean skipDhtRequest; + + /** */ + private boolean skipDhtResponse; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + if (!skipMessage((GridIoMessage)msg)) + super.sendMessage(node, msg); + } + + /** + * Checks if message should be skipped. + * + * @param msg Message. + */ + private boolean skipMessage(GridIoMessage msg) { + return msg.message() instanceof GridNearAtomicUpdateRequest && skipNearRequest + || msg.message() instanceof GridNearAtomicUpdateResponse && skipNearResponse + || msg.message() instanceof GridDhtAtomicUpdateRequest && skipDhtRequest + || msg.message() instanceof GridDhtAtomicUpdateResponse && skipDhtResponse; + } + } + +}