# IGNITE-32: fixed jdbc store after review.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9077809
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9077809
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9077809

Branch: refs/heads/sprint-1
Commit: f90778095ebcd9b3427699e21ac90240a6d529bf
Parents: e1b03b7
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Mon Feb 2 15:11:22 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Mon Feb 2 15:11:22 2015 +0700

----------------------------------------------------------------------
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 389 +++++++------------
 .../cache/store/jdbc/JdbcPojoCacheStore.java    |  23 +-
 .../store/jdbc/PojoJdbcCacheStoreTest.java      |  14 +-
 3 files changed, 160 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9077809/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 41a4024..15e1373 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -21,6 +21,8 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cache.store.jdbc.dialect.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.interop.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -35,7 +37,6 @@ import javax.sql.*;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
 /**
@@ -50,10 +51,6 @@ import java.util.concurrent.locks.*;
  * <h3>Optional</h3>
  * <ul>
  *     <li>Data source (see {@link #setDataSource(DataSource)}</li>
- *     <li>Connection URL (see {@link #setConnectionUrl(String)})</li>
- *     <li>User name (see {@link #setUser(String)})</li>
- *     <li>Password (see {@link #setPassword(String)})</li>
- *     <li>Create table query (see {@link #setConnectionUrl(String)})</li>
  *     <li>Maximum batch size for writeAll and deleteAll operations. (see 
{@link #setBatchSize(int)})</li>
  *     <li>Max workers thread count. These threads are responsible for load 
cache. (see {@link #setMaxPoolSize(int)})</li>
  *     <li>Parallel load cache minimum threshold. (see {@link 
#setParallelLoadCacheMinimumThreshold(int)})</li>
@@ -77,7 +74,10 @@ import java.util.concurrent.locks.*;
  * <p>
  * For information about Spring framework visit <a 
href="http://www.springframework.org/";>www.springframework.org</a>
  */
-public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
+public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements 
GridInteropAware {
+    /** Max attempt write count. */
+    protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
+
     /** Default batch size for put and remove operations. */
     protected static final int DFLT_BATCH_SIZE = 512;
 
@@ -91,36 +91,15 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
     @IgniteLoggerResource
     protected IgniteLogger log;
 
-    /** Init guard. */
-    @GridToStringExclude
-    private final AtomicBoolean initGuard = new AtomicBoolean();
-
-    /** Init latch. */
-    @GridToStringExclude
-    private final CountDownLatch initLatch = new CountDownLatch(1);
-
     /** Lock for metadata cache. */
     @GridToStringExclude
-    private final Lock buildMetaCacheLock = new ReentrantLock();
-
-    /** Successful initialization flag. */
-    private boolean initOk;
+    private final Lock cacheMappingsLock = new ReentrantLock();
 
     /** Data source. */
     protected DataSource dataSrc;
 
-    /** Connection URL. */
-    protected String connUrl;
-
-    /** User name for database access. */
-    protected String user;
-
-    /** Password for database access. */
-    @GridToStringExclude
-    protected String passwd;
-
     /** Cache with entry mapping description. (cache name, (key id, mapping 
description)). */
-    protected Map<Integer, Map<Object, EntryMapping>> cacheMappings = new 
ConcurrentHashMap<>();
+    protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = 
Collections.emptyMap();
 
     /** Database dialect. */
     protected JdbcDialect dialect;
@@ -158,20 +137,20 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
         throws CacheLoaderException;
 
     /**
-     * Extract type id from key object.
+     * Extract key type id from key object.
      *
      * @param key Key object.
-     * @return Type id.
+     * @return Key type id.
      */
     protected abstract Object keyTypeId(Object key) throws CacheException;
 
     /**
-     * Extract type id from key class name.
+     * Extract key type id from key class name.
      *
      * @param type String description of key type.
-     * @return Type id.
+     * @return Key type id.
      */
-    protected abstract Object keyId(String type) throws CacheException;
+    protected abstract Object keyTypeId(String type) throws CacheException;
 
     /**
      * Prepare internal store specific builders for provided types metadata.
@@ -179,7 +158,8 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
      * @param types Collection of types.
      * @throws CacheException If failed to prepare.
      */
-    protected abstract void prepareBuilders(Collection<CacheQueryTypeMetadata> 
types) throws CacheException;
+    protected abstract void prepareBuilders(@Nullable String cacheName, 
Collection<CacheQueryTypeMetadata> types)
+        throws CacheException;
 
     /**
      * Perform dialect resolution.
@@ -234,69 +214,23 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
         return cacheName != null ? cacheName.hashCode() : 0;
     }
 
-    /**
-     * Initializes store.
-     *
-     * @throws CacheException If failed to initialize.
-     */
-    private void init() throws CacheException {
-        if (initLatch.getCount() > 0) {
-            if (initGuard.compareAndSet(false, true)) {
-                if (log.isDebugEnabled())
-                    log.debug("Initializing cache store.");
-
-                if (dataSrc == null && F.isEmpty(connUrl))
-                    throw new CacheException("Failed to initialize cache store 
(connection is not provided).");
-
-                try {
-                    if (dialect == null)
-                        dialect = resolveDialect();
-
-                    initOk = true;
-                }
-                finally {
-                    initLatch.countDown();
-                }
-            }
-            else
-                try {
-                    if (initLatch.getCount() > 0)
-                        initLatch.await();
-                }
-                catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-
-                    throw new CacheException(e);
-                }
-        }
-
-        if (!initOk)
-            throw new CacheException("Cache store was not properly 
initialized.");
-
-        Integer cacheKey = cacheKeyId();
-
-        if (!cacheMappings.containsKey(cacheKey)) {
-            buildMetaCacheLock.lock();
-
-            try {
-                if (!cacheMappings.containsKey(cacheKey)) {
-                    Collection<CacheQueryTypeMetadata> typeMetadata =
-                        
ignite().cache(session().cacheName()).configuration().getQueryConfiguration().getTypeMetadata();
-
-                    Map<Object, EntryMapping> entryMappings = 
U.newHashMap(typeMetadata.size());
+    /** {@inheritDoc} */
+    @Override public void configure(Object... params) {
+        // No-op.
+    }
 
-                    for (CacheQueryTypeMetadata type : typeMetadata)
-                        entryMappings.put(keyId(type.getKeyType()), new 
EntryMapping(dialect, type));
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext ctx) throws 
IgniteCheckedException {
+        if (dataSrc == null)
+            throw new IgniteCheckedException("Failed to initialize cache store 
(data source is not provided).");
 
-                    cacheMappings.put(cacheKey, 
Collections.unmodifiableMap(entryMappings));
+        if (dialect == null)
+            dialect = resolveDialect();
+    }
 
-                    prepareBuilders(typeMetadata);
-                }
-            }
-            finally {
-                buildMetaCacheLock.unlock();
-            }
-        }
+    /** {@inheritDoc} */
+    @Override public void destroy(GridKernalContext ctx) throws 
IgniteCheckedException {
+        // No-op.
     }
 
     /**
@@ -307,8 +241,7 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
      * @throws SQLException In case of error.
      */
     protected Connection openConnection(boolean autocommit) throws 
SQLException {
-        Connection conn = dataSrc != null ? dataSrc.getConnection() :
-            DriverManager.getConnection(connUrl, user, passwd);
+        Connection conn = dataSrc.getConnection();
 
         conn.setAutoCommit(autocommit);
 
@@ -467,16 +400,57 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
     }
 
     /**
-     * @param key Entry key.
+     * @return Type mappings for specified cache name.
+     *
+     * @throws CacheException If failed to initialize.
+     */
+    private Map<Object, EntryMapping> cacheMappings(@Nullable String 
cacheName) throws CacheException {
+        Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
+
+        if (entryMappings != null)
+            return entryMappings;
+
+        cacheMappingsLock.lock();
+
+        try {
+            entryMappings = cacheMappings.get(cacheName);
+
+            if (entryMappings != null)
+                return entryMappings;
+
+            Collection<CacheQueryTypeMetadata> typeMetadata =
+                
ignite().cache(session().cacheName()).configuration().getQueryConfiguration().getTypeMetadata();
+
+            entryMappings = U.newHashMap(typeMetadata.size());
+
+            for (CacheQueryTypeMetadata type : typeMetadata)
+                entryMappings.put(keyTypeId(type.getKeyType()), new 
EntryMapping(dialect, type));
+
+            Map<String, Map<Object, EntryMapping>> mappings = new 
HashMap<>(cacheMappings);
+
+            mappings.put(cacheName, entryMappings);
+
+            prepareBuilders(cacheName, typeMetadata);
+
+            cacheMappings = mappings;
+
+            return entryMappings;
+        }
+        finally {
+            cacheMappingsLock.unlock();
+        }
+    }
+
+    /**
+     * @param keyTypeId Key type id.
+     * @param key Key object.
      * @return Entry mapping.
      * @throws CacheException if mapping for key was not found.
      */
-    private EntryMapping entryMapping(Object keyId, Object key) throws 
CacheException {
+    private EntryMapping entryMapping(Object keyTypeId, Object key) throws 
CacheException {
         String cacheName = session().cacheName();
 
-        init();
-
-        EntryMapping em = cacheMappings.get(cacheKeyId()).get(keyId);
+        EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
 
         if (em == null)
             throw new CacheException("Failed to find mapping description for 
key: " + key +
@@ -505,18 +479,18 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
 
                     String selQry = args[i + 1].toString();
 
-                    EntryMapping em = entryMapping(keyId(keyType), keyType);
+                    EntryMapping em = entryMapping(keyTypeId(keyType), 
keyType);
 
                     futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, 
selQry, clo)));
                 }
             }
             else {
-                init();
+                Collection<EntryMapping> entryMappings = 
cacheMappings(session().cacheName()).values();
 
                 if (log.isDebugEnabled())
                     log.debug("Start loading all cache types entries from db");
 
-                for (EntryMapping em : 
cacheMappings.get(cacheKeyId()).values()) {
+                for (EntryMapping em : entryMappings) {
                     if (parallelLoadCacheMinThreshold > 0) {
                         Connection conn = null;
 
@@ -619,7 +593,7 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
         try {
             conn = connection();
 
-            Map<Object, LoadWorker<K, V>> workers = 
U.newHashMap(cacheMappings.get(cacheKeyId()).size());
+            Map<Object, LoadWorker<K, V>> workers = 
U.newHashMap(cacheMappings(session().cacheName()).size());
 
             Map<K, V> res = new HashMap<>();
 
@@ -644,9 +618,6 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
 
             return res;
         }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to open connection", e);
-        }
         catch (Exception e) {
             throw new CacheWriterException("Failed to load entries from 
database", e);
         }
@@ -685,23 +656,40 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
             else {
                 V val = entry.getValue();
 
-                stmt = conn.prepareStatement(em.updQry);
+                for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; 
attempt++) {
+                    stmt = conn.prepareStatement(em.updQry);
 
-                int i = fillValueParameters(stmt, 1, em, val);
+                    int i = fillValueParameters(stmt, 1, em, val);
 
-                fillKeyParameters(stmt, i, em, key);
+                    fillKeyParameters(stmt, i, em, key);
 
-                if (stmt.executeUpdate() == 0) {
-                    stmt.close();
+                    if (stmt.executeUpdate() == 0) {
+                        U.closeQuiet(stmt);
 
-                    stmt = conn.prepareStatement(em.insQry);
+                        stmt = conn.prepareStatement(em.insQry);
 
-                    i = fillKeyParameters(stmt, em, key);
+                        i = fillKeyParameters(stmt, em, key);
 
-                    fillValueParameters(stmt, i, em, val);
+                        fillValueParameters(stmt, i, em, val);
 
-                    stmt.executeUpdate();
+                        try {
+                            stmt.executeUpdate();
+                        }
+                        catch (SQLException e) {
+                            // The error with code 23505 is thrown when trying 
to insert a row that
+                            // would violate a unique index or primary key.
+                            // TODO check with all RDBMS
+                            if (e.getErrorCode() == 23505)
+                                continue;
+
+                            throw e;
+                        }
+                    }
+
+                    return;
                 }
+
+                throw new CacheWriterException("Failed write entry to 
database: " + entry);
             }
         }
         catch (SQLException e) {
@@ -717,19 +705,15 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
         throws CacheWriterException {
         assert entries != null;
 
-        init();
-
-        Connection conn = null;
-
-        try {
-            conn = connection();
+        if (dialect.hasMerge()) {
+            Connection conn = null;
 
-            if (dialect.hasMerge()) {
-                Map<Object, PreparedStatement> stmts = 
U.newHashMap(cacheMappings.get(cacheKeyId()).size());
+            PreparedStatement mergeStmt = null;
 
-                Object prevKeyTypeId  = null;
+            try {
+                conn = connection();
 
-                PreparedStatement mergeStmt = null;
+                Object currKeyTypeId = null;
 
                 int cnt = 0;
 
@@ -740,20 +724,19 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
 
                     EntryMapping em = entryMapping(keyTypeId, key);
 
-                    if (prevKeyTypeId != null && 
!prevKeyTypeId.equals(keyTypeId)) {
-                        mergeStmt = stmts.get(prevKeyTypeId);
-
-                        mergeStmt.executeBatch();
+                    if (mergeStmt == null) {
+                        mergeStmt = conn.prepareStatement(em.mergeQry);
 
-                        cnt = 0;
+                        currKeyTypeId = keyTypeId;
                     }
 
-                    prevKeyTypeId  = keyTypeId;
+                    if (!currKeyTypeId.equals(keyTypeId)) {
+                        mergeStmt.executeBatch();
 
-                    mergeStmt = stmts.get(keyTypeId);
+                        currKeyTypeId = keyTypeId;
 
-                    if (mergeStmt == null)
-                        stmts.put(keyTypeId, mergeStmt = 
conn.prepareStatement(em.mergeQry));
+                        cnt = 0;
+                    }
 
                     int i = fillKeyParameters(mergeStmt, em, key);
 
@@ -767,61 +750,19 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
 
                 if (mergeStmt != null && cnt % batchSz != 0)
                     mergeStmt.executeBatch();
-
-                for (PreparedStatement st : stmts.values())
-                    U.closeQuiet(st);
             }
-            else {
-                Map<Object, T2<PreparedStatement, PreparedStatement>> stmts =
-                    U.newHashMap(cacheMappings.get(cacheKeyId()).size());
-
-                for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                    K key = entry.getKey();
-
-                    Object keyTypeId = keyTypeId(key);
-
-                    EntryMapping em = entryMapping(keyTypeId, key);
-
-                    T2<PreparedStatement, PreparedStatement> pair = 
stmts.get(keyTypeId);
-
-                    if (pair == null)
-                        stmts.put(keyTypeId,
-                            pair = new T2<>(conn.prepareStatement(em.updQry), 
conn.prepareStatement(em.insQry)));
-
-                    PreparedStatement updStmt = pair.get1();
-
-                    assert updStmt != null;
-
-                    int i = fillValueParameters(updStmt, 1, em, 
entry.getValue());
-
-                    fillKeyParameters(updStmt, i, em, key);
-
-                    if (updStmt.executeUpdate() == 0) {
-                        PreparedStatement insStmt = pair.get2();
-
-                        assert insStmt != null;
-
-                        i = fillKeyParameters(insStmt, em, key);
-
-                        fillValueParameters(insStmt, i, em, entry.getValue());
-
-                        insStmt.executeUpdate();
-                    }
-                }
-
-                for (T2<PreparedStatement, PreparedStatement> pair :  
stmts.values()) {
-                    U.closeQuiet(pair.get1());
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to write entries in 
database", e);
+            }
+            finally {
+                U.closeQuiet(mergeStmt);
 
-                    U.closeQuiet(pair.get2());
-                }
+                closeConnection(conn);
             }
         }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to open connection", e);
-        }
-        finally {
-            closeConnection(conn);
-        }
+        else
+            for (Cache.Entry<? extends K, ? extends V> e : entries)
+                write(e);
     }
 
     /** {@inheritDoc} */
@@ -844,7 +785,8 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
 
             fillKeyParameters(stmt, em, key);
 
-            stmt.executeUpdate();
+            if (stmt.executeUpdate() == 0)
+                log.warning("Nothing was deleted in database for key: " + key);
         }
         catch (SQLException e) {
             throw new CacheWriterException("Failed to remove value from 
database by key: " + key, e);
@@ -863,9 +805,7 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
         try {
             conn = connection();
 
-            Map<Object, PreparedStatement> stmts = 
U.newHashMap(cacheMappings.get(cacheKeyId()).size());
-
-            Object prevKeyTypeId  = null;
+            Object currKeyTypeId  = null;
 
             PreparedStatement delStmt = null;
 
@@ -876,20 +816,19 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
 
                 EntryMapping em = entryMapping(keyTypeId, key);
 
-                if (prevKeyTypeId != null && !prevKeyTypeId.equals(keyTypeId)) 
{
-                    delStmt = stmts.get(prevKeyTypeId);
+                if (delStmt == null) {
+                    delStmt = conn.prepareStatement(em.remQry);
 
-                    delStmt.executeBatch();
-
-                    cnt = 0;
+                    currKeyTypeId = keyTypeId;
                 }
 
-                prevKeyTypeId  = keyTypeId;
+                if (!currKeyTypeId.equals(keyTypeId)) {
+                    delStmt.executeBatch();
 
-                delStmt = stmts.get(keyTypeId);
+                    currKeyTypeId = keyTypeId;
 
-                if (delStmt == null)
-                    stmts.put(keyTypeId, delStmt = 
conn.prepareStatement(em.remQry));
+                    cnt = 0;
+                }
 
                 fillKeyParameters(delStmt, em, key);
 
@@ -902,13 +841,9 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
             if (delStmt != null && cnt % batchSz != 0)
                 delStmt.executeBatch();
 
-            for (PreparedStatement st : stmts.values())
-                U.closeQuiet(st);
+            // TODO check delete result?
         }
         catch (SQLException e) {
-            throw new CacheWriterException("Failed to open connection", e);
-        }
-        catch (Exception e) {
             throw new CacheWriterException("Failed to remove values from 
database", e);
         }
         finally {
@@ -993,48 +928,6 @@ public abstract class JdbcCacheStore<K, V> extends 
CacheStore<K, V> {
     }
 
     /**
-     * @return Connection URL.
-     */
-    public String getConnectionUrl() {
-        return connUrl;
-    }
-
-    /**
-     * @param connUrl Connection URL.
-     */
-    public void setConnectionUrl(String connUrl) {
-        this.connUrl = connUrl;
-    }
-
-    /**
-     * @return Password for database access.
-     */
-    public String getPassword() {
-        return passwd;
-    }
-
-    /**
-     * @param passwd Password for database access.
-     */
-    public void setPassword(String passwd) {
-        this.passwd = passwd;
-    }
-
-    /**
-     * @return User name for database access.
-     */
-    public String getUser() {
-        return user;
-    }
-
-    /**
-     * @param user User name for database access.
-     */
-    public void setUser(String user) {
-        this.user = user;
-    }
-
-    /**
      * Get database dialect.
      *
      * @return Database dialect.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9077809/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
index 71bb5b4..834d49f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
@@ -131,29 +131,30 @@ public class JdbcPojoCacheStore extends 
JdbcCacheStore<Object, Object> {
     }
 
     /** Methods cache. */
-    protected Map<String, PojoMethodsCache> mtdsCache;
+    protected volatile Map<String, Map<String, PojoMethodsCache>> mtdsCache = 
Collections.emptyMap();
 
     /** {@inheritDoc} */
-    @Override protected void 
prepareBuilders(Collection<CacheQueryTypeMetadata> types) throws CacheException 
{
-        mtdsCache = U.newHashMap(types.size() * 2);
+    @Override protected void prepareBuilders(@Nullable String cacheName, 
Collection<CacheQueryTypeMetadata> types)
+        throws CacheException {
+        Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() 
* 2);
 
         for (CacheQueryTypeMetadata type : types) {
             CacheQueryTableMetadata tblMeta = type.getTableMetadata();
 
-            PojoMethodsCache keyCache = new 
PojoMethodsCache(type.getKeyType(), tblMeta.getKeyColumns());
-
-            mtdsCache.put(type.getKeyType(), keyCache);
+            typeMethods.put(type.getKeyType(), new 
PojoMethodsCache(type.getKeyType(), tblMeta.getKeyColumns()));
 
-            mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), 
tblMeta.getValueColumns()));
+            typeMethods.put(type.getType(), new 
PojoMethodsCache(type.getType(), tblMeta.getValueColumns()));
         }
 
-        mtdsCache = Collections.unmodifiableMap(mtdsCache);
+        mtdsCache = new HashMap<>(mtdsCache);
+
+        mtdsCache.put(cacheName, typeMethods);
     }
 
     /** {@inheritDoc} */
     @Override protected <R> R buildObject(String typeName, 
Collection<CacheQueryTableColumnMetadata> fields,
         ResultSet rs) throws CacheLoaderException {
-        PojoMethodsCache t = mtdsCache.get(typeName);
+        PojoMethodsCache t = 
mtdsCache.get(session().cacheName()).get(typeName);
 
         Object obj = t.newInstance();
 
@@ -172,7 +173,7 @@ public class JdbcPojoCacheStore extends 
JdbcCacheStore<Object, Object> {
     @Nullable @Override protected Object extractField(String typeName, String 
fieldName, Object obj)
         throws CacheException {
         try {
-            PojoMethodsCache mc = mtdsCache.get(typeName);
+            PojoMethodsCache mc = 
mtdsCache.get(session().cacheName()).get(typeName);
 
             return mc.getters.get(fieldName).invoke(obj);
         }
@@ -187,7 +188,7 @@ public class JdbcPojoCacheStore extends 
JdbcCacheStore<Object, Object> {
     }
 
     /** {@inheritDoc} */
-    @Override protected Object keyId(String type) throws CacheException {
+    @Override protected Object keyTypeId(String type) throws CacheException {
         try {
             return Class.forName(type);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9077809/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
index ba3ec6b..b7acf23 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
@@ -117,23 +117,23 @@ public class PojoJdbcCacheStoreTest extends 
GridCommonAbstractTest {
 
             springCtx.refresh();
 
-            Collection<CacheQueryTypeMetadata> typeMetadata =
+            Collection<CacheQueryTypeMetadata> typeMeta =
                 
springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();
 
-            Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> 
cacheMappings = new ConcurrentHashMap<>();
+            Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> 
cacheMappings = new HashMap<>();
 
             JdbcDialect dialect = store.resolveDialect();
 
             GridTestUtils.setFieldValue(store, JdbcCacheStore.class, 
"dialect", dialect);
 
-            Map<Object, JdbcCacheStore.EntryMapping> entryMappings = 
U.newHashMap(typeMetadata.size());
+            Map<Object, JdbcCacheStore.EntryMapping> entryMappings = 
U.newHashMap(typeMeta.size());
 
-            for (CacheQueryTypeMetadata type : typeMetadata)
-                entryMappings.put(store.keyId(type.getKeyType()), new 
JdbcCacheStore.EntryMapping(dialect, type));
+            for (CacheQueryTypeMetadata type : typeMeta)
+                entryMappings.put(store.keyTypeId(type.getKeyType()), new 
JdbcCacheStore.EntryMapping(dialect, type));
 
-            store.prepareBuilders(typeMetadata);
+            store.prepareBuilders(null, typeMeta);
 
-            cacheMappings.put(0, Collections.unmodifiableMap(entryMappings));
+            cacheMappings.put(null, 
Collections.unmodifiableMap(entryMappings));
 
             GridTestUtils.setFieldValue(store, JdbcCacheStore.class, 
"cacheMappings", cacheMappings);
         }

Reply via email to