http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd28003b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java new file mode 100644 index 0000000..ec1fb87 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.testframework; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.security.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.swapspace.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test SPI context. + */ +public class GridSpiTestContext implements IgniteSpiContext { + /** */ + private final Collection<ClusterNode> rmtNodes = new ConcurrentLinkedQueue<>(); + + /** */ + private ClusterNode locNode; + + /** */ + private final Map<GridLocalEventListener, Set<Integer>> evtLsnrs = new HashMap<>(); + + /** */ + @SuppressWarnings("deprecation") + private final Collection<GridMessageListener> msgLsnrs = new ArrayList<>(); + + /** */ + private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>(); + + /** */ + private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> remoteNodes() { + return rmtNodes; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return locNode; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> remoteDaemonNodes() { + Collection<ClusterNode> daemons = new ArrayList<>(); + + for (ClusterNode node : rmtNodes) { + if (node.isDaemon()) + daemons.add(node); + } + + return daemons; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> nodes() { + Collection<ClusterNode> all = new ArrayList<>(rmtNodes); + + if (locNode 
!= null) + all.add(locNode); + + return all; + } + + /** + * @param locNode Local node. + */ + public void setLocalNode(@Nullable ClusterNode locNode) { + this.locNode = locNode; + } + + /** {@inheritDoc} */ + @Nullable @Override + public ClusterNode node(UUID nodeId) { + if (locNode != null && locNode.id().equals(nodeId)) + return locNode; + + for (ClusterNode node : rmtNodes) { + if (node.id().equals(nodeId)) + return node; + } + + return null; + } + + /** */ + public void createLocalNode() { + setLocalNode(new GridTestNode(UUID.randomUUID(), createMetrics(1, 1))); + } + + /** + * @param cnt Number of nodes. + */ + public void createRemoteNodes(int cnt) { + for (int i = 0; i < cnt; i++) + addNode(new GridTestNode(UUID.randomUUID(), createMetrics(1, 1))); + } + + /** */ + public void reset() { + setLocalNode(null); + + rmtNodes.clear(); + } + + /** + * @param waitingJobs Waiting jobs count. + * @param activeJobs Active jobs count. + * @return Metrics adapter. + */ + private DiscoveryNodeMetricsAdapter createMetrics(int waitingJobs, int activeJobs) { + DiscoveryNodeMetricsAdapter metrics = new DiscoveryNodeMetricsAdapter(); + + metrics.setCurrentWaitingJobs(waitingJobs); + metrics.setCurrentActiveJobs(activeJobs); + + return metrics; + } + + /** + * @param nodes Nodes to reset. + * @param rmv Whether nodes that were not passed in should be removed or not. + */ + public void resetNodes(Collection<ClusterNode> nodes, boolean rmv) { + for (ClusterNode node : nodes) { + assert !node.equals(locNode); + + if (!rmtNodes.contains(node)) + addNode(node); + } + + if (rmv) { + for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) { + ClusterNode node = iter.next(); + + if (!nodes.contains(node)) { + iter.remove(); + + notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node)); + } + } + } + } + + /** + * @param node Node to check. + * @return {@code True} if the node is local. 
+ */ + public boolean isLocalNode(ClusterNode node) { + return locNode.equals(node); + } + + /** + * @param node Node to add. + */ + public void addNode(ClusterNode node) { + rmtNodes.add(node); + + notifyListener(new IgniteDiscoveryEvent(locNode, "Node joined", EVT_NODE_JOINED, node)); + } + + /** + * @param node Node to remove. + */ + public void removeNode(ClusterNode node) { + if (rmtNodes.remove(node)) + notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node)); + } + + /** + * @param nodeId Node ID. + */ + public void removeNode(UUID nodeId) { + for (Iterator<ClusterNode> iter = rmtNodes.iterator(); iter.hasNext();) { + ClusterNode node = iter.next(); + + if (node.id().equals(nodeId)) { + iter.remove(); + + notifyListener(new IgniteDiscoveryEvent(locNode, "Node left", EVT_NODE_LEFT, node)); + } + } + } + + /** + * @param node Node to fail. + */ + public void failNode(ClusterNode node) { + if (rmtNodes.remove(node)) + notifyListener(new IgniteDiscoveryEvent(locNode, "Node failed", EVT_NODE_FAILED, node)); + } + + /** + * @param node Node for metrics update. + */ + public void updateMetrics(ClusterNode node) { + if (locNode.equals(node) || rmtNodes.contains(node)) + notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated.", EVT_NODE_METRICS_UPDATED, node)); + } + + /** */ + public void updateAllMetrics() { + notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated", EVT_NODE_METRICS_UPDATED, locNode)); + + for (ClusterNode node : rmtNodes) { + notifyListener(new IgniteDiscoveryEvent(locNode, "Metrics updated", EVT_NODE_METRICS_UPDATED, node)); + } + } + + /** + * @param evt Event node. 
+ */ + private void notifyListener(IgniteEvent evt) { + assert evt.type() > 0; + + for (Map.Entry<GridLocalEventListener, Set<Integer>> entry : evtLsnrs.entrySet()) { + if (F.isEmpty(entry.getValue()) || entry.getValue().contains(evt.type())) + entry.getKey().onEvent(evt); + } + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return node(nodeId) != null; + } + + /** {@inheritDoc} */ + @Override public void send(ClusterNode node, Serializable msg, String topic) + throws IgniteSpiException { + sentMsgs.put(node, msg); + } + + /** + * @param node Node message was sent to. + * @return Sent message. + */ + public Serializable getSentMessage(ClusterNode node) { + return sentMsgs.get(node); + } + + /** + * @param node Node message was sent to. + * @return Sent message. + */ + public Serializable removeSentMessage(ClusterNode node) { + return sentMsgs.remove(node); + } + + /** + * @param node Destination node. + * @param msg Message. + */ + @SuppressWarnings("deprecation") + public void triggerMessage(ClusterNode node, Object msg) { + for (GridMessageListener lsnr : msgLsnrs) { + lsnr.onMessage(node.id(), msg); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public void addMessageListener(GridMessageListener lsnr, String topic) { + msgLsnrs.add(lsnr); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean removeMessageListener(GridMessageListener lsnr, String topic) { + return msgLsnrs.remove(lsnr); + } + + /** + * @param type Event type. + * @param taskName Task name. + * @param taskSesId Session ID. + * @param msg Event message. + */ + public void triggerTaskEvent(int type, String taskName, IgniteUuid taskSesId, String msg) { + assert type > 0; + + triggerEvent(new IgniteTaskEvent(locNode, msg, type, taskSesId, taskName, null, false, null)); + } + + /** + * @param evt Event to trigger. 
+ */ + public void triggerEvent(IgniteEvent evt) { + notifyListener(evt); + } + + /** {@inheritDoc} */ + @Override public void addLocalEventListener(GridLocalEventListener lsnr, int... types) { + Set<Integer> typeSet = F.addIfAbsent(evtLsnrs, lsnr, F.<Integer>newSet()); + + assert typeSet != null; + + if (types != null) { + for (int type : types) { + typeSet.add(type); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean removeLocalEventListener(GridLocalEventListener lsnr) { + boolean res = evtLsnrs.containsKey(lsnr); + + evtLsnrs.remove(lsnr); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean isEventRecordable(int... types) { + return true; + } + + /** {@inheritDoc} */ + @Override public void recordEvent(IgniteEvent evt) { + notifyListener(evt); + } + + /** {@inheritDoc} */ + @Override public void registerPort(int port, IgnitePortProtocol proto) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void deregisterPort(int port, IgnitePortProtocol proto) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void deregisterPorts() { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public <K, V> V get(String cacheName, K key) throws IgniteCheckedException { + assert cacheName != null; + assert key != null; + + V res = null; + + Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName); + + CachedObject<V> obj = cache.get(key); + + if (obj != null) { + if (obj.expire == 0 || obj.expire > System.currentTimeMillis()) + res = obj.obj; + else + cache.remove(key); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public <K, V> V put(String cacheName, K key, V val, long ttl) throws IgniteCheckedException { + assert cacheName != null; + assert key != null; + assert ttl >= 0; + + long expire = ttl > 0 ? 
System.currentTimeMillis() + ttl : 0; + + CachedObject<V> obj = new CachedObject<>(expire, val); + + Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName); + + CachedObject<V> prev = cache.put(key, obj); + + return prev != null ? prev.obj : null; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public <K, V> V putIfAbsent(String cacheName, K key, V val, long ttl) throws IgniteCheckedException { + V v = get(cacheName, key); + + if (v != null) + return put(cacheName, key, val, ttl); + + return v; + } + + /** {@inheritDoc} */ + @Override public <K, V> V remove(String cacheName, K key) throws IgniteCheckedException { + assert cacheName != null; + assert key != null; + + Map<K, CachedObject<V>> cache = getOrCreateCache(cacheName); + + CachedObject<V> prev = cache.remove(key); + + return prev != null ? prev.obj : null; + } + + /** {@inheritDoc} */ + @Override public <K> boolean containsKey(String cacheName, K key) { + assert cacheName != null; + assert key != null; + + boolean res = false; + + try { + res = get(cacheName, key) != null; + } + catch (IgniteCheckedException ignored) { + + } + + return res; + } + + /** {@inheritDoc} */ + @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val, + @Nullable ClassLoader ldr) throws IgniteCheckedException { + /* No-op. 
*/ + } + + /** {@inheritDoc} */ + @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) + throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public <T> T readFromOffheap(String spaceName, int part, Object key, byte[] keyBytes, + @Nullable ClassLoader ldr) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean removeFromOffheap(@Nullable String spaceName, int part, Object key, + @Nullable byte[] keyBytes) throws IgniteCheckedException { + return false; + } + + /** {@inheritDoc} */ + @Override public void writeToOffheap(@Nullable String spaceName, int part, Object key, @Nullable byte[] keyBytes, + Object val, @Nullable byte[] valBytes, @Nullable ClassLoader ldr) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int partition(String cacheName, Object key) { + return -1; + } + + /** {@inheritDoc} */ + @Override public void removeFromSwap(String spaceName, Object key, + @Nullable ClassLoader ldr) throws IgniteCheckedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean writeDelta(UUID nodeId, Object msg, ByteBuffer buf) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readDelta(UUID nodeId, Class<?> msgCls, ByteBuffer buf) { + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<GridSecuritySubject> authenticatedSubjects() throws IgniteCheckedException { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public GridSecuritySubject authenticatedSubject(UUID subjId) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key, + @Nullable ClassLoader ldr) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public GridTcpMessageFactory messageFactory() { + return new GridTcpMessageFactory() { + @Override public GridTcpCommunicationMessageAdapter create(byte type) { + return GridTcpCommunicationMessageFactory.create(type); + } + }; + } + + /** + * @param cacheName Cache name. + * @return Map representing cache. + */ + @SuppressWarnings("unchecked") + private <K, V> Map<K, V> getOrCreateCache(String cacheName) { + synchronized (cache) { + Map<K, V> map = cache.get(cacheName); + + if (map == null) + cache.put(cacheName, map = new ConcurrentHashMap<>()); + + return map; + } + } + + /** + * Cached object. + */ + private static class CachedObject<V> { + /** */ + private long expire; + + /** */ + private V obj; + + /** + * @param expire Expire time. + * @param obj Object. + */ + private CachedObject(long expire, V obj) { + this.expire = expire; + this.obj = obj; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd28003b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java new file mode 100644 index 0000000..50150b0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridStringLogger.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +/** + * Logger which logs to string buffer. 
+ */ +public class GridStringLogger implements IgniteLogger { + /** */ + private static final int MAX = 1024 * 11; + + /** */ + private static final int CHAR_CNT = 1024 * 10; + + /** */ + private StringBuilder buf = new StringBuilder(MAX); + + /** */ + private final boolean dbg; + + /** */ + private final IgniteLogger echo; + + /** + * + */ + public GridStringLogger() { + this(false); + } + + /** + * @param dbg Debug flag. + */ + public GridStringLogger(boolean dbg) { + this(dbg, null); + } + + /** + * @param dbg Debug flag. + * @param echo Logger to echo all messages. + */ + public GridStringLogger(boolean dbg, @Nullable IgniteLogger echo) { + this.dbg = dbg; + this.echo = echo; + } + + /** + * @param msg Message to log. + */ + private void log(String msg) { + buf.append(msg).append(U.nl()); + + if (echo != null) + echo.info("[GridStringLogger echo] " + msg); + + if (buf.length() > CHAR_CNT) { + if (echo != null) + echo.warning("Cleaning GridStringLogger history."); + + buf.delete(0, buf.length() - CHAR_CNT); + } + + assert buf.length() <= CHAR_CNT; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger getLogger(Object ctgr) { + return this; + } + + /** {@inheritDoc} */ + @Override public void trace(String msg) { + log(msg); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + log(msg); + } + + /** {@inheritDoc} */ + @Override public void info(String msg) { + log(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg) { + log(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg, @Nullable Throwable e) { + log(msg); + + if (e != null) + log(e.toString()); + } + + /** {@inheritDoc} */ + @Override public void error(String msg) { + log(msg); + } + + /** {@inheritDoc} */ + @Override public void error(String msg, @Nullable Throwable e) { + log(msg); + + if (e != null) + log(e.toString()); + } + + /** {@inheritDoc} */ + @Override public boolean isTraceEnabled() { + return dbg; + } + + /** 
{@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return dbg; + } + + /** {@inheritDoc} */ + @Override public boolean isInfoEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isQuiet() { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public String fileName() { + return null; + } + + /** + * Resets logger. + */ + public void reset() { + buf.setLength(0); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return buf.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd28003b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestClassLoader.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestClassLoader.java new file mode 100644 index 0000000..cf4bdd4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestClassLoader.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Test class loader.
 * <p>
 * Classes whose names were passed to the constructor are defined by this loader itself from
 * their {@code .class} resource instead of being delegated to the parent, which gives tests a
 * distinct copy of those classes. Resources can additionally be served from an in-memory map.
 */
public class GridTestClassLoader extends ClassLoader {
    /** In-memory resources (resource name to textual content), may be {@code null}. */
    private final Map<String, String> rsrcs;

    /** Names of the classes that this loader defines itself. */
    private final String[] clsNames;

    /**
     * @param clsNames Test Class names.
     */
    public GridTestClassLoader(String... clsNames) {
        this(null, GridTestClassLoader.class.getClassLoader(), clsNames);
    }

    /**
     * @param clsNames Test Class name.
     * @param rsrcs Resources.
     */
    public GridTestClassLoader(Map<String, String> rsrcs, String... clsNames) {
        this(rsrcs, GridTestClassLoader.class.getClassLoader(), clsNames);
    }

    /**
     * @param clsNames Test Class name.
     * @param rsrcs Resources, {@code null} if there are none.
     * @param parent Parent class loader.
     */
    public GridTestClassLoader(Map<String, String> rsrcs, ClassLoader parent, String... clsNames) {
        super(parent);

        this.rsrcs = rsrcs;
        // An explicit null varargs array means "no patched classes".
        this.clsNames = clsNames != null ? clsNames.clone() : new String[0];
    }

    /** {@inheritDoc} */
    @Override protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        Class<?> res = findLoadedClass(name);

        if (res == null) {
            if (!isPatched(name)) {
                // Maybe super knows.
                return super.loadClass(name, resolve);
            }

            res = definePatchedClass(name);
        }

        if (resolve)
            resolveClass(res);

        return res;
    }

    /**
     * @param name Class name.
     * @return {@code True} if the class has to be defined by this loader.
     */
    private boolean isPatched(String name) {
        for (String clsName : clsNames) {
            if (name.equals(clsName))
                return true;
        }

        return false;
    }

    /**
     * Defines the class in this loader from the bytes of its {@code .class} resource.
     *
     * @param name Class name.
     * @return Defined class.
     * @throws ClassNotFoundException If class file is missing or cannot be read.
     */
    private Class<?> definePatchedClass(String name) throws ClassNotFoundException {
        String path = name.replace('.', '/') + ".class";

        byte[] bytes;

        // A null resource is legal in try-with-resources, it is simply not closed.
        try (InputStream in = getResourceAsStream(path)) {
            if (in == null)
                throw new ClassNotFoundException("Failed to upload resource [class=" + path +
                    ", parent classloader=" + getParent() + ']');

            bytes = readAll(in);
        }
        catch (IOException e) {
            throw new ClassNotFoundException("Failed to upload class [class=" + name + ']', e);
        }

        return defineClass(name, bytes, 0, bytes.length);
    }

    /**
     * @param in Stream to drain.
     * @return All remaining bytes of the stream.
     * @throws IOException If read failed.
     */
    private static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream(1024);

        byte[] chunk = new byte[1024];

        for (int read = in.read(chunk); read != -1; read = in.read(chunk))
            out.write(chunk, 0, read);

        return out.toByteArray();
    }

    /**
     * Looks the resource up in the in-memory map first and falls back to the parent loader.
     * In-memory content is exposed as UTF-8 encoded bytes.
     *
     * @param name Resource name.
     * @return Resource stream or {@code null} if resource was not found.
     */
    @Override public InputStream getResourceAsStream(String name) {
        String content = rsrcs != null ? rsrcs.get(name) : null;

        if (content != null)
            return new ByteArrayInputStream(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        ClassLoader parent = getParent();

        // Parent is null when it is the bootstrap loader.
        return parent != null ? parent.getResourceAsStream(name) : ClassLoader.getSystemResourceAsStream(name);
    }
}
/**
 * Utility classloader that has ability to load classes from external resources.
 * <p>
 * On top of {@link URLClassLoader} it can hide a configurable set of classes, serve resources
 * from an in-memory map and delay every {@link #getResourceAsStream(String)} call by a
 * configurable timeout.
 */
@SuppressWarnings({"CustomClassloader"})
public class GridTestExternalClassLoader extends URLClassLoader {
    /** Names of the classes this loader refuses to load, never {@code null}. */
    private Set<String> excludeClassNames;

    /** In-memory resources by name, never {@code null}. */
    private Map<String, byte[]> resourceMap;

    /** Delay in milliseconds applied to every resource stream request. */
    private long timeout;

    /**
     * Constructor.
     * @param urls the URLs from which to load classes and resources.
     * @param excludeClassNames list of excluded classes.
     */
    public GridTestExternalClassLoader(URL[] urls, String... excludeClassNames) {
        this(urls, Collections.<String, byte[]>emptyMap(), excludeClassNames);
    }

    /**
     * Constructor.
     * @param urls the URLs from which to load classes and resources.
     * @param resourceMap mapped resources.
     */
    public GridTestExternalClassLoader(URL[] urls, Map<String, byte[]> resourceMap) {
        this(urls, resourceMap, Collections.<String>emptySet());
    }

    /**
     * Constructor.
     * @param urls the URLs from which to load classes and resources.
     * @param resourceMap Resource map.
     * @param excludeClassNames list of excluded classes.
     */
    public GridTestExternalClassLoader(URL[] urls, Map<String, byte[]> resourceMap, String... excludeClassNames) {
        this(urls, resourceMap, toSet(excludeClassNames));
    }

    /**
     * Constructor.
     * @param urls the URLs from which to load classes and resources.
     * @param resourceMap Resource map.
     * @param excludeClassNames list of excluded classes.
     */
    public GridTestExternalClassLoader(URL[] urls, Map<String, byte[]> resourceMap, Set<String> excludeClassNames) {
        super(urls, GridTestExternalClassLoader.class.getClassLoader());

        assert resourceMap != null;

        this.resourceMap = resourceMap;

        setExcludeClassNames(excludeClassNames);
    }

    /**
     * @param names Class names, possibly {@code null}.
     * @return Set with the given names, empty for {@code null}.
     */
    private static Set<String> toSet(String... names) {
        return names != null ? new HashSet<>(Arrays.asList(names)) : new HashSet<String>();
    }

    /**
     * Sets set of excluded class names.
     * @param excludeClassNames excluded class names, {@code null} excludes nothing.
     */
    public final void setExcludeClassNames(Set<String> excludeClassNames) {
        this.excludeClassNames = excludeClassNames != null ? excludeClassNames : Collections.<String>emptySet();
    }

    /**
     * Sets set of excluded class names.
     * @param excludeClassNames excluded class names.
     */
    public void setExcludeClassNames(String... excludeClassNames) {
        setExcludeClassNames(toSet(excludeClassNames));
    }

    /**
     * @param timeout Delay in milliseconds applied before every resource stream is returned,
     *      non-positive value disables the delay.
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Sleep {@code timeout} period of time.
     */
    private void doTimeout() {
        // Thread.sleep() rejects negative values, and there is nothing to wait for at zero.
        if (timeout <= 0)
            return;

        try {
            Thread.sleep(timeout);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread.
            Thread.currentThread().interrupt();

            throw new RuntimeException("Thread was interrupted", e);
        }
    }

    /**
     * Converts a resource path such as {@code a/b/C.class} to the class name {@code a.b.C}.
     *
     * @param resName Resource name.
     * @return Class name.
     */
    private String resNameToClassName(String resName) {
        if (resName.endsWith(".class"))
            resName = resName.substring(0, resName.length() - ".class".length());

        return resName.replace('/', '.');
    }

    /** {@inheritDoc} */
    @Override protected Class<?> findClass(String name) throws ClassNotFoundException {
        if (excludeClassNames.contains(name))
            throw new ClassNotFoundException(name);

        return super.findClass(name);
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"NonSynchronizedMethodOverridesSynchronizedMethod"})
    @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Excluded classes are hidden even if the parent loader could provide them.
        if (excludeClassNames.contains(name))
            throw new ClassNotFoundException(name);

        return super.loadClass(name, resolve);
    }

    /**
     * @param name Resource name.
     * @return Resource URL or {@code null} if the resource is not found or denotes an excluded class.
     */
    @Override public URL findResource(String name) {
        if (excludeClassNames.contains(resNameToClassName(name)))
            return null;

        return super.findResource(name);
    }

    /**
     * Waits for the configured timeout, then serves the resource from the in-memory map if it
     * is mapped there and from the regular lookup otherwise.
     *
     * @param name Resource name.
     * @return Resource stream or {@code null} if resource was not found.
     */
    @Override public InputStream getResourceAsStream(String name) {
        doTimeout();

        byte[] res = resourceMap.get(name);

        return res == null ? super.getResourceAsStream(name) : new ByteArrayInputStream(res);
    }

    /**
     * @param resourceMap mapped resources, {@code null} removes all mapped resources.
     */
    public void setResourceMap(Map<String, byte[]> resourceMap) {
        this.resourceMap = resourceMap != null ? resourceMap : Collections.<String, byte[]>emptyMap();
    }

    /**
     * Returns an Enumeration of URLs representing all of the resources on the URL search path having the specified name.
     * Resources that denote excluded classes are never reported.
     *
     * @param name the resource name.
     * @return an {@code Enumeration} of {@code URL}s.
     * @throws IOException if an I/O exception occurs.
     */
    @Override public Enumeration<URL> findResources(String name) throws IOException {
        if (excludeClassNames.contains(resNameToClassName(name)))
            return Collections.emptyEnumeration();

        return super.findResources(name);
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testframework; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.product.*; +import org.apache.ignite.internal.util.lang.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.product.IgniteProductVersion.*; + +/** + * Test node. + */ +public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNode { + /** */ + private static final IgniteProductVersion VERSION = fromString("99.99.99"); + + /** */ + private static final AtomicInteger consistentIdCtr = new AtomicInteger(); + + /** */ + private String addr; + + /** */ + private String hostName; + + /** */ + private Map<String, Object> attrs = new HashMap<>(); + + /** */ + private UUID id; + + /** */ + private Object consistentId = consistentIdCtr.incrementAndGet(); + + /** */ + private ClusterNodeMetrics metrics; + + /** */ + private long order; + + /** */ + public GridTestNode() { + // No-op. + + initAttributes(); + } + + /** + * @param id Node ID. + */ + public GridTestNode(UUID id) { + this.id = id; + + initAttributes(); + } + + /** */ + private void initAttributes() { + attrs.put(GridNodeAttributes.ATTR_BUILD_VER, "10"); + attrs.put(GridNodeAttributes.ATTR_GRID_NAME, "null"); + } + + /** + * @param id Node ID. + * @param metrics Node metrics. 
+ */ + public GridTestNode(UUID id, ClusterNodeMetrics metrics) { + this.id = id; + this.metrics = metrics; + + initAttributes(); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + assert id != null; + + return id; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + return consistentId; + } + + /** + * @param addr Address. + */ + public void setPhysicalAddress(String addr) { + this.addr = addr; + } + + /** + * @param hostName Host name. + */ + public void setHostName(String hostName) { + this.hostName = hostName; + } + + /** {@inheritDoc} */ + @Override @SuppressWarnings("unchecked") + public <T> T attribute(String name) { + assert name != null; + + return (T)attrs.get(name); + } + + /** + * @param name Name. + * @param val Value. + */ + public void addAttribute(String name, Object val) { + attrs.put(name, val); + } + + /** + * @param id ID. + */ + public void setId(UUID id) { + assert id != null; + + this.id = id; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Map<String, Object> attributes() { + return attrs; + } + + /** {@inheritDoc} */ + @Override public Collection<String> addresses() { + return Collections.singletonList(addr); + } + + /** {@inheritDoc} */ + @Override public Collection<String> hostNames() { + return Collections.singletonList(hostName); + } + + /** + * @param key Attribute key. + * @param val Attribute value. + */ + public void setAttribute(String key, Object val) { + attrs.put(key, val); + } + + /** + * @param key Attribute key. + * @return Removed value. + */ + public Object removeAttribute(String key) { + return attrs.remove(key); + } + + /** + * @param attrs Attributes. + */ + public void setAttributes(Map<String, Object> attrs) { + this.attrs.putAll(attrs); + } + + /** {@inheritDoc} */ + @Override public ClusterNodeMetrics metrics() { + return metrics; + } + + /** {@inheritDoc} */ + @Override public long order() { + return order != 0 ? order : (metrics == null ? 
-1 : metrics.getStartTime()); + } + + /** + * @param order Order. + */ + public void order(long order) { + this.order = order; + } + + /** {@inheritDoc} */ + @Override public IgniteProductVersion version() { + return VERSION; + } + + /** + * Sets node metrics. + * + * @param metrics Node metrics. + */ + public void setMetrics(ClusterNodeMetrics metrics) { + this.metrics = metrics; + } + + /** {@inheritDoc} */ + @Override public boolean isLocal() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDaemon() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isClient() { + return false; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + assert obj instanceof ClusterNode; + + return ((ClusterNode) obj).id().equals(id); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return id.toString(); +// StringBuilder buf = new StringBuilder(); +// +// buf.append(getClass().getSimpleName()); +// buf.append(" [attrs=").append(attrs); +// buf.append(", id=").append(id); +// buf.append(']'); +// +// return buf.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd28003b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestSafeThreadFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestSafeThreadFactory.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestSafeThreadFactory.java new file mode 100644 index 0000000..9833bb1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestSafeThreadFactory.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.testframework;

import org.apache.ignite.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * Threads factory for safe test-threads management.
 * <p>
 * Every thread created here is tracked twice: in this factory (so its errors can be
 * checked and it can be interrupted together with its siblings) and in a JVM-wide
 * collection (so {@link #stopAllThreads(IgniteLogger)} can stop whatever is left over).
 */
public final class GridTestSafeThreadFactory implements ThreadFactory {
    /** Collection to hold all started threads across the JVM. */
    private static final BlockingQueue<Thread> startedThreads = new LinkedBlockingQueue<>();

    /** Lock protection of the started across the JVM threads collection. */
    private static final GridBusyLock startedThreadsLock = new GridBusyLock();

    /** Threads name prefix. */
    private final String threadName;

    /** Flag to interrupt all factory threads if any thread fails with unexpected exception. */
    private final boolean interruptAll;

    /** Created threads counter. */
    private final AtomicLong cnt = new AtomicLong();

    /**
     * Collection of ALL created threads in this factory.
     * <p>
     * Copy-on-write because it is appended to by whoever calls {@code newThread(...)} while
     * failing worker threads iterate over it from {@code onError(...)}.
     */
    private final Collection<GridTestThread> threads = new CopyOnWriteArrayList<>();

    /** Errors thrown by threads of this factory, in the order they were reported. */
    private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();

    /**
     * Constructs threads factory for safe test-threads management.
     *
     * @param threadName threads name prefix.
     */
    public GridTestSafeThreadFactory(String threadName) {
        this(threadName, true);
    }

    /**
     * Constructs threads factory for safe test-threads management.
     *
     * @param threadName Threads name prefix.
     * @param interruptAll Interrupt all threads in factory if any thread fails with unexpected exception.
     */
    public GridTestSafeThreadFactory(String threadName, boolean interruptAll) {
        this.threadName = threadName;
        this.interruptAll = interruptAll;
    }

    /**
     * Create new thread around callable task.
     *
     * @param c Callable task to execute in the thread.
     * @return New thread around callable task.
     * @see GridTestThread
     */
    public Thread newThread(final Callable<?> c) {
        // Create new thread around the task.
        GridTestThread thread = new GridTestThread(c, threadName + '-' + cnt.incrementAndGet()) {
            @Override protected void onError(Throwable err) {
                // Save the exception.
                errors.add(err);

                // Interrupt execution of all threads in this factory.
                if (interruptAll) {
                    for (Thread t : threads)
                        t.interrupt();
                }
            }

            @Override protected void onFinished() {
                super.onFinished();

                // No need to acquire lock here since it is a concurrent collection.
                startedThreads.remove(this);
            }
        };

        // Add this thread into the collection of managed threads.
        startedThreadsLock.enterBusy();

        try {
            startedThreads.add(thread);
        }
        finally {
            startedThreadsLock.leaveBusy();
        }

        // Register new thread in this factory.
        threads.add(thread);

        return thread;
    }

    /**
     * Create new thread around runnable task.
     *
     * @param r Runnable task to execute in the thread.
     * @return New thread around runnable task.
     * @see GridTestThread
     */
    @Override public Thread newThread(final Runnable r) {
        return newThread(GridTestUtils.makeCallable(r, null));
    }

    /**
     * Rethrows the first error reported by any thread of this factory, then lets every
     * thread rethrow its own error, if any.
     *
     * @throws Exception If there is error.
     */
    public void checkError() throws Exception {
        Throwable err = errors.peek();

        if (err != null) {
            if (err instanceof Error)
                throw (Error)err;

            if (err instanceof Exception)
                throw (Exception)err;

            // A bare Throwable cannot be cast to Exception; wrap it so the cause is preserved.
            throw new Exception("Unexpected throwable in test thread: " + err, err);
        }

        for (GridTestThread thread : threads)
            thread.checkError();
    }

    /**
     * Interrupts all threads, created by this thread factory, and then joins them.
     */
    public void interruptAllThreads() {
        for (Thread t : threads)
            U.interrupt(t);

        try {
            for (Thread t : threads)
                U.join(t);
        }
        catch (IgniteInterruptedException ignored) {
            // No-op.
        }
    }

    /**
     * Interrupts and waits for termination of all the threads started
     * so far by current test.
     *
     * @param log Logger.
     */
    static void stopAllThreads(IgniteLogger log) {
        startedThreadsLock.block();

        List<Thread> all;

        try {
            all = new ArrayList<>(startedThreads.size());

            startedThreads.drainTo(all);
        }
        finally {
            startedThreadsLock.unblock();
        }

        boolean aliveThreads = F.forAny(
            all,
            new P1<Thread>() {
                @Override public boolean apply(Thread t) {
                    return t.isAlive();
                }
            }
        );

        if (!aliveThreads)
            return;

        U.warn(log, "Interrupting threads started so far: " + all.size());

        U.interrupt(all);

        U.joinThreads(all, log);

        // Keep only the threads that are still alive. Every element has to be examined,
        // including the last one, otherwise the list can never become empty.
        for (Iterator<Thread> it = all.iterator(); it.hasNext(); ) {
            if (!it.next().isAlive())
                it.remove();
        }

        if (all.isEmpty())
            U.warn(log, "Finished interrupting threads.");
        else
            U.error(log, "Finished interrupting threads, but some threads are still alive" +
                " [size=" + all.size() + ", threads=" + all + "]");
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bd28003b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestThread.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestThread.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestThread.java
// new file mode 100644
// index 0000000..8bc8bed
// --- /dev/null
// +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestThread.java
// @@ -0,0 +1,147 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.testframework;

import java.util.concurrent.*;

/**
 * Test thread that has convenience failure checks.
 * <p>
 * Runs either a {@link Runnable} or a {@link Callable}, remembers whatever the task threw
 * and lets another thread query or rethrow it afterwards.
 */
@SuppressWarnings({"ProhibitedExceptionThrown", "CatchGenericClass"})
public class GridTestThread extends Thread {
    /**
     * Error thrown by the task, if any. Volatile because it is written by this thread and
     * read by other threads through {@link #getError()} and {@link #checkError()}.
     */
    private volatile Throwable err;

    /** Target runnable; {@code null} when a callable is used. */
    private final Runnable run;

    /** Target callable; {@code null} when a runnable is used. */
    private final Callable<?> call;

    /**
     * @param run Target runnable.
     */
    @SuppressWarnings({"NullableProblems"})
    public GridTestThread(Runnable run) {
        this(run, null);
    }

    /**
     * @param call Target callable.
     */
    @SuppressWarnings({"NullableProblems"})
    public GridTestThread(Callable<?> call) {
        this(call, null);
    }

    /**
     * @param run Target runnable.
     * @param name Thread name; when {@code null} the default thread name is kept.
     */
    public GridTestThread(Runnable run, String name) {
        assert run != null;

        this.run = run;

        call = null;

        if (name != null)
            setName(name);
    }

    /**
     * @param call Target callable.
     * @param name Thread name; when {@code null} the default thread name is kept.
     */
    public GridTestThread(Callable<?> call, String name) {
        assert call != null;

        this.call = call;

        run = null;

        if (name != null)
            setName(name);
    }

    /**
     * Executes the task. Anything it throws is printed to standard error, stored for
     * {@link #getError()} / {@link #checkError()} and reported to {@link #onError(Throwable)};
     * {@link #onFinished()} is invoked in every case.
     */
    @Override public final void run() {
        try {
            if (call != null)
                call.call();
            else
                run.run();
        }
        catch (Throwable e) {
            System.err.println("Failure in thread: " + name0());

            e.printStackTrace();

            err = e;

            onError(e);
        }
        finally {
            onFinished();
        }
    }

    /**
     * Callback for subclasses, invoked when the task has finished (normally or with an error).
     */
    protected void onFinished() {
        // No-op.
    }

    /**
     * Callback for subclasses, invoked when the task has thrown.
     *
     * @param err Error.
     */
    protected void onError(Throwable err) {
        assert err != null;

        // No-op.
    }

    /**
     * @return Error thrown by the task, or {@code null} if there was none (so far).
     */
    public Throwable getError() {
        return err;
    }

    /**
     * Rethrows the error thrown by the task, if any. Errors and exceptions are rethrown
     * as is; any other throwable is wrapped into an {@link Exception} with the original
     * as its cause.
     *
     * @throws Exception If there is error.
     */
    public void checkError() throws Exception {
        // Read the volatile field once so that all checks see the same value.
        Throwable e = err;

        if (e == null)
            return;

        if (e instanceof Error)
            throw (Error)e;

        if (e instanceof Exception)
            throw (Exception)e;

        // A bare Throwable cannot be cast to Exception; wrap it so the cause is preserved.
        throw new Exception("Unexpected throwable in test thread: " + e, e);
    }

    /**
     * @return Formatted string for current thread.
     */
    private String name0() {
        return "Thread [id=" + Thread.currentThread().getId() + ", name=" + Thread.currentThread().getName() + ']';
    }
}