# IGNITE-32: Renaming JDBC store class.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f55d1983
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f55d1983
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f55d1983

Branch: refs/heads/ignite-111
Commit: f55d198335e7aadab46603c42865d393b0e4f347
Parents: fe7c2af
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Fri Feb 6 10:14:55 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Fri Feb 6 10:14:55 2015 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      | 1560 ++++++++++++++++++
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |  205 +++
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 1560 ------------------
 .../cache/store/jdbc/JdbcPojoCacheStore.java    |  205 ---
 ...ractJdbcCacheStoreMultithreadedSelfTest.java |  196 ---
 .../CacheJdbcPojoStoreMultitreadedSelfTest.java |   35 +
 .../store/jdbc/CacheJdbcPojoStoreTest.java      |  702 ++++++++
 ...eJdbcStoreAbstractMultithreadedSelfTest.java |  197 +++
 .../PojoJdbcCacheStoreMultitreadedSelfTest.java |   34 -
 .../store/jdbc/PojoJdbcCacheStoreTest.java      |  702 --------
 .../yardstick/config/ignite-store-config.xml    |    4 +-
 .../jdbc/IgniteJdbcStoreAbstractBenchmark.java  |    4 +-
 12 files changed, 2703 insertions(+), 2701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f55d1983/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
new file mode 100644
index 0000000..8e550f2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -0,0 +1,1560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.dialect.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static java.sql.Statement.*;
+
+/**
+ * Base {@link CacheStore} implementation backed by JDBC. This implementation stores objects in the underlying
+ * database using a mapping description.
+ * <p>
+ * <h2 class="header">Configuration</h2>
+ * Sections below describe mandatory and optional configuration settings as well
+ * as provide examples using Java and Spring XML.
+ * <h3>Mandatory</h3>
+ * There are no mandatory configuration parameters.
+ * <h3>Optional</h3>
+ * <ul>
+ *     <li>Data source (see {@link #setDataSource(DataSource)})</li>
+ *     <li>Maximum batch size for writeAll and deleteAll operations (see {@link #setBatchSize(int)})</li>
+ *     <li>Maximum workers thread count. These threads are responsible for loading the cache (see {@link #setMaxPoolSize(int)})</li>
+ *     <li>Parallel load cache minimum threshold (see {@link #setParallelLoadCacheMinimumThreshold(int)})</li>
+ * </ul>
+ * <h2 class="header">Java Example</h2>
+ * <pre name="code" class="java">
+ *     ...
+ *     CacheJdbcPojoStore store = new CacheJdbcPojoStore();
+ *     ...
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * <pre name="code" class="xml">
+ *     ...
+ *     &lt;bean id=&quot;cache.jdbc.store&quot;
+ *         class=&quot;org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore&quot;&gt;
+ *         &lt;property name=&quot;dataSource&quot;&gt;...&lt;/property&gt;
+ *     &lt;/bean&gt;
+ *     ...
+ * </pre>
+ * <p>
+ * For information about the Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
+ */
+public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V> implements LifecycleAware {
+    /** Max attempt write count. */
+    protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
+
+    /** Default batch size for put and remove operations. */
+    protected static final int DFLT_BATCH_SIZE = 512;
+
+    /** Default parallel load cache minimum threshold. */
+    protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512;
+
+    /** Connection attribute property name. */
+    protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
+
+    /** Empty column value. */
+    protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null };
+
+    /** Auto-injected logger instance. */
+    @IgniteLoggerResource
+    protected IgniteLogger log;
+
+    /** Lock for metadata cache. */
+    @GridToStringExclude
+    private final Lock cacheMappingsLock = new ReentrantLock();
+
+    /** Data source. */
+    protected DataSource dataSrc;
+
+    /** Cache with entry mapping description. (cache name, (key id, mapping description)). */
+    protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap();
+
+    /** Database dialect. */
+    protected JdbcDialect dialect;
+
+    /** Maximum workers thread count. These threads are responsible for loading the cache. */
+    private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+    /** Maximum batch size for writeAll and deleteAll operations. */
+    private int batchSz = DFLT_BATCH_SIZE;
+
+    /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
+    private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
+    /**
+     * Get field value from object.
+     *
+     * @param typeName Type name.
+     * @param fieldName Field name.
+     * @param obj Cache object.
+     * @return Field value from object.
+     */
+    @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj)
+        throws CacheException;
+
+    /**
+     * Construct object from query result.
+     *
+     * @param <R> Type of result object.
+     * @param typeName Type name.
+     * @param fields Fields descriptors.
+     * @param loadColIdxs Select query column indexes.
+     * @param rs ResultSet.
+     * @return Constructed object.
+     */
+    protected abstract <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields,
+        Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException;
+
+    /**
+     * Extract key type id from key object.
+     *
+     * @param key Key object.
+     * @return Key type id.
+     */
+    protected abstract Object keyTypeId(Object key) throws CacheException;
+
+    /**
+     * Extract key type id from key class name.
+     *
+     * @param type String description of key type.
+     * @return Key type id.
+     */
+    protected abstract Object keyTypeId(String type) throws CacheException;
+
+    /**
+     * Prepare internal store-specific builders for the provided types metadata.
+     *
+     * @param cacheName Cache name ({@code null} for the default cache).
+     * @param types Collection of types.
+     * @throws CacheException If failed to prepare.
+     */
+    protected abstract void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
+        throws CacheException;
+
+    /**
+     * Perform dialect resolution.
+     *
+     * @return The resolved dialect.
+     * @throws CacheException Indicates problems accessing the metadata.
+     */
+    protected JdbcDialect resolveDialect() throws CacheException {
+        Connection conn = null;
+
+        String dbProductName = null;
+
+        try {
+            conn = openConnection(false);
+
+            dbProductName = conn.getMetaData().getDatabaseProductName();
+        }
+        catch (SQLException e) {
+            throw new CacheException("Failed access to metadata for detect 
database dialect.", e);
+        }
+        finally {
+            U.closeQuiet(conn);
+        }
+
+        if ("H2".equals(dbProductName))
+            return new H2Dialect();
+
+        if ("MySQL".equals(dbProductName))
+            return new MySQLDialect();
+
+        if (dbProductName.startsWith("Microsoft SQL Server"))
+            return new SQLServerDialect();
+
+        if ("Oracle".equals(dbProductName))
+            return new OracleDialect();
+
+        if (dbProductName.startsWith("DB2/"))
+            return new DB2Dialect();
+
+        U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be 
used): " + dbProductName);
+
+        return new BasicJdbcDialect();
+    }
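
Dialect resolution above keys off the JDBC `DatabaseMetaData` product name and falls back to `BasicJdbcDialect` when nothing matches, so detection can be bypassed by setting a dialect explicitly. A minimal configuration sketch, assuming an H2 in-memory database and H2's standard `org.h2.jdbcx.JdbcDataSource`:

```java
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
import org.h2.jdbcx.JdbcDataSource;

public class StoreSetup {
    public static CacheJdbcPojoStore createStore() {
        JdbcDataSource ds = new JdbcDataSource();
        ds.setURL("jdbc:h2:mem:test"); // hypothetical connection URL

        CacheJdbcPojoStore store = new CacheJdbcPojoStore();
        store.setDataSource(ds);           // required: start() fails without a data source
        store.setDialect(new H2Dialect()); // explicit dialect, so resolveDialect() is never called

        return store;
    }
}
```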
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        if (dataSrc == null)
+            throw new IgniteException("Failed to initialize cache store (data source is not provided).");
+
+        if (dialect == null)
+            dialect = resolveDialect();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op.
+    }
+
+    /**
+     * Gets connection from a pool.
+     *
+     * @param autocommit {@code true} If connection should use autocommit mode.
+     * @return Pooled connection.
+     * @throws SQLException In case of error.
+     */
+    protected Connection openConnection(boolean autocommit) throws SQLException {
+        Connection conn = dataSrc.getConnection();
+
+        conn.setAutoCommit(autocommit);
+
+        return conn;
+    }
+
+    /**
+     * @return Connection.
+     * @throws SQLException In case of error.
+     */
+    protected Connection connection() throws SQLException {
+        CacheStoreSession ses = session();
+
+        if (ses.transaction() != null) {
+            Map<String, Connection> prop = ses.properties();
+
+            Connection conn = prop.get(ATTR_CONN_PROP);
+
+            if (conn == null) {
+                conn = openConnection(false);
+
+                // Store connection in session to reuse it for other operations in the same session.
+                prop.put(ATTR_CONN_PROP, conn);
+            }
+
+            return conn;
+        }
+        // Transaction can be null in case of simple load operation.
+        else
+            return openConnection(true);
+    }
+
+    /**
+     * Closes connection.
+     *
+     * @param conn Connection to close.
+     */
+    protected void closeConnection(@Nullable Connection conn) {
+        CacheStoreSession ses = session();
+
+        // Close connection right away if there is no transaction.
+        if (ses.transaction() == null)
+            U.closeQuiet(conn);
+    }
+
+    /**
+     * Closes allocated resources depending on transaction status.
+     *
+     * @param conn Allocated connection.
+     * @param st Created statement.
+     */
+    protected void end(@Nullable Connection conn, @Nullable Statement st) {
+        U.closeQuiet(st);
+
+        closeConnection(conn);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void txEnd(boolean commit) throws CacheWriterException {
+        CacheStoreSession ses = session();
+
+        IgniteTx tx = ses.transaction();
+
+        Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN_PROP);
+
+        if (conn != null) {
+            assert tx != null;
+
+            try {
+                if (commit)
+                    conn.commit();
+                else
+                    conn.rollback();
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException(
+                    "Failed to end transaction [xid=" + tx.xid() + ", commit=" 
+ commit + ']', e);
+            }
+            finally {
+                U.closeQuiet(conn);
+            }
+        }
+
+        if (tx != null && log.isDebugEnabled())
+            log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + 
commit + ']');
+    }
+
+    /**
+     * Retrieves the value of the designated column in the current row of this 
<code>ResultSet</code> object and
+     * will convert to the requested Java data type.
+     *
+     * @param rs Result set.
+     * @param colIdx Column index in result set.
+     * @param type Class representing the Java data type to convert the 
designated column to.
+     * @return Value in column.
+     * @throws SQLException If a database access error occurs or this method 
is called.
+     */
+    protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) 
throws SQLException {
+        if (type == int.class)
+            return rs.getInt(colIdx);
+
+        if (type == long.class)
+            return rs.getLong(colIdx);
+
+        if (type == double.class)
+            return rs.getDouble(colIdx);
+
+        if (type == boolean.class)
+            return rs.getBoolean(colIdx);
+
+        if (type == byte.class)
+            return rs.getByte(colIdx);
+
+        if (type == short.class)
+            return rs.getShort(colIdx);
+
+        if (type == float.class)
+            return rs.getFloat(colIdx);
+
+        if (type == Integer.class || type == Long.class || type == Double.class ||
+            type == Byte.class || type == Short.class || type == Float.class) {
+            Object val = rs.getObject(colIdx);
+
+            if (val != null) {
+                Number num = (Number)val;
+
+                if (type == Integer.class)
+                    return num.intValue();
+                else if (type == Long.class)
+                    return num.longValue();
+                else if (type == Double.class)
+                    return num.doubleValue();
+                else if (type == Byte.class)
+                    return num.byteValue();
+                else if (type == Short.class)
+                    return num.shortValue();
+                else if (type == Float.class)
+                    return num.floatValue();
+            }
+            else
+                return EMPTY_COLUMN_VALUE;
+        }
+
+        return rs.getObject(colIdx);
+    }
+
+    /**
+     * Construct a worker that loads a range of the cache.
+     *
+     * @param em Type mapping description.
+     * @param clo Closure that will be applied to loaded values.
+     * @param lowerBound Lower bound for range.
+     * @param upperBound Upper bound for range.
+     * @return Callable for pool submit.
+     */
+    private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo,
+        @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Connection conn = null;
+
+                PreparedStatement stmt = null;
+
+                try {
+                    conn = openConnection(true);
+
+                    stmt = conn.prepareStatement(lowerBound == null && upperBound == null
+                        ? em.loadCacheQry
+                        : em.loadCacheRangeQuery(lowerBound != null, upperBound != null));
+
+                    int ix = 1;
+
+                    if (lowerBound != null)
+                        for (int i = lowerBound.length; i > 0; i--)
+                            for (int j = 0; j < i; j++)
+                                stmt.setObject(ix++, lowerBound[j]);
+
+                    if (upperBound != null)
+                        for (int i = upperBound.length; i > 0; i--)
+                            for (int j = 0; j < i; j++)
+                                stmt.setObject(ix++, upperBound[j]);
+
+                    ResultSet rs = stmt.executeQuery();
+
+                    while (rs.next()) {
+                        K key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
+                        V val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+
+                        clo.apply(key, val);
+                    }
+                }
+                catch (SQLException e) {
+                    throw new IgniteCheckedException("Failed to load cache", e);
+                }
+                finally {
+                    U.closeQuiet(stmt);
+
+                    U.closeQuiet(conn);
+                }
+
+                return null;
+            }
+        };
+    }
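
The nested loops in `loadCacheRange` bind composite-key bounds in a specific order: for an N-column key, the first i columns are bound for each i from N down to 1. A standalone sketch that reproduces just the binding order; the key values and the commented SQL shape are illustrative assumptions, since the real clause text comes from `JdbcDialect.loadCacheRangeQuery`:

```java
// Reproduces loadCacheRange's parameter-binding order for a 2-column key.
public class RangeBindOrder {
    public static void main(String[] args) {
        Object[] lowerBound = {10, "a"}; // hypothetical (k1, k2) bound

        int ix = 1;

        // Same loops as loadCacheRange: for i = N..1, bind bounds[0..i-1].
        for (int i = lowerBound.length; i > 0; i--)
            for (int j = 0; j < i; j++)
                System.out.println("param " + (ix++) + " <- " + lowerBound[j]);

        // Prints: param 1 <- 10, param 2 <- a, param 3 <- 10,
        // e.g. matching a dialect clause like "(k1 = ? AND k2 > ?) OR (k1 > ?)".
    }
}
```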
+
+    /**
+     * Construct a worker that loads the whole cache in one select.
+     *
+     * @param m Type mapping description.
+     * @param clo Closure for loaded values.
+     * @return Callable for pool submit.
+     */
+    private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) {
+        return loadCacheRange(m, clo, null, null);
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for the default cache).
+     * @return Type mappings for specified cache name.
+     * @throws CacheException If failed to initialize.
+     */
+    private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException {
+        Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
+
+        if (entryMappings != null)
+            return entryMappings;
+
+        cacheMappingsLock.lock();
+
+        try {
+            entryMappings = cacheMappings.get(cacheName);
+
+            if (entryMappings != null)
+                return entryMappings;
+
+            Collection<CacheTypeMetadata> types = ignite().cache(session().cacheName()).configuration()
+                .getTypeMetadata();
+
+            entryMappings = U.newHashMap(types.size());
+
+            for (CacheTypeMetadata type : types)
+                entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(dialect, type));
+
+            Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
+
+            mappings.put(cacheName, entryMappings);
+
+            prepareBuilders(cacheName, types);
+
+            cacheMappings = mappings;
+
+            return entryMappings;
+        }
+        finally {
+            cacheMappingsLock.unlock();
+        }
+    }
+
+    /**
+     * @param keyTypeId Key type id.
+     * @param key Key object.
+     * @return Entry mapping.
+     * @throws CacheException If mapping for key was not found.
+     */
+    private EntryMapping entryMapping(Object keyTypeId, Object key) throws CacheException {
+        String cacheName = session().cacheName();
+
+        EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
+
+        if (em == null)
+            throw new CacheException("Failed to find mapping description [key=" + key +
+                ", cache=" + (cacheName != null ? cacheName : "<default>") + "]");
+
+        return em;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args)
+        throws CacheLoaderException {
+        try {
+            ExecutorService pool = Executors.newFixedThreadPool(maxPoolSz);
+
+            Collection<Future<?>> futs = new ArrayList<>();
+
+            if (args != null && args.length > 0) {
+                if (args.length % 2 != 0)
+                    throw new CacheLoaderException("Expected even number of 
arguments, but found: " + args.length);
+
+                if (log.isDebugEnabled())
+                    log.debug("Start loading entries from db using user 
queries from arguments");
+
+                for (int i = 0; i < args.length; i += 2) {
+                    String keyType = args[i].toString();
+
+                    String selQry = args[i + 1].toString();
+
+                    EntryMapping em = entryMapping(keyTypeId(keyType), keyType);
+
+                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
+                }
+            }
+            else {
+                Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values();
+
+                if (log.isDebugEnabled())
+                    log.debug("Start loading all cache types entries from db");
+
+                for (EntryMapping em : entryMappings) {
+                    if (parallelLoadCacheMinThreshold > 0) {
+                        Connection conn = null;
+
+                        try {
+                            conn = connection();
+
+                            PreparedStatement stmt = conn.prepareStatement(em.loadCacheSelRangeQry);
+
+                            stmt.setInt(1, parallelLoadCacheMinThreshold);
+
+                            ResultSet rs = stmt.executeQuery();
+
+                            if (rs.next()) {
+                                int keyCnt = em.keyCols.size();
+
+                                Object[] upperBound = new Object[keyCnt];
+
+                                for (int i = 0; i < keyCnt; i++)
+                                    upperBound[i] = rs.getObject(i + 1);
+
+                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
+
+                                while (rs.next()) {
+                                    Object[] lowerBound = upperBound;
+
+                                    upperBound = new Object[keyCnt];
+
+                                    for (int i = 0; i < keyCnt; i++)
+                                        upperBound[i] = rs.getObject(i + 1);
+
+                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
+                                }
+
+                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
+                            }
+                            else
+                                futs.add(pool.submit(loadCacheFull(em, clo)));
+                        }
+                        catch (SQLException ignored) {
+                            futs.add(pool.submit(loadCacheFull(em, clo)));
+                        }
+                        finally {
+                            U.closeQuiet(conn);
+                        }
+                    }
+                    else
+                        futs.add(pool.submit(loadCacheFull(em, clo)));
+                }
+            }
+
+            for (Future<?> fut : futs)
+                U.get(fut);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheLoaderException("Failed to load cache", 
e.getCause());
+        }
+    }
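
As the argument handling above shows, `loadCache` consumes `args` as (key type name, SQL) pairs and submits one worker per pair. A hedged usage sketch, assuming the JCache-style `IgniteCache.loadCache(IgniteBiPredicate, Object...)` entry point; the cache, class, and table names are hypothetical:

```java
// Hedged usage sketch: args are consumed in pairs of
// (key type class name, custom SELECT query for that type).
IgniteCache<Object, Object> cache = ignite.cache("personCache");

cache.loadCache(null, // null filter: keep every loaded entry
    "org.example.PersonKey",
    "SELECT ID, NAME, SALARY FROM PERSON WHERE SALARY > 1000");
```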
+
+    /** {@inheritDoc} */
+    @Nullable @Override public V load(K key) throws CacheLoaderException {
+        assert key != null;
+
+        EntryMapping em = entryMapping(keyTypeId(key), key);
+
+        if (log.isDebugEnabled())
+            log.debug("Start load value from database [table= " + 
em.fullTableName() + ", key=" + key + "]");
+
+        Connection conn = null;
+
+        PreparedStatement stmt = null;
+
+        try {
+            conn = connection();
+
+            stmt = conn.prepareStatement(em.loadQrySingle);
+
+            fillKeyParameters(stmt, em, key);
+
+            ResultSet rs = stmt.executeQuery();
+
+            if (rs.next())
+                return buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+        }
+        catch (SQLException e) {
+            throw new CacheLoaderException("Failed to load object [table=" + 
em.fullTableName() +
+                ", key=" + key + "]", e);
+        }
+        finally {
+            end(conn, stmt);
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+        assert keys != null;
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(session().cacheName()).size());
+
+            Map<K, V> res = new HashMap<>();
+
+            for (K key : keys) {
+                Object keyTypeId = keyTypeId(key);
+
+                EntryMapping em = entryMapping(keyTypeId, key);
+
+                LoadWorker<K, V> worker = workers.get(keyTypeId);
+
+                if (worker == null)
+                    workers.put(keyTypeId, worker = new LoadWorker<>(conn, em));
+
+                worker.keys.add(key);
+
+                if (worker.keys.size() == em.maxKeysPerStmt)
+                    res.putAll(workers.remove(keyTypeId).call());
+            }
+
+            for (LoadWorker<K, V> worker : workers.values())
+                res.putAll(worker.call());
+
+            return res;
+        }
+        catch (Exception e) {
+            throw new CacheLoaderException("Failed to load entries from database", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
+
+    /**
+     * @param insStmt Insert statement.
+     * @param updStmt Update statement.
+     * @param em Entry mapping.
+     * @param entry Cache entry.
+     */
+    private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt,
+        EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        try {
+            CacheWriterException we = null;
+
+            for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
+                int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue());
+
+                fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
+
+                if (updStmt.executeUpdate() == 0) {
+                    paramIdx = fillKeyParameters(insStmt, em, entry.getKey());
+
+                    fillValueParameters(insStmt, paramIdx, em, entry.getValue());
+
+                    try {
+                        insStmt.executeUpdate();
+
+                        if (attempt > 0)
+                            U.warn(log, "Entry was inserted in database on 
second try [table=" + em.fullTableName() +
+                                ", entry=" + entry + "]");
+                    }
+                    catch (SQLException e) {
+                        String sqlState = e.getSQLState();
+
+                        SQLException nested = e.getNextException();
+
+                        while (sqlState == null && nested != null) {
+                            sqlState = nested.getSQLState();
+
+                            nested = nested.getNextException();
+                        }
+
+                        // Error code 23505 or 23000 is thrown when trying to insert a row that
+                        // would violate a unique index or primary key.
+                        if ("23505".equals(sqlState) || "23000".equals(sqlState)) {
+                            if (we == null)
+                                we = new CacheWriterException("Failed to insert entry in database, unique" +
+                                    " index or primary key violated [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+                            we.addSuppressed(e);
+
+                            U.warn(log, "Failed to insert entry in database, unique index or primary key violated" +
+                                " [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+                            continue;
+                        }
+
+                        throw new CacheWriterException("Failed to insert entry in database [table=" + em.fullTableName() +
+                            ", entry=" + entry + "]", e);
+                    }
+                }
+
+                if (attempt > 0)
+                    U.warn(log, "Entry was updated in database on second try 
[table=" + em.fullTableName() +
+                        ", entry=" + entry + "]");
+
+                return;
+            }
+
+            throw we;
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed update entry in database 
[table=" + em.fullTableName() +
+                ", entry=" + entry + "]", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        assert entry != null;
+
+        K key = entry.getKey();
+
+        EntryMapping em = entryMapping(keyTypeId(key), key);
+
+        if (log.isDebugEnabled())
+            log.debug("Start write entry to database [table=" + 
em.fullTableName() + ", entry=" + entry + "]");
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            if (dialect.hasMerge()) {
+                PreparedStatement stmt = null;
+
+                try {
+                    stmt = conn.prepareStatement(em.mergeQry);
+
+                    int i = fillKeyParameters(stmt, em, key);
+
+                    fillValueParameters(stmt, i, em, entry.getValue());
+
+                    int updCnt = stmt.executeUpdate();
+
+                    if (updCnt != 1)
+                        U.warn(log, "Unexpected number of updated entries 
[table=" + em.fullTableName() +
+                            ", entry=" + entry + "expected=1, actual=" + 
updCnt + "]");
+                }
+                finally {
+                    U.closeQuiet(stmt);
+                }
+            }
+            else {
+                PreparedStatement insStmt = null;
+
+                PreparedStatement updStmt = null;
+
+                try {
+                    insStmt = conn.prepareStatement(em.insQry);
+
+                    updStmt = conn.prepareStatement(em.updQry);
+
+                    writeUpsert(insStmt, updStmt, em, entry);
+                }
+                finally {
+                    U.closeQuiet(insStmt);
+
+                    U.closeQuiet(updStmt);
+                }
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to write entry to database 
[table=" + em.fullTableName() +
+                ", entry=" + entry + "]", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
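
When `dialect.hasMerge()` returns true, the write path above issues a single merge statement instead of the update-then-insert retry in `writeUpsert`. An illustrative H2-style merge text; the real SQL is produced by `JdbcDialect.mergeQuery(...)`, so this shape is only an assumption:

```java
// Illustrative only: the shape em.mergeQry might take for a table
// PERSON(ID, NAME) with key column ID on H2. Actual text is dialect-generated.
String mergeQry = "MERGE INTO PERSON (ID, NAME) KEY (ID) VALUES (?, ?)";
```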
+
+    /** {@inheritDoc} */
+    @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries)
+        throws CacheWriterException {
+        assert entries != null;
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            Object currKeyTypeId = null;
+
+            if (dialect.hasMerge()) {
+                PreparedStatement mergeStmt = null;
+
+                try {
+                    EntryMapping em = null;
+
+                    LazyValue<Object[]> lazyEntries = new LazyValue<Object[]>() {
+                        @Override public Object[] create() {
+                            return entries.toArray();
+                        }
+                    };
+
+                    int fromIdx = 0, prepared = 0;
+
+                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                        K key = entry.getKey();
+
+                        Object keyTypeId = keyTypeId(key);
+
+                        em = entryMapping(keyTypeId, key);
+
+                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+                            if (mergeStmt != null) {
+                                executeBatch(em, mergeStmt, "writeAll", 
fromIdx, prepared, lazyEntries);
+
+                                U.closeQuiet(mergeStmt);
+                            }
+
+                            mergeStmt = conn.prepareStatement(em.mergeQry);
+
+                            currKeyTypeId = keyTypeId;
+
+                            prepared = 0;
+                        }
+
+                        int i = fillKeyParameters(mergeStmt, em, key);
+
+                        fillValueParameters(mergeStmt, i, em, entry.getValue());
+
+                        mergeStmt.addBatch();
+
+                        if (++prepared % batchSz == 0) {
+                            executeBatch(em, mergeStmt, "writeAll", fromIdx, 
prepared, lazyEntries);
+
+                            prepared = 0;
+                        }
+                    }
+
+                    if (mergeStmt != null && prepared % batchSz != 0)
+                        executeBatch(em, mergeStmt, "writeAll", fromIdx, 
prepared, lazyEntries);
+                }
+                finally {
+                    U.closeQuiet(mergeStmt);
+                }
+            }
+            else {
+                PreparedStatement insStmt = null;
+
+                PreparedStatement updStmt = null;
+
+                try {
+                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                        K key = entry.getKey();
+
+                        Object keyTypeId = keyTypeId(key);
+
+                        EntryMapping em = entryMapping(keyTypeId, key);
+
+                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+                            U.closeQuiet(insStmt);
+
+                            insStmt = conn.prepareStatement(em.insQry);
+
+                            U.closeQuiet(updStmt);
+
+                            updStmt = conn.prepareStatement(em.updQry);
+
+                            currKeyTypeId = keyTypeId;
+                        }
+
+                        writeUpsert(insStmt, updStmt, em, entry);
+                    }
+                }
+                finally {
+                    U.closeQuiet(insStmt);
+
+                    U.closeQuiet(updStmt);
+                }
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to write entries in 
database", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void delete(Object key) throws CacheWriterException {
+        assert key != null;
+
+        EntryMapping em = entryMapping(keyTypeId(key), key);
+
+        if (log.isDebugEnabled())
+            log.debug("Start remove value from database [table=" + 
em.fullTableName() + ", key=" + key + "]");
+
+        Connection conn = null;
+
+        PreparedStatement stmt = null;
+
+        try {
+            conn = connection();
+
+            stmt = conn.prepareStatement(em.remQry);
+
+            fillKeyParameters(stmt, em, key);
+
+            int delCnt = stmt.executeUpdate();
+
+            if (delCnt != 1)
+                U.warn(log, "Unexpected number of deleted entries [table=" + 
em.fullTableName() + ", key=" + key +
+                    "expected=1, actual=" + delCnt + "]");
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to remove value from 
database [table=" + em.fullTableName() +
+                ", key=" + key + "]", e);
+        }
+        finally {
+            end(conn, stmt);
+        }
+    }
+
+    /**
+     * @param em Entry mapping.
+     * @param stmt Statement.
+     * @param desc Statement description for error message.
+     * @param fromIdx Start index in the objects array for this batch.
+     * @param prepared Expected number of objects in the batch.
+     * @param lazyObjs All objects used in the batch statement, as a lazily created array.
+     * @throws SQLException If batch execution failed.
+     */
+    private void executeBatch(EntryMapping em, Statement stmt, String desc, int fromIdx, int prepared,
+        LazyValue<Object[]> lazyObjs) throws SQLException {
+        try {
+            int[] rowCounts = stmt.executeBatch();
+
+            int numOfRowCnt = rowCounts.length;
+
+            if (numOfRowCnt != prepared)
+                U.warn(log, "Unexpected number of updated rows [table=" + 
em.fullTableName() + ", expected=" + prepared +
+                    ", actual=" + numOfRowCnt + "]");
+
+            for (int i = 0; i < numOfRowCnt; i++) {
+                int cnt = rowCounts[i];
+
+                if (cnt != 1 && cnt != SUCCESS_NO_INFO) {
+                    Object[] objs = lazyObjs.value();
+
+                    U.warn(log, "Batch " + desc + " returned unexpected 
updated row count [table=" + em.fullTableName() +
+                        ", entry=" + objs[fromIdx + i] + ", expected=1, 
actual=" + cnt + "]");
+                }
+            }
+        }
+        catch (BatchUpdateException be) {
+            int[] rowCounts = be.getUpdateCounts();
+
+            for (int i = 0; i < rowCounts.length; i++) {
+                if (rowCounts[i] == EXECUTE_FAILED) {
+                    Object[] objs = lazyObjs.value();
+
+                    U.warn(log, "Batch " + desc + " failed on execution 
[table=" + em.fullTableName() +
+                        ", entry=" + objs[fromIdx + i] + "]");
+                }
+            }
+
+            throw be;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deleteAll(final Collection<?> keys) throws CacheWriterException {
+        assert keys != null;
+
+        Connection conn = null;
+
+        try {
+            conn = connection();
+
+            Object currKeyTypeId = null;
+
+            EntryMapping em = null;
+
+            PreparedStatement delStmt = null;
+
+            LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() {
+                @Override public Object[] create() {
+                    return keys.toArray();
+                }
+            };
+
+            int fromIdx = 0, prepared = 0;
+
+            for (Object key : keys) {
+                Object keyTypeId = keyTypeId(key);
+
+                em = entryMapping(keyTypeId, key);
+
+                if (delStmt == null) {
+                    delStmt = conn.prepareStatement(em.remQry);
+
+                    currKeyTypeId = keyTypeId;
+                }
+
+                if (!currKeyTypeId.equals(keyTypeId)) {
+                    executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, 
lazyKeys);
+
+                    fromIdx += prepared;
+
+                    prepared = 0;
+
+                    currKeyTypeId = keyTypeId;
+                }
+
+                fillKeyParameters(delStmt, em, key);
+
+                delStmt.addBatch();
+
+                if (++prepared % batchSz == 0) {
+                    executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, 
lazyKeys);
+
+                    fromIdx += prepared;
+
+                    prepared = 0;
+                }
+            }
+
+            if (delStmt != null && prepared % batchSz != 0)
+                executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, 
lazyKeys);
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to remove values from 
database", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
+    }
+
+    /**
+     * @param stmt Prepared statement.
+     * @param i Start index for parameters.
+     * @param em Entry mapping.
+     * @param key Key object.
+     * @return Next index for parameters.
+     * @throws CacheException If failed to set statement parameters.
+     */
+    protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping em,
+        Object key) throws CacheException {
+        for (CacheTypeFieldMetadata field : em.keyColumns()) {
+            Object fieldVal = extractField(em.keyType(), field.getJavaName(), key);
+
+            try {
+                if (fieldVal != null)
+                    stmt.setObject(i++, fieldVal);
+                else
+                    stmt.setNull(i++, field.getDatabaseType());
+            }
+            catch (SQLException e) {
+                throw new CacheException("Failed to set statement parameter 
name: " + field.getDatabaseName(), e);
+            }
+        }
+
+        return i;
+    }
+
+    /**
+     * @param stmt Prepared statement.
+     * @param m Type mapping description.
+     * @param key Key object.
+     * @return Next index for parameters.
+     */
+    protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException {
+        return fillKeyParameters(stmt, 1, m, key);
+    }
+
+    /**
+     * @param stmt Prepared statement.
+     * @param i Start index for parameters.
+     * @param m Type mapping description.
+     * @param val Value object.
+     * @return Next index for parameters.
+     */
+    protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping m, Object val)
+        throws CacheWriterException {
+        for (CacheTypeFieldMetadata field : m.uniqValFields) {
+            Object fieldVal = extractField(m.valueType(), field.getJavaName(), val);
+
+            try {
+                if (fieldVal != null)
+                    stmt.setObject(i++, fieldVal);
+                else
+                    stmt.setNull(i++, field.getDatabaseType());
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to set statement 
parameter name: " + field.getDatabaseName(), e);
+            }
+        }
+
+        return i;
+    }
+
+    /**
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /**
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Get database dialect.
+     *
+     * @return Database dialect.
+     */
+    public JdbcDialect getDialect() {
+        return dialect;
+    }
+
+    /**
+     * Set database dialect.
+     *
+     * @param dialect Database dialect.
+     */
+    public void setDialect(JdbcDialect dialect) {
+        this.dialect = dialect;
+    }
+
+    /**
+     * Get maximum workers thread count. These threads are responsible for executing queries.
+     *
+     * @return Max workers thread count.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSz;
+    }
+
+    /**
+     * Set maximum workers thread count. These threads are responsible for executing queries.
+     *
+     * @param maxPoolSz Max workers thread count.
+     */
+    public void setMaxPoolSize(int maxPoolSz) {
+        this.maxPoolSz = maxPoolSz;
+    }
+
+    /**
+     * Get maximum batch size for write and delete operations.
+     *
+     * @return Maximum batch size.
+     */
+    public int getBatchSize() {
+        return batchSz;
+    }
+
+    /**
+     * Set maximum batch size for write and delete operations.
+     *
+     * @param batchSz Maximum batch size.
+     */
+    public void setBatchSize(int batchSz) {
+        this.batchSz = batchSz;
+    }
+
+    /**
+     * Get parallel load cache minimum row count threshold.
+     *
+     * @return Minimum row count threshold. If {@code 0}, then load sequentially.
+     */
+    public int getParallelLoadCacheMinimumThreshold() {
+        return parallelLoadCacheMinThreshold;
+    }
+
+    /**
+     * Set parallel load cache minimum row count threshold.
+     *
+     * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0}, then load sequentially.
+     */
+    public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
+        this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
+    }
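
Taken together, the setters above are the store's tuning knobs. A configuration sketch with illustrative values; `dataSrc` is assumed to be defined elsewhere:

```java
CacheJdbcPojoStore store = new CacheJdbcPojoStore();

store.setDataSource(dataSrc); // required: start() fails without it
store.setBatchSize(1024);     // batch size for writeAll/deleteAll
store.setMaxPoolSize(8);      // worker threads used by loadCache
store.setParallelLoadCacheMinimumThreshold(0); // 0 = load sequentially, one select per type
```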
+
+    /**
+     * Entry mapping description.
+     */
+    protected static class EntryMapping {
+        /** Database dialect. */
+        private final JdbcDialect dialect;
+
+        /** Select border for range queries. */
+        protected final String loadCacheSelRangeQry;
+
+        /** Select all items query. */
+        protected final String loadCacheQry;
+
+        /** Select item query. */
+        protected final String loadQrySingle;
+
+        /** Select items query. */
+        private final String loadQry;
+
+        /** Merge item(s) query. */
+        protected final String mergeQry;
+
+        /** Insert item query. */
+        protected final String insQry;
+
+        /** Update item query. */
+        protected final String updQry;
+
+        /** Remove item(s) query. */
+        protected final String remQry;
+
+        /** Max key count for load query per statement. */
+        protected final int maxKeysPerStmt;
+
+        /** Database key columns. */
+        private final Collection<String> keyCols;
+
+        /** Database unique value columns. */
+        private final Collection<String> cols;
+
+        /** Select query columns index. */
+        private final Map<String, Integer> loadColIdxs;
+
+        /** Unique value fields. */
+        private final Collection<CacheTypeFieldMetadata> uniqValFields;
+
+        /** Type metadata. */
+        private final CacheTypeMetadata typeMeta;
+
+        /** Full table name. */
+        private final String fullTblName;
+
+        /**
+         * @param dialect Database dialect.
+         * @param typeMeta Type metadata.
+         */
+        public EntryMapping(JdbcDialect dialect, CacheTypeMetadata typeMeta) {
+            this.dialect = dialect;
+
+            this.typeMeta = typeMeta;
+
+            final Collection<CacheTypeFieldMetadata> keyFields = typeMeta.getKeyFields();
+
+            Collection<CacheTypeFieldMetadata> valFields = typeMeta.getValueFields();
+
+            uniqValFields = F.view(valFields, new IgnitePredicate<CacheTypeFieldMetadata>() {
+                @Override public boolean apply(CacheTypeFieldMetadata col) {
+                    return !keyFields.contains(col);
+                }
+            });
+
+            String schema = typeMeta.getDatabaseSchema();
+
+            String tblName = typeMeta.getDatabaseTable();
+
+            fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName;
+
+            keyCols = databaseColumns(keyFields);
+
+            Collection<String> uniqValCols = databaseColumns(uniqValFields);
+
+            cols = F.concat(false, keyCols, uniqValCols);
+
+            loadColIdxs = U.newHashMap(cols.size());
+
+            int idx = 1;
+
+            for (String col : cols)
+                loadColIdxs.put(col, idx++);
+
+            loadCacheQry = dialect.loadCacheQuery(fullTblName, cols);
+
+            loadCacheSelRangeQry = dialect.loadCacheSelectRangeQuery(fullTblName, keyCols);
+
+            loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1);
+
+            maxKeysPerStmt = dialect.getMaxParamsCnt() / keyCols.size();
+
+            loadQry = dialect.loadQuery(fullTblName, keyCols, cols, maxKeysPerStmt);
+
+            insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols);
+
+            updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols);
+
+            mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols);
+
+            remQry = dialect.removeQuery(fullTblName, keyCols);
+        }
+
+        /**
+         * Extract database column names from {@link CacheTypeFieldMetadata}.
+         *
+         * @param dsc Collection of {@link CacheTypeFieldMetadata}.
+         * @return Collection of database column names.
+         */
+        private static Collection<String> databaseColumns(Collection<CacheTypeFieldMetadata> dsc) {
+            return F.transform(dsc, new C1<CacheTypeFieldMetadata, String>() {
+                /** {@inheritDoc} */
+                @Override public String apply(CacheTypeFieldMetadata col) {
+                    return col.getDatabaseName();
+                }
+            });
+        }
+
+        /**
+         * Construct query to select values with key count less than or equal to {@code maxKeysPerStmt}.
+         *
+         * @param keyCnt Key count.
+         * @return Load query.
+         */
+        protected String loadQuery(int keyCnt) {
+            assert keyCnt <= maxKeysPerStmt;
+
+            if (keyCnt == maxKeysPerStmt)
+                return loadQry;
+
+            if (keyCnt == 1)
+                return loadQrySingle;
+
+            return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt);
+        }
+
+        /**
+         * Construct query to select values in a range.
+         *
+         * @param appendLowerBound Whether to append a lower bound to the range.
+         * @param appendUpperBound Whether to append an upper bound to the range.
+         * @return Query with range.
+         */
+        protected String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) {
+            return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound);
+        }
+
+        /** Key type. */
+        protected String keyType() {
+            return typeMeta.getKeyType();
+        }
+
+        /** Value type. */
+        protected String valueType() {
+            return typeMeta.getValueType();
+        }
+
+        /**
+         * Gets key columns.
+         *
+         * @return Key columns.
+         */
+        protected Collection<CacheTypeFieldMetadata> keyColumns() {
+            return typeMeta.getKeyFields();
+        }
+
+        /**
+         * Gets value columns.
+         *
+         * @return Value columns.
+         */
+        protected Collection<CacheTypeFieldMetadata> valueColumns() {
+            return typeMeta.getValueFields();
+        }
+
+        /**
+         * Get full table name.
+         *
+         * @return &lt;schema&gt;.&lt;table name&gt;
+         */
+        protected String fullTableName() {
+            return fullTblName;
+        }
+    }
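
`EntryMapping` derives every query from a `CacheTypeMetadata` instance. A hedged sketch of such metadata; the setter names are assumed to mirror the getters `EntryMapping` reads (`getKeyType()`, `getDatabaseTable()`, `getKeyFields()`, ...), and the class, table, and column names are hypothetical:

```java
// Hedged sketch: maps org.example.Person to table PERSON. Value fields that
// overlap key fields are filtered out into uniqValFields by the constructor above.
CacheTypeMetadata meta = new CacheTypeMetadata();

meta.setKeyType("org.example.PersonKey");
meta.setValueType("org.example.Person");
meta.setDatabaseTable("PERSON");

meta.setKeyFields(Arrays.asList(
    new CacheTypeFieldMetadata("ID", java.sql.Types.INTEGER, "id", int.class)));

meta.setValueFields(Arrays.asList(
    new CacheTypeFieldMetadata("NAME", java.sql.Types.VARCHAR, "name", String.class)));
```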
+
+    /**
+     * Worker for load cache using custom user query.
+     *
+     * @param <K1> Key type.
+     * @param <V1> Value type.
+     */
+    private class LoadCacheCustomQueryWorker<K1, V1> implements Callable<Void> {
+        /** Entry mapping description. */
+        private final EntryMapping em;
+
+        /** User query. */
+        private final String qry;
+
+        /** Closure for loaded values. */
+        private final IgniteBiInClosure<K1, V1> clo;
+
+        /**
+         * @param em Entry mapping description.
+         * @param qry User query.
+         * @param clo Closure for loaded values.
+         */
+        private LoadCacheCustomQueryWorker(EntryMapping em, String qry, IgniteBiInClosure<K1, V1> clo) {
+            this.em = em;
+            this.qry = qry;
+            this.clo = clo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            Connection conn = null;
+
+            PreparedStatement stmt = null;
+
+            try {
+                conn = openConnection(true);
+
+                stmt = conn.prepareStatement(qry);
+
+                ResultSet rs = stmt.executeQuery();
+
+                while (rs.next()) {
+                    K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
+                    V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+
+                    clo.apply(key, val);
+                }
+
+                return null;
+            }
+            catch (SQLException e) {
+                throw new CacheLoaderException("Failed to execute custom query 
for load cache", e);
+            }
+            finally {
+                U.closeQuiet(stmt);
+
+                U.closeQuiet(conn);
+            }
+        }
+    }
+
+    /**
+     * Lazy initialization of value.
+     *
+     * @param <T> Cached object type
+     */
+    private abstract static class LazyValue<T> {
+        /** Cached value. */
+        private T val;
+
+        /**
+         * @return Constructed value.
+         */
+        protected abstract T create();
+
+        /**
+         * @return Value.
+         */
+        public T value() {
+            if (val == null)
+                val = create();
+
+            return val;
+        }
+    }
+
+    /**
+     * Worker for load by keys.
+     *
+     * @param <K1> Key type.
+     * @param <V1> Value type.
+     */
+    private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
+        /** Connection. */
+        private final Connection conn;
+
+        /** Keys for load. */
+        private final Collection<K1> keys;
+
+        /** Entry mapping description. */
+        private final EntryMapping em;
+
+        /**
+         * @param conn Connection.
+         * @param em Entry mapping description.
+         */
+        private LoadWorker(Connection conn, EntryMapping em) {
+            this.conn = conn;
+            this.em = em;
+
+            keys = new ArrayList<>(em.maxKeysPerStmt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<K1, V1> call() throws Exception {
+            PreparedStatement stmt = null;
+
+            try {
+                stmt = conn.prepareStatement(em.loadQuery(keys.size()));
+
+                int i = 1;
+
+                for (Object key : keys)
+                    for (CacheTypeFieldMetadata field : em.keyColumns()) {
+                        Object fieldVal = extractField(em.keyType(), field.getJavaName(), key);
+
+                        if (fieldVal != null)
+                            stmt.setObject(i++, fieldVal);
+                        else
+                            stmt.setNull(i++, field.getDatabaseType());
+                    }
+
+                ResultSet rs = stmt.executeQuery();
+
+                Map<K1, V1> entries = U.newHashMap(keys.size());
+
+                while (rs.next()) {
+                    K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
+                    V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+
+                    entries.put(key, val);
+                }
+
+                return entries;
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f55d1983/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
new file mode 100644
index 0000000..ed1846b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.lang.reflect.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * {@link CacheStore} implementation backed by JDBC and POJOs via reflection.
+ *
+ * This implementation stores objects in the underlying database using a Java Beans mapping description via reflection.
+ */
+public class CacheJdbcPojoStore extends CacheAbstractJdbcStore<Object, Object> {
+    /**
+     * POJO methods cache.
+     */
+    protected static class PojoMethodsCache {
+        /** POJO class. */
+        protected final Class<?> cls;
+
+        /** Constructor for POJO object. */
+        private final Constructor ctor;
+
+        /** Cached getters for POJO object. */
+        private final Map<String, Method> getters;
+
+        /** Cached setters for POJO object. */
+        private final Map<String, Method> setters;
+
+        /**
+         * POJO methods cache.
+         *
+         * @param clsName Class name.
+         * @param fields Fields.
+         * @throws CacheException If failed to cache methods of the POJO class.
+         */
+        public PojoMethodsCache(String clsName, Collection<CacheTypeFieldMetadata> fields) throws CacheException {
+            try {
+                cls = Class.forName(clsName);
+
+                ctor = cls.getDeclaredConstructor();
+
+                if (!ctor.isAccessible())
+                    ctor.setAccessible(true);
+            }
+            catch (ClassNotFoundException e) {
+                throw new CacheException("Failed to find class: " + clsName, 
e);
+            }
+            catch (NoSuchMethodException e) {
+                throw new CacheException("Failed to find empty constructor for 
class: " + clsName, e);
+            }
+
+            setters = U.newHashMap(fields.size());
+
+            getters = U.newHashMap(fields.size());
+
+            for (CacheTypeFieldMetadata field : fields) {
+                String prop = capitalFirst(field.getJavaName());
+
+                try {
+                    getters.put(field.getJavaName(), cls.getMethod("get" + 
prop));
+                }
+                catch (NoSuchMethodException ignored) {
+                    try {
+                        getters.put(field.getJavaName(), cls.getMethod("is" + 
prop));
+                    }
+                    catch (NoSuchMethodException e) {
+                        throw new CacheException("Failed to find getter for 
property " + field.getJavaName() +
+                            " of class: " + cls.getName(), e);
+                    }
+                }
+
+                try {
+                    setters.put(field.getJavaName(), cls.getMethod("set" + 
prop, field.getJavaType()));
+                }
+                catch (NoSuchMethodException e) {
+                    throw new CacheException("Failed to find setter for 
property " + field.getJavaName() +
+                        " of class: " + clsName, e);
+                }
+            }
+        }
+
+        /**
+         * Capitalizes the first character of the given string.
+         *
+         * @param str String.
+         * @return String with capitalized first character.
+         */
+        @Nullable private String capitalFirst(@Nullable String str) {
+            return str == null ? null :
+                str.isEmpty() ? "" : Character.toUpperCase(str.charAt(0)) + 
str.substring(1);
+        }
+
+        /**
+         * Construct new instance of POJO object.
+         *
+         * @return POJO object.
+         * @throws CacheLoaderException If failed to construct new instance.
+         */
+        protected Object newInstance() throws CacheLoaderException {
+            try {
+                return ctor.newInstance();
+            }
+            catch (Exception e) {
+                throw new CacheLoaderException("Failed to create new instance 
for class: " + cls, e);
+            }
+        }
+    }
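
The reflection lookups above dictate the POJO contract: a no-arg constructor (made accessible if needed), a `get` or `is` getter, and a `set` setter for every mapped field. A minimal conforming class (hypothetical example):

```java
public class Person {
    private int id;
    private boolean active;

    public Person() {
        // Required no-arg constructor; PojoMethodsCache invokes it via reflection.
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    // "is" getters are accepted as a fallback when no "get" method exists.
    public boolean isActive() { return active; }
    public void setActive(boolean active) { this.active = active; }
}
```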
+
+    /** Methods cache. */
+    protected volatile Map<String, Map<String, PojoMethodsCache>> mtdsCache = Collections.emptyMap();
+
+    /** {@inheritDoc} */
+    @Override protected void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
+        throws CacheException {
+        Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2);
+
+        for (CacheTypeMetadata type : types) {
+            String keyType = type.getKeyType();
+            typeMethods.put(keyType, new PojoMethodsCache(keyType, type.getKeyFields()));
+
+            String valType = type.getValueType();
+            typeMethods.put(valType, new PojoMethodsCache(valType, type.getValueFields()));
+        }
+
+        Map<String, Map<String, PojoMethodsCache>> newMtdsCache = new HashMap<>(mtdsCache);
+
+        newMtdsCache.put(cacheName, typeMethods);
+
+        mtdsCache = newMtdsCache;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields,
+        Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException {
+        PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);
+
+        Object obj = mc.newInstance();
+
+        try {
+            for (CacheTypeFieldMetadata field : fields) {
+                Method setter = mc.setters.get(field.getJavaName());
+
+                Integer colIdx = loadColIdxs.get(field.getDatabaseName());
+
+                setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+            }
+
+            return (R)obj;
+        }
+        catch (Exception e) {
+            throw new CacheLoaderException("Failed to read object of class: " 
+ typeName, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj)
+        throws CacheException {
+        try {
+            PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);
+
+            return mc.getters.get(fieldName).invoke(obj);
+        }
+        catch (Exception e) {
+            throw new CacheException("Failed to read object of class: " + 
typeName, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Object keyTypeId(Object key) throws CacheException {
+        return key.getClass();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Object keyTypeId(String type) throws CacheException {
+        try {
+            return Class.forName(type);
+        }
+        catch (ClassNotFoundException e) {
+            throw new CacheException("Failed to find class: " + type, e);
+        }
+    }
+}
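
To close the loop on the rename, a hedged sketch of wiring the store into a cache configuration through the standard JCache factory mechanism; the configuration API details are assumptions for this snapshot of the code base:

```java
// Hedged wiring sketch using javax.cache.configuration.FactoryBuilder.
CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("personCache");

ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheJdbcPojoStore.class));
ccfg.setReadThrough(true);  // route cache misses through the store
ccfg.setWriteThrough(true); // propagate puts/removes to the database
```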
