Repository: incubator-ignite Updated Branches: refs/heads/ignite-648 ada57fc79 -> b77db74b9
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77db74b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java new file mode 100644 index 0000000..6f9a0cb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ignite events proxy for ignite instance at another JVM. + */ +public class IgniteEventsProcessProxy implements IgniteEvents { + /** Ignite proxy. */ + private final transient IgniteProcessProxy igniteProxy; + + /** Grid id. */ + private final UUID gridId; + + /** + * @param igniteProxy Ignite proxy. + */ + public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) { + this.igniteProxy = igniteProxy; + + gridId = igniteProxy.getId(); + } + + /** + * @return Events instance. + */ + private IgniteEvents events() { + return Ignition.ignite(gridId).events(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup clusterGroup() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> List<T> remoteQuery(IgnitePredicate<T> p, long timeout, + @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr, + @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe, + @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void stopRemoteListen(UUID opId) throws IgniteException { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter, + @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void recordLocal(Event evt) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) { + igniteProxy.remoteCompute().run(new IgniteRunnable() { + @Override public void run() { + events().localListen(lsnr, types); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean stopLocalListen(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void enableLocal(int... types) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void disableLocal(int... types) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public int[] enabledEvents() { + return new int[0]; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean isEnabled(int type) { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteEvents withAsync() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + return null; // TODO: CODE: implement. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77db74b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java new file mode 100644 index 0000000..677d383 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import com.thoughtworks.xstream.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; + +import java.io.*; +import java.util.*; + +/** + * Run ignite node. + */ +public class IgniteNodeRunner { + /** */ + private static final String IGNITE_CONFIGURATION_FILE = System.getProperty("java.io.tmpdir") + + File.separator + "igniteConfiguration.tmp_"; + + /** + * Starts {@link Ignite} instance accorging to given arguments. + * + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + try { + X.println(GridJavaProcess.PID_MSG_PREFIX + U.jvmPid()); + + X.println("Starting Ignite Node... Args=" + Arrays.toString(args)); + + IgniteConfiguration cfg = readCfgFromFileAndDeleteFile(args[0]); + + Ignition.start(cfg); + } + catch (Throwable e) { + e.printStackTrace(); + + System.exit(1); + } + } + + /** + * Stores {@link IgniteConfiguration} to file as xml. + * + * @param cfg Ignite Configuration. + * @return A name of file where the configuration was stored. + * @throws IOException If failed. + * @see #readCfgFromFileAndDeleteFile(String) + */ + public static String storeToFile(IgniteConfiguration cfg) throws IOException { + String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId(); + + try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) { + cfg.setMBeanServer(null); + cfg.setMarshaller(null); + cfg.setDiscoverySpi(null); + cfg.setGridLogger(null); + + new XStream().toXML(cfg, out); + } + + return fileName; + } + + /** + * Reads configuration from given file and delete the file after. + * + * @param fileName File name. + * @return Readed configuration. + * @throws IOException If failed. + * @see #storeToFile(IgniteConfiguration) + */ + private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) throws IOException { + try(BufferedReader cfgReader = new BufferedReader(new FileReader(fileName))) { + IgniteConfiguration cfg = (IgniteConfiguration)new XStream().fromXML(cfgReader); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER); + cfg.setDiscoverySpi(disco); + + return cfg; + } + finally { + new File(fileName).delete(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77db74b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java new file mode 100644 index 0000000..31c65f0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework.junits.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Ignite proxy for ignite instance at another JVM. + */ +public class IgniteProcessProxy implements IgniteEx { + /** Grid proxies. */ + private transient static final Map<String, IgniteProcessProxy> gridProxies = new HashMap<>(); + + /** Jvm process with ignite instance. */ + private transient final GridJavaProcess proc; + + /** Configuration. */ + private transient final IgniteConfiguration cfg; + + /** Local jvm grid. */ + private transient final Ignite locJvmGrid; + + /** Logger. */ + private transient final IgniteLogger log; + + /** Grid id. */ + private final UUID id = UUID.randomUUID(); + + /** Remote ignite instance started latch. */ + private transient final CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1); + + /** + * @param cfg Configuration. + * @param log Logger. + * @param locJvmGrid Local jvm grid. + */ + public IgniteProcessProxy(final IgniteConfiguration cfg, final IgniteLogger log, final Ignite locJvmGrid) + throws Exception { + this.cfg = cfg; + this.locJvmGrid = locJvmGrid; + this.log = log.getLogger("jvm-" + id.toString().substring(0, id.toString().indexOf('-'))); + + String cfgFileName = IgniteNodeRunner.storeToFile(cfg.setNodeId(id)); + + List<String> jvmArgs = U.jvmArgs(); + + List<String> filteredJvmArgs = new ArrayList<>(); + + for (String arg : jvmArgs) { + if(!arg.toLowerCase().startsWith("-agentlib")) + filteredJvmArgs.add(arg); + } + + locJvmGrid.events().localListen(new IgnitePredicateX<Event>() { + @Override public boolean applyx(Event e) { + if (((DiscoveryEvent)e).eventNode().id().equals(id)) { + rmtNodeStartedLatch.countDown(); + + return false; + } + + return true; + } + }, EventType.EVT_NODE_JOINED); + + proc = GridJavaProcess.exec( + IgniteNodeRunner.class, + cfgFileName, // Params. + this.log, + // Optional closure to be called each time wrapped process prints line to system.out or system.err. + new IgniteInClosure<String>() { + @Override public void apply(String s) { + IgniteProcessProxy.this.log.info(s); + } + }, + null, + filteredJvmArgs, // JVM Args. + System.getProperty("surefire.test.class.path") + ); + + assert rmtNodeStartedLatch.await(30, TimeUnit.SECONDS): "Remote node with id=" + id + " didn't join."; + + gridProxies.put(cfg.getGridName(), this); + } + + /** + * @param gridName Grid name. + * @return Instance by name or exception wiil be thrown. + */ + public static IgniteProcessProxy get(String gridName) { + IgniteProcessProxy res = gridProxies.get(gridName); + + if (res == null) + throw new IgniteIllegalStateException("Grid instance was not properly started " + + "or was already stopped: " + gridName); + + return res; + } + + /** + * Kill all running processes. + */ + public static void killAll() { + for (IgniteProcessProxy ignite : gridProxies.values()) { + try { + ignite.getProcess().kill(); + } + catch (Exception e) { + U.error(ignite.log, "Killing failed.", e); + } + } + + gridProxies.clear(); + } + + /** + * @return Local JVM grid instance. + */ + public Ignite localJvmGrid() { + return locJvmGrid; + } + + /** + * @return Grid id. + */ + public UUID getId() { + return id; + } + + /** {@inheritDoc} */ + @Override public String name() { + return cfg.getGridName(); + } + + /** {@inheritDoc} */ + @Override public IgniteLogger log() { + return log; + } + + /** {@inheritDoc} */ + @Override public IgniteConfiguration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex(@Nullable String name) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteInternalCache<?, ?>> cachesx( + @Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... p) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean eventUserRecordable(int type) { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean allEventsUserRecordable(int[] types) { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean isJmxRemoteEnabled() { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean isRestartEnabled() { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFileSystem igfsx(@Nullable String name) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteClusterEx cluster() { + return (IgniteClusterEx)locJvmGrid.cluster(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String latestVersion() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return remoteCompute().call(new IgniteCallable<ClusterNode>() { + @Override public ClusterNode call() throws Exception { + return ((IgniteEx)Ignition.ignite(id)).localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public GridKernalContext context() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteCompute compute(ClusterGroup grp) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteMessaging message(ClusterGroup grp) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events() { + return new IgniteEventsProcessProxy(this); + } + + /** {@inheritDoc} */ + @Override public IgniteEvents events(ClusterGroup grp) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteServices services() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteServices services(ClusterGroup grp) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public ExecutorService executorService(ClusterGroup grp) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteScheduler scheduler() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(String cacheName) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateCache(String cacheName) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> void addCacheConfiguration(CacheConfiguration<K, V> cacheCfg) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg, + NearCacheConfiguration<K, V> nearCfg) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg, + NearCacheConfiguration<K, V> nearCfg) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName, NearCacheConfiguration<K, V> nearCfg) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> getOrCreateNearCache(@Nullable String cacheName, + NearCacheConfiguration<K, V> nearCfg) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void destroyCache(String cacheName) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteCache<K, V> cache(@Nullable final String name) { + return new IgniteCacheProcessProxy(name, this); + } + + /** {@inheritDoc} */ + @Override public IgniteTransactions transactions() { + throw new UnsupportedOperationException("Transactions are not supported in multi JVM mode."); + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteFileSystem fileSystem(String name) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteFileSystem> fileSystems() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T> IgniteAtomicReference<T> atomicReference(String name, @Nullable T initVal, + boolean create) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal, @Nullable S initStamp, + boolean create) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteCountDownLatch countDownLatch(String name, int cnt, boolean autoDel, + boolean create) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T> IgniteQueue<T> queue(String name, int cap, + @Nullable CollectionConfiguration cfg) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T> IgniteSet<T> set(String name, @Nullable CollectionConfiguration cfg) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + try { + getProcess().kill(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + /** {@inheritDoc} */ + @Override public <K> Affinity<K> affinity(String cacheName) { + return new AffinityProcessProxy(cacheName, this); + } + + /** + * @return Jvm process in which grid node started. + */ + public GridJavaProcess getProcess() { + return proc; + } + + /** + * @return {@link IgniteCompute} instance to communicate with remote node. + */ + public IgniteCompute remoteCompute() { + ClusterGroup grp = locJvmGrid.cluster().forNodeId(id); + + if (grp.nodes().isEmpty()) + throw new IllegalStateException("Could not found node with id=" + id + "."); + + return locJvmGrid.compute(grp); + } +}