ignite-gg-9933 - value type can be a class
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d7eb3f8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d7eb3f8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d7eb3f8d Branch: refs/heads/ignite-offheap-hang Commit: d7eb3f8d4d864ae22ca353ff801ae26cc4eed46b Parents: e444ffc Author: S.Vladykin <svlady...@gridgain.com> Authored: Sat Mar 21 06:04:21 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Sat Mar 21 06:04:21 2015 +0300 ---------------------------------------------------------------------- .../query/h2/opt/GridH2AbstractKeyValueRow.java | 32 +- .../query/h2/opt/GridH2KeyValueRowOffheap.java | 4 +- .../processors/query/h2/opt/GridH2Table.java | 6 + ...CacheOffheapTieredMultithreadedSelfTest.java | 319 +++++++++++++++++++ .../IgniteCacheQueryMultiThreadedSelfTest.java | 40 +-- .../IgniteCacheQuerySelfTestSuite.java | 3 +- 6 files changed, 371 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7eb3f8d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java index ef4c3a8..e65597c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java @@ -176,6 +176,8 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { */ public synchronized void onUnswap(Object val) throws IgniteCheckedException { setValue(VAL_COL, wrap(val, desc.valueType())); + + notifyAll(); } /** @@ -193,14 +195,30 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { setValue(VAL_COL, new WeakValue(upd)); + notifyAll(); + return exp; } /** + * @param attempt Attempt. * @return Synchronized value. */ - protected synchronized Value syncValue() { - return super.getValue(VAL_COL); + protected synchronized Value syncValue(int attempt) { + Value v = super.getValue(VAL_COL); + + if (v == null && attempt != 0) { + try { + wait(attempt); + } + catch (InterruptedException e) { + throw new IgniteException(e); + } + + v = super.getValue(VAL_COL); + } + + return v; } /** {@inheritDoc} */ @@ -209,6 +227,8 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { Value v = super.getValue(col); if (col == VAL_COL) { + int attempt = 0; + while ((v = WeakValue.unwrap(v)) == null) { v = getOffheapValue(VAL_COL); @@ -240,12 +260,16 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { } else { // If nothing found in swap then we should be already unswapped. - v = syncValue(); + v = syncValue(attempt); } } catch (IgniteCheckedException e) { throw new IgniteException(e); } + + if (attempt++ == 300) // wait 45 sec in syncValue + throw new IgniteException("Failed to get cache value from offheap or swap for key: " + + getValue(KEY_COL).getObject()); } } @@ -254,7 +278,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { v = getOffheapValue(KEY_COL); - assert v != null : v; + assert v != null; setValue(KEY_COL, v); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7eb3f8d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java index 8341233..cdf2e84 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java @@ -254,8 +254,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { } /** {@inheritDoc} */ - @Override protected synchronized Value syncValue() { - Value v = super.syncValue(); + @Override protected synchronized Value syncValue(int attempt) { + Value v = super.syncValue(attempt); if (v != null) return v; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7eb3f8d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 71f5ff4..f346402 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -40,6 +40,9 @@ import java.util.concurrent.locks.*; */ public class GridH2Table extends TableBase { /** */ + private static final long MAX_LOCK_ATTEMPT_TIME = 30_000; + + /** */ private final String spaceName; /** */ @@ -207,6 +210,9 @@ public class GridH2Table extends TableBase { catch (InterruptedException e) { throw new IgniteException("Thread got interrupted while trying to acquire index lock.", e); } + + if (waitTime > MAX_LOCK_ATTEMPT_TIME) + throw new IgniteException("Failed to lock table: " + getName()); } boolean snapshoted = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7eb3f8d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java new file mode 100644 index 0000000..783c312 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapTieredMultithreadedSelfTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Based on Yardstick benchmark. + */ +public class IgniteCacheOffheapTieredMultithreadedSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int RANGE = 1_000_000; + + /** */ + private static IgniteCache<Integer, Object> cache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration<?,?> cacheCfg = new CacheConfiguration<>(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(ATOMIC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setBackups(1); + cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + cacheCfg.setIndexedTypes( + Integer.class, Person.class + ); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + awaitPartitionMapExchange(); + + cache = grid(0).jcache(null); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testQueryPut() throws Exception { + final AtomicBoolean end = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while(!end.get()) { + if (rnd.nextBoolean()) { + double salary = rnd.nextDouble() * RANGE * 1000; + + double maxSalary = salary + 1000; + + Collection<Cache.Entry<Integer, Object>> entries = executeQuery(salary, maxSalary); + + for (Cache.Entry<Integer, Object> entry : entries) { + Person p = (Person)entry.getValue(); + + if (p.getSalary() < salary || p.getSalary() > maxSalary) + throw new Exception("Invalid person retrieved [min=" + salary + ", max=" + maxSalary + + ", person=" + p + ']'); + } + } + else { + int i = rnd.nextInt(RANGE); + + cache.put(i, new Person(i, "firstName" + i, "lastName" + i, i * 1000)); + } + } + + return null; + } + }, 100); + + Thread.sleep(45_000); + + end.set(true); + + fut.get(); + } + + /** + * @param minSalary Min salary. + * @param maxSalary Max salary. + * @return Query result. + * @throws Exception If failed. + */ + private Collection<Cache.Entry<Integer, Object>> executeQuery(double minSalary, double maxSalary) throws Exception { + SqlQuery qry = new SqlQuery(Person.class, "salary >= ? and salary <= ?"); + + qry.setArgs(minSalary, maxSalary); + + return cache.query(qry).getAll(); + } + + /** + * Person record used for query test. + */ + public static class Person implements Externalizable { + /** Person ID. */ + @QuerySqlField(index = true) + private int id; + + /** Organization ID. */ + @QuerySqlField(index = true) + private int orgId; + + /** First name (not-indexed). */ + @QuerySqlField + private String firstName; + + /** Last name (not indexed). */ + @QuerySqlField + private String lastName; + + /** Salary. */ + @QuerySqlField(index = true) + private double salary; + + /** + * Constructs empty person. + */ + public Person() { + // No-op. + } + + /** + * Constructs person record that is not linked to any organization. + * + * @param id Person ID. + * @param firstName First name. + * @param lastName Last name. + * @param salary Salary. + */ + public Person(int id, String firstName, String lastName, double salary) { + this(id, 0, firstName, lastName, salary); + } + + /** + * Constructs person record. + * + * @param id Person ID. + * @param orgId Organization ID. + * @param firstName First name. + * @param lastName Last name. + * @param salary Salary. + */ + public Person(int id, int orgId, String firstName, String lastName, double salary) { + this.id = id; + this.orgId = orgId; + this.firstName = firstName; + this.lastName = lastName; + this.salary = salary; + } + + /** + * @return Person id. + */ + public int getId() { + return id; + } + + /** + * @param id Person id. + */ + public void setId(int id) { + this.id = id; + } + + /** + * @return Organization id. + */ + public int getOrganizationId() { + return orgId; + } + + /** + * @param orgId Organization id. + */ + public void setOrganizationId(int orgId) { + this.orgId = orgId; + } + + /** + * @return Person first name. + */ + public String getFirstName() { + return firstName; + } + + /** + * @param firstName Person first name. + */ + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + /** + * @return Person last name. + */ + public String getLastName() { + return lastName; + } + + /** + * @param lastName Person last name. + */ + public void setLastName(String lastName) { + this.lastName = lastName; + } + + /** + * @return Salary. + */ + public double getSalary() { + return salary; + } + + /** + * @param salary Salary. + */ + public void setSalary(double salary) { + this.salary = salary; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + out.writeInt(orgId); + out.writeUTF(firstName); + out.writeUTF(lastName); + out.writeDouble(salary); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + orgId = in.readInt(); + firstName = in.readUTF(); + lastName = in.readUTF(); + salary = in.readDouble(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + return this == o || (o instanceof Person) && id == ((Person)o).id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Person [firstName=" + firstName + + ", id=" + id + + ", orgId=" + orgId + + ", lastName=" + lastName + + ", salary=" + salary + + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7eb3f8d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java index 2e54404..f3bf64c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.query.h2.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -53,7 +54,7 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes private static final boolean TEST_INFO = true; /** Number of test grids (nodes). Should not be less than 2. */ - private static final int GRID_CNT = 2; + private static final int GRID_CNT = 3; /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); @@ -65,7 +66,7 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes private static AtomicInteger idxUnswapCnt = new AtomicInteger(); /** */ - private static final long DURATION = 30 * 1000; + private static final long DURATION = 60 * 1000; /** Don't start grid by default. */ public IgniteCacheQueryMultiThreadedSelfTest() { @@ -290,15 +291,8 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes case 4: int from = rnd.nextInt(valCnt); - QueryCursor<Cache.Entry<Integer, String>> qry = c.query( - new SqlQuery(String.class, "_val between ? and ?").setArgs( - String.valueOf(from), String.valueOf(from + 250))); - - Collection<Cache.Entry<Integer, String>> res = qry.getAll(); - - for (Cache.Entry<Integer, String> ignored : res) { - //No-op. - } + c.query(new SqlQuery(String.class, "_val between ? and ?").setArgs( + String.valueOf(from), String.valueOf(from + 250))).getAll(); } } } @@ -403,15 +397,15 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes final Ignite g = grid(0); // Put test values into cache. - final IgniteCache<Integer, Object> c = g.jcache(null); + final IgniteCache<Integer, String> c = g.jcache(null); assertEquals(0, g.jcache(null).size()); - assertEquals(0, c.query(new SqlQuery(Object.class, "1 = 1")).getAll().size()); + assertEquals(0, c.query(new SqlQuery(String.class, "true")).getAll().size()); - Random rnd = new Random(); + Random rnd = new GridRandom(); - for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) { - c.put(i, rnd.nextBoolean() ? (long)rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt))); + for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(2)) { + c.put(i, String.valueOf(rnd.nextInt(valCnt))); if (evictsEnabled() && rnd.nextBoolean()) c.localEvict(Arrays.asList(i)); @@ -421,15 +415,14 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() { @Override public void applyx() throws IgniteCheckedException { - Random rnd = new Random(); + Random rnd = new GridRandom(); while (!done.get()) { int key = rnd.nextInt(keyCnt); switch (rnd.nextInt(5)) { case 0: - c.put(key, rnd.nextBoolean() ? (long)rnd.nextInt(valCnt) : - String.valueOf(rnd.nextInt(valCnt))); + c.put(key, String.valueOf(rnd.nextInt(valCnt))); break; case 1: @@ -448,13 +441,8 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes case 4: int from = rnd.nextInt(valCnt); - Collection<Cache.Entry<Integer, Object>> res = c.query( - new SqlQuery(Object.class, "_val between ? and ?").setArgs(from, from + 250)) - .getAll(); - - for (Cache.Entry<Integer, Object> ignored : res) { - //No-op. - } + c.query(new SqlQuery(Object.class, "_val between ? and ?").setArgs( + String.valueOf(from), String.valueOf(from + 250))).getAll(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7eb3f8d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index e29af9f..6f5b99e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -57,7 +57,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheQueryIndexSelfTest.class); suite.addTestSuite(GridCacheQueryInternalKeysSelfTest.class); suite.addTestSuite(IgniteCacheQueryMultiThreadedSelfTest.class); - //suite.addTestSuite(IgniteCacheQueryEvictsMultiThreadedSelfTest.class); + suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); + suite.addTestSuite(IgniteCacheQueryEvictsMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryOffheapMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);