Fixed SSL bugs. Added test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e37efa33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e37efa33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e37efa33 Branch: refs/heads/ignite-gg-9615 Commit: e37efa3357d96e7831068eaec29627bd1bcc2ba0 Parents: c5dc492 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Thu Jul 23 11:37:26 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Thu Jul 23 11:38:35 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 5 +- .../util/nio/ssl/BlockingSslHandler.java | 61 ++++++++++++-------- .../communication/tcp/TcpCommunicationSpi.java | 60 ++++++++++--------- .../ignite/spi/discovery/tcp/ServerImpl.java | 5 +- .../tcp/IgniteCacheSslStartStopSelfTest.java | 46 +++++++++++++++ .../IgniteCacheFailoverTestSuite.java | 4 +- 6 files changed, 125 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8a246dc..b746261 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2064,9 +2064,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { private void ackSecurity() { assert log != null; - if (log.isInfoEnabled()) - log.info("Security status [authentication=" + onOff(ctx.security().enabled()) - + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']'); + U.quietAndInfo(log, "Security status [authentication=" + onOff(ctx.security().enabled()) + + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']'); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java index eee90d8..9890efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java @@ -39,14 +39,14 @@ public class BlockingSslHandler { /** Logger. */ private IgniteLogger log; - /** */ + /** Socket channel. */ private SocketChannel ch; - /** */ - private GridFutureAdapter<ByteBuffer> fut; + /** Order. */ + private final ByteOrder order; /** SSL engine. */ - private SSLEngine sslEngine; + private final SSLEngine sslEngine; /** Handshake completion flag. */ private boolean handshakeFinished; @@ -69,33 +69,38 @@ public class BlockingSslHandler { /** * @param sslEngine SSLEngine. * @param ch Socket channel. - * @param fut Future. + * @param directBuf Direct buffer flag. + * @param order Byte order. * @param log Logger. */ - public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter<ByteBuffer> fut, - IgniteLogger log) throws SSLException { + public BlockingSslHandler(SSLEngine sslEngine, + SocketChannel ch, + boolean directBuf, + ByteOrder order, + IgniteLogger log) + throws SSLException { this.ch = ch; - this.fut = fut; this.log = log; - this.sslEngine = sslEngine; + this.order = order; // Allocate a little bit more so SSL engine would not return buffer overflow status. int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50; - outNetBuf = ByteBuffer.allocate(netBufSize); - inNetBuf = ByteBuffer.allocate(netBufSize); + outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize); + outNetBuf.order(order); // Initially buffer is empty. outNetBuf.position(0); outNetBuf.limit(0); + inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize); + inNetBuf.order(order); + appBuf = allocateAppBuff(); handshakeStatus = sslEngine.getHandshakeStatus(); - sslEngine.setUseClientMode(true); - if (log.isDebugEnabled()) log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']'); } @@ -122,12 +127,6 @@ public class BlockingSslHandler { case FINISHED: { handshakeFinished = true; - if (fut != null) { - appBuf.flip(); - - fut.onDone(appBuf); - } - loop = false; break; @@ -187,6 +186,15 @@ public class BlockingSslHandler { } /** + * @return Application buffer with decoded data. + */ + public ByteBuffer applicationBuffer() { + appBuf.flip(); + + return appBuf; + } + + /** * Encrypts data to be written to the network. * * @param src data to encrypt. @@ -439,27 +447,32 @@ public class BlockingSslHandler { int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2); - return ByteBuffer.allocate(appBufSize); + ByteBuffer buf = ByteBuffer.allocate(appBufSize); + buf.order(order); + + return buf; } /** * Read data from net buffer. */ - private void readFromNet() { + private void readFromNet() throws IgniteCheckedException { try { inNetBuf.clear(); - ch.read(inNetBuf); + int read = ch.read(inNetBuf); + + if (read == -1) + throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); } catch (IOException e) { - e.printStackTrace(); + throw new IgniteCheckedException("Failed to write byte to socket.", e); } } /** * Copies data from out net buffer and passes it to the underlying chain. * - * @return Nothing. * @throws GridNioException If send failed. */ private void writeNetBuffer() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 99ca2b7..48dc52e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2051,12 +2051,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt = -1; - GridTuple<SSLEngine> ssl = new GridTuple<>(); + SSLEngine sslEngine = null; try { ch.socket().connect(addr, (int)connTimeout); - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, ssl); + if (isSslEnabled()) { + sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine(); + + sslEngine.setUseClientMode(true); + } + + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine); if (rcvCnt == -1) return null; @@ -2072,10 +2078,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter meta.put(NODE_ID_META, node.id()); if (isSslEnabled()) { - assert ssl != null; - assert ssl.get() != null; + assert sslEngine != null; - meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get()); + meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine); } if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); @@ -2211,7 +2216,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, - @Nullable GridTuple<SSLEngine> ssl + @Nullable SSLEngine ssl ) throws IgniteCheckedException { HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); @@ -2233,23 +2238,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ByteBuffer buf; if (isSslEnabled()) { - GridFutureAdapter<ByteBuffer> handFut = new GridFutureAdapter<>(); - - SSLEngine sslEngine = ignite.configuration().getSslContextFactory() - .create().createSSLEngine(); - - sslEngine.setUseClientMode(true); - - sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log); + sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log); if (!sslHnd.handshake()) - throw new IgniteCheckedException("SSL handshake isn't completed."); - - ssl.set(sslEngine); + throw new IgniteCheckedException("SSL handshake is not completed."); - ByteBuffer handBuff = handFut.get(); + ByteBuffer handBuff = sslHnd.applicationBuffer(); - if (handBuff.limit() < 17) { + if (handBuff.remaining() < 17) { buf = ByteBuffer.allocate(1000); int read = ch.read(buf); @@ -2338,18 +2334,30 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter buf = ByteBuffer.allocate(1000); + ByteBuffer decode = null; + buf.order(ByteOrder.nativeOrder()); - int read = ch.read(buf); + for (int i = 0; i < 9; ) { + int read = ch.read(buf); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node recovery handshake " + - "(connection closed)."); + if (read == -1) + throw new IgniteCheckedException("Failed to read remote node recovery handshake " + + "(connection closed)."); - buf.flip(); + buf.flip(); + + decode = sslHnd.decode(buf); - rcvCnt = sslHnd.decode(buf).getLong(1); - } else { + i += decode.remaining(); + + buf.flip(); + buf.compact(); + } + + rcvCnt = decode.getLong(1); + } + else { buf = ByteBuffer.allocate(9); buf.order(ByteOrder.nativeOrder()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 34f90f7..68552a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4242,10 +4242,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); - if (X.hasCause(e, SSLException.class) && spi.isSslEnabled()) + if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0()) LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " + "Missed SSL configuration on node? [sock=" + sock + ']'); - else if (X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) { + else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) + && !spi.isNodeStopping0()) { if (U.isMacInvalidArgumentError(e)) LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" + U.MAC_INVALID_ARG_MSG); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java new file mode 100644 index 0000000..9bf6caa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.testframework.*; + +/** + * + */ +public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstractSelfTest { + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setSslContextFactory(GridTestUtils.sslFactory()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int keysCount() { + return 60_000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e37efa33/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index 80bfbf2..524bfb3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.processors.cache.distributed.replicated.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.testframework.*; import java.util.*; @@ -75,6 +75,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class); suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class); + suite.addTestSuite(IgniteCacheSslStartStopSelfTest.class); + return suite; } }