Repository: incubator-ignite Updated Branches: refs/heads/ignite-454 073db0ceb -> fd12c0008
# ignite-375 Add support UUID in pojo store. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8d38c529 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8d38c529 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8d38c529 Branch: refs/heads/ignite-454 Commit: 8d38c52925e46ceb04239dde8d01e5e12fee6751 Parents: 86987b0 Author: anovikov <anovi...@gridgain.com> Authored: Wed Mar 18 10:18:47 2015 +0700 Committer: anovikov <anovi...@gridgain.com> Committed: Wed Mar 18 10:18:47 2015 +0700 ---------------------------------------------------------------------- .../store/jdbc/CacheAbstractJdbcStore.java | 162 ++++-- .../cache/store/jdbc/CacheJdbcPojoStore.java | 6 +- .../core/src/test/config/store/jdbc/Ignite.xml | 54 ++ .../store/jdbc/CacheJdbcPojoStoreTest.java | 559 ++----------------- 4 files changed, 239 insertions(+), 542 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d38c529/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index 80a22c0..79f7315 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -34,6 +34,7 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.integration.*; import javax.sql.*; +import java.nio.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; @@ -127,7 +128,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD; /** - * Get field value from object. + * Get field value from object for use as query parameter. * * @param cacheName Cache name. * @param typeName Type name. @@ -135,7 +136,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, * @param obj Cache object. * @return Field value from object. */ - @Nullable protected abstract Object extractField(@Nullable String cacheName, String typeName, String fieldName, + @Nullable protected abstract Object extractParameter(@Nullable String cacheName, String typeName, String fieldName, Object obj) throws CacheException; /** @@ -387,7 +388,26 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, return EMPTY_COLUMN_VALUE; } - return rs.getObject(colIdx); + Object val = rs.getObject(colIdx); + + if (type == UUID.class && val != null) { + if (val instanceof UUID) + return val; + + if (val instanceof byte[]) { + ByteBuffer bb = ByteBuffer.wrap((byte[])val); + + long most = bb.getLong(); + long least = bb.getLong(); + + return new UUID(most, least); + } + + if (val instanceof String) + return UUID.fromString((String)val); + } + + return val; } /** @@ -461,6 +481,65 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, } /** + * Object is a simple type. + * + * @param cls Class. + * @return {@code True} if object is a simple type. + */ + protected static boolean simpleType(Class<?> cls) { + return (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) || + java.util.Date.class.isAssignableFrom(cls) || Boolean.class.isAssignableFrom(cls) || + UUID.class.isAssignableFrom(cls)); + } + + /** + * @param clsName Class name. + * @param fields Fields descriptors. + * @throws CacheException If failed. + */ + private static void checkMapping(@Nullable String cacheName, String clsName, + Collection<CacheTypeFieldMetadata> fields) throws CacheException { + try { + Class<?> cls = Class.forName(clsName); + + if (simpleType(cls)) { + if (fields.size() != 1) + throw new CacheException("More than one field for simple type [cache name=" + cacheName + + ", type=" + clsName + " ]"); + + CacheTypeFieldMetadata field = F.first(fields); + + if (field.getDatabaseName() == null) + throw new CacheException("Missing database name in mapping description [cache name=" + cacheName + + ", type=" + clsName + " ]"); + + if (field.getJavaName() != null) + throw new CacheException("Missing field name in mapping description [cache name=" + cacheName + + ", type=" + clsName + " ]"); + + field.setJavaType(cls); + } + else + for (CacheTypeFieldMetadata field : fields) { + if (field.getDatabaseName() == null) + throw new CacheException("Missing database name in mapping description [cache name=" + cacheName + + ", type=" + clsName + " ]"); + + if (field.getJavaName() == null) + throw new CacheException("Missing field name in mapping description [cache name=" + cacheName + + ", type=" + clsName + " ]"); + + if (field.getJavaType() == null) + throw new CacheException("Missing field type in mapping description [cache name=" + cacheName + + ", type=" + clsName + " ]"); + } + } + catch (ClassNotFoundException e) { + throw new CacheException("Failed to find class: " + clsName, e); + } + } + + /** * @return Type mappings for specified cache name. * @throws CacheException If failed to initialize. */ @@ -491,6 +570,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName + ", key type=" + type.getKeyType() + "]"); + checkMapping(cacheName, type.getKeyType(), type.getKeyFields()); + checkMapping(cacheName, type.getValueType(), type.getValueFields()); + entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(cacheName, dialect, type)); } @@ -972,7 +1054,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, if (delCnt != 1) U.warn(log, "Unexpected number of deleted entries [table=" + em.fullTableName() + ", key=" + key + - "expected=1, actual=" + delCnt + "]"); + ", expected=1, actual=" + delCnt + "]"); } catch (SQLException e) { throw new CacheWriterException("Failed to remove value from database [table=" + em.fullTableName() + @@ -1100,29 +1182,52 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, } /** + * Sets the value of the designated parameter using the given object. + * + * @param stmt Prepare statement. + * @param i Index for parameters. + * @param field Field descriptor. + * @param fieldVal Field value. + * @throws CacheException If failed to set statement parameter. + */ + protected void fillParameter(PreparedStatement stmt, int i, CacheTypeFieldMetadata field, @Nullable Object fieldVal) + throws CacheException { + try { + if (fieldVal != null) { + if (field.getJavaType() == UUID.class) { + if (field.getDatabaseType() == Types.BINARY) + fieldVal = U.uuidToBytes((UUID)fieldVal); + else if (F.contains(new int[] { Types.CHAR, Types.VARCHAR }, field.getDatabaseType())) + fieldVal = fieldVal.toString(); + } + + stmt.setObject(i, fieldVal); + } + else + stmt.setNull(i, field.getDatabaseType()); + } + catch (SQLException e) { + throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseName(), e); + } + } + + /** * @param stmt Prepare statement. - * @param i Start index for parameters. + * @param idx Start index for parameters. * @param em Entry mapping. * @param key Key object. * @return Next index for parameters. + * @throws CacheException If failed to set statement parameter. */ - protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping em, + protected int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em, Object key) throws CacheException { for (CacheTypeFieldMetadata field : em.keyColumns()) { - Object fieldVal = extractField(em.cacheName, em.keyType(), field.getJavaName(), key); + Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key); - try { - if (fieldVal != null) - stmt.setObject(i++, fieldVal); - else - stmt.setNull(i++, field.getDatabaseType()); - } - catch (SQLException e) { - throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseName(), e); - } + fillParameter(stmt, idx++, field, fieldVal); } - return i; + return idx; } /** @@ -1130,6 +1235,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, * @param m Type mapping description. * @param key Key object. * @return Next index for parameters. + * @throws CacheException If failed to set statement parameter. */ protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException { return fillKeyParameters(stmt, 1, m, key); @@ -1141,21 +1247,14 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, * @param em Type mapping description. * @param val Value object. * @return Next index for parameters. + * @throws CacheException If failed to set statement parameter. */ protected int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val) throws CacheWriterException { for (CacheTypeFieldMetadata field : em.uniqValFields) { - Object fieldVal = extractField(em.cacheName, em.valueType(), field.getJavaName(), val); + Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaName(), val); - try { - if (fieldVal != null) - stmt.setObject(idx++, fieldVal); - else - stmt.setNull(idx++, field.getDatabaseType()); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to set statement parameter name: " + field.getDatabaseName(), e); - } + fillParameter(stmt, idx++, field, fieldVal); } return idx; @@ -1582,16 +1681,13 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, try { stmt = conn.prepareStatement(em.loadQuery(keys.size())); - int i = 1; + int idx = 1; for (Object key : keys) for (CacheTypeFieldMetadata field : em.keyColumns()) { - Object fieldVal = extractField(em.cacheName, em.keyType(), field.getJavaName(), key); + Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key); - if (fieldVal != null) - stmt.setObject(i++, fieldVal); - else - stmt.setNull(i++, field.getDatabaseType()); + fillParameter(stmt, idx++, field, fieldVal); } ResultSet rs = stmt.executeQuery(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d38c529/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index a92ff95..8dcfc10 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -67,8 +67,7 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { try { cls = Class.forName(clsName); - if (simple = (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) || - java.util.Date.class.isAssignableFrom(cls)) || Boolean.class.isAssignableFrom(cls)) + if (simple = simpleType(cls)) return; ctor = cls.getDeclaredConstructor(); @@ -188,7 +187,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> { } /** {@inheritDoc} */ - @Nullable @Override protected Object extractField(String cacheName, String typeName, String fieldName, Object obj) + @Nullable @Override protected Object extractParameter(String cacheName, String typeName, String fieldName, + Object obj) throws CacheException { try { PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d38c529/modules/core/src/test/config/store/jdbc/Ignite.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/store/jdbc/Ignite.xml b/modules/core/src/test/config/store/jdbc/Ignite.xml index 9eb598d..3a64d60 100644 --- a/modules/core/src/test/config/store/jdbc/Ignite.xml +++ b/modules/core/src/test/config/store/jdbc/Ignite.xml @@ -183,4 +183,58 @@ </list> </property> </bean> + <bean class="org.apache.ignite.cache.CacheTypeMetadata"> + <property name="databaseSchema" value="PUBLIC"/> + <property name="databaseTable" value="STRING_ENTRIES"/> + <property name="keyType" value="java.lang.String"/> + <property name="valueType" value="java.lang.String"/> + <property name="keyFields"> + <list> + <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata"> + <property name="databaseName" value="KEY"/> + <property name="databaseType"> + <util:constant static-field="java.sql.Types.VARCHAR"/> + </property> + </bean> + </list> + </property> + <property name="valueFields"> + <list> + <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata"> + <property name="databaseName" value="VAL"/> + <property name="databaseType"> + <util:constant static-field="java.sql.Types.VARCHAR"/> + </property> + </bean> + </list> + </property> + </bean> + <bean class="org.apache.ignite.cache.CacheTypeMetadata"> + <property name="databaseSchema" value="PUBLIC"/> + <property name="databaseTable" value="UUID_ENTRIES"/> + <property name="keyType" value="java.util.UUID"/> + <property name="valueType" value="java.util.UUID"/> + <property name="keyFields"> + <list> + <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata"> + <property name="databaseName" value="KEY"/> + <property name="databaseType"> + <util:constant static-field="java.sql.Types.BINARY"/> + </property> + <property name="javaType" value="java.util.UUID"/> + </bean> + </list> + </property> + <property name="valueFields"> + <list> + <bean class="org.apache.ignite.cache.CacheTypeFieldMetadata"> + <property name="databaseName" value="VAL"/> + <property name="databaseType"> + <util:constant static-field="java.sql.Types.BINARY"/> + </property> + <property name="javaType" value="java.util.UUID"/> + </bean> + </list> + </property> + </bean> </beans> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d38c529/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index e870fd6..76cc119 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -19,38 +19,29 @@ package org.apache.ignite.cache.store.jdbc; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.store.*; import org.apache.ignite.cache.store.jdbc.dialect.*; import org.apache.ignite.cache.store.jdbc.model.*; -import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.cache.*; -import org.apache.ignite.testframework.junits.common.*; -import org.apache.ignite.transactions.*; import org.h2.jdbcx.*; -import org.jetbrains.annotations.*; import org.springframework.beans.*; import org.springframework.beans.factory.xml.*; import org.springframework.context.support.*; import org.springframework.core.io.*; -import javax.cache.*; -import javax.cache.integration.*; import java.net.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest.*; /** * Class for {@code PojoCacheStore} tests. */ -public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { +public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<CacheJdbcPojoStore<Object, Object>> { /** DB connection URL. */ private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1"; @@ -63,29 +54,16 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { /** Person count. */ protected static final int PERSON_CNT = 100000; - /** */ - protected TestThreadLocalCacheSession ses = new TestThreadLocalCacheSession(); - - /** */ - protected final CacheJdbcPojoStore store; - /** * @throws Exception If failed. */ - @SuppressWarnings({"AbstractMethodCallInConstructor", "OverriddenMethodCallDuringObjectConstruction"}) public CacheJdbcPojoStoreTest() throws Exception { - super(false); - - store = store(); - - inject(store); + // No-op. } - /** - * @return Store. - */ - protected CacheJdbcPojoStore store() throws IgniteCheckedException { - CacheJdbcPojoStore store = new CacheJdbcPojoStore(); + /** {@inheritDoc} */ + @Override protected CacheJdbcPojoStore<Object, Object> store() { + CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>(); // PGPoolingDataSource ds = new PGPoolingDataSource(); // ds.setUser("postgres"); @@ -101,18 +79,6 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", "")); - return store; - } - - /** - * @param store Store. - * @throws Exception If failed. - */ - protected void inject(CacheAbstractJdbcStore store) throws Exception { - getTestResources().inject(store); - - GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "ses", ses); - URL cfgUrl; try { @@ -123,7 +89,7 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { } if (cfgUrl == null) - throw new Exception("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG); + throw new IgniteException("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG); try { GenericApplicationContext springCtx = new GenericApplicationContext(); @@ -154,68 +120,77 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { } catch (BeansException e) { if (X.hasCause(e, ClassNotFoundException.class)) - throw new IgniteCheckedException("Failed to instantiate Spring XML application context " + + throw new IgniteException("Failed to instantiate Spring XML application context " + "(make sure all classes used in Spring configuration are present at CLASSPATH) " + "[springUrl=" + cfgUrl + ']', e); else - throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" + + throw new IgniteException("Failed to instantiate Spring XML application context [springUrl=" + cfgUrl + ", err=" + e.getMessage() + ']', e); } + + return store; } /** + * @param store Store. * @throws Exception If failed. */ - public void testWriteRetry() throws Exception { - // Special dialect that will skip updates, to test write retry. - BasicJdbcDialect dialect = new BasicJdbcDialect() { - /** {@inheritDoc} */ - @Override public String updateQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols) { - return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0"; - } - }; - - store.setDialect(dialect); - - Map<String, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings = - GridTestUtils.getFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings"); + @Override protected void inject(CacheJdbcPojoStore<Object, Object> store) throws Exception { + getTestResources().inject(store); - CacheAbstractJdbcStore.EntryMapping em = cacheMappings.get(null).get(OrganizationKey.class); + GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "ses", ses); + } - CacheTypeMetadata typeMeta = GridTestUtils.getFieldValue(em, CacheAbstractJdbcStore.EntryMapping.class, "typeMeta"); + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Connection conn = store.openConnection(false); - cacheMappings.get(null).put(OrganizationKey.class, - new CacheAbstractJdbcStore.EntryMapping(null, dialect, typeMeta)); + Statement stmt = conn.createStatement(); - Connection conn = store.openConnection(false); + try { + stmt.executeUpdate("delete from String_Entries"); + } + catch (SQLException ignore) { + // No-op. + } - PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); + try { + stmt.executeUpdate("delete from UUID_Entries"); + } + catch (SQLException ignore) { + // No-op. + } - orgStmt.setInt(1, 1); - orgStmt.setString(2, "name" + 1); - orgStmt.setString(3, "city" + 1); + try { + stmt.executeUpdate("delete from Organization"); + } + catch (SQLException ignore) { + // No-op. + } - orgStmt.executeUpdate(); + try { + stmt.executeUpdate("delete from Person"); + } + catch (SQLException ignore) { + // No-op. + } - U.closeQuiet(orgStmt); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person_Complex (id integer not null, org_id integer not null, city_id integer not null, name varchar(50), PRIMARY KEY(id))"); conn.commit(); - OrganizationKey k1 = new OrganizationKey(1); - Organization v1 = new Organization(1, "Name1", "City1"); + U.closeQuiet(stmt); - ses.newSession(null); + U.closeQuiet(conn); - try { - store.write(new CacheEntryImpl<>(k1, v1)); - } - catch (CacheWriterException e) { - if (!e.getMessage().startsWith("Failed insert entry in database, violate a unique index or primary key") || - e.getSuppressed().length != 2) - throw e; - } + super.beforeTest(); } + /** * @throws Exception If failed. */ @@ -332,432 +307,4 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { assertTrue(prnKeys.isEmpty()); assertTrue(prnComplexKeys.isEmpty()); } - - /** - * @throws Exception If failed. - */ - public void testStore() throws Exception { - // Create dummy transaction - Transaction tx = new DummyTx(); - - ses.newSession(tx); - - OrganizationKey k1 = new OrganizationKey(1); - Organization v1 = new Organization(1, "Name1", "City1"); - - OrganizationKey k2 = new OrganizationKey(2); - Organization v2 = new Organization(2, "Name2", "City2"); - - store.write(new CacheEntryImpl<>(k1, v1)); - store.write(new CacheEntryImpl<>(k2, v2)); - - store.txEnd(true); - - ses.newSession(null); - - assertEquals(v1, store.load(k1)); - assertEquals(v2, store.load(k2)); - - ses.newSession(tx); - - OrganizationKey k3 = new OrganizationKey(3); - - assertNull(store.load(k3)); - - store.delete(k1); - - store.txEnd(true); - - assertNull(store.load(k1)); - assertEquals(v2, store.load(k2)); - - ses.newSession(null); - - assertNull(store.load(k3)); - - OrganizationKey k4 = new OrganizationKey(4); - Organization v4 = new Organization(4, null, "City4"); - - assertNull(store.load(k4)); - - store.write(new CacheEntryImpl<>(k4, v4)); - - assertEquals(v4, store.load(k4)); - } - - /** - * @throws IgniteCheckedException if failed. - */ - public void testRollback() throws IgniteCheckedException { - Transaction tx = new DummyTx(); - - ses.newSession(tx); - - OrganizationKey k1 = new OrganizationKey(1); - Organization v1 = new Organization(1, "Name1", "City1"); - - // Put. - store.write(new CacheEntryImpl<>(k1, v1)); - - store.txEnd(false); // Rollback. - - tx = new DummyTx(); - - ses.newSession(tx); - - assertNull(store.load(k1)); - - OrganizationKey k2 = new OrganizationKey(2); - Organization v2 = new Organization(2, "Name2", "City2"); - - // Put all. - assertNull(store.load(k2)); - - Collection<Cache.Entry<?, ?>> col = new ArrayList<>(); - - col.add(new CacheEntryImpl<>(k2, v2)); - - store.writeAll(col); - - store.txEnd(false); // Rollback. - - tx = new DummyTx(); - - ses.newSession(tx); - - assertNull(store.load(k2)); - - OrganizationKey k3 = new OrganizationKey(3); - Organization v3 = new Organization(3, "Name3", "City3"); - - col = new ArrayList<>(); - - col.add(new CacheEntryImpl<>(k3, v3)); - - store.writeAll(col); - - store.txEnd(true); // Commit. - - tx = new DummyTx(); - - ses.newSession(tx); - - assertEquals(v3, store.load(k3)); - - OrganizationKey k4 = new OrganizationKey(4); - Organization v4 = new Organization(4, "Name4", "City4"); - - store.write(new CacheEntryImpl<>(k4, v4)); - - store.txEnd(false); // Rollback. - - tx = new DummyTx(); - - ses.newSession(tx); - - assertNull(store.load(k4)); - - assertEquals(v3, store.load(k3)); - - // Remove. - store.delete(k3); - - store.txEnd(false); // Rollback. - - tx = new DummyTx(); - - ses.newSession(tx); - - assertEquals(v3, store.load(k3)); - - store.deleteAll(Arrays.asList(new OrganizationKey(-100))); - - // Remove all. - store.deleteAll(Arrays.asList(k3)); - - store.txEnd(false); // Rollback. - - tx = new DummyTx(); - - ses.newSession(tx); - - assertEquals(v3, store.load(k3)); - } - - /** - */ - public void testAllOpsWithTXNoCommit() { - doTestAllOps(new DummyTx(), false); - } - - /** - */ - public void testAllOpsWithTXCommit() { - doTestAllOps(new DummyTx(), true); - } - - /** - */ - public void testAllOpsWithoutTX() { - doTestAllOps(null, false); - } - - /** - * @param tx Transaction. - * @param commit Commit. - */ - private void doTestAllOps(@Nullable Transaction tx, boolean commit) { - try { - ses.newSession(tx); - - final OrganizationKey k1 = new OrganizationKey(1); - final Organization v1 = new Organization(1, "Name1", "City1"); - - store.write(new CacheEntryImpl<>(k1, v1)); - - if (tx != null && commit) { - store.txEnd(true); - - tx = new DummyTx(); - - ses.newSession(tx); - } - - if (tx == null || commit) - assertEquals(v1, store.load(k1)); - - Collection<Cache.Entry<?, ?>> col = new ArrayList<>(); - - final OrganizationKey k2 = new OrganizationKey(2); - final Organization v2 = new Organization(2, "Name2", "City2"); - - final OrganizationKey k3 = new OrganizationKey(3); - final Organization v3 = new Organization(3, "Name3", "City3"); - - col.add(new CacheEntryImpl<>(k2, v2)); - col.add(new CacheEntryImpl<>(k3, v3)); - - store.writeAll(col); - - if (tx != null && commit) { - store.txEnd(true); - - tx = new DummyTx(); - - ses.newSession(tx); - } - - final AtomicInteger cntr = new AtomicInteger(); - - final OrganizationKey no_such_key = new OrganizationKey(4); - - if (tx == null || commit) { - Map<Object, Object> loaded = store.loadAll(Arrays.asList(k1, k2, k3, no_such_key)); - - for (Map.Entry<Object, Object> e : loaded.entrySet()) { - Object key = e.getKey(); - Object val = e.getValue(); - - if (k1.equals(key)) - assertEquals(v1, val); - - if (k2.equals(key)) - assertEquals(v2, val); - - if (k3.equals(key)) - assertEquals(v3, val); - - if (no_such_key.equals(key)) - fail(); - - cntr.incrementAndGet(); - } - - assertEquals(3, cntr.get()); - } - - store.deleteAll(Arrays.asList(k2, k3)); - - if (tx != null && commit) { - store.txEnd(true); - - tx = new DummyTx(); - - ses.newSession(tx); - } - - if (tx == null || commit) { - assertNull(store.load(k2)); - assertNull(store.load(k3)); - assertEquals(v1, store.load(k1)); - } - - store.delete(k1); - - if (tx != null && commit) { - store.txEnd(true); - - tx = new DummyTx(); - - ses.newSession(tx); - } - - if (tx == null || commit) - assertNull(store.load(k1)); - } - finally { - if (tx != null) - store.txEnd(false); - } - } - - /** - * @throws Exception If failed. - */ - public void testSimpleMultithreading() throws Exception { - final Random rnd = new Random(); - - final Queue<OrganizationKey> queue = new LinkedBlockingQueue<>(); - - multithreaded(new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - for (int i = 0; i < 1000; i++) { - Transaction tx = rnd.nextBoolean() ? new DummyTx() : null; - - ses.newSession(tx); - - int op = rnd.nextInt(10); - - boolean queueEmpty = false; - - if (op < 4) { // Load. - OrganizationKey key = queue.poll(); - - if (key == null) - queueEmpty = true; - else { - if (rnd.nextBoolean()) - assertNotNull(store.load(key)); - else { - Map<Object, Object> loaded = store.loadAll(Collections.singleton(key)); - - assertEquals(1, loaded.size()); - - Map.Entry<Object, Object> e = loaded.entrySet().iterator().next(); - - OrganizationKey k = (OrganizationKey)e.getKey(); - Organization v = (Organization)e.getValue(); - - assertTrue(k.getId().equals(v.getId())); - } - - if (tx != null) - store.txEnd(true); - - queue.add(key); - } - } - else if (op < 6) { // Remove. - OrganizationKey key = queue.poll(); - - if (key == null) - queueEmpty = true; - else { - if (rnd.nextBoolean()) - store.delete(key); - else - store.deleteAll(Collections.singleton(key)); - - if (tx != null) - store.txEnd(true); - } - } - else { // Update. - OrganizationKey key = queue.poll(); - - if (key == null) - queueEmpty = true; - else { - Organization val = - new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId()); - - Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val); - - if (rnd.nextBoolean()) - store.write(entry); - else { - Collection<Cache.Entry<?, ?>> col = new ArrayList<>(); - - col.add(entry); - - store.writeAll(col); - } - - if (tx != null) - store.txEnd(true); - - queue.add(key); - } - } - - if (queueEmpty) { // Add. - OrganizationKey key = new OrganizationKey(rnd.nextInt()); - Organization val = new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId()); - - Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val); - - if (rnd.nextBoolean()) - store.write(entry); - else { - Collection<Cache.Entry<?, ?>> col = new ArrayList<>(); - - col.add(entry); - - store.writeAll(col); - } - - if (tx != null) - store.txEnd(true); - - queue.add(key); - } - } - - return null; - } - }, 37); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - Connection conn = store.openConnection(false); - - Statement stmt = conn.createStatement(); - - try { - stmt.executeUpdate("delete from Organization"); - } - catch (SQLException ignore) { - // no-op - } - - try { - stmt.executeUpdate("delete from Person"); - } - catch (SQLException ignore) { - // no-op - } - - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Person_Complex (id integer not null, org_id integer not null, city_id integer not null, name varchar(50), PRIMARY KEY(id))"); - - conn.commit(); - - U.closeQuiet(stmt); - - U.closeQuiet(conn); - } }