http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java new file mode 100644 index 0000000..7e8722b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java @@ -0,0 +1,878 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.client.marshaller.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.nio.charset.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + +/** + * Parser for extended memcache protocol. Handles parsing and encoding activity. + */ +public class GridTcpRestParser implements GridNioParser { + /** UTF-8 charset. */ + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + /** JDK marshaller. */ + private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); + + /** {@inheritDoc} */ + @Nullable @Override public GridClientMessage decode(GridNioSession ses, ByteBuffer buf) throws IOException, + IgniteCheckedException { + ParserState state = ses.removeMeta(PARSER_STATE.ordinal()); + + if (state == null) + state = new ParserState(); + + GridClientPacketType type = state.packetType(); + + if (type == null) { + byte hdr = buf.get(buf.position()); + + switch (hdr) { + case MEMCACHE_REQ_FLAG: + state.packet(new GridMemcachedMessage()); + state.packetType(GridClientPacketType.MEMCACHE); + + break; + + case GRIDGAIN_REQ_FLAG: + // Skip header. + buf.get(); + + state.packetType(GridClientPacketType.GRIDGAIN); + + break; + + case GRIDGAIN_HANDSHAKE_FLAG: + // Skip header. + buf.get(); + + state.packetType(GridClientPacketType.GRIDGAIN_HANDSHAKE); + + break; + + default: + throw new IOException("Failed to parse incoming packet (invalid packet start) [ses=" + ses + + ", b=" + Integer.toHexString(hdr & 0xFF) + ']'); + } + } + + GridClientMessage res = null; + + switch (state.packetType()) { + case MEMCACHE: + res = parseMemcachePacket(ses, buf, state); + + break; + + case GRIDGAIN_HANDSHAKE: + res = parseHandshake(buf, state); + + break; + + case GRIDGAIN: + res = parseCustomPacket(ses, buf, state); + + break; + } + + if (res == null) + // Packet was not fully parsed yet. + ses.addMeta(PARSER_STATE.ordinal(), state); + + return res; + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg0) throws IOException, IgniteCheckedException { + assert msg0 != null; + + GridClientMessage msg = (GridClientMessage)msg0; + + if (msg instanceof GridMemcachedMessage) + return encodeMemcache((GridMemcachedMessage)msg); + else if (msg == GridClientPingPacket.PING_MESSAGE) + return ByteBuffer.wrap(GridClientPingPacket.PING_PACKET); + else if (msg instanceof GridClientHandshakeResponse) + return ByteBuffer.wrap(new byte[] { + ((GridClientHandshakeResponse)msg).resultCode() + }); + else { + GridClientMarshaller marsh = marshaller(ses); + + ByteBuffer res = marsh.marshal(msg, 45); + + ByteBuffer slice = res.slice(); + + slice.put(GRIDGAIN_REQ_FLAG); + slice.putInt(res.remaining() - 5); + slice.putLong(msg.requestId()); + slice.put(U.uuidToBytes(msg.clientId())); + slice.put(U.uuidToBytes(msg.destinationId())); + + return res; + } + } + + /** + * Parses memcache protocol message. + * + * @param ses Session. + * @param buf Buffer containing not parsed bytes. + * @param state Current parser state. + * @return Parsed packet.s + * @throws IOException If packet cannot be parsed. + * @throws IgniteCheckedException If deserialization error occurred. + */ + @Nullable private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state) + throws IOException, IgniteCheckedException { + assert state.packetType() == GridClientPacketType.MEMCACHE; + assert state.packet() != null; + assert state.packet() instanceof GridMemcachedMessage; + + GridMemcachedMessage req = (GridMemcachedMessage)state.packet(); + ByteArrayOutputStream tmp = state.buffer(); + int i = state.index(); + + while (buf.remaining() > 0) { + byte b = buf.get(); + + if (i == 0) + req.requestFlag(b); + else if (i == 1) + req.operationCode(b); + else if (i == 2 || i == 3) { + tmp.write(b); + + if (i == 3) { + req.keyLength(U.bytesToShort(tmp.toByteArray(), 0)); + + tmp.reset(); + } + } + else if (i == 4) + req.extrasLength(b); + else if (i >= 8 && i <= 11) { + tmp.write(b); + + if (i == 11) { + req.totalLength(U.bytesToInt(tmp.toByteArray(), 0)); + + tmp.reset(); + } + } + else if (i >= 12 && i <= 15) { + tmp.write(b); + + if (i == 15) { + req.opaque(tmp.toByteArray()); + + tmp.reset(); + } + } + else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) { + tmp.write(b); + + if (i == HDR_LEN + req.extrasLength() - 1) { + req.extras(tmp.toByteArray()); + + tmp.reset(); + } + } + else if (i >= HDR_LEN + req.extrasLength() && + i < HDR_LEN + req.extrasLength() + req.keyLength()) { + tmp.write(b); + + if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) { + req.key(tmp.toByteArray()); + + tmp.reset(); + } + } + else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() && + i < HDR_LEN + req.totalLength()) { + tmp.write(b); + + if (i == HDR_LEN + req.totalLength() - 1) { + req.value(tmp.toByteArray()); + + tmp.reset(); + } + } + + if (i == HDR_LEN + req.totalLength() - 1) + // Assembled the packet. + return assemble(ses, req); + + i++; + } + + state.index(i); + + return null; + } + + /** + * Parses a client handshake, checking a client version and + * reading the marshaller protocol ID. + * + * @param buf Message bytes. + * @param state Parser state. + * @return True if a hint was parsed, false if still need more bytes to parse. + */ + @Nullable private GridClientMessage parseHandshake(ByteBuffer buf, ParserState state) { + assert state.packetType() == GridClientPacketType.GRIDGAIN_HANDSHAKE; + + int idx = state.index(); + + GridClientHandshakeRequest packet = (GridClientHandshakeRequest)state.packet(); + + if (packet == null) { + packet = new GridClientHandshakeRequest(); + + state.packet(packet); + } + + int rem = buf.remaining(); + + if (rem > 0) { + byte[] bbuf = new byte[5]; // Buffer to read data to. + + int nRead = Math.min(rem, bbuf.length); // Number of bytes to read. + + buf.get(bbuf, 0, nRead); // Batch read from buffer. + + int nAvailable = nRead; // Number of available bytes. + + if (idx < 4) { // Need to read version bytes. + int len = Math.min(nRead, 4 - idx); // Number of version bytes available in buffer. + + packet.putBytes(bbuf, idx, len); + + idx += len; + state.index(idx); + nAvailable -= len; + } + + assert idx <= 4 : "Wrong idx: " + idx; + assert nAvailable == 0 || nAvailable == 1 : "Wrong nav: " + nAvailable; + + if (idx == 4 && nAvailable > 0) + return packet; + } + + return null; // Wait for more data. + } + + /** + * Parses custom packet serialized by GridGain marshaller. + * + * @param ses Session. + * @param buf Buffer containing not parsed bytes. + * @param state Parser state. + * @return Parsed message. + * @throws IOException If packet parsing or deserialization failed. + * @throws IgniteCheckedException If failed. + */ + @Nullable private GridClientMessage parseCustomPacket(GridNioSession ses, ByteBuffer buf, ParserState state) + throws IOException, IgniteCheckedException { + assert state.packetType() == GridClientPacketType.GRIDGAIN; + assert state.packet() == null; + + ByteArrayOutputStream tmp = state.buffer(); + + int len = state.index(); + + if (buf.remaining() > 0) { + if (len == 0) { // Don't know the size yet. + byte[] lenBytes = statefulRead(buf, tmp, 4); + + if (lenBytes != null) { + len = U.bytesToInt(lenBytes, 0); + + if (len == 0) + return GridClientPingPacket.PING_MESSAGE; + else if (len < 0) + throw new IOException("Failed to parse incoming packet (invalid packet length) [ses=" + ses + + ", len=" + len + ']'); + + state.index(len); + } + } + + if (len > 0 && state.header() == null) { + byte[] hdrBytes = statefulRead(buf, tmp, 40); + + if (hdrBytes != null) { + long reqId = GridClientByteUtils.bytesToLong(hdrBytes, 0); + UUID clientId = GridClientByteUtils.bytesToUuid(hdrBytes, 8); + UUID destId = GridClientByteUtils.bytesToUuid(hdrBytes, 24); + + state.header(new HeaderData(reqId, clientId, destId)); + } + } + + if (len > 0 && state.header() != null) { + final int packetSize = len - 40; + + if (tmp.size() + buf.remaining() >= packetSize) { + if (buf.remaining() > 0) { + byte[] bodyBytes = new byte[packetSize - tmp.size()]; + + buf.get(bodyBytes); + + tmp.write(bodyBytes); + } + + return parseClientMessage(ses, state); + } + else + copyRemaining(buf, tmp); + } + } + + return null; + } + + /** + * Tries to read the specified amount of bytes using intermediate buffer. Stores + * the bytes to intermediate buffer, if size requirement is not met. + * + * @param buf Byte buffer to read from. + * @param intBuf Intermediate buffer to read bytes from and to save remaining bytes to. + * @param size Number of bytes to read. + * @return Resulting byte array or {@code null}, if both buffers contain less bytes + * than required. In case of non-null result, the intermediate buffer is empty. + * In case of {@code null} result, the input buffer is empty (read fully). + * @throws IOException If IO error occurs. + */ + @Nullable private byte[] statefulRead(ByteBuffer buf, ByteArrayOutputStream intBuf, int size) throws IOException { + if (intBuf.size() + buf.remaining() >= size) { + int off = 0; + byte[] bytes = new byte[size]; + + if (intBuf.size() > 0) { + assert intBuf.size() < size; + + byte[] tmpBytes = intBuf.toByteArray(); + + System.arraycopy(tmpBytes, 0, bytes, 0, tmpBytes.length); + + off = intBuf.size(); + + intBuf.reset(); + } + + buf.get(bytes, off, size - off); + + return bytes; + } + else { + copyRemaining(buf, intBuf); + + return null; + } + } + + /** + * Copies remaining bytes from byte buffer to output stream. + * + * @param src Source buffer. + * @param dest Destination stream. + * @throws IOException If IO error occurs. + */ + private void copyRemaining(ByteBuffer src, OutputStream dest) throws IOException { + byte[] b = new byte[src.remaining()]; + + src.get(b); + + dest.write(b); + } + + /** + * Parses {@link GridClientMessage} from raw bytes. + * + * @param ses Session. + * @param state Parser state. + * @return A parsed client message. + * @throws IOException On marshaller error. + * @throws IgniteCheckedException If no marshaller was defined for the session. + */ + protected GridClientMessage parseClientMessage(GridNioSession ses, ParserState state) throws IOException, IgniteCheckedException { + GridClientMarshaller marsh = marshaller(ses); + + GridClientMessage msg = marsh.unmarshal(state.buffer().toByteArray()); + + msg.requestId(state.header().reqId()); + msg.clientId(state.header().clientId()); + msg.destinationId(state.header().destinationId()); + + return msg; + } + + /** + * Encodes memcache message to a raw byte array. + * + * @param msg Message being serialized. + * @return Serialized message. + * @throws IgniteCheckedException If serialization failed. + */ + private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException { + GridByteArrayList res = new GridByteArrayList(HDR_LEN); + + int keyLen = 0; + + int keyFlags = 0; + + if (msg.key() != null) { + ByteArrayOutputStream rawKey = new ByteArrayOutputStream(); + + keyFlags = encodeObj(msg.key(), rawKey); + + msg.key(rawKey.toByteArray()); + + keyLen = rawKey.size(); + } + + int dataLen = 0; + + int valFlags = 0; + + if (msg.value() != null) { + ByteArrayOutputStream rawVal = new ByteArrayOutputStream(); + + valFlags = encodeObj(msg.value(), rawVal); + + msg.value(rawVal.toByteArray()); + + dataLen = rawVal.size(); + } + + int flagsLen = 0; + + if (msg.addFlags())// || keyFlags > 0 || valFlags > 0) + flagsLen = FLAGS_LENGTH; + + res.add(MEMCACHE_RES_FLAG); + + res.add(msg.operationCode()); + + // Cast is required due to packet layout. + res.add((short)keyLen); + + // Cast is required due to packet layout. + res.add((byte)flagsLen); + + // Data type is always 0x00. + res.add((byte)0x00); + + res.add((short)msg.status()); + + res.add(keyLen + flagsLen + dataLen); + + res.add(msg.opaque(), 0, msg.opaque().length); + + // CAS, unused. + res.add(0L); + + assert res.size() == HDR_LEN; + + if (flagsLen > 0) { + res.add((short) keyFlags); + res.add((short) valFlags); + } + + assert msg.key() == null || msg.key() instanceof byte[]; + assert msg.value() == null || msg.value() instanceof byte[]; + + if (keyLen > 0) + res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length); + + if (dataLen > 0) + res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length); + + return ByteBuffer.wrap(res.entireArray()); + } + + /** + * Validates incoming packet and deserializes all fields that need to be deserialized. + * + * @param ses Session on which packet is being parsed. + * @param req Raw packet. + * @return Same packet with fields deserialized. + * @throws IOException If parsing failed. + * @throws IgniteCheckedException If deserialization failed. + */ + private GridClientMessage assemble(GridNioSession ses, GridMemcachedMessage req) throws IOException, IgniteCheckedException { + byte[] extras = req.extras(); + + // First, decode key and value, if any + if (req.key() != null || req.value() != null) { + short keyFlags = 0; + short valFlags = 0; + + if (req.hasFlags()) { + if (extras == null || extras.length < FLAGS_LENGTH) + throw new IOException("Failed to parse incoming packet (flags required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + keyFlags = U.bytesToShort(extras, 0); + valFlags = U.bytesToShort(extras, 2); + } + + if (req.key() != null) { + assert req.key() instanceof byte[]; + + byte[] rawKey = (byte[])req.key(); + + // Only values can be hessian-encoded. + req.key(decodeObj(keyFlags, rawKey)); + } + + if (req.value() != null) { + assert req.value() instanceof byte[]; + + byte[] rawVal = (byte[])req.value(); + + req.value(decodeObj(valFlags, rawVal)); + } + } + + if (req.hasExpiration()) { + if (extras == null || extras.length < 8) + throw new IOException("Failed to parse incoming packet (expiration value required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + req.expiration(U.bytesToInt(extras, 4) & 0xFFFFFFFFL); + } + + if (req.hasInitial()) { + if (extras == null || extras.length < 16) + throw new IOException("Failed to parse incoming packet (initial value required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + req.initial(U.bytesToLong(extras, 8)); + } + + if (req.hasDelta()) { + if (extras == null || extras.length < 8) + throw new IOException("Failed to parse incoming packet (delta value required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + req.delta(U.bytesToLong(extras, 0)); + } + + if (extras != null) { + // Clients that include cache name must always include flags. + int len = 4; + + if (req.hasExpiration()) + len += 4; + + if (req.hasDelta()) + len += 8; + + if (req.hasInitial()) + len += 8; + + if (extras.length - len > 0) { + byte[] cacheName = new byte[extras.length - len]; + + U.arrayCopy(extras, len, cacheName, 0, extras.length - len); + + req.cacheName(new String(cacheName, UTF_8)); + } + } + + return req; + } + + /** + * Decodes value from a given byte array to the object according to the flags given. + * + * @param flags Flags. + * @param bytes Byte array to decode. + * @return Decoded value. + * @throws IgniteCheckedException If deserialization failed. + */ + private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException { + assert bytes != null; + + if ((flags & SERIALIZED_FLAG) != 0) + return jdkMarshaller.unmarshal(bytes, null); + + int masked = flags & 0xff00; + + switch (masked) { + case BOOLEAN_FLAG: + return bytes[0] == '1'; + case INT_FLAG: + return U.bytesToInt(bytes, 0); + case LONG_FLAG: + return U.bytesToLong(bytes, 0); + case DATE_FLAG: + return new Date(U.bytesToLong(bytes, 0)); + case BYTE_FLAG: + return bytes[0]; + case FLOAT_FLAG: + return Float.intBitsToFloat(U.bytesToInt(bytes, 0)); + case DOUBLE_FLAG: + return Double.longBitsToDouble(U.bytesToLong(bytes, 0)); + case BYTE_ARR_FLAG: + return bytes; + default: + return new String(bytes, UTF_8); + } + } + + /** + * Encodes given object to a byte array and returns flags that describe the type of serialized object. + * + * @param obj Object to serialize. + * @param out Output stream to which object should be written. + * @return Serialization flags. + * @throws IgniteCheckedException If JDK serialization failed. + */ + private int encodeObj(Object obj, ByteArrayOutputStream out) throws IgniteCheckedException { + int flags = 0; + + byte[] data = null; + + if (obj instanceof String) + data = ((String)obj).getBytes(UTF_8); + else if (obj instanceof Boolean) { + data = new byte[] {(byte)((Boolean)obj ? '1' : '0')}; + + flags |= BOOLEAN_FLAG; + } + else if (obj instanceof Integer) { + data = U.intToBytes((Integer)obj); + + flags |= INT_FLAG; + } + else if (obj instanceof Long) { + data = U.longToBytes((Long)obj); + + flags |= LONG_FLAG; + } + else if (obj instanceof Date) { + data = U.longToBytes(((Date)obj).getTime()); + + flags |= DATE_FLAG; + } + else if (obj instanceof Byte) { + data = new byte[] {(Byte)obj}; + + flags |= BYTE_FLAG; + } + else if (obj instanceof Float) { + data = U.intToBytes(Float.floatToIntBits((Float)obj)); + + flags |= FLOAT_FLAG; + } + else if (obj instanceof Double) { + data = U.longToBytes(Double.doubleToLongBits((Double)obj)); + + flags |= DOUBLE_FLAG; + } + else if (obj instanceof byte[]) { + data = (byte[])obj; + + flags |= BYTE_ARR_FLAG; + } + else { + jdkMarshaller.marshal(obj, out); + + flags |= SERIALIZED_FLAG; + } + + if (data != null) + out.write(data, 0, data.length); + + return flags; + } + + /** + * Returns marshaller. + * + * @return Marshaller. + */ + protected GridClientMarshaller marshaller(GridNioSession ses) { + GridClientMarshaller marsh = ses.meta(MARSHALLER.ordinal()); + + assert marsh != null; + + return marsh; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(GridTcpRestParser.class, this); + } + + /** + * Holder for parser state and temporary buffer. + */ + protected static class ParserState { + /** Parser index. */ + private int idx; + + /** Temporary data buffer. */ + private ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + /** Packet being assembled. */ + private GridClientMessage packet; + + /** Packet type. */ + private GridClientPacketType packetType; + + /** Header data. */ + private HeaderData hdr; + + /** + * @return Stored parser index. + */ + public int index() { + return idx; + } + + /** + * @param idx Index to store. + */ + public void index(int idx) { + this.idx = idx; + } + + /** + * @return Temporary data buffer. + */ + public ByteArrayOutputStream buffer() { + return buf; + } + + /** + * @return Pending packet. + */ + @Nullable public GridClientMessage packet() { + return packet; + } + + /** + * @param packet Pending packet. + */ + public void packet(GridClientMessage packet) { + assert this.packet == null; + + this.packet = packet; + } + + /** + * @return Pending packet type. + */ + public GridClientPacketType packetType() { + return packetType; + } + + /** + * @param packetType Pending packet type. + */ + public void packetType(GridClientPacketType packetType) { + this.packetType = packetType; + } + + /** + * @return Header. + */ + public HeaderData header() { + return hdr; + } + + /** + * @param hdr Header. + */ + public void header(HeaderData hdr) { + this.hdr = hdr; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ParserState.class, this); + } + } + + /** + * Header. + */ + protected static class HeaderData { + /** Request Id. */ + private final long reqId; + + /** Request Id. */ + private final UUID clientId; + + /** Request Id. */ + private final UUID destId; + + /** + * @param reqId Request Id. + * @param clientId Client Id. + * @param destId Destination Id. + */ + private HeaderData(long reqId, UUID clientId, UUID destId) { + this.reqId = reqId; + this.clientId = clientId; + this.destId = destId; + } + + /** + * @return Request Id. + */ + public long reqId() { + return reqId; + } + + /** + * @return Client Id. + */ + public UUID clientId() { + return clientId; + } + + /** + * @return Destination Id. + */ + public UUID destinationId() { + return destId; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java new file mode 100644 index 0000000..a4f6488 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.client.marshaller.*; +import org.apache.ignite.client.marshaller.jdk.*; +import org.apache.ignite.client.marshaller.optimized.*; +import org.apache.ignite.client.ssl.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.protocols.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.nio.ssl.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.net.ssl.*; +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.*; + +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + +/** + * TCP binary protocol implementation. + */ +public class GridTcpRestProtocol extends GridRestProtocolAdapter { + /** Server. */ + private GridNioServer<GridClientMessage> srv; + + /** JDK marshaller. */ + private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); + + /** NIO server listener. */ + private GridTcpRestNioListener lsnr; + + /** Message reader. */ + private final GridNioMessageReader msgReader = new GridNioMessageReader() { + @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { + assert msg != null; + assert buf != null; + + msg.messageReader(this, nodeId); + + return msg.readFrom(buf); + } + + @Nullable @Override public GridTcpMessageFactory messageFactory() { + return null; + } + }; + + /** Message writer. */ + private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() { + @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { + assert msg != null; + assert buf != null; + + msg.messageWriter(this, nodeId); + + return msg.writeTo(buf); + } + + @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out, + ByteBuffer buf) throws IOException { + assert msg != null; + assert out != null; + assert buf != null; + assert buf.hasArray(); + + msg.messageWriter(this, nodeId); + + boolean finished = false; + int cnt = 0; + + while (!finished) { + finished = msg.writeTo(buf); + + out.write(buf.array(), 0, buf.position()); + + cnt += buf.position(); + + buf.clear(); + } + + return cnt; + } + }; + + /** @param ctx Context. */ + public GridTcpRestProtocol(GridKernalContext ctx) { + super(ctx); + } + + /** + * @return JDK marshaller. + */ + IgniteMarshaller jdkMarshaller() { + return jdkMarshaller; + } + + /** + * Returns marshaller. + * + * @param ses Session. + * @return Marshaller. + */ + GridClientMarshaller marshaller(GridNioSession ses) { + GridClientMarshaller marsh = ses.meta(MARSHALLER.ordinal()); + + assert marsh != null; + + return marsh; + } + + /** + * @param ses Session. + * @return Whether portable marshaller is used. + */ + boolean portableMode(GridNioSession ses) { + return ctx.portable().isPortable(marshaller(ses)); + } + + /** {@inheritDoc} */ + @Override public String name() { + return "TCP binary"; + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException { + assert hnd != null; + + ClientConnectionConfiguration cfg = ctx.config().getClientConnectionConfiguration(); + + assert cfg != null; + + lsnr = new GridTcpRestNioListener(log, this, hnd, ctx); + + GridNioParser parser = new GridTcpRestDirectParser(this, msgReader); + + try { + host = resolveRestTcpHost(ctx.config()); + + SSLContext sslCtx = null; + + if (cfg.isRestTcpSslEnabled()) { + GridSslContextFactory factory = cfg.getRestTcpSslContextFactory(); + + if (factory == null) + // Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log. + throw new SSLException("SSL is enabled, but SSL context factory is not specified."); + + sslCtx = factory.createSslContext(); + } + + int lastPort = cfg.getRestTcpPort() + cfg.getRestPortRange() - 1; + + for (int port0 = cfg.getRestTcpPort(); port0 <= lastPort; port0++) { + if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) { + port = port0; + + if (log.isInfoEnabled()) + log.info(startInfo()); + + return; + } + } + + U.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " + + "[firstPort=" + cfg.getRestTcpPort() + ", lastPort=" + lastPort + ", host=" + host + ']'); + } + catch (SSLException e) { + U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(), + "Failed to start " + name() + " protocol on port " + port + ". Check if SSL context factory is " + + "properly configured."); + } + catch (IOException e) { + U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(), + "Failed to start " + name() + " protocol on port " + port + ". " + + "Check restTcpHost configuration property."); + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStart() { + super.onKernalStart(); + + Map<Byte, GridClientMarshaller> marshMap = new HashMap<>(); + + marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller()); + marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller()); + marshMap.put((byte)0, ctx.portable().portableMarshaller()); + + lsnr.marshallers(marshMap); + } + + /** {@inheritDoc} */ + @Override public void stop() { + if (srv != null) { + ctx.ports().deregisterPorts(getClass()); + + srv.stop(); + } + + if (log.isInfoEnabled()) + log.info(stopInfo()); + } + + /** + * Resolves host for REST TCP server using grid configuration. + * + * @param cfg Grid configuration. + * @return REST host. + * @throws IOException If failed to resolve REST host. + */ + private InetAddress resolveRestTcpHost(IgniteConfiguration cfg) throws IOException { + String host = cfg.getClientConnectionConfiguration().getRestTcpHost(); + + if (host == null) + host = cfg.getLocalHost(); + + return U.resolveLocalHost(host); + } + + /** + * Tries to start server with given parameters. + * + * @param hostAddr Host on which server should be bound. + * @param port Port on which server should be bound. + * @param lsnr Server message listener. + * @param parser Server message parser. + * @param sslCtx SSL context in case if SSL is enabled. + * @param cfg Configuration for other parameters. + * @return {@code True} if server successfully started, {@code false} if port is used and + * server was unable to start. + */ + private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr, + GridNioParser parser, @Nullable SSLContext sslCtx, ClientConnectionConfiguration cfg) { + try { + GridNioFilter codec = new GridNioCodecFilter(parser, log, true); + + GridNioFilter[] filters; + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log); + + sslFilter.directMode(true); + + boolean auth = cfg.isRestTcpSslClientAuth(); + + sslFilter.wantClientAuth(auth); + + sslFilter.needClientAuth(auth); + + filters = new GridNioFilter[] { + codec, + sslFilter + }; + } + else + filters = new GridNioFilter[] { codec }; + + srv = GridNioServer.<GridClientMessage>builder() + .address(hostAddr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(cfg.getRestTcpSelectorCount()) + .gridName(ctx.gridName()) + .tcpNoDelay(cfg.isRestTcpNoDelay()) + .directBuffer(cfg.isRestTcpDirectBuffer()) + .byteOrder(ByteOrder.nativeOrder()) + .socketSendBufferSize(cfg.getRestTcpSendBufferSize()) + .socketReceiveBufferSize(cfg.getRestTcpReceiveBufferSize()) + .sendQueueLimit(cfg.getRestTcpSendQueueLimit()) + .filters(filters) + .directMode(true) + .messageWriter(msgWriter) + .build(); + + srv.idleTimeout(cfg.getRestIdleTimeout()); + + srv.start(); + + ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); + + return true; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage()); + + return false; + } + } + + /** {@inheritDoc} */ + @Override protected String getAddressPropertyName() { + return GridNodeAttributes.ATTR_REST_TCP_ADDRS; + } + + /** {@inheritDoc} */ + @Override protected String getHostNamePropertyName() { + return GridNodeAttributes.ATTR_REST_TCP_HOST_NAMES; + } + + /** {@inheritDoc} */ + @Override protected String getPortPropertyName() { + return GridNodeAttributes.ATTR_REST_TCP_PORT; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/package.html new file mode 100644 index 0000000..31d10f0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Adapters for TCP-based REST protocols. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheQueryRequest.java new file mode 100644 index 0000000..b75eb44 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheQueryRequest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.processors.rest.client.message.*; + +import java.io.Serializable; + +/** + * Cache query request. + */ +public class GridRestCacheQueryRequest extends GridRestRequest implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Request message. */ + private GridClientCacheQueryRequest msg; + + /** + * @param msg Client request message. + */ + public GridRestCacheQueryRequest(GridClientCacheQueryRequest msg) { + this.msg = msg; + } + + /** + * @return Query ID. + */ + public long queryId() { + return msg.queryId(); + } + + /** + * @return Operation. + */ + public GridClientCacheQueryRequest.GridQueryOperation operation() { + return msg.operation(); + } + + /** + * @return Cache name. + */ + public String cacheName() { + return msg.cacheName(); + } + + /** + * @return Query clause. + */ + public String clause() { + return msg.clause(); + } + + /** + * @return Query type. + */ + public GridClientCacheQueryRequest.GridQueryType type() { + return msg.type(); + } + + /** + * @return Page size. + */ + public int pageSize() { + return msg.pageSize(); + } + + /** + * @return Timeout. + */ + public long timeout() { + return msg.timeout(); + } + + /** + * @return Include backups. + */ + public boolean includeBackups() { + return msg.includeBackups(); + } + + /** + * @return Enable dedup. + */ + public boolean enableDedup() { + return msg.enableDedup(); + } + + /** + * @return Keep portable flag. + */ + public boolean keepPortable() { + return msg.keepPortable(); + } + + /** + * @return Class name. + */ + public String className() { + return msg.className(); + } + + /** + * @return Remot reducer class name. + */ + public String remoteReducerClassName() { + return msg.remoteReducerClassName(); + } + + /** + * @return Remote transformer class name. + */ + public String remoteTransformerClassName() { + return msg.remoteTransformerClassName(); + } + + /** + * @return Query arguments. + */ + public Object[] queryArguments() { + return msg.queryArguments(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return msg.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheRequest.java new file mode 100644 index 0000000..89cd06f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestCacheRequest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +public class GridRestCacheRequest extends GridRestRequest { + /** Cache name. */ + private String cacheName; + + /** Key. */ + private Object key; + + /** Value (expected value for CAS). */ + private Object val; + + /** New value for CAS. */ + private Object val2; + + /** Keys and values for put all, get all, remove all operations. */ + private Map<Object, Object> vals; + + /** Bit map of cache flags to be enabled on cache projection. */ + private int cacheFlags; + + /** Expiration time. */ + private Long ttl; + + /** Value to add/subtract. */ + private Long delta; + + /** Initial value for increment and decrement commands. */ + private Long init; + + /** + * @return Cache name, or {@code null} if not set. + */ + public String cacheName() { + return cacheName; + } + + /** + * @param cacheName Cache name. + */ + public void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** + * @return Key. + */ + public Object key() { + return key; + } + + /** + * @param key Key. + */ + public void key(Object key) { + this.key = key; + } + + /** + * @return Value 1. + */ + public Object value() { + return val; + } + + /** + * @param val Value 1. + */ + public void value(Object val) { + this.val = val; + } + + /** + * @return Value 2. + */ + public Object value2() { + return val2; + } + + /** + * @param val2 Value 2. + */ + public void value2(Object val2) { + this.val2 = val2; + } + + /** + * @return Keys and values for put all, get all, remove all operations. + */ + public Map<Object, Object> values() { + return vals; + } + + /** + * @param vals Keys and values for put all, get all, remove all operations. + */ + public void values(Map<Object, Object> vals) { + this.vals = vals; + } + + /** + * @param cacheFlags Bit representation of cache flags. + */ + public void cacheFlags(int cacheFlags) { + this.cacheFlags = cacheFlags; + } + + /** + * @return Bit representation of cache flags. + */ + public int cacheFlags() { + return cacheFlags; + } + + /** + * @return Expiration time. + */ + public Long ttl() { + return ttl; + } + + /** + * @param ttl Expiration time. + */ + public void ttl(Long ttl) { + this.ttl = ttl; + } + + /** + * @return Delta for increment and decrement commands. + */ + public Long delta() { + return delta; + } + + /** + * @param delta Delta for increment and decrement commands. + */ + public void delta(Long delta) { + this.delta = delta; + } + + /** + * @return Initial value for increment and decrement commands. + */ + public Long initial() { + return init; + } + + /** + * @param init Initial value for increment and decrement commands. + */ + public void initial(Long init) { + this.init = init; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRestCacheRequest.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestLogRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestLogRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestLogRequest.java new file mode 100644 index 0000000..4537fbd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestLogRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Grid command request of log file. + */ +public class GridRestLogRequest extends GridRestRequest { + /** Task name. */ + private String path; + + /** From line, inclusive, indexing from 0. */ + private int from = -1; + + /** To line, inclusive, indexing from 0, can exceed count of lines in log. */ + private int to = -1; + + /** + * @return Path to log file. + */ + public String path() { + return path; + } + + /** + * @param path Path to log file. + */ + public void path(String path) { + this.path = path; + } + + /** + * @return From line, inclusive, indexing from 0. + */ + public int from() { + return from; + } + + /** + * @param from From line, inclusive, indexing from 0. + */ + public void from(int from) { + this.from = from; + } + + /** + * @return To line, inclusive, indexing from 0. + */ + public int to() { + return to; + } + + /** + * @param to To line, inclusive, indexing from 0. + */ + public void to(int to) { + this.to = to; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRestLogRequest.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortableGetMetaDataRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortableGetMetaDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortableGetMetaDataRequest.java new file mode 100644 index 0000000..6205a0e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortableGetMetaDataRequest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.processors.rest.client.message.*; + +import java.util.*; + +/** + * Portable get metadata request. + */ +public class GridRestPortableGetMetaDataRequest extends GridRestRequest { + /** */ + private final GridClientGetMetaDataRequest msg; + + /** + * @param msg Client message. + */ + public GridRestPortableGetMetaDataRequest(GridClientGetMetaDataRequest msg) { + this.msg = msg; + } + + /** + * @return Type IDs. + */ + public Collection<Integer> typeIds() { + return msg.typeIds(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortablePutMetaDataRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortablePutMetaDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortablePutMetaDataRequest.java new file mode 100644 index 0000000..7acee393 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestPortablePutMetaDataRequest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.processors.rest.client.message.*; + +import java.util.*; + +/** + * Portable get metadata request. + */ +public class GridRestPortablePutMetaDataRequest extends GridRestRequest { + /** */ + private final GridClientPutMetaDataRequest msg; + + /** + * @param msg Client message. + */ + public GridRestPortablePutMetaDataRequest(GridClientPutMetaDataRequest msg) { + this.msg = msg; + } + + /** + * @return Type IDs. + */ + public Collection<GridClientPortableMetaData> metaData() { + return msg.metaData(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestRequest.java new file mode 100644 index 0000000..f638703 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestRequest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.net.*; +import java.util.*; + +/** + * Grid command request. + */ +public class GridRestRequest { + /** Destination ID. */ + private UUID destId; + + /** Client ID. */ + private UUID clientId; + + /** Client network address. */ + private InetSocketAddress addr; + + /** Client credentials. */ + @GridToStringExclude + private Object cred; + + /** Client session token. */ + private byte[] sesTok; + + /** Command. */ + private GridRestCommand cmd; + + /** Portable mode flag. */ + private boolean portableMode; + + /** + * @return Destination ID. + */ + public UUID destinationId() { + return destId; + } + + /** + * @param destId Destination ID. + */ + public void destinationId(UUID destId) { + this.destId = destId; + } + + /** + * @return Command. + */ + public GridRestCommand command() { + return cmd; + } + + /** + * @param cmd Command. + */ + public void command(GridRestCommand cmd) { + this.cmd = cmd; + } + + /** + * Gets client ID that performed request. + * + * @return Client ID. + */ + public UUID clientId() { + return clientId; + } + + /** + * Sets client ID that performed request. + * + * @param clientId Client ID. + */ + public void clientId(UUID clientId) { + this.clientId = clientId; + } + + /** + * Gets client credentials for authentication process. + * + * @return Credentials. + */ + public Object credentials() { + return cred; + } + + /** + * Sets client credentials for authentication. + * + * @param cred Credentials. + */ + public void credentials(Object cred) { + this.cred = cred; + } + + /** + * Gets session token for already authenticated client. + * + * @return Session token. + */ + public byte[] sessionToken() { + return sesTok; + } + + /** + * Sets session token for already authenticated client. + * + * @param sesTok Session token. + */ + public void sessionToken(byte[] sesTok) { + this.sesTok = sesTok; + } + + /** + * @return Client address. + */ + public InetSocketAddress address() { + return addr; + } + + /** + * @param addr Client address. + */ + public void address(InetSocketAddress addr) { + this.addr = addr; + } + + /** + * @return Portable mode flag. + */ + public boolean portableMode() { + return portableMode; + } + + /** + * @param portableMode Portable mode flag. + */ + public void portableMode(boolean portableMode) { + this.portableMode = portableMode; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRestRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTaskRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTaskRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTaskRequest.java new file mode 100644 index 0000000..a8ad183 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTaskRequest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Grid task command request. + */ +public class GridRestTaskRequest extends GridRestRequest { + /** Task name. */ + private String taskName; + + /** Task Id. */ + private String taskId; + + /** Parameters. */ + private List<Object> params; + + /** Asynchronous execution flag. */ + private boolean async; + + /** Timeout. */ + private long timeout; + + /** Keep portables flag. */ + private boolean keepPortables; + + /** + * @return Task name, if specified, {@code null} otherwise. + */ + public String taskName() { + return taskName; + } + + /** + * @param taskName Name of task for execution. + */ + public void taskName(String taskName) { + this.taskName = taskName; + } + + /** + * @return Task identifier, if specified, {@code null} otherwise. + */ + public String taskId() { + return taskId; + } + + /** + * @param taskId Task identifier. + */ + public void taskId(String taskId) { + this.taskId = taskId; + } + + /** + * @return Asynchronous execution flag. + */ + public boolean async() { + return async; + } + + /** + * @param async Asynchronous execution flag. + */ + public void async(boolean async) { + this.async = async; + } + + /** + * @return Task execution parameters. + */ + public List<Object> params() { + return params; + } + + /** + * @param params Task execution parameters. + */ + public void params(List<Object> params) { + this.params = params; + } + + /** + * @return Timeout. + */ + public long timeout() { + return timeout; + } + + /** + * @param timeout Timeout. + */ + public void timeout(long timeout) { + this.timeout = timeout; + } + + /** + * @return Keep portables flag. + */ + public boolean keepPortables() { + return keepPortables; + } + + /** + * @param keepPortables Keep portables flag. + */ + public void keepPortables(boolean keepPortables) { + this.keepPortables = keepPortables; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRestTaskRequest.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTopologyRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTopologyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTopologyRequest.java new file mode 100644 index 0000000..522bb2c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/GridRestTopologyRequest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Grid command topology request. + */ +public class GridRestTopologyRequest extends GridRestRequest { + /** Id of requested node. */ + private UUID nodeId; + + /** IP address of requested node. */ + private String nodeIp; + + /** Include metrics flag. */ + private boolean includeMetrics; + + /** Include node attributes flag. */ + private boolean includeAttrs; + + /** + * @return Include metrics flag. + */ + public boolean includeMetrics() { + return includeMetrics; + } + + /** + * @param includeMetrics Include metrics flag. + */ + public void includeMetrics(boolean includeMetrics) { + this.includeMetrics = includeMetrics; + } + + /** + * @return Include node attributes flag. + */ + public boolean includeAttributes() { + return includeAttrs; + } + + /** + * @param includeAttrs Include node attributes flag. + */ + public void includeAttributes(boolean includeAttrs) { + this.includeAttrs = includeAttrs; + } + + /** + * @return Node identifier, if specified, {@code null} otherwise. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @param nodeId Node identifier to lookup. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + + /** + * @return Node ip address if specified, {@code null} otherwise. + */ + public String nodeIp() { + return nodeIp; + } + + /** + * @param nodeIp Node ip address to lookup. + */ + public void nodeIp(String nodeIp) { + this.nodeIp = nodeIp; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRestTopologyRequest.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/package.html new file mode 100644 index 0000000..3e414be --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + REST requests. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java index ae75077..39aed58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java @@ -32,7 +32,7 @@ import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.kernal.processors.clock.*; import org.gridgain.grid.kernal.processors.continuous.*; import org.gridgain.grid.kernal.processors.dataload.*; -import org.gridgain.grid.kernal.processors.rest.handlers.task.*; +import org.apache.ignite.internal.processors.rest.handlers.task.*; import org.apache.ignite.spi.collision.jobstealing.*; import org.apache.ignite.spi.communication.tcp.*; import org.jdk8.backport.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/GridRestCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/GridRestCommand.java deleted file mode 100644 index fde09d7..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/GridRestCommand.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest; - -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Supported commands. - */ -public enum GridRestCommand { - /* - * API commands. - * ============= - */ - - /** Get cached value. */ - CACHE_GET("get"), - - /** Get several cached values. */ - CACHE_GET_ALL("getall"), - - /** Store value in cache. */ - CACHE_PUT("put"), - - /** Store value in cache if it doesn't exist. */ - CACHE_ADD("add"), - - /** Store several values in cache. */ - CACHE_PUT_ALL("putall"), - - /** Remove value from cache. */ - CACHE_REMOVE("rmv"), - - /** Remove several values from cache. */ - CACHE_REMOVE_ALL("rmvall"), - - /** Replace cache value only if there is currently a mapping for it. */ - CACHE_REPLACE("rep"), - - /** Increment. */ - CACHE_INCREMENT("incr"), - - /** Decrement. */ - CACHE_DECREMENT("decr"), - - /** Compare and set. */ - CACHE_CAS("cas"), - - /** Append. */ - CACHE_APPEND("append"), - - /** Prepend. */ - CACHE_PREPEND("prepend"), - - /** Cache metrics. */ - CACHE_METRICS("cache"), - - /** Grid topology. */ - TOPOLOGY("top"), - - /** Single node info. */ - NODE("node"), - - /** Task execution .*/ - EXE("exe"), - - /** Task execution .*/ - RESULT("res"), - - /** Version. */ - VERSION("version"), - - /** Log. */ - LOG("log"), - - /** No-op. */ - NOOP("noop"), - - /** Quit. */ - QUIT("quit"), - - /** Start query execution. */ - CACHE_QUERY_EXECUTE("queryexecute"), - - /** Fetch query results. */ - CACHE_QUERY_FETCH("queryfetch"), - - /** Rebuild indexes. */ - CACHE_QUERY_REBUILD_INDEXES("rebuildqueryindexes"), - - /** Put portable metadata. */ - PUT_PORTABLE_METADATA("putportablemetadata"), - - /** Get portable metadata. */ - GET_PORTABLE_METADATA("getportablemetadata"); - - /** Enum values. */ - private static final GridRestCommand[] VALS = values(); - - /** Key to enum map. */ - private static final Map<String, GridRestCommand> cmds = new HashMap<>(); - - /** - * Map keys to commands. - */ - static { - for (GridRestCommand cmd : values()) - cmds.put(cmd.key(), cmd); - } - - /** Command key. */ - private final String key; - - /** - * @param key Key. - */ - GridRestCommand(String key) { - this.key = key; - } - - /** - * @param ord Byte to convert to enum. - * @return Enum. - */ - @Nullable public static GridRestCommand fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } - - /** - * @param key Key. - * @return Command. - */ - @Nullable public static GridRestCommand fromKey(String key) { - return cmds.get(key); - } - - /** - * @return Command key. - */ - public String key() { - return key; - } -}
