IGNITE-61 - Direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/865c4286 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/865c4286 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/865c4286 Branch: refs/heads/ignite-138 Commit: 865c42867e60ab9b8abb703911bd9ff8ca351195 Parents: 4e43484 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Feb 9 13:01:02 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Feb 9 13:01:02 2015 -0800 ---------------------------------------------------------------------- .../ignite/client/ClientTestRestServer.java | 6 +++ .../protocols/tcp/TcpRestParserSelfTest.java | 12 ++--- .../apache/ignite/client/GridClientFactory.java | 2 +- .../ignite/client/impl/GridClientImpl.java | 17 ++++--- .../GridClientConnectionManagerAdapter.java | 15 +++++-- .../GridClientConnectionManagerOsImpl.java | 5 ++- .../connection/GridClientNioTcpConnection.java | 31 +++++++------ .../router/impl/GridRouterClientImpl.java | 4 +- .../router/impl/GridTcpRouterNioParser.java | 6 +++ .../rest/client/message/GridRouterRequest.java | 1 + .../rest/protocols/tcp/GridTcpRestParser.java | 47 +++++++++++++++++--- .../rest/protocols/tcp/GridTcpRestProtocol.java | 2 +- 12 files changed, 106 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/clients/src/test/java/org/apache/ignite/client/ClientTestRestServer.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/client/ClientTestRestServer.java b/modules/clients/src/test/java/org/apache/ignite/client/ClientTestRestServer.java index fbe3dda..017888f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/client/ClientTestRestServer.java +++ b/modules/clients/src/test/java/org/apache/ignite/client/ClientTestRestServer.java @@ -267,6 +267,12 @@ public class ClientTestRestServer { /** */ private final GridClientMarshaller marsh = new GridClientOptimizedMarshaller(); + /** + */ + public TestParser() { + super(false); + } + /** {@inheritDoc} */ @Override protected GridClientMarshaller marshaller(GridNioSession ses) { return marsh; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/TcpRestParserSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/TcpRestParserSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/TcpRestParserSelfTest.java index 070441d..2a7263f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/TcpRestParserSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/TcpRestParserSelfTest.java @@ -56,7 +56,7 @@ public class TcpRestParserSelfTest extends GridCommonAbstractTest { public void testSimplePacketParsing() throws Exception { GridNioSession ses = new MockNioSession(); - GridTcpRestParser parser = new GridTcpRestParser(); + GridTcpRestParser parser = new GridTcpRestParser(false); byte hdr = MEMCACHE_REQ_FLAG; @@ -91,7 +91,7 @@ public class TcpRestParserSelfTest extends GridCommonAbstractTest { public void testIncorrectPackets() throws Exception { final GridNioSession ses = new MockNioSession(); - final GridTcpRestParser parser = new GridTcpRestParser(); + final GridTcpRestParser parser = new GridTcpRestParser(false); final byte[] opaque = new byte[] {0x01, 0x02, 0x03, (byte)0xFF}; @@ -151,7 +151,7 @@ public class TcpRestParserSelfTest extends GridCommonAbstractTest { ses.addMeta(MARSHALLER.ordinal(), new GridClientOptimizedMarshaller()); - GridTcpRestParser parser = new GridTcpRestParser(); + GridTcpRestParser parser = new GridTcpRestParser(false); GridClientMessage msg = parser.decode(ses, raw); @@ -180,7 +180,7 @@ public class TcpRestParserSelfTest extends GridCommonAbstractTest { ses1.addMeta(MARSHALLER.ordinal(), new GridClientOptimizedMarshaller()); ses2.addMeta(MARSHALLER.ordinal(), new GridClientOptimizedMarshaller()); - GridTcpRestParser parser = new GridTcpRestParser(); + GridTcpRestParser parser = new GridTcpRestParser(false); GridClientCacheRequest req = new GridClientCacheRequest(CAS); @@ -266,7 +266,7 @@ public class TcpRestParserSelfTest extends GridCommonAbstractTest { ses.addMeta(MARSHALLER.ordinal(), new GridClientOptimizedMarshaller()); - GridTcpRestParser parser = new GridTcpRestParser(); + GridTcpRestParser parser = new GridTcpRestParser(false); Collection<GridClientCacheRequest> lst = new ArrayList<>(5); @@ -308,7 +308,7 @@ public class TcpRestParserSelfTest extends GridCommonAbstractTest { ses.addMeta(MARSHALLER.ordinal(), new GridClientOptimizedMarshaller()); - GridTcpRestParser parser = new GridTcpRestParser(); + GridTcpRestParser parser = new GridTcpRestParser(false); Collection<GridClientMessage> lst = new ArrayList<>(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/GridClientFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/GridClientFactory.java b/modules/core/src/main/java/org/apache/ignite/client/GridClientFactory.java index 647155e..339a9f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/GridClientFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/client/GridClientFactory.java @@ -54,7 +54,7 @@ public class GridClientFactory { try { UUID clientId = UUID.randomUUID(); - GridClientImpl client = new GridClientImpl(clientId, cfg); + GridClientImpl client = new GridClientImpl(clientId, cfg, false); GridClientImpl old = openClients.putIfAbsent(clientId, client); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/impl/GridClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/impl/GridClientImpl.java b/modules/core/src/main/java/org/apache/ignite/client/impl/GridClientImpl.java index 8f64a79..43b8d20 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/GridClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/GridClientImpl.java @@ -103,12 +103,13 @@ public class GridClientImpl implements GridClient { * * @param id Client identifier. * @param cfg0 Client configuration. + * @param routerClient Router client flag. * @throws GridClientException If client configuration is incorrect. * @throws GridServerUnreachableException If none of the servers specified in configuration can * be reached. */ @SuppressWarnings("CallToThreadStartDuringObjectConstruction") - public GridClientImpl(UUID id, GridClientConfiguration cfg0) throws GridClientException { + public GridClientImpl(UUID id, GridClientConfiguration cfg0, boolean routerClient) throws GridClientException { this.id = id; cfg = new GridClientConfiguration(cfg0); @@ -158,7 +159,7 @@ public class GridClientImpl implements GridClient { throw new GridClientException("Servers addresses and routers addresses cannot both be provided " + "for client (please fix configuration and restart): " + this); - connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null); + connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null, routerClient); try { // Init connection manager, it should cause topology update. @@ -385,8 +386,9 @@ public class GridClientImpl implements GridClient { * @return New connection manager based on current client settings. * @throws GridClientException If failed to start connection server. */ - public GridClientConnectionManager newConnectionManager(@Nullable Byte marshId) throws GridClientException { - return createConnectionManager(id, sslCtx, cfg, routers, top, marshId); + public GridClientConnectionManager newConnectionManager(@Nullable Byte marshId, boolean routerClient) + throws GridClientException { + return createConnectionManager(id, sslCtx, cfg, routers, top, marshId, routerClient); } /** @@ -399,7 +401,7 @@ public class GridClientImpl implements GridClient { */ private GridClientConnectionManager createConnectionManager(UUID clientId, SSLContext sslCtx, GridClientConfiguration cfg, Collection<InetSocketAddress> routers, GridClientTopology top, - @Nullable Byte marshId) + @Nullable Byte marshId, boolean routerClient) throws GridClientException { GridClientConnectionManager mgr; @@ -409,10 +411,11 @@ public class GridClientImpl implements GridClient { Constructor<?> cons = cls.getConstructor(UUID.class, SSLContext.class, GridClientConfiguration.class, Collection.class, GridClientTopology.class, Byte.class); - mgr = (GridClientConnectionManager)cons.newInstance(clientId, sslCtx, cfg, routers, top, marshId); + mgr = (GridClientConnectionManager)cons.newInstance(clientId, sslCtx, cfg, routers, top, marshId, + routerClient); } catch (ClassNotFoundException ignored) { - mgr = new GridClientConnectionManagerOsImpl(clientId, sslCtx, cfg, routers, top, marshId); + mgr = new GridClientConnectionManagerOsImpl(clientId, sslCtx, cfg, routers, top, marshId, routerClient); } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { throw new GridClientException("Failed to create client connection manager.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java index 6cecef0..ff9bdf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -110,7 +110,8 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio GridClientConfiguration cfg, Collection<InetSocketAddress> routers, GridClientTopology top, - @Nullable Byte marshId) + @Nullable Byte marshId, + boolean routerClient) throws GridClientException { assert clientId != null : "clientId != null"; assert cfg != null : "cfg != null"; @@ -142,7 +143,7 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio GridNioFilter[] filters; - GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(), gridLog, false); + GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(routerClient), gridLog, false); if (sslCtx != null) { GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog); @@ -595,8 +596,14 @@ abstract class GridClientConnectionManagerAdapter implements GridClientConnectio if (msg instanceof GridClientPingPacket) conn.handlePingResponse(); - else - conn.handleResponse((GridClientMessage)msg); + else { + try { + conn.handleResponse((GridClientMessage)msg); + } + catch (IOException e) { + log.log(Level.SEVERE, "Failed to parse response.", e); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerOsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerOsImpl.java b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerOsImpl.java index f2d4506..a9bd80e 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerOsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerOsImpl.java @@ -36,8 +36,9 @@ public class GridClientConnectionManagerOsImpl extends GridClientConnectionManag * @throws GridClientException In case of error. */ public GridClientConnectionManagerOsImpl(UUID clientId, SSLContext sslCtx, GridClientConfiguration cfg, - Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId) throws GridClientException { - super(clientId, sslCtx, cfg, routers, top, marshId); + Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId, boolean routerClient) + throws GridClientException { + super(clientId, sslCtx, cfg, routers, top, marshId, routerClient); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java index 3cead45..03fa95d 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientNioTcpConnection.java @@ -461,7 +461,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { * @param res Incoming response data. */ @SuppressWarnings({"unchecked", "TooBroadScope"}) - void handleResponse(GridClientMessage res) { + void handleResponse(GridClientMessage res) throws IOException { lastMsgRcvTime = U.currentTimeMillis(); TcpClientFuture fut = pendingReqs.get(res.requestId()); @@ -474,22 +474,25 @@ public class GridClientNioTcpConnection extends GridClientConnection { } if (fut.forward()) { - // TODO: IGNITE-61 -// GridRouterResponse msg = new GridRouterResponse( -// res.messageArray(), -// res.requestId(), -// clientId, -// res.destinationId()); -// -// removePending(msg.requestId()); -// -// fut.onDone(msg); + removePending(res.requestId()); + + fut.onDone(res); } else { - if (res instanceof GridClientResponse) - handleClientResponse(fut, (GridClientResponse)res); + GridClientMessage res0 = res; + + if (res instanceof GridRouterResponse) { + res0 = marsh.unmarshal(((GridRouterResponse)res).body()); + + res0.requestId(res.requestId()); + res0.clientId(res.clientId()); + res0.destinationId(res.destinationId()); + } + + if (res0 instanceof GridClientResponse) + handleClientResponse(fut, (GridClientResponse)res0); else - log.warning("Unsupported response type received: " + res); + log.warning("Unsupported response type received: " + res0); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterClientImpl.java b/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterClientImpl.java index 0d7f74d..1b34346 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridRouterClientImpl.java @@ -60,7 +60,7 @@ class GridRouterClientImpl implements GridClient { this.cliCfg = cliCfg; - clientImpl = new GridClientImpl(id, cliCfg); + clientImpl = new GridClientImpl(id, cliCfg, true); if (cliCfg.getProtocol() != GridClientProtocol.TCP) throw new AssertionError("Unknown protocol: " + cliCfg.getProtocol()); @@ -136,7 +136,7 @@ class GridRouterClientImpl implements GridClient { if (mgr == null) { GridClientConnectionManager old = connMgrMap.putIfAbsent(marshId, mgr = - clientImpl.newConnectionManager(marshId)); + clientImpl.newConnectionManager(marshId, true)); if (old != null) mgr = old; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridTcpRouterNioParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridTcpRouterNioParser.java b/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridTcpRouterNioParser.java index 231b0f5..129406d 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridTcpRouterNioParser.java +++ b/modules/core/src/main/java/org/apache/ignite/client/router/impl/GridTcpRouterNioParser.java @@ -39,6 +39,12 @@ class GridTcpRouterNioParser extends GridTcpRestParser { /** Number of sent messages. */ private volatile long sndCnt; + /** + */ + public GridTcpRouterNioParser() { + super(false); + } + /** {@inheritDoc} */ @Override protected GridClientMessage parseClientMessage(GridNioSession ses, ParserState state) { rcvCnt++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridRouterRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridRouterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridRouterRequest.java index 34efcf6..6dcbf92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridRouterRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridRouterRequest.java @@ -37,6 +37,7 @@ public class GridRouterRequest extends GridClientAbstractMessage { */ public GridRouterRequest(byte[] body, Long reqId, UUID clientId, UUID destId) { this.body = body; + destinationId(destId); clientId(clientId); requestId(reqId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java index 978ae61..03322d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java @@ -45,6 +45,16 @@ public class GridTcpRestParser implements GridNioParser { /** JDK marshaller. */ private final Marshaller jdkMarshaller = new JdkMarshaller(); + /** Router client flag. */ + private final boolean routerClient; + + /** + * @param routerClient Router client flag. + */ + public GridTcpRestParser(boolean routerClient) { + this.routerClient = routerClient; + } + /** {@inheritDoc} */ @Nullable @Override public GridClientMessage decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { @@ -153,6 +163,22 @@ public class GridTcpRestParser implements GridNioParser { IGNITE_HANDSHAKE_RES_FLAG, ((GridClientHandshakeResponse)msg).resultCode() }); + else if (msg instanceof GridRouterRequest) { + byte[] body = ((GridRouterRequest)msg).body(); + + ByteBuffer buf = ByteBuffer.allocate(45 + body.length); + + buf.put(IGNITE_REQ_FLAG); + buf.putInt(40 + body.length); + buf.putLong(msg.requestId()); + buf.put(U.uuidToBytes(msg.clientId())); + buf.put(U.uuidToBytes(msg.destinationId())); + buf.put(body); + + buf.flip(); + + return buf; + } else { GridClientMarshaller marsh = marshaller(ses); @@ -455,13 +481,24 @@ public class GridTcpRestParser implements GridNioParser { * @throws IgniteCheckedException If no marshaller was defined for the session. */ protected GridClientMessage parseClientMessage(GridNioSession ses, ParserState state) throws IOException, IgniteCheckedException { - GridClientMarshaller marsh = marshaller(ses); + GridClientMessage msg; + + if (routerClient) { + msg = new GridRouterResponse( + state.buffer().toByteArray(), + state.header().reqId(), + state.header().clientId(), + state.header().destinationId()); + } + else { + GridClientMarshaller marsh = marshaller(ses); - GridClientMessage msg = marsh.unmarshal(state.buffer().toByteArray()); + msg = marsh.unmarshal(state.buffer().toByteArray()); - msg.requestId(state.header().reqId()); - msg.clientId(state.header().clientId()); - msg.destinationId(state.header().destinationId()); + msg.requestId(state.header().reqId()); + msg.clientId(state.header().clientId()); + msg.destinationId(state.header().destinationId()); + } return msg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/865c4286/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java index 19971e1..7482a79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java @@ -106,7 +106,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter { lsnr = new GridTcpRestNioListener(log, this, hnd, ctx); - GridNioParser parser = new GridTcpRestParser(); + GridNioParser parser = new GridTcpRestParser(false); try { host = resolveRestTcpHost(ctx.config());