# IGNITE-32: Review jdbc cache store.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/76c2f747
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/76c2f747
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/76c2f747

Branch: refs/heads/ignite-32
Commit: 76c2f747f47b217a733f6749677799de8a395e56
Parents: d84215c
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Wed Jan 28 17:17:16 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Wed Jan 28 17:17:16 2015 +0700

----------------------------------------------------------------------
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 246 ++++++++-----------
 .../cache/store/jdbc/JdbcPojoCacheStore.java    |   6 +-
 2 files changed, 102 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76c2f747/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index a0fa0a6..6e34443 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -68,14 +68,12 @@ import java.util.concurrent.atomic.*;
  * <pre name="code" class="xml">
  * ...
  * <bean id="cache.jdbc.store"
- *     class="org.gridgain.grid.cache.store.jdbc.JdbcPojoCacheStore">
+ *     class="org.apache.ignite.cache.store.jdbc.JdbcPojoCacheStore">
  *     <property name="connectionUrl" value="jdbc:h2:mem:"/>
  * </bean>
  * ...
  * </pre>
  * <p>
- * <img src="http://www.gridgain.com/images/spring-small.png">
- * <br>
  * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
  */
 public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
@@ -85,12 +83,8 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     /** Default batch size for put and remove operations. */
     protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512;
 
-    /** Connection attribute name. */
-    protected static final String ATTR_CONN = "JDBC_STORE_CONNECTION";
-
-    /** Auto-injected grid instance. */
-    @IgniteInstanceResource
-    protected Ignite ignite;
+    /** Connection attribute property name. */
+    protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
 
     /** Auto-injected logger instance. */
     @IgniteLoggerResource
@@ -124,7 +118,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     protected Collection<CacheQueryTypeMetadata> typeMetadata;
 
     /** Cache with query by type. */
-    protected Map<IgniteBiTuple<String, Object>, EntryMapping> typeMapping;
+    protected Map<IgniteBiTuple<String, Object>, EntryMapping> entryMappings;
 
     /** Database dialect. */
     protected JdbcDialect dialect;
@@ -222,6 +216,8 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         if (dbProductName.startsWith("DB2/"))
             return new DB2Dialect();
 
+        log.warning("Unknown database: " + dbProductName + ". BasicJdbcDialect will be used.");
+
         return new BasicJdbcDialect();
     }
@@ -244,7 +240,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         try {
             if (typeMetadata == null)
-                throw new CacheException("Failed to initialize cache store (mappping description is not provided).");
+                throw new CacheException("Failed to initialize cache store (mapping description is not provided).");
 
             buildTypeCache();
@@ -296,14 +292,13 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         if (ses.transaction() != null) {
             Map<String, Connection> prop = ses.properties();
 
-            Connection conn = prop.get(ATTR_CONN);
+            Connection conn = prop.get(ATTR_CONN_PROP);
 
             if (conn == null) {
                 conn = openConnection(false);
 
-                // Store connection in session, so it can be accessed
-                // for other operations on the same session.
-                prop.put(ATTR_CONN, conn);
+                // Store connection in session to use it for other operations in the same session.
+                prop.put(ATTR_CONN_PROP, conn);
             }
 
             return conn;
@@ -344,8 +339,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
 
         IgniteTx tx = ses.transaction();
 
-        Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN);
-
+        Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN_PROP);
         if (conn != null) {
             assert tx != null;
@@ -373,7 +367,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
      * Construct load cache from range.
      *
      * @param m Type mapping description.
-     * @param clo Closure for loaded values.
+     * @param clo Closure that will be applied to loaded values.
      * @param lowerBound Lower bound for range.
     * @param upperBound Upper bound for range.
      * @return Callable for pool submit.
@@ -389,7 +383,9 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
                 try {
                     conn = openConnection(true);
 
-                    stmt = conn.prepareStatement(m.loadCacheRangeQuery(lowerBound != null, upperBound != null));
+                    stmt = conn.prepareStatement(lowerBound == null && upperBound == null
+                        ? m.loadCacheQry
+                        : m.loadCacheRangeQuery(lowerBound != null, upperBound != null));
 
                     int ix = 1;
@@ -434,38 +430,21 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
      * @return Callable for pool submit.
      */
     private Callable<Void> loadCacheFull(final EntryMapping m, final IgniteBiInClosure<K, V> clo) {
-        return new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                Connection conn = null;
-
-                PreparedStatement stmt = null;
-
-                try {
-                    conn = openConnection(true);
-
-                    stmt = conn.prepareStatement(m.loadCacheQry);
-
-                    ResultSet rs = stmt.executeQuery();
-
-                    while (rs.next()) {
-                        K key = buildObject(m.keyType(), m.keyDescriptors(), rs);
-                        V val = buildObject(m.valueType(), m.valueDescriptors(), rs);
+        return loadCacheRange(m, clo, null, null);
+    }
 
-                        clo.apply(key, val);
-                    }
-                }
-                catch (SQLException e) {
-                    throw new IgniteCheckedException("Failed to load cache", e);
-                }
-                finally {
-                    U.closeQuiet(stmt);
+    /**
+     * @param key Entry key.
+     * @return Entry mapping.
+     * @throws CacheException if mapping for key was not found.
+     */
+    private EntryMapping entryMapping(Object keyId, Object key) throws CacheException {
+        EntryMapping em = entryMappings.get(new IgniteBiTuple<>(session().cacheName(), keyId));
 
-                    U.closeQuiet(conn);
-                }
+        if (em == null)
+            throw new CacheException("Failed to find mapping description for key: " + key);
 
-                return null;
-            }
-        };
+        return em;
     }
 
     /** {@inheritDoc} */
@@ -480,54 +459,47 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
 
         if (args != null && args.length > 0) {
             if (args.length % 2 != 0)
-                throw new CacheLoaderException("Wrong argument count");
+                throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length);
 
             if (log.isDebugEnabled())
-                log.debug("Start loading entries from db using queries from arguments");
+                log.debug("Start loading entries from db using user queries from arguments");
 
             for (int i = 0; i < args.length; i += 2) {
                 String keyType = args[i].toString();
 
-                final String selQry = args[i + 1].toString();
-
-                CacheStoreSession ses = session();
+                String selQry = args[i + 1].toString();
 
-                Object typeId = keyId(keyType);
+                EntryMapping em = entryMapping(keyId(keyType), keyType);
 
-                final EntryMapping m = typeMapping.get(new IgniteBiTuple<>(ses.cacheName(), typeId));
-
-                if (m == null)
-                    throw new CacheWriterException("Failed to find mapping description for key: " + keyType);
-
-                futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(m, selQry, clo)));
+                futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
             }
         }
         else {
             if (log.isDebugEnabled())
-                log.debug("Start loading all types entries from db");
+                log.debug("Start loading all cache types entries from db");
 
-            for (EntryMapping m : typeMapping.values()) {
+            for (EntryMapping em : entryMappings.values()) {
                if (parallelLoadCacheMinThreshold > 0) {
                     Connection conn = null;
 
                     try {
                         conn = connection();
 
-                        PreparedStatement stmt = conn.prepareStatement(m.loadCacheSelRangeQry);
+                        PreparedStatement stmt = conn.prepareStatement(em.loadCacheSelRangeQry);
 
                         stmt.setInt(1, parallelLoadCacheMinThreshold);
 
                         ResultSet rs = stmt.executeQuery();
 
                         if (rs.next()) {
-                            int keyCnt = m.keyCols.size();
+                            int keyCnt = em.keyCols.size();
 
                             Object[] upperBound = new Object[keyCnt];
 
                             for (int i = 0; i < keyCnt; i++)
                                 upperBound[i] = rs.getObject(i + 1);
 
-                            futs.add(pool.submit(loadCacheRange(m, clo, null, upperBound)));
+                            futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
 
                             while (rs.next()) {
                                 Object[] lowerBound = upperBound;
@@ -537,23 +509,23 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
                                 for (int i = 0; i < keyCnt; i++)
                                     upperBound[i] = rs.getObject(i + 1);
 
-                                futs.add(pool.submit(loadCacheRange(m, clo, lowerBound, upperBound)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
                             }
 
-                            futs.add(pool.submit(loadCacheRange(m, clo, upperBound, null)));
+                            futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
                         }
                         else
-                            futs.add(pool.submit(loadCacheFull(m, clo)));
+                            futs.add(pool.submit(loadCacheFull(em, clo)));
                     }
                     catch (SQLException ignored) {
-                        futs.add(pool.submit(loadCacheFull(m, clo)));
+                        futs.add(pool.submit(loadCacheFull(em, clo)));
                     }
                     finally {
                         U.closeQuiet(conn);
                     }
                 }
                 else
-                    futs.add(pool.submit(loadCacheFull(m, clo)));
+                    futs.add(pool.submit(loadCacheFull(em, clo)));
             }
         }
@@ -571,10 +543,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
 
         init();
 
-        EntryMapping type = typeMapping.get(new IgniteBiTuple<>(session().cacheName(), keyId(key)));
-
-        if (type == null)
-            throw new CacheLoaderException("Failed to find mapping description for key: " + key);
CacheLoaderException("Failed to find mapping description for key: " + key); + EntryMapping em = entryMapping(keyId(key), key); if (log.isDebugEnabled()) log.debug("Start load value from database by key: " + key); @@ -586,14 +555,14 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> { try { conn = connection(); - stmt = conn.prepareStatement(type.loadQrySingle); + stmt = conn.prepareStatement(em.loadQrySingle); - fillKeyParameters(stmt, type, key); + fillKeyParameters(stmt, em, key); ResultSet rs = stmt.executeQuery(); if (rs.next()) - return buildObject(type.valueType(), type.valueDescriptors(), rs); + return buildObject(em.valueType(), em.valueDescriptors(), rs); } catch (SQLException e) { throw new CacheLoaderException("Failed to load object by key: " + key, e); @@ -616,27 +585,24 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> { try { conn = connection(); - Map<Object, LoadWorker<K, V>> workers = U.newHashMap(typeMapping.size()); + Map<Object, LoadWorker<K, V>> workers = U.newHashMap(entryMappings.size()); Map<K, V> res = new HashMap<>(); for (K key : keys) { - Object typeId = keyId(key); + Object keyId = keyId(key); - EntryMapping m = typeMapping.get(new IgniteBiTuple<>(session().cacheName(), typeId)); + EntryMapping em = entryMapping(keyId, key); - if (m == null) - throw new CacheWriterException("Failed to find mapping description for key: " + key); - - LoadWorker<K, V> worker = workers.get(typeId); + LoadWorker<K, V> worker = workers.get(keyId); if (worker == null) - workers.put(typeId, worker = new LoadWorker<>(conn, m)); + workers.put(keyId, worker = new LoadWorker<>(conn, em)); worker.keys.add(key); - if (worker.keys.size() == m.maxKeysPerStmt) - res.putAll(workers.remove(typeId).call()); + if (worker.keys.size() == em.maxKeysPerStmt) + res.putAll(workers.remove(keyId).call()); } for (LoadWorker<K, V> worker : workers.values()) @@ -663,10 +629,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> { K key = entry.getKey(); - EntryMapping type = typeMapping.get(new IgniteBiTuple<>(session().cacheName(), keyId(key))); - - if (type == null) - throw new CacheWriterException("Failed to find mapping description for entry: " + entry); + EntryMapping em = entryMapping(keyId(key), key); if (log.isDebugEnabled()) log.debug("Start write entry to database: " + entry); @@ -679,31 +642,31 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> { conn = connection(); if (dialect.hasMerge()) { - stmt = conn.prepareStatement(type.mergeQry); + stmt = conn.prepareStatement(em.mergeQry); - int i = fillKeyParameters(stmt, type, key); + int i = fillKeyParameters(stmt, em, key); - fillValueParameters(stmt, i, type, entry.getValue()); + fillValueParameters(stmt, i, em, entry.getValue()); stmt.executeUpdate(); } else { V val = entry.getValue(); - stmt = conn.prepareStatement(type.updQry); + stmt = conn.prepareStatement(em.updQry); - int i = fillValueParameters(stmt, 1, type, val); + int i = fillValueParameters(stmt, 1, em, val); - fillKeyParameters(stmt, i, type, key); + fillKeyParameters(stmt, i, em, key); if (stmt.executeUpdate() == 0) { stmt.close(); - stmt = conn.prepareStatement(type.insQry); + stmt = conn.prepareStatement(em.insQry); - i = fillKeyParameters(stmt, type, key); + i = fillKeyParameters(stmt, em, key); - fillValueParameters(stmt, i, type, val); + fillValueParameters(stmt, i, em, val); stmt.executeUpdate(); } @@ -729,43 +692,40 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> { try { conn = 
 
-            String cacheName = session().cacheName();
-
             if (dialect.hasMerge()) {
-                Map<Object, PreparedStatement> stmts = U.newHashMap(typeMapping.size());
+                Map<Object, PreparedStatement> stmts = U.newHashMap(entryMappings.size());
 
-                Object prevTypeId = null;
+                Object prevKeyId = null;
 
                 PreparedStatement mergeStmt = null;
 
                 int cnt = 0;
 
                 for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                    Object typeId = keyId(entry.getKey());
+                    K key = entry.getKey();
 
-                    final EntryMapping m = typeMapping.get(new IgniteBiTuple<>(cacheName, typeId));
+                    Object keyId = keyId(key);
 
-                    if (m == null)
-                        throw new CacheWriterException("Failed to find mapping description for key: " + entry.getKey());
+                    EntryMapping em = entryMapping(keyId, key);
 
-                    if (prevTypeId != null && !prevTypeId.equals(typeId)) {
-                        mergeStmt = stmts.get(prevTypeId);
+                    if (prevKeyId != null && !prevKeyId.equals(keyId)) {
+                        mergeStmt = stmts.get(prevKeyId);
 
                         mergeStmt.executeBatch();
 
                         cnt = 0;
                     }
 
-                    prevTypeId = typeId;
+                    prevKeyId = keyId;
 
-                    mergeStmt = stmts.get(typeId);
+                    mergeStmt = stmts.get(keyId);
 
                     if (mergeStmt == null)
-                        stmts.put(typeId, mergeStmt = conn.prepareStatement(m.mergeQry));
+                        stmts.put(keyId, mergeStmt = conn.prepareStatement(em.mergeQry));
 
-                    int i = fillKeyParameters(mergeStmt, m, entry.getKey());
+                    int i = fillKeyParameters(mergeStmt, em, key);
 
-                    fillValueParameters(mergeStmt, i, m, entry.getValue());
+                    fillValueParameters(mergeStmt, i, em, entry.getValue());
 
                     mergeStmt.addBatch();
@@ -780,39 +740,37 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
                     U.closeQuiet(st);
             }
             else {
-                Map<Object, T2<PreparedStatement, PreparedStatement>> stmts = U.newHashMap(typeMapping.size());
+                Map<Object, T2<PreparedStatement, PreparedStatement>> stmts = U.newHashMap(entryMappings.size());
 
                 for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                    Object typeId = keyId(entry.getKey());
+                    K key = entry.getKey();
 
-                    final EntryMapping m = typeMapping.get(new IgniteBiTuple<>(cacheName, typeId));
+                    Object keyId = keyId(key);
 
-                    if (m == null)
-                        throw new CacheWriterException("Failed to find mapping description for key: " +
-                            entry.getKey());
+                    EntryMapping em = entryMapping(keyId, key);
 
-                    T2<PreparedStatement, PreparedStatement> pair = stmts.get(typeId);
+                    T2<PreparedStatement, PreparedStatement> pair = stmts.get(keyId);
 
                     if (pair == null)
-                        stmts.put(typeId,
-                            pair = new T2<>(conn.prepareStatement(m.updQry), conn.prepareStatement(m.insQry)));
+                        stmts.put(keyId,
+                            pair = new T2<>(conn.prepareStatement(em.updQry), conn.prepareStatement(em.insQry)));
 
                     PreparedStatement updStmt = pair.get1();
 
                     assert updStmt != null;
 
-                    int i = fillValueParameters(updStmt, 1, m, entry.getValue());
+                    int i = fillValueParameters(updStmt, 1, em, entry.getValue());
 
-                    fillKeyParameters(updStmt, i, m, entry.getKey());
+                    fillKeyParameters(updStmt, i, em, key);
 
                     if (updStmt.executeUpdate() == 0) {
                         PreparedStatement insStmt = pair.get2();
 
                         assert insStmt != null;
 
-                        i = fillKeyParameters(insStmt, m, entry.getKey());
+                        i = fillKeyParameters(insStmt, em, key);
 
-                        fillValueParameters(insStmt, i, m, entry.getValue());
+                        fillValueParameters(insStmt, i, em, entry.getValue());
 
                         insStmt.executeUpdate();
                     }
@@ -839,10 +797,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
 
         init();
 
-        EntryMapping type = typeMapping.get(new IgniteBiTuple<>(session().cacheName(), keyId(key)));
-
-        if (type == null)
-            throw new CacheWriterException("Failed to find mapping description for key: " + key);
+        EntryMapping em = entryMapping(keyId(key), key);
 
         if (log.isDebugEnabled())
             log.debug("Start remove value from database by key: " + key);
@@ -854,9 +809,9 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         try {
            conn = connection();
 
-            stmt = conn.prepareStatement(type.remQry);
+            stmt = conn.prepareStatement(em.remQry);
 
-            fillKeyParameters(stmt, type, key);
+            fillKeyParameters(stmt, em, key);
 
             stmt.executeUpdate();
         }
@@ -877,38 +832,35 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         try {
             conn = connection();
 
-            Map<Object, PreparedStatement> stmts = U.newHashMap(typeMapping.size());
+            Map<Object, PreparedStatement> stmts = U.newHashMap(entryMappings.size());
 
-            Object prevTypeId = null;
+            Object prevKeyId = null;
 
             PreparedStatement delStmt = null;
 
             int cnt = 0;
 
             for (Object key : keys) {
-                Object typeId = keyId(key);
-
-                EntryMapping m = typeMapping.get(new IgniteBiTuple<>(session().cacheName(), typeId));
+                Object keyId = keyId(key);
 
-                if (m == null)
-                    throw new CacheWriterException("Failed to find mapping description for key: " + key);
+                EntryMapping em = entryMapping(keyId, key);
 
-                if (prevTypeId != null && !prevTypeId.equals(typeId)) {
-                    delStmt = stmts.get(prevTypeId);
+                if (prevKeyId != null && !prevKeyId.equals(keyId)) {
+                    delStmt = stmts.get(prevKeyId);
 
                     delStmt.executeBatch();
 
                     cnt = 0;
                 }
 
-                prevTypeId = typeId;
+                prevKeyId = keyId;
 
-                delStmt = stmts.get(typeId);
+                delStmt = stmts.get(keyId);
 
                 if (delStmt == null)
-                    stmts.put(typeId, delStmt = conn.prepareStatement(m.remQry));
+                    stmts.put(keyId, delStmt = conn.prepareStatement(em.remQry));
 
-                fillKeyParameters(delStmt, m, key);
+                fillKeyParameters(delStmt, em, key);
 
                 delStmt.addBatch();


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/76c2f747/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
index 8d2201a..8bc82bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
@@ -136,7 +136,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
 
     /** {@inheritDoc} */
     @Override protected void buildTypeCache() throws CacheException {
-        typeMapping = U.newHashMap(typeMetadata.size());
+        entryMappings = U.newHashMap(typeMetadata.size());
 
         mtdsCache = U.newHashMap(typeMetadata.size() * 2);
 
@@ -145,13 +145,13 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
 
             mtdsCache.put(type.getKeyType(), keyCache);
 
-            typeMapping.put(new IgniteBiTuple<String, Object>(null, keyId(type.getKeyType())),
+            entryMappings.put(new IgniteBiTuple<String, Object>(null, keyId(type.getKeyType())),
                 new EntryMapping(dialect, type));
 
             mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), type.getValueDescriptors()));
         }
 
-        typeMapping = Collections.unmodifiableMap(typeMapping);
+        entryMappings = Collections.unmodifiableMap(entryMappings);
 
         mtdsCache = Collections.unmodifiableMap(mtdsCache);
     }
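
The recurring pattern in this patch is the consolidation of the copy-pasted "look up `EntryMapping`, throw if absent" blocks into a single `entryMapping(keyId, key)` helper, and the collapse of `loadCacheFull` into `loadCacheRange` with open (null) bounds. Below is a minimal, self-contained sketch of those two moves. The class name, the string-keyed map, the `#` composite key, and the query strings are illustrative assumptions only; the real code uses Ignite's `IgniteBiTuple`, `CacheException`, and generated SQL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/** Sketch only: simplified stand-ins for the real org.apache.ignite classes. */
public class MappingLookupSketch {
    /** Stand-in for EntryMapping: per-key-type query metadata. */
    static class EntryMapping {
        final String loadCacheQry = "SELECT * FROM SOME_TABLE"; // hypothetical generated query
    }

    /** Analogue of the entryMappings field: (cache name, key type id) -> mapping. */
    private final Map<String, EntryMapping> entryMappings = new HashMap<>();

    /** Hypothetical stand-in for session().cacheName(). */
    private String cacheName() {
        return "default";
    }

    /**
     * Single place to resolve a mapping and fail consistently. Before this
     * commit the get-then-null-check-then-throw sequence was duplicated in
     * load, loadAll, write, writeAll, delete and deleteAll.
     */
    EntryMapping entryMapping(Object keyId, Object key) {
        EntryMapping em = entryMappings.get(cacheName() + "#" + keyId);

        if (em == null)
            throw new IllegalStateException("Failed to find mapping description for key: " + key);

        return em;
    }

    /** Range loader; null bounds select the plain full-scan query. */
    Callable<Void> loadCacheRange(EntryMapping m, Object[] lowerBound, Object[] upperBound) {
        String qry = lowerBound == null && upperBound == null
            ? m.loadCacheQry                          // full scan
            : m.loadCacheQry + " WHERE /* bounds */"; // range query (elided)

        return () -> {
            System.out.println("would execute: " + qry); // placeholder for the JDBC work

            return null;
        };
    }

    /** After the patch, a full load is just a range load with open bounds. */
    Callable<Void> loadCacheFull(EntryMapping m) {
        return loadCacheRange(m, null, null);
    }
}
```

The payoff of the consolidation is that every store operation now fails with the same message for an unmapped key, and the full-scan path exercises the same statement-handling code as the range path instead of maintaining its own copy.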