# IGNITE-61 - Fixing client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b9cb1573 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b9cb1573 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b9cb1573 Branch: refs/heads/ignite-82 Commit: b9cb15731239bb086895fcf1b7190317f90a483d Parents: d64fefb Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Feb 9 00:28:35 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Feb 9 00:28:35 2015 -0800 ---------------------------------------------------------------------- .../GridClientConnectionManagerAdapter.java | 84 +-- .../connection/GridClientNioTcpConnection.java | 135 +---- .../communication/GridIoMessageFactory.java | 21 - .../GridClientHandshakeRequestWrapper.java | 117 ----- .../GridClientHandshakeResponseWrapper.java | 94 ---- .../message/GridClientMessageWrapper.java | 275 ---------- .../message/GridClientPingPacketWrapper.java | 89 ---- .../protocols/tcp/GridClientPacketType.java | 3 + .../protocols/tcp/GridMemcachedMessage.java | 3 + .../protocols/tcp/GridTcpRestDirectParser.java | 523 ------------------- .../protocols/tcp/GridTcpRestNioListener.java | 29 +- .../rest/protocols/tcp/GridTcpRestParser.java | 26 + .../rest/protocols/tcp/GridTcpRestProtocol.java | 8 +- 13 files changed, 72 insertions(+), 1335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java index 
476d9c5..6cecef0 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -18,18 +18,17 @@ package org.apache.ignite.client.impl.connection; import org.apache.ignite.*; -import org.apache.ignite.internal.direct.*; -import org.apache.ignite.logger.java.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.plugin.security.*; import org.apache.ignite.client.*; import org.apache.ignite.client.impl.*; import org.apache.ignite.client.util.*; import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.protocols.tcp.*; import org.apache.ignite.internal.util.nio.*; import org.apache.ignite.internal.util.nio.ssl.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.plugin.security.*; import org.jetbrains.annotations.*; import javax.net.ssl.*; @@ -143,12 +142,12 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio GridNioFilter[] filters; - GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(), gridLog, true); + GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(), gridLog, false); if (sslCtx != null) { GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog); - sslFilter.directMode(true); + sslFilter.directMode(false); sslFilter.clientMode(true); filters = new GridNioFilter[]{codecFilter, sslFilter}; @@ -166,7 +165,7 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio .byteOrder(ByteOrder.nativeOrder()) .tcpNoDelay(cfg.isTcpNoDelay()) .directBuffer(true) - .directMode(true) + .directMode(false) .socketReceiveBufferSize(0) .socketSendBufferSize(0) .idleTimeout(Long.MAX_VALUE) 
@@ -594,22 +593,10 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio assert conn != null; - if (msg instanceof GridClientMessageWrapper) { - GridClientMessageWrapper req = (GridClientMessageWrapper)msg; - - if (req.messageSize() != 0) { - assert req.message() != null; - - conn.handleResponse(req); - } - else - conn.handlePingResponse(); - } - else { - assert msg instanceof GridClientPingPacket : msg; - + if (msg instanceof GridClientPingPacket) conn.handlePingResponse(); - } + else + conn.handleResponse((GridClientMessage)msg); } } @@ -647,57 +634,4 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio ses.close(); } } - - /** - * - */ - private static class NioParser implements GridNioParser { - /** Message metadata key. */ - private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - - /** {@inheritDoc} */ - @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) - throws IOException, IgniteCheckedException { - GridClientFutureAdapter<?> handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE); - - if (handshakeFut != null) { - byte code = buf.get(); - - return new GridClientHandshakeResponse(code); - } - - MessageAdapter msg = ses.removeMeta(MSG_META_KEY); - - if (msg == null && buf.hasRemaining()) { - byte type = buf.get(); - - if (type == GridClientMessageWrapper.REQ_HEADER) { - msg = new GridClientMessageWrapper(); - - msg.setReader(new DirectMessageReader(null)); - } - else - throw new IOException("Invalid message type: " + type); - } - - boolean finished = false; - - if (buf.hasRemaining()) - finished = msg.readFrom(buf); - - if (finished) - return msg; - else { - ses.addMeta(MSG_META_KEY, msg); - - return null; - } - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - // No encoding needed for direct messages. 
- throw new UnsupportedEncodingException(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java index 51da3d7..3cead45 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java @@ -33,7 +33,6 @@ import org.jetbrains.annotations.*; import javax.net.ssl.*; import java.io.*; import java.net.*; -import java.nio.*; import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; @@ -43,6 +42,7 @@ import java.util.logging.*; import static org.apache.ignite.client.GridClientCacheFlag.*; import static org.apache.ignite.client.impl.connection.GridClientConnectionCloseReason.*; import static org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; /** * This class performs request to grid over tcp protocol. 
Serialization is performed with marshaller @@ -194,9 +194,9 @@ public class GridClientNioTcpConnection extends GridClientConnection { else if (marsh instanceof GridClientJdkMarshaller) req.marshallerId(GridClientJdkMarshaller.ID); - GridClientHandshakeRequestWrapper wrapper = new GridClientHandshakeRequestWrapper(req); + ses.addMeta(MARSHALLER.ordinal(), marsh); - ses.send(wrapper); + ses.send(req); handshakeFut.get(); @@ -392,7 +392,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { else if (now - lastPingSndTime > pingInterval && lastPingRcvTime != Long.MAX_VALUE) { lastPingRcvTime = Long.MAX_VALUE; - ses.send(new GridClientPingPacketWrapper()); + ses.send(GridClientPingPacket.PING_MESSAGE); lastPingSndTime = now; } @@ -415,22 +415,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { assert old == null; - GridClientMessageWrapper wrapper; - - try { - wrapper = messageWrapper(msg); - } - catch (IOException e) { - log.log(Level.SEVERE, "Failed to marshal message: " + msg, e); - - removePending(reqId); - - fut.onDone(e); - - return fut; - } - - GridNioFuture<?> sndFut = ses.send(wrapper); + GridNioFuture<?> sndFut = ses.send(msg); lastMsgSndTime = U.currentTimeMillis(); @@ -473,59 +458,38 @@ public class GridClientNioTcpConnection extends GridClientConnection { * Handles incoming response message. If this connection is closed this method would signal empty event * if there is no more pending requests. * - * @param req Incoming response data. + * @param res Incoming response data. */ @SuppressWarnings({"unchecked", "TooBroadScope"}) - void handleResponse(GridClientMessageWrapper req) { + void handleResponse(GridClientMessage res) { lastMsgRcvTime = U.currentTimeMillis(); - TcpClientFuture fut = pendingReqs.get(req.requestId()); + TcpClientFuture fut = pendingReqs.get(res.requestId()); if (fut == null) { log.warning("Response for an unknown request is received, ignoring. 
" + - "[req=" + req + ", ses=" + ses + ']'); + "[res=" + res + ", ses=" + ses + ']'); return; } if (fut.forward()) { - GridRouterResponse msg = new GridRouterResponse( - req.messageArray(), - req.requestId(), - clientId, - req.destinationId()); - - removePending(msg.requestId()); - - fut.onDone(msg); + // TODO: IGNITE-61 +// GridRouterResponse msg = new GridRouterResponse( +// res.messageArray(), +// res.requestId(), +// clientId, +// res.destinationId()); +// +// removePending(msg.requestId()); +// +// fut.onDone(msg); } else { - GridClientMessage msg; - - if (keepPortablesMode != null) - keepPortablesMode.set(fut.keepPortables()); - - try { - msg = marsh.unmarshal(req.messageArray()); - } - catch (IOException e) { - fut.onDone(new GridClientException("Failed to unmarshal message.", e)); - - return; - } - - finally { - if (keepPortablesMode != null) - keepPortablesMode.set(true); - } - msg.requestId(req.requestId()); - msg.clientId(req.clientId()); - msg.destinationId(req.destinationId()); - - if (msg instanceof GridClientResponse) - handleClientResponse(fut, (GridClientResponse)msg); + if (res instanceof GridClientResponse) + handleClientResponse(fut, (GridClientResponse)res); else - log.warning("Unsupported response type received: " + msg); + log.warning("Unsupported response type received: " + res); } } @@ -559,22 +523,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { req.requestId(resp.requestId()); - GridClientMessageWrapper wrapper; - - try { - wrapper = messageWrapper(req); - } - catch (IOException e) { - log.log(Level.SEVERE, "Failed to marshal message: " + req, e); - - removePending(resp.requestId()); - - fut.onDone(e); - - return; - } - - ses.send(wrapper); + ses.send(req); return; } @@ -588,22 +537,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { src.sessionToken(sesTok); - GridClientMessageWrapper wrapper; - - try { - wrapper = messageWrapper(src); - } - catch (IOException e) { - 
log.log(Level.SEVERE, "Failed to marshal message: " + src, e); - - removePending(resp.requestId()); - - fut.onDone(e); - - return; - } - - ses.send(wrapper); + ses.send(src); return; } @@ -624,27 +558,6 @@ public class GridClientNioTcpConnection extends GridClientConnection { } /** - * @param msg Client message. - * @return Message wrapper for direct marshalling. - * @throws IOException If failed to marshal message. - */ - private GridClientMessageWrapper messageWrapper(GridClientMessage msg) throws IOException { - GridClientMessageWrapper wrapper = new GridClientMessageWrapper(); - - wrapper.requestId(msg.requestId()); - wrapper.clientId(clientId); - wrapper.destinationId(msg.destinationId()); - - ByteBuffer data = (msg instanceof GridRouterRequest) ? ByteBuffer.wrap(((GridRouterRequest)msg).body()) : - marsh.marshal(msg, 0); - - wrapper.message(data); - wrapper.messageSize(data.remaining() + 40); - - return wrapper; - } - - /** * Removes pending request and signals to {@link #closedLatch} if necessary. * * @param reqId Request Id. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 0634b1f..0f46e3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.clock.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.internal.processors.rest.client.message.*; import org.apache.ignite.internal.processors.rest.handlers.task.*; import org.apache.ignite.internal.processors.rest.protocols.tcp.*; import org.apache.ignite.internal.processors.streamer.*; @@ -445,26 +444,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 72: - msg = new GridClientHandshakeRequestWrapper(); - - break; - - case 73: - msg = new GridClientHandshakeResponseWrapper(); - - break; - - case 74: - msg = new GridClientMessageWrapper(); - - break; - - case 75: - msg = new GridClientPingPacketWrapper(); - - break; - case 76: msg = new GridTaskResultRequest(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java deleted file mode 100644 index 71dfcf9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.rest.client.message; - -import org.apache.ignite.internal.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; - -/** - * Client handshake wrapper for direct marshalling. - */ -public class GridClientHandshakeRequestWrapper extends MessageAdapter { - /** */ - private static final long serialVersionUID = -5705048094821942662L; - - /** Signal char. */ - public static final byte HANDSHAKE_HEADER = (byte)0x91; - - /** Stream. */ - private final DirectByteBufferStream stream = new DirectByteBufferStream(null); - - /** Handshake bytes. 
*/ - private byte[] bytes; - - /** - * - */ - public GridClientHandshakeRequestWrapper() { - // No-op. - } - - /** - * - * @param req Handshake request. - */ - public GridClientHandshakeRequestWrapper(GridClientHandshakeRequest req) { - bytes = req.rawBytes(); - } - - /** - * @return Handshake bytes. - */ - public byte[] bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - stream.setBuffer(buf); - - if (!typeWritten) { - if (!buf.hasRemaining()) - return false; - - stream.writeByte(directType()); - - typeWritten = true; - } - - stream.writeByteArray(bytes, 0, bytes.length); - - return stream.lastFinished(); - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - stream.setBuffer(buf); - - bytes = stream.readByteArray(GridClientHandshakeRequest.PACKET_SIZE); - - return stream.lastFinished(); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return HANDSHAKE_HEADER; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public MessageAdapter clone() { - GridClientHandshakeRequestWrapper _clone = new GridClientHandshakeRequestWrapper(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(MessageAdapter _msg) { - GridClientHandshakeRequestWrapper _clone = (GridClientHandshakeRequestWrapper)_msg; - - _clone.bytes = bytes; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClientHandshakeRequestWrapper.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java deleted file mode 100644 index e598f66..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.rest.client.message; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; - -/** - * Client handshake wrapper for direct marshalling. - */ -public class GridClientHandshakeResponseWrapper extends MessageAdapter { - /** */ - private static final long serialVersionUID = -1529807975073967381L; - - /** */ - private byte code; - - /** - * - */ - public GridClientHandshakeResponseWrapper() { - // No-op. - } - - /** - * @param code Response code. 
- */ - public GridClientHandshakeResponseWrapper(byte code) { - this.code = code; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - writer.setBuffer(buf); - - if (!typeWritten) { - if (!writer.writeByte(null, directType())) - return false; - - typeWritten = true; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return code; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public MessageAdapter clone() { - GridClientHandshakeResponseWrapper _clone = new GridClientHandshakeResponseWrapper(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(MessageAdapter _msg) { - GridClientHandshakeResponseWrapper _clone = (GridClientHandshakeResponseWrapper)_msg; - - _clone.code = code; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClientHandshakeResponseWrapper.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java deleted file mode 100644 index 746a40f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.rest.client.message; - -import org.apache.ignite.internal.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; -import java.util.*; - -/** - * Client message wrapper for direct marshalling. - */ -public class GridClientMessageWrapper extends MessageAdapter { - /** */ - private static final long serialVersionUID = 5284375300887454697L; - - /** Client request header. */ - public static final byte REQ_HEADER = (byte)0x90; - - /** Stream. */ - private final DirectByteBufferStream stream = new DirectByteBufferStream(null); - - /** */ - private int msgSize; - - /** */ - private long reqId; - - /** */ - private UUID clientId; - - /** */ - private UUID destId; - - /** */ - private ByteBuffer msg; - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } - - /** - * @param reqId Request ID. - */ - public void requestId(long reqId) { - this.reqId = reqId; - } - - /** - * @return Message size. - */ - public int messageSize() { - return msgSize; - } - - /** - * @param msgSize Message size. - */ - public void messageSize(int msgSize) { - this.msgSize = msgSize; - } - - /** - * @return Client ID. 
- */ - public UUID clientId() { - return clientId; - } - - /** - * @param clientId Client ID. - */ - public void clientId(UUID clientId) { - this.clientId = clientId; - } - - /** - * @return Destination ID. - */ - public UUID destinationId() { - return destId; - } - - /** - * @param destId Destination ID. - */ - public void destinationId(UUID destId) { - this.destId = destId; - } - - /** - * @return Message buffer. - */ - public ByteBuffer message() { - return msg; - } - - /** - * @return Message bytes. - */ - public byte[] messageArray() { - assert msg.hasArray(); - assert msg.position() == 0 && msg.remaining() == msg.capacity(); - - return msg.array(); - } - - /** - * @param msg Message bytes. - */ - public void message(ByteBuffer msg) { - this.msg = msg; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - stream.setBuffer(buf); - - if (!typeWritten) { - if (stream.remaining() < 1) - return false; - - stream.writeByte(directType()); - - typeWritten = true; - } - - switch (state) { - case 0: - if (stream.remaining() < 4) - return false; - - stream.writeInt(msgSize); - - state++; - - case 1: - if (stream.remaining() < 8) - return false; - - stream.writeLong(reqId); - - state++; - - case 2: - if (stream.remaining() < 16) - return false; - - stream.writeByteArray(U.uuidToBytes(clientId), 0, 16); - - state++; - - case 3: - if (stream.remaining() < 16) - return false; - - stream.writeByteArray(U.uuidToBytes(destId), 0, 16); - - state++; - - case 4: - stream.writeByteArray(msg.array(), msg.position(), msg.remaining()); - - state++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - stream.setBuffer(buf); - - switch (state) { - case 0: - if (stream.remaining() < 4) - return false; - - msgSize = stream.readInt(); - - if (msgSize == 0) // Ping message. 
- return true; - - state++; - - case 1: - if (stream.remaining() < 8) - return false; - - reqId = stream.readLong(); - - state++; - - case 2: - if (stream.remaining() < 16) - return false; - - clientId = U.bytesToUuid(stream.readByteArray(16), 0); - - state++; - - case 3: - if (stream.remaining() < 16) - return false; - - destId = U.bytesToUuid(stream.readByteArray(16), 0); - - state++; - - case 4: - byte[] msg0 = stream.readByteArray(msgSize); - - if (!stream.lastFinished()) - return false; - - msg = ByteBuffer.wrap(msg0); - - state++; - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return REQ_HEADER; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public MessageAdapter clone() { - GridClientMessageWrapper _clone = new GridClientMessageWrapper(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(MessageAdapter _msg) { - GridClientMessageWrapper _clone = (GridClientMessageWrapper)_msg; - - _clone.reqId = reqId; - _clone.msgSize = msgSize; - _clone.clientId = clientId; - _clone.destId = destId; - _clone.msg = msg; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClientMessageWrapper.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java deleted file mode 100644 index ca8e804..0000000 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.rest.client.message; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.nio.*; - -/** - * Ping packet wrapper for direct marshalling. - */ -public class GridClientPingPacketWrapper extends MessageAdapter { - /** */ - private static final long serialVersionUID = -3956036611004055629L; - - /** Ping message size (always zero). 
*/ - private int size; - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - writer.setBuffer(buf); - - if (!typeWritten) { - if (!writer.writeByte(null, directType())) - return false; - - typeWritten = true; - } - - switch (state) { - case 0: - if (!writer.writeInt("size", size)) - return false; - - state++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return GridClientMessageWrapper.REQ_HEADER; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public MessageAdapter clone() { - GridClientPingPacketWrapper _clone = new GridClientPingPacketWrapper(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(MessageAdapter _msg) { - GridClientPingPacketWrapper _clone = (GridClientPingPacketWrapper)_msg; - - _clone.size = size; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridClientPingPacketWrapper.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java index 0bc8099..e47e825 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java @@ -27,6 +27,9 @@ public enum GridClientPacketType { /** Ignite 
handshake. */ IGNITE_HANDSHAKE, + /** Ignite handshake response. */ + IGNITE_HANDSHAKE_RES, + /** Ignite message. */ IGNITE } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java index 707383f..a42fb57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java @@ -51,6 +51,9 @@ public class GridMemcachedMessage implements GridClientMessage { /** Client handshake flag. */ public static final byte IGNITE_HANDSHAKE_FLAG = (byte)0x91; + /** Client handshake response flag. */ + public static final byte IGNITE_HANDSHAKE_RES_FLAG = (byte)0x92; + /** Success status.
*/ public static final int SUCCESS = 0x0000; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java deleted file mode 100644 index b743e66..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.client.marshaller.*; -import org.apache.ignite.internal.direct.*; -import org.apache.ignite.internal.processors.rest.client.message.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.nio.charset.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; - -/** - * - */ -public class GridTcpRestDirectParser implements GridNioParser { - /** UTF-8 charset. */ - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - /** Message metadata key. */ - private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - - /** Message reader. */ - private final MessageReader rdr = new DirectMessageReader(null); - - /** Protocol handler. */ - private final GridTcpRestProtocol proto; - - /** - * @param proto Protocol handler. 
- */ - public GridTcpRestDirectParser(GridTcpRestProtocol proto) { - this.proto = proto; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) - throws IOException, IgniteCheckedException { - ParserState state = ses.removeMeta(PARSER_STATE.ordinal()); - - if (state != null) { - assert state.packetType() == GridClientPacketType.MEMCACHE; - - Object memcacheMsg = parseMemcachePacket(ses, buf, state); - - if (memcacheMsg == null) - ses.addMeta(PARSER_STATE.ordinal(), state); - - return memcacheMsg; - } - - MessageAdapter msg = ses.removeMeta(MSG_META_KEY); - - if (msg == null && buf.hasRemaining()) { - byte type = buf.get(buf.position()); - - if (type == GridClientMessageWrapper.REQ_HEADER) { - buf.get(); - - msg = new GridClientMessageWrapper(); - - msg.setReader(rdr); - } - else if (type == GridClientHandshakeRequestWrapper.HANDSHAKE_HEADER) { - buf.get(); - - msg = new GridClientHandshakeRequestWrapper(); - - msg.setReader(rdr); - } - else if (type == MEMCACHE_REQ_FLAG) { - state = new ParserState(); - - state.packet(new GridMemcachedMessage()); - state.packetType(GridClientPacketType.MEMCACHE); - - Object memcacheMsg = parseMemcachePacket(ses, buf, state); - - if (memcacheMsg == null) - ses.addMeta(PARSER_STATE.ordinal(), state); - - return memcacheMsg; - } - else - throw new IOException("Invalid message type: " + type); - } - - boolean finished = false; - - if (buf.hasRemaining()) - finished = msg.readFrom(buf); - - if (finished) { - if (msg instanceof GridClientMessageWrapper) { - GridClientMessageWrapper clientMsg = (GridClientMessageWrapper)msg; - - if (clientMsg.messageSize() == 0) - return GridClientPingPacket.PING_MESSAGE; - - GridClientMarshaller marsh = proto.marshaller(ses); - - GridClientMessage ret = marsh.unmarshal(clientMsg.messageArray()); - - ret.requestId(clientMsg.requestId()); - ret.clientId(clientMsg.clientId()); - ret.destinationId(clientMsg.destinationId()); - - return ret; - } - else { - 
assert msg instanceof GridClientHandshakeRequestWrapper; - - GridClientHandshakeRequestWrapper req = (GridClientHandshakeRequestWrapper)msg; - - GridClientHandshakeRequest ret = new GridClientHandshakeRequest(); - - ret.putBytes(req.bytes(), 0, 4); - - return ret; - } - } - else { - ses.addMeta(MSG_META_KEY, msg); - - return null; - } - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - // No encoding needed for direct messages. - throw new UnsupportedEncodingException(); - } - - /** - * Parses memcache protocol message. - * - * @param ses Session. - * @param buf Buffer containing not parsed bytes. - * @param state Current parser state. - * @return Parsed packet.s - * @throws IOException If packet cannot be parsed. - * @throws IgniteCheckedException If deserialization error occurred. - */ - @Nullable private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state) - throws IOException, IgniteCheckedException { - assert state.packetType() == GridClientPacketType.MEMCACHE; - assert state.packet() != null; - assert state.packet() instanceof GridMemcachedMessage; - - GridMemcachedMessage req = (GridMemcachedMessage)state.packet(); - ByteArrayOutputStream tmp = state.buffer(); - int i = state.index(); - - while (buf.remaining() > 0) { - byte b = buf.get(); - - if (i == 0) - req.requestFlag(b); - else if (i == 1) - req.operationCode(b); - else if (i == 2 || i == 3) { - tmp.write(b); - - if (i == 3) { - req.keyLength(U.bytesToShort(tmp.toByteArray(), 0)); - - tmp.reset(); - } - } - else if (i == 4) - req.extrasLength(b); - else if (i >= 8 && i <= 11) { - tmp.write(b); - - if (i == 11) { - req.totalLength(U.bytesToInt(tmp.toByteArray(), 0)); - - tmp.reset(); - } - } - else if (i >= 12 && i <= 15) { - tmp.write(b); - - if (i == 15) { - req.opaque(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN && i < HDR_LEN + 
req.extrasLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.extrasLength() - 1) { - req.extras(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN + req.extrasLength() && - i < HDR_LEN + req.extrasLength() + req.keyLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) { - req.key(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() && - i < HDR_LEN + req.totalLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.totalLength() - 1) { - req.value(tmp.toByteArray()); - - tmp.reset(); - } - } - - if (i == HDR_LEN + req.totalLength() - 1) - // Assembled the packet. - return assemble(ses, req); - - i++; - } - - state.index(i); - - return null; - } - - /** - * Validates incoming packet and deserializes all fields that need to be deserialized. - * - * @param ses Session on which packet is being parsed. - * @param req Raw packet. - * @return Same packet with fields deserialized. - * @throws IOException If parsing failed. - * @throws IgniteCheckedException If deserialization failed. - */ - private GridClientMessage assemble(GridNioSession ses, GridMemcachedMessage req) throws IOException, IgniteCheckedException { - byte[] extras = req.extras(); - - // First, decode key and value, if any - if (req.key() != null || req.value() != null) { - short keyFlags = 0; - short valFlags = 0; - - if (req.hasFlags()) { - if (extras == null || extras.length < FLAGS_LENGTH) - throw new IOException("Failed to parse incoming packet (flags required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - keyFlags = U.bytesToShort(extras, 0); - valFlags = U.bytesToShort(extras, 2); - } - - if (req.key() != null) { - assert req.key() instanceof byte[]; - - byte[] rawKey = (byte[])req.key(); - - // Only values can be hessian-encoded. 
- req.key(decodeObj(keyFlags, rawKey)); - } - - if (req.value() != null) { - assert req.value() instanceof byte[]; - - byte[] rawVal = (byte[])req.value(); - - req.value(decodeObj(valFlags, rawVal)); - } - } - - if (req.hasExpiration()) { - if (extras == null || extras.length < 8) - throw new IOException("Failed to parse incoming packet (expiration value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.expiration(U.bytesToInt(extras, 4) & 0xFFFFFFFFL); - } - - if (req.hasInitial()) { - if (extras == null || extras.length < 16) - throw new IOException("Failed to parse incoming packet (initial value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.initial(U.bytesToLong(extras, 8)); - } - - if (req.hasDelta()) { - if (extras == null || extras.length < 8) - throw new IOException("Failed to parse incoming packet (delta value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.delta(U.bytesToLong(extras, 0)); - } - - if (extras != null) { - // Clients that include cache name must always include flags. - int len = 4; - - if (req.hasExpiration()) - len += 4; - - if (req.hasDelta()) - len += 8; - - if (req.hasInitial()) - len += 8; - - if (extras.length - len > 0) { - byte[] cacheName = new byte[extras.length - len]; - - U.arrayCopy(extras, len, cacheName, 0, extras.length - len); - - req.cacheName(new String(cacheName, UTF_8)); - } - } - - return req; - } - - /** - * Decodes value from a given byte array to the object according to the flags given. - * - * @param flags Flags. - * @param bytes Byte array to decode. - * @return Decoded value. - * @throws IgniteCheckedException If deserialization failed. 
- */ - private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException { - assert bytes != null; - - if ((flags & SERIALIZED_FLAG) != 0) - return proto.jdkMarshaller().unmarshal(bytes, null); - - int masked = flags & 0xff00; - - switch (masked) { - case BOOLEAN_FLAG: - return bytes[0] == '1'; - case INT_FLAG: - return U.bytesToInt(bytes, 0); - case LONG_FLAG: - return U.bytesToLong(bytes, 0); - case DATE_FLAG: - return new Date(U.bytesToLong(bytes, 0)); - case BYTE_FLAG: - return bytes[0]; - case FLOAT_FLAG: - return Float.intBitsToFloat(U.bytesToInt(bytes, 0)); - case DOUBLE_FLAG: - return Double.longBitsToDouble(U.bytesToLong(bytes, 0)); - case BYTE_ARR_FLAG: - return bytes; - default: - return new String(bytes, UTF_8); - } - } - - /** - * Holder for parser state and temporary buffer. - */ - protected static class ParserState { - /** Parser index. */ - private int idx; - - /** Temporary data buffer. */ - private ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - /** Packet being assembled. */ - private GridClientMessage packet; - - /** Packet type. */ - private GridClientPacketType packetType; - - /** Header data. */ - private HeaderData hdr; - - /** - * @return Stored parser index. - */ - public int index() { - return idx; - } - - /** - * @param idx Index to store. - */ - public void index(int idx) { - this.idx = idx; - } - - /** - * @return Temporary data buffer. - */ - public ByteArrayOutputStream buffer() { - return buf; - } - - /** - * @return Pending packet. - */ - @Nullable public GridClientMessage packet() { - return packet; - } - - /** - * @param packet Pending packet. - */ - public void packet(GridClientMessage packet) { - assert this.packet == null; - - this.packet = packet; - } - - /** - * @return Pending packet type. - */ - public GridClientPacketType packetType() { - return packetType; - } - - /** - * @param packetType Pending packet type. 
- */ - public void packetType(GridClientPacketType packetType) { - this.packetType = packetType; - } - - /** - * @return Header. - */ - public HeaderData header() { - return hdr; - } - - /** - * @param hdr Header. - */ - public void header(HeaderData hdr) { - this.hdr = hdr; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ParserState.class, this); - } - } - - /** - * Header. - */ - protected static class HeaderData { - /** Request Id. */ - private final long reqId; - - /** Request Id. */ - private final UUID clientId; - - /** Request Id. */ - private final UUID destId; - - /** - * @param reqId Request Id. - * @param clientId Client Id. - * @param destId Destination Id. - */ - private HeaderData(long reqId, UUID clientId, UUID destId) { - this.reqId = reqId; - this.clientId = clientId; - this.destId = destId; - } - - /** - * @return Request Id. - */ - public long reqId() { - return reqId; - } - - /** - * @return Client Id. - */ - public UUID clientId() { - return clientId; - } - - /** - * @return Destination Id. 
- */ - public UUID destinationId() { - return destId; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java index 12dc711..159f178 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java @@ -29,14 +29,11 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; -import java.io.*; -import java.nio.*; import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; import static org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*; -import static org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse.*; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; /** @@ -138,7 +135,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli memcachedLsnr.onMessage(ses, (GridMemcachedMessage)msg); else { if (msg == GridClientPingPacket.PING_MESSAGE) - ses.send(new GridClientPingPacketWrapper()); + ses.send(new GridClientPingPacket()); else if (msg instanceof GridClientHandshakeRequest) { GridClientHandshakeRequest hs = (GridClientHandshakeRequest)msg; @@ -168,7 +165,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli else { 
ses.addMeta(MARSHALLER.ordinal(), marsh); - ses.send(new GridClientHandshakeResponseWrapper(CODE_OK)); + ses.send(GridClientHandshakeResponse.OK); } } } @@ -205,27 +202,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli res.errorMessage("Failed to process client request: " + e.getMessage()); } - GridClientMessageWrapper wrapper = new GridClientMessageWrapper(); - - wrapper.requestId(msg.requestId()); - wrapper.clientId(msg.clientId()); - - try { - ByteBuffer bytes = proto.marshaller(ses).marshal(res, 0); - - wrapper.message(bytes); - - wrapper.messageSize(bytes.remaining() + 40); - } - catch (IOException e) { - U.error(log, "Failed to marshal response: " + res, e); - - ses.close(); - - return; - } - - ses.send(wrapper); + ses.send(res); } }); else http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java index d94b598..978ae61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java @@ -81,6 +81,13 @@ public class GridTcpRestParser implements GridNioParser { break; + case IGNITE_HANDSHAKE_RES_FLAG: + buf.get(); + + state.packetType(GridClientPacketType.IGNITE_HANDSHAKE_RES); + + break; + default: throw new IOException("Failed to parse incoming packet (invalid packet start) [ses=" + ses + ", b=" + Integer.toHexString(hdr & 0xFF) + ']'); @@ -100,6 +107,12 @@ public class GridTcpRestParser implements GridNioParser { break; + case 
IGNITE_HANDSHAKE_RES: + if (buf.hasRemaining()) + res = new GridClientHandshakeResponse(buf.get()); + + break; + case IGNITE: res = parseCustomPacket(ses, buf, state); @@ -123,8 +136,21 @@ public class GridTcpRestParser implements GridNioParser { return encodeMemcache((GridMemcachedMessage)msg); else if (msg == GridClientPingPacket.PING_MESSAGE) return ByteBuffer.wrap(GridClientPingPacket.PING_PACKET); + else if (msg instanceof GridClientHandshakeRequest) { + byte[] bytes = ((GridClientHandshakeRequest)msg).rawBytes(); + + ByteBuffer buf = ByteBuffer.allocate(bytes.length + 1); + + buf.put(IGNITE_HANDSHAKE_FLAG); + buf.put(bytes); + + buf.flip(); + + return buf; + } else if (msg instanceof GridClientHandshakeResponse) return ByteBuffer.wrap(new byte[] { + IGNITE_HANDSHAKE_RES_FLAG, ((GridClientHandshakeResponse)msg).resultCode() }); else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b9cb1573/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java index c0c63a4..19971e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java @@ -106,7 +106,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter { lsnr = new GridTcpRestNioListener(log, this, hnd, ctx); - GridNioParser parser = new GridTcpRestDirectParser(this); + GridNioParser parser = new GridTcpRestParser(); try { host = resolveRestTcpHost(ctx.config()); @@ -207,14 +207,14 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter { private boolean 
startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr, GridNioParser parser, @Nullable SSLContext sslCtx, ClientConnectionConfiguration cfg) { try { - GridNioFilter codec = new GridNioCodecFilter(parser, log, true); + GridNioFilter codec = new GridNioCodecFilter(parser, log, false); GridNioFilter[] filters; if (sslCtx != null) { GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log); - sslFilter.directMode(true); + sslFilter.directMode(false); boolean auth = cfg.isRestTcpSslClientAuth(); @@ -244,7 +244,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter { .socketReceiveBufferSize(cfg.getRestTcpReceiveBufferSize()) .sendQueueLimit(cfg.getRestTcpSendQueueLimit()) .filters(filters) - .directMode(true) + .directMode(false) .build(); srv.idleTimeout(cfg.getRestIdleTimeout());