Repository: incubator-ignite Updated Branches: refs/heads/sprint-1 322408f01 -> 4563cce86
IGNITE-32 Fixed access to store session from different threads. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e9d31150 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e9d31150 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e9d31150 Branch: refs/heads/sprint-1 Commit: e9d311504f76a28fdf7ccc1781ff697e48d1555d Parents: 43a7391 Author: AKuznetsov <akuznet...@gridgain.com> Authored: Fri Feb 13 14:05:18 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Fri Feb 13 14:05:18 2015 +0700 ---------------------------------------------------------------------- .../store/jdbc/CacheAbstractJdbcStore.java | 100 +++++++++++-------- .../cache/store/jdbc/CacheJdbcPojoStore.java | 8 +- .../store/jdbc/CacheJdbcPojoStoreTest.java | 8 +- 3 files changed, 67 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9d31150/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index ab21fe1..c0350eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -120,25 +120,27 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl /** * Get field value from object. * + * @param cacheName Cache name. * @param typeName Type name. * @param fieldName Field name. * @param obj Cache object. * @return Field value from object. */ - @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj) + @Nullable protected abstract Object extractField(String cacheName, String typeName, String fieldName, Object obj) throws CacheException; /** * Construct object from query result. * * @param <R> Type of result object. + * @param cacheName Cache name. * @param typeName Type name. * @param fields Fields descriptors. * @param loadColIdxs Select query columns index. * @param rs ResultSet. * @return Constructed object. */ - protected abstract <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields, + protected abstract <R> R buildObject(String cacheName, String typeName, Collection<CacheTypeFieldMetadata> fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException; /** @@ -417,8 +419,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl ResultSet rs = stmt.executeQuery(); while (rs.next()) { - K key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs); - V val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); + V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); clo.apply(key, val); } @@ -466,13 +468,13 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl if (entryMappings != null) return entryMappings; - Collection<CacheTypeMetadata> types = ignite().cache(session().cacheName()).configuration() + Collection<CacheTypeMetadata> types = ignite().cache(cacheName).configuration() .getTypeMetadata(); entryMappings = U.newHashMap(types.size()); for (CacheTypeMetadata type : types) - entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(dialect, type)); + entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(cacheName, dialect, type)); Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings); @@ -490,14 +492,13 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl } /** + * @param cacheName Cache name. * @param keyTypeId Key type id. * @param key Key object. * @return Entry mapping. * @throws CacheException if mapping for key was not found. */ - private EntryMapping entryMapping(Object keyTypeId, Object key) throws CacheException { - String cacheName = session().cacheName(); - + private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws CacheException { EntryMapping em = cacheMappings(cacheName).get(keyTypeId); if (em == null) @@ -522,12 +523,14 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl if (log.isDebugEnabled()) log.debug("Start loading entries from db using user queries from arguments"); + String cacheName = session().cacheName(); + for (int i = 0; i < args.length; i += 2) { String keyType = args[i].toString(); String selQry = args[i + 1].toString(); - EntryMapping em = entryMapping(keyTypeId(keyType), keyType); + EntryMapping em = entryMapping(cacheName, keyTypeId(keyType), keyType); futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo))); } @@ -601,7 +604,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl @Nullable @Override public V load(K key) throws CacheLoaderException { assert key != null; - EntryMapping em = entryMapping(keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); if (log.isDebugEnabled()) log.debug("Start load value from database [table= " + em.fullTableName() + ", key=" + key + "]"); @@ -620,7 +623,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl ResultSet rs = stmt.executeQuery(); if (rs.next()) - return buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); } catch (SQLException e) { throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() + @@ -642,14 +645,16 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl try { conn = connection(); - Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(session().cacheName()).size()); + String cacheName = session().cacheName(); + + Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(cacheName).size()); Map<K, V> res = new HashMap<>(); for (K key : keys) { Object keyTypeId = keyTypeId(key); - EntryMapping em = entryMapping(keyTypeId, key); + EntryMapping em = entryMapping(cacheName, keyTypeId, key); LoadWorker<K, V> worker = workers.get(keyTypeId); @@ -755,7 +760,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl K key = entry.getKey(); - EntryMapping em = entryMapping(keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); if (log.isDebugEnabled()) log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]"); @@ -825,6 +830,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl Object currKeyTypeId = null; + String cacheName = session().cacheName(); + if (dialect.hasMerge()) { PreparedStatement mergeStmt = null; @@ -844,7 +851,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl Object keyTypeId = keyTypeId(key); - em = entryMapping(keyTypeId, key); + em = entryMapping(cacheName, keyTypeId, key); if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { if (mergeStmt != null) { @@ -891,7 +898,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl Object keyTypeId = keyTypeId(key); - EntryMapping em = entryMapping(keyTypeId, key); + EntryMapping em = entryMapping(cacheName, keyTypeId, key); if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { U.closeQuiet(insStmt); @@ -927,7 +934,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl @Override public void delete(Object key) throws CacheWriterException { assert key != null; - EntryMapping em = entryMapping(keyTypeId(key), key); + EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key); if (log.isDebugEnabled()) log.debug("Start remove value from database [table=" + em.fullTableName() + ", key=" + key + "]"); @@ -1027,10 +1034,12 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl int fromIdx = 0, prepared = 0; + String cachName = session().cacheName(); + for (Object key : keys) { Object keyTypeId = keyTypeId(key); - em = entryMapping(keyTypeId, key); + em = entryMapping(cachName, keyTypeId, key); if (delStmt == null) { delStmt = conn.prepareStatement(em.remQry); @@ -1082,7 +1091,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping em, Object key) throws CacheException { for (CacheTypeFieldMetadata field : em.keyColumns()) { - Object fieldVal = extractField(em.keyType(), field.getJavaName(), key); + Object fieldVal = extractField(em.cacheName, em.keyType(), field.getJavaName(), key); try { if (fieldVal != null) @@ -1110,28 +1119,28 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl /** * @param stmt Prepare statement. - * @param i Start index for parameters. - * @param m Type mapping description. + * @param idx Start index for parameters. + * @param em Type mapping description. * @param val Value object. * @return Next index for parameters. */ - protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping m, Object val) + protected int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val) throws CacheWriterException { - for (CacheTypeFieldMetadata field : m.uniqValFields) { - Object fieldVal = extractField(m.valueType(), field.getJavaName(), val); + for (CacheTypeFieldMetadata field : em.uniqValFields) { + Object fieldVal = extractField(em.cacheName, em.valueType(), field.getJavaName(), val); try { if (fieldVal != null) - stmt.setObject(i++, fieldVal); + stmt.setObject(idx++, fieldVal); else - stmt.setNull(i++, field.getDatabaseType()); + stmt.setNull(idx++, field.getDatabaseType()); } catch (SQLException e) { throw new CacheWriterException("Failed to set statement parameter name: " + field.getDatabaseName(), e); } } - return i; + return idx; } /** @@ -1224,35 +1233,38 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl * Entry mapping description. */ protected static class EntryMapping { + /** Cache name. */ + private final String cacheName; + /** Database dialect. */ private final JdbcDialect dialect; /** Select border for range queries. */ - protected final String loadCacheSelRangeQry; + private final String loadCacheSelRangeQry; /** Select all items query. */ - protected final String loadCacheQry; + private final String loadCacheQry; /** Select item query. */ - protected final String loadQrySingle; + private final String loadQrySingle; /** Select items query. */ private final String loadQry; /** Merge item(s) query. */ - protected final String mergeQry; + private final String mergeQry; /** Update item query. */ - protected final String insQry; + private final String insQry; /** Update item query. */ - protected final String updQry; + private final String updQry; /** Remove item(s) query. */ - protected final String remQry; + private final String remQry; /** Max key count for load query per statement. */ - protected final int maxKeysPerStmt; + private final int maxKeysPerStmt; /** Database key columns. */ private final Collection<String> keyCols; @@ -1273,9 +1285,13 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl private final String fullTblName; /** + * @param cacheName Cache name. + * @param dialect JDBC dialect. * @param typeMeta Type metadata. */ - public EntryMapping(JdbcDialect dialect, CacheTypeMetadata typeMeta) { + public EntryMapping(String cacheName, JdbcDialect dialect, CacheTypeMetadata typeMeta) { + this.cacheName = cacheName; + this.dialect = dialect; this.typeMeta = typeMeta; @@ -1456,8 +1472,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl colIdxs.put(meta.getColumnLabel(i), i); while (rs.next()) { - K1 key = buildObject(em.keyType(), em.keyColumns(), colIdxs, rs); - V1 val = buildObject(em.valueType(), em.valueColumns(), colIdxs, rs); + K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs, rs); + V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), colIdxs, rs); clo.apply(key, val); } @@ -1538,7 +1554,7 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl for (Object key : keys) for (CacheTypeFieldMetadata field : em.keyColumns()) { - Object fieldVal = extractField(em.keyType(), field.getJavaName(), key); + Object fieldVal = extractField(em.cacheName, em.keyType(), field.getJavaName(), key); if (fieldVal != null) stmt.setObject(i++, fieldVal); @@ -1551,8 +1567,8 @@ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> impl Map<K1, V1> entries = U.newHashMap(keys.size()); while (rs.next()) { - K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs); - V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs); + K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs); + V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs); entries.put(key, val); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9d31150/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index ed1846b..8687d90 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -153,9 +153,9 @@ public class CacheJdbcPojoStore extends CacheAbstractJdbcStore<Object, Object> { } /** {@inheritDoc} */ - @Override protected <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields, + @Override protected <R> R buildObject(String cacheName, String typeName, Collection<CacheTypeFieldMetadata> fields, Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException { - PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName); + PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName); Object obj = mc.newInstance(); @@ -176,10 +176,10 @@ public class CacheJdbcPojoStore extends CacheAbstractJdbcStore<Object, Object> { } /** {@inheritDoc} */ - @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj) + @Nullable @Override protected Object extractField(String cacheName, String typeName, String fieldName, Object obj) throws CacheException { try { - PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName); + PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName); return mc.getters.get(fieldName).invoke(obj); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9d31150/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index 1b665f7..f7dc4e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -51,7 +51,7 @@ import static org.apache.ignite.testframework.junits.cache.GridAbstractCacheStor * Class for {@code PojoCacheStore} tests. */ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { - /** Default connection URL (value is <tt>jdbc:h2:mem:jdbcCacheStore;DB_CLOSE_DELAY=-1</tt>). */ + /** DB connection URL. */ private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1"; /** Default config with mapping. */ @@ -143,7 +143,8 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { Map<Object, CacheAbstractJdbcStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size()); for (CacheTypeMetadata type : typeMeta) - entryMappings.put(store.keyTypeId(type.getKeyType()), new CacheAbstractJdbcStore.EntryMapping(dialect, type)); + entryMappings.put(store.keyTypeId(type.getKeyType()), + new CacheAbstractJdbcStore.EntryMapping(null, dialect, type)); store.prepareBuilders(null, typeMeta); @@ -183,7 +184,8 @@ public class CacheJdbcPojoStoreTest extends GridCommonAbstractTest { CacheTypeMetadata typeMeta = GridTestUtils.getFieldValue(em, CacheAbstractJdbcStore.EntryMapping.class, "typeMeta"); - cacheMappings.get(null).put(OrganizationKey.class, new CacheAbstractJdbcStore.EntryMapping(dialect, typeMeta)); + cacheMappings.get(null).put(OrganizationKey.class, + new CacheAbstractJdbcStore.EntryMapping(null, dialect, typeMeta)); Connection conn = store.openConnection(false);