# IGNITE-32: fixed jdbc store after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9077809
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9077809
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9077809

Branch: refs/heads/sprint-1
Commit: f90778095ebcd9b3427699e21ac90240a6d529bf
Parents: e1b03b7
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Mon Feb 2 15:11:22 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Mon Feb 2 15:11:22 2015 +0700

----------------------------------------------------------------------
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 389 +++++++------------
 .../cache/store/jdbc/JdbcPojoCacheStore.java    |  23 +-
 .../store/jdbc/PojoJdbcCacheStoreTest.java      |  14 +-
 3 files changed, 160 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9077809/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 41a4024..15e1373 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -21,6 +21,8 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cache.store.jdbc.dialect.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.interop.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -35,7 +37,6 @@ import javax.sql.*;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;

 /**
@@ -50,10 +51,6 @@ import java.util.concurrent.locks.*;
 * <h3>Optional</h3>
 * <ul>
 * <li>Data source (see {@link #setDataSource(DataSource)}</li>
-* <li>Connection URL (see {@link #setConnectionUrl(String)})</li>
-* <li>User name (see {@link #setUser(String)})</li>
-* <li>Password (see {@link #setPassword(String)})</li>
-* <li>Create table query (see {@link #setConnectionUrl(String)})</li>
 * <li>Maximum batch size for writeAll and deleteAll operations. (see {@link #setBatchSize(int)})</li>
 * <li>Max workers thread count. These threads are responsible for load cache. (see {@link #setMaxPoolSize(int)})</li>
 * <li>Parallel load cache minimum threshold. (see {@link #setParallelLoadCacheMinimumThreshold(int)})</li>
@@ -77,7 +74,10 @@
 * <p>
 * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
 */
-public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
+public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements GridInteropAware {
+    /** Max attempt write count. */
+    protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
+
     /** Default batch size for put and remove operations. */
     protected static final int DFLT_BATCH_SIZE = 512;

@@ -91,36 +91,15 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     @IgniteLoggerResource
     protected IgniteLogger log;

-    /** Init guard. */
-    @GridToStringExclude
-    private final AtomicBoolean initGuard = new AtomicBoolean();
-
-    /** Init latch. */
-    @GridToStringExclude
-    private final CountDownLatch initLatch = new CountDownLatch(1);
-
     /** Lock for metadata cache. */
     @GridToStringExclude
-    private final Lock buildMetaCacheLock = new ReentrantLock();
-
-    /** Successful initialization flag. */
-    private boolean initOk;
+    private final Lock cacheMappingsLock = new ReentrantLock();

     /** Data source. */
     protected DataSource dataSrc;

-    /** Connection URL. */
-    protected String connUrl;
-
-    /** User name for database access. */
-    protected String user;
-
-    /** Password for database access. */
-    @GridToStringExclude
-    protected String passwd;
-
     /** Cache with entry mapping description. (cache name, (key id, mapping description)). */
-    protected Map<Integer, Map<Object, EntryMapping>> cacheMappings = new ConcurrentHashMap<>();
+    protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap();

     /** Database dialect. */
     protected JdbcDialect dialect;

@@ -158,20 +137,20 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         throws CacheLoaderException;

     /**
-     * Extract type id from key object.
+     * Extract key type id from key object.
      *
      * @param key Key object.
-     * @return Type id.
+     * @return Key type id.
      */
     protected abstract Object keyTypeId(Object key) throws CacheException;

     /**
-     * Extract type id from key class name.
+     * Extract key type id from key class name.
      *
      * @param type String description of key type.
-     * @return Type id.
+     * @return Key type id.
      */
-    protected abstract Object keyId(String type) throws CacheException;
+    protected abstract Object keyTypeId(String type) throws CacheException;

     /**
      * Prepare internal store specific builders for provided types metadata.
@@ -179,7 +158,8 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
      * @param types Collection of types.
      * @throws CacheException If failed to prepare.
      */
-    protected abstract void prepareBuilders(Collection<CacheQueryTypeMetadata> types) throws CacheException;
+    protected abstract void prepareBuilders(@Nullable String cacheName, Collection<CacheQueryTypeMetadata> types)
+        throws CacheException;

     /**
      * Perform dialect resolution.
@@ -234,69 +214,23 @@
         return cacheName != null ? cacheName.hashCode() : 0;
     }

-    /**
-     * Initializes store.
-     *
-     * @throws CacheException If failed to initialize.
-     */
-    private void init() throws CacheException {
-        if (initLatch.getCount() > 0) {
-            if (initGuard.compareAndSet(false, true)) {
-                if (log.isDebugEnabled())
-                    log.debug("Initializing cache store.");
-
-                if (dataSrc == null && F.isEmpty(connUrl))
-                    throw new CacheException("Failed to initialize cache store (connection is not provided).");
-
-                try {
-                    if (dialect == null)
-                        dialect = resolveDialect();
-
-                    initOk = true;
-                }
-                finally {
-                    initLatch.countDown();
-                }
-            }
-            else
-                try {
-                    if (initLatch.getCount() > 0)
-                        initLatch.await();
-                }
-                catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-
-                    throw new CacheException(e);
-                }
-        }
-
-        if (!initOk)
-            throw new CacheException("Cache store was not properly initialized.");
-
-        Integer cacheKey = cacheKeyId();
-
-        if (!cacheMappings.containsKey(cacheKey)) {
-            buildMetaCacheLock.lock();
-
-            try {
-                if (!cacheMappings.containsKey(cacheKey)) {
-                    Collection<CacheQueryTypeMetadata> typeMetadata =
-                        ignite().cache(session().cacheName()).configuration().getQueryConfiguration().getTypeMetadata();
-
-                    Map<Object, EntryMapping> entryMappings = U.newHashMap(typeMetadata.size());
+    /** {@inheritDoc} */
+    @Override public void configure(Object... params) {
+        // No-op.
+    }

-                    for (CacheQueryTypeMetadata type : typeMetadata)
-                        entryMappings.put(keyId(type.getKeyType()), new EntryMapping(dialect, type));
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext ctx) throws IgniteCheckedException {
+        if (dataSrc == null)
+            throw new IgniteCheckedException("Failed to initialize cache store (data source is not provided).");

-                    cacheMappings.put(cacheKey, Collections.unmodifiableMap(entryMappings));
+        if (dialect == null)
+            dialect = resolveDialect();
+    }

-                    prepareBuilders(typeMetadata);
-                }
-            }
-            finally {
-                buildMetaCacheLock.unlock();
-            }
-        }
+    /** {@inheritDoc} */
+    @Override public void destroy(GridKernalContext ctx) throws IgniteCheckedException {
+        // No-op.
     }

     /**
@@ -307,8 +241,7 @@
      * @throws SQLException In case of error.
      */
    protected Connection openConnection(boolean autocommit) throws SQLException {
-        Connection conn = dataSrc != null ? dataSrc.getConnection() :
-            DriverManager.getConnection(connUrl, user, passwd);
+        Connection conn = dataSrc.getConnection();

         conn.setAutoCommit(autocommit);
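
The two hunks above replace the store's lazy `init()` (guarded by an `AtomicBoolean` and a `CountDownLatch`) with eager checks in the `GridInteropAware.initialize(...)` callback, and drop the `DriverManager` fallback so a configured `DataSource` becomes mandatory. A minimal standalone sketch of that fail-fast shape, with illustrative class and method names rather than the actual Ignite API:

```java
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

/** Illustrative sketch: eager validation instead of lazy init-on-first-use. */
public class EagerJdbcStoreSketch {
    private final DataSource dataSrc;

    public EagerJdbcStoreSketch(DataSource dataSrc) {
        // Fail fast at initialization time, as the patched initialize(GridKernalContext)
        // does, instead of failing on the first cache operation.
        if (dataSrc == null)
            throw new IllegalArgumentException("Data source is not provided.");

        this.dataSrc = dataSrc;
    }

    /** Opens a connection; only the DataSource path remains after the patch. */
    public Connection openConnection(boolean autocommit) throws SQLException {
        Connection conn = dataSrc.getConnection();

        conn.setAutoCommit(autocommit);

        return conn;
    }
}
```

Failing in the lifecycle callback surfaces a missing data source at node startup rather than deep inside a cache operation.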
@@ -467,16 +400,57 @@
     }

     /**
-     * @param key Entry key.
+     * @return Type mappings for specified cache name.
+     *
+     * @throws CacheException If failed to initialize.
+     */
+    private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException {
+        Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
+
+        if (entryMappings != null)
+            return entryMappings;
+
+        cacheMappingsLock.lock();
+
+        try {
+            entryMappings = cacheMappings.get(cacheName);
+
+            if (entryMappings != null)
+                return entryMappings;
+
+            Collection<CacheQueryTypeMetadata> typeMetadata =
+                ignite().cache(session().cacheName()).configuration().getQueryConfiguration().getTypeMetadata();
+
+            entryMappings = U.newHashMap(typeMetadata.size());
+
+            for (CacheQueryTypeMetadata type : typeMetadata)
+                entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(dialect, type));
+
+            Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
+
+            mappings.put(cacheName, entryMappings);
+
+            prepareBuilders(cacheName, typeMetadata);
+
+            cacheMappings = mappings;
+
+            return entryMappings;
+        }
+        finally {
+            cacheMappingsLock.unlock();
+        }
+    }
+
+    /**
+     * @param keyTypeId Key type id.
+     * @param key Key object.
      * @return Entry mapping.
      * @throws CacheException if mapping for key was not found.
      */
-    private EntryMapping entryMapping(Object keyId, Object key) throws CacheException {
+    private EntryMapping entryMapping(Object keyTypeId, Object key) throws CacheException {
         String cacheName = session().cacheName();

-        init();
-
-        EntryMapping em = cacheMappings.get(cacheKeyId()).get(keyId);
+        EntryMapping em = cacheMappings(cacheName).get(keyTypeId);

         if (em == null)
             throw new CacheException("Failed to find mapping description for key: " + key +
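
The new `cacheMappings(cacheName)` above is a double-checked, copy-on-write lookup: reads hit a `volatile` map without locking, and a miss takes the lock, rebuilds a copy of the map, and republishes it. A generic sketch of the same pattern, assuming made-up types and a `loader` callback (not the store's code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Illustrative copy-on-write cache: lock-free reads, locked miss handling. */
public class CopyOnWriteCache<K, V> {
    /** Volatile reference to an effectively immutable map; replaced, never mutated in place. */
    private volatile Map<K, V> cache = Collections.emptyMap();

    private final Lock lock = new ReentrantLock();

    public V get(K key, Function<K, V> loader) {
        V val = cache.get(key);

        if (val != null)
            return val;                             // Fast path: no locking for hits.

        lock.lock();

        try {
            val = cache.get(key);                   // Re-check under the lock.

            if (val != null)
                return val;

            val = loader.apply(key);

            Map<K, V> copy = new HashMap<>(cache);  // Copy, then publish.

            copy.put(key, val);

            cache = copy;                           // Volatile write publishes the new map.

            return val;
        }
        finally {
            lock.unlock();
        }
    }
}
```

Because the published map is never mutated after assignment, readers can iterate it safely without synchronization.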
@@ -505,18 +479,18 @@
             String selQry = args[i + 1].toString();

-            EntryMapping em = entryMapping(keyId(keyType), keyType);
+            EntryMapping em = entryMapping(keyTypeId(keyType), keyType);

             futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
         }
     }
     else {
-        init();
+        Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values();

         if (log.isDebugEnabled())
             log.debug("Start loading all cache types entries from db");

-        for (EntryMapping em : cacheMappings.get(cacheKeyId()).values()) {
+        for (EntryMapping em : entryMappings) {
             if (parallelLoadCacheMinThreshold > 0) {
                 Connection conn = null;
@@ -619,7 +593,7 @@
         try {
             conn = connection();

-            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings.get(cacheKeyId()).size());
+            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(session().cacheName()).size());

             Map<K, V> res = new HashMap<>();
@@ -644,9 +618,6 @@
             return res;
         }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to open connection", e);
-        }
         catch (Exception e) {
             throw new CacheWriterException("Failed to load entries from database", e);
         }
@@ -685,23 +656,40 @@
             else {
                 V val = entry.getValue();

-                stmt = conn.prepareStatement(em.updQry);
+                for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
+                    stmt = conn.prepareStatement(em.updQry);

-                int i = fillValueParameters(stmt, 1, em, val);
+                    int i = fillValueParameters(stmt, 1, em, val);

-                fillKeyParameters(stmt, i, em, key);
+                    fillKeyParameters(stmt, i, em, key);

-                if (stmt.executeUpdate() == 0) {
-                    stmt.close();
+                    if (stmt.executeUpdate() == 0) {
+                        U.closeQuiet(stmt);

-                    stmt = conn.prepareStatement(em.insQry);
+                        stmt = conn.prepareStatement(em.insQry);

-                    i = fillKeyParameters(stmt, em, key);
+                        i = fillKeyParameters(stmt, em, key);

-                    fillValueParameters(stmt, i, em, val);
+                        fillValueParameters(stmt, i, em, val);

-                    stmt.executeUpdate();
+                        try {
+                            stmt.executeUpdate();
+                        }
+                        catch (SQLException e) {
+                            // The error with code 23505 is thrown when trying to insert a row that
+                            // would violate a unique index or primary key.
+                            // TODO check with all RDBMS
+                            if (e.getErrorCode() == 23505)
+                                continue;
+
+                            throw e;
+                        }
+                    }
+
+                    return;
                 }
+
+                throw new CacheWriterException("Failed write entry to database: " + entry);
             }
         }
         catch (SQLException e) {
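
For dialects without a `MERGE` statement, the rewritten `write(...)` above emulates an upsert: try `UPDATE`, fall back to `INSERT`, and retry the whole pair (bounded by `MAX_ATTEMPT_WRITE_COUNT`) when the insert loses a race and hits a unique-key violation. The patch matches vendor error code 23505 and leaves a TODO to verify other databases; the sketch below checks the commonly used SQLSTATE `"23505"` instead, and the `KV` table and its columns are made-up examples:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Illustrative UPDATE-then-INSERT upsert with a bounded retry on unique-key races. */
public class UpsertSketch {
    private static final int MAX_ATTEMPTS = 2;

    /** Hypothetical table: KV(id BIGINT PRIMARY KEY, val VARCHAR). */
    public void upsert(Connection conn, long id, String val) throws SQLException {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try (PreparedStatement upd = conn.prepareStatement("UPDATE KV SET val = ? WHERE id = ?")) {
                upd.setString(1, val);
                upd.setLong(2, id);

                if (upd.executeUpdate() > 0)
                    return;                         // Row existed, update succeeded.
            }

            try (PreparedStatement ins = conn.prepareStatement("INSERT INTO KV (id, val) VALUES (?, ?)")) {
                ins.setLong(1, id);
                ins.setString(2, val);

                ins.executeUpdate();

                return;                             // Insert won the race.
            }
            catch (SQLException e) {
                // SQLSTATE 23505: unique constraint violation - another writer inserted the
                // row between our UPDATE and INSERT, so loop and try the UPDATE again.
                if (!"23505".equals(e.getSQLState()))
                    throw e;
            }
        }

        throw new SQLException("Failed to upsert row after " + MAX_ATTEMPTS + " attempts: id=" + id);
    }
}
```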
@@ -717,19 +705,15 @@
         throws CacheWriterException {
         assert entries != null;

-        init();
-
-        Connection conn = null;
-
-        try {
-            conn = connection();
+        if (dialect.hasMerge()) {
+            Connection conn = null;

-            if (dialect.hasMerge()) {
-                Map<Object, PreparedStatement> stmts = U.newHashMap(cacheMappings.get(cacheKeyId()).size());
+            PreparedStatement mergeStmt = null;

-                Object prevKeyTypeId = null;
+            try {
+                conn = connection();

-                PreparedStatement mergeStmt = null;
+                Object currKeyTypeId = null;

                 int cnt = 0;
@@ -740,20 +724,19 @@
                     EntryMapping em = entryMapping(keyTypeId, key);

-                    if (prevKeyTypeId != null && !prevKeyTypeId.equals(keyTypeId)) {
-                        mergeStmt = stmts.get(prevKeyTypeId);
-
-                        mergeStmt.executeBatch();
+                    if (mergeStmt == null) {
+                        mergeStmt = conn.prepareStatement(em.mergeQry);

-                        cnt = 0;
+                        currKeyTypeId = keyTypeId;
                     }

-                    prevKeyTypeId = keyTypeId;
+                    if (!currKeyTypeId.equals(keyTypeId)) {
+                        mergeStmt.executeBatch();

-                    mergeStmt = stmts.get(keyTypeId);
+                        currKeyTypeId = keyTypeId;

-                    if (mergeStmt == null)
-                        stmts.put(keyTypeId, mergeStmt = conn.prepareStatement(em.mergeQry));
+                        cnt = 0;
+                    }

                     int i = fillKeyParameters(mergeStmt, em, key);
@@ -767,61 +750,19 @@

             if (mergeStmt != null && cnt % batchSz != 0)
                 mergeStmt.executeBatch();
-
-            for (PreparedStatement st : stmts.values())
-                U.closeQuiet(st);
         }
-        else {
-            Map<Object, T2<PreparedStatement, PreparedStatement>> stmts =
-                U.newHashMap(cacheMappings.get(cacheKeyId()).size());
-
-            for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                K key = entry.getKey();
-
-                Object keyTypeId = keyTypeId(key);
-
-                EntryMapping em = entryMapping(keyTypeId, key);
-
-                T2<PreparedStatement, PreparedStatement> pair = stmts.get(keyTypeId);
-
-                if (pair == null)
-                    stmts.put(keyTypeId,
-                        pair = new T2<>(conn.prepareStatement(em.updQry), conn.prepareStatement(em.insQry)));
-
-                PreparedStatement updStmt = pair.get1();
-
-                assert updStmt != null;
-
-                int i = fillValueParameters(updStmt, 1, em, entry.getValue());
-
-                fillKeyParameters(updStmt, i, em, key);
-
-                if (updStmt.executeUpdate() == 0) {
-                    PreparedStatement insStmt = pair.get2();
-
-                    assert insStmt != null;
-
-                    i = fillKeyParameters(insStmt, em, key);
-
-                    fillValueParameters(insStmt, i, em, entry.getValue());
-
-                    insStmt.executeUpdate();
-                }
-            }
-
-            for (T2<PreparedStatement, PreparedStatement> pair : stmts.values()) {
-                U.closeQuiet(pair.get1());
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to write entries in database", e);
+            }
+            finally {
+                U.closeQuiet(mergeStmt);

-                U.closeQuiet(pair.get2());
-            }
+                closeConnection(conn);
             }
         }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to open connection", e);
-        }
-        finally {
-            closeConnection(conn);
-        }
+        else
+            for (Cache.Entry<? extends K, ? extends V> e : entries)
+                write(e);
     }

     /** {@inheritDoc} */
@@ -844,7 +785,8 @@
             fillKeyParameters(stmt, em, key);

-            stmt.executeUpdate();
+            if (stmt.executeUpdate() == 0)
+                log.warning("Nothing was deleted in database for key: " + key);
         }
         catch (SQLException e) {
             throw new CacheWriterException("Failed to remove value from database by key: " + key, e);
@@ -863,9 +805,7 @@
         try {
             conn = connection();

-            Map<Object, PreparedStatement> stmts = U.newHashMap(cacheMappings.get(cacheKeyId()).size());
-
-            Object prevKeyTypeId = null;
+            Object currKeyTypeId = null;

             PreparedStatement delStmt = null;
@@ -876,20 +816,19 @@
                 EntryMapping em = entryMapping(keyTypeId, key);

-                if (prevKeyTypeId != null && !prevKeyTypeId.equals(keyTypeId)) {
-                    delStmt = stmts.get(prevKeyTypeId);
+                if (delStmt == null) {
+                    delStmt = conn.prepareStatement(em.remQry);

-                    delStmt.executeBatch();
-
-                    cnt = 0;
+                    currKeyTypeId = keyTypeId;
                 }

-                prevKeyTypeId = keyTypeId;
+                if (!currKeyTypeId.equals(keyTypeId)) {
+                    delStmt.executeBatch();

-                delStmt = stmts.get(keyTypeId);
+                    currKeyTypeId = keyTypeId;

-                if (delStmt == null)
-                    stmts.put(keyTypeId, delStmt = conn.prepareStatement(em.remQry));
+                    cnt = 0;
+                }

                 fillKeyParameters(delStmt, em, key);
@@ -902,13 +841,9 @@
             if (delStmt != null && cnt % batchSz != 0)
                 delStmt.executeBatch();

-            for (PreparedStatement st : stmts.values())
-                U.closeQuiet(st);
+            // TODO check delete result?
         }
         catch (SQLException e) {
-            throw new CacheWriterException("Failed to open connection", e);
-        }
-        catch (Exception e) {
             throw new CacheWriterException("Failed to remove values from database", e);
         }
         finally {
@@ -993,48 +928,6 @@
     }

     /**
-     * @return Connection URL.
-     */
-    public String getConnectionUrl() {
-        return connUrl;
-    }
-
-    /**
-     * @param connUrl Connection URL.
-     */
-    public void setConnectionUrl(String connUrl) {
-        this.connUrl = connUrl;
-    }
-
-    /**
-     * @return Password for database access.
-     */
-    public String getPassword() {
-        return passwd;
-    }
-
-    /**
-     * @param passwd Password for database access.
-     */
-    public void setPassword(String passwd) {
-        this.passwd = passwd;
-    }
-
-    /**
-     * @return User name for database access.
-     */
-    public String getUser() {
-        return user;
-    }
-
-    /**
-     * @param user User name for database access.
-     */
-    public void setUser(String user) {
-        this.user = user;
-    }
-
-    /**
      * Get database dialect.
      *
      * @return Database dialect.
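
The batched `writeAll(...)` and `deleteAll(...)` above keep a single `PreparedStatement` for the current key type and call `executeBatch()` whenever the key type changes or the batch size is reached, which assumes entries of the same type arrive together. A simplified sketch of that shape using a hypothetical per-table delete rather than the store's `EntryMapping` queries:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/** Illustrative batching: one statement for the current group, flushed on group change or size. */
public class BatchDeleteSketch {
    private static final int BATCH_SIZE = 512;

    /** A (table, id) pair to delete; the table name stands in for the patch's per-type mapping. */
    public static final class Key {
        final String table;
        final long id;

        public Key(String table, long id) {
            this.table = table;
            this.id = id;
        }
    }

    /** Assumes keys of the same table are adjacent, as the patched deleteAll() assumes for key types. */
    public void deleteAll(Connection conn, List<Key> keys) throws SQLException {
        PreparedStatement stmt = null;
        String curTable = null;
        int cnt = 0;

        try {
            for (Key key : keys) {
                if (stmt == null || !key.table.equals(curTable)) {
                    if (stmt != null) {
                        stmt.executeBatch();    // Flush the previous group before switching statements.
                        stmt.close();
                    }

                    // Table names must come from trusted metadata, never from user input.
                    stmt = conn.prepareStatement("DELETE FROM " + key.table + " WHERE id = ?");
                    curTable = key.table;
                    cnt = 0;
                }

                stmt.setLong(1, key.id);
                stmt.addBatch();

                if (++cnt % BATCH_SIZE == 0)
                    stmt.executeBatch();        // Flush when the batch size is reached.
            }

            if (stmt != null && cnt % BATCH_SIZE != 0)
                stmt.executeBatch();            // Flush the tail batch.
        }
        finally {
            if (stmt != null)
                stmt.close();
        }
    }
}
```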

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9077809/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
index 71bb5b4..834d49f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
@@ -131,29 +131,30 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
     }

     /** Methods cache. */
-    protected Map<String, PojoMethodsCache> mtdsCache;
+    protected volatile Map<String, Map<String, PojoMethodsCache>> mtdsCache = Collections.emptyMap();

     /** {@inheritDoc} */
-    @Override protected void prepareBuilders(Collection<CacheQueryTypeMetadata> types) throws CacheException {
-        mtdsCache = U.newHashMap(types.size() * 2);
+    @Override protected void prepareBuilders(@Nullable String cacheName, Collection<CacheQueryTypeMetadata> types)
+        throws CacheException {
+        Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2);

         for (CacheQueryTypeMetadata type : types) {
             CacheQueryTableMetadata tblMeta = type.getTableMetadata();

-            PojoMethodsCache keyCache = new PojoMethodsCache(type.getKeyType(), tblMeta.getKeyColumns());
-
-            mtdsCache.put(type.getKeyType(), keyCache);
+            typeMethods.put(type.getKeyType(), new PojoMethodsCache(type.getKeyType(), tblMeta.getKeyColumns()));

-            mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), tblMeta.getValueColumns()));
+            typeMethods.put(type.getType(), new PojoMethodsCache(type.getType(), tblMeta.getValueColumns()));
         }

-        mtdsCache = Collections.unmodifiableMap(mtdsCache);
+        mtdsCache = new HashMap<>(mtdsCache);
+
+        mtdsCache.put(cacheName, typeMethods);
     }

     /** {@inheritDoc} */
     @Override protected <R> R buildObject(String typeName, Collection<CacheQueryTableColumnMetadata> fields,
         ResultSet rs) throws CacheLoaderException {
-        PojoMethodsCache t = mtdsCache.get(typeName);
+        PojoMethodsCache t = mtdsCache.get(session().cacheName()).get(typeName);

         Object obj = t.newInstance();
@@ -172,7 +173,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
     @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj)
         throws CacheException {
         try {
-            PojoMethodsCache mc = mtdsCache.get(typeName);
+            PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);

             return mc.getters.get(fieldName).invoke(obj);
         }
@@ -187,7 +188,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
     }

     /** {@inheritDoc} */
-    @Override protected Object keyId(String type) throws CacheException {
+    @Override protected Object keyTypeId(String type) throws CacheException {
         try {
             return Class.forName(type);
         }
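
`JdbcPojoCacheStore` now keys its reflection cache by cache name and then by type name, rebuilding it copy-on-write in `prepareBuilders(...)`. The underlying idea is to resolve a type's no-arg constructor and getters once and reuse them for every row; a stripped-down, hypothetical equivalent of such a per-type cache (not Ignite's actual `PojoMethodsCache`):

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Illustrative per-type reflection cache: constructor plus getter methods resolved once. */
public class PojoReflectionSketch {
    private final Constructor<?> ctor;

    /** Property name -> getter, e.g. "firstName" -> getFirstName(). */
    private final Map<String, Method> getters = new HashMap<>();

    public PojoReflectionSketch(Class<?> cls, Iterable<String> propNames) throws ReflectiveOperationException {
        ctor = cls.getDeclaredConstructor();

        for (String prop : propNames) {
            String suffix = Character.toUpperCase(prop.charAt(0)) + prop.substring(1);

            getters.put(prop, cls.getMethod("get" + suffix));
        }
    }

    public Object newInstance() throws ReflectiveOperationException {
        return ctor.newInstance();
    }

    public Object extractField(Object obj, String prop) throws ReflectiveOperationException {
        Method getter = getters.get(prop);

        if (getter == null)
            throw new IllegalArgumentException("Unknown property: " + prop);

        return getter.invoke(obj);
    }
}
```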

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9077809/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
index ba3ec6b..b7acf23 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoJdbcCacheStoreTest.java
@@ -117,23 +117,23 @@ public class PojoJdbcCacheStoreTest extends GridCommonAbstractTest {
         springCtx.refresh();

-        Collection<CacheQueryTypeMetadata> typeMetadata =
+        Collection<CacheQueryTypeMetadata> typeMeta =
             springCtx.getBeansOfType(CacheQueryTypeMetadata.class).values();

-        Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> cacheMappings = new ConcurrentHashMap<>();
+        Map<Integer, Map<Object, JdbcCacheStore.EntryMapping>> cacheMappings = new HashMap<>();

         JdbcDialect dialect = store.resolveDialect();

         GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "dialect", dialect);

-        Map<Object, JdbcCacheStore.EntryMapping> entryMappings = U.newHashMap(typeMetadata.size());
+        Map<Object, JdbcCacheStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size());

-        for (CacheQueryTypeMetadata type : typeMetadata)
-            entryMappings.put(store.keyId(type.getKeyType()), new JdbcCacheStore.EntryMapping(dialect, type));
+        for (CacheQueryTypeMetadata type : typeMeta)
+            entryMappings.put(store.keyTypeId(type.getKeyType()), new JdbcCacheStore.EntryMapping(dialect, type));

-        store.prepareBuilders(typeMetadata);
+        store.prepareBuilders(null, typeMeta);

-        cacheMappings.put(0, Collections.unmodifiableMap(entryMappings));
+        cacheMappings.put(null, Collections.unmodifiableMap(entryMappings));

         GridTestUtils.setFieldValue(store, JdbcCacheStore.class, "cacheMappings", cacheMappings);
     }
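
The updated test wires the store by hand: it builds the `EntryMapping` map from the Spring-declared metadata and injects it, together with the resolved dialect, into the store's private fields via `GridTestUtils.setFieldValue(...)`. Outside the Ignite test framework the same wiring can be done with plain reflection; a small illustrative helper (names are assumptions, not Ignite API):

```java
import java.lang.reflect.Field;

/** Illustrative stand-in for GridTestUtils.setFieldValue(): set a private field reflectively. */
public final class FieldInjector {
    private FieldInjector() {
        // No instances.
    }

    public static void setField(Object target, Class<?> declaringCls, String fieldName, Object val)
        throws ReflectiveOperationException {
        Field field = declaringCls.getDeclaredField(fieldName);

        field.setAccessible(true);

        field.set(target, val);
    }
}
```

A call such as `FieldInjector.setField(store, JdbcCacheStore.class, "dialect", dialect)` would mirror the test's injection of the resolved dialect.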