ignite-471: fixed NPE in PortableMarshaller

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b84cef7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b84cef7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b84cef7d

Branch: refs/heads/ignite-471
Commit: b84cef7dcebb6f949d89af2c3c424aa182b6c315
Parents: 095e3f9 da5a228
Author: dmagda <magda7...@gmail.com>
Authored: Sat May 16 00:20:05 2015 +0300
Committer: dmagda <magda7...@gmail.com>
Committed: Sat May 16 00:20:05 2015 +0300

----------------------------------------------------------------------
 assembly/release-base.xml                       |    4 +-
 bin/ignite-schema-import.bat                    |    2 +-
 bin/ignite-schema-import.sh                     |    2 +-
 bin/ignite.bat                                  |    2 +-
 bin/ignite.sh                                   |    2 +-
 bin/ignitevisorcmd.bat                          |    2 +-
 bin/ignitevisorcmd.sh                           |    2 +-
 bin/include/build-classpath.bat                 |   46 +
 bin/include/build-classpath.sh                  |   71 ++
 bin/include/functions.sh                        |    2 +-
 bin/include/target-classpath.bat                |   46 -
 bin/include/target-classpath.sh                 |   71 --
 .../streaming/StreamTransformerExample.java     |    4 +-
 .../streaming/StreamVisitorExample.java         |    4 +-
 .../ignite/examples/streaming/package-info.java |    1 -
 .../streaming/wordcount/CacheConfig.java        |    2 +-
 .../streaming/wordcount/QueryWords.java         |   12 +-
 .../streaming/wordcount/StreamWords.java        |   12 +-
 .../streaming/wordcount/package-info.java       |    1 -
 .../socket/WordsSocketStreamerClient.java       |   82 ++
 .../socket/WordsSocketStreamerServer.java       |  124 +++
 .../wordcount/socket/package-info.java          |   21 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  136 +--
 .../internal/direct/DirectByteBufferStream.java |    4 +-
 .../internal/interop/InteropBootstrap.java      |   34 +
 .../interop/InteropBootstrapFactory.java        |   39 +
 .../internal/interop/InteropIgnition.java       |  103 ++
 .../internal/interop/InteropProcessor.java      |   25 +
 .../eventstorage/GridEventStorageManager.java   |    5 +-
 .../processors/cache/GridCacheAdapter.java      |  127 +--
 .../processors/cache/GridCacheContext.java      |    7 +
 .../processors/cache/GridCacheMapEntry.java     |  106 +-
 .../processors/cache/GridCacheMvccManager.java  |    4 +-
 .../GridDistributedCacheAdapter.java            |  210 ++--
 .../distributed/GridDistributedLockRequest.java |  111 +-
 .../GridDistributedTxFinishRequest.java         |   70 +-
 .../GridDistributedTxPrepareRequest.java        |  112 +-
 .../GridDistributedTxRemoteAdapter.java         |   20 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   16 +-
 .../distributed/dht/GridDhtLockFuture.java      |    2 -
 .../distributed/dht/GridDhtLockRequest.java     |   45 +-
 .../dht/GridDhtOffHeapCacheEntry.java           |   63 ++
 .../dht/GridDhtTransactionalCacheAdapter.java   |   15 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |    3 -
 .../distributed/dht/GridDhtTxFinishRequest.java |   43 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   38 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   95 +-
 .../cache/distributed/dht/GridDhtTxMapping.java |    2 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   99 +-
 .../dht/GridDhtTxPrepareRequest.java            |   60 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |    8 +-
 .../distributed/dht/GridNoStorageCacheMap.java  |    4 +-
 .../dht/atomic/GridDhtAtomicCache.java          |    3 +
 .../atomic/GridDhtAtomicOffHeapCacheEntry.java  |   63 ++
 .../dht/colocated/GridDhtColocatedCache.java    |    5 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   31 +-
 .../GridDhtColocatedOffHeapCacheEntry.java      |   63 ++
 .../colocated/GridDhtDetachedCacheEntry.java    |    4 +-
 .../GridDhtPartitionsExchangeFuture.java        |    2 +-
 .../distributed/near/GridNearCacheAdapter.java  |    3 +
 .../distributed/near/GridNearCacheEntry.java    |    4 +-
 .../distributed/near/GridNearLockFuture.java    |   11 -
 .../distributed/near/GridNearLockRequest.java   |   61 +-
 .../near/GridNearOffHeapCacheEntry.java         |   60 +
 .../near/GridNearOptimisticTxPrepareFuture.java |  768 +++++++++++++
 .../GridNearPessimisticTxPrepareFuture.java     |  347 ++++++
 .../near/GridNearTransactionalCache.java        |    4 -
 .../near/GridNearTxFinishRequest.java           |   28 +-
 .../cache/distributed/near/GridNearTxLocal.java |  104 +-
 .../near/GridNearTxPrepareFuture.java           | 1050 ------------------
 .../near/GridNearTxPrepareFutureAdapter.java    |  226 ++++
 .../near/GridNearTxPrepareRequest.java          |   52 +-
 .../distributed/near/GridNearTxRemote.java      |   24 +-
 .../cache/local/GridLocalCacheEntry.java        |   18 +
 .../cache/transactions/IgniteInternalTx.java    |   14 +-
 .../transactions/IgniteTransactionsImpl.java    |    4 +-
 .../cache/transactions/IgniteTxAdapter.java     |   74 +-
 .../cache/transactions/IgniteTxEntry.java       |   48 +-
 .../cache/transactions/IgniteTxHandler.java     |   74 +-
 .../transactions/IgniteTxLocalAdapter.java      |  167 +--
 .../cache/transactions/IgniteTxLocalEx.java     |   21 +-
 .../cache/transactions/IgniteTxManager.java     |   74 +-
 .../processors/resource/GridResourceField.java  |   16 +-
 .../processors/resource/GridResourceIoc.java    |  389 +++----
 .../processors/resource/GridResourceMethod.java |   13 +
 .../resource/GridResourceProcessor.java         |   20 +-
 .../ignite/internal/util/IgniteUtils.java       |   22 +-
 .../util/lang/GridComputeJobWrapper.java        |   96 --
 .../internal/util/nio/GridBufferedParser.java   |    4 -
 .../internal/util/nio/GridDelimitedParser.java  |   91 ++
 .../util/nio/GridNioDelimitedBuffer.java        |  106 ++
 .../communication/tcp/TcpCommunicationSpi.java  |    2 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   48 +-
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   |    8 +-
 .../org/apache/ignite/stream/StreamAdapter.java |  111 ++
 .../ignite/stream/StreamTupleExtractor.java     |   33 +
 .../stream/socket/SocketMessageConverter.java   |   31 +
 .../ignite/stream/socket/SocketStreamer.java    |  218 ++++
 .../ignite/stream/socket/package-info.java      |   21 +
 .../resources/META-INF/classnames.properties    |    1 -
 .../cache/CacheOffheapMapEntrySelfTest.java     |  168 +++
 .../cache/CacheRemoveAllSelfTest.java           |   81 ++
 .../GridCacheAbstractFailoverSelfTest.java      |    8 +-
 .../GridCacheAbstractNodeRestartSelfTest.java   |   11 +-
 .../distributed/GridCacheLockAbstractTest.java  |    2 -
 .../distributed/IgniteTxGetAfterStopTest.java   |  131 +++
 ...achePartitionedNearDisabledLockSelfTest.java |   47 +
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |    4 +-
 ...idCacheAtomicReplicatedFailoverSelfTest.java |    6 +
 .../GridCachePartitionedTxSalvageSelfTest.java  |   25 +-
 .../near/NoneRebalanceModeSelfTest.java         |   67 ++
 .../GridCacheReplicatedFailoverSelfTest.java    |    6 +
 .../GridCacheReplicatedLockSelfTest.java        |    5 +
 .../GridCacheReplicatedNodeRestartSelfTest.java |   80 ++
 .../GridCacheLocalIsolatedNodesSelfTest.java    |   18 +-
 .../util/nio/GridNioDelimitedBufferTest.java    |  112 ++
 .../discovery/tcp/TcpDiscoveryRestartTest.java  |  199 ++++
 .../stream/socket/SocketStreamerSelfTest.java   |  316 ++++++
 .../ignite/stream/socket/package-info.java      |   21 +
 .../IgniteCacheFailoverTestSuite.java           |   10 +-
 .../testsuites/IgniteCacheRestartTestSuite.java |    8 +-
 .../testsuites/IgniteCacheTestSuite2.java       |    2 +
 .../testsuites/IgniteCacheTestSuite3.java       |    2 +
 .../testsuites/IgniteCacheTestSuite4.java       |    4 +
 .../testsuites/IgniteStreamTestSuite.java       |   39 +
 .../testsuites/IgniteUtilSelfTestSuite.java     |    1 +
 .../cache/GridCacheOffheapIndexGetSelfTest.java |  111 ++
 .../IgniteCacheWithIndexingTestSuite.java       |    2 +
 .../processors/cache/jta/CacheJtaManager.java   |    4 +-
 .../scalar/tests/ScalarCacheQueriesSpec.scala   |  154 +--
 .../ignite/scalar/tests/ScalarCacheSpec.scala   |   23 +-
 .../scalar/tests/ScalarConversionsSpec.scala    |   43 +-
 .../scalar/tests/ScalarProjectionSpec.scala     |  128 ++-
 .../scalar/tests/ScalarReturnableSpec.scala     |   41 +-
 modules/visor-console/pom.xml                   |    2 +-
 .../ignite/visor/VisorRuntimeBaseSpec.scala     |    2 +-
 .../visor/commands/VisorArgListSpec.scala       |   60 +-
 .../commands/VisorFileNameCompleterSpec.scala   |   34 +-
 .../commands/ack/VisorAckCommandSpec.scala      |   20 +-
 .../commands/alert/VisorAlertCommandSpec.scala  |   68 +-
 .../cache/VisorCacheClearCommandSpec.scala      |   48 +-
 .../commands/cache/VisorCacheCommandSpec.scala  |   66 +-
 .../config/VisorConfigurationCommandSpec.scala  |    8 +-
 .../cswap/VisorCacheSwapCommandSpec.scala       |   24 +-
 .../deploy/VisorDeployCommandSpec.scala         |   10 +-
 .../disco/VisorDiscoveryCommandSpec.scala       |   46 +-
 .../events/VisorEventsCommandSpec.scala         |   28 +-
 .../visor/commands/gc/VisorGcCommandSpec.scala  |   30 +-
 .../commands/help/VisorHelpCommandSpec.scala    |   57 +-
 .../commands/kill/VisorKillCommandSpec.scala    |   58 +-
 .../commands/log/VisorLogCommandSpec.scala      |   10 +-
 .../commands/mem/VisorMemoryCommandSpec.scala   |   77 +-
 .../commands/node/VisorNodeCommandSpec.scala    |   22 +-
 .../commands/open/VisorOpenCommandSpec.scala    |   16 +-
 .../commands/ping/VisorPingCommandSpec.scala    |   16 +-
 .../commands/start/VisorStartCommandSpec.scala  |  126 +--
 .../commands/tasks/VisorTasksCommandSpec.scala  |  112 +-
 .../commands/top/VisorTopologyCommandSpec.scala |   52 +-
 .../commands/vvm/VisorVvmCommandSpec.scala      |   30 +-
 parent/pom.xml                                  |    2 +
 pom.xml                                         |   85 +-
 161 files changed, 5910 insertions(+), 3836 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 1c2f8d5,92035af..cbb78e7
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -227,12 -224,12 +224,13 @@@ public abstract class GridCacheMapEntr
                  if (val != null) {
                      byte type = val.type();
  
-                     valPtr = mem.putOffHeap(valPtr, 
U.toArray(val.valueBytes(cctx.cacheObjectContext())), type);
 -                    offHeapPointer(mem.putOffHeap(offHeapPointer(), 
val.valueBytes(cctx.cacheObjectContext()), type));
++                    offHeapPointer(mem.putOffHeap(offHeapPointer(), 
U.toArray(val.valueBytes(cctx.cacheObjectContext())),
++                            type));
                  }
                  else {
-                     mem.removeOffHeap(valPtr);
+                     mem.removeOffHeap(offHeapPointer());
  
-                     valPtr = 0;
+                     offHeapPointer(0);
                  }
              }
              catch (IgniteCheckedException e) {
@@@ -270,8 -267,8 +268,8 @@@
  
          CacheObject val0 = val;
  
-         if (val0 == null && valPtr != 0) {
+         if (val0 == null && hasOffHeapPointer()) {
 -            IgniteBiTuple<byte[], Byte> t = valueBytes0();
 +            IgniteBiTuple<ByteBuffer, Byte> t = valueBytes0();
  
              return 
cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), 
t.get1());
          }
@@@ -537,15 -534,13 +535,15 @@@
      /**
       * @return Value bytes and flag indicating whether value is byte array.
       */
 -    protected IgniteBiTuple<byte[], Byte> valueBytes0() {
 +    protected IgniteBiTuple<ByteBuffer, Byte> valueBytes0() {
          assert Thread.holdsLock(this);
  
-         if (valPtr != 0) {
+         if (hasOffHeapPointer()) {
              assert isOffHeapValuesOnly() || cctx.offheapTiered();
  
-             IgniteBiTuple<byte[], Byte> t = cctx.unsafeMemory().get(valPtr);
 -            return cctx.unsafeMemory().get(offHeapPointer());
++            IgniteBiTuple<byte[], Byte> t = 
cctx.unsafeMemory().get(offHeapPointer());
 +
 +            return F.t(ByteBuffer.wrap(t.get1()), t.get2());
          }
          else {
              assert val != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index aba9e86,cc2783a..d11b879
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@@ -456,8 -406,8 +406,8 @@@ public class GridDistributedTxPrepareRe
  
                  writer.incrementState();
  
-             case 21:
+             case 19:
 -                if (!writer.writeByteArray("txNodesBytes", txNodesBytes))
 +                if (!writer.writeByteBuffer("txNodesBytes", txNodesBytes))
                      return false;
  
                  writer.incrementState();
@@@ -612,8 -546,8 +546,8 @@@
  
                  reader.incrementState();
  
-             case 21:
+             case 19:
 -                txNodesBytes = reader.readByteArray("txNodesBytes");
 +                txNodesBytes = reader.readByteBuffer("txNodesBytes");
  
                  if (!reader.isLastRead())
                      return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index b8e57a4,94ec718..e08344f
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@@ -355,8 -348,8 +348,8 @@@ public class GridDhtLockRequest extend
  
                  writer.incrementState();
  
-             case 27:
+             case 25:
 -                if (!writer.writeByteArray("ownedBytes", ownedBytes))
 +                if (!writer.writeByteBuffer("ownedBytes", ownedBytes))
                      return false;
  
                  writer.incrementState();
@@@ -433,8 -426,8 +426,8 @@@
  
                  reader.incrementState();
  
-             case 27:
+             case 25:
 -                ownedBytes = reader.readByteArray("ownedBytes");
 +                ownedBytes = reader.readByteBuffer("ownedBytes");
  
                  if (!reader.isLastRead())
                      return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6d4fed6,247d350..aff2f40
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@@ -862,8 -836,8 +836,8 @@@ public class IgniteTxEntry implements G
  
                  writer.incrementState();
  
-             case 9:
+             case 8:
 -                if (!writer.writeByteArray("transformClosBytes", 
transformClosBytes))
 +                if (!writer.writeByteBuffer("transformClosBytes", 
transformClosBytes))
                      return false;
  
                  writer.incrementState();
@@@ -965,8 -931,8 +931,8 @@@
  
                  reader.incrementState();
  
-             case 9:
+             case 8:
 -                transformClosBytes = 
reader.readByteArray("transformClosBytes");
 +                transformClosBytes = 
reader.readByteBuffer("transformClosBytes");
  
                  if (!reader.isLastRead())
                      return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 27fa473,ed0e9dd..4cff45b
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@@ -2689,8 -2690,26 +2689,26 @@@ public class TcpDiscoverySpi extends Tc
                  msgLsnr.apply(msg);
  
              if (redirectToClients(msg)) {
-                 for (ClientMessageWorker clientMsgWorker : 
clientMsgWorkers.values())
-                     clientMsgWorker.addMessage(msg);
 -                byte[] marshalledMsg = null;
++                ByteBuffer marshalledMsg = null;
+ 
+                 for (ClientMessageWorker clientMsgWorker : 
clientMsgWorkers.values()) {
+                     // Send a clone to client to avoid 
ConcurrentModificationException
+                     TcpDiscoveryAbstractMessage msgClone;
+ 
+                     try {
+                         if (marshalledMsg == null)
+                             marshalledMsg = marsh.marshal(msg);
+ 
+                         msgClone = marsh.unmarshal(marshalledMsg, null);
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Failed to marshal message: " + msg, e);
+ 
+                         msgClone = msg;
+                     }
+ 
+                     clientMsgWorker.addMessage(msgClone);
+                 }
              }
  
              Collection<TcpDiscoveryNode> failedNodes;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 0000000,07ce77e..d308897
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@@ -1,0 -1,218 +1,218 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.stream.socket;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.nio.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.marshaller.jdk.*;
+ import org.apache.ignite.stream.*;
+ 
+ import org.jetbrains.annotations.*;
+ 
+ import java.net.*;
+ import java.nio.*;
+ 
+ /**
+  * Server that receives data from TCP socket, converts it to key-value pairs 
using {@link StreamTupleExtractor} and
+  * streams into {@link IgniteDataStreamer} instance.
+  * <p>
+  * By default server uses size-based message processing. That is every 
message sent over the socket is prepended with
+  * 4-byte integer header containing message size. If message delimiter is 
defined (see {@link #setDelimiter}) then
+  * delimiter-based message processing will be used. That is every message 
sent over the socket is appended with
+  * provided delimiter.
+  * <p>
+  * Messages received through the socket are converted to Java objects using standard 
serialization. Conversion functionality
+  * can be customized via a user-defined {@link SocketMessageConverter} (e.g. in 
order to convert messages from
+  * non-Java clients).
+  */
+ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+     /** Default threads. */
+     private static final int DFLT_THREADS = 
Runtime.getRuntime().availableProcessors();
+ 
+     /** Logger. */
+     private IgniteLogger log;
+ 
+     /** Address. */
+     private InetAddress addr;
+ 
+     /** Server port. */
+     private int port;
+ 
+     /** Threads number. */
+     private int threads = DFLT_THREADS;
+ 
+     /** Direct mode. */
+     private boolean directMode;
+ 
+     /** Delimiter. */
+     private byte[] delim;
+ 
+     /** Converter. */
+     private SocketMessageConverter<T> converter;
+ 
+     /** Server. */
+     private GridNioServer<byte[]> srv;
+ 
+     /**
+      * Sets server address.
+      *
+      * @param addr Address.
+      */
+     public void setAddr(InetAddress addr) {
+         this.addr = addr;
+     }
+ 
+     /**
+      * Sets port number.
+      *
+      * @param port Port.
+      */
+     public void setPort(int port) {
+         this.port = port;
+     }
+ 
+     /**
+      * Sets the number of threads.
+      *
+      * @param threads Threads.
+      */
+     public void setThreads(int threads) {
+         this.threads = threads;
+     }
+ 
+     /**
+      * Sets direct mode flag.
+      *
+      * @param directMode Direct mode.
+      */
+     public void setDirectMode(boolean directMode) {
+         this.directMode = directMode;
+     }
+ 
+     /**
+      * Sets message delimiter.
+      *
+      * @param delim Delimiter.
+      */
+     public void setDelimiter(byte[] delim) {
+         this.delim = delim;
+     }
+ 
+     /**
+      * Sets message converter.
+      *
+      * @param converter Converter.
+      */
+     public void setConverter(SocketMessageConverter<T> converter) {
+         this.converter = converter;
+     }
+ 
+     /**
+      * Starts streamer.
+      *
+      * @throws IgniteException If failed.
+      */
+     public void start() {
+         A.notNull(getTupleExtractor(), "tupleExtractor");
+         A.notNull(getStreamer(), "streamer");
+         A.notNull(getIgnite(), "ignite");
+         A.ensure(threads > 0, "threads > 0");
+ 
+         log = getIgnite().log();
+ 
+         GridNioServerListener<byte[]> lsnr = new 
GridNioServerListenerAdapter<byte[]>() {
+             @Override public void onConnected(GridNioSession ses) {
+                 assert ses.accepted();
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Accepted connection: " + ses.remoteAddress());
+             }
+ 
+             @Override public void onDisconnected(GridNioSession ses, 
@Nullable Exception e) {
+                 if (e != null)
+                     log.error("Connection failed with exception", e);
+             }
+ 
+             @Override public void onMessage(GridNioSession ses, byte[] msg) {
+                 addMessage(converter.convert(msg));
+             }
+         };
+ 
+         ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+ 
+         GridNioParser parser = F.isEmpty(delim) ? new 
GridBufferedParser(directMode, byteOrder) :
+             new GridDelimitedParser(delim, directMode);
+ 
+         if (converter == null)
+             converter = new DefaultConverter<>();
+ 
+         GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+ 
+         GridNioFilter[] filters = new GridNioFilter[] {codec};
+ 
+         try {
+             srv = new GridNioServer.Builder<byte[]>()
+                 .address(addr == null ? InetAddress.getLocalHost() : addr)
+                 .port(port)
+                 .listener(lsnr)
+                 .logger(log)
+                 .selectorCount(threads)
+                 .byteOrder(byteOrder)
+                 .filters(filters)
+                 .build();
+         }
+         catch (IgniteCheckedException | UnknownHostException e) {
+             throw new IgniteException(e);
+         }
+ 
+         srv.start();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Socket streaming server started on " + addr + ':' + 
port);
+     }
+ 
+     /**
+      * Stops streamer.
+      */
+     public void stop() {
+         srv.stop();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Socket streaming server stopped");
+     }
+ 
+     /**
+      * Converts message to Java object using Jdk marshaller.
+      */
+     private static class DefaultConverter<T> implements 
SocketMessageConverter<T> {
+         /** Marshaller. */
+         private static final JdkMarshaller MARSH = new JdkMarshaller();
+ 
+         /** {@inheritDoc} */
+         @Override public T convert(byte[] msg) {
+             try {
 -                return MARSH.unmarshal(msg, null);
++                return MARSH.unmarshal(ByteBuffer.wrap(msg), null);
+             }
+             catch (IgniteCheckedException e) {
+                 throw new IgniteException(e);
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b84cef7d/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 0000000,752e43c..b4a6923
mode 000000,100644..100644
--- 
a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@@ -1,0 -1,315 +1,316 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.stream.socket;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.events.*;
++import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.marshaller.jdk.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.stream.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ 
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.net.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static org.apache.ignite.events.EventType.*;
+ 
+ /**
+  * Tests {@link SocketStreamer}.
+  * <p>
+  * Each test method starts a socket streamer on {@link #port}, sends {@link #CNT} messages to
+  * it over a client socket and then verifies that the cache with the null name holds one
+  * entry per message. The four tests combine two ways of framing a message (a 4-byte length
+  * prefix, or a trailing {@link #DELIM}) with two ways of converting it (no converter set,
+  * or a custom converter that decodes a 4-byte integer).
+  */
+ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
+     /** IP finder. Shared by the discovery SPI of every grid configured by this test. */
+     private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+ 
+     /** Grid count. Passed to {@code startGrids} in {@link #beforeTestsStarted()}. */
+     private final static int GRID_CNT = 3;
+ 
+     /** Count. Number of messages sent per test and number of put events awaited. */
+     private static final int CNT = 500;
+ 
+     /** Delimiter. Byte sequence written after each message in the delimiter based tests. */
+     private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 
2, 1, 0};
+ 
+     /** Port. Chosen in {@link #beforeTestsStarted()}; used by the streamer and the clients. */
+     private static int port;
+ 
+     /** Ignite. Instance returned by {@code startGrids(GRID_CNT)}. */
+     private static Ignite ignite;
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Adds the cache configuration obtained from {@code cacheConfiguration(cfg, null)} and a
+      * {@code TcpDiscoverySpi} that uses the shared {@link #IP_FINDER}.
+      */
+     @Override protected IgniteConfiguration getConfiguration() throws 
Exception {
+         IgniteConfiguration cfg = super.getConfiguration();
+ 
+         CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+ 
+         cfg.setCacheConfiguration(ccfg);
+ 
+         TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ 
+         discoSpi.setIpFinder(IP_FINDER);
+ 
+         cfg.setDiscoverySpi(discoSpi);
+ 
+         return cfg;
+     }
+ 
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Starts {@link #GRID_CNT} grids, gets or creates the cache described by
+      * {@code defaultCacheConfiguration()} and picks a port by binding a temporary
+      * {@code ServerSocket} to port 0 and remembering the port it was given.
+      * <p>
+      * NOTE(review): the temporary socket is closed as soon as the port is read, so another
+      * process could take the port before the streamer binds it - confirm this is acceptable.
+      */
+     @Override protected void beforeTestsStarted() throws Exception {
+         ignite = startGrids(GRID_CNT);
+         ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+ 
+         try (ServerSocket sock = new ServerSocket(0)) {
+             port = sock.getLocalPort();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void afterTestsStopped() throws Exception {
+         stopAllGrids();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Clears the cache with the null name.
+      */
+     @Override protected void beforeTest() throws Exception {
+         ignite.cache(null).clear();
+     }
+ 
+     /**
+      * Sends {@link #CNT} tuples serialized with a {@code JdkMarshaller}; each message is
+      * preceded by its length written as four bytes, most significant byte first. Neither a
+      * converter nor a delimiter is configured on the streamer.
+      *
+      * @throws Exception If failed.
+      */
+     public void testSizeBasedDefaultConverter() throws Exception {
+         test(null, null, new Runnable() {
+             @Override public void run() {
+                 try (Socket sock = new Socket(InetAddress.getLocalHost(), 
port);
+                      OutputStream os = new 
BufferedOutputStream(sock.getOutputStream())) {
+                     Marshaller marsh = new JdkMarshaller();
+ 
+                     for (int i = 0; i < CNT; i++) {
 -                        byte[] msg = marsh.marshal(new Tuple(i));
++                        byte[] msg = U.toArray(marsh.marshal(new Tuple(i)));
+ 
+                         os.write(msg.length >>> 24); // write(int) emits only the low 8 bits.
+                         os.write(msg.length >>> 16);
+                         os.write(msg.length >>> 8);
+                         os.write(msg.length);
+ 
+                         os.write(msg);
+                     }
+                 }
+                 catch (IOException | IgniteCheckedException e) {
+                     throw new IgniteException(e);
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * Sends {@link #CNT} messages, each made of a four byte length prefix holding the value
+      * 4 followed by the four bytes of the loop index, most significant byte first. The custom
+      * converter rebuilds the integer from the first four message bytes and wraps it in a
+      * {@link Tuple}. No delimiter is configured on the streamer.
+      *
+      * @throws Exception If failed.
+      */
+     public void testSizeBasedCustomConverter() throws Exception {
+         SocketMessageConverter<Tuple> converter = new 
SocketMessageConverter<Tuple>() {
+             @Override public Tuple convert(byte[] msg) {
+                 int i = (msg[0] & 0xFF) << 24;
+                 i |= (msg[1] & 0xFF) << 16;
+                 i |= (msg[2] & 0xFF) << 8;
+                 i |= msg[3] & 0xFF;
+ 
+                 return new Tuple(i);
+             }
+         };
+ 
+         test(converter, null, new Runnable() {
+             @Override public void run() {
+                 try(Socket sock = new Socket(InetAddress.getLocalHost(), 
port);
+                     OutputStream os = new 
BufferedOutputStream(sock.getOutputStream())) {
+ 
+                     for (int i = 0; i < CNT; i++) {
+                         os.write(0);
+                         os.write(0);
+                         os.write(0);
+                         os.write(4);
+ 
+                         os.write(i >>> 24);
+                         os.write(i >>> 16);
+                         os.write(i >>> 8);
+                         os.write(i);
+                     }
+                 }
+                 catch (IOException e) {
+                     throw new IgniteException(e);
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * Sends {@link #CNT} tuples serialized with a {@code JdkMarshaller}, each followed by
+      * {@link #DELIM}. The streamer is given the delimiter but no converter.
+      *
+      * @throws Exception If failed.
+      */
+     public void testDelimiterBasedDefaultConverter() throws Exception {
+         test(null, DELIM, new Runnable() {
+             @Override public void run() {
+                 try(Socket sock = new Socket(InetAddress.getLocalHost(), 
port);
+                     OutputStream os = new 
BufferedOutputStream(sock.getOutputStream())) {
+                     Marshaller marsh = new JdkMarshaller();
+ 
+                     for (int i = 0; i < CNT; i++) {
 -                        byte[] msg = marsh.marshal(new Tuple(i));
++                        byte[] msg = U.toArray(marsh.marshal(new Tuple(i)));
+ 
+                         os.write(msg);
+                         os.write(DELIM);
+                     }
+                 }
+                 catch (IOException | IgniteCheckedException e) {
+                     throw new IgniteException(e);
+                 }
+             }
+         });
+ 
+     }
+ 
+     /**
+      * Sends {@link #CNT} messages, each made of the four bytes of the loop index, most
+      * significant byte first, followed by {@link #DELIM}. The custom converter rebuilds the
+      * integer from the first four message bytes and wraps it in a {@link Tuple}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testDelimiterBasedCustomConverter() throws Exception {
+         SocketMessageConverter<Tuple> converter = new 
SocketMessageConverter<Tuple>() {
+             @Override public Tuple convert(byte[] msg) {
+                 int i = (msg[0] & 0xFF) << 24;
+                 i |= (msg[1] & 0xFF) << 16;
+                 i |= (msg[2] & 0xFF) << 8;
+                 i |= msg[3] & 0xFF;
+ 
+                 return new Tuple(i);
+             }
+         };
+ 
+         test(converter, DELIM, new Runnable() {
+             @Override public void run() {
+                 try(Socket sock = new Socket(InetAddress.getLocalHost(), 
port);
+                     OutputStream os = new 
BufferedOutputStream(sock.getOutputStream())) {
+ 
+                     for (int i = 0; i < CNT; i++) {
+                         os.write(i >>> 24);
+                         os.write(i >>> 16);
+                         os.write(i >>> 8);
+                         os.write(i);
+ 
+                         os.write(DELIM);
+                     }
+                 }
+                 catch (IOException e) {
+                     throw new IgniteException(e);
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * Runs one streaming scenario: configures a {@link SocketStreamer} on top of a data
+      * streamer for the cache with the null name, starts it, lets {@code r} send the
+      * messages, waits for {@link #CNT} cache put events and then checks that the cache
+      * holds the keys 0 to CNT - 1, each mapped to its decimal string. The socket streamer
+      * is stopped in the {@code finally} block if it was created.
+      * <p>
+      * NOTE(review): {@code latch.await()} has no timeout, so a lost message would make the
+      * test hang instead of fail; the listener registered through {@code remoteListen} is
+      * also never removed in this method - confirm both are intended.
+      *
+      * @param converter Converter, or {@code null} to leave the streamer converter unset.
+      * @param delim Delimiter passed to {@code setDelimiter}, may be {@code null}.
+      * @param r Runnable that connects to {@link #port} and writes the messages.
+      * @throws Exception If failed.
+      */
+     private void test(@Nullable SocketMessageConverter<Tuple> converter, 
@Nullable byte[] delim, Runnable r) throws Exception
+     {
+         SocketStreamer<Tuple, Integer, String> sockStmr = null;
+ 
+         try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(null)) {
+ 
+             stmr.allowOverwrite(true);
+             stmr.autoFlushFrequency(10);
+ 
+             sockStmr = new SocketStreamer<>();
+ 
+             IgniteCache<Integer, String> cache = ignite.cache(null);
+ 
+             sockStmr.setIgnite(ignite);
+ 
+             sockStmr.setStreamer(stmr);
+ 
+             sockStmr.setPort(port);
+ 
+             sockStmr.setDelimiter(delim);
+ 
+             sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, 
Integer, String>() {
+                 @Override public Map.Entry<Integer, String> extract(Tuple 
msg) {
+                     return new IgniteBiTuple<>(msg.key, msg.val);
+                 }
+             });
+ 
+             if (converter != null)
+                 sockStmr.setConverter(converter);
+ 
+             final CountDownLatch latch = new CountDownLatch(CNT);
+ 
+             IgniteBiPredicate<UUID, CacheEvent> locLsnr = new 
IgniteBiPredicate<UUID, CacheEvent>() {
+                 @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                     latch.countDown();
+ 
+                     return true;
+                 }
+             };
+ 
+             
ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, 
EVT_CACHE_OBJECT_PUT);
+ 
+             sockStmr.start();
+ 
+             r.run();
+ 
+             latch.await(); // NOTE(review): unbounded wait - consider a timeout.
+ 
+             assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
+ 
+             for (int i = 0; i < CNT; i++)
+                 assertEquals(Integer.toString(i), cache.get(i));
+         }
+         finally {
+             if (sockStmr != null)
+                 sockStmr.stop();
+         }
+ 
+     }
+ 
+     /**
+      * Tuple.
+      * <p>
+      * Serializable message used by the tests: an integer key together with a value that
+      * is the decimal string form of that key.
+      */
+     private static class Tuple implements Serializable {
+         /** Serial version uid. */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Key. */
+         private final int key;
+ 
+         /** Value. Always {@code Integer.toString(key)}. */
+         private final String val;
+ 
+         /**
+          * @param key Key.
+          */
+         Tuple(int key) {
+             this.key = key;
+             this.val = Integer.toString(key);
+         }
+     }
+ }


Reply via email to