# Removed shmem from communication

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c014b001
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c014b001
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c014b001

Branch: refs/heads/ignite-45
Commit: c014b0017cc271a5942e242f0d28d075355e94a6
Parents: 3f23de3
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Fri Mar 13 17:20:36 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Fri Mar 13 17:20:36 2015 -0700

----------------------------------------------------------------------
 .../util/nio/GridShmemCommunicationClient.java  | 146 -------------------
 .../communication/tcp/TcpCommunicationSpi.java  |  44 ++----
 2 files changed, 10 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c014b001/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
deleted file mode 100644
index b0a6df3..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClient {
-    /** */
-    private final IpcSharedMemoryClientEndpoint shmem;
-
-    /** */
-    private final ByteBuffer writeBuf;
-
-    /** */
-    private final MessageFormatter formatter;
-
-    /**
-     * @param metricsLsnr Metrics listener.
-     * @param port Shared memory IPC server port.
-     * @param connTimeout Connection timeout.
-     * @param log Logger.
-     * @param formatter Message formatter.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, 
int port, long connTimeout,
-        IgniteLogger log, MessageFormatter formatter) throws 
IgniteCheckedException {
-        super(metricsLsnr);
-
-        assert metricsLsnr != null;
-        assert port > 0 && port < 0xffff;
-        assert connTimeout >= 0;
-
-        shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
-
-        writeBuf = ByteBuffer.allocate(8 << 10);
-
-        writeBuf.order(ByteOrder.nativeOrder());
-
-        this.formatter = formatter;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void 
doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC)
-        throws IgniteCheckedException {
-        handshakeC.applyx(shmem.inputStream(), shmem.outputStream());
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean close() {
-        boolean res = super.close();
-
-        if (res)
-            shmem.close();
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void forceClose() {
-        super.forceClose();
-
-        // Do not call forceClose() here.
-        shmem.close();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void sendMessage(byte[] data, int len) 
throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Communication client was closed: 
" + this);
-
-        try {
-            shmem.outputStream().write(data, 0, len);
-
-            metricsLsnr.onBytesSent(len);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote 
node: " + shmem, e);
-        }
-
-        markUsed();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, 
Message msg)
-        throws IgniteCheckedException {
-        if (closed())
-            throw new IgniteCheckedException("Communication client was closed: 
" + this);
-
-        assert writeBuf.hasArray();
-
-        try {
-            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, 
formatter.writer());
-
-            metricsLsnr.onBytesSent(cnt);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to send message to remote 
node: " + shmem, e);
-        }
-
-        markUsed();
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendMessage(ByteBuffer data) throws 
IgniteCheckedException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flushIfNeeded(long timeout) throws IOException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridShmemCommunicationClient.class, this, 
super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c014b001/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b38dc15..65eed6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -319,8 +319,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     GridCommunicationClient oldClient = clients.get(sndId);
 
-                    boolean hasShmemClient = false;
-
                     if (oldClient != null) {
                         if (oldClient instanceof 
GridTcpNioCommunicationClient) {
                             if (log.isDebugEnabled())
@@ -332,11 +330,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             return;
                         }
-                        else {
-                            assert oldClient instanceof 
GridShmemCommunicationClient;
-
-                            hasShmemClient = true;
-                        }
                     }
 
                     GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
@@ -363,15 +356,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                                 return;
                             }
-                            else {
-                                assert oldClient instanceof 
GridShmemCommunicationClient;
-
-                                hasShmemClient = true;
-                            }
                         }
 
                         boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
 
                         if (log.isDebugEnabled())
                             log.debug("Received incoming connection from 
remote node " +
@@ -380,7 +368,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         if (reserved) {
                             try {
                                 GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, !hasShmemClient);
+                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
 
                                 fut.onDone(client);
                             }
@@ -402,11 +390,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                         else {
                             boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, fut));
 
                             if (reserved) {
                                 GridTcpNioCommunicationClient client =
-                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, !hasShmemClient);
+                                    connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true);
 
                                 fut.onDone(client);
                             }
@@ -474,7 +462,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
              * @param node Node.
              * @param rcvCnt Number of received messages..
              * @param sndRes If {@code true} sends response for recovery 
handshake.
-             * @param createClient If {@code true} creates NIO communication 
client.
              * @return Client.
              */
             private GridTcpNioCommunicationClient connected(
@@ -482,8 +469,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridNioSession ses,
                 ClusterNode node,
                 long rcvCnt,
-                boolean sndRes,
-                boolean createClient) {
+                boolean sndRes) {
                 recovery.onHandshake(rcvCnt);
 
                 ses.recoveryDescriptor(recovery);
@@ -495,16 +481,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 recovery.connected();
 
-                GridTcpNioCommunicationClient client = null;
-
-                if (createClient) {
-                    client = new GridTcpNioCommunicationClient(ses, log);
+                GridTcpNioCommunicationClient client = new 
GridTcpNioCommunicationClient(ses, log);
 
-                    GridCommunicationClient oldClient = 
clients.putIfAbsent(node.id(), client);
+                GridCommunicationClient oldClient = 
clients.putIfAbsent(node.id(), client);
 
-                    assert oldClient == null : "Client already created [node=" 
+ node + ", client=" + client +
-                            ", oldClient=" + oldClient + ", recoveryDesc=" + 
recovery + ']';
-                }
+                assert oldClient == null : "Client already created [node=" + 
node + ", client=" + client +
+                        ", oldClient=" + oldClient + ", recoveryDesc=" + 
recovery + ']';
 
                 return client;
             }
@@ -532,28 +514,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final GridFutureAdapter<GridCommunicationClient> fut;
 
-                /** */
-                private final boolean createClient;
-
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
                  * @param rmtNode Remote node.
                  * @param msg Handshake message.
-                 * @param createClient If {@code true} creates NIO 
communication client..
                  * @param fut Connect future.
                  */
                 ConnectClosure(GridNioSession ses,
                     GridNioRecoveryDescriptor recoveryDesc,
                     ClusterNode rmtNode,
                     HandshakeMessage msg,
-                    boolean createClient,
                     GridFutureAdapter<GridCommunicationClient> fut) {
                     this.ses = ses;
                     this.recoveryDesc = recoveryDesc;
                     this.rmtNode = rmtNode;
                     this.msg = msg;
-                    this.createClient = createClient;
                     this.fut = fut;
                 }
 
@@ -566,7 +542,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                     msgFut.get();
 
                                     GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, 
msg.received(), false, createClient);
+                                        connected(recoveryDesc, ses, rmtNode, 
msg.received(), false);
 
                                     fut.onDone(client);
                                 }

Reply via email to