ignite-471: fixed NPE in PortableMarshaller
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b84cef7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b84cef7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b84cef7d Branch: refs/heads/ignite-471 Commit: b84cef7dcebb6f949d89af2c3c424aa182b6c315 Parents: 095e3f9 da5a228 Author: dmagda <magda7...@gmail.com> Authored: Sat May 16 00:20:05 2015 +0300 Committer: dmagda <magda7...@gmail.com> Committed: Sat May 16 00:20:05 2015 +0300 ---------------------------------------------------------------------- assembly/release-base.xml | 4 +- bin/ignite-schema-import.bat | 2 +- bin/ignite-schema-import.sh | 2 +- bin/ignite.bat | 2 +- bin/ignite.sh | 2 +- bin/ignitevisorcmd.bat | 2 +- bin/ignitevisorcmd.sh | 2 +- bin/include/build-classpath.bat | 46 + bin/include/build-classpath.sh | 71 ++ bin/include/functions.sh | 2 +- bin/include/target-classpath.bat | 46 - bin/include/target-classpath.sh | 71 -- .../streaming/StreamTransformerExample.java | 4 +- .../streaming/StreamVisitorExample.java | 4 +- .../ignite/examples/streaming/package-info.java | 1 - .../streaming/wordcount/CacheConfig.java | 2 +- .../streaming/wordcount/QueryWords.java | 12 +- .../streaming/wordcount/StreamWords.java | 12 +- .../streaming/wordcount/package-info.java | 1 - .../socket/WordsSocketStreamerClient.java | 82 ++ .../socket/WordsSocketStreamerServer.java | 124 +++ .../wordcount/socket/package-info.java | 21 + .../org/apache/ignite/internal/IgnitionEx.java | 136 +-- .../internal/direct/DirectByteBufferStream.java | 4 +- .../internal/interop/InteropBootstrap.java | 34 + .../interop/InteropBootstrapFactory.java | 39 + .../internal/interop/InteropIgnition.java | 103 ++ .../internal/interop/InteropProcessor.java | 25 + .../eventstorage/GridEventStorageManager.java | 5 +- .../processors/cache/GridCacheAdapter.java | 127 +-- .../processors/cache/GridCacheContext.java | 7 + .../processors/cache/GridCacheMapEntry.java | 106 +- .../processors/cache/GridCacheMvccManager.java | 4 +- .../GridDistributedCacheAdapter.java | 210 ++-- .../distributed/GridDistributedLockRequest.java | 111 +- .../GridDistributedTxFinishRequest.java | 70 +- .../GridDistributedTxPrepareRequest.java | 112 +- .../GridDistributedTxRemoteAdapter.java | 20 +- .../distributed/dht/GridDhtCacheAdapter.java | 16 +- .../distributed/dht/GridDhtLockFuture.java | 2 - .../distributed/dht/GridDhtLockRequest.java | 45 +- .../dht/GridDhtOffHeapCacheEntry.java | 63 ++ .../dht/GridDhtTransactionalCacheAdapter.java | 15 +- .../distributed/dht/GridDhtTxFinishFuture.java | 3 - .../distributed/dht/GridDhtTxFinishRequest.java | 43 +- .../cache/distributed/dht/GridDhtTxLocal.java | 38 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 95 +- .../cache/distributed/dht/GridDhtTxMapping.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 99 +- .../dht/GridDhtTxPrepareRequest.java | 60 +- .../cache/distributed/dht/GridDhtTxRemote.java | 8 +- .../distributed/dht/GridNoStorageCacheMap.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 3 + .../atomic/GridDhtAtomicOffHeapCacheEntry.java | 63 ++ .../dht/colocated/GridDhtColocatedCache.java | 5 +- .../colocated/GridDhtColocatedLockFuture.java | 31 +- .../GridDhtColocatedOffHeapCacheEntry.java | 63 ++ .../colocated/GridDhtDetachedCacheEntry.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../distributed/near/GridNearCacheAdapter.java | 3 + .../distributed/near/GridNearCacheEntry.java | 4 +- .../distributed/near/GridNearLockFuture.java | 11 - .../distributed/near/GridNearLockRequest.java | 61 +- .../near/GridNearOffHeapCacheEntry.java | 60 + .../near/GridNearOptimisticTxPrepareFuture.java | 768 +++++++++++++ .../GridNearPessimisticTxPrepareFuture.java | 347 ++++++ .../near/GridNearTransactionalCache.java | 4 - .../near/GridNearTxFinishRequest.java | 28 +- .../cache/distributed/near/GridNearTxLocal.java | 104 +- .../near/GridNearTxPrepareFuture.java | 1050 ------------------ .../near/GridNearTxPrepareFutureAdapter.java | 226 ++++ .../near/GridNearTxPrepareRequest.java | 52 +- .../distributed/near/GridNearTxRemote.java | 24 +- .../cache/local/GridLocalCacheEntry.java | 18 + .../cache/transactions/IgniteInternalTx.java | 14 +- .../transactions/IgniteTransactionsImpl.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 74 +- .../cache/transactions/IgniteTxEntry.java | 48 +- .../cache/transactions/IgniteTxHandler.java | 74 +- .../transactions/IgniteTxLocalAdapter.java | 167 +-- .../cache/transactions/IgniteTxLocalEx.java | 21 +- .../cache/transactions/IgniteTxManager.java | 74 +- .../processors/resource/GridResourceField.java | 16 +- .../processors/resource/GridResourceIoc.java | 389 +++---- .../processors/resource/GridResourceMethod.java | 13 + .../resource/GridResourceProcessor.java | 20 +- .../ignite/internal/util/IgniteUtils.java | 22 +- .../util/lang/GridComputeJobWrapper.java | 96 -- .../internal/util/nio/GridBufferedParser.java | 4 - .../internal/util/nio/GridDelimitedParser.java | 91 ++ .../util/nio/GridNioDelimitedBuffer.java | 106 ++ .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 48 +- .../discovery/tcp/TcpDiscoverySpiAdapter.java | 8 +- .../org/apache/ignite/stream/StreamAdapter.java | 111 ++ .../ignite/stream/StreamTupleExtractor.java | 33 + .../stream/socket/SocketMessageConverter.java | 31 + .../ignite/stream/socket/SocketStreamer.java | 218 ++++ .../ignite/stream/socket/package-info.java | 21 + .../resources/META-INF/classnames.properties | 1 - .../cache/CacheOffheapMapEntrySelfTest.java | 168 +++ .../cache/CacheRemoveAllSelfTest.java | 81 ++ .../GridCacheAbstractFailoverSelfTest.java | 8 +- .../GridCacheAbstractNodeRestartSelfTest.java | 11 +- .../distributed/GridCacheLockAbstractTest.java | 2 - .../distributed/IgniteTxGetAfterStopTest.java | 131 +++ ...achePartitionedNearDisabledLockSelfTest.java | 47 + ...ePrimaryNodeFailureRecoveryAbstractTest.java | 4 +- ...idCacheAtomicReplicatedFailoverSelfTest.java | 6 + .../GridCachePartitionedTxSalvageSelfTest.java | 25 +- .../near/NoneRebalanceModeSelfTest.java | 67 ++ .../GridCacheReplicatedFailoverSelfTest.java | 6 + .../GridCacheReplicatedLockSelfTest.java | 5 + .../GridCacheReplicatedNodeRestartSelfTest.java | 80 ++ .../GridCacheLocalIsolatedNodesSelfTest.java | 18 +- .../util/nio/GridNioDelimitedBufferTest.java | 112 ++ .../discovery/tcp/TcpDiscoveryRestartTest.java | 199 ++++ .../stream/socket/SocketStreamerSelfTest.java | 316 ++++++ .../ignite/stream/socket/package-info.java | 21 + .../IgniteCacheFailoverTestSuite.java | 10 +- .../testsuites/IgniteCacheRestartTestSuite.java | 8 +- .../testsuites/IgniteCacheTestSuite2.java | 2 + .../testsuites/IgniteCacheTestSuite3.java | 2 + .../testsuites/IgniteCacheTestSuite4.java | 4 + .../testsuites/IgniteStreamTestSuite.java | 39 + .../testsuites/IgniteUtilSelfTestSuite.java | 1 + .../cache/GridCacheOffheapIndexGetSelfTest.java | 111 ++ .../IgniteCacheWithIndexingTestSuite.java | 2 + .../processors/cache/jta/CacheJtaManager.java | 4 +- .../scalar/tests/ScalarCacheQueriesSpec.scala | 154 +-- .../ignite/scalar/tests/ScalarCacheSpec.scala | 23 +- .../scalar/tests/ScalarConversionsSpec.scala | 43 +- .../scalar/tests/ScalarProjectionSpec.scala | 128 ++- .../scalar/tests/ScalarReturnableSpec.scala | 41 +- modules/visor-console/pom.xml | 2 +- .../ignite/visor/VisorRuntimeBaseSpec.scala | 2 +- .../visor/commands/VisorArgListSpec.scala | 60 +- .../commands/VisorFileNameCompleterSpec.scala | 34 +- .../commands/ack/VisorAckCommandSpec.scala | 20 +- .../commands/alert/VisorAlertCommandSpec.scala | 68 +- .../cache/VisorCacheClearCommandSpec.scala | 48 +- .../commands/cache/VisorCacheCommandSpec.scala | 66 +- .../config/VisorConfigurationCommandSpec.scala | 8 +- .../cswap/VisorCacheSwapCommandSpec.scala | 24 +- .../deploy/VisorDeployCommandSpec.scala | 10 +- .../disco/VisorDiscoveryCommandSpec.scala | 46 +- .../events/VisorEventsCommandSpec.scala | 28 +- .../visor/commands/gc/VisorGcCommandSpec.scala | 30 +- .../commands/help/VisorHelpCommandSpec.scala | 57 +- .../commands/kill/VisorKillCommandSpec.scala | 58 +- .../commands/log/VisorLogCommandSpec.scala | 10 +- .../commands/mem/VisorMemoryCommandSpec.scala | 77 +- .../commands/node/VisorNodeCommandSpec.scala | 22 +- .../commands/open/VisorOpenCommandSpec.scala | 16 +- .../commands/ping/VisorPingCommandSpec.scala | 16 +- .../commands/start/VisorStartCommandSpec.scala | 126 +-- .../commands/tasks/VisorTasksCommandSpec.scala | 112 +- .../commands/top/VisorTopologyCommandSpec.scala | 52 +- .../commands/vvm/VisorVvmCommandSpec.scala | 30 +- parent/pom.xml | 2 + pom.xml | 85 +- 161 files changed, 5910 insertions(+), 3836 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 1c2f8d5,92035af..cbb78e7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -227,12 -224,12 +224,13 @@@ public abstract class GridCacheMapEntr if (val != null) { byte type = val.type(); - valPtr = mem.putOffHeap(valPtr, U.toArray(val.valueBytes(cctx.cacheObjectContext())), type); - offHeapPointer(mem.putOffHeap(offHeapPointer(), val.valueBytes(cctx.cacheObjectContext()), type)); ++ offHeapPointer(mem.putOffHeap(offHeapPointer(), U.toArray(val.valueBytes(cctx.cacheObjectContext())), ++ type)); } else { - mem.removeOffHeap(valPtr); + mem.removeOffHeap(offHeapPointer()); - valPtr = 0; + offHeapPointer(0); } } catch (IgniteCheckedException e) { @@@ -270,8 -267,8 +268,8 @@@ CacheObject val0 = val; - if (val0 == null && valPtr != 0) { + if (val0 == null && hasOffHeapPointer()) { - IgniteBiTuple<byte[], Byte> t = valueBytes0(); + IgniteBiTuple<ByteBuffer, Byte> t = valueBytes0(); return cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); } @@@ -537,15 -534,13 +535,15 @@@ /** * @return Value bytes and flag indicating whether value is byte array. */ - protected IgniteBiTuple<byte[], Byte> valueBytes0() { + protected IgniteBiTuple<ByteBuffer, Byte> valueBytes0() { assert Thread.holdsLock(this); - if (valPtr != 0) { + if (hasOffHeapPointer()) { assert isOffHeapValuesOnly() || cctx.offheapTiered(); - IgniteBiTuple<byte[], Byte> t = cctx.unsafeMemory().get(valPtr); - return cctx.unsafeMemory().get(offHeapPointer()); ++ IgniteBiTuple<byte[], Byte> t = cctx.unsafeMemory().get(offHeapPointer()); + + return F.t(ByteBuffer.wrap(t.get1()), t.get2()); } else { assert val != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index aba9e86,cc2783a..d11b879 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@@ -456,8 -406,8 +406,8 @@@ public class GridDistributedTxPrepareRe writer.incrementState(); - case 21: + case 19: - if (!writer.writeByteArray("txNodesBytes", txNodesBytes)) + if (!writer.writeByteBuffer("txNodesBytes", txNodesBytes)) return false; writer.incrementState(); @@@ -612,8 -546,8 +546,8 @@@ reader.incrementState(); - case 21: + case 19: - txNodesBytes = reader.readByteArray("txNodesBytes"); + txNodesBytes = reader.readByteBuffer("txNodesBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index b8e57a4,94ec718..e08344f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@@ -355,8 -348,8 +348,8 @@@ public class GridDhtLockRequest extend writer.incrementState(); - case 27: + case 25: - if (!writer.writeByteArray("ownedBytes", ownedBytes)) + if (!writer.writeByteBuffer("ownedBytes", ownedBytes)) return false; writer.incrementState(); @@@ -433,8 -426,8 +426,8 @@@ reader.incrementState(); - case 27: + case 25: - ownedBytes = reader.readByteArray("ownedBytes"); + ownedBytes = reader.readByteBuffer("ownedBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 6d4fed6,247d350..aff2f40 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@@ -862,8 -836,8 +836,8 @@@ public class IgniteTxEntry implements G writer.incrementState(); - case 9: + case 8: - if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) + if (!writer.writeByteBuffer("transformClosBytes", transformClosBytes)) return false; writer.incrementState(); @@@ -965,8 -931,8 +931,8 @@@ reader.incrementState(); - case 9: + case 8: - transformClosBytes = reader.readByteArray("transformClosBytes"); + transformClosBytes = reader.readByteBuffer("transformClosBytes"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 27fa473,ed0e9dd..4cff45b --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@@ -2689,8 -2690,26 +2689,26 @@@ public class TcpDiscoverySpi extends Tc msgLsnr.apply(msg); if (redirectToClients(msg)) { - for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) - clientMsgWorker.addMessage(msg); - byte[] marshalledMsg = null; ++ ByteBuffer marshalledMsg = null; + + for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) { + // Send a clone to client to avoid ConcurrentModificationException + TcpDiscoveryAbstractMessage msgClone; + + try { + if (marshalledMsg == null) + marshalledMsg = marsh.marshal(msg); + + msgClone = marsh.unmarshal(marshalledMsg, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message: " + msg, e); + + msgClone = msg; + } + + clientMsgWorker.addMessage(msgClone); + } } Collection<TcpDiscoveryNode> failedNodes; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java index 0000000,07ce77e..d308897 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java @@@ -1,0 -1,218 +1,218 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.stream.socket; + + import org.apache.ignite.*; + import org.apache.ignite.internal.util.nio.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.marshaller.jdk.*; + import org.apache.ignite.stream.*; + + import org.jetbrains.annotations.*; + + import java.net.*; + import java.nio.*; + + /** + * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and + * streams into {@link IgniteDataStreamer} instance. + * <p> + * By default server uses size-based message processing. That is every message sent over the socket is prepended with + * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then + * delimiter-based message processing will be used. That is every message sent over the socket is appended with + * provided delimiter. + * <p> + * Received messages through socket converts to Java object using standard serialization. Conversion functionality + * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from + * non Java clients). + */ + public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { + /** Default threads. */ + private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); + + /** Logger. */ + private IgniteLogger log; + + /** Address. */ + private InetAddress addr; + + /** Server port. */ + private int port; + + /** Threads number. */ + private int threads = DFLT_THREADS; + + /** Direct mode. */ + private boolean directMode; + + /** Delimiter. */ + private byte[] delim; + + /** Converter. */ + private SocketMessageConverter<T> converter; + + /** Server. */ + private GridNioServer<byte[]> srv; + + /** + * Sets server address. + * + * @param addr Address. + */ + public void setAddr(InetAddress addr) { + this.addr = addr; + } + + /** + * Sets port number. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Sets threadds amount. + * + * @param threads Threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets direct mode flag. + * + * @param directMode Direct mode. + */ + public void setDirectMode(boolean directMode) { + this.directMode = directMode; + } + + /** + * Sets message delimiter. + * + * @param delim Delimiter. + */ + public void setDelimiter(byte[] delim) { + this.delim = delim; + } + + /** + * Sets message converter. + * + * @param converter Converter. + */ + public void setConverter(SocketMessageConverter<T> converter) { + this.converter = converter; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + A.notNull(getTupleExtractor(), "tupleExtractor"); + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.ensure(threads > 0, "threads > 0"); + + log = getIgnite().log(); + + GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() { + @Override public void onConnected(GridNioSession ses) { + assert ses.accepted(); + + if (log.isDebugEnabled()) + log.debug("Accepted connection: " + ses.remoteAddress()); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) + log.error("Connection failed with exception", e); + } + + @Override public void onMessage(GridNioSession ses, byte[] msg) { + addMessage(converter.convert(msg)); + } + }; + + ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : + new GridDelimitedParser(delim, directMode); + + if (converter == null) + converter = new DefaultConverter<>(); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); + + GridNioFilter[] filters = new GridNioFilter[] {codec}; + + try { + srv = new GridNioServer.Builder<byte[]>() + .address(addr == null ? InetAddress.getLocalHost() : addr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(threads) + .byteOrder(byteOrder) + .filters(filters) + .build(); + } + catch (IgniteCheckedException | UnknownHostException e) { + throw new IgniteException(e); + } + + srv.start(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server started on " + addr + ':' + port); + } + + /** + * Stops streamer. + */ + public void stop() { + srv.stop(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server stopped"); + } + + /** + * Converts message to Java object using Jdk marshaller. + */ + private static class DefaultConverter<T> implements SocketMessageConverter<T> { + /** Marshaller. */ + private static final JdkMarshaller MARSH = new JdkMarshaller(); + + /** {@inheritDoc} */ + @Override public T convert(byte[] msg) { + try { - return MARSH.unmarshal(msg, null); ++ return MARSH.unmarshal(ByteBuffer.wrap(msg), null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java index 0000000,752e43c..b4a6923 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java @@@ -1,0 -1,315 +1,316 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.stream.socket; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.events.*; ++import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.marshaller.jdk.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.stream.*; + import org.apache.ignite.testframework.junits.common.*; + + import org.jetbrains.annotations.*; + + import java.io.*; + import java.net.*; + import java.util.*; + import java.util.concurrent.*; + + import static org.apache.ignite.events.EventType.*; + + /** + * Tests {@link SocketStreamer}. + */ + public class SocketStreamerSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid count. */ + private final static int GRID_CNT = 3; + + /** Count. */ + private static final int CNT = 500; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0}; + + /** Port. */ + private static int port; + + /** Ignite. */ + private static Ignite ignite; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CacheConfiguration ccfg = cacheConfiguration(cfg, null); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrids(GRID_CNT); + ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration()); + + try (ServerSocket sock = new ServerSocket(0)) { + port = sock.getLocalPort(); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite.cache(null).clear(); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedDefaultConverter() throws Exception { + test(null, null, new Runnable() { + @Override public void run() { + try (Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { - byte[] msg = marsh.marshal(new Tuple(i)); ++ byte[] msg = U.toArray(marsh.marshal(new Tuple(i))); + + os.write(msg.length >>> 24); + os.write(msg.length >>> 16); + os.write(msg.length >>> 8); + os.write(msg.length); + + os.write(msg); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedCustomConverter() throws Exception { + SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, null, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(0); + os.write(0); + os.write(0); + os.write(4); + + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedDefaultConverter() throws Exception { + test(null, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { - byte[] msg = marsh.marshal(new Tuple(i)); ++ byte[] msg = U.toArray(marsh.marshal(new Tuple(i))); + + os.write(msg); + os.write(DELIM); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedCustomConverter() throws Exception { + SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + + os.write(DELIM); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @param converter Converter. + * @param r Runnable.. + */ + private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception + { + SocketStreamer<Tuple, Integer, String> sockStmr = null; + + try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) { + + stmr.allowOverwrite(true); + stmr.autoFlushFrequency(10); + + sockStmr = new SocketStreamer<>(); + + IgniteCache<Integer, String> cache = ignite.cache(null); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + sockStmr.setPort(port); + + sockStmr.setDelimiter(delim); + + sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() { + @Override public Map.Entry<Integer, String> extract(Tuple msg) { + return new IgniteBiTuple<>(msg.key, msg.val); + } + }); + + if (converter != null) + sockStmr.setConverter(converter); + + final CountDownLatch latch = new CountDownLatch(CNT); + + IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT); + + sockStmr.start(); + + r.run(); + + latch.await(); + + assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); + + for (int i = 0; i < CNT; i++) + assertEquals(Integer.toString(i), cache.get(i)); + } + finally { + if (sockStmr != null) + sockStmr.stop(); + } + + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Key. */ + private final int key; + + /** Value. */ + private final String val; + + /** + * @param key Key. + */ + Tuple(int key) { + this.key = key; + this.val = Integer.toString(key); + } + } + }