sprint-2 - Added isWithinTransaction() method to session.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16105ec9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16105ec9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16105ec9 Branch: refs/heads/ignite-344 Commit: 16105ec9687732d0b01cfeaee9a5b1c227b0921f Parents: 6097e7b Author: Dmitiry Setrakyan <dsetrak...@gridgain.com> Authored: Sat Feb 28 09:44:30 2015 -0800 Committer: Dmitiry Setrakyan <dsetrak...@gridgain.com> Committed: Sat Feb 28 09:44:30 2015 -0800 ---------------------------------------------------------------------- .../store/jdbc/CacheJdbcPersonStore.java | 110 +++++++------------ .../ignite/cache/store/CacheStoreSession.java | 9 ++ .../processors/cache/GridCacheStoreManager.java | 6 +- .../junits/cache/TestCacheSession.java | 5 + .../cache/TestThreadLocalCacheSession.java | 5 + 5 files changed, 62 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java index d80861d..0473280 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java @@ -22,7 +22,6 @@ import org.apache.ignite.cache.store.*; import org.apache.ignite.examples.datagrid.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; -import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -72,8 +71,6 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { /** {@inheritDoc} */ @Override public void txEnd(boolean commit) { - Transaction tx = transaction(); - Map<String, Connection> props = ses.properties(); try (Connection conn = props.remove(ATTR_NAME)) { @@ -84,23 +81,21 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { conn.rollback(); } - System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']'); + System.out.println(">>> Transaction ended [commit=" + commit + ']'); } catch (SQLException e) { - throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e); + throw new CacheWriterException("Failed to end transaction: " + ses.transaction(), e); } } /** {@inheritDoc} */ @Override public Person load(Long key) { - Transaction tx = transaction(); - - System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); + System.out.println(">>> Loading key: " + key); Connection conn = null; try { - conn = connection(tx); + conn = connection(); try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) { st.setString(1, key.toString()); @@ -108,14 +103,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { ResultSet rs = st.executeQuery(); if (rs.next()) - return person(rs.getLong(1), rs.getString(2), rs.getString(3)); + return new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); } } catch (SQLException e) { throw new CacheLoaderException("Failed to load object: " + key, e); } finally { - end(tx, conn); + end(conn); } return null; @@ -123,60 +118,57 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { /** {@inheritDoc} */ @Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) { - Transaction tx = transaction(); - Long key = entry.getKey(); Person val = entry.getValue(); - System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']'); + System.out.println(">>> Putting [key=" + key + ", val=" + val + ']'); Connection conn = null; try { - conn = connection(tx); + conn = connection(); - int updated; + int updated; - try (PreparedStatement st = conn.prepareStatement( - "update PERSONS set firstName=?, lastName=? where id=?")) { - st.setString(1, val.getFirstName()); - st.setString(2, val.getLastName()); - st.setLong(3, val.getId()); + // Try update first. + try (PreparedStatement st = conn.prepareStatement( + "update PERSONS set firstName=?, lastName=? where id=?")) { + st.setString(1, val.getFirstName()); + st.setString(2, val.getLastName()); + st.setLong(3, val.getId()); - updated = st.executeUpdate(); - } + updated = st.executeUpdate(); + } - // If update failed, try to insert. - if (updated == 0) { - try (PreparedStatement st = conn.prepareStatement( - "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) { - st.setLong(1, val.getId()); - st.setString(2, val.getFirstName()); - st.setString(3, val.getLastName()); + // If update failed, try to insert. + if (updated == 0) { + try (PreparedStatement st = conn.prepareStatement( + "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) { + st.setLong(1, val.getId()); + st.setString(2, val.getFirstName()); + st.setString(3, val.getLastName()); - st.executeUpdate(); + st.executeUpdate(); + } } } - } catch (SQLException e) { throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e); } finally { - end(tx, conn); + end(conn); } } /** {@inheritDoc} */ @Override public void delete(Object key) { - Transaction tx = transaction(); - - System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']'); + System.out.println(">>> Removing key: " + key); Connection conn = null; try { - conn = connection(tx); + conn = connection(); try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) { st.setLong(1, (Long)key); @@ -188,7 +180,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { throw new CacheWriterException("Failed to remove object: " + key, e); } finally { - end(tx, conn); + end(conn); } } @@ -199,17 +191,13 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { final int entryCnt = (Integer)args[0]; - Connection conn = null; - - try { - conn = connection(null); - + try (Connection conn = connection()) { try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) { try (ResultSet rs = st.executeQuery()) { int cnt = 0; while (cnt < entryCnt && rs.next()) { - Person person = person(rs.getLong(1), rs.getString(2), rs.getString(3)); + Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); clo.apply(person.getId(), person); @@ -223,18 +211,16 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { catch (SQLException e) { throw new CacheLoaderException("Failed to load values from cache store.", e); } - finally { - end(null, conn); - } } /** - * @param tx Cache transaction. * @return Connection. * @throws SQLException In case of error. */ - private Connection connection(@Nullable Transaction tx) throws SQLException { - if (tx != null) { + private Connection connection() throws SQLException { + // If there is an ongoing transaction, + // we must reuse the same connection. + if (ses.isWithinTransaction()) { Map<Object, Object> props = ses.properties(); Connection conn = (Connection)props.get(ATTR_NAME); @@ -257,11 +243,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { /** * Closes allocated resources depending on transaction status. * - * @param tx Active transaction, if any. * @param conn Allocated connection. */ - private void end(@Nullable Transaction tx, @Nullable Connection conn) { - if (tx == null && conn != null) { + private void end(@Nullable Connection conn) { + if (!ses.isWithinTransaction() && conn != null) { // Close connection right away if there is no transaction. try { conn.close(); @@ -286,23 +271,4 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { return conn; } - - /** - * Builds person object out of provided values. - * - * @param id ID. - * @param firstName First name. - * @param lastName Last name. - * @return Person. - */ - private Person person(Long id, String firstName, String lastName) { - return new Person(id, firstName, lastName); - } - - /** - * @return Current transaction. - */ - private Transaction transaction() { - return ses != null ? ses.transaction() : null; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java index a2be4c5..38fe95c 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java @@ -43,6 +43,15 @@ public interface CacheStoreSession { public Transaction transaction(); /** + * Returns {@code true} if performing store operation within a transaction, + * {@code false} otherwise. Analogous to calling {@code transaction() != null}. + * + * @return {@code True} if performing store operation within a transaction, + * {@code false} otherwise. + */ + public boolean isWithinTransaction(); + + /** * Gets current session properties. You can add properties directly to the * returned map. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java index fac6ea3..9262a8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheStoreManager.java @@ -36,7 +36,6 @@ import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.integration.*; -import java.lang.reflect.*; import java.util.*; /** @@ -913,6 +912,11 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } /** {@inheritDoc} */ + @Override public boolean isWithinTransaction() { + return transaction() != null; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K1, V1> Map<K1, V1> properties() { SessionData ses0 = sesHolder.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java index cca20fe..0709880 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java @@ -50,6 +50,11 @@ public class TestCacheSession implements CacheStoreSession { } /** {@inheritDoc} */ + @Override public boolean isWithinTransaction() { + return transaction() != null; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K, V> Map<K, V> properties() { if (props == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16105ec9/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java index 6687f1f..2bbcf1b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java @@ -49,6 +49,11 @@ public class TestThreadLocalCacheSession implements CacheStoreSession { } /** {@inheritDoc} */ + @Override public boolean isWithinTransaction() { + return transaction() != null; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K, V> Map<K, V> properties() { TestCacheSession ses = sesHolder.get();