http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 27fa473..4cff45b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2689,8 +2689,26 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov msgLsnr.apply(msg); if (redirectToClients(msg)) { - for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) - clientMsgWorker.addMessage(msg); + ByteBuffer marshalledMsg = null; + + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { + // Send a clone to client to avoid ConcurrentModificationException + TcpDiscoveryAbstractMessage msgClone; + + try { + if (marshalledMsg == null) + marshalledMsg = marsh.marshal(msg); + + msgClone = marsh.unmarshal(marshalledMsg, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message: " + msg, e); + + msgClone = msg; + } + + clientMsgWorker.addMessage(msgClone); + } } Collection<TcpDiscoveryNode> failedNodes; @@ -3933,18 +3951,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov long topVer; if (locNodeCoord) { - if (!msg.client() && ipFinder.isShared()) { - try { - ipFinder.unregisterAddresses(leftNode.socketAddresses()); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to unregister left node address: " + leftNode); - - onException("Failed to unregister left node address: " + leftNode, e); - } - } - topVer = ring.incrementTopologyVersion(); msg.topologyVersion(topVer); @@ -4112,20 +4118,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov long topVer; if (locNodeCoord) { - if (!node.isClient() && ipFinder.isShared()) { - try { - ipFinder.unregisterAddresses(node.socketAddresses()); - } - catch (IgniteSpiException e) { - if (log.isDebugEnabled()) - log.debug("Failed to unregister failed node address [node=" + node + - ", err=" + e.getMessage() + ']'); - - onException("Failed to unregister failed node address [node=" + node + - ", err=" + e.getMessage() + ']', e); - } - } - topVer = ring.incrementTopologyVersion(); msg.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java index 6b3f068..c52cbc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java @@ -55,11 +55,11 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */ public static final long DFLT_SOCK_TIMEOUT = 200; - /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */ - public static final long DFLT_ACK_TIMEOUT = 5000; + /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>200ms</tt>). */ + public static final long DFLT_ACK_TIMEOUT = 200; - /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */ - public static final long DFLT_NETWORK_TIMEOUT = 5000; + /** Default network timeout in milliseconds (value is <tt>200ms</tt>). */ + public static final long DFLT_NETWORK_TIMEOUT = 200; /** Default value for thread priority (value is <tt>10</tt>). */ public static final int DFLT_THREAD_PRI = 10; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java new file mode 100644 index 0000000..0c4e2d1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Convenience adapter for streamers. Adapters are optional components for + * streaming from different data sources. The purpose of adapters is to + * convert different message formats into Ignite stream key-value tuples + * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}. + */ +public abstract class StreamAdapter<T, K, V> { + /** Tuple extractor. */ + private StreamTupleExtractor<T, K, V> extractor; + + /** Streamer. */ + private IgniteDataStreamer<K, V> stmr; + + /** Ignite. */ + private Ignite ignite; + + /** + * Empty constructor. + */ + protected StreamAdapter() { + // No-op. + } + + /** + * Stream adapter. + * + * @param stmr Streamer. + * @param extractor Tuple extractor. + */ + protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) { + this.stmr = stmr; + this.extractor = extractor; + } + + /** + * @return Provided data streamer. + */ + public IgniteDataStreamer<K, V> getStreamer() { + return stmr; + } + + /** + * @param stmr Ignite data streamer. + */ + public void setStreamer(IgniteDataStreamer<K, V> stmr) { + this.stmr = stmr; + } + + /** + * @return Provided tuple extractor. + */ + public StreamTupleExtractor<T, K, V> getTupleExtractor() { + return extractor; + } + + /** + * @param extractor Extractor for key-value tuples from messages. + */ + public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) { + this.extractor = extractor; + } + + /** + * @return Provided {@link Ignite} instance. + */ + public Ignite getIgnite() { + return ignite; + } + + /** + * @param ignite {@link Ignite} instance. + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + + /** + * Converts given message to a tuple and adds it to the underlying streamer. + * + * @param msg Message to convert. + */ + protected void addMessage(T msg) { + Map.Entry<K, V> e = extractor.extract(msg); + + if (e != null) + stmr.addData(e); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java new file mode 100644 index 0000000..d2a4ede --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import java.util.*; + +/** + * Stream tuple extractor to convert messages to Ignite key-value tuples. + */ +public interface StreamTupleExtractor<T, K, V> { + /** + * Extracts a key-value tuple from a message. + * + * @param msg Message. + * @return Key-value tuple. + */ + public Map.Entry<K, V> extract(T msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java new file mode 100644 index 0000000..8161d86 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +/** + * Socket message converter. + */ +public interface SocketMessageConverter<T> { + /** + * Converter message represented by array of bytes to object. + * + * @param msg Message. + * @return Converted object. + */ + public T convert(byte[] msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java new file mode 100644 index 0000000..d308897 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.stream.*; + +import org.jetbrains.annotations.*; + +import java.net.*; +import java.nio.*; + +/** + * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and + * streams into {@link IgniteDataStreamer} instance. + * <p> + * By default server uses size-based message processing. That is every message sent over the socket is prepended with + * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then + * delimiter-based message processing will be used. That is every message sent over the socket is appended with + * provided delimiter. + * <p> + * Received messages through socket converts to Java object using standard serialization. Conversion functionality + * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from + * non Java clients). + */ +public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { + /** Default threads. */ + private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); + + /** Logger. */ + private IgniteLogger log; + + /** Address. */ + private InetAddress addr; + + /** Server port. */ + private int port; + + /** Threads number. */ + private int threads = DFLT_THREADS; + + /** Direct mode. */ + private boolean directMode; + + /** Delimiter. */ + private byte[] delim; + + /** Converter. */ + private SocketMessageConverter<T> converter; + + /** Server. */ + private GridNioServer<byte[]> srv; + + /** + * Sets server address. + * + * @param addr Address. + */ + public void setAddr(InetAddress addr) { + this.addr = addr; + } + + /** + * Sets port number. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Sets threadds amount. + * + * @param threads Threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets direct mode flag. + * + * @param directMode Direct mode. + */ + public void setDirectMode(boolean directMode) { + this.directMode = directMode; + } + + /** + * Sets message delimiter. + * + * @param delim Delimiter. + */ + public void setDelimiter(byte[] delim) { + this.delim = delim; + } + + /** + * Sets message converter. + * + * @param converter Converter. + */ + public void setConverter(SocketMessageConverter<T> converter) { + this.converter = converter; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + A.notNull(getTupleExtractor(), "tupleExtractor"); + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.ensure(threads > 0, "threads > 0"); + + log = getIgnite().log(); + + GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() { + @Override public void onConnected(GridNioSession ses) { + assert ses.accepted(); + + if (log.isDebugEnabled()) + log.debug("Accepted connection: " + ses.remoteAddress()); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) + log.error("Connection failed with exception", e); + } + + @Override public void onMessage(GridNioSession ses, byte[] msg) { + addMessage(converter.convert(msg)); + } + }; + + ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : + new GridDelimitedParser(delim, directMode); + + if (converter == null) + converter = new DefaultConverter<>(); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); + + GridNioFilter[] filters = new GridNioFilter[] {codec}; + + try { + srv = new GridNioServer.Builder<byte[]>() + .address(addr == null ? InetAddress.getLocalHost() : addr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(threads) + .byteOrder(byteOrder) + .filters(filters) + .build(); + } + catch (IgniteCheckedException | UnknownHostException e) { + throw new IgniteException(e); + } + + srv.start(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server started on " + addr + ':' + port); + } + + /** + * Stops streamer. + */ + public void stop() { + srv.stop(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server stopped"); + } + + /** + * Converts message to Java object using Jdk marshaller. + */ + private static class DefaultConverter<T> implements SocketMessageConverter<T> { + /** Marshaller. */ + private static final JdkMarshaller MARSH = new JdkMarshaller(); + + /** {@inheritDoc} */ + @Override public T convert(byte[] msg) { + try { + return MARSH.unmarshal(ByteBuffer.wrap(msg), null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..e1cef65 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains socket streamer implementation. + */ +package org.apache.ignite.stream.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 1d4a652..4f4c1ae 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1140,7 +1140,6 @@ org.apache.ignite.internal.util.lang.GridAbsClosure org.apache.ignite.internal.util.lang.GridAbsClosureX org.apache.ignite.internal.util.lang.GridCloseableIterator org.apache.ignite.internal.util.lang.GridClosureException -org.apache.ignite.internal.util.lang.GridComputeJobWrapper org.apache.ignite.internal.util.lang.GridFunc$1 org.apache.ignite.internal.util.lang.GridFunc$10 org.apache.ignite.internal.util.lang.GridFunc$100 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java new file mode 100644 index 0000000..8c7d33d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.local.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Cache map entry self test. + */ +public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + // No-op. + } + + /** + * @param gridName Grid name. + * @param memoryMode Memory mode. + * @param atomicityMode Atomicity mode. + * @param cacheMode Cache mode. + * @param cacheName Cache name. + * @return Cache configuration. + * @throws Exception If failed. + */ + private CacheConfiguration cacheConfiguration(String gridName, + CacheMemoryMode memoryMode, + CacheAtomicityMode atomicityMode, + CacheMode cacheMode, + String cacheName) + throws Exception + { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(cacheMode); + cfg.setAtomicityMode(atomicityMode); + cfg.setMemoryMode(memoryMode); + cfg.setName(cacheName); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testCacheMapEntry() throws Exception { + checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, LOCAL, GridLocalCacheEntry.class); + + checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class); + + checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, PARTITIONED, GridNearCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class); + + checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class); + + checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class); + + checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class); + + checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class); + } + + /** + * @param memoryMode Cache memory mode. + * @param atomicityMode Cache atomicity mode. + * @param cacheMode Cache mode. + * @param entryCls Class of cache map entry. + * @throws Exception If failed. + */ + private void checkCacheMapEntry(CacheMemoryMode memoryMode, + CacheAtomicityMode atomicityMode, + CacheMode cacheMode, + Class<?> entryCls) + throws Exception + { + log.info("Test cache [memMode=" + memoryMode + + ", atomicityMode=" + atomicityMode + + ", cacheMode=" + cacheMode + ']'); + + CacheConfiguration cfg = cacheConfiguration(grid(0).name(), + memoryMode, + atomicityMode, + cacheMode, + "Cache"); + + try (IgniteCache jcache = grid(0).getOrCreateCache(cfg)) { + GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName()); + + Integer key = primaryKey(grid(0).cache(null)); + + cache.put(key, "val"); + + GridCacheEntryEx entry = cache.entryEx(key); + + entry.unswap(true); + + assertNotNull(entry); + + assertEquals(entry.getClass(), entryCls); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java new file mode 100644 index 0000000..f5de96f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test remove all method. + */ +public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60000; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + IgniteCache<Integer, String> cache = grid(0).cache(null); + + for (int i = 0; i < 10_000; ++i) + cache.put(i, "val"); + + final AtomicInteger igniteId = new AtomicInteger(gridCount()); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 2; ++i) + startGrid(igniteId.getAndIncrement()); + + return true; + } + }, 3, "start-node-thread"); + + cache.removeAll(); + + fut.get(); + + U.sleep(5000); + + for (int i = 0; i < igniteId.get(); ++i) { + IgniteCache locCache = grid(i).cache(null); + + assertEquals("Local size: " + locCache.localSize() + "\n" + + "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" + + "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" + + "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" + + "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" + + "Backup: " + locCache.localSize(CachePeekMode.BACKUP), + 0, locCache.localSize()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java index 5389ef9..5d9ad35 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java @@ -25,12 +25,12 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.testframework.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import javax.cache.*; -import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; @@ -70,6 +70,12 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac cfg.getTransactionConfiguration().setTxSerializableEnabled(true); + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + discoSpi.setSocketTimeout(10_000); + discoSpi.setAckTimeout(10_000); + discoSpi.setNetworkTimeout(10_000); + return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java index 85e2c7c..7e65f23 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; @@ -106,6 +105,10 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs disco.setIpFinder(ipFinder); + disco.setSocketTimeout(30_000); + disco.setAckTimeout(30_000); + disco.setNetworkTimeout(30_000); + c.setDiscoverySpi(disco); return c; @@ -512,7 +515,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs try { cache.put(key, Integer.toString(key)); } - catch (TransactionRollbackException | ClusterTopologyException | CacheException ignored) { + catch (IgniteException | CacheException ignored) { // It is ok if primary node leaves grid. } @@ -668,7 +671,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs tx.commit(); } } - catch (ClusterTopologyException | CacheException ignored) { + catch (IgniteException | CacheException ignored) { // It is ok if primary node leaves grid. } @@ -814,7 +817,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs tx.commit(); } - catch (ClusterTopologyException | CacheException ignored) { + catch (IgniteException | CacheException ignored) { // It is ok if primary node leaves grid. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java index ab0f7d0..2fe76e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java @@ -501,8 +501,6 @@ public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest { * @throws Throwable If failed. */ public void testLockReentrancy() throws Throwable { - fail("https://issues.apache.org/jira/browse/IGNITE-835"); - Affinity<Integer> aff = ignite1.affinity(null); for (int i = 10; i < 100; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java new file mode 100644 index 0000000..469f513 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxGetAfterStopTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteTxGetAfterStopTest extends IgniteCacheAbstractTest { + /** */ + private CacheMode cacheMode; + + /** */ + private NearCacheConfiguration nearCfg; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return cacheMode; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return nearCfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + getAfterStop(REPLICATED, null); + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + getAfterStop(PARTITIONED, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedNearDisabled() throws Exception { + getAfterStop(PARTITIONED, null); + } + + /** + * @param cacheMode Cache mode. + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void getAfterStop(CacheMode cacheMode, @Nullable NearCacheConfiguration nearCfg) throws Exception { + this.cacheMode = cacheMode; + this.nearCfg = nearCfg; + + startGrids(); + + IgniteCache<Integer, Integer> cache0 = jcache(0); + IgniteCache<Integer, Integer> cache1 = jcache(1); + + Integer key0 = primaryKey(cache0); + Integer key1 = primaryKey(cache1); + + try (Transaction tx = ignite(0).transactions().txStart()) { + log.info("Put: " + key0); + + cache0.put(key0, key0); + + log.info("Stop node."); + + stopGrid(3); + + log.info("Get: " + key1); + + cache0.get(key1); + + log.info("Commit."); + + tx.commit(); + } + + assertEquals(key0, cache0.get(key0)); + assertNull(cache1.get(key1)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java new file mode 100644 index 0000000..69c7909 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedNearDisabledLockSelfTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; + +/** + * + */ +public class GridCachePartitionedNearDisabledLockSelfTest extends GridCachePartitionedLockSelfTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + CacheConfiguration ccfg = super.cacheConfiguration(); + + assertNotNull(ccfg.getNearConfiguration()); + + ccfg.setNearConfiguration(null); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected boolean isPartitioned() { + return false; + } + + /** {@inheritDoc} */ + @Override public void testLockReentrancy() throws Throwable { + fail("https://issues.apache.org/jira/browse/IGNITE-835"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java index ee2f16b..f996877 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -199,7 +199,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. - IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync(); + IgniteInternalFuture<?> prepFut = txEx.prepareAsync(); waitPrepared(ignite(1)); @@ -360,7 +360,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. - IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync(); + IgniteInternalFuture<?> prepFut = txEx.prepareAsync(); waitPrepared(ignite(1)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java index 0a2781b..1e57c09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicReplicatedFailoverSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import static org.apache.ignite.cache.CacheMode.*; @@ -29,4 +30,9 @@ public class GridCacheAtomicReplicatedFailoverSelfTest extends GridCacheAtomicFa @Override protected CacheMode cacheMode() { return REPLICATED; } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java index 5072642..6192f39 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxSalvageSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -35,6 +34,8 @@ import org.apache.ignite.transactions.*; import java.util.*; import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@ -76,10 +77,10 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes CacheConfiguration cc = defaultCacheConfiguration(); - cc.setCacheMode(CacheMode.PARTITIONED); + cc.setCacheMode(PARTITIONED); cc.setAffinity(new RendezvousAffinityFunction(false, 18)); cc.setBackups(1); - cc.setRebalanceMode(CacheRebalanceMode.SYNC); + cc.setRebalanceMode(SYNC); c.setCacheConfiguration(cc); @@ -109,8 +110,9 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - // Shutwodn the gird. stopAllGrids(); + + System.gc(); } /** @@ -145,8 +147,8 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes * Check whether caches has no transactions after salvage timeout. * * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC). - * @param prepare Whether to preapre transaction state - * (i.e. call {@link org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx#prepare()}). + * @param prepare Whether to prepare transaction state + * (i.e. call {@link IgniteInternalTx#prepare()}). * @throws Exception If failed. */ private void checkSalvageAfterTimeout(TransactionConcurrency mode, boolean prepare) throws Exception { @@ -164,8 +166,8 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes * Check whether caches still has all transactions before salvage timeout. * * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC). - * @param prepare Whether to preapre transaction state - * (i.e. call {@link org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx#prepare()}). + * @param prepare Whether to prepare transaction state + * (i.e. call {@link IgniteInternalTx#prepare()}). * @throws Exception If failed. */ private void checkSalvageBeforeTimeout(TransactionConcurrency mode, boolean prepare) throws Exception { @@ -191,8 +193,8 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes * Start new transaction on the grid(0) and put some keys to it. * * @param mode Transaction mode (PESSIMISTIC, OPTIMISTIC). - * @param prepare Whether to preapre transaction state - * (i.e. call {@link org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx#prepare()}). + * @param prepare Whether to prepare transaction state + * (i.e. call {@link IgniteInternalTx#prepare()}). * @throws Exception If failed. */ private void startTxAndPutKeys(final TransactionConcurrency mode, final boolean prepare) throws Exception { @@ -210,9 +212,8 @@ public class GridCachePartitionedTxSalvageSelfTest extends GridCommonAbstractTes for (Integer key : keys) c.put(key, "val" + key); - // Unproxy. if (prepare) - U.<IgniteInternalTx>field(tx, "tx").prepare(); + ((TransactionProxyImpl)tx).tx().prepare(); } catch (IgniteCheckedException e) { info("Failed to put keys to cache: " + e.getMessage()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java new file mode 100644 index 0000000..d61ddcc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Test none rebalance mode. + */ +public class NoneRebalanceModeSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setRebalanceMode(NONE); + + c.setCacheConfiguration(cc); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + GridNearTransactionalCache cache = (GridNearTransactionalCache)((IgniteKernal)grid(0)).internalCache(null); + + for (GridDhtLocalPartition part : cache.dht().topology().localPartitions()) + assertEquals(MOVING, part.state()); + + grid(0).cache(null).removeAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java index 326f57d..3461dd4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedFailoverSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.replicated; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.*; import static org.apache.ignite.cache.CacheMode.*; @@ -30,4 +31,9 @@ public class GridCacheReplicatedFailoverSelfTest extends GridCacheAbstractFailov @Override protected CacheMode cacheMode() { return REPLICATED; } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedLockSelfTest.java index 97df1f0..510fa0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedLockSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedLockSelfTest.java @@ -30,4 +30,9 @@ public class GridCacheReplicatedLockSelfTest extends GridCacheLockAbstractTest { @Override protected CacheMode cacheMode() { return REPLICATED; } + + /** {@inheritDoc} */ + @Override public void testLockReentrancy() throws Throwable { + fail("https://issues.apache.org/jira/browse/IGNITE-835"); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java index 0023160..8ce96cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedNodeRestartSelfTest.java @@ -54,4 +54,84 @@ public class GridCacheReplicatedNodeRestartSelfTest extends GridCacheAbstractNod return c; } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutTwoNodesNoBackups() throws Throwable { + super.testRestartWithPutTwoNodesNoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutTwoNodesOneBackup() throws Throwable { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutFourNodesOneBackups() throws Throwable { + super.testRestartWithPutFourNodesOneBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutFourNodesNoBackups() throws Throwable { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable { + super.testRestartWithPutSixNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable { + super.testRestartWithPutEightNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithPutTenNodesTwoBackups() throws Throwable { + super.testRestartWithPutTenNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxTwoNodesNoBackups() throws Throwable { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxTwoNodesOneBackup() throws Throwable { + super.testRestartWithTxTwoNodesOneBackup(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxFourNodesOneBackups() throws Throwable { + super.testRestartWithTxFourNodesOneBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxFourNodesNoBackups() throws Throwable { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxSixNodesTwoBackups() throws Throwable { + super.testRestartWithTxSixNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxEightNodesTwoBackups() throws Throwable { + super.testRestartWithTxEightNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxTenNodesTwoBackups() throws Throwable { + super.testRestartWithTxTenNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxPutAllTenNodesTwoBackups() throws Throwable { + super.testRestartWithTxPutAllTenNodesTwoBackups(); + } + + /** {@inheritDoc} */ + @Override public void testRestartWithTxPutAllFourNodesTwoBackups() throws Throwable { + super.testRestartWithTxPutAllFourNodesTwoBackups(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalIsolatedNodesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalIsolatedNodesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalIsolatedNodesSelfTest.java index dbfdc86..ee42ddb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalIsolatedNodesSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalIsolatedNodesSelfTest.java @@ -40,7 +40,7 @@ public class GridCacheLocalIsolatedNodesSelfTest extends GridCommonAbstractTest /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - startGrids(2); + startGrids(3); } /** {@inheritDoc} */ @@ -59,7 +59,11 @@ public class GridCacheLocalIsolatedNodesSelfTest extends GridCommonAbstractTest Ignite g2 = grid(1); UUID nid2 = g2.cluster().localNode().id(); + Ignite g3 = grid(2); + UUID nid3 = g3.cluster().localNode().id(); + assert !nid1.equals(nid2); + assert !nid1.equals(nid3); // Local cache on first node only. CacheConfiguration<String, String> ccfg1 = new CacheConfiguration<>("A"); @@ -77,8 +81,20 @@ public class GridCacheLocalIsolatedNodesSelfTest extends GridCommonAbstractTest IgniteCache<String, String> c2 = g2.createCache(ccfg2); c2.put("g2", "c2"); + // Local cache on third node only. + CacheConfiguration<String, String> ccfg3 = new CacheConfiguration<>("A"); + ccfg3.setCacheMode(LOCAL); + ccfg3.setNodeFilter(new NodeIdFilter(nid3)); + + IgniteCache<String, String> c3 = g3.createCache(ccfg3); + c3.put("g3", "c3"); + assertNull(c1.get("g2")); + assertNull(c1.get("g3")); assertNull(c2.get("g1")); + assertNull(c2.get("g3")); + assertNull(c3.get("g1")); + assertNull(c3.get("g2")); } /** Filter by node ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java new file mode 100644 index 0000000..a0dd2e5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import junit.framework.TestCase; + +import java.nio.*; +import java.util.*; + +/** + * Tests for {@link GridNioDelimitedBuffer}. + */ +public class GridNioDelimitedBufferTest extends TestCase { + /** */ + private static final String ASCII = "ASCII"; + + /** + * Tests simple delimiter (excluded from alphabet) + */ + public void testReadZString() throws Exception { + Random rnd = new Random(); + + int buffSize = 0; + + byte[] delim = new byte[] {0}; + + List<String> strs = new ArrayList<>(50); + + for (int i = 0; i < 50; i++) { + int len = rnd.nextInt(128) + 1; + + buffSize += len + delim.length; + + StringBuilder sb = new StringBuilder(len); + + for (int j = 0; j < len; j++) + sb.append((char)(rnd.nextInt(26) + 'a')); + + + strs.add(sb.toString()); + } + + ByteBuffer buff = ByteBuffer.allocate(buffSize); + + for (String str : strs) { + buff.put(str.getBytes(ASCII)); + buff.put(delim); + } + + buff.flip(); + + byte[] msg; + + GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim); + + List<String> res = new ArrayList<>(strs.size()); + + while ((msg = delimBuff.read(buff)) != null) + res.add(new String(msg, ASCII)); + + assertEquals(strs, res); + } + + /** + * Tests compound delimiter (included to alphabet) + */ + public void testDelim() throws Exception { + byte[] delim = "aabb".getBytes(ASCII); + + List<String> strs = Arrays.asList("za", "zaa", "zaab", "zab", "zaabaababbbbabaab"); + + int buffSize = 0; + + for (String str : strs) + buffSize += str.length() + delim.length; + + ByteBuffer buff = ByteBuffer.allocate(buffSize); + + for (String str : strs) { + buff.put(str.getBytes(ASCII)); + buff.put(delim); + } + + buff.flip(); + + byte[] msg; + + GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim); + + List<String> res = new ArrayList<>(strs.size()); + + while ((msg = delimBuff.read(buff)) != null) + res.add(new String(msg, ASCII)); + + assertEquals(strs, res); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java new file mode 100644 index 0000000..e6bee4a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryRestartTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.eclipse.jetty.util.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class TcpDiscoveryRestartTest extends GridCommonAbstractTest { + /** */ + private TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static AtomicReference<String> err; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + int[] evts = {EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}; + + cfg.setIncludeEventTypes(evts); + + Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>(); + + lsnrs.put(new TestEventListener(), evts); + + cfg.setLocalEventListeners(lsnrs); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testRestart() throws Exception { + err = new AtomicReference<>(); + + final int NODE_CNT = 3; + + startGrids(NODE_CNT); + + final ConcurrentHashSet<UUID> nodeIds = new ConcurrentHashSet<>(); + + final AtomicInteger id = new AtomicInteger(NODE_CNT); + + final IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = id.getAndIncrement(); + + for (int i = 0; i < 10 && err.get() == null; i++) { + Ignite ignite = startGrid(nodeIdx); + + UUID nodeId = ignite.cluster().localNode().id(); + + if (!nodeIds.add(nodeId)) + failed("Duplicated node ID: " + nodeId); + + stopGrid(nodeIdx); + } + + return null; + } + }, 5, "restart-thread"); + + IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(new Callable<Long>() { + @Override public Long call() throws Exception { + long dummyRes = 0; + + List<String> list = new ArrayList<>(); + + while (!fut.isDone()) { + for (int i = 0; i < 100; i++) { + String str = new String(new byte[i]); + + list.add(str); + + dummyRes += str.hashCode(); + } + + if (list.size() > 1000_000) { + list = new ArrayList<>(); + + System.gc(); + } + } + + return dummyRes; + } + }, 2, "test-load"); + + fut.get(); + + loadFut.get(); + + assertNull(err.get()); + + for (int i = 0; i < NODE_CNT; i++) { + Ignite ignite = ignite(i); + + TestEventListener lsnr = (TestEventListener)F.firstKey(ignite.configuration().getLocalEventListeners()); + + assertNotNull(lsnr); + + for (UUID nodeId : nodeIds) + lsnr.checkEvents(nodeId); + } + } + + + /** + * @param msg Message. + */ + private void failed(String msg) { + info(msg); + + err.compareAndSet(null, msg); + } + + /** + * + */ + private class TestEventListener implements IgnitePredicate<Event> { + /** */ + private final ConcurrentHashSet<UUID> joinIds = new ConcurrentHashSet<>(); + + /** */ + private final ConcurrentHashSet<UUID> leftIds = new ConcurrentHashSet<>(); + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + DiscoveryEvent evt0 = (DiscoveryEvent)evt; + + if (evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) { + if (!leftIds.add(evt0.eventNode().id())) + failed("Duplicated failed node ID: " + evt0.eventNode().id()); + } + else { + assertEquals(EVT_NODE_JOINED, evt.type()); + + if (!joinIds.add(evt0.eventNode().id())) + failed("Duplicated joined node ID: " + evt0.eventNode().id()); + } + + return true; + } + + /** + * @param nodeId Node ID. + */ + void checkEvents(UUID nodeId) { + assertTrue("No join event: " + nodeId, joinIds.contains(nodeId)); + + assertTrue("No left event: " + nodeId, leftIds.contains(nodeId)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java new file mode 100644 index 0000000..b4a6923 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.testframework.junits.common.*; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Tests {@link SocketStreamer}. + */ +public class SocketStreamerSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid count. */ + private final static int GRID_CNT = 3; + + /** Count. */ + private static final int CNT = 500; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0}; + + /** Port. */ + private static int port; + + /** Ignite. */ + private static Ignite ignite; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CacheConfiguration ccfg = cacheConfiguration(cfg, null); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrids(GRID_CNT); + ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration()); + + try (ServerSocket sock = new ServerSocket(0)) { + port = sock.getLocalPort(); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite.cache(null).clear(); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedDefaultConverter() throws Exception { + test(null, null, new Runnable() { + @Override public void run() { + try (Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { + byte[] msg = U.toArray(marsh.marshal(new Tuple(i))); + + os.write(msg.length >>> 24); + os.write(msg.length >>> 16); + os.write(msg.length >>> 8); + os.write(msg.length); + + os.write(msg); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedCustomConverter() throws Exception { + SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, null, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(0); + os.write(0); + os.write(0); + os.write(4); + + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedDefaultConverter() throws Exception { + test(null, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { + byte[] msg = U.toArray(marsh.marshal(new Tuple(i))); + + os.write(msg); + os.write(DELIM); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedCustomConverter() throws Exception { + SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + + os.write(DELIM); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @param converter Converter. + * @param r Runnable.. + */ + private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception + { + SocketStreamer<Tuple, Integer, String> sockStmr = null; + + try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) { + + stmr.allowOverwrite(true); + stmr.autoFlushFrequency(10); + + sockStmr = new SocketStreamer<>(); + + IgniteCache<Integer, String> cache = ignite.cache(null); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + sockStmr.setPort(port); + + sockStmr.setDelimiter(delim); + + sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() { + @Override public Map.Entry<Integer, String> extract(Tuple msg) { + return new IgniteBiTuple<>(msg.key, msg.val); + } + }); + + if (converter != null) + sockStmr.setConverter(converter); + + final CountDownLatch latch = new CountDownLatch(CNT); + + IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT); + + sockStmr.start(); + + r.run(); + + latch.await(); + + assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); + + for (int i = 0; i < CNT; i++) + assertEquals(Integer.toString(i), cache.get(i)); + } + finally { + if (sockStmr != null) + sockStmr.stop(); + } + + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Key. */ + private final int key; + + /** Value. */ + private final String val; + + /** + * @param key Key. + */ + Tuple(int key) { + this.key = key; + this.val = Integer.toString(key); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9433882d/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..2e28469 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains tests for socket streamer. + */ +package org.apache.ignite.stream.socket; \ No newline at end of file