# sprint-2 Retry cache update in MarshallerContextImpl, added test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/940174d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/940174d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/940174d6 Branch: refs/heads/ignite-368 Commit: 940174d69c6e92350b8cbbb63217cc5ad3874e65 Parents: cbdebf1 Author: sboikov <sboi...@gridgain.com> Authored: Mon Mar 16 14:04:26 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Mar 16 14:04:26 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 30 ++- .../optimized/OptimizedMarshallerUtils.java | 16 +- ...acheStoreSessionWriteBehindAbstractTest.java | 10 +- .../OptimizedMarshallerNodeFailoverTest.java | 200 +++++++++++++++++++ .../IgniteMarshallerSelfTestSuite.java | 1 + 5 files changed, 245 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/940174d6/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 6d2520e..6f6b4ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -27,10 +27,16 @@ import java.util.concurrent.*; * Marshaller context implementation. */ public class MarshallerContextImpl extends MarshallerContextAdapter { + /** Class names cache update retries count. */ + private static final int CACHE_UPDATE_RETRIES_CNT = 5; + /** */ private final CountDownLatch latch = new CountDownLatch(1); /** */ + private IgniteLogger log; + + /** */ private volatile GridCacheAdapter<Integer, String> cache; /** @@ -39,6 +45,8 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { public void onMarshallerCacheReady(GridKernalContext ctx) { assert ctx != null; + log = ctx.log(MarshallerContextImpl.class); + cache = ctx.cache().marshallerCache(); latch.countDown(); @@ -52,12 +60,24 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { if (cache0 == null) return false; - String old = cache0.putIfAbsent(id, clsName); + for (int i = 0; i < CACHE_UPDATE_RETRIES_CNT; i++) { + try { + String old = cache0.putIfAbsent(id, clsName); + + if (old != null && !old.equals(clsName)) + throw new IgniteException("Type ID collision occurred in OptimizedMarshaller. Use " + + "OptimizedMarshallerIdMapper to resolve it [id=" + id + ", clsName1=" + clsName + + "clsName2=" + old + ']'); - if (old != null && !old.equals(clsName)) - throw new IgniteException("Type ID collision occurred in OptimizedMarshaller. Use " + - "OptimizedMarshallerIdMapper to resolve it [id=" + id + ", clsName1=" + clsName + - "clsName2=" + old + ']'); + break; + } + catch (IgniteCheckedException e) { + if (i == CACHE_UPDATE_RETRIES_CNT - 1) + throw e; + else + U.error(log, "Failed to update marshaller cache, will retry: " + e); + } + } } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/940174d6/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java index a979e8a..8c64e33 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java @@ -178,14 +178,24 @@ class OptimizedMarshallerUtils { * @return Descriptor. * @throws IOException In case of error. */ - static OptimizedClassDescriptor classDescriptor(Class cls, MarshallerContext ctx, - OptimizedMarshallerIdMapper mapper) throws IOException { + static OptimizedClassDescriptor classDescriptor(Class cls, + MarshallerContext ctx, + OptimizedMarshallerIdMapper mapper) + throws IOException + { OptimizedClassDescriptor desc = DESC_BY_CLS.get(cls); if (desc == null) { int typeId = resolveTypeId(cls.getName(), mapper); - boolean registered = ctx.registerClass(typeId, cls); + boolean registered; + + try { + registered = ctx.registerClass(typeId, cls); + } + catch (Exception e) { + throw new IOException("Failed to register class: " + cls.getName(), e); + } desc = new OptimizedClassDescriptor(cls, registered ? typeId : 0, ctx, mapper); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/940174d6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java index 4eb5a4d..fc06d7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java @@ -77,7 +77,8 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign ccfg0.setReadThrough(true); ccfg0.setWriteThrough(true); ccfg0.setWriteBehindBatchSize(10); - ccfg0.setWriteBehindFlushFrequency(1000); + ccfg0.setWriteBehindFlushSize(10); + ccfg0.setWriteBehindFlushFrequency(5000); ccfg0.setWriteBehindEnabled(true); ccfg0.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); @@ -87,7 +88,8 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign ccfg1.setReadThrough(true); ccfg1.setWriteThrough(true); ccfg1.setWriteBehindBatchSize(10); - ccfg1.setWriteBehindFlushFrequency(1000); + ccfg1.setWriteBehindFlushSize(10); + ccfg1.setWriteBehindFlushFrequency(5000); ccfg1.setWriteBehindEnabled(true); ccfg1.setName(CACHE_NAME1); @@ -123,7 +125,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign for (int i = 0; i < 10; i++) cache.put(i, i); - assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS)); } finally { latch = null; @@ -137,7 +139,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign for (int i = 0; i < 10; i++) cache.remove(i); - assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS)); } finally { latch = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/940174d6/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java new file mode 100644 index 0000000..c006abb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.marshaller.optimized; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean cache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setMarshaller(new OptimizedMarshaller()); + + if (cache) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + } + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testClassCacheUpdateFailover() throws Exception { + cache = true; + + startGridsMultiThreaded(2); + + cache = false; + + IgniteCache<Integer, Object> cache0 = ignite(0).jcache(null); + + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + Map<Integer, Object> map = new HashMap<>(); + + for (int j = 0; j < 10_000; j++) + map.put(j, create(i + 1)); + + final Ignite ignite = startGrid(2); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + ignite.close(); + + return null; + } + }); + + cache0.putAll(map); // Do not stop cache node, so put should not fail. + + fut.get(); + } + + cache = true; + + Ignite ignite = startGrid(2); // Check can start on more cache node. + + assertNotNull(ignite.jcache(null)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @param id Class ID. + * @return Test class instance. + */ + private static Object create(int id) { + switch (id) { + case 1: return new TestClass1(); + + case 2: return new TestClass2(); + + case 3: return new TestClass3(); + + case 4: return new TestClass4(); + + case 5: return new TestClass5(); + + case 6: return new TestClass6(); + + case 7: return new TestClass7(); + + case 8: return new TestClass8(); + + case 9: return new TestClass9(); + + case 10: return new TestClass10(); + } + + fail(); + + return null; + } + + /** + * + */ + static class TestClass1 implements Serializable {} + + /** + * + */ + static class TestClass2 implements Serializable {} + + /** + * + */ + static class TestClass3 implements Serializable {} + + /** + * + */ + static class TestClass4 implements Serializable {} + + /** + * + */ + static class TestClass5 implements Serializable {} + + /** + * + */ + static class TestClass6 implements Serializable {} + + /** + * + */ + static class TestClass7 implements Serializable {} + + /** + * + */ + static class TestClass8 implements Serializable {} + + /** + * + */ + static class TestClass9 implements Serializable {} + + /** + * + */ + static class TestClass10 implements Serializable {} +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/940174d6/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java index f634ec1..1d7f196 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java @@ -39,6 +39,7 @@ public class IgniteMarshallerSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(OptimizedMarshallerTest.class)); suite.addTest(new TestSuite(OptimizedObjectStreamSelfTest.class)); suite.addTest(new TestSuite(GridUnsafeDataOutputArraySizingSelfTest.class)); + suite.addTest(new TestSuite(OptimizedMarshallerNodeFailoverTest.class)); return suite; }