http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/package.html deleted file mode 100644 index 3122418..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - REST protocol handlers. -</body> -</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridClientPacketType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridClientPacketType.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridClientPacketType.java deleted file mode 100644 index a37b8d5..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridClientPacketType.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -/** - * Type of message being parsed. - */ -public enum GridClientPacketType { - /** Memcache protocol message. */ - MEMCACHE, - - /** GridGain handshake. */ - GRIDGAIN_HANDSHAKE, - - /** GridGain message. 
*/ - GRIDGAIN -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessage.java deleted file mode 100644 index fb96b02..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessage.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.gridgain.grid.kernal.processors.rest.client.message.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; -
/**
 * Memcached protocol packet.
 * <p>
 * The same type is used for parsed requests and for responses; a response is
 * normally derived from its request through the copy constructor so that the
 * request flag, operation code and opaque bytes are echoed back.
 */
public class GridMemcachedMessage implements GridClientMessage {
    /** */
    private static final long serialVersionUID = 0L;

    /** Random UUID reported as the client ID of every memcached message; generated once per class load. */
    private static final UUID MEMCACHED_ID = UUID.randomUUID();

    /** Header length. */
    public static final int HDR_LEN = 24;

    /** Flags length. */
    public static final byte FLAGS_LENGTH = 4;

    /** Memcache client request flag. */
    public static final byte MEMCACHE_REQ_FLAG = (byte)0x80;

    /** Response flag. */
    public static final byte MEMCACHE_RES_FLAG = (byte)0x81;

    /** Custom client request flag. */
    public static final byte GRIDGAIN_REQ_FLAG = (byte)0x90;

    /** Client handshake flag. */
    public static final byte GRIDGAIN_HANDSHAKE_FLAG = (byte)0x91;

    /** Success status. */
    public static final int SUCCESS = 0x0000;

    /** Key not found status. */
    public static final int KEY_NOT_FOUND = 0x0001;

    /** Failure status. */
    public static final int FAILURE = 0x0004;

    /** Serialized flag. */
    public static final int SERIALIZED_FLAG = 1;

    /** Boolean flag (1 in the second byte). */
    public static final int BOOLEAN_FLAG = 0x0100;

    /** Integer flag (2 in the second byte). */
    public static final int INT_FLAG = 0x0200;

    /** Long flag (3 in the second byte). */
    public static final int LONG_FLAG = 0x0300;

    /** Date flag (4 in the second byte). */
    public static final int DATE_FLAG = 0x0400;

    /** Byte flag (5 in the second byte). */
    public static final int BYTE_FLAG = 0x0500;

    /** Float flag (6 in the second byte). */
    public static final int FLOAT_FLAG = 0x0600;

    /** Double flag (7 in the second byte). */
    public static final int DOUBLE_FLAG = 0x0700;

    /** Byte array flag (8 in the second byte). */
    public static final int BYTE_ARR_FLAG = 0x0800;

    /** Request flag. */
    private byte reqFlag;

    /** Operation code. */
    private byte opCode;

    /** Key length. */
    private short keyLen;

    /** Extras length. */
    private byte extrasLen;

    /** Status. */
    private int status;

    /** Total body length. */
    private int totalLen;

    /** Opaque. */
    private byte[] opaque;

    /** Extras. */
    private byte[] extras;

    /** Key. */
    private Object key;

    /** Value. */
    private Object val;

    /** Value to add/subtract. */
    private Long delta;

    /** Initial value for increment and decrement commands. */
    private Long init;

    /** Expiration time. */
    private Long expiration;

    /** Cache name. */
    private String cacheName;

    /**
     * Creates an empty packet; all fields are expected to be filled in by the parser.
     */
    GridMemcachedMessage() {
    }

    /**
     * Creates a response skeleton from a request: the request flag and operation code are copied
     * and the opaque bytes are duplicated into a fresh array. All other fields keep their defaults.
     *
     * @param req Source request packet; its opaque array must already be set.
     */
    GridMemcachedMessage(GridMemcachedMessage req) {
        assert req != null;

        byte[] srcOpaque = req.opaque;
        int len = srcOpaque.length;

        byte[] copy = new byte[len];

        U.arrayCopy(srcOpaque, 0, copy, 0, len);

        this.reqFlag = req.reqFlag;
        this.opCode = req.opCode;
        this.opaque = copy;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The ID is decoded from the opaque bytes starting at offset 0.
     */
    @Override public long requestId() {
        int id = U.bytesToInt(opaque, 0);

        return id;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The ID is narrowed to {@code int} and written into the opaque bytes at offset 0.
     */
    @Override public void requestId(long reqId) {
        int narrowed = (int)reqId;

        U.intToBytes(narrowed, opaque, 0);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Always the shared {@code MEMCACHED_ID}.
     */
    @Override public UUID clientId() {
        return MEMCACHED_ID;
    }

    /** {@inheritDoc} */
    @Override public void clientId(UUID id) {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public UUID destinationId() {
        // Memcached packets carry no destination.
        return null;
    }

    /** {@inheritDoc} */
    @Override public void destinationId(UUID id) {
        throw new UnsupportedOperationException("destId is not supported by memcached packets.");
    }

    /** {@inheritDoc} */
    @Override public byte[] sessionToken() {
        return null;
    }

    /** {@inheritDoc} */
    @Override public void sessionToken(byte[] sesTok) {
        // No-op.
    }

    /**
     * @return Request flag.
     */
    public byte requestFlag() {
        return this.reqFlag;
    }

    /**
     * @param reqFlag Request flag.
     */
    public void requestFlag(byte reqFlag) {
        this.reqFlag = reqFlag;
    }

    /**
     * @return Operation code.
     */
    public byte operationCode() {
        return this.opCode;
    }

    /**
     * @param opCode Operation code; must not be negative.
     */
    public void operationCode(byte opCode) {
        assert opCode >= 0;

        this.opCode = opCode;
    }

    /**
     * @return Key length.
     */
    public short keyLength() {
        return this.keyLen;
    }

    /**
     * @param keyLen Key length; must not be negative.
     */
    public void keyLength(short keyLen) {
        assert keyLen >= 0;

        this.keyLen = keyLen;
    }

    /**
     * @return Extras length.
     */
    public byte extrasLength() {
        return this.extrasLen;
    }

    /**
     * @param extrasLen Extras length; must not be negative.
     */
    public void extrasLength(byte extrasLen) {
        assert extrasLen >= 0;

        this.extrasLen = extrasLen;
    }

    /**
     * @return Status.
     */
    public int status() {
        return this.status;
    }

    /**
     * @param status Status.
     */
    public void status(int status) {
        this.status = status;
    }

    /**
     * @return Total length.
     */
    public int totalLength() {
        return this.totalLen;
    }

    /**
     * @param totalLen Total length; must not be negative.
     */
    public void totalLength(int totalLen) {
        assert totalLen >= 0;

        this.totalLen = totalLen;
    }

    /**
     * @return Opaque bytes (the internal array, not a copy).
     */
    public byte[] opaque() {
        return this.opaque;
    }

    /**
     * @param opaque Opaque bytes; must not be {@code null}. The array is stored as is.
     */
    public void opaque(byte[] opaque) {
        assert opaque != null;

        this.opaque = opaque;
    }

    /**
     * @return Extras (the internal array, not a copy).
     */
    public byte[] extras() {
        return this.extras;
    }

    /**
     * @param extras Extras; must not be {@code null}. The array is stored as is.
     */
    public void extras(byte[] extras) {
        assert extras != null;

        this.extras = extras;
    }

    /**
     * @return Key.
     */
    public Object key() {
        return this.key;
    }

    /**
     * @param key Key; must not be {@code null}.
     */
    public void key(Object key) {
        assert key != null;

        this.key = key;
    }

    /**
     * @return Value.
     */
    public Object value() {
        return this.val;
    }

    /**
     * @param val Value; must not be {@code null}.
     */
    public void value(Object val) {
        assert val != null;

        this.val = val;
    }

    /**
     * @return Expiration, or {@code null} if it was never set.
     */
    @Nullable public Long expiration() {
        return this.expiration;
    }

    /**
     * @param expiration Expiration.
     */
    public void expiration(long expiration) {
        this.expiration = expiration;
    }

    /**
     * @return Delta for increment and decrement commands, or {@code null} if it was never set.
     */
    @Nullable public Long delta() {
        return this.delta;
    }

    /**
     * @param delta Delta for increment and decrement commands.
     */
    public void delta(long delta) {
        this.delta = delta;
    }

    /**
     * @return Initial value for increment and decrement commands, or {@code null} if it was never set.
     */
    @Nullable public Long initial() {
        return this.init;
    }

    /**
     * @param init Initial value for increment and decrement commands.
     */
    public void initial(long init) {
        this.init = init;
    }

    /**
     * @return Cache name, or {@code null} if it was never set.
     */
    @Nullable public String cacheName() {
        return this.cacheName;
    }

    /**
     * @param cacheName Cache name; must not be {@code null}.
     */
    public void cacheName(String cacheName) {
        assert cacheName != null;

        this.cacheName = cacheName;
    }

    /**
     * @return Whether request MUST have flags in extras
     *      (op codes 0x01-0x03 and 0x11-0x13).
     */
    public boolean hasFlags() {
        return isStoreOp(opCode);
    }

    /**
     * @return Whether request has expiration field
     *      (op codes 0x01-0x03 and 0x11-0x13).
     */
    public boolean hasExpiration() {
        return isStoreOp(opCode);
    }

    /**
     * @return Whether request has delta field
     *      (op codes 0x05, 0x06, 0x15 and 0x16).
     */
    public boolean hasDelta() {
        return isCounterOp(opCode);
    }

    /**
     * @return Whether request has initial field
     *      (op codes 0x05, 0x06, 0x15 and 0x16).
     */
    public boolean hasInitial() {
        return isCounterOp(opCode);
    }

    /**
     * @return Whether to add data to response.
     */
    public boolean addData() {
        switch (opCode) {
            case 0x00:
            case 0x05:
            case 0x06:
            case 0x09:
            case 0x0B:
            case 0x0C:
            case 0x0D:
            case 0x20:
            case 0x24:
            case 0x25:
            case 0x26:
            case 0x27:
            case 0x28:
            case 0x29:
                return true;

            default:
                return false;
        }
    }

    /**
     * @return Whether to add flags to response.
     */
    public boolean addFlags() {
        switch (opCode) {
            case 0x00:
            case 0x09:
            case 0x0C:
            case 0x0D:
                return true;

            default:
                return false;
        }
    }

    /**
     * @param op Operation code.
     * @return {@code true} for op codes 0x01, 0x02, 0x03, 0x11, 0x12 and 0x13.
     */
    private static boolean isStoreOp(byte op) {
        switch (op) {
            case 0x01:
            case 0x02:
            case 0x03:
            case 0x11:
            case 0x12:
            case 0x13:
                return true;

            default:
                return false;
        }
    }

    /**
     * @param op Operation code.
     * @return {@code true} for op codes 0x05, 0x06, 0x15 and 0x16.
     */
    private static boolean isCounterOp(byte op) {
        switch (op) {
            case 0x05:
            case 0x06:
            case 0x15:
            case 0x16:
                return true;

            default:
                return false;
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridMemcachedMessage.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java deleted file mode 100644 index 6acf3c2..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.nio.*; -import java.nio.charset.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.rest.protocols.tcp.GridMemcachedMessage.*; -
/**
 * Memcached message wrapper for direct marshalling.
 * <p>
 * The wrapped message is encoded into a byte array once, at construction time;
 * {@link #writeTo(ByteBuffer)} then emits the type byte followed by that array.
 * Reading is not supported.
 */
public class GridMemcachedMessageWrapper extends GridTcpCommunicationMessageAdapter {
    /** */
    private static final long serialVersionUID = 3053626103006980626L;

    /** UTF-8 charset. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Memcached message bytes (everything except the leading type byte). */
    private byte[] bytes;

    /**
     * Creates an empty wrapper; used by {@link #clone()}.
     */
    public GridMemcachedMessageWrapper() {
        // No-op.
    }

    /**
     * Encodes the given message eagerly. Note that encoding replaces the message's
     * key and value with their encoded byte arrays.
     *
     * @param msg Message.
     * @param jdkMarshaller JDK marshaller.
     * @throws IgniteCheckedException If failed to marshal.
     */
    public GridMemcachedMessageWrapper(GridMemcachedMessage msg, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException {
        this.bytes = encodeMemcache(msg, jdkMarshaller);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the type byte (once, tracked by {@code commState.typeWritten}) and then the
     * encoded bytes (tracked by {@code commState.idx}). Returns {@code false} as soon as
     * a put is rejected - presumably because the buffer is full.
     */
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!commState.typeWritten) {
            boolean typePut = commState.putByte(directType());

            if (!typePut)
                return false;

            commState.typeWritten = true;
        }

        // Single payload field: only state 0 has anything to write.
        if (commState.idx == 0) {
            boolean payloadPut = commState.putByteArrayClient(bytes);

            if (!payloadPut)
                return false;

            commState.idx++;
        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf) {
        throw new UnsupportedOperationException();
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return MEMCACHE_RES_FLAG;
    }

    /** {@inheritDoc} */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridMemcachedMessageWrapper copy = new GridMemcachedMessageWrapper();

        clone0(copy);

        return copy;
    }

    /**
     * {@inheritDoc}
     * <p>
     * The byte array is shared with the clone, not duplicated.
     */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        GridMemcachedMessageWrapper target = (GridMemcachedMessageWrapper)_msg;

        target.bytes = this.bytes;
    }

    /**
     * Encodes memcache message to a raw byte array.
     * <p>
     * Layout, in order: operation code (byte), key length (short), extras length (byte),
     * data type (byte, always zero), status (short), total body length (int), opaque bytes,
     * CAS (long, always zero); then optional flags (key flags and value flags, a short each),
     * the key bytes and the value bytes. The leading type byte is not part of the array.
     * <p>
     * Side effect: a non-null key and value of {@code msg} are replaced by their encoded bytes.
     *
     * @param msg Message being serialized.
     * @param jdkMarshaller JDK marshaller.
     * @return Serialized message.
     * @throws IgniteCheckedException If serialization failed.
     */
    private byte[] encodeMemcache(GridMemcachedMessage msg, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException {
        int keyFlags = 0;
        int keySize = 0;

        Object srcKey = msg.key();

        if (srcKey != null) {
            ByteArrayOutputStream keyOut = new ByteArrayOutputStream();

            keyFlags = encodeObj(srcKey, keyOut, jdkMarshaller);

            msg.key(keyOut.toByteArray());

            keySize = keyOut.size();
        }

        int valFlags = 0;
        int valSize = 0;

        Object srcVal = msg.value();

        if (srcVal != null) {
            ByteArrayOutputStream valOut = new ByteArrayOutputStream();

            valFlags = encodeObj(srcVal, valOut, jdkMarshaller);

            msg.value(valOut.toByteArray());

            valSize = valOut.size();
        }

        int flagsSize = msg.addFlags() ? FLAGS_LENGTH : 0;

        GridByteArrayList packet = new GridByteArrayList(HDR_LEN - 1);

        packet.add(msg.operationCode());

        // Narrowing casts below are dictated by the packet layout.
        packet.add((short)keySize);
        packet.add((byte)flagsSize);

        // Data type is always 0x00.
        packet.add((byte)0x00);

        packet.add((short)msg.status());

        packet.add(keySize + flagsSize + valSize);

        byte[] opaque = msg.opaque();

        packet.add(opaque, 0, opaque.length);

        // CAS, unused.
        packet.add(0L);

        assert packet.size() == HDR_LEN - 1;

        if (flagsSize > 0) {
            packet.add((short)keyFlags);
            packet.add((short)valFlags);
        }

        assert msg.key() == null || msg.key() instanceof byte[];
        assert msg.value() == null || msg.value() instanceof byte[];

        if (keySize > 0) {
            byte[] rawKey = (byte[])msg.key();

            packet.add(rawKey, 0, rawKey.length);
        }

        if (valSize > 0) {
            byte[] rawVal = (byte[])msg.value();

            packet.add(rawVal, 0, rawVal.length);
        }

        return packet.entireArray();
    }

    /**
     * Encodes given object to a byte array and returns flags that describe the type of serialized object.
     * <p>
     * Strings are written as UTF-8 with no flag; booleans as the character {@code '1'} or {@code '0'};
     * numbers, dates and byte arrays through their dedicated encodings; anything else is handed
     * to the JDK marshaller.
     *
     * @param obj Object to serialize.
     * @param out Output stream to which object should be written.
     * @param jdkMarshaller JDK marshaller.
     * @return Serialization flags.
     * @throws IgniteCheckedException If JDK serialization failed.
     */
    private int encodeObj(Object obj, ByteArrayOutputStream out, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException {
        if (obj instanceof String)
            return emit(out, ((String)obj).getBytes(UTF_8), 0);

        if (obj instanceof Boolean)
            return emit(out, new byte[] {(byte)((Boolean)obj ? '1' : '0')}, BOOLEAN_FLAG);

        if (obj instanceof Integer)
            return emit(out, U.intToBytes((Integer)obj), INT_FLAG);

        if (obj instanceof Long)
            return emit(out, U.longToBytes((Long)obj), LONG_FLAG);

        if (obj instanceof Date)
            return emit(out, U.longToBytes(((Date)obj).getTime()), DATE_FLAG);

        if (obj instanceof Byte)
            return emit(out, new byte[] {(Byte)obj}, BYTE_FLAG);

        if (obj instanceof Float)
            return emit(out, U.intToBytes(Float.floatToIntBits((Float)obj)), FLOAT_FLAG);

        if (obj instanceof Double)
            return emit(out, U.longToBytes(Double.doubleToLongBits((Double)obj)), DOUBLE_FLAG);

        if (obj instanceof byte[])
            return emit(out, (byte[])obj, BYTE_ARR_FLAG);

        // Fallback: the marshaller writes straight into the stream.
        jdkMarshaller.marshal(obj, out);

        return SERIALIZED_FLAG;
    }

    /**
     * Appends encoded data to the stream, skipping {@code null} data.
     *
     * @param out Target stream.
     * @param data Encoded bytes, possibly {@code null}.
     * @param flags Flags describing the encoded type.
     * @return The {@code flags} argument, unchanged.
     */
    private static int emit(ByteArrayOutputStream out, byte[] data, int flags) {
        if (data != null)
            out.write(data, 0, data.length);

        return flags;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridMemcachedMessageWrapper.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java deleted file mode 100644 index 2d1474e..0000000 --- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.gridgain.grid.kernal.processors.rest.*; -import org.gridgain.grid.kernal.processors.rest.handlers.cache.*; -import org.gridgain.grid.kernal.processors.rest.request.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.gridgain.grid.kernal.processors.rest.GridRestCommand.*; -import static org.gridgain.grid.kernal.processors.rest.protocols.tcp.GridMemcachedMessage.*; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; - -/** - * Handles memcache requests. 
- */
public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<GridMemcachedMessage> {
    /** Logger. */
    private final IgniteLogger log;

    /** Handler. */
    private final GridRestProtocolHandler hnd;

    /** JDK marshaller. */
    private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller();

    /** Context. */
    private final GridKernalContext ctx;

    /**
     * Creates listener which will convert incoming tcp packets to rest requests and forward them to
     * a given rest handler.
     *
     * @param log Logger to use.
     * @param hnd Rest handler.
     * @param ctx Context.
     */
    public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd, GridKernalContext ctx) {
        this.log = log;
        this.hnd = hnd;
        this.ctx = ctx;
    }

    /** {@inheritDoc} */
    @Override public void onConnected(GridNioSession ses) {
        // No-op, never called.
        assert false;
    }

    /** {@inheritDoc} */
    @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
        // No-op, never called.
        assert false;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Unknown op codes close the session. {@code QUIT} closes the session after an optional reply.
     * Every other request is chained behind the session's previous unfinished request future
     * (stored under the {@code LAST_FUT} meta key), so handling of a request starts only after
     * the preceding one has completed.
     */
    @SuppressWarnings({"IfMayBeConditional"})
    @Override public void onMessage(final GridNioSession ses, final GridMemcachedMessage req) {
        assert req != null;

        final GridTuple3<GridRestCommand, Boolean, Boolean> cmd = command(req.operationCode());

        if (cmd == null) {
            U.warn(log, "Cannot find corresponding REST command for op code (session will be closed) [ses=" + ses +
                ", opCode=" + Integer.toHexString(req.operationCode()) + ']');

            ses.close();

            return;
        }

        assert req.requestFlag() == MEMCACHE_REQ_FLAG;
        assert cmd.get2() != null && cmd.get3() != null;

        // Close connection on 'Quit' command.
        if (cmd.get1() == QUIT) {
            try {
                // NOTE(review): the reply is sent only when the quiet flag (get2) is set, which looks
                // inverted for a "quiet" quit - confirm against the protocol before changing.
                if (cmd.get2()) {
                    GridMemcachedMessage res = new GridMemcachedMessage(req);

                    sendResponse(ses, res).get();
                }
            }
            // Catch all when quitting.
            catch (Exception e) {
                U.warn(log, "Failed to send quit response packet (session will be closed anyway) [ses=" + ses +
                    ", msg=" + e.getMessage() + "]");
            }
            finally {
                ses.close();
            }

            return;
        }

        IgniteFuture<GridRestResponse> lastFut = ses.removeMeta(LAST_FUT.ordinal());

        // A completed predecessor imposes no ordering constraint any more.
        if (lastFut != null && lastFut.isDone())
            lastFut = null;

        IgniteFuture<GridRestResponse> f;

        if (lastFut == null)
            f = handleRequest0(ses, req, cmd);
        else {
            f = new GridEmbeddedFuture<>(
                lastFut,
                new C2<GridRestResponse, Exception, IgniteFuture<GridRestResponse>>() {
                    @Override public IgniteFuture<GridRestResponse> apply(GridRestResponse res, Exception e) {
                        return handleRequest0(ses, req, cmd);
                    }
                },
                ctx);
        }

        if (f != null)
            ses.addMeta(LAST_FUT.ordinal(), f);
    }

    /**
     * Handles a single request: {@code NOOP} is answered immediately, everything else is forwarded
     * to the REST handler and answered from a listener on the resulting future.
     *
     * @param ses Session.
     * @param req Request.
     * @param cmd Command.
     * @return Future or {@code null} if processed immediately.
     */
    @Nullable private IgniteFuture<GridRestResponse> handleRequest0(
        final GridNioSession ses,
        final GridMemcachedMessage req,
        final GridTuple3<GridRestCommand, Boolean, Boolean> cmd
    ) {
        if (cmd.get1() == NOOP) {
            GridMemcachedMessage res0 = new GridMemcachedMessage(req);

            res0.status(SUCCESS);

            sendResponse(ses, res0);

            return null;
        }

        IgniteFuture<GridRestResponse> f = hnd.handleAsync(createRestRequest(req, cmd.get1()));

        f.listenAsync(new CIX1<IgniteFuture<GridRestResponse>>() {
            @Override public void applyx(IgniteFuture<GridRestResponse> fut) throws IgniteCheckedException {
                GridRestResponse restRes = fut.get();

                // Handle 'Stat' command (special case because several packets are included in response).
                if (cmd.get1() == CACHE_METRICS)
                    sendMetrics(ses, req, restRes);
                else
                    sendResponse(ses, buildResponse(req, cmd, restRes));
            }
        });

        return f;
    }

    /**
     * Sends one packet per metric (metric name as key, metric value rendered as a string)
     * followed by an empty terminating packet.
     *
     * @param ses Session.
     * @param req Request the packets respond to.
     * @param restRes REST response; its payload is expected to be {@code GridCacheRestMetrics}.
     */
    private void sendMetrics(GridNioSession ses, GridMemcachedMessage req, GridRestResponse restRes) {
        assert restRes.getResponse() instanceof GridCacheRestMetrics;

        Map<String, Long> metrics = ((GridCacheRestMetrics)restRes.getResponse()).map();

        for (Map.Entry<String, Long> e : metrics.entrySet()) {
            GridMemcachedMessage res = new GridMemcachedMessage(req);

            res.key(e.getKey());

            res.value(String.valueOf(e.getValue()));

            sendResponse(ses, res);
        }

        sendResponse(ses, new GridMemcachedMessage(req));
    }

    /**
     * Builds the single response packet for a non-metrics command.
     * <p>
     * Status: {@code FAILURE} for an unsuccessful REST response; for {@code CACHE_GET},
     * {@code KEY_NOT_FOUND} when the payload is {@code null}; for the modifying commands,
     * {@code SUCCESS} only when the payload is {@code Boolean.TRUE}; {@code SUCCESS} otherwise.
     *
     * @param req Request.
     * @param cmd Command tuple (command, quiet flag, return-key flag).
     * @param restRes REST response.
     * @return Response packet.
     */
    private GridMemcachedMessage buildResponse(
        GridMemcachedMessage req,
        GridTuple3<GridRestCommand, Boolean, Boolean> cmd,
        GridRestResponse restRes
    ) {
        GridMemcachedMessage res = new GridMemcachedMessage(req);

        boolean ok = restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS;

        if (!ok)
            res.status(FAILURE);
        else {
            switch (cmd.get1()) {
                case CACHE_GET: {
                    res.status(restRes.getResponse() == null ? KEY_NOT_FOUND : SUCCESS);

                    break;
                }

                case CACHE_PUT:
                case CACHE_ADD:
                case CACHE_REMOVE:
                case CACHE_REPLACE:
                case CACHE_CAS:
                case CACHE_APPEND:
                case CACHE_PREPEND: {
                    // Null-safe comparison: a missing payload is reported as FAILURE
                    // instead of throwing NullPointerException inside the future listener.
                    boolean res0 = Boolean.TRUE.equals(restRes.getResponse());

                    res.status(res0 ? SUCCESS : FAILURE);

                    break;
                }

                default: {
                    res.status(SUCCESS);

                    break;
                }
            }
        }

        if (cmd.get3())
            res.key(req.key());

        if (ok && res.addData() && restRes.getResponse() != null)
            res.value(restRes.getResponse());

        return res;
    }

    /**
     * Wraps the response for direct marshalling and sends it. On a marshalling failure the error
     * is logged, the session is closed and a finished future carrying the error is returned.
     *
     * @param ses NIO session.
     * @param res Response.
     * @return NIO send future.
     */
    private GridNioFuture<?> sendResponse(GridNioSession ses, GridMemcachedMessage res) {
        try {
            GridMemcachedMessageWrapper wrapper = new GridMemcachedMessageWrapper(res, jdkMarshaller);

            return ses.send(wrapper);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to marshal response: " + res, e);

            ses.close();

            return new GridNioFinishedFuture<>(e);
        }
    }

    /**
     * Creates REST request from the protocol request.
     * <p>
     * For {@code CACHE_REMOVE_ALL} the request value is treated as an array of keys and passed on
     * as a map from each key to {@code null}; for all other commands a non-null value is passed as is.
     *
     * @param req Request.
     * @param cmd Command.
     * @return REST request.
     */
    @SuppressWarnings("unchecked")
    private GridRestCacheRequest createRestRequest(GridMemcachedMessage req, GridRestCommand cmd) {
        assert req != null;

        GridRestCacheRequest restReq = new GridRestCacheRequest();

        restReq.command(cmd);
        restReq.clientId(req.clientId());
        restReq.ttl(req.expiration());
        restReq.delta(req.delta());
        restReq.initial(req.initial());
        restReq.cacheName(req.cacheName());
        restReq.key(req.key());

        if (cmd == CACHE_REMOVE_ALL) {
            Object[] keys = (Object[])req.value();

            if (keys != null) {
                Map<Object, Object> map = new HashMap<>();

                for (Object key : keys)
                    map.put(key, null);

                restReq.values(map);
            }
        }
        else if (req.value() != null)
            restReq.value(req.value());

        return restReq;
    }

    /**
     * Gets command and command attributes from operation code.
     *
     * @param opCode Operation code.
     * @return Tuple of (command, quiet flag, return-key flag), or {@code null} for an unknown op code.
     */
    @Nullable private GridTuple3<GridRestCommand, Boolean, Boolean> command(int opCode) {
        GridRestCommand cmd;
        boolean quiet = false;
        boolean retKey = false;

        switch (opCode) {
            case 0x00:
            case 0x09:
                cmd = CACHE_GET;

                break;

            case 0x0C:
            case 0x0D:
                cmd = CACHE_GET;
                retKey = true;

                break;

            case 0x01:
                cmd = CACHE_PUT;

                break;

            case 0x02:
                cmd = CACHE_ADD;

                break;

            case 0x03:
                cmd = CACHE_REPLACE;

                break;

            case 0x04:
                cmd = CACHE_REMOVE;

                break;

            case 0x05:
                cmd = CACHE_INCREMENT;

                break;

            case 0x06:
                cmd = CACHE_DECREMENT;

                break;

            case 0x07:
                cmd = QUIT;

                break;

            case 0x08:
                cmd = CACHE_REMOVE_ALL;

                break;

            case 0x0A:
                cmd = NOOP;

                break;

            case 0x0B:
                cmd = VERSION;

                break;

            case 0x0E:
                cmd = CACHE_APPEND;

                break;

            case 0x0F:
                cmd = CACHE_PREPEND;

                break;

            case 0x10:
                cmd = CACHE_METRICS;

                break;

            // Quiet variants.
            case 0x11:
                cmd = CACHE_PUT;
                quiet = true;

                break;

            case 0x12:
                cmd = CACHE_ADD;
                quiet = true;

                break;

            case 0x13:
                cmd = CACHE_REPLACE;
                quiet = true;

                break;

            case 0x14:
                cmd = CACHE_REMOVE;
                quiet = true;

                break;

            case 0x15:
                cmd = CACHE_INCREMENT;
                quiet = true;

                break;

            case 0x16:
                cmd = CACHE_DECREMENT;
                quiet = true;

                break;

            case 0x17:
                cmd = QUIT;
                quiet = true;

                break;

            case 0x18:
                cmd = CACHE_REMOVE_ALL;
                quiet = true;

                break;

            case 0x19:
                cmd = CACHE_APPEND;
                quiet = true;

                break;

            case 0x1A:
                cmd = CACHE_PREPEND;
                quiet = true;

                break;

            default:
                return null;
        }

        return new GridTuple3<>(cmd, quiet, retKey);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java deleted file mode 100644 index b3fd931..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.client.marshaller.*; -import org.gridgain.grid.kernal.processors.rest.client.message.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.nio.charset.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.rest.protocols.tcp.GridMemcachedMessage.*; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; - -/** - * Parser that decodes client message wrappers, client handshake requests and memcache packets from incoming byte buffers. - */ -public class GridTcpRestDirectParser implements GridNioParser { - /** UTF-8 charset. */ - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - /** Message metadata key. */ - private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - - /** Protocol handler. */ - private final GridTcpRestProtocol proto; - - /** Message reader. */ - private final GridNioMessageReader msgReader; - - /** - * @param proto Protocol handler. - * @param msgReader Message reader. 
- */ - public GridTcpRestDirectParser(GridTcpRestProtocol proto, GridNioMessageReader msgReader) { - this.proto = proto; - this.msgReader = msgReader; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - ParserState state = ses.removeMeta(PARSER_STATE.ordinal()); - - if (state != null) { - assert state.packetType() == GridClientPacketType.MEMCACHE; - - Object memcacheMsg = parseMemcachePacket(ses, buf, state); - - if (memcacheMsg == null) - ses.addMeta(PARSER_STATE.ordinal(), state); - - return memcacheMsg; - } - - GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY); - - if (msg == null && buf.hasRemaining()) { - byte type = buf.get(buf.position()); - - if (type == GridClientMessageWrapper.REQ_HEADER) { - buf.get(); - - msg = new GridClientMessageWrapper(); - } - else if (type == GridClientHandshakeRequestWrapper.HANDSHAKE_HEADER) { - buf.get(); - - msg = new GridClientHandshakeRequestWrapper(); - } - else if (type == MEMCACHE_REQ_FLAG) { - state = new ParserState(); - - state.packet(new GridMemcachedMessage()); - state.packetType(GridClientPacketType.MEMCACHE); - - Object memcacheMsg = parseMemcachePacket(ses, buf, state); - - if (memcacheMsg == null) - ses.addMeta(PARSER_STATE.ordinal(), state); - - return memcacheMsg; - } - else - throw new IOException("Invalid message type: " + type); - } - - boolean finished = false; - - if (buf.hasRemaining()) - finished = msgReader.read(null, msg, buf); - - if (finished) { - if (msg instanceof GridClientMessageWrapper) { - GridClientMessageWrapper clientMsg = (GridClientMessageWrapper)msg; - - if (clientMsg.messageSize() == 0) - return GridClientPingPacket.PING_MESSAGE; - - GridClientMarshaller marsh = proto.marshaller(ses); - - GridClientMessage ret = marsh.unmarshal(clientMsg.messageArray()); - - ret.requestId(clientMsg.requestId()); - ret.clientId(clientMsg.clientId()); - 
ret.destinationId(clientMsg.destinationId()); - - return ret; - } - else { - assert msg instanceof GridClientHandshakeRequestWrapper; - - GridClientHandshakeRequestWrapper req = (GridClientHandshakeRequestWrapper)msg; - - GridClientHandshakeRequest ret = new GridClientHandshakeRequest(); - - ret.putBytes(req.bytes(), 0, 4); - - return ret; - } - } - else { - ses.addMeta(MSG_META_KEY, msg); - - return null; - } - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - // No encoding needed for direct messages. - throw new UnsupportedEncodingException(); - } - - /** - * Parses memcache protocol message. - * - * @param ses Session. - * @param buf Buffer containing not parsed bytes. - * @param state Current parser state. - * @return Parsed packet, or {@code null} if more bytes are needed to complete it. - * @throws IOException If packet cannot be parsed. - * @throws IgniteCheckedException If deserialization error occurred. - */ - @Nullable private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state) - throws IOException, IgniteCheckedException { - assert state.packetType() == GridClientPacketType.MEMCACHE; - assert state.packet() != null; - assert state.packet() instanceof GridMemcachedMessage; - - GridMemcachedMessage req = (GridMemcachedMessage)state.packet(); - ByteArrayOutputStream tmp = state.buffer(); - int i = state.index(); - - while (buf.remaining() > 0) { - byte b = buf.get(); - - if (i == 0) - req.requestFlag(b); - else if (i == 1) - req.operationCode(b); - else if (i == 2 || i == 3) { - tmp.write(b); - - if (i == 3) { - req.keyLength(U.bytesToShort(tmp.toByteArray(), 0)); - - tmp.reset(); - } - } - else if (i == 4) - req.extrasLength(b); - else if (i >= 8 && i <= 11) { - tmp.write(b); - - if (i == 11) { - req.totalLength(U.bytesToInt(tmp.toByteArray(), 0)); - - tmp.reset(); - } - } - else if (i >= 12 && i <= 15) { - tmp.write(b); - - if (i == 15) { - req.opaque(tmp.toByteArray()); - - 
tmp.reset(); - } - } - else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.extrasLength() - 1) { - req.extras(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN + req.extrasLength() && - i < HDR_LEN + req.extrasLength() + req.keyLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) { - req.key(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() && - i < HDR_LEN + req.totalLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.totalLength() - 1) { - req.value(tmp.toByteArray()); - - tmp.reset(); - } - } - - if (i == HDR_LEN + req.totalLength() - 1) - // Assembled the packet. - return assemble(ses, req); - - i++; - } - - state.index(i); - - return null; - } - - /** - * Validates incoming packet and deserializes all fields that need to be deserialized. - * - * @param ses Session on which packet is being parsed. - * @param req Raw packet. - * @return Same packet with fields deserialized. - * @throws IOException If parsing failed. - * @throws IgniteCheckedException If deserialization failed. - */ - private GridClientMessage assemble(GridNioSession ses, GridMemcachedMessage req) throws IOException, IgniteCheckedException { - byte[] extras = req.extras(); - - // First, decode key and value, if any - if (req.key() != null || req.value() != null) { - short keyFlags = 0; - short valFlags = 0; - - if (req.hasFlags()) { - if (extras == null || extras.length < FLAGS_LENGTH) - throw new IOException("Failed to parse incoming packet (flags required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - keyFlags = U.bytesToShort(extras, 0); - valFlags = U.bytesToShort(extras, 2); - } - - if (req.key() != null) { - assert req.key() instanceof byte[]; - - byte[] rawKey = (byte[])req.key(); - - // Only values can be hessian-encoded. 
- req.key(decodeObj(keyFlags, rawKey)); - } - - if (req.value() != null) { - assert req.value() instanceof byte[]; - - byte[] rawVal = (byte[])req.value(); - - req.value(decodeObj(valFlags, rawVal)); - } - } - - if (req.hasExpiration()) { - if (extras == null || extras.length < 8) - throw new IOException("Failed to parse incoming packet (expiration value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.expiration(U.bytesToInt(extras, 4) & 0xFFFFFFFFL); - } - - if (req.hasInitial()) { - if (extras == null || extras.length < 16) - throw new IOException("Failed to parse incoming packet (initial value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.initial(U.bytesToLong(extras, 8)); - } - - if (req.hasDelta()) { - if (extras == null || extras.length < 8) - throw new IOException("Failed to parse incoming packet (delta value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.delta(U.bytesToLong(extras, 0)); - } - - if (extras != null) { - // Clients that include cache name must always include flags. - int len = 4; - - if (req.hasExpiration()) - len += 4; - - if (req.hasDelta()) - len += 8; - - if (req.hasInitial()) - len += 8; - - if (extras.length - len > 0) { - byte[] cacheName = new byte[extras.length - len]; - - U.arrayCopy(extras, len, cacheName, 0, extras.length - len); - - req.cacheName(new String(cacheName, UTF_8)); - } - } - - return req; - } - - /** - * Decodes value from a given byte array to the object according to the flags given. - * - * @param flags Flags. - * @param bytes Byte array to decode. - * @return Decoded value. - * @throws IgniteCheckedException If deserialization failed. 
- */ - private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException { - assert bytes != null; - - if ((flags & SERIALIZED_FLAG) != 0) - return proto.jdkMarshaller().unmarshal(bytes, null); - - int masked = flags & 0xff00; - - switch (masked) { - case BOOLEAN_FLAG: - return bytes[0] == '1'; - case INT_FLAG: - return U.bytesToInt(bytes, 0); - case LONG_FLAG: - return U.bytesToLong(bytes, 0); - case DATE_FLAG: - return new Date(U.bytesToLong(bytes, 0)); - case BYTE_FLAG: - return bytes[0]; - case FLOAT_FLAG: - return Float.intBitsToFloat(U.bytesToInt(bytes, 0)); - case DOUBLE_FLAG: - return Double.longBitsToDouble(U.bytesToLong(bytes, 0)); - case BYTE_ARR_FLAG: - return bytes; - default: - return new String(bytes, UTF_8); - } - } - - /** - * Holder for parser state and temporary buffer. - */ - protected static class ParserState { - /** Parser index. */ - private int idx; - - /** Temporary data buffer. */ - private ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - /** Packet being assembled. */ - private GridClientMessage packet; - - /** Packet type. */ - private GridClientPacketType packetType; - - /** Header data. */ - private HeaderData hdr; - - /** - * @return Stored parser index. - */ - public int index() { - return idx; - } - - /** - * @param idx Index to store. - */ - public void index(int idx) { - this.idx = idx; - } - - /** - * @return Temporary data buffer. - */ - public ByteArrayOutputStream buffer() { - return buf; - } - - /** - * @return Pending packet. - */ - @Nullable public GridClientMessage packet() { - return packet; - } - - /** - * @param packet Pending packet. - */ - public void packet(GridClientMessage packet) { - assert this.packet == null; - - this.packet = packet; - } - - /** - * @return Pending packet type. - */ - public GridClientPacketType packetType() { - return packetType; - } - - /** - * @param packetType Pending packet type. 
- */ - public void packetType(GridClientPacketType packetType) { - this.packetType = packetType; - } - - /** - * @return Header. - */ - public HeaderData header() { - return hdr; - } - - /** - * @param hdr Header. - */ - public void header(HeaderData hdr) { - this.hdr = hdr; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ParserState.class, this); - } - } - - /** - * Header. - */ - protected static class HeaderData { - /** Request Id. */ - private final long reqId; - - /** Client Id. */ - private final UUID clientId; - - /** Destination Id. */ - private final UUID destId; - - /** - * @param reqId Request Id. - * @param clientId Client Id. - * @param destId Destination Id. - */ - private HeaderData(long reqId, UUID clientId, UUID destId) { - this.reqId = reqId; - this.clientId = clientId; - this.destId = destId; - } - - /** - * @return Request Id. - */ - public long reqId() { - return reqId; - } - - /** - * @return Client Id. - */ - public UUID clientId() { - return clientId; - } - - /** - * @return Destination Id. - */ - public UUID destinationId() { - return destId; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestNioListener.java deleted file mode 100644 index b655630..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestNioListener.java +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.client.marshaller.*; -import org.gridgain.grid.kernal.processors.rest.*; -import org.gridgain.grid.kernal.processors.rest.client.message.*; -import org.gridgain.grid.kernal.processors.rest.handlers.cache.*; -import org.gridgain.grid.kernal.processors.rest.request.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.gridgain.grid.kernal.processors.rest.GridRestCommand.*; -import static org.gridgain.grid.kernal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*; -import static org.gridgain.grid.kernal.processors.rest.client.message.GridClientHandshakeResponse.*; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; - -/** - * Listener for nio server that handles incoming tcp rest packets. 
- */ -public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridClientMessage> { - /** Mapping of {@code GridCacheOperation} to {@code GridRestCommand}. */ - private static final Map<GridClientCacheRequest.GridCacheOperation, GridRestCommand> cacheCmdMap = - new EnumMap<>(GridClientCacheRequest.GridCacheOperation.class); - - /** Supported protocol versions. */ - private static final Collection<Short> SUPP_VERS = new HashSet<>(); - - /** - * Fills {@code cacheCmdMap} and {@code SUPP_VERS}. - */ - static { - cacheCmdMap.put(PUT, CACHE_PUT); - cacheCmdMap.put(PUT_ALL, CACHE_PUT_ALL); - cacheCmdMap.put(GET, CACHE_GET); - cacheCmdMap.put(GET_ALL, CACHE_GET_ALL); - cacheCmdMap.put(RMV, CACHE_REMOVE); - cacheCmdMap.put(RMV_ALL, CACHE_REMOVE_ALL); - cacheCmdMap.put(REPLACE, CACHE_REPLACE); - cacheCmdMap.put(CAS, CACHE_CAS); - cacheCmdMap.put(METRICS, CACHE_METRICS); - cacheCmdMap.put(APPEND, CACHE_APPEND); - cacheCmdMap.put(PREPEND, CACHE_PREPEND); - - SUPP_VERS.add((short)1); - } - - /** Latch counted down once the marshallers map has been set. */ - private final CountDownLatch marshMapLatch = new CountDownLatch(1); - - /** Marshallers map. */ - private Map<Byte, GridClientMarshaller> marshMap; - - /** Logger. */ - private IgniteLogger log; - - /** Protocol. */ - private GridTcpRestProtocol proto; - - /** Protocol handler. */ - private GridRestProtocolHandler hnd; - - /** Handler for all memcache requests. */ - private GridTcpMemcachedNioListener memcachedLsnr; - - /** - * Creates listener which will convert incoming tcp packets to rest requests and forward them to - * a given rest handler. - * - * @param log Logger to use. - * @param proto Protocol. - * @param hnd Rest handler. - * @param ctx Context. - */ - public GridTcpRestNioListener(IgniteLogger log, GridTcpRestProtocol proto, GridRestProtocolHandler hnd, - GridKernalContext ctx) { - memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd, ctx); - - this.log = log; - this.proto = proto; - this.hnd = hnd; - } - - /** - * @param marshMap Marshallers. 
- */ - void marshallers(Map<Byte, GridClientMarshaller> marshMap) { - assert marshMap != null; - - this.marshMap = marshMap; - - marshMapLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override public void onConnected(GridNioSession ses) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - if (e != null) { - if (e instanceof RuntimeException) - U.error(log, "Failed to process request from remote client: " + ses, e); - else - U.warn(log, "Closed client session due to exception [ses=" + ses + ", msg=" + e.getMessage() + ']'); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void onMessage(final GridNioSession ses, final GridClientMessage msg) { - if (msg instanceof GridMemcachedMessage) - memcachedLsnr.onMessage(ses, (GridMemcachedMessage)msg); - else { - if (msg == GridClientPingPacket.PING_MESSAGE) - ses.send(new GridClientPingPacketWrapper()); - else if (msg instanceof GridClientHandshakeRequest) { - GridClientHandshakeRequest hs = (GridClientHandshakeRequest)msg; - - short ver = hs.version(); - - if (!SUPP_VERS.contains(ver)) { - U.error(log, "Client protocol version is not supported [ses=" + ses + - ", ver=" + ver + - ", supported=" + SUPP_VERS + ']'); - - ses.close(); - } - else { - byte marshId = hs.marshallerId(); - - if (marshMapLatch.getCount() > 0) - U.awaitQuiet(marshMapLatch); - - GridClientMarshaller marsh = marshMap.get(marshId); - - if (marsh == null) { - U.error(log, "Client marshaller ID is invalid. 
Note that .NET and C++ clients " + - "are supported only in enterprise edition [ses=" + ses + ", marshId=" + marshId + ']'); - - ses.close(); - } - else { - ses.addMeta(MARSHALLER.ordinal(), marsh); - - ses.send(new GridClientHandshakeResponseWrapper(CODE_OK)); - } - } - } - else { - final GridRestRequest req = createRestRequest(ses, msg); - - if (req != null) - hnd.handleAsync(req).listenAsync(new CI1<IgniteFuture<GridRestResponse>>() { - @Override public void apply(IgniteFuture<GridRestResponse> fut) { - GridClientResponse res = new GridClientResponse(); - - res.requestId(msg.requestId()); - res.clientId(msg.clientId()); - - try { - GridRestResponse restRes = fut.get(); - - res.sessionToken(restRes.sessionTokenBytes()); - res.successStatus(restRes.getSuccessStatus()); - res.errorMessage(restRes.getError()); - - Object o = restRes.getResponse(); - - // In case of metrics a little adjustment is needed. - if (o instanceof GridCacheRestMetrics) - o = ((GridCacheRestMetrics)o).map(); - - res.result(o); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process client request: " + msg, e); - - res.successStatus(GridClientResponse.STATUS_FAILED); - res.errorMessage("Failed to process client request: " + e.getMessage()); - } - - GridClientMessageWrapper wrapper = new GridClientMessageWrapper(); - - wrapper.requestId(msg.requestId()); - wrapper.clientId(msg.clientId()); - - try { - ByteBuffer bytes = proto.marshaller(ses).marshal(res, 0); - - wrapper.message(bytes); - - wrapper.messageSize(bytes.remaining() + 40); - } - catch (IOException e) { - U.error(log, "Failed to marshal response: " + res, e); - - ses.close(); - - return; - } - - ses.send(wrapper); - } - }); - else - U.error(log, "Failed to process client request (unknown packet type) [ses=" + ses + - ", msg=" + msg + ']'); - } - } - } - - /** - * Creates a REST request object from client TCP binary packet. - * - * @param ses NIO session. - * @param msg Request message. 
- * @return REST request object, or {@code null} if the message type is not recognized. - */ - @Nullable private GridRestRequest createRestRequest(GridNioSession ses, GridClientMessage msg) { - GridRestRequest restReq = null; - - if (msg instanceof GridClientAuthenticationRequest) { - GridClientAuthenticationRequest req = (GridClientAuthenticationRequest)msg; - - restReq = new GridRestTaskRequest(); - - restReq.command(NOOP); - - restReq.credentials(req.credentials()); - } - else if (msg instanceof GridClientCacheRequest) { - GridClientCacheRequest req = (GridClientCacheRequest)msg; - - GridRestCacheRequest restCacheReq = new GridRestCacheRequest(); - - restCacheReq.cacheName(req.cacheName()); - restCacheReq.cacheFlags(req.cacheFlagsOn()); - - restCacheReq.key(req.key()); - restCacheReq.value(req.value()); - restCacheReq.value2(req.value2()); - restCacheReq.portableMode(proto.portableMode(ses)); - - Map vals = req.values(); - if (vals != null) - restCacheReq.values(new HashMap<Object, Object>(vals)); - - restCacheReq.command(cacheCmdMap.get(req.operation())); - - restReq = restCacheReq; - } - else if (msg instanceof GridClientCacheQueryRequest) { - GridClientCacheQueryRequest req = (GridClientCacheQueryRequest) msg; - - restReq = new GridRestCacheQueryRequest(req); - - switch (req.operation()) { - case EXECUTE: - restReq.command(CACHE_QUERY_EXECUTE); - - break; - - case FETCH: - restReq.command(CACHE_QUERY_FETCH); - break; - - case REBUILD_INDEXES: - restReq.command(CACHE_QUERY_REBUILD_INDEXES); - - break; - - default: - throw new IllegalArgumentException("Unknown query operation: " + req.operation()); - } - } - else if (msg instanceof GridClientTaskRequest) { - GridClientTaskRequest req = (GridClientTaskRequest) msg; - - GridRestTaskRequest restTaskReq = new GridRestTaskRequest(); - - restTaskReq.command(EXE); - - restTaskReq.taskName(req.taskName()); - restTaskReq.params(Arrays.asList(req.argument())); - restTaskReq.keepPortables(req.keepPortables()); - restTaskReq.portableMode(proto.portableMode(ses)); - - restReq = 
restTaskReq; - } - else if (msg instanceof GridClientGetMetaDataRequest) { - GridClientGetMetaDataRequest req = (GridClientGetMetaDataRequest)msg; - - restReq = new GridRestPortableGetMetaDataRequest(req); - - restReq.command(GET_PORTABLE_METADATA); - } - else if (msg instanceof GridClientPutMetaDataRequest) { - GridClientPutMetaDataRequest req = (GridClientPutMetaDataRequest)msg; - - restReq = new GridRestPortablePutMetaDataRequest(req); - - restReq.command(PUT_PORTABLE_METADATA); - } - else if (msg instanceof GridClientTopologyRequest) { - GridClientTopologyRequest req = (GridClientTopologyRequest) msg; - - GridRestTopologyRequest restTopReq = new GridRestTopologyRequest(); - - restTopReq.includeMetrics(req.includeMetrics()); - restTopReq.includeAttributes(req.includeAttributes()); - - if (req.nodeId() != null) { - restTopReq.command(NODE); - - restTopReq.nodeId(req.nodeId()); - } - else if (req.nodeIp() != null) { - restTopReq.command(NODE); - - restTopReq.nodeIp(req.nodeIp()); - } - else - restTopReq.command(TOPOLOGY); - - restReq = restTopReq; - } - else if (msg instanceof GridClientLogRequest) { - GridClientLogRequest req = (GridClientLogRequest) msg; - - GridRestLogRequest restLogReq = new GridRestLogRequest(); - - restLogReq.command(LOG); - - restLogReq.path(req.path()); - restLogReq.from(req.from()); - restLogReq.to(req.to()); - - restReq = restLogReq; - } - - if (restReq != null) { - restReq.destinationId(msg.destinationId()); - restReq.clientId(msg.clientId()); - restReq.sessionToken(msg.sessionToken()); - restReq.address(ses.remoteAddress()); - } - - return restReq; - } - - /** - * Closes the session by timeout (i.e. inactivity within the configured period of time). - * - * @param ses Session, that was inactive. - */ - @Override public void onSessionIdleTimeout(GridNioSession ses) { - ses.close(); - } -}
