# Removed shmem from communication
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c014b001 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c014b001 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c014b001 Branch: refs/heads/ignite-sql-tests Commit: c014b0017cc271a5942e242f0d28d075355e94a6 Parents: 3f23de3 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Fri Mar 13 17:20:36 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Fri Mar 13 17:20:36 2015 -0700 ---------------------------------------------------------------------- .../util/nio/GridShmemCommunicationClient.java | 146 ------------------- .../communication/tcp/TcpCommunicationSpi.java | 44 ++---- 2 files changed, 10 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c014b001/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java deleted file mode 100644 index b0a6df3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.nio; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * - */ -public class GridShmemCommunicationClient extends GridAbstractCommunicationClient { - /** */ - private final IpcSharedMemoryClientEndpoint shmem; - - /** */ - private final ByteBuffer writeBuf; - - /** */ - private final MessageFormatter formatter; - - /** - * @param metricsLsnr Metrics listener. - * @param port Shared memory IPC server port. - * @param connTimeout Connection timeout. - * @param log Logger. - * @param formatter Message formatter. - * @throws IgniteCheckedException If failed. - */ - public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr, int port, long connTimeout, - IgniteLogger log, MessageFormatter formatter) throws IgniteCheckedException { - super(metricsLsnr); - - assert metricsLsnr != null; - assert port > 0 && port < 0xffff; - assert connTimeout >= 0; - - shmem = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log); - - writeBuf = ByteBuffer.allocate(8 << 10); - - writeBuf.order(ByteOrder.nativeOrder()); - - this.formatter = formatter; - } - - /** {@inheritDoc} */ - @Override public synchronized void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) - throws IgniteCheckedException { - handshakeC.applyx(shmem.inputStream(), shmem.outputStream()); - } - - /** {@inheritDoc} */ - @Override public boolean close() { - boolean res = super.close(); - - if (res) - shmem.close(); - - return res; - } - - /** {@inheritDoc} */ - @Override public void forceClose() { - super.forceClose(); - - // Do not call forceClose() here. - shmem.close(); - } - - /** {@inheritDoc} */ - @Override public synchronized void sendMessage(byte[] data, int len) throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Communication client was closed: " + this); - - try { - shmem.outputStream().write(data, 0, len); - - metricsLsnr.onBytesSent(len); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); - } - - markUsed(); - } - - /** {@inheritDoc} */ - @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg) - throws IgniteCheckedException { - if (closed()) - throw new IgniteCheckedException("Communication client was closed: " + this); - - assert writeBuf.hasArray(); - - try { - int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer()); - - metricsLsnr.onBytesSent(cnt); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); - } - - markUsed(); - - return false; - } - - /** {@inheritDoc} */ - @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public void flushIfNeeded(long timeout) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridShmemCommunicationClient.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c014b001/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index b38dc15..65eed6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -319,8 +319,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient oldClient = clients.get(sndId); - boolean hasShmemClient = false; - if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { if (log.isDebugEnabled()) @@ -332,11 +330,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } - else { - assert oldClient instanceof GridShmemCommunicationClient; - - hasShmemClient = true; - } } GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>(); @@ -363,15 +356,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } - else { - assert oldClient instanceof GridShmemCommunicationClient; - - hasShmemClient = true; - } } boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); if (log.isDebugEnabled()) log.debug("Received incoming connection from remote node " + @@ -380,7 +368,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserved) { try { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true); fut.onDone(client); } @@ -402,11 +390,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } else { boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, fut)); if (reserved) { GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true); fut.onDone(client); } @@ -474,7 +462,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node. * @param rcvCnt Number of received messages.. * @param sndRes If {@code true} sends response for recovery handshake. - * @param createClient If {@code true} creates NIO communication client. * @return Client. */ private GridTcpNioCommunicationClient connected( @@ -482,8 +469,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioSession ses, ClusterNode node, long rcvCnt, - boolean sndRes, - boolean createClient) { + boolean sndRes) { recovery.onHandshake(rcvCnt); ses.recoveryDescriptor(recovery); @@ -495,16 +481,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter recovery.connected(); - GridTcpNioCommunicationClient client = null; - - if (createClient) { - client = new GridTcpNioCommunicationClient(ses, log); + GridTcpNioCommunicationClient client = new GridTcpNioCommunicationClient(ses, log); - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + - ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; - } + assert oldClient == null : "Client already created [node=" + node + ", client=" + client + + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; return client; } @@ -532,28 +514,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final GridFutureAdapter<GridCommunicationClient> fut; - /** */ - private final boolean createClient; - /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. * @param rmtNode Remote node. * @param msg Handshake message. - * @param createClient If {@code true} creates NIO communication client.. * @param fut Connect future. */ ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, HandshakeMessage msg, - boolean createClient, GridFutureAdapter<GridCommunicationClient> fut) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; this.msg = msg; - this.createClient = createClient; this.fut = fut; } @@ -566,7 +542,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter msgFut.get(); GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + connected(recoveryDesc, ses, rmtNode, msg.received(), false); fut.onDone(client); }