IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ada1b2a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ada1b2a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ada1b2a7 Branch: refs/heads/ignite-sprint-5 Commit: ada1b2a7c4d82722cc5721bad50042af7216bfdc Parents: 990bf9e Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun May 24 20:42:53 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun May 24 20:42:53 2015 -0700 ---------------------------------------------------------------------- .../hibernate/CacheHibernatePersonStore.java | 27 +- .../store/jdbc/CacheJdbcPersonStore.java | 69 ++-- .../store/jdbc/CacheJdbcStoreExample.java | 3 +- .../store/spring/CacheSpringPersonStore.java | 128 ++++++ .../store/spring/CacheSpringStoreExample.java | 143 +++++++ .../datagrid/store/spring/package-info.java | 22 ++ .../processors/cache/GridCacheProcessor.java | 4 +- .../processors/cache/GridCacheUtils.java | 25 +- .../store/GridCacheStoreManagerAdapter.java | 61 ++- ...heStoreSessionListenerLifeCycleSelfTest.java | 395 +++++++++++++++++++ .../IgniteCrossCacheTxStoreSelfTest.java | 24 -- .../junits/common/GridCommonAbstractTest.java | 24 ++ .../spring/CacheSpringStoreSessionListener.java | 2 +- 13 files changed, 810 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java index 557ec6f..80a9f22 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java @@ -54,13 +54,7 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> { Long key = entry.getKey(); Person val = entry.getValue(); - System.out.println(">>> Store put [key=" + key + ", val=" + val + ']'); - - if (val == null) { - delete(key); - - return; - } + System.out.println(">>> Store write [key=" + key + ", val=" + val + ']'); Session hibSes = ses.attachment(); @@ -75,13 +69,14 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> { /** {@inheritDoc} */ @SuppressWarnings({"JpaQueryApiInspection"}) @Override public void delete(Object key) { - System.out.println(">>> Store remove [key=" + key + ']'); + System.out.println(">>> Store delete [key=" + key + ']'); Session hibSes = ses.attachment(); try { - hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key") - .setParameter("key", key).setFlushMode(FlushMode.ALWAYS).executeUpdate(); + hibSes.createQuery("delete " + Person.class.getSimpleName() + " where key = :key"). + setParameter("key", key). + executeUpdate(); } catch (HibernateException e) { throw new CacheWriterException("Failed to remove value from cache store [key=" + key + ']', e); @@ -100,13 +95,13 @@ public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> { try { int cnt = 0; - List res = hibSes.createCriteria(Person.class).list(); - - if (res != null) { - Iterator iter = res.iterator(); + List list = hibSes.createCriteria(Person.class). + setMaxResults(entryCnt). + list(); - while (cnt < entryCnt && iter.hasNext()) { - Person person = (Person)iter.next(); + if (list != null) { + for (Object obj : list) { + Person person = (Person)obj; clo.apply(person.getId(), person); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java index 6eb0386..ed14a99 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java @@ -22,9 +22,11 @@ import org.apache.ignite.cache.store.*; import org.apache.ignite.examples.datagrid.store.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; +import org.h2.jdbcx.*; import javax.cache.*; import javax.cache.integration.*; +import javax.sql.*; import java.sql.*; /** @@ -32,6 +34,10 @@ import java.sql.*; * transaction with cache transactions and maps {@link Long} to {@link Person}. */ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { + /** Data source. */ + public static final DataSource DATA_SRC = + JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", ""); + /** Store session. */ @CacheStoreSessionResource private CacheStoreSession ses; @@ -52,12 +58,10 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { * @throws IgniteException If failed. */ private void prepareDb() throws IgniteException { - try ( - Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); - Statement st = conn.createStatement() - ) { - st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " + - "lastName varchar(255))"); + try (Connection conn = DATA_SRC.getConnection()) { + conn.createStatement().execute( + "create table if not exists PERSONS (" + + "id number unique, firstName varchar(255), lastName varchar(255))"); } catch (SQLException e) { throw new IgniteException("Failed to create database table.", e); @@ -66,34 +70,28 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { /** {@inheritDoc} */ @Override public Person load(Long key) { - System.out.println(">>> Loading key: " + key); + System.out.println(">>> Store load [key=" + key + ']'); - try { - Connection conn = ses.attachment(); + Connection conn = ses.attachment(); - try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) { - st.setString(1, key.toString()); + try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id = ?")) { + st.setString(1, key.toString()); - ResultSet rs = st.executeQuery(); + ResultSet rs = st.executeQuery(); - if (rs.next()) - return new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); - } + return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null; } catch (SQLException e) { - throw new CacheLoaderException("Failed to load object: " + key, e); + throw new CacheLoaderException("Failed to load object [key=" + key + ']', e); } - - return null; } /** {@inheritDoc} */ @Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) { Long key = entry.getKey(); - Person val = entry.getValue(); - System.out.println(">>> Putting [key=" + key + ", val=" + val + ']'); + System.out.println(">>> Store write [key=" + key + ", val=" + val + ']'); try { Connection conn = ses.attachment(); @@ -103,7 +101,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { // Try update first. If it does not work, then try insert. // Some databases would allow these to be done in one 'upsert' operation. try (PreparedStatement st = conn.prepareStatement( - "update PERSONS set firstName=?, lastName=? where id=?")) { + "update PERSONS set firstName = ?, lastName = ? where id = ?")) { st.setString(1, val.getFirstName()); st.setString(2, val.getLastName()); st.setLong(3, val.getId()); @@ -114,7 +112,7 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { // If update failed, try to insert. if (updated == 0) { try (PreparedStatement st = conn.prepareStatement( - "insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")) { + "insert into PERSONS (id, firstName, lastName) values (?, ?, ?)")) { st.setLong(1, val.getId()); st.setString(2, val.getFirstName()); st.setString(3, val.getLastName()); @@ -124,25 +122,23 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { } } catch (SQLException e) { - throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e); + throw new CacheWriterException("Failed to write object [key=" + key + ", val=" + val + ']', e); } } /** {@inheritDoc} */ @Override public void delete(Object key) { - System.out.println(">>> Removing key: " + key); + System.out.println(">>> Store delete [key=" + key + ']'); - try { - Connection conn = ses.attachment(); + Connection conn = ses.attachment(); - try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) { - st.setLong(1, (Long)key); + try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) { + st.setLong(1, (Long)key); - st.executeUpdate(); - } + st.executeUpdate(); } catch (SQLException e) { - throw new CacheWriterException("Failed to remove object: " + key, e); + throw new CacheWriterException("Failed to delete object [key=" + key + ']', e); } } @@ -155,13 +151,14 @@ public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> { Connection conn = ses.attachment(); - try ( - PreparedStatement st = conn.prepareStatement("select * from PERSONS"); - ResultSet rs = st.executeQuery() - ) { + try (PreparedStatement stmt = conn.prepareStatement("select * from PERSONS limit ?")) { + stmt.setInt(1, entryCnt); + + ResultSet rs = stmt.executeQuery(); + int cnt = 0; - while (cnt < entryCnt && rs.next()) { + while (rs.next()) { Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); clo.apply(person.getId(), person); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java index 74e262c..637d6dc 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/CacheJdbcStoreExample.java @@ -24,7 +24,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.examples.*; import org.apache.ignite.examples.datagrid.store.*; import org.apache.ignite.transactions.*; -import org.h2.jdbcx.*; import javax.cache.configuration.*; import java.util.*; @@ -79,7 +78,7 @@ public class CacheJdbcStoreExample { @Override public CacheStoreSessionListener create() { CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); - lsnr.setDataSource(JdbcConnectionPool.create("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1", "", "")); + lsnr.setDataSource(CacheJdbcPersonStore.DATA_SRC); return lsnr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java new file mode 100644 index 0000000..50149ba --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringPersonStore.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid.store.spring; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.examples.datagrid.store.*; +import org.apache.ignite.lang.*; +import org.springframework.dao.*; +import org.springframework.jdbc.core.*; +import org.springframework.jdbc.datasource.*; + +import javax.cache.*; +import javax.cache.integration.*; +import javax.sql.*; +import java.sql.*; +import java.util.concurrent.atomic.*; + +/** + * Example of {@link CacheStore} implementation that uses JDBC + * transaction with cache transactions and maps {@link Long} to {@link Person}. + */ +public class CacheSpringPersonStore extends CacheStoreAdapter<Long, Person> { + /** Data source. */ + public static final DataSource DATA_SRC = new DriverManagerDataSource("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1"); + + /** Spring JDBC template. */ + private JdbcTemplate jdbcTemplate; + + /** + * Constructor. + * + * @throws IgniteException If failed. + */ + public CacheSpringPersonStore() throws IgniteException { + jdbcTemplate = new JdbcTemplate(DATA_SRC); + + prepareDb(); + } + + /** + * Prepares database for example execution. This method will create a + * table called "PERSONS" so it can be used by store implementation. + * + * @throws IgniteException If failed. + */ + private void prepareDb() throws IgniteException { + jdbcTemplate.update( + "create table if not exists PERSONS (" + + "id number unique, firstName varchar(255), lastName varchar(255))"); + } + + /** {@inheritDoc} */ + @Override public Person load(Long key) { + System.out.println(">>> Store load [key=" + key + ']'); + + try { + return jdbcTemplate.queryForObject("select * from PERSONS where id = ?", new RowMapper<Person>() { + @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); + } + }, key); + } + catch (EmptyResultDataAccessException ignored) { + return null; + } + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Long, ? extends Person> entry) { + Long key = entry.getKey(); + Person val = entry.getValue(); + + System.out.println(">>> Store write [key=" + key + ", val=" + val + ']'); + + int updated = jdbcTemplate.update("update PERSONS set firstName = ?, lastName = ? where id = ?", + val.getFirstName(), val.getLastName(), val.getId()); + + if (updated == 0) { + jdbcTemplate.update("insert into PERSONS (id, firstName, lastName) values (?, ?, ?)", + val.getId(), val.getFirstName(), val.getLastName()); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + System.out.println(">>> Store delete [key=" + key + ']'); + + jdbcTemplate.update("delete from PERSONS where id = ?", key); + } + + /** {@inheritDoc} */ + @Override public void loadCache(final IgniteBiInClosure<Long, Person> clo, Object... args) { + if (args == null || args.length == 0 || args[0] == null) + throw new CacheLoaderException("Expected entry count parameter is not provided."); + + int entryCnt = (Integer)args[0]; + + final AtomicInteger cnt = new AtomicInteger(); + + jdbcTemplate.query("select * from PERSONS limit ?", new RowCallbackHandler() { + @Override public void processRow(ResultSet rs) throws SQLException { + Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3)); + + clo.apply(person.getId(), person); + + cnt.incrementAndGet(); + } + }, entryCnt); + + System.out.println(">>> Loaded " + cnt + " values into cache."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java new file mode 100644 index 0000000..9be6672 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/CacheSpringStoreExample.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid.store.spring; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cache.store.jdbc.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.datagrid.store.*; +import org.apache.ignite.transactions.*; + +import javax.cache.configuration.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * Demonstrates usage of cache with underlying persistent store configured. + * <p> + * This example uses {@link CacheSpringPersonStore} as a persistent store. + * <p> + * Remote nodes can be started with {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. + */ +public class CacheSpringStoreExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheSpringStoreExample.class.getSimpleName(); + + /** Heap size required to run this example. */ + public static final int MIN_MEMORY = 1024 * 1024 * 1024; + + /** Number of entries to load. */ + private static final int ENTRY_COUNT = 100_000; + + /** Global person ID to use across entire example. */ + private static final Long id = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + ExamplesUtils.checkMinMemory(MIN_MEMORY); + + // To start ignite with desired configuration uncomment the appropriate line. + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache store example started."); + + CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>(CACHE_NAME); + + // Set atomicity as transaction, since we are showing transactions in example. + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + // Configure JDBC store. + cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(CacheSpringPersonStore.class)); + + // Configure JDBC session listener. + cacheCfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() { + @Override public CacheStoreSessionListener create() { + CacheJdbcStoreSessionListener lsnr = new CacheJdbcStoreSessionListener(); + + lsnr.setDataSource(CacheSpringPersonStore.DATA_SRC); + + return lsnr; + } + }); + + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + + try (IgniteCache<Long, Person> cache = ignite.getOrCreateCache(cacheCfg)) { + // Make initial cache loading from persistent store. This is a + // distributed operation and will call CacheStore.loadCache(...) + // method on all nodes in topology. + loadCache(cache); + + // Start transaction and execute several cache operations with + // read/write-through to persistent store. + executeTransaction(cache); + } + } + } + + /** + * Makes initial cache loading. + * + * @param cache Cache to load. + */ + private static void loadCache(IgniteCache<Long, Person> cache) { + long start = System.currentTimeMillis(); + + // Start loading cache from persistent store on all caching nodes. + cache.loadCache(null, ENTRY_COUNT); + + long end = System.currentTimeMillis(); + + System.out.println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms."); + } + + /** + * Executes transaction with read/write-through to persistent store. + * + * @param cache Cache to execute transaction on. + */ + private static void executeTransaction(IgniteCache<Long, Person> cache) { + try (Transaction tx = Ignition.ignite().transactions().txStart()) { + Person val = cache.get(id); + + System.out.println("Read value: " + val); + + val = cache.getAndPut(id, new Person(id, "Isaac", "Newton")); + + System.out.println("Overwrote old value: " + val); + + val = cache.get(id); + + System.out.println("Read value: " + val); + + tx.commit(); + } + + System.out.println("Read value after commit: " + cache.get(id)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java new file mode 100644 index 0000000..211239f --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/spring/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains Spring-based cache store implementation. + */ +package org.apache.ignite.examples.datagrid.store.spring; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5b57817..4457f98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -567,7 +567,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx, CU.createStoreSessionListeners(ctx, + sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, ctx.config().getCacheStoreSessionListenerFactories())); ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", @@ -813,6 +813,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel); } + CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners()); + sharedCtx.cleanup(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 6968fcb..7096da5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1795,13 +1795,14 @@ public class GridCacheUtils { } /** - * Creates store session listeners. + * Creates and starts store session listeners. * * @param ctx Kernal context. * @param factories Factories. * @return Listeners. + * @throws IgniteCheckedException In case of error. */ - public static Collection<CacheStoreSessionListener> createStoreSessionListeners(GridKernalContext ctx, + public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx, Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException { if (factories == null) return null; @@ -1823,4 +1824,24 @@ public class GridCacheUtils { return lsnrs; } + + /** + * Stops store session listeners. + * + * @param ctx Kernal context. + * @param sesLsnrs Session listeners. + * @throws IgniteCheckedException In case of error. + */ + public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs) + throws IgniteCheckedException { + if (sesLsnrs == null) + return; + + for (CacheStoreSessionListener lsnr : sesLsnrs) { + if (lsnr instanceof LifecycleAware) + ((LifecycleAware)lsnr).stop(); + + ctx.resource().cleanupGeneric(lsnr); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 11d711c..bc5a0a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -70,6 +70,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** */ private Collection<CacheStoreSessionListener> sesLsnrs; + /** */ + private boolean globalSesLsnrs; + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException { @@ -166,10 +169,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt "Persistence store is configured, but both read-through and write-through are disabled."); } - sesLsnrs = CU.createStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories()); + sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories()); - if (sesLsnrs == null) + if (sesLsnrs == null) { sesLsnrs = cctx.shared().storeSessionListeners(); + + globalSesLsnrs = true; + } } /** {@inheritDoc} */ @@ -187,18 +193,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } } - if (sesLsnrs != null) { - for (CacheStoreSessionListener lsnr : sesLsnrs) { - if (lsnr instanceof LifecycleAware) - ((LifecycleAware)lsnr).stop(); - - try { - cctx.kernalContext().resource().cleanupGeneric(lsnr); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to remove injected resources from store session listener (ignoring): " + - lsnr, e); - } + if (!globalSesLsnrs) { + try { + CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e); } } } @@ -721,7 +721,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt lsnr.onSessionEnd(locSes, commit); } - if (!sesHolder.get().storeEnded(store)) + if (!sesHolder.get().ended(store)) store.sessionEnd(commit); } catch (Throwable e) { @@ -788,13 +788,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sesHolder.set(ses); - if (!ses.started()) { - if (sesLsnrs != null) { - for (CacheStoreSessionListener lsnr : sesLsnrs) - lsnr.onSessionStart(locSes); - } - - ses.onStarted(); + if (sesLsnrs != null && !ses.started(this)) { + for (CacheStoreSessionListener lsnr : sesLsnrs) + lsnr.onSessionStart(locSes); } } @@ -809,7 +805,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt lsnr.onSessionEnd(locSes, !threwEx); } - assert !sesHolder.get().storeEnded(store); + assert !sesHolder.get().ended(store); store.sessionEnd(!threwEx); } @@ -858,10 +854,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private Object attachment; /** */ - private boolean started; + private final Set<CacheStoreManager> started = + new GridSetWrapper<>(new IdentityHashMap<CacheStoreManager, Object>()); /** */ - private final Set<CacheStore> endedStores = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>()); + private final Set<CacheStore> ended = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>()); /** * @param tx Current transaction. @@ -918,24 +915,18 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** - */ - private void onStarted() { - started = true; - } - - /** * @return If session is started. */ - private boolean started() { - return started; + private boolean started(CacheStoreManager mgr) { + return !started.add(mgr); } /** * @param store Cache store. * @return Whether session already ended on this store instance. */ - private boolean storeEnded(CacheStore store) { - return !endedStores.add(store); + private boolean ended(CacheStore store) { + return !ended.add(store); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java new file mode 100644 index 0000000..814c8a5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifeCycleSelfTest.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * Store session listeners test. + */ +public class CacheStoreSessionListenerLifecycleSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final Queue<String> evts = new ConcurrentLinkedDeque<>(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheStoreSessionListenerFactories( + new SessionListenerFactory("Shared 1"), + new SessionListenerFactory("Shared 2") + ); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + evts.clear(); + } + + /** + * @throws Exception If failed. + */ + public void testNoCaches() throws Exception { + try { + startGrid(); + } + finally { + stopGrid(); + } + + assertEqualsCollections(Arrays.asList("Shared 1 START", "Shared 2 START", "Shared 1 STOP", "Shared 2 STOP"), + evts); + } + + /** + * @throws Exception If failed. + */ + public void testNoOverride() throws Exception { + try { + Ignite ignite = startGrid(); + + for (int i = 0; i < 2; i++) { + CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration("cache-" + i); + + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + ignite.createCache(cacheCfg); + } + + ignite.cache("cache-0").put(1, 1); + ignite.cache("cache-1").put(1, 1); + + try (Transaction tx = ignite.transactions().txStart()) { + ignite.cache("cache-0").put(2, 2); + ignite.cache("cache-0").put(3, 3); + ignite.cache("cache-1").put(2, 2); + ignite.cache("cache-1").put(3, 3); + + tx.commit(); + } + } + finally { + stopGrid(); + } + + assertEqualsCollections(Arrays.asList( + "Shared 1 START", + "Shared 2 START", + + // Put to cache-0. + "Shared 1 SESSION START cache-0", + "Shared 2 SESSION START cache-0", + "Shared 1 SESSION END cache-0", + "Shared 2 SESSION END cache-0", + + // Put to cache-1. + "Shared 1 SESSION START cache-1", + "Shared 2 SESSION START cache-1", + "Shared 1 SESSION END cache-1", + "Shared 2 SESSION END cache-1", + + // Transaction. + "Shared 1 SESSION START cache-0", + "Shared 2 SESSION START cache-0", + "Shared 1 SESSION START cache-1", + "Shared 2 SESSION START cache-1", + "Shared 1 SESSION END cache-0", + "Shared 2 SESSION END cache-0", + "Shared 1 SESSION END cache-1", + "Shared 2 SESSION END cache-1", + + "Shared 1 STOP", + "Shared 2 STOP" + ), evts); + } + + /** + * @throws Exception If failed. + */ + public void testPartialOverride() throws Exception { + try { + Ignite ignite = startGrid(); + + for (int i = 0; i < 2; i++) { + String name = "cache-" + i; + + CacheConfiguration cacheCfg = cacheConfiguration(name); + + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + if (i == 0) { + cacheCfg.setCacheStoreSessionListenerFactories( + new SessionListenerFactory(name + " 1"), + new SessionListenerFactory(name + " 2") + ); + } + + ignite.createCache(cacheCfg); + } + + ignite.cache("cache-0").put(1, 1); + ignite.cache("cache-1").put(1, 1); + + try (Transaction tx = ignite.transactions().txStart()) { + ignite.cache("cache-0").put(2, 2); + ignite.cache("cache-0").put(3, 3); + ignite.cache("cache-1").put(2, 2); + ignite.cache("cache-1").put(3, 3); + + tx.commit(); + } + } + finally { + stopGrid(); + } + + assertEqualsCollections(Arrays.asList( + "Shared 1 START", + "Shared 2 START", + "cache-0 1 START", + "cache-0 2 START", + + // Put to cache-0. + "cache-0 1 SESSION START cache-0", + "cache-0 2 SESSION START cache-0", + "cache-0 1 SESSION END cache-0", + "cache-0 2 SESSION END cache-0", + + // Put to cache-1. + "Shared 1 SESSION START cache-1", + "Shared 2 SESSION START cache-1", + "Shared 1 SESSION END cache-1", + "Shared 2 SESSION END cache-1", + + // Transaction. + "cache-0 1 SESSION START cache-0", + "cache-0 2 SESSION START cache-0", + "Shared 1 SESSION START cache-1", + "Shared 2 SESSION START cache-1", + "cache-0 1 SESSION END cache-0", + "cache-0 2 SESSION END cache-0", + "Shared 1 SESSION END cache-1", + "Shared 2 SESSION END cache-1", + + "cache-0 1 STOP", + "cache-0 2 STOP", + "Shared 1 STOP", + "Shared 2 STOP" + ), evts); + } + + /** + * @throws Exception If failed. + */ + public void testOverride() throws Exception { + try { + Ignite ignite = startGrid(); + + for (int i = 0; i < 2; i++) { + String name = "cache-" + i; + + CacheConfiguration cacheCfg = cacheConfiguration(name); + + cacheCfg.setCacheStoreSessionListenerFactories(new SessionListenerFactory(name + " 1"), new SessionListenerFactory(name + " 2")); + + ignite.createCache(cacheCfg); + } + + ignite.cache("cache-0").put(1, 1); + ignite.cache("cache-1").put(1, 1); + + try (Transaction tx = ignite.transactions().txStart()) { + ignite.cache("cache-0").put(2, 2); + ignite.cache("cache-0").put(3, 3); + ignite.cache("cache-1").put(2, 2); + ignite.cache("cache-1").put(3, 3); + + tx.commit(); + } + } + finally { + stopGrid(); + } + + assertEqualsCollections(Arrays.asList( + "Shared 1 START", + "Shared 2 START", + "cache-0 1 START", + "cache-0 2 START", + "cache-1 1 START", + "cache-1 2 START", + + // Put to cache-0. + "cache-0 1 SESSION START cache-0", + "cache-0 2 SESSION START cache-0", + "cache-0 1 SESSION END cache-0", + "cache-0 2 SESSION END cache-0", + + // Put to cache-1. + "cache-1 1 SESSION START cache-1", + "cache-1 2 SESSION START cache-1", + "cache-1 1 SESSION END cache-1", + "cache-1 2 SESSION END cache-1", + + // Transaction. + "cache-0 1 SESSION START cache-0", + "cache-0 2 SESSION START cache-0", + "cache-1 1 SESSION START cache-1", + "cache-1 2 SESSION START cache-1", + "cache-0 1 SESSION END cache-0", + "cache-0 2 SESSION END cache-0", + "cache-1 1 SESSION END cache-1", + "cache-1 2 SESSION END cache-1", + + "cache-0 1 STOP", + "cache-0 2 STOP", + "cache-1 1 STOP", + "cache-1 2 STOP", + "Shared 1 STOP", + "Shared 2 STOP" + ), evts); + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(String name) { + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(name); + + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(Store.class)); + cacheCfg.setWriteThrough(true); + + return cacheCfg; + } + + /** + */ + private static class SessionListener implements CacheStoreSessionListener, LifecycleAware { + /** */ + private final String name; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param name Name. + */ + private SessionListener(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + assertNotNull(ignite); + + evts.add(name + " START"); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + assertNotNull(ignite); + + evts.add(name + " STOP"); + } + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + assertNotNull(ignite); + + evts.add(name + " SESSION START " + ses.cacheName()); + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + assertNotNull(ignite); + + evts.add(name + " SESSION END " + ses.cacheName()); + } + } + + /** + */ + private static class SessionListenerFactory implements Factory<CacheStoreSessionListener> { + /** */ + private String name; + + /** + * @param name Name. + */ + private SessionListenerFactory(String name) { + this.name = name; + } + + @Override public CacheStoreSessionListener create() { + return new SessionListener(name); + } + } + + /** + */ + public static class Store extends CacheStoreAdapter<Integer, Integer> { + public Store() { + } + + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) + throws CacheWriterException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java index f72ea47..f2de8ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCrossCacheTxStoreSelfTest.java @@ -266,30 +266,6 @@ public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest { } /** - * @param col1 Collection 1. - * @param col2 Collection 2. - */ - private static void assertEqualsCollections(Collection<?> col1, Collection<?> col2) { - if (col1.size() != col2.size()) - fail("Collections are not equal:\nExpected:\t" + col1 + "\nActual:\t" + col2); - - Iterator<?> it1 = col1.iterator(); - Iterator<?> it2 = col2.iterator(); - - int idx = 0; - - while (it1.hasNext()) { - Object item1 = it1.next(); - Object item2 = it2.next(); - - if (!F.eq(item1, item2)) - fail("Collections are not equal (position " + idx + "):\nExpected: " + col1 + "\nActual: " + col2); - - idx++; - } - } - - /** * */ private static class TestStore implements CacheStore<Object, Object> { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 5533897..a19ea23 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -858,4 +858,28 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { ccfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK) U.sleep(50); } + + /** + * @param exp Expected. + * @param act Actual. + */ + protected void assertEqualsCollections(Collection<?> exp, Collection<?> act) { + if (exp.size() != act.size()) + fail("Collections are not equal:\nExpected:\t" + exp + "\nActual:\t" + act); + + Iterator<?> it1 = exp.iterator(); + Iterator<?> it2 = act.iterator(); + + int idx = 0; + + while (it1.hasNext()) { + Object item1 = it1.next(); + Object item2 = it2.next(); + + if (!F.eq(item1, item2)) + fail("Collections are not equal (position " + idx + "):\nExpected: " + exp + "\nActual: " + act); + + idx++; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ada1b2a7/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java index 81736cd..90431d7 100644 --- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java +++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java @@ -154,7 +154,7 @@ public class CacheSpringStoreSessionListener implements CacheStoreSessionListene /** {@inheritDoc} */ @Override public void onSessionStart(CacheStoreSession ses) { - if (ses.isWithinTransaction()) { + if (ses.isWithinTransaction() && ses.attachment() == null) { try { TransactionDefinition def = definition(ses.transaction(), ses.cacheName());