Repository: incubator-ignite Updated Branches: refs/heads/ignite-32 707aaf533 -> 8e5c0a8af
# IGNITE-32: Fixes after review. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e5c0a8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e5c0a8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e5c0a8a Branch: refs/heads/ignite-32 Commit: 8e5c0a8af10e39dd4f70d9fcecb8c60d0b3d7822 Parents: 707aaf5 Author: AKuznetsov <akuznet...@gridgain.com> Authored: Tue Feb 3 09:26:03 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Tue Feb 3 09:26:03 2015 +0700 ---------------------------------------------------------------------- .../ignite/cache/store/jdbc/JdbcCacheStore.java | 259 ++++++++++++------- 1 file changed, 161 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e5c0a8a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java index 4774d16..33461b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java @@ -169,8 +169,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L protected JdbcDialect resolveDialect() throws CacheException { Connection conn = null; - // TODO check conn.getMetaData().getURL() will work ??? - String dbProductName = null; try { @@ -612,6 +610,44 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L } } + /** + * @param insStmt Insert statement. + * @param updStmt Update statement. + * @param em Entry mapping. + * @param entry Cache entry. + */ + private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt, + EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws SQLException { + for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) { + int i = fillValueParameters(updStmt, 1, em, entry.getValue()); + + fillKeyParameters(updStmt, i, em, entry.getKey()); + + if (updStmt.executeUpdate() == 0) { + i = fillKeyParameters(insStmt, em, entry.getKey()); + + fillValueParameters(insStmt, i, em, entry.getValue()); + + try { + insStmt.executeUpdate(); + } + catch (SQLException e) { + // The error with code 23505 is thrown when trying to insert a row that + // would violate a unique index or primary key. + // TODO check with all RDBMS + if (e.getErrorCode() == 23505) + continue; + + throw e; + } + } + + return; + } + + throw new CacheWriterException("Failed write entry to database: " + entry); + } + /** {@inheritDoc} */ @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException { assert entry != null; @@ -625,64 +661,49 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L Connection conn = null; - PreparedStatement stmt = null; - try { conn = connection(); if (dialect.hasMerge()) { - stmt = conn.prepareStatement(em.mergeQry); + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(em.mergeQry); - int i = fillKeyParameters(stmt, em, key); + int i = fillKeyParameters(stmt, em, key); - fillValueParameters(stmt, i, em, entry.getValue()); + fillValueParameters(stmt, i, em, entry.getValue()); - stmt.executeUpdate(); + stmt.executeUpdate(); + } + finally { + U.closeQuiet(stmt); + } } else { - V val = entry.getValue(); - - for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) { - stmt = conn.prepareStatement(em.updQry); + PreparedStatement insStmt = null; - int i = fillValueParameters(stmt, 1, em, val); + PreparedStatement updStmt = null; - fillKeyParameters(stmt, i, em, key); - - if (stmt.executeUpdate() == 0) { - U.closeQuiet(stmt); - - stmt = conn.prepareStatement(em.insQry); - - i = fillKeyParameters(stmt, em, key); - - fillValueParameters(stmt, i, em, val); + try { + insStmt = conn.prepareStatement(em.insQry); - try { - stmt.executeUpdate(); - } - catch (SQLException e) { - // The error with code 23505 is thrown when trying to insert a row that - // would violate a unique index or primary key. - // TODO check with all RDBMS - if (e.getErrorCode() == 23505) - continue; - - throw e; - } - } + updStmt = conn.prepareStatement(em.updQry); - return; + writeUpsert(insStmt, updStmt, em, entry); } + finally { + U.closeQuiet(insStmt); - throw new CacheWriterException("Failed write entry to database: " + entry); + U.closeQuiet(updStmt); + } } } catch (SQLException e) { throw new CacheWriterException("Failed to write entry to database: " + entry, e); } finally { - end(conn, stmt); + closeConnection(conn); } } @@ -691,68 +712,101 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L throws CacheWriterException { assert entries != null; - if (dialect.hasMerge()) { - Connection conn = null; + Connection conn = null; - PreparedStatement mergeStmt = null; + try { + conn = connection(); - try { - conn = connection(); + Object currKeyTypeId = null; - Object currKeyTypeId = null; + if (dialect.hasMerge()) { + PreparedStatement mergeStmt = null; - int cnt = 0; + try { + int fromIdx = 0, prepared = 0; - for (Cache.Entry<? extends K, ? extends V> entry : entries) { - K key = entry.getKey(); + for (Cache.Entry<? extends K, ? extends V> entry : entries) { + K key = entry.getKey(); - Object keyTypeId = keyTypeId(key); + Object keyTypeId = keyTypeId(key); - EntryMapping em = entryMapping(keyTypeId, key); + EntryMapping em = entryMapping(keyTypeId, key); - if (mergeStmt == null) { - mergeStmt = conn.prepareStatement(em.mergeQry); + if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { + if (mergeStmt != null) { + executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries); - currKeyTypeId = keyTypeId; - } + U.closeQuiet(mergeStmt); + } - if (!currKeyTypeId.equals(keyTypeId)) { - executeBatch(mergeStmt, "writeAll", cnt); + mergeStmt = conn.prepareStatement(em.mergeQry); - currKeyTypeId = keyTypeId; + currKeyTypeId = keyTypeId; - cnt = 0; - } + prepared = 0; + } - int i = fillKeyParameters(mergeStmt, em, key); + int i = fillKeyParameters(mergeStmt, em, key); - fillValueParameters(mergeStmt, i, em, entry.getValue()); + fillValueParameters(mergeStmt, i, em, entry.getValue()); - mergeStmt.addBatch(); + mergeStmt.addBatch(); - if (++cnt % batchSz == 0) { - executeBatch(mergeStmt, "writeAll", cnt); + if (++prepared % batchSz == 0) { + executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries); - cnt = 0; + prepared = 0; + } } - } - if (mergeStmt != null && cnt % batchSz != 0) - executeBatch(mergeStmt, "writeAll", cnt); - } - catch (SQLException e) { - throw new CacheWriterException("Failed to write entries in database", e); + if (mergeStmt != null && prepared % batchSz != 0) + executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries); + } + finally { + U.closeQuiet(mergeStmt); + } } - finally { - U.closeQuiet(mergeStmt); + else { + PreparedStatement insStmt = null; + + PreparedStatement updStmt = null; + + try { + for (Cache.Entry<? extends K, ? extends V> entry : entries) { + K key = entry.getKey(); + + Object keyTypeId = keyTypeId(key); + + EntryMapping em = entryMapping(keyTypeId, key); - closeConnection(conn); + if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) { + U.closeQuiet(insStmt); + + insStmt = conn.prepareStatement(em.insQry); + + U.closeQuiet(updStmt); + + updStmt = conn.prepareStatement(em.updQry); + + currKeyTypeId = keyTypeId; + } + + writeUpsert(insStmt, updStmt, em, entry); + } + } + finally { + U.closeQuiet(insStmt); + + U.closeQuiet(updStmt); + } } } - else - for (Cache.Entry<? extends K, ? extends V> e : entries) - write(e); // TODO rework to optimal usage. Method write will get all params each time (conn, stmt). - // split into 2 methods writeMerge + writeUpsert. + catch (SQLException e) { + throw new CacheWriterException("Failed to write entries in database", e); + } + finally { + closeConnection(conn); + } } /** {@inheritDoc} */ @@ -788,24 +842,29 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L /** * @param stmt Statement. - * @param stmtType Statement type for error message. - * @param batchSz Expected batch size. + * @param stmtType Statement description for error message. + * @param fromIdx Objects in batch start from index. + * @param prepared Expected objects in batch. + * @param objects All objects. */ - private void executeBatch(Statement stmt, String stmtType, int batchSz) throws SQLException { + private void executeBatch(Statement stmt, String stmtType, int fromIdx, int prepared, Collection<?> objects) + throws SQLException { int[] rowCounts = stmt.executeBatch(); int numOfRowCnt = rowCounts.length; - if (numOfRowCnt != batchSz) - log.warning("JDBC driver did not return the expected number of row counts," + - " actual row count: " + numOfRowCnt + " expected: " + batchSz); + if (numOfRowCnt != prepared) + log.warning("JDBC driver did not return the expected number of updated row counts," + + " actual row count: " + numOfRowCnt + " expected: " + prepared); + + Object[] arr = null; + + for (int i = 0; i < numOfRowCnt; i++) + if (rowCounts[i] != 1) { + if (arr == null) + arr = objects.toArray(); - for (int rowCount : rowCounts) - if (rowCount != 1) { - // TODO stmtType used 2 times? - // TODO print warning for all keys. - // TODO while failed - convert collection to array and print warnins to each failed key. - log.warning("Batch " + stmtType + " returned unexpected row count from " + stmtType + " statement"); + log.warning("Batch " + stmtType + " returned unexpected updated row count for: " + arr[fromIdx + i]); } } @@ -822,7 +881,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L PreparedStatement delStmt = null; - int cnt = 0; + int fromIdx = 0, prepared = 0; for (Object key : keys) { Object keyTypeId = keyTypeId(key); @@ -836,9 +895,11 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L } if (!currKeyTypeId.equals(keyTypeId)) { - executeBatch(delStmt, "deleteAll", cnt); + executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys); - cnt = 0; + fromIdx += prepared; + + prepared = 0; currKeyTypeId = keyTypeId; } @@ -847,15 +908,17 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L delStmt.addBatch(); - if (++cnt % batchSz == 0) { - executeBatch(delStmt, "deleteAll", cnt); + if (++prepared % batchSz == 0) { + executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys); + + fromIdx += prepared; - cnt = 0; + prepared = 0; } } - if (delStmt != null && cnt % batchSz != 0) - executeBatch(delStmt, "deleteAll", cnt); + if (delStmt != null && prepared % batchSz != 0) + executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys); } catch (SQLException e) { throw new CacheWriterException("Failed to remove values from database", e);