Merge branch 'sprint-1' into ignite-188
Conflicts:
modules/core/src/main/java/org/apache/ignite/IgniteCache.java
modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
modules/core/src/main/java/org/apache/ignite/IgniteSet.java
modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java
modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/33834a34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/33834a34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/33834a34
Branch: refs/heads/ignite-188
Commit: 33834a3449031786d2b1cab919a09beb15bd8df7
Parents: 0704570 e4365c3
Author: vozerov-gridgain <[email protected]>
Authored: Fri Feb 6 15:58:20 2015 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Fri Feb 6 15:58:20 2015 +0300
----------------------------------------------------------------------
bin/include/ggservice.exe | Bin 5632 -> 0 bytes
bin/include/igniteservice.exe | Bin 0 -> 6144 bytes
.../IgniteAtomicReferenceExample.java | 2 +-
.../IgniteAtomicStampedExample.java | 2 +-
.../MessagingPingPongListenActorExample.java | 4 +-
.../streaming/StreamingCheckInExample.java | 2 +-
.../StreamingPopularNumbersExample.java | 2 +-
.../streaming/StreamingPriceBarsExample.java | 2 +-
.../apache/ignite/examples/ComputeExample.java | 6 +-
.../examples/ScalarCacheAffinityExample1.scala | 3 +-
.../java/org/apache/ignite/IgniteCluster.java | 6 +-
.../java/org/apache/ignite/IgniteCompute.java | 18 +-
.../java/org/apache/ignite/IgniteQueue.java | 4 +-
.../main/java/org/apache/ignite/IgniteSet.java | 2 +-
.../ignite/cache/eviction/EvictableEntry.java | 2 +-
.../store/jdbc/CacheAbstractJdbcStore.java | 1567 ++++++++++++++++++
.../cache/store/jdbc/CacheJdbcPojoStore.java | 205 +++
.../ignite/cache/store/jdbc/JdbcCacheStore.java | 1560 -----------------
.../cache/store/jdbc/JdbcPojoCacheStore.java | 205 ---
.../ignite/internal/IgniteComputeImpl.java | 21 +-
.../apache/ignite/internal/IgniteKernal.java | 16 +-
.../processors/cache/IgniteCacheProxy.java | 6 -
.../GridDistributedCacheAdapter.java | 46 +-
.../ignite/internal/util/IgniteUtils.java | 2 +-
.../internal/util/lang/GridAbsClosure.java | 4 +-
.../util/nodestart/GridNodeCallable.java | 29 -
.../util/nodestart/GridNodeStartUtils.java | 390 -----
.../nodestart/GridRemoteStartSpecification.java | 279 ----
.../util/nodestart/IgniteNodeCallable.java | 30 +
.../util/nodestart/IgniteNodeStartUtils.java | 391 +++++
.../IgniteRemoteStartSpecification.java | 279 ++++
.../util/nodestart/IgniteSshProcessor.java | 2 +-
.../apache/ignite/internal/util/typedef/CA.java | 3 +-
.../ignite/internal/util/typedef/CAX.java | 2 +-
.../org/apache/ignite/lang/IgniteRunnable.java | 2 +-
.../optimized/OptimizedObjectInputStream.java | 2 +
.../ignite/messaging/MessagingListenActor.java | 18 +-
.../core/src/test/bin/start-nodes-custom.bat | 2 +-
modules/core/src/test/config/start-nodes.ini | 6 +-
.../core/src/test/config/store/jdbc/Ignite.xml | 50 +
...ractJdbcCacheStoreMultithreadedSelfTest.java | 196 ---
.../CacheJdbcPojoStoreMultitreadedSelfTest.java | 35 +
.../store/jdbc/CacheJdbcPojoStoreTest.java | 759 +++++++++
...eJdbcStoreAbstractMultithreadedSelfTest.java | 275 +++
.../PojoJdbcCacheStoreMultitreadedSelfTest.java | 34 -
.../store/jdbc/PojoJdbcCacheStoreTest.java | 702 --------
.../store/jdbc/model/PersonComplexKey.java | 146 ++
.../GridJobMasterLeaveAwareSelfTest.java | 4 +-
.../internal/GridListenActorSelfTest.java | 11 +-
.../internal/GridManagementJobSelfTest.java | 7 +-
.../internal/GridProjectionAbstractTest.java | 18 +-
.../GridTaskExecutionContextSelfTest.java | 6 +-
.../IgniteComputeEmptyClusterGroupTest.java | 4 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 8 +-
...eAbstractDataStructuresFailoverSelfTest.java | 2 +-
...GridCacheQueueMultiNodeAbstractSelfTest.java | 2 +-
.../GridCacheSetAbstractSelfTest.java | 3 +-
...xOriginatingNodeFailureAbstractSelfTest.java | 3 +-
...cOriginatingNodeFailureAbstractSelfTest.java | 4 +-
.../near/GridCacheNearEvictionSelfTest.java | 5 +-
.../GridCachePartitionedAffinitySelfTest.java | 2 +-
.../GridCachePartitionedEntryLockSelfTest.java | 3 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 2 +-
.../closure/GridClosureProcessorSelfTest.java | 14 +-
.../loadtests/colocation/GridTestMain.java | 2 +-
.../marshaller/GridMarshallerAbstractTest.java | 2 +-
.../spi/GridTcpSpiForwardingSelfTest.java | 3 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
.../GridHibernateAccessStrategyAdapter.java | 3 +-
.../ignite/scalar/ScalarConversions.scala | 8 +-
.../ignite/scalar/lang/ScalarOutClosure.scala | 4 +-
.../scalar/lang/ScalarOutClosureFunction.scala | 4 +-
.../ignite/schema/model/PojoDescriptor.java | 8 +-
.../org/apache/ignite/schema/ui/Controls.java | 4 +-
.../apache/ignite/schema/ui/SchemaLoadApp.java | 137 +-
.../schema/load/AbstractSchemaLoaderTest.java | 4 +-
.../util/nodestart/GridNodeCallableImpl.java | 344 ----
.../util/nodestart/IgniteNodeCallableImpl.java | 344 ++++
.../util/nodestart/IgniteSshProcessorImpl.java | 4 +-
.../internal/GridNodeStartUtilsSelfTest.java | 89 -
.../GridProjectionStartStopRestartSelfTest.java | 1032 ------------
.../internal/IgniteNodeStartUtilsSelfTest.java | 88 +
...gniteProjectionStartStopRestartSelfTest.java | 1032 ++++++++++++
.../IgniteStartStopRestartTestSuite.java | 4 +-
.../commands/start/VisorStartCommand.scala | 4 +-
.../commands/open/VisorOpenCommandSpec.scala | 6 +-
modules/winservice/IgniteService.sln | 22 +
.../winservice/IgniteService/IgniteService.cs | 170 ++
.../IgniteService/IgniteService.csproj | 90 +
modules/winservice/README.md | 3 +
.../yardstick/config/benchmark-store.properties | 35 +-
.../yardstick/config/ignite-store-config.xml | 11 +-
.../jdbc/IgniteJdbcStoreAbstractBenchmark.java | 95 +-
.../store/jdbc/IgniteJdbcStoreGetBenchmark.java | 10 +-
.../jdbc/IgniteJdbcStoreGetTxBenchmark.java | 47 +
.../store/jdbc/IgniteJdbcStorePutBenchmark.java | 5 +
.../jdbc/IgniteJdbcStorePutGetBenchmark.java | 5 +
.../jdbc/IgniteJdbcStorePutGetTxBenchmark.java | 52 +
.../jdbc/IgniteJdbcStorePutTxBenchmark.java | 47 +
.../yardstick/compute/IgniteRunBenchmark.java | 5 +-
.../yardstick/compute/model/NoopCallable.java | 5 +-
101 files changed, 6029 insertions(+), 5118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java
----------------------------------------------------------------------
diff --cc
examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java
index 9b5e3d8,0000000..866a426
mode 100644,000000..100644
---
a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java
+++
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java
@@@ -1,110 -1,0 +1,110 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.datagrid.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Demonstrates a simple usage of distributed atomic reference.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat}
examples/config/example-cache.xml'}.
+ * <p>
+ * Alternatively you can run {@link CacheNodeStartup} in another JVM which
will
+ * start node with {@code examples/config/example-cache.xml} configuration.
+ */
+public final class IgniteAtomicReferenceExample {
+ /**
+ * Executes example.
+ *
+ * @param args Command line arguments, none required.
+ * @throws Exception If example execution failed.
+ */
+ public static void main(String[] args) throws Exception {
+ try (Ignite ignite =
Ignition.start("examples/config/example-cache.xml")) {
+ System.out.println();
+ System.out.println(">>> Atomic reference example started.");
+
+ // Make name of atomic reference.
+ final String refName = UUID.randomUUID().toString();
+
+ // Make value of atomic reference.
+ String val = UUID.randomUUID().toString();
+
+ // Initialize atomic reference.
+ IgniteAtomicReference<String> ref =
ignite.atomicReference(refName, val, true);
+
+ System.out.println("Atomic reference initial value : " +
ref.get() + '.');
+
+ // Make closure for checking atomic reference value on cluster.
- Runnable c = new ReferenceClosure(refName);
++ IgniteRunnable c = new ReferenceClosure(refName);
+
+ // Check atomic reference on all cluster nodes.
+ ignite.compute().run(c);
+
+ // Make new value of atomic reference.
+ String newVal = UUID.randomUUID().toString();
+
+ System.out.println("Try to change value of atomic reference with
wrong expected value.");
+
+ ref.compareAndSet("WRONG EXPECTED VALUE", newVal); // Won't
change.
+
+ // Check atomic reference on all cluster nodes.
+ // Atomic reference value shouldn't be changed.
+ ignite.compute().run(c);
+
+ System.out.println("Try to change value of atomic reference with
correct expected value.");
+
+ ref.compareAndSet(val, newVal);
+
+ // Check atomic reference on all cluster nodes.
+ // Atomic reference value should be changed.
+ ignite.compute().run(c);
+ }
+
+ System.out.println();
+ System.out.println("Finished atomic reference example...");
+ System.out.println("Check all nodes for output (this node is also
part of the cluster).");
+ }
+
+ /**
+ * Obtains atomic reference.
+ */
+ private static class ReferenceClosure implements IgniteRunnable {
+ /** Reference name. */
+ private final String refName;
+
+ /**
+ * @param refName Reference name.
+ */
+ ReferenceClosure(String refName) {
+ this.refName = refName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgniteAtomicReference<String> ref =
Ignition.ignite().atomicReference(refName, null, true);
+
+ System.out.println("Atomic reference value is " + ref.get() +
'.');
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java
----------------------------------------------------------------------
diff --cc
examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java
index ed38282,0000000..14f52b1
mode 100644,000000..100644
---
a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java
+++
b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicStampedExample.java
@@@ -1,117 -1,0 +1,117 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.datagrid.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Demonstrates a simple usage of distributed atomic stamped.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat}
examples/config/example-cache.xml'}.
+ * <p>
+ * Alternatively you can run {@link CacheNodeStartup} in another JVM which
will
+ * start node with {@code examples/config/example-cache.xml} configuration.
+ */
+public final class IgniteAtomicStampedExample {
+ /**
+ * Executes example.
+ *
+ * @param args Command line arguments, none required.
+ * @throws Exception If example execution failed.
+ */
+ public static void main(String[] args) throws Exception {
+ try (Ignite ignite =
Ignition.start("examples/config/example-cache.xml")) {
+ System.out.println();
+ System.out.println(">>> Atomic stamped example started.");
+
+ // Make name of atomic stamped.
+ String stampedName = UUID.randomUUID().toString();
+
+ // Make value of atomic stamped.
+ String val = UUID.randomUUID().toString();
+
+ // Make stamp of atomic stamped.
+ String stamp = UUID.randomUUID().toString();
+
+ // Initialize atomic stamped.
+ IgniteAtomicStamped<String, String> stamped =
ignite.atomicStamped(stampedName, val, stamp, true);
+
+ System.out.println("Atomic stamped initial [value=" +
stamped.value() + ", stamp=" + stamped.stamp() + ']');
+
+ // Make closure for checking atomic stamped.
- Runnable c = new StampedUpdateClosure(stampedName);
++ IgniteRunnable c = new StampedUpdateClosure(stampedName);
+
+ // Check atomic stamped on all cluster nodes.
+ ignite.compute().broadcast(c);
+
+ // Make new value of atomic stamped.
+ String newVal = UUID.randomUUID().toString();
+
+ // Make new stamp of atomic stamped.
+ String newStamp = UUID.randomUUID().toString();
+
+ System.out.println("Try to change value and stamp of atomic
stamped with wrong expected value and stamp.");
+
+ stamped.compareAndSet("WRONG EXPECTED VALUE", newVal, "WRONG
EXPECTED STAMP", newStamp);
+
+ // Check atomic stamped on all cluster nodes.
+ // Atomic stamped value and stamp shouldn't be changed.
+ ignite.compute().run(c);
+
+ System.out.println("Try to change value and stamp of atomic
stamped with correct value and stamp.");
+
+ stamped.compareAndSet(val, newVal, stamp, newStamp);
+
+ // Check atomic stamped on all cluster nodes.
+ // Atomic stamped value and stamp should be changed.
+ ignite.compute().run(c);
+ }
+
+ System.out.println();
+ System.out.println("Finished atomic stamped example...");
+ System.out.println("Check all nodes for output (this node is also
part of the cluster).");
+ }
+
+ /**
+ * Performs update of on an atomic stamped variable in cache.
+ */
+ private static class StampedUpdateClosure implements IgniteRunnable {
+ /** Atomic stamped variable name. */
+ private final String stampedName;
+
+ /**
+ * @param stampedName Atomic stamped variable name.
+ */
+ StampedUpdateClosure(String stampedName) {
+ this.stampedName = stampedName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgniteAtomicStamped<String, String> stamped = Ignition.ignite().
+ atomicStamped(stampedName, null, null, true);
+
+ System.out.println("Atomic stamped [value=" + stamped.value() +
", stamp=" + stamped.stamp() + ']');
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
index b15d06c,26fedf7..299993a
--- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java
@@@ -66,9 -67,9 +66,9 @@@ import java.util.concurrent.*
* Instances of distributed cache queues can be created by calling the
following method
* on {@link Ignite} API:
* <ul>
- * <li>{@link Ignite#queue(String,
org.apache.ignite.configuration.CollectionConfiguration, int, boolean)}</li>
- * <li>{@link Ignite#queue(String, int,
IgniteCollectionConfiguration)}</li>
++ * <li>{@link Ignite#queue(String, CollectionConfiguration, int,
boolean)}</li>
* </ul>
- * @see Ignite#queue(String,
org.apache.ignite.configuration.CollectionConfiguration, int, boolean)
- * @see Ignite#queue(String, int, IgniteCollectionConfiguration)
++ * @see Ignite#queue(String, CollectionConfiguration, int, boolean)
*/
public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/IgniteSet.java
index 003ce93,1228958..48f27ad
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java
@@@ -31,7 -34,7 +31,7 @@@ import java.util.*
* (governed by {@code collocated} parameter). {@code Non-collocated} mode is
provided only
* for partitioned caches. If {@code collocated} parameter is {@code true},
then all set items
* will be collocated on one node, otherwise items will be distributed
through all grid nodes.
- * @see Ignite#set(String,
org.apache.ignite.configuration.CollectionConfiguration, boolean)
- * @see Ignite#set(String, IgniteCollectionConfiguration)
++ * @see Ignite#set(String, CollectionConfiguration, boolean)
*/
public interface IgniteSet<T> extends Set<T>, Closeable {
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 0000000,3d60a74..ab21fe1
mode 000000,100644..100644
---
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@@ -1,0 -1,1567 +1,1567 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.cache.store.jdbc;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cache.store.*;
+ import org.apache.ignite.cache.store.jdbc.dialect.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.lifecycle.*;
+ import org.apache.ignite.resources.*;
+ import org.apache.ignite.transactions.*;
+ import org.jetbrains.annotations.*;
+
+ import javax.cache.*;
+ import javax.cache.integration.*;
+ import javax.sql.*;
+ import java.sql.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.locks.*;
+
+ import static java.sql.Statement.*;
+
+ /**
+ * Base {@link CacheStore} implementation backed by JDBC. This implementation
stores objects in underlying database
+ * using mapping description.
+ * <p>
+ * <h2 class="header">Configuration</h2>
+ * Sections below describe mandatory and optional configuration settings as
well
+ * as providing example using Java and Spring XML.
+ * <h3>Mandatory</h3>
+ * There are no mandatory configuration parameters.
+ * <h3>Optional</h3>
+ * <ul>
+ * <li>Data source (see {@link #setDataSource(DataSource)}</li>
+ * <li>Maximum batch size for writeAll and deleteAll operations. (see
{@link #setBatchSize(int)})</li>
+ * <li>Max workers thread count. These threads are responsible for load
cache. (see {@link #setMaxPoolSize(int)})</li>
+ * <li>Parallel load cache minimum threshold. (see {@link
#setParallelLoadCacheMinimumThreshold(int)})</li>
+ * </ul>
+ * <h2 class="header">Java Example</h2>
+ * <pre name="code" class="java">
+ * ...
+ * JdbcPojoCacheStore store = new JdbcPojoCacheStore();
+ * ...
+ *
+ * </pre>
+ * <h2 class="header">Spring Example</h2>
+ * <pre name="code" class="xml">
+ * ...
+ * <bean id="cache.jdbc.store"
+ *
class="org.apache.ignite.cache.store.jdbc.JdbcPojoCacheStore">
+ * <property name="connectionUrl"
value="jdbc:h2:mem:"/>
+ * </bean>
+ * ...
+ * </pre>
+ * <p>
+ * For information about Spring framework visit <a
href="http://www.springframework.org/">www.springframework.org</a>
+ */
+ public abstract class CacheAbstractJdbcStore<K, V> extends CacheStore<K, V>
implements LifecycleAware {
+ /** Max attempt write count. */
+ protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
+
+ /** Default batch size for put and remove operations. */
+ protected static final int DFLT_BATCH_SIZE = 512;
+
+ /** Default batch size for put and remove operations. */
+ protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD =
512;
+
+ /** Connection attribute property name. */
+ protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
+
+ /** Empty column value. */
+ protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null
};
+
+ /** Auto-injected logger instance. */
- @IgniteLoggerResource
++ @LoggerResource
+ protected IgniteLogger log;
+
+ /** Lock for metadata cache. */
+ @GridToStringExclude
+ private final Lock cacheMappingsLock = new ReentrantLock();
+
+ /** Data source. */
+ protected DataSource dataSrc;
+
+ /** Cache with entry mapping description. (cache name, (key id, mapping
description)). */
+ protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings =
Collections.emptyMap();
+
+ /** Database dialect. */
+ protected JdbcDialect dialect;
+
+ /** Max workers thread count. These threads are responsible for load
cache. */
+ private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+ /** Maximum batch size for writeAll and deleteAll operations. */
+ private int batchSz = DFLT_BATCH_SIZE;
+
+ /** Parallel load cache minimum threshold. If {@code 0} then load
sequentially. */
+ private int parallelLoadCacheMinThreshold =
DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
+ /**
+ * Get field value from object.
+ *
+ * @param typeName Type name.
+ * @param fieldName Field name.
+ * @param obj Cache object.
+ * @return Field value from object.
+ */
+ @Nullable protected abstract Object extractField(String typeName, String
fieldName, Object obj)
+ throws CacheException;
+
+ /**
+ * Construct object from query result.
+ *
+ * @param <R> Type of result object.
+ * @param typeName Type name.
+ * @param fields Fields descriptors.
+ * @param loadColIdxs Select query columns index.
+ * @param rs ResultSet.
+ * @return Constructed object.
+ */
+ protected abstract <R> R buildObject(String typeName,
Collection<CacheTypeFieldMetadata> fields,
+ Map<String, Integer> loadColIdxs, ResultSet rs) throws
CacheLoaderException;
+
+ /**
+ * Extract key type id from key object.
+ *
+ * @param key Key object.
+ * @return Key type id.
+ */
+ protected abstract Object keyTypeId(Object key) throws CacheException;
+
+ /**
+ * Extract key type id from key class name.
+ *
+ * @param type String description of key type.
+ * @return Key type id.
+ */
+ protected abstract Object keyTypeId(String type) throws CacheException;
+
+ /**
+ * Prepare internal store specific builders for provided types metadata.
+ *
+ * @param types Collection of types.
+ * @throws CacheException If failed to prepare.
+ */
+ protected abstract void prepareBuilders(@Nullable String cacheName,
Collection<CacheTypeMetadata> types)
+ throws CacheException;
+
+ /**
+ * Perform dialect resolution.
+ *
+ * @return The resolved dialect.
+ * @throws CacheException Indicates problems accessing the metadata.
+ */
+ protected JdbcDialect resolveDialect() throws CacheException {
+ Connection conn = null;
+
+ String dbProductName = null;
+
+ try {
+ conn = openConnection(false);
+
+ dbProductName = conn.getMetaData().getDatabaseProductName();
+ }
+ catch (SQLException e) {
+ throw new CacheException("Failed access to metadata for detect
database dialect.", e);
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+
+ if ("H2".equals(dbProductName))
+ return new H2Dialect();
+
+ if ("MySQL".equals(dbProductName))
+ return new MySQLDialect();
+
+ if (dbProductName.startsWith("Microsoft SQL Server"))
+ return new SQLServerDialect();
+
+ if ("Oracle".equals(dbProductName))
+ return new OracleDialect();
+
+ if (dbProductName.startsWith("DB2/"))
+ return new DB2Dialect();
+
+ U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be
used): " + dbProductName);
+
+ return new BasicJdbcDialect();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (dataSrc == null)
+ throw new IgniteException("Failed to initialize cache store (data
source is not provided).");
+
+ if (dialect == null)
+ dialect = resolveDialect();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /**
+ * Gets connection from a pool.
+ *
+ * @param autocommit {@code true} If connection should use autocommit
mode.
+ * @return Pooled connection.
+ * @throws SQLException In case of error.
+ */
+ protected Connection openConnection(boolean autocommit) throws
SQLException {
+ Connection conn = dataSrc.getConnection();
+
+ conn.setAutoCommit(autocommit);
+
+ return conn;
+ }
+
+ /**
+ * @return Connection.
+ * @throws SQLException In case of error.
+ */
+ protected Connection connection() throws SQLException {
+ CacheStoreSession ses = session();
+
+ if (ses.transaction() != null) {
+ Map<String, Connection> prop = ses.properties();
+
+ Connection conn = prop.get(ATTR_CONN_PROP);
+
+ if (conn == null) {
+ conn = openConnection(false);
+
+ // Store connection in session to used it for other
operations in the same session.
+ prop.put(ATTR_CONN_PROP, conn);
+ }
+
+ return conn;
+ }
+ // Transaction can be null in case of simple load operation.
+ else
+ return openConnection(true);
+ }
+
+ /**
+ * Closes connection.
+ *
+ * @param conn Connection to close.
+ */
+ protected void closeConnection(@Nullable Connection conn) {
+ CacheStoreSession ses = session();
+
+ // Close connection right away if there is no transaction.
+ if (ses.transaction() == null)
+ U.closeQuiet(conn);
+ }
+
+ /**
+ * Closes allocated resources depending on transaction status.
+ *
+ * @param conn Allocated connection.
+ * @param st Created statement,
+ */
+ protected void end(@Nullable Connection conn, @Nullable Statement st) {
+ U.closeQuiet(st);
+
+ closeConnection(conn);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void txEnd(boolean commit) throws CacheWriterException {
+ CacheStoreSession ses = session();
+
+ IgniteTx tx = ses.transaction();
+
+ Connection conn = ses.<String,
Connection>properties().remove(ATTR_CONN_PROP);
+
+ if (conn != null) {
+ assert tx != null;
+
+ try {
+ if (commit)
+ conn.commit();
+ else
+ conn.rollback();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException(
+ "Failed to end transaction [xid=" + tx.xid() + ",
commit=" + commit + ']', e);
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+ }
+
+ if (tx != null && log.isDebugEnabled())
+ log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" +
commit + ']');
+ }
+
+ /**
+ * Retrieves the value of the designated column in the current row of
this <code>ResultSet</code> object and
+ * will convert to the requested Java data type.
+ *
+ * @param rs Result set.
+ * @param colIdx Column index in result set.
+ * @param type Class representing the Java data type to convert the
designated column to.
+ * @return Value in column.
+ * @throws SQLException If a database access error occurs or this method
is called.
+ */
+ protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type)
throws SQLException {
+ if (type == int.class)
+ return rs.getInt(colIdx);
+
+ if (type == long.class)
+ return rs.getLong(colIdx);
+
+ if (type == double.class)
+ return rs.getDouble(colIdx);
+
+ if (type == boolean.class)
+ return rs.getBoolean(colIdx);
+
+ if (type == byte.class)
+ return rs.getByte(colIdx);
+
+ if (type == short.class)
+ return rs.getShort(colIdx);
+
+ if (type == float.class)
+ return rs.getFloat(colIdx);
+
+ if (type == Integer.class || type == Long.class || type ==
Double.class ||
+ type == Byte.class || type == Short.class || type ==
Float.class) {
+ Object val = rs.getObject(colIdx);
+
+ if (val != null) {
+ Number num = (Number)val;
+
+ if (type == Integer.class)
+ return num.intValue();
+ else if (type == Long.class)
+ return num.longValue();
+ else if (type == Double.class)
+ return num.doubleValue();
+ else if (type == Byte.class)
+ return num.byteValue();
+ else if (type == Short.class)
+ return num.shortValue();
+ else if (type == Float.class)
+ return num.floatValue();
+ }
+ else
+ return EMPTY_COLUMN_VALUE;
+ }
+
+ return rs.getObject(colIdx);
+ }
+
+ /**
+ * Construct load cache from range.
+ *
+ * @param em Type mapping description.
+ * @param clo Closure that will be applied to loaded values.
+ * @param lowerBound Lower bound for range.
+ * @param upperBound Upper bound for range.
+ * @return Callable for pool submit.
+ */
+ private Callable<Void> loadCacheRange(final EntryMapping em, final
IgniteBiInClosure<K, V> clo,
+ @Nullable final Object[] lowerBound, @Nullable final Object[]
upperBound) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = openConnection(true);
+
+ stmt = conn.prepareStatement(lowerBound == null &&
upperBound == null
+ ? em.loadCacheQry
+ : em.loadCacheRangeQuery(lowerBound != null,
upperBound != null));
+
+ int ix = 1;
+
+ if (lowerBound != null)
+ for (int i = lowerBound.length; i > 0; i--)
+ for (int j = 0; j < i; j++)
+ stmt.setObject(ix++, lowerBound[j]);
+
+ if (upperBound != null)
+ for (int i = upperBound.length; i > 0; i--)
+ for (int j = 0; j < i; j++)
+ stmt.setObject(ix++, upperBound[j]);
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ K key = buildObject(em.keyType(), em.keyColumns(),
em.loadColIdxs, rs);
+ V val = buildObject(em.valueType(),
em.valueColumns(), em.loadColIdxs, rs);
+
+ clo.apply(key, val);
+ }
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Failed to load cache",
e);
+ }
+ finally {
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ }
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Construct load cache in one select.
+ *
+ * @param m Type mapping description.
+ * @param clo Closure for loaded values.
+ * @return Callable for pool submit.
+ */
+ private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K,
V> clo) {
+ return loadCacheRange(m, clo, null, null);
+ }
+
+ /**
+ * @return Type mappings for specified cache name.
+ * @throws CacheException If failed to initialize.
+ */
+ private Map<Object, EntryMapping> cacheMappings(@Nullable String
cacheName) throws CacheException {
+ Map<Object, EntryMapping> entryMappings =
cacheMappings.get(cacheName);
+
+ if (entryMappings != null)
+ return entryMappings;
+
+ cacheMappingsLock.lock();
+
+ try {
+ entryMappings = cacheMappings.get(cacheName);
+
+ if (entryMappings != null)
+ return entryMappings;
+
+ Collection<CacheTypeMetadata> types =
ignite().cache(session().cacheName()).configuration()
+ .getTypeMetadata();
+
+ entryMappings = U.newHashMap(types.size());
+
+ for (CacheTypeMetadata type : types)
+ entryMappings.put(keyTypeId(type.getKeyType()), new
EntryMapping(dialect, type));
+
+ Map<String, Map<Object, EntryMapping>> mappings = new
HashMap<>(cacheMappings);
+
+ mappings.put(cacheName, entryMappings);
+
+ prepareBuilders(cacheName, types);
+
+ cacheMappings = mappings;
+
+ return entryMappings;
+ }
+ finally {
+ cacheMappingsLock.unlock();
+ }
+ }
+
+ /**
+ * @param keyTypeId Key type id.
+ * @param key Key object.
+ * @return Entry mapping.
+ * @throws CacheException if mapping for key was not found.
+ */
+ private EntryMapping entryMapping(Object keyTypeId, Object key) throws
CacheException {
+ String cacheName = session().cacheName();
+
+ EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
+
+ if (em == null)
+ throw new CacheException("Failed to find mapping description
[key=" + key +
+ ", cache=" + (cacheName != null ? cacheName : "<default>") +
"]");
+
+ return em;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(final IgniteBiInClosure<K, V> clo,
@Nullable Object... args)
+ throws CacheLoaderException {
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool(maxPoolSz);
+
+ Collection<Future<?>> futs = new ArrayList<>();
+
+ if (args != null && args.length > 0) {
+ if (args.length % 2 != 0)
+ throw new CacheLoaderException("Expected even number of
arguments, but found: " + args.length);
+
+ if (log.isDebugEnabled())
+ log.debug("Start loading entries from db using user
queries from arguments");
+
+ for (int i = 0; i < args.length; i += 2) {
+ String keyType = args[i].toString();
+
+ String selQry = args[i + 1].toString();
+
+ EntryMapping em = entryMapping(keyTypeId(keyType),
keyType);
+
+ futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em,
selQry, clo)));
+ }
+ }
+ else {
+ Collection<EntryMapping> entryMappings =
cacheMappings(session().cacheName()).values();
+
+ if (log.isDebugEnabled())
+ log.debug("Start loading all cache types entries from
db");
+
+ for (EntryMapping em : entryMappings) {
+ if (parallelLoadCacheMinThreshold > 0) {
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ PreparedStatement stmt =
conn.prepareStatement(em.loadCacheSelRangeQry);
+
+ stmt.setInt(1, parallelLoadCacheMinThreshold);
+
+ ResultSet rs = stmt.executeQuery();
+
+ if (rs.next()) {
+ int keyCnt = em.keyCols.size();
+
+ Object[] upperBound = new Object[keyCnt];
+
+ for (int i = 0; i < keyCnt; i++)
+ upperBound[i] = rs.getObject(i + 1);
+
+ futs.add(pool.submit(loadCacheRange(em, clo,
null, upperBound)));
+
+ while (rs.next()) {
+ Object[] lowerBound = upperBound;
+
+ upperBound = new Object[keyCnt];
+
+ for (int i = 0; i < keyCnt; i++)
+ upperBound[i] = rs.getObject(i + 1);
+
+ futs.add(pool.submit(loadCacheRange(em,
clo, lowerBound, upperBound)));
+ }
+
+ futs.add(pool.submit(loadCacheRange(em, clo,
upperBound, null)));
+ }
+ else
+ futs.add(pool.submit(loadCacheFull(em, clo)));
+ }
+ catch (SQLException ignored) {
+ futs.add(pool.submit(loadCacheFull(em, clo)));
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+ }
+ else
+ futs.add(pool.submit(loadCacheFull(em, clo)));
+ }
+ }
+
+ for (Future<?> fut : futs)
+ U.get(fut);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException("Failed to load cache",
e.getCause());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V load(K key) throws CacheLoaderException {
+ assert key != null;
+
+ EntryMapping em = entryMapping(keyTypeId(key), key);
+
+ if (log.isDebugEnabled())
+ log.debug("Start load value from database [table= " +
em.fullTableName() + ", key=" + key + "]");
+
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = connection();
+
+ stmt = conn.prepareStatement(em.loadQrySingle);
+
+ fillKeyParameters(stmt, em, key);
+
+ ResultSet rs = stmt.executeQuery();
+
+ if (rs.next())
+ return buildObject(em.valueType(), em.valueColumns(),
em.loadColIdxs, rs);
+ }
+ catch (SQLException e) {
+ throw new CacheLoaderException("Failed to load object [table=" +
em.fullTableName() +
+ ", key=" + key + "]", e);
+ }
+ finally {
+ end(conn, stmt);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws
CacheLoaderException {
+ assert keys != null;
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ Map<Object, LoadWorker<K, V>> workers =
U.newHashMap(cacheMappings(session().cacheName()).size());
+
+ Map<K, V> res = new HashMap<>();
+
+ for (K key : keys) {
+ Object keyTypeId = keyTypeId(key);
+
+ EntryMapping em = entryMapping(keyTypeId, key);
+
+ LoadWorker<K, V> worker = workers.get(keyTypeId);
+
+ if (worker == null)
+ workers.put(keyTypeId, worker = new LoadWorker<>(conn,
em));
+
+ worker.keys.add(key);
+
+ if (worker.keys.size() == em.maxKeysPerStmt)
+ res.putAll(workers.remove(keyTypeId).call());
+ }
+
+ for (LoadWorker<K, V> worker : workers.values())
+ res.putAll(worker.call());
+
+ return res;
+ }
+ catch (Exception e) {
+ throw new CacheWriterException("Failed to load entries from
database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /**
+ * @param insStmt Insert statement.
+ * @param updStmt Update statement.
+ * @param em Entry mapping.
+ * @param entry Cache entry.
+ */
+ private void writeUpsert(PreparedStatement insStmt, PreparedStatement
updStmt,
+ EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws
CacheWriterException {
+ try {
+ CacheWriterException we = null;
+
+ for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT;
attempt++) {
+ int paramIdx = fillValueParameters(updStmt, 1, em,
entry.getValue());
+
+ fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
+
+ if (updStmt.executeUpdate() == 0) {
+ paramIdx = fillKeyParameters(insStmt, em, entry.getKey());
+
+ fillValueParameters(insStmt, paramIdx, em,
entry.getValue());
+
+ try {
+ insStmt.executeUpdate();
+
+ if (attempt > 0)
+ U.warn(log, "Entry was inserted in database on
second try [table=" + em.fullTableName() +
+ ", entry=" + entry + "]");
+ }
+ catch (SQLException e) {
+ String sqlState = e.getSQLState();
+
+ SQLException nested = e.getNextException();
+
+ while (sqlState == null && nested != null) {
+ sqlState = nested.getSQLState();
+
+ nested = nested.getNextException();
+ }
+
+ // The error with code 23505 or 23000 is thrown when
trying to insert a row that
+ // would violate a unique index or primary key.
+ if ("23505".equals(sqlState) ||
"23000".equals(sqlState)) {
+ if (we == null)
+ we = new CacheWriterException("Failed insert
entry in database, violate a unique" +
+ " index or primary key [table=" +
em.fullTableName() + ", entry=" + entry + "]");
+
+ we.addSuppressed(e);
+
+ U.warn(log, "Failed insert entry in database,
violate a unique index or primary key" +
+ " [table=" + em.fullTableName() + ", entry="
+ entry + "]");
+
+ continue;
+ }
+
+ throw new CacheWriterException("Failed insert entry
in database [table=" + em.fullTableName() +
+ ", entry=" + entry, e);
+ }
+ }
+
+ if (attempt > 0)
+ U.warn(log, "Entry was updated in database on second try
[table=" + em.fullTableName() +
+ ", entry=" + entry + "]");
+
+ return;
+ }
+
+ throw we;
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed update entry in database
[table=" + em.fullTableName() +
+ ", entry=" + entry + "]", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends K, ? extends V> entry)
throws CacheWriterException {
+ assert entry != null;
+
+ K key = entry.getKey();
+
+ EntryMapping em = entryMapping(keyTypeId(key), key);
+
+ if (log.isDebugEnabled())
+ log.debug("Start write entry to database [table=" +
em.fullTableName() + ", entry=" + entry + "]");
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ if (dialect.hasMerge()) {
+ PreparedStatement stmt = null;
+
+ try {
+ stmt = conn.prepareStatement(em.mergeQry);
+
+ int i = fillKeyParameters(stmt, em, key);
+
+ fillValueParameters(stmt, i, em, entry.getValue());
+
+ int updCnt = stmt.executeUpdate();
+
+ if (updCnt != 1)
+ U.warn(log, "Unexpected number of updated entries
[table=" + em.fullTableName() +
+ ", entry=" + entry + "expected=1, actual=" +
updCnt + "]");
+ }
+ finally {
+ U.closeQuiet(stmt);
+ }
+ }
+ else {
+ PreparedStatement insStmt = null;
+
+ PreparedStatement updStmt = null;
+
+ try {
+ insStmt = conn.prepareStatement(em.insQry);
+
+ updStmt = conn.prepareStatement(em.updQry);
+
+ writeUpsert(insStmt, updStmt, em, entry);
+ }
+ finally {
+ U.closeQuiet(insStmt);
+
+ U.closeQuiet(updStmt);
+ }
+ }
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to write entry to database
[table=" + em.fullTableName() +
+ ", entry=" + entry + "]", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(final Collection<Cache.Entry<? extends K,
? extends V>> entries)
+ throws CacheWriterException {
+ assert entries != null;
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ Object currKeyTypeId = null;
+
+ if (dialect.hasMerge()) {
+ PreparedStatement mergeStmt = null;
+
+ try {
+ EntryMapping em = null;
+
+ LazyValue<Object[]> lazyEntries = new
LazyValue<Object[]>() {
+ @Override public Object[] create() {
+ return entries.toArray();
+ }
+ };
+
+ int fromIdx = 0, prepared = 0;
+
+ for (Cache.Entry<? extends K, ? extends V> entry :
entries) {
+ K key = entry.getKey();
+
+ Object keyTypeId = keyTypeId(key);
+
+ em = entryMapping(keyTypeId, key);
+
+ if (currKeyTypeId == null ||
!currKeyTypeId.equals(keyTypeId)) {
+ if (mergeStmt != null) {
+ executeBatch(em, mergeStmt, "writeAll",
fromIdx, prepared, lazyEntries);
+
+ U.closeQuiet(mergeStmt);
+ }
+
+ mergeStmt = conn.prepareStatement(em.mergeQry);
+
+ currKeyTypeId = keyTypeId;
+
+ prepared = 0;
+ }
+
+ int i = fillKeyParameters(mergeStmt, em, key);
+
+ fillValueParameters(mergeStmt, i, em,
entry.getValue());
+
+ mergeStmt.addBatch();
+
+ if (++prepared % batchSz == 0) {
+ executeBatch(em, mergeStmt, "writeAll", fromIdx,
prepared, lazyEntries);
+
+ prepared = 0;
+ }
+ }
+
+ if (mergeStmt != null && prepared % batchSz != 0)
+ executeBatch(em, mergeStmt, "writeAll", fromIdx,
prepared, lazyEntries);
+ }
+ finally {
+ U.closeQuiet(mergeStmt);
+ }
+ }
+ else {
+ PreparedStatement insStmt = null;
+
+ PreparedStatement updStmt = null;
+
+ try {
+ for (Cache.Entry<? extends K, ? extends V> entry :
entries) {
+ K key = entry.getKey();
+
+ Object keyTypeId = keyTypeId(key);
+
+ EntryMapping em = entryMapping(keyTypeId, key);
+
+ if (currKeyTypeId == null ||
!currKeyTypeId.equals(keyTypeId)) {
+ U.closeQuiet(insStmt);
+
+ insStmt = conn.prepareStatement(em.insQry);
+
+ U.closeQuiet(updStmt);
+
+ updStmt = conn.prepareStatement(em.updQry);
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ writeUpsert(insStmt, updStmt, em, entry);
+ }
+ }
+ finally {
+ U.closeQuiet(insStmt);
+
+ U.closeQuiet(updStmt);
+ }
+ }
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to write entries in
database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ assert key != null;
+
+ EntryMapping em = entryMapping(keyTypeId(key), key);
+
+ if (log.isDebugEnabled())
+ log.debug("Start remove value from database [table=" +
em.fullTableName() + ", key=" + key + "]");
+
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = connection();
+
+ stmt = conn.prepareStatement(em.remQry);
+
+ fillKeyParameters(stmt, em, key);
+
+ int delCnt = stmt.executeUpdate();
+
+ if (delCnt != 1)
+ U.warn(log, "Unexpected number of deleted entries [table=" +
em.fullTableName() + ", key=" + key +
+ "expected=1, actual=" + delCnt + "]");
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to remove value from
database [table=" + em.fullTableName() +
+ ", key=" + key + "]", e);
+ }
+ finally {
+ end(conn, stmt);
+ }
+ }
+
+ /**
+ * @param em Entry mapping.
+ * @param stmt Statement.
+ * @param desc Statement description for error message.
+ * @param fromIdx Objects in batch start from index.
+ * @param prepared Expected objects in batch.
+ * @param lazyObjs All objects used in batch statement as array.
+ */
+ private void executeBatch(EntryMapping em, Statement stmt, String desc,
int fromIdx, int prepared,
+ LazyValue<Object[]> lazyObjs) throws SQLException {
+ try {
+ int[] rowCounts = stmt.executeBatch();
+
+ int numOfRowCnt = rowCounts.length;
+
+ if (numOfRowCnt != prepared)
+ U.warn(log, "Unexpected number of updated rows [table=" +
em.fullTableName() + ", expected=" + prepared +
+ ", actual=" + numOfRowCnt + "]");
+
+ for (int i = 0; i < numOfRowCnt; i++) {
+ int cnt = rowCounts[i];
+
+ if (cnt != 1 && cnt != SUCCESS_NO_INFO) {
+ Object[] objs = lazyObjs.value();
+
+ U.warn(log, "Batch " + desc + " returned unexpected
updated row count [table=" + em.fullTableName() +
+ ", entry=" + objs[fromIdx + i] + ", expected=1,
actual=" + cnt + "]");
+ }
+ }
+ }
+ catch (BatchUpdateException be) {
+ int[] rowCounts = be.getUpdateCounts();
+
+ for (int i = 0; i < rowCounts.length; i++) {
+ if (rowCounts[i] == EXECUTE_FAILED) {
+ Object[] objs = lazyObjs.value();
+
+ U.warn(log, "Batch " + desc + " failed on execution
[table=" + em.fullTableName() +
+ ", entry=" + objs[fromIdx + i] + "]");
+ }
+ }
+
+ throw be;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(final Collection<?> keys) throws
CacheWriterException {
+ assert keys != null;
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ Object currKeyTypeId = null;
+
+ EntryMapping em = null;
+
+ PreparedStatement delStmt = null;
+
+ LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() {
+ @Override public Object[] create() {
+ return keys.toArray();
+ }
+ };
+
+ int fromIdx = 0, prepared = 0;
+
+ for (Object key : keys) {
+ Object keyTypeId = keyTypeId(key);
+
+ em = entryMapping(keyTypeId, key);
+
+ if (delStmt == null) {
+ delStmt = conn.prepareStatement(em.remQry);
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ if (!currKeyTypeId.equals(keyTypeId)) {
+ executeBatch(em, delStmt, "deleteAll", fromIdx, prepared,
lazyKeys);
+
+ fromIdx += prepared;
+
+ prepared = 0;
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ fillKeyParameters(delStmt, em, key);
+
+ delStmt.addBatch();
+
+ if (++prepared % batchSz == 0) {
+ executeBatch(em, delStmt, "deleteAll", fromIdx, prepared,
lazyKeys);
+
+ fromIdx += prepared;
+
+ prepared = 0;
+ }
+ }
+
+ if (delStmt != null && prepared % batchSz != 0)
+ executeBatch(em, delStmt, "deleteAll", fromIdx, prepared,
lazyKeys);
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to remove values from
database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /**
+ * @param stmt Prepare statement.
+ * @param i Start index for parameters.
+ * @param em Entry mapping.
+ * @param key Key object.
+ * @return Next index for parameters.
+ */
+ protected int fillKeyParameters(PreparedStatement stmt, int i,
EntryMapping em,
+ Object key) throws CacheException {
+ for (CacheTypeFieldMetadata field : em.keyColumns()) {
+ Object fieldVal = extractField(em.keyType(), field.getJavaName(),
key);
+
+ try {
+ if (fieldVal != null)
+ stmt.setObject(i++, fieldVal);
+ else
+ stmt.setNull(i++, field.getDatabaseType());
+ }
+ catch (SQLException e) {
+ throw new CacheException("Failed to set statement parameter
name: " + field.getDatabaseName(), e);
+ }
+ }
+
+ return i;
+ }
+
+ /**
+ * @param stmt Prepare statement.
+ * @param m Type mapping description.
+ * @param key Key object.
+ * @return Next index for parameters.
+ */
+ protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m,
Object key) throws CacheException {
+ return fillKeyParameters(stmt, 1, m, key);
+ }
+
+ /**
+ * @param stmt Prepare statement.
+ * @param i Start index for parameters.
+ * @param m Type mapping description.
+ * @param val Value object.
+ * @return Next index for parameters.
+ */
+ protected int fillValueParameters(PreparedStatement stmt, int i,
EntryMapping m, Object val)
+ throws CacheWriterException {
+ for (CacheTypeFieldMetadata field : m.uniqValFields) {
+ Object fieldVal = extractField(m.valueType(),
field.getJavaName(), val);
+
+ try {
+ if (fieldVal != null)
+ stmt.setObject(i++, fieldVal);
+ else
+ stmt.setNull(i++, field.getDatabaseType());
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to set statement
parameter name: " + field.getDatabaseName(), e);
+ }
+ }
+
+ return i;
+ }
+
+ /**
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /**
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Get database dialect.
+ *
+ * @return Database dialect.
+ */
+ public JdbcDialect getDialect() {
+ return dialect;
+ }
+
+ /**
+ * Set database dialect.
+ *
+ * @param dialect Database dialect.
+ */
+ public void setDialect(JdbcDialect dialect) {
+ this.dialect = dialect;
+ }
+
+ /**
+ * Get Max workers thread count. These threads are responsible for
execute query.
+ *
+ * @return Max workers thread count.
+ */
+ public int getMaxPoolSize() {
+ return maxPoolSz;
+ }
+
+ /**
+ * Set Max workers thread count. These threads are responsible for
execute query.
+ *
+ * @param maxPoolSz Max workers thread count.
+ */
+ public void setMaxPoolSize(int maxPoolSz) {
+ this.maxPoolSz = maxPoolSz;
+ }
+
+ /**
+ * Get maximum batch size for delete and delete operations.
+ *
+ * @return Maximum batch size.
+ */
+ public int getBatchSize() {
+ return batchSz;
+ }
+
+ /**
+ * Set maximum batch size for write and delete operations.
+ *
+ * @param batchSz Maximum batch size.
+ */
+ public void setBatchSize(int batchSz) {
+ this.batchSz = batchSz;
+ }
+
+ /**
+ * Parallel load cache minimum row count threshold.
+ *
+ * @return If {@code 0} then load sequentially.
+ */
+ public int getParallelLoadCacheMinimumThreshold() {
+ return parallelLoadCacheMinThreshold;
+ }
+
+ /**
+ * Parallel load cache minimum row count threshold.
+ *
+ * @param parallelLoadCacheMinThreshold Minimum row count threshold. If
{@code 0} then load sequentially.
+ */
+ public void setParallelLoadCacheMinimumThreshold(int
parallelLoadCacheMinThreshold) {
+ this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
+ }
+
+ /**
+ * Entry mapping description.
+ */
+ protected static class EntryMapping {
+ /** Database dialect. */
+ private final JdbcDialect dialect;
+
+ /** Select border for range queries. */
+ protected final String loadCacheSelRangeQry;
+
+ /** Select all items query. */
+ protected final String loadCacheQry;
+
+ /** Select item query. */
+ protected final String loadQrySingle;
+
+ /** Select items query. */
+ private final String loadQry;
+
+ /** Merge item(s) query. */
+ protected final String mergeQry;
+
+ /** Update item query. */
+ protected final String insQry;
+
+ /** Update item query. */
+ protected final String updQry;
+
+ /** Remove item(s) query. */
+ protected final String remQry;
+
+ /** Max key count for load query per statement. */
+ protected final int maxKeysPerStmt;
+
+ /** Database key columns. */
+ private final Collection<String> keyCols;
+
+ /** Database unique value columns. */
+ private final Collection<String> cols;
+
+ /** Select query columns index. */
+ private final Map<String, Integer> loadColIdxs;
+
+ /** Unique value fields. */
+ private final Collection<CacheTypeFieldMetadata> uniqValFields;
+
+ /** Type metadata. */
+ private final CacheTypeMetadata typeMeta;
+
+ /** Full table name. */
+ private final String fullTblName;
+
+ /**
+ * @param typeMeta Type metadata.
+ */
+ public EntryMapping(JdbcDialect dialect, CacheTypeMetadata typeMeta) {
+ this.dialect = dialect;
+
+ this.typeMeta = typeMeta;
+
+ final Collection<CacheTypeFieldMetadata> keyFields =
typeMeta.getKeyFields();
+
+ Collection<CacheTypeFieldMetadata> valFields =
typeMeta.getValueFields();
+
+ uniqValFields = F.view(valFields, new
IgnitePredicate<CacheTypeFieldMetadata>() {
+ @Override public boolean apply(CacheTypeFieldMetadata col) {
+ return !keyFields.contains(col);
+ }
+ });
+
+ String schema = typeMeta.getDatabaseSchema();
+
+ String tblName = typeMeta.getDatabaseTable();
+
+ fullTblName = F.isEmpty(schema) ? tblName : schema + "." +
tblName;
+
+ keyCols = databaseColumns(keyFields);
+
+ Collection<String> uniqValCols = databaseColumns(uniqValFields);
+
+ cols = F.concat(false, keyCols, uniqValCols);
+
+ loadColIdxs = U.newHashMap(cols.size());
+
+ int idx = 1;
+
+ for (String col : cols)
+ loadColIdxs.put(col, idx++);
+
+ loadCacheQry = dialect.loadCacheQuery(fullTblName, cols);
+
+ loadCacheSelRangeQry =
dialect.loadCacheSelectRangeQuery(fullTblName, keyCols);
+
+ loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1);
+
+ maxKeysPerStmt = dialect.getMaxParamsCnt() / keyCols.size();
+
+ loadQry = dialect.loadQuery(fullTblName, keyCols, cols,
maxKeysPerStmt);
+
+ insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols);
+
+ updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols);
+
+ mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols);
+
+ remQry = dialect.removeQuery(fullTblName, keyCols);
+ }
+
+ /**
+ * Extract database column names from {@link CacheTypeFieldMetadata}.
+ *
+ * @param dsc collection of {@link CacheTypeFieldMetadata}.
+ */
+ private static Collection<String>
databaseColumns(Collection<CacheTypeFieldMetadata> dsc) {
+ return F.transform(dsc, new C1<CacheTypeFieldMetadata, String>() {
+ /** {@inheritDoc} */
+ @Override public String apply(CacheTypeFieldMetadata col) {
+ return col.getDatabaseName();
+ }
+ });
+ }
+
+ /**
+ * Construct query for select values with key count less or equal
{@code maxKeysPerStmt}
+ *
+ * @param keyCnt Key count.
+ */
+ protected String loadQuery(int keyCnt) {
+ assert keyCnt <= maxKeysPerStmt;
+
+ if (keyCnt == maxKeysPerStmt)
+ return loadQry;
+
+ if (keyCnt == 1)
+ return loadQrySingle;
+
+ return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt);
+ }
+
+ /**
+ * Construct query for select values in range.
+ *
+ * @param appendLowerBound Need add lower bound for range.
+ * @param appendUpperBound Need add upper bound for range.
+ * @return Query with range.
+ */
+ protected String loadCacheRangeQuery(boolean appendLowerBound,
boolean appendUpperBound) {
+ return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols,
appendLowerBound, appendUpperBound);
+ }
+
+ /** Key type. */
+ protected String keyType() {
+ return typeMeta.getKeyType();
+ }
+
+ /** Value type. */
+ protected String valueType() {
+ return typeMeta.getValueType();
+ }
+
+ /**
+ * Gets key columns.
+ *
+ * @return Key columns.
+ */
+ protected Collection<CacheTypeFieldMetadata> keyColumns() {
+ return typeMeta.getKeyFields();
+ }
+
+ /**
+ * Gets value columns.
+ *
+ * @return Value columns.
+ */
+ protected Collection<CacheTypeFieldMetadata> valueColumns() {
+ return typeMeta.getValueFields();
+ }
+
+ /**
+ * Get full table name.
+ *
+ * @return <schema>.<table name>
+ */
+ protected String fullTableName() {
+ return fullTblName;
+ }
+ }
+
+ /**
+ * Worker for load cache using custom user query.
+ *
+ * @param <K1> Key type.
+ * @param <V1> Value type.
+ */
+ private class LoadCacheCustomQueryWorker<K1, V1> implements
Callable<Void> {
+ /** Entry mapping description. */
+ private final EntryMapping em;
+
+ /** User query. */
+ private final String qry;
+
+ /** Closure for loaded values. */
+ private final IgniteBiInClosure<K1, V1> clo;
+
+ /**
+ * @param em Entry mapping description.
+ * @param qry User query.
+ * @param clo Closure for loaded values.
+ */
+ private LoadCacheCustomQueryWorker(EntryMapping em, String qry,
IgniteBiInClosure<K1, V1> clo) {
+ this.em = em;
+ this.qry = qry;
+ this.clo = clo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = openConnection(true);
+
+ stmt = conn.prepareStatement(qry);
+
+ ResultSet rs = stmt.executeQuery();
+
+ ResultSetMetaData meta = rs.getMetaData();
+
+ Map<String, Integer> colIdxs =
U.newHashMap(meta.getColumnCount());
+
+ for (int i = 1; i <= meta.getColumnCount(); i++)
+ colIdxs.put(meta.getColumnLabel(i), i);
+
+ while (rs.next()) {
+ K1 key = buildObject(em.keyType(), em.keyColumns(),
colIdxs, rs);
+ V1 val = buildObject(em.valueType(), em.valueColumns(),
colIdxs, rs);
+
+ clo.apply(key, val);
+ }
+
+ return null;
+ }
+ catch (SQLException e) {
+ throw new CacheLoaderException("Failed to execute custom
query for load cache", e);
+ }
+ finally {
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ }
+ }
+ }
+
+ /**
+ * Lazy initialization of value.
+ *
+ * @param <T> Cached object type
+ */
+ private abstract static class LazyValue<T> {
+ /** Cached value. */
+ private T val;
+
+ /**
+ * @return Construct value.
+ */
+ protected abstract T create();
+
+ /**
+ * @return Value.
+ */
+ public T value() {
+ if (val == null)
+ val = create();
+
+ return val;
+ }
+ }
+
+ /**
+ * Worker for load by keys.
+ *
+ * @param <K1> Key type.
+ * @param <V1> Value type.
+ */
+ private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
+ /** Connection. */
+ private final Connection conn;
+
+ /** Keys for load. */
+ private final Collection<K1> keys;
+
+ /** Entry mapping description. */
+ private final EntryMapping em;
+
+ /**
+ * @param conn Connection.
+ * @param em Entry mapping description.
+ */
+ private LoadWorker(Connection conn, EntryMapping em) {
+ this.conn = conn;
+ this.em = em;
+
+ keys = new ArrayList<>(em.maxKeysPerStmt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K1, V1> call() throws Exception {
+ PreparedStatement stmt = null;
+
+ try {
+ stmt = conn.prepareStatement(em.loadQuery(keys.size()));
+
+ int i = 1;
+
+ for (Object key : keys)
+ for (CacheTypeFieldMetadata field : em.keyColumns()) {
+ Object fieldVal = extractField(em.keyType(),
field.getJavaName(), key);
+
+ if (fieldVal != null)
+ stmt.setObject(i++, fieldVal);
+ else
+ stmt.setNull(i++, field.getDatabaseType());
+ }
+
+ ResultSet rs = stmt.executeQuery();
+
+ Map<K1, V1> entries = U.newHashMap(keys.size());
+
+ while (rs.next()) {
+ K1 key = buildObject(em.keyType(), em.keyColumns(),
em.loadColIdxs, rs);
+ V1 val = buildObject(em.valueType(), em.valueColumns(),
em.loadColIdxs, rs);
+
+ entries.put(key, val);
+ }
+
+ return entries;
+ }
+ finally {
+ U.closeQuiet(stmt);
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------