http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index 1882663,0000000..d84bca5 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@@ -1,1147 -1,0 +1,1063 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.impl.connection; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.marshaller.*; +import org.apache.ignite.internal.client.marshaller.jdk.*; +import org.apache.ignite.internal.client.marshaller.optimized.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.nio.ssl.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.net.ssl.*; +import java.io.*; +import java.net.*; - import java.nio.*; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.logging.*; + +import static org.apache.ignite.internal.client.GridClientCacheFlag.*; +import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.*; +import static org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*; ++import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + +/** + * This class performs request to grid over tcp protocol. Serialization is performed with marshaller + * provided. + */ +public class GridClientNioTcpConnection extends GridClientConnection { + /** */ + static final int SES_META_HANDSHAKE = GridNioSessionMetaKey.nextUniqueKey(); + + /** */ + static final int SES_META_CONN = GridNioSessionMetaKey.nextUniqueKey(); + + /** Logger */ + private static final Logger log = Logger.getLogger(GridClientNioTcpConnection.class.getName()); + + /** Ping interval. */ + private final long pingInterval; + + /** Ping timeout. */ + private final long pingTimeout; + + /** Requests that are waiting for response. 
*/ + private ConcurrentMap<Long, TcpClientFuture> pendingReqs = new ConcurrentHashMap<>(); + + /** Node by node id requests. Map for reducing server load. */ + private ConcurrentMap<UUID, TcpClientFuture> refreshNodeReqs = new ConcurrentHashMap<>(); + + /** Latch indicating pending are empty and connection could be terminated. */ + private final CountDownLatch closedLatch = new CountDownLatch(1); + + /** Request ID counter. */ + private AtomicLong reqIdCntr = new AtomicLong(1); + + /** Timestamp of last sent message. */ + private volatile long lastMsgSndTime; + + /** Timestamp of last received message. */ + private volatile long lastMsgRcvTime; + + /** + * Ping receive time. + * {@code 0} until first ping send and {@link Long#MAX_VALUE} while response isn't received. + */ + private volatile long lastPingRcvTime; + + /** Ping send time. */ + private volatile long lastPingSndTime; + + /** Connection create timestamp. */ + private long createTs; + + /** Session token. */ + private volatile byte[] sesTok; + + /** Timer to run ping checks. */ + private ScheduledFuture<?> pingTask; + + /** NIO session. */ + private GridNioSession ses; + + /** Marshaller. */ + private final GridClientMarshaller marsh; + + /** */ + private final ThreadLocal<Boolean> keepPortablesMode; + + /** + * Creates a client facade, tries to connect to remote server, in case of success starts reader thread. + * + * @param srv NIO server. + * @param clientId Client identifier. + * @param srvAddr Server to connect to. + * @param sslCtx SSL context to use if SSL is enabled, {@code null} otherwise. + * @param pingExecutor Executor service for sending ping requests. + * @param connectTimeout Connect timeout. + * @param pingInterval Ping interval. + * @param pingTimeout Ping timeout. + * @param tcpNoDelay TCP_NODELAY flag for outgoing socket connection. + * @param marsh Marshaller to use in communication. + * @param top Topology instance. + * @param cred Client credentials. 
@throws IOException If connection could not be established. + * @throws IOException If IO error occurs. + * @throws GridClientException If handshake error occurs. + */ + @SuppressWarnings("unchecked") + GridClientNioTcpConnection(GridNioServer srv, + UUID clientId, + InetSocketAddress srvAddr, + SSLContext sslCtx, + ScheduledExecutorService pingExecutor, + int connectTimeout, + long pingInterval, + long pingTimeout, + boolean tcpNoDelay, + GridClientMarshaller marsh, + Byte marshId, + GridClientTopology top, + Object cred, + ThreadLocal<Boolean> keepPortablesMode + ) throws IOException, GridClientException { + super(clientId, srvAddr, sslCtx, top, cred); + + assert marsh != null || marshId != null; + + this.marsh = marsh; + this.pingInterval = pingInterval; + this.pingTimeout = pingTimeout; + this.keepPortablesMode = keepPortablesMode; + + SocketChannel ch = null; + Socket sock = null; + boolean cleanup = true; + + try { + ch = SocketChannel.open(); + sock = ch.socket(); + + sock.setTcpNoDelay(tcpNoDelay); + sock.setKeepAlive(true); + + sock.connect(srvAddr, connectTimeout); + + GridClientFuture<?> handshakeFut = new GridClientFutureAdapter<>(); + + Map<Integer, Object> meta = new HashMap<>(); + + meta.put(SES_META_HANDSHAKE, handshakeFut); + + GridNioFuture<?> sslHandshakeFut = null; + + if (sslCtx != null) { + sslHandshakeFut = new GridNioFutureImpl<>(); + + meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); + } + + ses = (GridNioSession)srv.createSession(ch, meta).get(); + + if (sslHandshakeFut != null) + sslHandshakeFut.get(); + + GridClientHandshakeRequest req = new GridClientHandshakeRequest(); + + if (marshId != null) + req.marshallerId(marshId); + // marsh != null. 
+ else if (marsh instanceof GridClientOptimizedMarshaller) + req.marshallerId(GridClientOptimizedMarshaller.ID); + else if (marsh instanceof GridClientJdkMarshaller) + req.marshallerId(GridClientJdkMarshaller.ID); + - GridClientHandshakeRequestWrapper wrapper = new GridClientHandshakeRequestWrapper(req); ++ ses.addMeta(MARSHALLER.ordinal(), marsh); + - ses.send(wrapper); ++ ses.send(req); + + handshakeFut.get(); + + ses.addMeta(SES_META_CONN, this); + + if (log.isLoggable(Level.INFO)) + log.info("Client TCP connection established: " + serverAddress()); + + pingTask = pingExecutor.scheduleAtFixedRate(new Runnable() { + @Override public void run() { + try { + makeRequest(GridClientPingPacket.PING_MESSAGE, (TcpClientFuture)null, false); + } + catch (Exception e) { + log.warning("Failed to send ping message: " + e); + } + } + }, 500, 500, TimeUnit.MILLISECONDS); + + createTs = System.currentTimeMillis(); + + cleanup = false; + } + catch (IgniteCheckedException e) { + throw new GridClientException(e); + } + finally { + if (cleanup) { + if (ses != null) + srv.close(ses); + + if (sock!= null) + sock.close(); + + if (ch != null) + ch.close(); + } + } + } + + /** {@inheritDoc} */ + @Override void close(GridClientConnectionCloseReason reason, boolean waitCompletion) { + close(reason, waitCompletion, null); + } + + /** + * Closes connection facade. + * + * @param reason Why this connection should be closed. + * @param waitCompletion If {@code true} this method will wait for all pending requests to be completed. + * @param cause The cause of connection close, or {@code null} if it is an ordinary close. + */ + @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") + private void close(GridClientConnectionCloseReason reason, boolean waitCompletion, @Nullable Throwable cause) { + synchronized (this) { + if (closeReason != null) + return; + + closeReason = reason; + } + + try { + // Wait for all pending requests to be processed. 
+ if (waitCompletion && !pendingReqs.isEmpty() && ses.closeTime() == 0) + closedLatch.await(); + } + catch (InterruptedException ignored) { + log.warning("Interrupted while waiting for all requests to be processed (all pending " + + "requests will be failed): " + serverAddress()); + + Thread.currentThread().interrupt(); + } + + if (pingTask != null) + pingTask.cancel(false); + + if (ses != null) + ses.close(); // Async close. + + for (Iterator<TcpClientFuture> it = pendingReqs.values().iterator(); it.hasNext(); ) { + GridClientFutureAdapter fut = it.next(); + + fut.onDone(getCloseReasonAsException(closeReason, cause)); + + it.remove(); + } + + if (log.isLoggable(Level.INFO)) + log.info("Client TCP connection closed: " + serverAddress()); + } + + /** + * Closes client only if there are no pending requests in map. + * + * @return {@code True} if client was closed. + */ + @Override boolean closeIfIdle(long idleTimeout) { + if (closeReason != null) + return true; + + // Timestamp of the last sent or received message. + long lastMsgTime = Math.max(Math.max(lastMsgSndTime, lastMsgRcvTime), createTs); + + if (lastMsgTime + idleTimeout < System.currentTimeMillis() && pendingReqs.isEmpty()) { + // In case of new request came between empty check and setting closing flag + // await for finishing all requests. + close(CONN_IDLE, true); + + return true; + } + + return false; + } + + /** + * Makes request to server via tcp protocol and returns a future that will be completed when + * response is received. + * + * @param msg Message to request, + * @param destId Destination node identifier. + * @return Response object. + * @throws GridClientConnectionResetException If request failed. + * @throws GridClientClosedException If client was closed. 
+ */ + private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, UUID destId) + throws GridClientConnectionResetException, GridClientClosedException { + return makeRequest(msg, destId, false); + } + + /** + * Makes request to server via tcp protocol and returns a future that will be completed when + * response is received. + * + * @param msg Message to request, + * @param destId Destination node identifier. + * @param keepPortables Keep portables flag. + * @return Response object. + * @throws GridClientConnectionResetException If request failed. + * @throws GridClientClosedException If client was closed. + */ + private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, UUID destId, boolean keepPortables) + throws GridClientConnectionResetException, GridClientClosedException { + assert msg != null; + + TcpClientFuture<R> res = new TcpClientFuture<>(false, keepPortables); + + msg.destinationId(destId); + + return makeRequest(msg, res); + } + + /** + * Makes request to server via tcp protocol and returns a future that will be completed when response is received. + * + * @param msg Message to request, + * @param fut Future that will handle response. + * @return Response object. + * @throws GridClientConnectionResetException If request failed. + * @throws GridClientClosedException If client was closed. + */ + private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, TcpClientFuture<R> fut) + throws GridClientConnectionResetException, GridClientClosedException { + return makeRequest(msg, fut, false); + } + + /** + * Makes request to server via tcp protocol and returns a future that will be completed when response is received. + * + * @param msg Message to request, + * @param fut Future that will handle response. + * @param routeMode If {@code true} then this method should overwrite session token by the cached one, + * otherwise keep original value. + * @return Response object. 
+ * @throws GridClientConnectionResetException If request failed. + * @throws GridClientClosedException If client closed. + */ + private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, final TcpClientFuture<R> fut, + boolean routeMode) throws GridClientConnectionResetException, GridClientClosedException { + assert msg != null; + + if (msg instanceof GridClientPingPacket) { + long now = U.currentTimeMillis(); + + if (Math.min(now, lastPingRcvTime) - lastPingSndTime >= pingTimeout) + close(FAILED, false, + new IOException("Did not receive any packets within ping response interval (connection is " + + "considered to be half-opened) [lastPingReceiveTime=" + lastPingRcvTime + + ", lastPingSendTime=" + lastPingSndTime + ", now=" + now + ", timeout=" + pingTimeout + + ", addr=" + serverAddress() + ']') + ); + // Do not pass ping requests if ping interval didn't pass yet + // or we've already waiting for ping response. + else if (now - lastPingSndTime > pingInterval && lastPingRcvTime != Long.MAX_VALUE) { + lastPingRcvTime = Long.MAX_VALUE; + - ses.send(new GridClientPingPacketWrapper()); ++ ses.send(GridClientPingPacket.PING_MESSAGE); + + lastPingSndTime = now; + } + } + else { + long reqId = reqIdCntr.getAndIncrement(); + + msg.requestId(reqId); + + if (!routeMode) { + msg.clientId(clientId); + msg.sessionToken(sesTok); + } + + fut.pendingMessage(msg); + + checkClosed(closeReason); + + GridClientFutureAdapter old = pendingReqs.putIfAbsent(reqId, fut); + + assert old == null; + - GridClientMessageWrapper wrapper; - - try { - wrapper = messageWrapper(msg); - } - catch (IOException e) { - log.log(Level.SEVERE, "Failed to marshal message: " + msg, e); - - removePending(reqId); - - fut.onDone(e); - - return fut; - } - - GridNioFuture<?> sndFut = ses.send(wrapper); ++ GridNioFuture<?> sndFut = ses.send(msg); + + lastMsgSndTime = U.currentTimeMillis(); + + if (routeMode) { + sndFut.listenAsync(new CI1<GridNioFuture<?>>() { + @Override public void 
apply(GridNioFuture<?> sndFut) { + try { + sndFut.get(); + } + catch (Exception e) { + close(FAILED, false, e); + + fut.onDone(getCloseReasonAsException(FAILED, e)); + } + } + }); + } + else { + try { + sndFut.get(); + } + catch (Exception e) { + throw new GridClientConnectionResetException("Failed to send message over connection " + + "(will try to reconnect): " + serverAddress(), e); + } + } + } + + return fut; + } + + /** + * Handles ping response. + */ + void handlePingResponse() { + lastPingRcvTime = U.currentTimeMillis(); + } + + /** + * Handles incoming response message. If this connection is closed this method would signal empty event + * if there is no more pending requests. + * - * @param req Incoming response data. ++ * @param res Incoming response data. + */ + @SuppressWarnings({"unchecked", "TooBroadScope"}) - void handleResponse(GridClientMessageWrapper req) { ++ void handleResponse(GridClientMessage res) throws IOException { + lastMsgRcvTime = U.currentTimeMillis(); + - TcpClientFuture fut = pendingReqs.get(req.requestId()); ++ TcpClientFuture fut = pendingReqs.get(res.requestId()); + + if (fut == null) { + log.warning("Response for an unknown request is received, ignoring. 
" + - "[req=" + req + ", ses=" + ses + ']'); ++ "[res=" + res + ", ses=" + ses + ']'); + + return; + } + + if (fut.forward()) { - GridRouterResponse msg = new GridRouterResponse( - req.messageArray(), - req.requestId(), - clientId, - req.destinationId()); - - removePending(msg.requestId()); ++ removePending(res.requestId()); + - fut.onDone(msg); ++ fut.onDone(res); + } + else { - GridClientMessage msg; - - if (keepPortablesMode != null) - keepPortablesMode.set(fut.keepPortables()); - - try { - msg = marsh.unmarshal(req.messageArray()); - } - catch (IOException e) { - fut.onDone(new GridClientException("Failed to unmarshal message.", e)); ++ GridClientMessage res0 = res; + - return; - } ++ if (res instanceof GridRouterResponse) { ++ res0 = marsh.unmarshal(((GridRouterResponse)res).body()); + - finally { - if (keepPortablesMode != null) - keepPortablesMode.set(true); ++ res0.requestId(res.requestId()); ++ res0.clientId(res.clientId()); ++ res0.destinationId(res.destinationId()); + } - msg.requestId(req.requestId()); - msg.clientId(req.clientId()); - msg.destinationId(req.destinationId()); + - if (msg instanceof GridClientResponse) - handleClientResponse(fut, (GridClientResponse)msg); ++ if (res0 instanceof GridClientResponse) ++ handleClientResponse(fut, (GridClientResponse)res0); + else - log.warning("Unsupported response type received: " + msg); ++ log.warning("Unsupported response type received: " + res0); + } + } + + /** + * Handler responses addressed to this client. + * + * @param fut Response future. + * @param resp Response. 
+ */ + @SuppressWarnings("unchecked") + private void handleClientResponse(TcpClientFuture fut, GridClientResponse resp) { + if (resp.sessionToken() != null) + sesTok = resp.sessionToken(); + + GridClientMessage src = fut.pendingMessage(); + + switch (fut.retryState()) { + case TcpClientFuture.STATE_INITIAL: { + if (resp.successStatus() == GridClientResponse.STATUS_AUTH_FAILURE) { + if (credentials() == null) { + fut.onDone(new GridClientAuthenticationException("Authentication failed on server " + + "(client has no credentials) [clientId=" + clientId + + ", srvAddr=" + serverAddress() + ", errMsg=" + resp.errorMessage() +']')); + + return; + } + + fut.retryState(TcpClientFuture.STATE_AUTH_RETRY); + + GridClientAuthenticationRequest req = buildAuthRequest(); + + req.requestId(resp.requestId()); + - GridClientMessageWrapper wrapper; - - try { - wrapper = messageWrapper(req); - } - catch (IOException e) { - log.log(Level.SEVERE, "Failed to marshal message: " + req, e); - - removePending(resp.requestId()); - - fut.onDone(e); - - return; - } - - ses.send(wrapper); ++ ses.send(req); + + return; + } + + break; + } + + case TcpClientFuture.STATE_AUTH_RETRY: { + if (resp.successStatus() == GridClientResponse.STATUS_SUCCESS) { + fut.retryState(TcpClientFuture.STATE_REQUEST_RETRY); + + src.sessionToken(sesTok); + - GridClientMessageWrapper wrapper; - - try { - wrapper = messageWrapper(src); - } - catch (IOException e) { - log.log(Level.SEVERE, "Failed to marshal message: " + src, e); - - removePending(resp.requestId()); - - fut.onDone(e); - - return; - } - - ses.send(wrapper); ++ ses.send(src); + + return; + } + + break; + } + } + + removePending(resp.requestId()); + + if (resp.successStatus() == GridClientResponse.STATUS_AUTH_FAILURE) + fut.onDone(new GridClientAuthenticationException("Client authentication failed [clientId=" + clientId + + ", srvAddr=" + serverAddress() + ", errMsg=" + resp.errorMessage() +']')); + else if (resp.errorMessage() != null) + fut.onDone(new 
GridClientException(resp.errorMessage())); + else + fut.onDone(resp.result()); + } + + /** - * @param msg Client message. - * @return Message wrapper for direct marshalling. - * @throws IOException If failed to marshal message. - */ - private GridClientMessageWrapper messageWrapper(GridClientMessage msg) throws IOException { - GridClientMessageWrapper wrapper = new GridClientMessageWrapper(); - - wrapper.requestId(msg.requestId()); - wrapper.clientId(clientId); - wrapper.destinationId(msg.destinationId()); - - ByteBuffer data = (msg instanceof GridRouterRequest) ? ByteBuffer.wrap(((GridRouterRequest)msg).body()) : - marsh.marshal(msg, 0); - - wrapper.message(data); - wrapper.messageSize(data.remaining() + 40); - - return wrapper; - } - - /** + * Removes pending request and signals to {@link #closedLatch} if necessary. + * + * @param reqId Request Id. + */ + private void removePending(long reqId) { + pendingReqs.remove(reqId); + + if (pendingReqs.isEmpty() && closeReason != null) + closedLatch.countDown(); + } + + /** + * Builds authentication request message with credentials taken from credentials object. + * + * @return AuthenticationRequest message. 
+ */ + private GridClientAuthenticationRequest buildAuthRequest() { + GridClientAuthenticationRequest req = new GridClientAuthenticationRequest(); + + req.clientId(clientId); + + req.credentials(credentials()); + + return req; + } + + /** {@inheritDoc} */ + @Override public <K, V> GridClientFutureAdapter<Boolean> cachePutAll(String cacheName, Map<K, V> entries, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert entries != null; + + GridClientCacheRequest req = new GridClientCacheRequest(PUT_ALL); + + req.cacheName(cacheName); + req.values((Map<Object, Object>)entries); + req.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(req, destNodeId); + } + + /** {@inheritDoc} */ + @Override public <K, V> GridClientFutureAdapter<Map<K, V>> cacheGetAll(String cacheName, Collection<K> keys, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert keys != null; + + GridClientCacheRequest req = new GridClientCacheRequest(GET_ALL); + + req.cacheName(cacheName); + req.keys((Iterable<Object>)keys); + req.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(req, destNodeId, flags.contains(KEEP_PORTABLES)); + } + + /** {@inheritDoc} */ + @Override public <K> GridClientFutureAdapter<Boolean> cacheRemove(String cacheName, K key, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + GridClientCacheRequest req = new GridClientCacheRequest(RMV); + + req.cacheName(cacheName); + req.key(key); + req.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(req, destNodeId); + } + + /** {@inheritDoc} */ + @Override public <K> GridClientFutureAdapter<Boolean> cacheRemoveAll(String cacheName, Collection<K> keys, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert 
keys != null; + + GridClientCacheRequest req = new GridClientCacheRequest(RMV_ALL); + + req.cacheName(cacheName); + req.keys((Iterable<Object>)keys); + req.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(req, destNodeId); + } + + /** {@inheritDoc} */ + @Override public <K, V> GridClientFutureAdapter<Boolean> cacheReplace(String cacheName, K key, V val, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert key != null; + assert val != null; + + GridClientCacheRequest replace = new GridClientCacheRequest(REPLACE); + + replace.cacheName(cacheName); + replace.key(key); + replace.value(val); + replace.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(replace, destNodeId); + } + + /** {@inheritDoc} */ + @Override public <K, V> GridClientFutureAdapter<Boolean> cacheCompareAndSet(String cacheName, K key, V newVal, + V oldVal, Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert key != null; + + GridClientCacheRequest msg = new GridClientCacheRequest(CAS); + + msg.cacheName(cacheName); + msg.key(key); + msg.value(newVal); + msg.value2(oldVal); + msg.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(msg, destNodeId); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K> GridClientFutureAdapter<GridClientDataMetrics> cacheMetrics(String cacheName, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + GridClientCacheRequest metrics = new GridClientCacheRequest(METRICS); + + metrics.cacheName(cacheName); + metrics.destinationId(destNodeId); + + TcpClientFuture fut = new TcpClientFuture() { + @Override public void onDone(Object res) { + super.onDone(metricsMapToMetrics((Map<String, Number>)res)); + } + }; + + return makeRequest(metrics, fut); + } + + /** {@inheritDoc} */ + @Override public <K, V> 
GridClientFutureAdapter<Boolean> cacheAppend(String cacheName, K key, V val, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert key != null; + assert val != null; + + GridClientCacheRequest append = new GridClientCacheRequest(APPEND); + + append.cacheName(cacheName); + append.key(key); + append.value(val); + append.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(append, destNodeId); + } + + /** {@inheritDoc} */ + @Override public <K, V> GridClientFutureAdapter<Boolean> cachePrepend(String cacheName, K key, V val, + Set<GridClientCacheFlag> flags, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + assert key != null; + assert val != null; + + GridClientCacheRequest prepend = new GridClientCacheRequest(PREPEND); + + prepend.cacheName(cacheName); + prepend.key(key); + prepend.value(val); + prepend.cacheFlagsOn(encodeCacheFlags(flags)); + + return makeRequest(prepend, destNodeId); + } + + /** {@inheritDoc} */ + @Override public <R> GridClientFutureAdapter<R> execute(String taskName, Object arg, UUID destNodeId, + final boolean keepPortables) throws GridClientConnectionResetException, GridClientClosedException { + GridClientTaskRequest msg = new GridClientTaskRequest(); + + msg.taskName(taskName); + msg.argument(arg); + msg.keepPortables(keepPortables); + + return this.<GridClientTaskResultBean>makeRequest(msg, destNodeId).chain( + new GridClientFutureCallback<GridClientTaskResultBean, R>() { + @Override public R onComplete(GridClientFuture<GridClientTaskResultBean> fut) + throws GridClientException { + return fut.get().getResult(); + } + }); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public GridClientFuture<GridClientNode> node(final UUID id, boolean inclAttrs, boolean inclMetrics, + UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException { + assert id != null; + + 
TcpClientFuture fut = refreshNodeReqs.get(id); + + // Return request that is in progress. + if (fut != null) + return fut; + + GridClientTopologyRequest msg = new GridClientTopologyRequest(); + + fut = new TcpClientFuture() { + @Override public void onDone(Object res) { + //Clean up the node id requests map. + refreshNodeReqs.remove(id); + + GridClientNodeImpl node = nodeBeanToNode((GridClientNodeBean)res); + + if (node != null) + top.updateNode(node); + + super.onDone(node); + } + }; + + GridClientFutureAdapter old = refreshNodeReqs.putIfAbsent(id, fut); + + // If concurrent thread put request, do not send the message. + if (old != null) + return old; + + msg.nodeId(id); + msg.includeAttributes(inclAttrs); + msg.includeMetrics(inclMetrics); + msg.destinationId(destNodeId); + + return makeRequest(msg, fut); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public GridClientFuture<GridClientNode> node(String ipAddr, boolean inclAttrs, boolean includeMetrics, + UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException { + GridClientTopologyRequest msg = new GridClientTopologyRequest(); + + TcpClientFuture fut = new TcpClientFuture() { + @Override public void onDone(Object res) { + GridClientNodeImpl node = nodeBeanToNode((GridClientNodeBean)res); + + if (node != null) + super.onDone(top.updateNode(node)); + else + super.onDone(node); + } + }; + + msg.nodeIp(ipAddr); + msg.includeAttributes(inclAttrs); + msg.includeMetrics(includeMetrics); + msg.destinationId(destNodeId); + + return makeRequest(msg, fut); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public GridClientFuture<List<GridClientNode>> topology(boolean inclAttrs, boolean inclMetrics, + UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException { + GridClientTopologyRequest msg = new GridClientTopologyRequest(); + + TcpClientFuture fut = new TcpClientFuture() { + @Override public void onDone(Object res) 
{ + Collection<GridClientNodeBean> beans = (Collection<GridClientNodeBean>)res; + + Collection<GridClientNodeImpl> nodes = new ArrayList<>(beans.size()); + + for (GridClientNodeBean bean : beans) + nodes.add(nodeBeanToNode(bean)); + + super.onDone(top.updateTopology(nodes)); + } + }; + + msg.includeAttributes(inclAttrs); + msg.includeMetrics(inclMetrics); + msg.destinationId(destNodeId); + + return makeRequest(msg, fut); + } + + /** {@inheritDoc} */ + @Override public GridClientFutureAdapter<GridRouterRequest> forwardMessage(Object msg) + throws GridClientException { + assert msg instanceof GridRouterRequest; + + TcpClientFuture<GridRouterRequest> res = new TcpClientFuture<>(true, false); + + makeRequest((GridClientMessage)msg, res, true); + + return res; + } + + /** + * Creates client node instance from message. + * + * @param nodeBean Node bean message. + * @return Created node. + */ + @Nullable private GridClientNodeImpl nodeBeanToNode(@Nullable GridClientNodeBean nodeBean) { + if (nodeBean == null) + return null; + + GridClientNodeImpl.Builder nodeBuilder = GridClientNodeImpl.builder() + .nodeId(nodeBean.getNodeId()) + .consistentId(nodeBean.getConsistentId()) + .tcpAddresses(nodeBean.getTcpAddresses()) + .tcpPort(nodeBean.getTcpPort()) + .replicaCount(nodeBean.getReplicaCount()); + + Map<String, GridClientCacheMode> caches = new HashMap<>(); + + if (nodeBean.getCaches() != null) { + for (Map.Entry<String, String> e : nodeBean.getCaches().entrySet()) { + try { + caches.put(e.getKey(), GridClientCacheMode.valueOf(e.getValue())); + } + catch (IllegalArgumentException ignored) { + log.warning("Invalid cache mode received from remote node (will ignore) [srv=" + serverAddress() + + ", cacheName=" + e.getKey() + ", cacheMode=" + e.getValue() + ']'); + } + } + } + + if (nodeBean.getDefaultCacheMode() != null) { + try { + caches.put(null, GridClientCacheMode.valueOf(nodeBean.getDefaultCacheMode())); + } + catch (IllegalArgumentException ignored) { + 
log.warning("Invalid cache mode received for default cache from remote node (will ignore) [srv=" + + serverAddress() + ", cacheMode=" + nodeBean.getDefaultCacheMode() + ']'); + } + } + + if (!caches.isEmpty()) + nodeBuilder.caches(caches); + + if (nodeBean.getAttributes() != null) + nodeBuilder.attributes(nodeBean.getAttributes()); + + GridClientNodeMetricsBean metricsBean = nodeBean.getMetrics(); + + if (metricsBean != null) { + GridClientNodeMetricsAdapter metrics = new GridClientNodeMetricsAdapter(); + + metrics.setStartTime(metricsBean.getStartTime()); + metrics.setAverageActiveJobs(metricsBean.getAverageActiveJobs()); + metrics.setAverageCancelledJobs(metricsBean.getAverageCancelledJobs()); + metrics.setAverageCpuLoad(metricsBean.getAverageCpuLoad()); + metrics.setAverageJobExecuteTime(metricsBean.getAverageJobExecuteTime()); + metrics.setAverageJobWaitTime(metricsBean.getAverageJobWaitTime()); + metrics.setAverageRejectedJobs(metricsBean.getAverageRejectedJobs()); + metrics.setAverageWaitingJobs(metricsBean.getAverageWaitingJobs()); + metrics.setCurrentActiveJobs(metricsBean.getCurrentActiveJobs()); + metrics.setCurrentCancelledJobs(metricsBean.getCurrentCancelledJobs()); + metrics.setCurrentCpuLoad(metricsBean.getCurrentCpuLoad()); + metrics.setCurrentGcCpuLoad(metricsBean.getCurrentGcCpuLoad()); + metrics.setCurrentDaemonThreadCount(metricsBean.getCurrentDaemonThreadCount()); + metrics.setCurrentIdleTime(metricsBean.getCurrentIdleTime()); + metrics.setCurrentJobExecuteTime(metricsBean.getCurrentJobExecuteTime()); + metrics.setCurrentJobWaitTime(metricsBean.getCurrentJobWaitTime()); + metrics.setCurrentRejectedJobs(metricsBean.getCurrentRejectedJobs()); + metrics.setCurrentThreadCount(metricsBean.getCurrentThreadCount()); + metrics.setCurrentWaitingJobs(metricsBean.getCurrentWaitingJobs()); + metrics.setFileSystemFreeSpace(metricsBean.getFileSystemFreeSpace()); + metrics.setFileSystemTotalSpace(metricsBean.getFileSystemTotalSpace()); + 
metrics.setFileSystemUsableSpace(metricsBean.getFileSystemUsableSpace()); + metrics.setHeapMemoryCommitted(metricsBean.getHeapMemoryCommitted()); + metrics.setHeapMemoryInitialized(metricsBean.getHeapMemoryInitialized()); + metrics.setHeapMemoryMaximum(metricsBean.getHeapMemoryMaximum()); + metrics.setHeapMemoryUsed(metricsBean.getHeapMemoryUsed()); + metrics.setLastDataVersion(metricsBean.getLastDataVersion()); + metrics.setLastUpdateTime(metricsBean.getLastUpdateTime()); + metrics.setMaximumActiveJobs(metricsBean.getMaximumActiveJobs()); + metrics.setMaximumCancelledJobs(metricsBean.getMaximumCancelledJobs()); + metrics.setMaximumJobExecuteTime(metricsBean.getMaximumJobExecuteTime()); + metrics.setMaximumJobWaitTime(metricsBean.getMaximumJobWaitTime()); + metrics.setMaximumRejectedJobs(metricsBean.getMaximumRejectedJobs()); + metrics.setMaximumThreadCount(metricsBean.getMaximumThreadCount()); + metrics.setMaximumWaitingJobs(metricsBean.getMaximumWaitingJobs()); + metrics.setNodeStartTime(metricsBean.getNodeStartTime()); + metrics.setNonHeapMemoryCommitted(metricsBean.getNonHeapMemoryCommitted()); + metrics.setNonHeapMemoryInitialized(metricsBean.getNonHeapMemoryInitialized()); + metrics.setNonHeapMemoryMaximum(metricsBean.getNonHeapMemoryMaximum()); + metrics.setNonHeapMemoryUsed(metricsBean.getNonHeapMemoryUsed()); + metrics.setStartTime(metricsBean.getStartTime()); + metrics.setTotalCancelledJobs(metricsBean.getTotalCancelledJobs()); + metrics.setTotalCpus(metricsBean.getTotalCpus()); + metrics.setTotalExecutedJobs(metricsBean.getTotalExecutedJobs()); + metrics.setTotalIdleTime(metricsBean.getTotalIdleTime()); + metrics.setTotalRejectedJobs(metricsBean.getTotalRejectedJobs()); + metrics.setTotalStartedThreadCount(metricsBean.getTotalStartedThreadCount()); + metrics.setTotalExecutedTasks(metricsBean.getTotalExecutedTasks()); + metrics.setSentMessagesCount(metricsBean.getSentMessagesCount()); + metrics.setSentBytesCount(metricsBean.getSentBytesCount()); + 
metrics.setReceivedMessagesCount(metricsBean.getReceivedMessagesCount()); + metrics.setReceivedBytesCount(metricsBean.getReceivedBytesCount()); + metrics.setUpTime(metricsBean.getUpTime()); + + nodeBuilder.metrics(metrics); + } + + return nodeBuilder.build(); + } + + /** + * Future extension that holds the pending client TCP message, the forward and keep-portables flags, + * and the authentication retry state. + */ + private static class TcpClientFuture<R> extends GridClientFutureAdapter<R> { + /** */ + private static final long serialVersionUID = 0L; + + /** Initial request. */ + private static final int STATE_INITIAL = 0; + + /** Authentication retry. */ + private static final int STATE_AUTH_RETRY = 1; + + /** Request retry after auth retry. */ + private static final int STATE_REQUEST_RETRY = 2; + + /** Flag indicating whether this future was created for a forwarded message. */ + private final boolean forward; + + /** Keep portables flag. */ + private final boolean keepPortables; + + /** Pending message for this future. */ + private GridClientMessage pendingMsg; + + /** Retry state of this request; starts as {@code STATE_INITIAL} (expected to be one of the {@code STATE_*} constants). */ + @SuppressWarnings("RedundantFieldInitialization") + private int authRetry = STATE_INITIAL; + + /** + * Creates new future with both {@code forward} and {@code keepPortables} flags set to {@code false}. + */ + private TcpClientFuture() { + forward = false; + keepPortables = false; + } + + /** + * Creates new future with the given {@code forward} and {@code keepPortables} flag values. + * + * @param forward Forward flag value. + * @param keepPortables Keep portables flag value. + */ + private TcpClientFuture(boolean forward, boolean keepPortables) { + this.forward = forward; + this.keepPortables = keepPortables; + } + + /** + * @return Originating request message. + */ + public GridClientMessage pendingMessage() { + return pendingMsg; + } + + /** + * @param pendingMsg Originating request message. + */ + public void pendingMessage(GridClientMessage pendingMsg) { + this.pendingMsg = pendingMsg; + } + + /** + * @return Current retry state (an int state, not a boolean flag).
+ */ + public int retryState() { + return authRetry; + } + + /** + * @param authRetry New retry state (expected to be one of the {@code STATE_*} constants). + */ + public void retryState(int authRetry) { + this.authRetry = authRetry; + } + + /** + * @return {@code true} if this future created for forwarded message, {@code false} otherwise. + */ + public boolean forward() { + return forward; + } + + /** + * @return Keep portables flag. + */ + public boolean keepPortables() { + return keepPortables; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "TcpClientFuture [state=" + authRetry + ", forward=" + forward + ", message=" + pendingMsg + "]"; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java index 7df1121,0000000..a15cb65 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java @@@ -1,450 -1,0 +1,450 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.impl.connection; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; +import java.util.logging.*; + - import static org.apache.ignite.internal.GridNodeAttributes.*; ++import static org.apache.ignite.internal.IgniteNodeAttributes.*; + +/** + * Client topology cache. + */ +public class GridClientTopology { + /** Logger. */ + private static final Logger log = Logger.getLogger(GridClientTopology.class.getName()); + + /** Topology cache: current snapshot of known nodes keyed by node ID. */ + private Map<UUID, GridClientNodeImpl> nodes = Collections.emptyMap(); + + /** Last error that prevented the topology from being updated; reset to {@code null} by a successful update. */ + private GridClientException lastError; + + /** + * Set of router addresses to infer direct connectivity + * when client is working in router connection mode. + * {@code null} when client is working in direct connection mode. + */ + private final Set<String> routerAddrs; + + /** Protocol. */ + private final GridClientProtocol prot; + + /** Flag indicating whether metrics should be cached. */ + private final boolean metricsCache; + + /** Flag indicating whether attributes should be cached. */ + private final boolean attrCache; + + /** Lock for topology changing. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Topology listeners. */ + private final Collection<GridClientTopologyListener> topLsnrs = new ConcurrentLinkedQueue<>(); + + /** Executor for listener notification. */ + private final ExecutorService exec = + Executors.newSingleThreadExecutor(new GridClientThreadFactory("top-lsnr", true)); + + /** + * Creates topology instance. + * + * @param cfg Client configuration.
+ */ + public GridClientTopology(GridClientConfiguration cfg) { + metricsCache = cfg.isEnableMetricsCache(); + attrCache = cfg.isEnableAttributesCache(); + prot = cfg.getProtocol(); + routerAddrs = (!cfg.getRouters().isEmpty() && cfg.getServers().isEmpty()) ? + new HashSet<>(cfg.getRouters()) : null; + } + + /** + * Adds topology listener. + * + * @param lsnr Topology listener. + */ + public void addTopologyListener(GridClientTopologyListener lsnr) { + topLsnrs.add(lsnr); + } + + /** + * Removes topology listener. + * + * @param lsnr Topology listener. + */ + public void removeTopologyListener(GridClientTopologyListener lsnr) { + topLsnrs.remove(lsnr); + } + + /** + * Returns all added topology listeners. + * + * @return Unmodifiable view of topology listeners. + */ + public Collection<GridClientTopologyListener> topologyListeners() { + return Collections.unmodifiableCollection(topLsnrs); + } + + /** + * Updates topology if cache enabled. If cache is disabled, returns original node. + * + * @param node Converted rest server response. + * @return Node in topology. + */ + public GridClientNode updateNode(GridClientNodeImpl node) { + lock.writeLock().lock(); + + try { + boolean newNode = !nodes.containsKey(node.nodeId()); + + GridClientNodeImpl preparedNode = prepareNode(node); + + // We update the whole topology if node was not in topology or we cache metrics. + if (newNode || metricsCache || attrCache) { + Map<UUID, GridClientNodeImpl> updatedTop = new HashMap<>(nodes); + + updatedTop.put(node.nodeId(), preparedNode); + + // Change the reference to new topology. + // So everyone who captured old version will see a consistent snapshot. + nodes = updatedTop; + lastError = null; + } + + if (newNode) + notifyEvents(Collections.singletonList(new TopologyEvent(true, preparedNode))); + + return preparedNode; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Updates (if cache is enabled) the whole topology. 
If cache is disabled, original collection is returned. + * + * @param nodeList Converted rest server response. + * @return Topology nodes. + */ + public Collection<? extends GridClientNode> updateTopology(Collection<GridClientNodeImpl> nodeList) { + Collection<TopologyEvent> evts = new LinkedList<>(); + + lock.writeLock().lock(); + + try { + Map<UUID, GridClientNodeImpl> updated = new HashMap<>(); + + Collection<GridClientNodeImpl> preparedNodes = F.transform(nodeList, + new C1<GridClientNodeImpl, GridClientNodeImpl>() { + @Override public GridClientNodeImpl apply(GridClientNodeImpl e) { + return prepareNode(e); + } + }); + + for (GridClientNodeImpl node : preparedNodes) { + updated.put(node.nodeId(), node); + + // Generate add events. + if (!nodes.containsKey(node.nodeId())) + evts.add(new TopologyEvent(true, node)); + } + + for (Map.Entry<UUID, GridClientNodeImpl> e : nodes.entrySet()) { + if (!updated.containsKey(e.getKey())) + evts.add(new TopologyEvent(false, e.getValue())); + } + + nodes = updated; + lastError = null; + + if (!evts.isEmpty()) + notifyEvents(evts); + + return preparedNodes; + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Marks topology as failed. After this method called all accessors will throw exception + * until a next successful update. + * + * @param cause Exception caused the failure. + */ + public void fail(GridClientException cause) { + lock.writeLock().lock(); + + try { + lastError = cause; + + for (GridClientNode n : nodes.values()) + notifyEvents(Collections.singletonList(new TopologyEvent(false, n))); + + nodes = Collections.emptyMap(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Updates topology when node that is expected to be in topology fails. + * + * @param nodeId Node id for which node failed to be obtained. 
+ */ + public void nodeFailed(UUID nodeId) { + lock.writeLock().lock(); + + try { + boolean nodeDeleted = nodes.containsKey(nodeId); + + GridClientNode deleted = null; + + // Copy the topology and drop the node only if it is currently present in the topology. + if (nodeDeleted) { + Map<UUID, GridClientNodeImpl> updatedTop = new HashMap<>(nodes); + + deleted = updatedTop.remove(nodeId); + + // Change the reference to new topology. + // So everyone who captured old version will see a consistent snapshot. + nodes = updatedTop; + } + + if (nodeDeleted) + notifyEvents(Collections.singletonList(new TopologyEvent(false, deleted))); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Gets node from last saved topology snapshot by its id. + * + * @param id Node id. + * @return Node or {@code null} if node was not found. + * @throws GridClientException If topology is failed and no nodes available. + */ + public GridClientNode node(UUID id) throws GridClientException { + assert id != null; + + lock.readLock().lock(); + + try { + if (lastError != null) + throw new GridClientDisconnectedException( + "Topology is failed [protocol=" + prot + ", routers=" + routerAddrs + ']', lastError); + else + return nodes.get(id); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Gets a collection of nodes from last saved topology snapshot by their ids. + * + * @param ids Collection of ids for which nodes should be retrieved. + * @return Collection of nodes that are in topology. + * @throws GridClientException If topology is failed and no nodes available.
+ */ + public Collection<GridClientNode> nodes(Iterable<UUID> ids) throws GridClientException { + assert ids != null; + + Collection<GridClientNode> res = new LinkedList<>(); + + lock.readLock().lock(); + + try { + if (lastError != null) + throw new GridClientDisconnectedException( + "Latest topology update failed.", lastError); + + for (UUID id : ids) { + GridClientNodeImpl node = nodes.get(id); + + if (node != null) + res.add(node); + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Gets full topology snapshot. + * + * @return Collection of nodes that were in last captured topology snapshot. + * @throws GridClientException If topology is failed and no nodes available. + */ + public Collection<GridClientNodeImpl> nodes() throws GridClientException { + lock.readLock().lock(); + + try { + if (lastError != null) + throw new GridClientDisconnectedException( + "Latest topology update failed.", lastError); + + return Collections.unmodifiableCollection(nodes.values()); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * @return Whether topology is failed. + */ + public boolean failed() { + lock.readLock().lock(); + + try { + return lastError != null; + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Shutdowns executor service that performs listener notification. + */ + public void shutdown() { + GridClientUtils.shutdownNow(GridClientTopology.class, exec, log); + } + + /** + * Updates node properties according to current topology settings. + * Particularly attributes and metrics caching policies. + * + * @param node Node to be processed. + * @return The same node if cache is enabled or node contains no attributes and metrics, + * otherwise will return new node without attributes and metrics. 
+ */ + private GridClientNodeImpl prepareNode(final GridClientNodeImpl node) { + final boolean noAttrsAndMetrics = + (metricsCache && attrCache) || (node.attributes().isEmpty() && node.metrics() == null); + + // Try to bypass object copying. + if (noAttrsAndMetrics && routerAddrs == null && node.connectable()) + return node; + + // Return a new node instance based on the original one. + GridClientNodeImpl.Builder nodeBuilder = GridClientNodeImpl.builder(node, !attrCache, !metricsCache); + + /* The MACs attribute may be absent; without it a loopback address cannot be matched to this host, so such an address is treated as unreachable instead of failing with a NullPointerException. */ + Object macs = node.attribute(ATTR_MACS); + + for (InetSocketAddress addr : node.availableAddresses(prot, true)) { + boolean router = routerAddrs == null || + routerAddrs.contains(addr.getHostName() + ":" + addr.getPort()) || + routerAddrs.contains(addr.getAddress().getHostAddress() + ":" + addr.getPort()); + + boolean reachable = noAttrsAndMetrics || !addr.getAddress().isLoopbackAddress() || + (macs != null && F.containsAny(U.allLocalMACs(), macs.toString().split(", "))); + + if (router && reachable) { + nodeBuilder.connectable(true); + + break; + } + } + + return nodeBuilder.build(); + } + + /** + * Runs listener notification in a separate thread. + * + * @param evts Event list. + */ + private void notifyEvents(final Iterable<TopologyEvent> evts) { + try { + exec.execute(new Runnable() { + @Override public void run() { + for (TopologyEvent evt : evts) { + if (evt.added()) { + for (GridClientTopologyListener lsnr : topLsnrs) + lsnr.onNodeAdded(evt.node()); + } + else { + for (GridClientTopologyListener lsnr : topLsnrs) + lsnr.onNodeRemoved(evt.node()); + } + } + } + }); + } + catch (RejectedExecutionException e) { + log.warning("Unable to notify event listeners on topology change since client is shutting down: " + + e.getMessage()); + } + } + + /** + * Event for node adding and removal. + */ + private static class TopologyEvent { + /** Added or removed flag */ + private boolean added; + + /** Node that triggered event. */ + private GridClientNode node; + + /** + * Creates a new event.
+ * + * @param added If {@code true}, indicates that node was added to topology. + * If {@code false}, indicates that node was removed. + * @param node Added or removed node. + */ + private TopologyEvent(boolean added, GridClientNode node) { + this.added = added; + this.node = node; + } + + /** + * @return Flag indicating whether node was added or removed. + */ + private boolean added() { + return added; + } + + /** + * @return Node that triggered event. + */ + private GridClientNode node() { + return node; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java index f9695fa,0000000..c929575 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java @@@ -1,103 -1,0 +1,103 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.marshaller.optimized; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.internal.processors.rest.client.message.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** - * Wrapper, that adapts {@link org.apache.ignite.marshaller.optimized.IgniteOptimizedMarshaller} to - * {@link org.apache.ignite.internal.client.marshaller.GridClientMarshaller} interface. ++ * Wrapper, that adapts {@link org.apache.ignite.marshaller.optimized.OptimizedMarshaller} to ++ * {@link GridClientMarshaller} interface. + */ +public class GridClientOptimizedMarshaller implements GridClientMarshaller { + /** ID. */ + public static final byte ID = 1; + + /** Optimized marshaller. */ - private final IgniteOptimizedMarshaller opMarsh; ++ private final OptimizedMarshaller opMarsh; + + /** + * Default constructor. + */ + public GridClientOptimizedMarshaller() { - opMarsh = new IgniteOptimizedMarshaller(); ++ opMarsh = new OptimizedMarshaller(); + } + + /** + * Constructs optimized marshaller with specific parameters. + * + * @param requireSer Flag to enforce {@link Serializable} interface or not. If {@code true}, + * then objects will be required to implement {@link Serializable} in order to be + * marshalled, if {@code false}, then such requirement will be relaxed. + * @param clsNames User preregistered class names. + * @param clsNamesPath Path to a file with user preregistered class names. + * @param poolSize Object streams pool size. + * @throws IOException If an I/O error occurs while writing stream header. + * @throws IgniteException If this marshaller is not supported on the current JVM. 
- * @see org.apache.ignite.marshaller.optimized.IgniteOptimizedMarshaller ++ * @see org.apache.ignite.marshaller.optimized.OptimizedMarshaller + */ + public GridClientOptimizedMarshaller(boolean requireSer, List<String> clsNames, String clsNamesPath, int poolSize) + throws IOException { + try { - opMarsh = new IgniteOptimizedMarshaller(requireSer, clsNames, clsNamesPath, poolSize); ++ opMarsh = new OptimizedMarshaller(requireSer, clsNames, clsNamesPath, poolSize); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public ByteBuffer marshal(Object obj, int off) throws IOException { + try { + if (!(obj instanceof GridClientMessage)) + throw new IOException("Message serialization of given type is not supported: " + + obj.getClass().getName()); + + byte[] bytes = opMarsh.marshal(obj); + + ByteBuffer buf = ByteBuffer.allocate(off + bytes.length); + + buf.position(off); + + buf.put(bytes); + + buf.flip(); + + return buf; + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> T unmarshal(byte[] bytes) throws IOException { + try { + return opMarsh.unmarshal(bytes, null); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java index f5dc44c,0000000..479fd8c mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java @@@ -1,88 -1,0 +1,88 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.router; + +import org.apache.ignite.mxbean.*; + +import java.util.*; + +/** + * MBean interface for TCP router. + */ - @IgniteMXBeanDescription("MBean for TCP router.") ++@MXBeanDescription("MBean for TCP router.") +public interface GridTcpRouterMBean { + /** + * Gets host for TCP binary protocol server. + * + * @return TCP host. + */ - @IgniteMXBeanDescription("Host for TCP binary protocol server.") ++ @MXBeanDescription("Host for TCP binary protocol server.") + public String getHost(); + + /** + * Gets port for TCP binary protocol server. + * + * @return TCP port. + */ - @IgniteMXBeanDescription("Port for TCP binary protocol server.") ++ @MXBeanDescription("Port for TCP binary protocol server.") + public int getPort(); + + /** + * Gets a flag indicating whether or not SSL is enabled for incoming connections. + * + * @return Whether or not SSL is enabled.
+ */ - @IgniteMXBeanDescription("Flag indicating whether or not SSL is enabled for incoming connections.") ++ @MXBeanDescription("Flag indicating whether or not SSL is enabled for incoming connections.") + public boolean isSslEnabled(); + + /** + * Gets a flag indicating whether or not remote clients will be required to have a valid SSL certificate which + * validity will be verified with trust manager. + * + * @return Whether or not client authentication is required. + */ - @IgniteMXBeanDescription("Flag indicating whether or not remote clients are required to have a valid SSL certificate.") ++ @MXBeanDescription("Flag indicating whether or not remote clients are required to have a valid SSL certificate.") + public boolean isSslClientAuth(); + + /** + * Gets list of server addresses where router's embedded client should connect. + * + * @return List of server addresses. + */ - @IgniteMXBeanDescription("Gets list of server addresses where router's embedded client should connect.") ++ @MXBeanDescription("Gets list of server addresses where router's embedded client should connect.") + public Collection<String> getServers(); + + /** + * Returns number of messages received by this router. + * Note that this parameter has approximate value. + * + * @return Number of messages received by this router. + */ - @IgniteMXBeanDescription("Number of messages received by this router.") ++ @MXBeanDescription("Number of messages received by this router.") + public long getReceivedCount(); + + /** + * Returns number of responses returned by this router. + * Note that this parameter has approximate value. + * + * @return Number of responses returned by this router. 
+ */ - @IgniteMXBeanDescription("Number of responses returned by this router.") ++ @MXBeanDescription("Number of responses returned by this router.") + public long getSendCount(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java index 08090c9,0000000..31c84a9 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java @@@ -1,200 -1,0 +1,200 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.router.impl; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.impl.connection.*; +import org.apache.ignite.internal.client.router.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.client.util.GridClientUtils.*; + +/** + * A {@link GridClient} router implementation. + */ +class GridRouterClientImpl implements GridClient { + /** Decorated client implementation. */ + private final GridClientImpl clientImpl; + + /** Client configuration. */ + private final GridClientConfiguration cliCfg; + + /** TCP connection managers. */ + private final ConcurrentMap<Byte, GridClientConnectionManager> connMgrMap = new ConcurrentHashMap8<>(); + + /** + * Creates a new TCP client based on the given configuration. + * + * @param id Client identifier. + * @param routerCfg Router configuration. + * @throws GridClientException If client configuration is incorrect. + * @throws GridServerUnreachableException If none of the servers + * specified in configuration can be reached. + */ + GridRouterClientImpl(UUID id, GridTcpRouterConfiguration routerCfg) throws GridClientException { + GridClientConfiguration cliCfg = new GridClientConfiguration(); + + cliCfg.setServers(routerCfg.getServers()); + cliCfg.setSslContextFactory(routerCfg.getSslContextFactory()); + cliCfg.setSecurityCredentialsProvider(routerCfg.getSecurityCredentialsProvider()); + + this.cliCfg = cliCfg; + - clientImpl = new GridClientImpl(id, cliCfg); ++ clientImpl = new GridClientImpl(id, cliCfg, true); + + if (cliCfg.getProtocol() != GridClientProtocol.TCP) + throw new AssertionError("Unknown protocol: " + cliCfg.getProtocol()); + } + + /** + * Send a raw packet "as is" directly to the given node. 
+ * The exact types of acceptable arguments and return values depends on underlying connections. + * + * @param msg Raw message to send. + * @param destId Id of node to send message to. If {@code null} than node will be chosen + * from the topology randomly. + * @return Future, representing forwarded message. + * @throws GridServerUnreachableException If destination node can't be reached. + * @throws GridClientClosedException If client is closed. + * @throws GridClientException If any other client-related error occurs. + * @throws InterruptedException If router was interrupted while trying. + * to establish connection with destination node. + */ + GridClientFutureAdapter<?> forwardMessage(Object msg, @Nullable UUID destId, byte marshId) + throws GridClientException, InterruptedException { + GridClientTopology top = clientImpl.topology(); + + GridClientNode dest = destId != null ? + top.node(destId) : cliCfg.getBalancer().balancedNode( + applyFilter(top.nodes(), new GridClientPredicate<GridClientNodeImpl>() { + @Override public boolean apply(GridClientNodeImpl e) { + return restAvailable(e, cliCfg.getProtocol()); + } + })); + + if (dest == null) + throw new GridServerUnreachableException("Failed to resolve node for specified destination ID: " + destId); + + GridClientConnectionManager connMgr = connectionManager(marshId); + + GridClientConnection conn = null; + + // No reconnection handling there. Let client to do it if needed. + GridClientException cause; + + try { + conn = connMgr.connection(dest); + + return conn.forwardMessage(msg); + } + catch (GridClientConnectionResetException e) { + if (destId != null) + connMgr.terminateConnection(conn, top.node(destId), e); + else + connMgr.terminateConnection(conn, null, e); + + cause = e; + } + catch (GridClientException e) { + cause = e; + } + + GridClientFutureAdapter<Object> fail = new GridClientFutureAdapter<>(); + + fail.onDone(cause); + + return fail; + } + + /** + * @param marshId Marshaller ID. 
+ * @return Connection manager. + * @throws GridClientException In case of error. + */ + private GridClientConnectionManager connectionManager(byte marshId) throws GridClientException { + GridClientConnectionManager mgr = connMgrMap.get(marshId); + + if (mgr == null) { + GridClientConnectionManager old = connMgrMap.putIfAbsent(marshId, mgr = - clientImpl.newConnectionManager(marshId)); ++ clientImpl.newConnectionManager(marshId, true)); + + if (old != null) + mgr = old; + } + + return mgr; + } + + /** + * Closes client. + * @param wait If {@code true} will wait for all pending requests to be proceeded. + */ + public void stop(boolean wait) { + clientImpl.stop(wait); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return clientImpl.id(); + } + + /** {@inheritDoc} */ + @Override public GridClientData data() throws GridClientException { + return clientImpl.data(); + } + + /** {@inheritDoc} */ + @Override public GridClientData data(String cacheName) throws GridClientException { + return clientImpl.data(cacheName); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute compute() { + return clientImpl.compute(); + } + + /** {@inheritDoc} */ + @Override public void addTopologyListener(GridClientTopologyListener lsnr) { + clientImpl.addTopologyListener(lsnr); + } + + /** {@inheritDoc} */ + @Override public void removeTopologyListener(GridClientTopologyListener lsnr) { + clientImpl.removeTopologyListener(lsnr); + } + + /** {@inheritDoc} */ + @Override public Collection<GridClientTopologyListener> topologyListeners() { + return clientImpl.topologyListeners(); + } + + /** {@inheritDoc} */ + @Override public boolean connected() { + return clientImpl.connected(); + } + + /** {@inheritDoc} */ + @Override public void close() { + clientImpl.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java 
---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java index ccd0cc9,0000000..946896c mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java @@@ -1,169 -1,0 +1,169 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.router.impl; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.router.*; +import org.apache.ignite.internal.processors.spring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; + +import java.net.*; +import java.util.*; +import java.util.logging.*; + +import static org.apache.ignite.internal.GridProductImpl.*; +import static org.apache.ignite.internal.IgniteComponentType.*; + +/** + * Command-line loader for the router: loads the logger and TCP router configuration beans from a + * Spring XML file and starts the TCP router. + */ +public class GridRouterCommandLineStartup { + /** Logger obtained from the beans passed to {@link #start(Map)}. 
*/ + @SuppressWarnings("FieldCanBeLocal") + private IgniteLogger log; + + /** TCP router; set by {@link #start(Map)} and reset to {@code null} if its startup fails. */ + private LifecycleAware tcpRouter; + + /** + * Searches the given beans for the required configuration and starts the router. + * <p> + * If no logger bean is found, an error is reported and nothing is started. If no TCP router + * configuration bean is found, a warning is logged and startup is skipped. A failure while + * starting the TCP router is logged and not rethrown. + * + * @param beans Beans loaded from spring configuration file. + */ + public void start(Map<Class<?>, Object> beans) { + log = (IgniteLogger)beans.get(IgniteLogger.class); + + if (log == null) { + U.error(log, "Failed to find logger definition in application context. Stopping the router."); + + return; + } + + GridTcpRouterConfiguration tcpCfg = (GridTcpRouterConfiguration)beans.get(GridTcpRouterConfiguration.class); + + if (tcpCfg == null) + U.warn(log, "TCP router startup skipped (configuration not found)."); + else { + tcpRouter = new GridTcpRouterImpl(tcpCfg); + + try { + tcpRouter.start(); + } + catch (Exception e) { + U.error(log, "Failed to start TCP router on port " + tcpCfg.getPort() + ": " + e.getMessage(), e); + + tcpRouter = null; + } + } + } + + /** + * Stops the TCP router if one was started. Exceptions thrown while stopping are logged + * and not propagated. + */ + public void stop() { + if (tcpRouter != null) { + try { + tcpRouter.stop(); + } + catch (Exception e) { + U.error(log, "Error while stopping the router.", e); + } + } + } + + /** + * Wrapper method to run router from command-line. + * <p> + * Prints the banner, loads the logger and router configuration beans from the Spring XML file + * given as the first argument, starts the router and registers a JVM shutdown hook that stops it. + * Calls System.exit(1) if the path is missing or cannot be resolved to a URL. + * + * @param args Command-line arguments; the first element is the path to the Spring XML configuration. + * @throws IgniteCheckedException If failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + X.println( + " __________ ________________ ", + " / _/ ___/ |/ / _/_ __/ __/ ", + " _/ // (_ / // / / / / _/ ", + "/___/\\___/_/|_/___/ /_/ /___/ ", + " ", + "Ignite Router Command Line Loader", + "ver. 
" + ACK_VER, + COPYRIGHT, + " " + ); + + IgniteSpringProcessor spring = SPRING.create(false); + + if (args.length < 1) { + X.error("Missing XML configuration path."); + + System.exit(1); + } + + String cfgPath = args[0]; + - URL cfgUrl = U.resolveGridGainUrl(cfgPath); ++ URL cfgUrl = U.resolveIgniteUrl(cfgPath); + + if (cfgUrl == null) { + X.error("Spring XML file not found (is IGNITE_HOME set?): " + cfgPath); + + System.exit(1); + } + + boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null; + + IgniteBiTuple<Object, Object> t = null; + Collection<Handler> savedHnds = null; + + if (isLog4jUsed) { + try { + t = U.addLog4jNoOpLogger(); + } + catch (Exception e) { + isLog4jUsed = false; + } + } + + if (!isLog4jUsed) + savedHnds = U.addJavaNoOpLogger(); + + Map<Class<?>, Object> beans; + + try { + beans = spring.loadBeans(cfgUrl, IgniteLogger.class, GridTcpRouterConfiguration.class); + } + finally { + if (isLog4jUsed && t != null) + U.removeLog4jNoOpLogger(t); + + if (!isLog4jUsed) + U.removeJavaNoOpLogger(savedHnds); + } + + final GridRouterCommandLineStartup routerStartup = new GridRouterCommandLineStartup(); + + routerStartup.start(beans); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { + routerStartup.stop(); + } + }); + } +}