Ignite sp-1 fix for IGNITE-129
// Commit notification metadata (kept from the original message):
//   Project:   http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
//   Commit:    http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/55144cfa
//   Tree:      http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/55144cfa
//   Diff:      http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/55144cfa
//   Branch:    refs/heads/ignite-24
//   Commit:    55144cfa5af89904c67194d9c5db64fef2bc7592
//   Parents:   c60edcc
//   Author:    Yakov Zhdanov <yzhda...@gridgain.com>
//   Authored:  Mon Jan 26 17:12:28 2015 +0300
//   Committer: Yakov Zhdanov <yzhda...@gridgain.com>
//   Committed: Mon Jan 26 17:12:28 2015 +0300
//   File:      modules/core/src/main/java/org/apache/ignite/startup/BasicWarmupClosure.java
//              (new file, index 0000000..1dd2e63, 561 insertions)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.startup;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.logger.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.text.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Basic warm-up closure which warms up cache operations.
 * <p>
 * When applied to a grid configuration it starts {@link #getGridCount()} temporary grids that
 * carry a reduced copy of the cache configurations, runs every configured warmup method from
 * {@link #getThreadCount()} threads against each cache, and then stops the temporary grids.
 */
public class BasicWarmupClosure implements IgniteInClosure<IgniteConfiguration> {
    /** */
    private static final long serialVersionUID = 9175346848249957458L;

    /** Default grid count to warm up. */
    public static final int DFLT_GRID_CNT = 2;

    /** Default iteration count per thread. */
    public static final int DFLT_ITERATION_CNT = 30_000;

    /** Default key range. */
    public static final int DFLT_KEY_RANGE = 10_000;

    /**
     * Warmup date pattern. A formatter is created per message because
     * {@link SimpleDateFormat} instances must not be shared between threads.
     */
    private static final String WARMUP_DATE_PTRN = "HH:mm:ss";

    /** Grid count. */
    private int gridCnt = DFLT_GRID_CNT;

    /** Warmup thread count. */
    private int threadCnt = Runtime.getRuntime().availableProcessors() * 2;

    /** Per thread iteration count. */
    private int iterCnt = DFLT_ITERATION_CNT;

    /** Key range. */
    private int keyRange = DFLT_KEY_RANGE;

    /** Warmup discovery port. */
    private int discoveryPort = 27000;

    /** Methods to warmup. */
    private String[] warmupMethods = {"put", "putx", "get", "remove", "removex", "putIfAbsent", "replace"};

    /**
     * Gets number of grids to start and run warmup.
     *
     * @return Number of grids.
     */
    public int getGridCount() {
        return gridCnt;
    }

    /**
     * Sets number of grids to start and run the warmup.
     *
     * @param gridCnt Number of grids.
     */
    public void setGridCount(int gridCnt) {
        this.gridCnt = gridCnt;
    }

    /**
     * Gets warmup methods to use for cache warmup.
     *
     * @return Warmup methods.
     */
    public String[] getWarmupMethods() {
        return warmupMethods;
    }

    /**
     * Sets warmup methods to use for cache warmup.
     *
     * @param warmupMethods Array of warmup methods.
     */
    public void setWarmupMethods(String... warmupMethods) {
        this.warmupMethods = warmupMethods;
    }

    /**
     * Gets thread count for warmup.
     *
     * @return Thread count.
     */
    public int getThreadCount() {
        return threadCnt;
    }

    /**
     * Sets thread count for warmup.
     *
     * @param threadCnt Thread count.
     */
    public void setThreadCount(int threadCnt) {
        this.threadCnt = threadCnt;
    }

    /**
     * Gets iteration count for warmup.
     *
     * @return Iteration count.
     */
    public int getIterationCount() {
        return iterCnt;
    }

    /**
     * Sets iteration count for warmup.
     *
     * @param iterCnt Iteration count for warmup.
     */
    public void setIterationCount(int iterCnt) {
        this.iterCnt = iterCnt;
    }

    /**
     * Gets key range.
     *
     * @return Key range.
     */
    public int getKeyRange() {
        return keyRange;
    }

    /**
     * Sets key range.
     *
     * @param keyRange Key range.
     */
    public void setKeyRange(int keyRange) {
        this.keyRange = keyRange;
    }

    /**
     * Gets discovery port for warmup.
     *
     * @return Discovery port.
     */
    public int getDiscoveryPort() {
        return discoveryPort;
    }

    /**
     * Sets discovery port for warmup.
     *
     * @param discoveryPort Discovery port.
     */
    public void setDiscoveryPort(int discoveryPort) {
        this.discoveryPort = discoveryPort;
    }

    /**
     * Starts the warmup grids, runs the warmup and stops the grids again.
     * <p>
     * The update notifier system property is forced to {@code "false"} while the warmup grids
     * run and is put back to its previous state (including "not set") afterwards.
     *
     * @param gridCfg Original grid configuration.
     * @throws IgniteException If starting the grids or running the warmup failed.
     */
    @Override public void apply(IgniteConfiguration gridCfg) {
        // Remove cache duplicates, clean up the rest, etc.
        IgniteConfiguration cfg = prepareConfiguration(gridCfg);

        // Do nothing if no caches found.
        if (cfg == null)
            return;

        out("Starting grids to warmup caches [gridCnt=" + gridCnt +
            ", caches=" + cfg.getCacheConfiguration().length + ']');

        Collection<Ignite> ignites = new ArrayList<>();

        String old = System.getProperty(IgniteSystemProperties.GG_UPDATE_NOTIFIER);

        try {
            System.setProperty(IgniteSystemProperties.GG_UPDATE_NOTIFIER, "false");

            // Single shared IP finder lets the warmup grids discover each other.
            TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

            for (int i = 0; i < gridCnt; i++) {
                IgniteConfiguration cfg0 = new IgniteConfiguration(cfg);

                TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

                discoSpi.setIpFinder(ipFinder);

                discoSpi.setLocalPort(discoveryPort);

                cfg0.setDiscoverySpi(discoSpi);

                cfg0.setGridLogger(new IgniteNullLogger());

                cfg0.setGridName("gridgain-warmup-grid-" + i);

                ignites.add(Ignition.start(cfg0));
            }

            doWarmup(ignites);
        }
        catch (Exception e) {
            throw new IgniteException(e);
        }
        finally {
            try {
                for (Ignite ignite : ignites)
                    Ignition.stop(ignite.name(), false);

                out("Stopped warmup grids.");
            }
            finally {
                // Restore the property even if stopping a grid failed; when it was not set
                // before the warmup it must not stay set afterwards.
                if (old == null)
                    System.clearProperty(IgniteSystemProperties.GG_UPDATE_NOTIFIER);
                else
                    System.setProperty(IgniteSystemProperties.GG_UPDATE_NOTIFIER, old);
            }
        }
    }

    /**
     * Runs every configured warmup method against every cache of the first grid.
     * After each method the keys in range {@code [0, keyRange)} are removed from the cache.
     *
     * @param grids Grids to warmup.
     * @throws Exception If a warmup method is not supported or a warmup task failed.
     */
    private void doWarmup(Iterable<Ignite> grids) throws Exception {
        Iterator<Ignite> it = grids.iterator();

        // Nothing to warm up if no grids were started (e.g. non-positive grid count).
        if (!it.hasNext())
            return;

        Ignite first = it.next();

        ExecutorService svc = Executors.newFixedThreadPool(threadCnt);

        try {
            for (GridCache<?, ?> cache : first.caches()) {
                GridCache<Object, Object> cache0 = first.cache(cache.name());

                for (String warmupMethod : warmupMethods) {
                    Collection<Future<?>> futs = new ArrayList<>(threadCnt);

                    for (int i = 0; i < threadCnt; i++)
                        futs.add(svc.submit(newWarmupCallable(warmupMethod, cache0)));

                    out("Running warmup [cacheName=" + cache.name() + ", method=" + warmupMethod + ']');

                    // Wait for all threads to finish this method before moving on.
                    for (Future<?> fut : futs)
                        fut.get();

                    for (int key = 0; key < keyRange; key++)
                        cache0.remove(key);
                }
            }
        }
        finally {
            svc.shutdownNow();
        }
    }

    /**
     * Creates warmup task for the given method name.
     *
     * @param warmupMethod Warmup method name.
     * @param cache Cache to run the method on.
     * @return Warmup task.
     * @throws IgniteCheckedException If method name is not supported.
     */
    private Callable<Object> newWarmupCallable(String warmupMethod, GridCache<Object, Object> cache)
        throws IgniteCheckedException {
        switch (warmupMethod) {
            case "get":
                return new GetCallable(cache);

            case "put":
                return new PutCallable(cache);

            case "putx":
                return new PutxCallable(cache);

            case "remove":
                return new RemoveCallable(cache);

            case "removex":
                return new RemovexCallable(cache);

            case "putIfAbsent":
                return new PutIfAbsentCallable(cache);

            case "replace":
                return new ReplaceCallable(cache);

            default:
                throw new IgniteCheckedException("Unsupported warmup method: " + warmupMethod);
        }
    }

    /**
     * Output for warmup messages. Prints to standard output, prefixed with current time
     * and current thread name.
     *
     * @param msg Message to print.
     */
    private static void out(String msg) {
        System.out.println('[' + new SimpleDateFormat(WARMUP_DATE_PTRN).format(new Date()) + "][WARMUP][" +
            Thread.currentThread().getName() + ']' + ' ' + msg);
    }

    /**
     * Prepares configuration for warmup: skips system caches, keeps one cache configuration
     * per group of alike configurations, converts client-side distribution modes to their
     * data-holding counterparts and disables cache store and write-behind.
     *
     * @param gridCfg Original grid configuration.
     * @return Prepared configuration or {@code null} if no caches found.
     */
    private IgniteConfiguration prepareConfiguration(IgniteConfiguration gridCfg) {
        if (F.isEmpty(gridCfg.getCacheConfiguration()))
            return null;

        IgniteConfiguration cp = new IgniteConfiguration();

        cp.setClientConnectionConfiguration(null);

        Collection<CacheConfiguration> reduced = new ArrayList<>();

        for (CacheConfiguration ccfg : gridCfg.getCacheConfiguration()) {
            if (CU.isSystemCache(ccfg.getName()))
                continue;

            if (!matches(reduced, ccfg)) {
                CacheConfiguration ccfgCp = new CacheConfiguration(ccfg);

                if (ccfgCp.getDistributionMode() == CacheDistributionMode.CLIENT_ONLY)
                    ccfgCp.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
                else if (ccfgCp.getDistributionMode() == CacheDistributionMode.NEAR_ONLY)
                    ccfgCp.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED);

                ccfgCp.setCacheStoreFactory(null);
                ccfgCp.setWriteBehindEnabled(false);

                reduced.add(ccfgCp);
            }
        }

        if (reduced.isEmpty())
            return null;

        cp.setCacheConfiguration(reduced.toArray(new CacheConfiguration[reduced.size()]));

        return cp;
    }

    /**
     * Checks if passed configuration matches one of the configurations in the list.
     *
     * @param reduced Reduced configurations.
     * @param ccfg Cache configuration to match.
     * @return {@code True} if matching configuration is found, {@code false} otherwise.
     */
    private boolean matches(Iterable<CacheConfiguration> reduced, CacheConfiguration ccfg) {
        for (CacheConfiguration ccfg0 : reduced) {
            if (matches(ccfg0, ccfg))
                return true;
        }

        return false;
    }

    /**
     * Checks if cache configurations are alike for warmup, i.e. have equal cache mode,
     * backups, atomicity mode, atomic write order mode, memory mode and distribution mode.
     *
     * @param ccfg0 First configuration.
     * @param ccfg1 Second configuration.
     * @return {@code True} if configurations match.
     */
    private boolean matches(CacheConfiguration ccfg0, CacheConfiguration ccfg1) {
        return
            F.eq(ccfg0.getCacheMode(), ccfg1.getCacheMode()) &&
            F.eq(ccfg0.getBackups(), ccfg1.getBackups()) &&
            F.eq(ccfg0.getAtomicityMode(), ccfg1.getAtomicityMode()) &&
            F.eq(ccfg0.getAtomicWriteOrderMode(), ccfg1.getAtomicWriteOrderMode()) &&
            F.eq(ccfg0.getMemoryMode(), ccfg1.getMemoryMode()) &&
            F.eq(ccfg0.getDistributionMode(), ccfg1.getDistributionMode());
    }

    /**
     * Base class for all warmup callables. Runs {@link #operation(int)} for the configured
     * number of iterations, each time with a random key from the configured key range.
     */
    private abstract class BaseWarmupCallable implements Callable<Object> {
        /** Cache. */
        protected final GridCache<Object, Object> cache;

        /**
         * @param cache Cache.
         */
        protected BaseWarmupCallable(GridCache<Object, Object> cache) {
            this.cache = cache;
        }

        /** {@inheritDoc} */
        @Override public Object call() throws Exception {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            for (int i = 0; i < iterCnt; i++)
                operation(rnd.nextInt(keyRange));

            return null;
        }

        /**
         * Runs operation.
         *
         * @param key Key.
         * @throws Exception If failed.
         */
        protected abstract void operation(int key) throws Exception;
    }

    /**
     * Warmup callable for {@code get} operation.
     */
    private class GetCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private GetCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.get(key);
        }
    }

    /**
     * Warmup callable for {@code put} operation.
     */
    private class PutCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private PutCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.put(key, key);
        }
    }

    /**
     * Warmup callable for {@code putx} operation.
     */
    private class PutxCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private PutxCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.putx(key, key);
        }
    }

    /**
     * Warmup callable for {@code remove} operation.
     */
    private class RemoveCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private RemoveCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.remove(key);
        }
    }

    /**
     * Warmup callable for {@code removex} operation.
     */
    private class RemovexCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private RemovexCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.removex(key);
        }
    }

    /**
     * Warmup callable for {@code putIfAbsent} operation.
     */
    private class PutIfAbsentCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private PutIfAbsentCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.putIfAbsent(key, key);
        }
    }

    /**
     * Warmup callable for {@code replace} operation.
     */
    private class ReplaceCallable extends BaseWarmupCallable {
        /**
         * @param cache Cache.
         */
        private ReplaceCallable(GridCache<Object, Object> cache) {
            super(cache);
        }

        /** {@inheritDoc} */
        @Override protected void operation(int key) throws Exception {
            cache.replace(key, key, key);
        }
    }
}