// Source: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientAbstractProjection.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.client.impl;

import org.apache.ignite.*;
import org.apache.ignite.internal.client.*;
import org.apache.ignite.internal.client.balancer.*;
import org.apache.ignite.internal.client.impl.connection.*;
import org.apache.ignite.internal.client.util.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.logging.*;

import static org.apache.ignite.internal.client.util.GridClientUtils.*;

/**
 * Base class for client projections. Holds the projection definition (optional fixed node set,
 * optional node filter, load balancer) and contains the common connection-error handling logic:
 * requests are retried up to {@link #RETRY_CNT} times with {@link #RETRY_DELAY} ms between attempts.
 *
 * @param <T> Concrete projection type produced by {@link #createProjection}.
 */
abstract class GridClientAbstractProjection<T extends GridClientAbstractProjection> {
    /** Logger. */
    private static final Logger log = Logger.getLogger(GridClientAbstractProjection.class.getName());

    /** Count of reconnect retries before exception is thrown. */
    private static final int RETRY_CNT = 3;

    /** Delay between retries, in milliseconds (value is passed to {@code U.sleep}). */
    private static final int RETRY_DELAY = 1000;

    /** List of nodes included in this projection. If null, all nodes in topology are included. */
    protected Collection<GridClientNode> nodes;

    /** Node filter to be applied for this projection. */
    protected GridClientPredicate<? super GridClientNode> filter;

    /** Balancer to be used in this projection. */
    protected GridClientLoadBalancer balancer;

    /** Client instance. */
    protected GridClientImpl client;

    /**
     * Creates projection with specified client.
     *
     * @param client Client instance to use, must not be {@code null}.
     * @param nodes Collections of nodes included in this projection ({@code null} for a dynamic projection).
     * @param filter Node filter to be applied.
     * @param balancer Balancer to use.
     */
    protected GridClientAbstractProjection(GridClientImpl client, Collection<GridClientNode> nodes,
        GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer) {
        assert client != null;

        this.client = client;
        this.nodes = nodes;
        this.filter = filter;
        this.balancer = balancer;
    }

    /**
     * Executes a request against a balanced node of this projection and handles connection errors.
     * <p>
     * Up to {@link #RETRY_CNT} attempts are made. When a connection was closed because it was idle, the
     * same node is retried; when the connection was reset or the server is unreachable, another node is
     * picked by the balancer (the failed node is excluded). Failures are never thrown from this method:
     * they are reported through the returned future.
     *
     * @param c Closure to be executed.
     * @param <R> Result future type.
     * @return Future returned by closure, or a future completed with the error if all attempts failed
     *      or the calling thread was interrupted (the interrupt flag is restored in that case).
     */
    protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<R> c) {
        try {
            GridClientNode node = null;

            boolean changeNode = false;

            Throwable cause = null;

            for (int i = 0; i < RETRY_CNT; i++) {
                if (node == null || changeNode) {
                    try {
                        node = balancedNode(node);
                    }
                    catch (GridClientException e) {
                        // Very first selection failed - nothing was tried yet, report as is.
                        if (node == null)
                            throw e;

                        throw new GridServerUnreachableException(
                            "All nodes in projection failed when retrying to perform request. Attempts made: " + i,
                            cause);
                    }
                }

                GridClientConnection conn = null;

                try {
                    conn = client.connectionManager().connection(node);

                    return c.apply(conn, node.nodeId());
                }
                catch (GridConnectionIdleClosedException e) {
                    client.connectionManager().terminateConnection(conn, node, e);

                    // It's ok, just reconnect to the same node.
                    changeNode = false;

                    cause = e;
                }
                catch (GridClientConnectionResetException e) {
                    client.connectionManager().terminateConnection(conn, node, e);

                    changeNode = true;

                    cause = e;
                }
                catch (GridServerUnreachableException e) {
                    changeNode = true;

                    cause = e;
                }

                // Back off only if another attempt follows; no point in delaying the final error.
                if (i < RETRY_CNT - 1)
                    U.sleep(RETRY_DELAY);
            }

            assert cause != null;

            throw new GridServerUnreachableException("Failed to communicate with grid nodes " +
                "(maximum count of retries reached).", cause);
        }
        catch (GridClientException e) {
            return new GridClientFutureAdapter<>(e);
        }
        catch (IgniteInterruptedCheckedException | InterruptedException e) {
            Thread.currentThread().interrupt();

            return new GridClientFutureAdapter<>(
                new GridClientException("Interrupted when (re)trying to perform request.", e));
        }
    }

    /**
     * Executes a request against the node that the cache affinity maps the given key to and handles
     * connection errors.
     * <p>
     * If this projection is pinned to fixed nodes, or no affinity is configured for the cache, or the
     * affinity key is {@code null}, this call is delegated to
     * {@link #withReconnectHandling(ClientProjectionClosure)}. Otherwise the mapped node is retried up to
     * {@link #RETRY_CNT} times; after a connection reset the node is checked for liveness and the
     * request fails immediately if the node has left the grid.
     *
     * @param c Closure to be executed.
     * @param cacheName Cache name for which mapped node will be calculated.
     * @param affKey Affinity key.
     * @param <R> Type of result in future.
     * @return Operation future; failures are reported through the future rather than thrown
     *      (except {@link RuntimeException}s and {@link Error}s raised by the closure, which are rethrown).
     */
    protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<R> c, String cacheName,
        @Nullable Object affKey) {
        GridClientDataAffinity affinity = client.affinity(cacheName);

        // If pinned (fixed-nodes) or no affinity provided use balancer.
        if (nodes != null || affinity == null || affKey == null)
            return withReconnectHandling(c);

        try {
            Collection<? extends GridClientNode> prjNodes = projectionNodes();

            if (prjNodes.isEmpty())
                throw new GridServerUnreachableException("Failed to get affinity node (no nodes in topology were " +
                    "accepted by the filter): " + filter);

            GridClientNode node = affinity.node(affKey, prjNodes);

            for (int i = 0; i < RETRY_CNT; i++) {
                GridClientConnection conn = null;

                try {
                    conn = client.connectionManager().connection(node);

                    return c.apply(conn, node.nodeId());
                }
                catch (GridConnectionIdleClosedException e) {
                    // Idle connection was closed - simply reconnect on the next attempt.
                    client.connectionManager().terminateConnection(conn, node, e);
                }
                catch (GridClientConnectionResetException e) {
                    client.connectionManager().terminateConnection(conn, node, e);

                    if (!checkNodeAlive(node.nodeId()))
                        throw new GridServerUnreachableException("Failed to communicate with mapped grid node for " +
                            "given affinity key (node left the grid) [nodeId=" + node.nodeId() + ", affKey=" + affKey +
                            ']', e);
                }
                catch (RuntimeException | Error e) {
                    // Connection may be null if the failure happened while obtaining it.
                    if (conn != null)
                        client.connectionManager().terminateConnection(conn, node, e);

                    throw e;
                }

                // Back off only if another attempt follows; no point in delaying the final error.
                if (i < RETRY_CNT - 1)
                    U.sleep(RETRY_DELAY);
            }

            throw new GridServerUnreachableException("Failed to communicate with mapped grid node for given affinity " +
                "key (did node leave the grid?) [nodeId=" + node.nodeId() + ", affKey=" + affKey + ']');
        }
        catch (GridClientException e) {
            return new GridClientFutureAdapter<>(e);
        }
        catch (IgniteInterruptedCheckedException | InterruptedException e) {
            Thread.currentThread().interrupt();

            return new GridClientFutureAdapter<>(new GridClientException("Interrupted when (re)trying to perform " +
                "request.", e));
        }
    }

    /**
     * Tries to refresh node on every possible connection in topology.
     * <p>
     * Every other node of the current topology is asked about the given node; the first answer wins.
     * A {@code null} answer additionally marks the node as failed in the client topology.
     *
     * @param nodeId Node id to check.
     * @return {@code True} if response was received, {@code false} if either {@code null} response received or
     *      no nodes can be contacted at all.
     * @throws GridClientException If failed to refresh topology or if client was closed manually.
     * @throws InterruptedException If interrupted.
     */
    protected boolean checkNodeAlive(UUID nodeId) throws GridClientException, InterruptedException {
        // Try to get node information on any of the connections possible.
        for (GridClientNodeImpl node : client.topology().nodes()) {
            try {
                // Do not try to connect to the same node.
                if (node.nodeId().equals(nodeId))
                    continue;

                GridClientConnection conn = client.connectionManager().connection(node);

                try {
                    GridClientNode target = conn.node(nodeId, false, false, node.nodeId()).get();

                    if (target == null)
                        client.topology().nodeFailed(nodeId);

                    return target != null;
                }
                catch (GridClientConnectionResetException e) {
                    // This connection is broken - drop it and ask the next node.
                    client.connectionManager().terminateConnection(conn, node, e);
                }
                catch (GridClientClosedException e) {
                    // Client was closed - further attempts are pointless.
                    throw e;
                }
                catch (GridClientException e) {
                    if (log.isLoggable(Level.FINE))
                        log.log(Level.FINE, "Node request failed, try next node.", e);
                }
            }
            catch (GridServerUnreachableException e) {
                if (log.isLoggable(Level.FINE))
                    log.log(Level.FINE, "Node request failed, try next node.", e);
            }
        }

        return false;
    }

    /**
     * Gets most recently refreshed topology. If this compute instance is a projection,
     * then only nodes that satisfy projection criteria will be returned.
     *
     * @return Most recently refreshed topology.
     * @throws GridClientException If failed to refresh topology.
     */
    public Collection<? extends GridClientNode> projectionNodes() throws GridClientException {
        return projectionNodes(null);
    }

    /**
     * Gets most recently refreshed topology. If this compute instance is a projection,
     * then only nodes that satisfy projection criteria will be returned.
     * <p>
     * Note that for a static projection (fixed node set) the node set is returned as is: neither the
     * projection filter nor {@code pred} is applied here.
     *
     * @param pred Predicate to additionally filter projection nodes,
     *      if {@code null} just return projection.
     * @return Most recently refreshed topology.
     * @throws GridClientException If failed to refresh topology.
     */
    protected Collection<? extends GridClientNode> projectionNodes(@Nullable GridClientPredicate<GridClientNode> pred)
        throws GridClientException {
        if (nodes != null)
            return nodes;

        // Dynamic projection, ask topology for current snapshot.
        Collection<? extends GridClientNode> prjNodes = client.topology().nodes();

        if (filter != null || pred != null)
            prjNodes = applyFilter(prjNodes, filter, pred);

        return prjNodes;
    }

    /**
     * Return balanced node for current projection. Only nodes with REST available for the
     * configured protocol are considered.
     *
     * @param exclude Node to exclude (typically the one that has just failed), or {@code null}.
     * @return Balanced node.
     * @throws GridClientException If no node of the projection passed the filters
     *      ({@link GridServerUnreachableException}) or the balancer failed.
     */
    private GridClientNode balancedNode(@Nullable final GridClientNode exclude) throws GridClientException {
        GridClientPredicate<GridClientNode> excludeFilter = new GridClientPredicate<GridClientNode>() {
            @Override public boolean apply(GridClientNode e) {
                return (exclude == null || !exclude.equals(e)) && restAvailable(e, client.cfg.getProtocol());
            }

            @Override public String toString() {
                return exclude == null ?
                    "Filter nodes with available REST." :
                    "Filter nodes with available REST and " +
                        "exclude (probably due to connection failure) node: " + exclude.nodeId();
            }
        };

        Collection<? extends GridClientNode> prjNodes = projectionNodes(excludeFilter);

        if (prjNodes.isEmpty())
            throw new GridServerUnreachableException("Failed to get balanced node (no nodes in topology were " +
                "accepted by the filters): " + Arrays.asList(filter, excludeFilter));

        // Nothing to balance between - skip the balancer.
        if (prjNodes.size() == 1) {
            GridClientNode ret = GridClientUtils.first(prjNodes);

            assert ret != null;

            return ret;
        }

        return balancer.balancedNode(prjNodes);
    }

    /**
     * Creates a sub-projection for current projection.
     * <p>
     * The resulting filter is the conjunction of this projection's filter and the given one. The
     * resulting node set is the intersection of this projection's node set and the given one, with the
     * resulting filter applied to it; it is {@code null} (dynamic projection) only if both node sets
     * are {@code null}.
     *
     * @param nodes Collection of nodes that sub-projection will be restricted to. If {@code null},
     *      created projection is dynamic and will take nodes from topology.
     * @param filter Filter to be applied to nodes in projection. If {@code null} - no filter applied.
     * @param balancer Balancer to use in projection. If {@code null} - inherit balancer from the current projection.
     * @param factory Factory to create new projection.
     * @return Created projection.
     * @throws GridClientException If resulting projection is empty. Note that this exception may
     *      only be thrown on case of static projections, i.e. where collection of nodes is not null.
     */
    protected T createProjection(@Nullable Collection<GridClientNode> nodes,
        @Nullable GridClientPredicate<? super GridClientNode> filter, @Nullable GridClientLoadBalancer balancer,
        ProjectionFactory<T> factory) throws GridClientException {
        if (nodes != null && nodes.isEmpty())
            throw new GridClientException("Failed to create projection: given nodes collection is empty.");

        if (filter != null && this.filter != null)
            filter = new GridClientAndPredicate<>(this.filter, filter);
        else if (filter == null)
            filter = this.filter;

        Collection<GridClientNode> subset = intersection(this.nodes, nodes);

        if (subset != null && subset.isEmpty())
            throw new GridClientException("Failed to create projection (given node set does not overlap with " +
                "existing node set) [prjNodes=" + this.nodes + ", nodes=" + nodes + ']');

        if (filter != null && subset != null) {
            subset = applyFilter(subset, filter);

            if (subset != null && subset.isEmpty())
                throw new GridClientException("Failed to create projection (none of the nodes in projection node " +
                    "set passed the filter) [prjNodes=" + subset + ", filter=" + filter + ']');
        }

        if (balancer == null)
            balancer = this.balancer;

        // Pass the computed subset rather than the raw argument: otherwise the sub-projection could contain
        // nodes outside of this projection, and the filter would never be applied to a static node set.
        return factory.create(subset, filter, balancer);
    }

    /**
     * Calculates intersection of two collections. Returned collection always a new collection.
     * If only one of the collections is not {@code null}, a copy of it is returned.
     *
     * @param first First collection to intersect.
     * @param second Second collection to intersect.
     * @return Intersection or {@code null} if both collections are {@code null}.
     */
    @Nullable private Collection<GridClientNode> intersection(@Nullable Collection<? extends GridClientNode> first,
        @Nullable Collection<? extends GridClientNode> second) {
        if (first == null && second == null)
            return null;

        if (first == null)
            return new ArrayList<>(second);

        Collection<GridClientNode> res = new ArrayList<>(first);

        if (second != null)
            res.retainAll(second);

        return res;
    }

    /**
     * Factory for new projection creation.
     *
     * @param <X> Projection implementation.
     */
    protected interface ProjectionFactory<X extends GridClientAbstractProjection> {
        /**
         * Subclasses must implement this method and return concrete implementation of projection needed.
         *
         * @param nodes Nodes that are included in projection.
         * @param filter Filter to be applied.
         * @param balancer Balancer to be used.
         * @return Created projection.
         */
        X create(@Nullable Collection<GridClientNode> nodes,
            @Nullable GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer);
    }

    /**
     * Closure to execute reconnect-handling code.
     *
     * @param <R> Type of the operation result.
     */
    protected interface ClientProjectionClosure<R> {
        /**
         * All closures that implement this interface may safely call all methods of communication connection.
         * If any exceptions in connection occur, they will be automatically handled by projection.
         *
         * @param conn Communication connection that should be accessed.
         * @param affinityNodeId Affinity node ID.
         * @return Future - result of operation.
         * @throws GridClientConnectionResetException If connection was unexpectedly reset. Connection will be
         *      either re-established or different server will be accessed (if available).
         * @throws GridClientClosedException If client was manually closed by user.
         */
        GridClientFuture<R> apply(GridClientConnection conn, UUID affinityNodeId)
            throws GridClientConnectionResetException, GridClientClosedException;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index 667adb5,0000000..becfaf6 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@@ -1,763 -1,0 +1,764 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.impl.connection; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.util.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.internal.processors.rest.client.message.*; - import org.apache.ignite.plugin.security.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.nio.ssl.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.logger.java.*; ++import org.apache.ignite.plugin.security.*; +import org.jetbrains.annotations.*; + +import javax.net.ssl.*; +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.*; + +import static java.util.logging.Level.*; +import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.*; +import static org.apache.ignite.internal.GridNodeAttributes.*; + +/** + * Cached connections manager. + */ +abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager { + /** Count of reconnect retries before init considered failed. */ + private static final int INIT_RETRY_CNT = 3; + + /** Initialization retry interval. */ + private static final int INIT_RETRY_INTERVAL = 1000; + + /** Class logger. */ + private final Logger log; + + /** NIO server. */ + private GridNioServer srv; + + /** Active connections. */ + private final ConcurrentMap<InetSocketAddress, GridClientConnection> conns = new ConcurrentHashMap<>(); + + /** Active connections of nodes. */ + private final ConcurrentMap<UUID, GridClientConnection> nodeConns = new ConcurrentHashMap<>(); + + /** SSL context. */ + private final SSLContext sslCtx; + + /** Client configuration. 
*/ + protected final GridClientConfiguration cfg; + + /** Topology. */ + private final GridClientTopology top; + + /** Client id. */ + private final UUID clientId; + + /** Router endpoints to use instead of topology info. */ + private final Collection<InetSocketAddress> routers; + + /** Closed flag. */ + private volatile boolean closed; + + /** Shared executor service. */ + private final ExecutorService executor; + + /** Endpoint striped lock. */ + private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16); + + /** Service for ping requests, {@code null} if HTTP protocol is used. */ + private final ScheduledExecutorService pingExecutor; + + /** Marshaller ID. */ + private final Byte marshId; + + /** Message writer. */ + @SuppressWarnings("FieldCanBeLocal") + private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() { + @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { + assert msg != null; + assert buf != null; + + msg.messageWriter(this, nodeId); + + return msg.writeTo(buf); + } + + @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out, + ByteBuffer buf) throws IOException { + assert msg != null; + assert out != null; + assert buf != null; + assert buf.hasArray(); + + msg.messageWriter(this, nodeId); + + boolean finished = false; + int cnt = 0; + + while (!finished) { + finished = msg.writeTo(buf); + + out.write(buf.array(), 0, buf.position()); + + cnt += buf.position(); + + buf.clear(); + } + + return cnt; + } + }; + + /** + * @param clientId Client ID. + * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one. + * @param cfg Client configuration. + * @param routers Routers or empty collection to use endpoints from topology info. + * @param top Topology. + * @param marshId Marshaller ID. + * @throws GridClientException In case of error. 
+ */ + @SuppressWarnings("unchecked") + protected GridClientConnectionManagerAdapter(UUID clientId, + SSLContext sslCtx, + GridClientConfiguration cfg, + Collection<InetSocketAddress> routers, + GridClientTopology top, + @Nullable Byte marshId) + throws GridClientException { + assert clientId != null : "clientId != null"; + assert cfg != null : "cfg != null"; + assert routers != null : "routers != null"; + assert top != null : "top != null"; + + this.clientId = clientId; + this.sslCtx = sslCtx; + this.cfg = cfg; + this.routers = new ArrayList<>(routers); + this.top = top; + + log = Logger.getLogger(getClass().getName()); + + executor = cfg.getExecutorService() != null ? cfg.getExecutorService() : + Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true)); + + pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null; + + this.marshId = marshId; + + if (marshId == null && cfg.getMarshaller() == null) + throw new GridClientException("Failed to start client (marshaller is not configured)."); + + if (cfg.getProtocol() == GridClientProtocol.TCP) { + try { + IgniteLogger gridLog = new IgniteJavaLogger(false); + + GridNioFilter[] filters; + + GridNioMessageReader msgReader = new GridNioMessageReader() { + @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, + ByteBuffer buf) { + assert msg != null; + assert buf != null; + + msg.messageReader(this, nodeId); + + return msg.readFrom(buf); + } + + @Nullable @Override public GridTcpMessageFactory messageFactory() { + return null; + } + }; + + GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(msgReader), gridLog, true); + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog); + + sslFilter.directMode(true); + sslFilter.clientMode(true); + + filters = new GridNioFilter[]{codecFilter, 
sslFilter}; + } + else + filters = new GridNioFilter[]{codecFilter}; + + srv = GridNioServer.builder().address(U.getLocalHost()) + .port(-1) + .listener(new NioListener(log)) + .filters(filters) + .logger(gridLog) + .selectorCount(Runtime.getRuntime().availableProcessors()) + .sendQueueLimit(1024) + .byteOrder(ByteOrder.nativeOrder()) + .tcpNoDelay(cfg.isTcpNoDelay()) + .directBuffer(true) + .directMode(true) + .socketReceiveBufferSize(0) + .socketSendBufferSize(0) + .idleTimeout(Long.MAX_VALUE) + .gridName("gridClient") + .messageWriter(msgWriter) + .daemon(cfg.isDaemon()) + .build(); + + srv.start(); + } + catch (IOException | IgniteCheckedException e) { + throw new GridClientException("Failed to start connection server.", e); + } + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException { + init0(); + + GridClientException firstEx = null; + + for (int i = 0; i < INIT_RETRY_CNT; i++) { + Collection<InetSocketAddress> srvsCp = new ArrayList<>(srvs); + + while (!srvsCp.isEmpty()) { + GridClientConnection conn = null; + + try { + conn = connect(null, srvsCp); + + conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null).get(); + + return; + } + catch (GridServerUnreachableException e) { + // No connection could be opened to any of initial addresses - exit to retry loop. + assert conn == null : + "GridClientConnectionResetException was thrown from GridClientConnection#topology"; + + if (firstEx == null) + firstEx = e; + + break; + } + catch (GridClientConnectionResetException e) { + // Connection was established but topology update failed - + // trying other initial addresses if any. 
+ assert conn != null : "GridClientConnectionResetException was thrown from connect()"; + + if (firstEx == null) + firstEx = e; + + if (!srvsCp.remove(conn.serverAddress())) + // We have misbehaving collection or equals - just exit to avoid infinite loop. + break; + } + } + + Thread.sleep(INIT_RETRY_INTERVAL); + } + + for (GridClientConnection c : conns.values()) { + conns.remove(c.serverAddress(), c); + + c.close(FAILED, false); + } + + throw firstEx; + } + + /** + * Additional initialization. + * + * @throws GridClientException In case of error. + */ + protected abstract void init0() throws GridClientException; + + /** + * Gets active communication facade. + * + * @param node Remote node to which connection should be established. + * @throws GridServerUnreachableException If none of the servers can be reached after the exception. + * @throws GridClientClosedException If client was closed manually. + * @throws InterruptedException If connection was interrupted. + */ + @Override public GridClientConnection connection(GridClientNode node) + throws GridClientClosedException, GridServerUnreachableException, InterruptedException { + assert node != null; + + // Use router's connections if defined. + if (!routers.isEmpty()) + return connection(null, routers); + + GridClientConnection conn = nodeConns.get(node.nodeId()); + + if (conn != null) { + // Ignore closed connections. + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) + closeIdle(); + else + return conn; + } + + // Use node's connection, if node is available over rest. 
+ Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true); + + List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size()); + + for (InetSocketAddress endpoint : endpoints) + if (!endpoint.isUnresolved()) + resolvedEndpoints.add(endpoint); + + if (resolvedEndpoints.isEmpty()) { + throw new GridServerUnreachableException("No available endpoints to connect " + + "(is rest enabled for this node?): " + node); + } + + boolean sameHost = node.attributes().isEmpty() || + F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", ")); + + Collection<InetSocketAddress> srvs = new LinkedHashSet<>(); + + if (sameHost) { + Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true)); + + srvs.addAll(resolvedEndpoints); + } + else { + for (InetSocketAddress endpoint : resolvedEndpoints) + if (!endpoint.getAddress().isLoopbackAddress()) + srvs.add(endpoint); + } + + return connection(node.nodeId(), srvs); + } + + /** + * Returns connection to one of the given addresses. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param srvs Collection of addresses to connect to. + * @return Connection to use for operations, targeted for the given node. + * @throws GridServerUnreachableException If connection can't be established. + * @throws GridClientClosedException If connections manager has been closed already. + * @throws InterruptedException If connection was interrupted. + */ + public GridClientConnection connection(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs) + throws GridServerUnreachableException, GridClientClosedException, InterruptedException { + if (srvs == null || srvs.isEmpty()) + throw new GridServerUnreachableException("Failed to establish connection to the grid" + + " (address list is empty)."); + + checkClosed(); + + // Search for existent connection. 
+ for (InetSocketAddress endPoint : srvs) { + assert endPoint != null; + + GridClientConnection conn = conns.get(endPoint); + + if (conn == null) + continue; + + // Ignore closed connections. + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { + closeIdle(); + + continue; + } + + if (nodeId != null) + nodeConns.put(nodeId, conn); + + return conn; + } + + return connect(nodeId, srvs); + } + + /** + * Creates a connected facade and returns it. Called either from constructor or inside + * a write lock. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param srvs List of server addresses that this method will try to connect to. + * @return Established connection. + * @throws GridServerUnreachableException If none of the servers can be reached. + * @throws InterruptedException If connection was interrupted. + */ + protected GridClientConnection connect(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs) + throws GridServerUnreachableException, InterruptedException { + if (srvs.isEmpty()) + throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " + + "list is empty)."); + + Exception cause = null; + + for (InetSocketAddress srv : srvs) { + try { + return connect(nodeId, srv); + } + catch (InterruptedException e) { + throw e; + } + catch (Exception e) { + if (cause == null) + cause = e; + else if (log.isLoggable(INFO)) + log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']'); + } + } + + assert cause != null; + + throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause); + } + + /** + * Create new connection to specified server. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param addr Remote socket to connect. + * @return Established connection. + * @throws IOException If connection failed. 
+ * @throws GridClientException If protocol error happened. + * @throws InterruptedException If thread was interrupted before connection was established. + */ + protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr) + throws IOException, GridClientException, InterruptedException { + endpointStripedLock.lock(addr); + + try { + GridClientConnection old = conns.get(addr); + + if (old != null) { + if (old.isClosed()) { + conns.remove(addr, old); + + if (nodeId != null) + nodeConns.remove(nodeId, old); + } + else { + if (nodeId != null) + nodeConns.put(nodeId, old); + + return old; + } + } + + GridSecurityCredentials cred = null; + + try { + if (cfg.getSecurityCredentialsProvider() != null) + cred = cfg.getSecurityCredentialsProvider().credentials(); + } + catch (IgniteCheckedException e) { + throw new GridClientException("Failed to obtain client credentials.", e); + } + + GridClientConnection conn; + + if (cfg.getProtocol() == GridClientProtocol.TCP) { + conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor, + cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(), + cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepPortablesThreadLocal()); + } + else + throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " + + cfg.getProtocol()); + + old = conns.putIfAbsent(addr, conn); + + assert old == null; + + if (nodeId != null) + nodeConns.put(nodeId, conn); + + return conn; + } + finally { + endpointStripedLock.unlock(addr); + } + } + + /** + * @return Get thread local used to enable keep portables mode. 
+ */ + protected ThreadLocal<Boolean> keepPortablesThreadLocal() { + return null; + } + + /** {@inheritDoc} */ + @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) { + if (log.isLoggable(Level.FINE)) + log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" + + conn.serverAddress() + ", errMsg=" + e.getMessage() + ']'); + + closeIdle(); + + conn.close(FAILED, false); + } + + /** + * Closes all opened connections. + * + * @param waitCompletion If {@code true} waits for all pending requests to be proceeded. + */ + @SuppressWarnings("TooBroadScope") + @Override public void stop(boolean waitCompletion) { + Collection<GridClientConnection> closeConns; + + if (closed) + return; + + // Mark manager as closed. + closed = true; + + // Remove all connections from cache. + closeConns = new ArrayList<>(conns.values()); + + conns.clear(); + + nodeConns.clear(); + + // Close old connection outside the writer lock. + for (GridClientConnection conn : closeConns) + conn.close(CLIENT_CLOSED, waitCompletion); + + if (pingExecutor != null) + GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log); + + GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log); + + if (srv != null) + srv.stop(); + } + + /** + * Close all connections idling for more then + * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds. 
+ */
    private void closeIdle() {
        // First pass: connections mapped to node IDs; an idle one is removed from both maps.
        for (Map.Entry<UUID, GridClientConnection> mapped : nodeConns.entrySet()) {
            GridClientConnection c = mapped.getValue();

            if (c.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
                conns.remove(c.serverAddress(), c);

                nodeConns.remove(mapped.getKey(), c);
            }
        }

        // Second pass: connections known only by server address.
        for (GridClientConnection c : conns.values()) {
            if (c.closeIfIdle(cfg.getMaxConnectionIdleTime()))
                conns.remove(c.serverAddress(), c);
        }
    }

    /**
     * Fails fast when the client has already been closed.
     *
     * @throws GridClientClosedException If client was closed.
     */
    private void checkClosed() throws GridClientClosedException {
        if (!closed)
            return;

        throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore).");
    }

    /**
     * NIO server listener that routes session events to the handshake future or to the
     * connection stored in the session meta.
     */
    private static class NioListener implements GridNioServerListener {
        /** Logger. */
        private final Logger log;

        /**
         * @param log Logger.
         */
        private NioListener(Logger log) {
            this.log = log;
        }

        /** {@inheritDoc} */
        @Override public void onConnected(GridNioSession ses) {
            if (log.isLoggable(Level.FINE))
                log.fine("Session connected: " + ses);
        }

        /** {@inheritDoc} */
        @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
            if (log.isLoggable(Level.FINE))
                log.fine("Session disconnected: " + ses);

            GridClientFutureAdapter<Boolean> pendingHandshake =
                ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);

            if (pendingHandshake != null) {
                // Handshake was still pending: fail it instead of closing a connection.
                pendingHandshake.onDone(
                    new GridClientConnectionResetException("Failed to perform handshake (connection failed)."));

                return;
            }

            GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);

            if (conn != null)
                conn.close(FAILED, false);
        }

        /** {@inheritDoc} */
        @Override public void onMessage(GridNioSession ses, Object msg) {
            GridClientFutureAdapter<Boolean> pendingHandshake =
                ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);

            if (pendingHandshake != null) {
                assert msg instanceof GridClientHandshakeResponse;

                handleHandshakeResponse(pendingHandshake, (GridClientHandshakeResponse)msg);

                return;
            }

            GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);

            assert conn != null;

            if (!(msg instanceof GridClientMessageWrapper)) {
                assert msg instanceof GridClientPingPacket : msg;

                conn.handlePingResponse();

                return;
            }

            GridClientMessageWrapper wrapper = (GridClientMessageWrapper)msg;

            // A zero-sized wrapper is handled as a ping response.
            if (wrapper.messageSize() == 0) {
                conn.handlePingResponse();

                return;
            }

            assert wrapper.message() != null;

            conn.handleResponse(wrapper);
        }

        /**
         * Completes the handshake future according to the result code of the response.
         *
         * @param handshakeFut Future.
         * @param msg A handshake response.
         */
        private void handleHandshakeResponse(GridClientFutureAdapter<Boolean> handshakeFut,
            GridClientHandshakeResponse msg) {
            byte rc = msg.resultCode();

            if (rc == GridClientHandshakeResponse.OK.resultCode())
                handshakeFut.onDone(true);
            else {
                handshakeFut.onDone(new GridClientHandshakeException(rc,
                    "Handshake failed due to internal error (see server log for more details)."));
            }
        }

        /** {@inheritDoc} */
        @Override public void onSessionWriteTimeout(GridNioSession ses) {
            if (log.isLoggable(Level.FINE))
                log.fine("Closing NIO session because of write timeout.");

            ses.close();
        }

        /** {@inheritDoc} */
        @Override public void onSessionIdleTimeout(GridNioSession ses) {
            if (log.isLoggable(Level.FINE))
                log.fine("Closing NIO session because of idle timeout.");

            ses.close();
        }
    }

    /**
     * NIO parser that decodes a one-byte handshake response while a handshake future is present
     * in the session meta, and client message wrappers otherwise.
     */
    private static class NioParser implements GridNioParser {
        /** Message metadata key. */
        private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();

        /** Message reader. */
        private final GridNioMessageReader msgReader;

        /**
         * @param msgReader Message reader.
         */
        NioParser(GridNioMessageReader msgReader) {
            this.msgReader = msgReader;
        }

        /** {@inheritDoc} */
        @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
            GridClientFutureAdapter<?> handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE);

            if (handshakeFut != null)
                return new GridClientHandshakeResponse(buf.get());

            // Pick up a partially read message stored by a previous call, if any.
            GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY);

            if (msg == null && buf.hasRemaining()) {
                byte type = buf.get();

                if (type != GridClientMessageWrapper.REQ_HEADER)
                    throw new IOException("Invalid message type: " + type);

                msg = new GridClientMessageWrapper();
            }

            boolean finished = buf.hasRemaining() && msgReader.read(null, msg, buf);

            if (!finished) {
                // Not complete yet: keep the message in the session meta for the next call.
                ses.addMeta(MSG_META_KEY, msg);

                return null;
            }

            return msg;
        }

        /** {@inheritDoc} */
        @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
            // No encoding needed for direct messages.
            throw new UnsupportedEncodingException();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridRouterFactory.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/GridRouterFactory.java index a2aaf3a,0000000..b895903 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridRouterFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridRouterFactory.java @@@ -1,121 -1,0 +1,126 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.router; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.router.impl.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +
/**
 * This factory is responsible for router lifecycle management.
 * All routers should be started, accessed and stopped through this factory.
 * <h1 class="header">Embedding router</h1>
 * You can use {@link GridTcpRouterConfiguration} to set configuration parameters and pass them to
 * {@link #startTcpRouter(GridTcpRouterConfiguration)}.
 * <p>
 * See {@link GridTcpRouter} for example on how to configure and start embedded router.
 * <h1 class="header">Standalone router startup</h1>
 * Alternatively you can run routers as standalone processes by executing
 * {@code IGNITE_HOME/bin/igniterouter.sh} or {@code IGNITE_HOME/bin/igniterouter.bat}.
 * They both accept path to a configuration file as first command line argument.
 * See {@code IGNITE_HOME/config/router/default-router.xml} for configuration example.
 *
 * @see GridTcpRouter
 */
public final class GridRouterFactory {
    /** Map of running TCP routers, keyed by router ID. Never reassigned, hence {@code final}. */
    private static final ConcurrentMap<UUID, GridTcpRouterImpl> tcpRouters =
        new ConcurrentHashMap<>();

    /**
     * Static utility class: no instances.
     */
    private GridRouterFactory() {
        // No-op.
    }

    /**
     * Starts a TCP router with given configuration.
     * <p>
     * Starting router will be assigned a randomly generated UUID which can be obtained
     * by {@link GridTcpRouter#id()} method. Later this instance could be obtained via
     * {@link #tcpRouter} method.
     * <p>
     * If the start fails, the partially started router is stopped before the exception is
     * thrown, so that whatever it managed to create is released; the router is not registered.
     *
     * @param cfg Router configuration.
     * @return Started router.
     * @throws IgniteCheckedException If router start failed.
     */
    public static GridTcpRouter startTcpRouter(GridTcpRouterConfiguration cfg) throws IgniteCheckedException {
        GridTcpRouterImpl router = new GridTcpRouterImpl(cfg);

        try {
            router.start();
        }
        catch (Exception e) {
            IgniteCheckedException err = new IgniteCheckedException("Failed to start router: " + e, e);

            // The caller never receives this instance, so nobody else could stop it.
            try {
                router.stop();
            }
            catch (Exception stopErr) {
                err.addSuppressed(stopErr);
            }

            throw err;
        }

        GridTcpRouterImpl old = tcpRouters.putIfAbsent(router.id(), router);

        assert old == null : "UUIDs collision happens [tcpRouters=" + tcpRouters + ", router=" + router + ']';

        return router;
    }

    /**
     * Stops particular TCP router. Does nothing if no router is registered under the given ID.
     *
     * @param tcpRouterId Id of the router to stop.
     */
    public static void stopTcpRouter(UUID tcpRouterId) {
        GridTcpRouterImpl router = tcpRouters.remove(tcpRouterId);

        if (router != null)
            router.stop();
    }

    /**
     * Returns TCP router with the given id.
     *
     * @param id Router Id.
     * @return Router with the given id or {@code null} if router not found.
     */
    @Nullable public static GridTcpRouter tcpRouter(UUID id) {
        return tcpRouters.get(id);
    }

    /**
     * Returns collection of all currently running TCP routers.
     *
     * @return New collection holding the currently registered {@link GridTcpRouter}s.
     */
    public static Collection<GridTcpRouter> allTcpRouters() {
        return new ArrayList<GridTcpRouter>(tcpRouters.values());
    }

    /**
     * Stops all currently active routers. Each router is removed from the registry before it is stopped.
     */
    public static void stopAllRouters() {
        for (Iterator<GridTcpRouterImpl> it = tcpRouters.values().iterator(); it.hasNext(); ) {
            GridTcpRouterImpl router = it.next();

            it.remove();

            router.stop();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterConfiguration.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterConfiguration.java index e27cdb8,0000000..4221282 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterConfiguration.java @@@ -1,306 -1,0 +1,307 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.router; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.client.ssl.*; +import org.apache.ignite.plugin.security.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.plugin.security.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; + +
/**
 * Runtime configuration for TCP router.
 * <p>
 * Only values that differ from the defaults need to be set: every property that is left
 * untouched keeps its default value.
 * <p>
 * For more information about router configuration and startup refer to {@code GridRouterFactory}
 * documentation.
 */
public class GridTcpRouterConfiguration {
    /** Default servers to which router will try to connect. */
    public static final Collection<String> DFLT_SERVERS =
        Collections.singleton("127.0.0.1:" + IgniteConfiguration.DFLT_TCP_PORT);

    /** Default TCP host for router to bind to. */
    public static final String DFLT_TCP_HOST = "0.0.0.0";

    /** Default TCP port. The next port number after Grid's default is used. */
    public static final int DFLT_TCP_PORT = IgniteConfiguration.DFLT_TCP_PORT + 1;

    /** Default port range. */
    public static final int DFLT_PORT_RANGE = 0;

    /** Default nodelay. */
    public static final boolean DFLT_TCP_NODELAY = true;

    /** Host. */
    private String host = DFLT_TCP_HOST;

    /** Port. */
    private int port = DFLT_TCP_PORT;

    /** Port range. */
    @SuppressWarnings("RedundantFieldInitialization")
    private int portRange = DFLT_PORT_RANGE;

    /** No delay. */
    private boolean noDelay = DFLT_TCP_NODELAY;

    /** Idle timeout. */
    private long idleTimeout = IgniteConfiguration.DFLT_REST_IDLE_TIMEOUT;

    /** Client auth. */
    private boolean sslClientAuth;

    /** Ssl context factory. */
    private GridSslContextFactory sslCtxFactory;

    /** Collection of servers. */
    private Collection<String> srvrs = DFLT_SERVERS;

    /** Logger. */
    private IgniteLogger log;

    /** Credentials. */
    private GridSecurityCredentialsProvider credsProvider;

    /**
     * Gets TCP host or IP address for router to bind to.
     * <p>
     * If not defined, router will try to bind to all interfaces.
     *
     * @return TCP host.
     */
    public String getHost() {
        return host;
    }

    /**
     * Sets host for router.
     *
     * @param host Host.
     */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * Gets port for TCP binary protocol server.
     * <p>
     * Default is {@link #DFLT_TCP_PORT}.
     *
     * @return TCP port.
     */
    public int getPort() {
        return port;
    }

    /**
     * Sets port for router.
     *
     * @param port Port.
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Gets port range for TCP binary protocol server. If port number returned from {@link #getPort()}
     * is busy then ports within this range will be tried.
     * <p>
     * Note: zero-range means only user-specified port will be used.
     * <p>
     * Default is {@link #DFLT_PORT_RANGE}.
     *
     * @return Port range.
     */
    public int getPortRange() {
        return portRange;
    }

    /**
     * Sets port range router will be allowed to try. Negative values are rejected.
     * <p>
     * Note: zero-range means only user-specified port will be used.
     *
     * @param portRange Port range.
     * @see #DFLT_PORT_RANGE
     */
    public void setPortRange(int portRange) {
        A.ensure(portRange >= 0, "portRange >= 0");

        this.portRange = portRange;
    }

    /**
     * Gets flag indicating whether {@code TCP_NODELAY} option should be set for accepted client connections.
     * Setting this option reduces network latency and should be set to {@code true} in majority of cases.
     * For more information, see {@link Socket#setTcpNoDelay(boolean)}
     * <p>
     * If not specified, default value is {@code true}.
     *
     * @return Whether {@code TCP_NODELAY} option should be enabled.
     */
    public boolean isNoDelay() {
        return noDelay;
    }

    /**
     * Sets flag indicating whether {@code TCP_NODELAY} option should be set
     * for accepted client connections.
     *
     * @param noDelay No delay.
     */
    public void setNoDelay(boolean noDelay) {
        this.noDelay = noDelay;
    }

    /**
     * Gets timeout in milliseconds to consider connection idle. If no messages are sent by client
     * within this interval, router closes the idling connection.
     * <p>
     * If not specified, default value is
     * {@link org.apache.ignite.configuration.IgniteConfiguration#DFLT_REST_IDLE_TIMEOUT}.
     *
     * @return Idle timeout.
     */
    public long getIdleTimeout() {
        return idleTimeout;
    }

    /**
     * Sets idle timeout.
     *
     * @param idleTimeout Idle timeout in milliseconds.
     */
    public void setIdleTimeout(long idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * Gets a flag indicating whether or not remote clients will be required to have
     * a valid SSL certificate whose validity will be verified with trust manager.
     *
     * @return Whether or not client authentication is required.
     */
    public boolean isSslClientAuth() {
        return sslClientAuth;
    }

    /**
     * Sets flag indicating whether or not remote clients will be required to have
     * a valid SSL certificate whose validity will be verified with trust manager.
     *
     * @param sslClientAuth Ssl client auth.
     */
    public void setSslClientAuth(boolean sslClientAuth) {
        this.sslClientAuth = sslClientAuth;
    }

    /**
     * Gets SSL context factory that will be used for creating a secure socket layer
     * of both rest binary server and outgoing connections.
     *
     * @return SslContextFactory instance.
     * @see GridSslContextFactory
     */
    @Nullable public GridSslContextFactory getSslContextFactory() {
        return sslCtxFactory;
    }

    /**
     * Sets SSL context factory that will be used for creating a secure socket layer
     * of both rest binary server and outgoing connections.
     *
     * @param sslCtxFactory Ssl context factory.
     */
    public void setSslContextFactory(GridSslContextFactory sslCtxFactory) {
        this.sslCtxFactory = sslCtxFactory;
    }

    /**
     * Gets list of server addresses to which router should try to connect.
     * <p>
     * Note that this list will be used only for initial grid connectivity.
     * Once connected to the grid, router may establish connections to any grid node.
     *
     * @return List of server addresses.
     */
    public Collection<String> getServers() {
        return srvrs;
    }

    /**
     * Sets list of server addresses where router's embedded client should connect.
     *
     * @param srvrs List of servers.
     */
    public void setServers(Collection<String> srvrs) {
        this.srvrs = srvrs;
    }

    /**
     * Gets logger for the router instance.
     * If no logger provided JDK logging will be used by router implementation.
     *
     * @return Logger or {@code null} if no logger provided by configuration.
     */
    public IgniteLogger getLogger() {
        return log;
    }

    /**
     * Sets logger for the router instance.
     *
     * @param log Logger.
     */
    public void setLogger(IgniteLogger log) {
        this.log = log;
    }

    /**
     * Gets credentials provider for grid access.
     * <p>
     * These credentials will be used only for initial connection and topology discovery
     * by the router, not for client's request authorization.
     *
     * @return Credentials.
     */
    @Nullable public GridSecurityCredentialsProvider getSecurityCredentialsProvider() {
        return credsProvider;
    }

    /**
     * Sets credentials provider for grid access.
     *
     * @param credsProvider Credentials provider.
     */
    public void setSecurityCredentialsProvider(GridSecurityCredentialsProvider credsProvider) {
        this.credsProvider = credsProvider;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridTcpRouterConfiguration.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java index c6b4cad,0000000..41397a7 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java @@@ -1,164 -1,0 +1,171 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.client.router.impl; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.router.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.internal.processors.spring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lang.*; ++import org.apache.ignite.lifecycle.*; + +import java.net.*; +import java.util.*; +import java.util.logging.*; + - import static org.apache.ignite.internal.IgniteComponentType.*; +import static org.apache.ignite.internal.GridProductImpl.*; ++import static org.apache.ignite.internal.IgniteComponentType.*; + +
/**
 * Loader class for router.
 */
public class GridRouterCommandLineStartup {
    /** Logger. */
    @SuppressWarnings("FieldCanBeLocal")
    private IgniteLogger log;

    /** TCP router; {@code null} until started and again after a failed start. */
    private LifecycleAware tcpRouter;

    /**
     * Looks up the logger and the TCP router configuration among the given beans and starts the router.
     * A start failure is logged, not rethrown.
     *
     * @param beans Beans loaded from spring configuration file.
     */
    public void start(Map<Class<?>, Object> beans) {
        log = (IgniteLogger)beans.get(IgniteLogger.class);

        if (log == null) {
            U.error(log, "Failed to find logger definition in application context. Stopping the router.");

            return;
        }

        GridTcpRouterConfiguration tcpCfg = (GridTcpRouterConfiguration)beans.get(GridTcpRouterConfiguration.class);

        if (tcpCfg == null) {
            U.warn(log, "TCP router startup skipped (configuration not found).");

            return;
        }

        tcpRouter = new GridTcpRouterImpl(tcpCfg);

        try {
            tcpRouter.start();
        }
        catch (Exception e) {
            U.error(log, "Failed to start TCP router on port " + tcpCfg.getPort() + ": " + e.getMessage(), e);

            // Forget the router so that stop() has nothing to do.
            tcpRouter = null;
        }
    }

    /**
     * Stops router if one was started. A failure to stop is logged, not rethrown.
     */
    public void stop() {
        if (tcpRouter == null)
            return;

        try {
            tcpRouter.stop();
        }
        catch (Exception e) {
            U.error(log, "Error while stopping the router.", e);
        }
    }

    /**
     * Wrapper method to run router from command-line.
     *
     * @param args Command-line arguments; the first one is the path to the Spring XML configuration.
     * @throws IgniteCheckedException If failed.
     */
    public static void main(String[] args) throws IgniteCheckedException {
        X.println(
            " __________ ________________ ",
            " / _/ ___/ |/ / _/_ __/ __/ ",
            " _/ // (_ / // / / / / _/ ",
            "/___/\\___/_/|_/___/ /_/ /___/ ",
            " ",
            "Ignite Router Command Line Loader",
            "ver. " + ACK_VER,
            COPYRIGHT,
            " "
        );

        IgniteSpringProcessor spring = SPRING.create(false);

        if (args.length < 1) {
            X.error("Missing XML configuration path.");

            System.exit(1);
        }

        String cfgPath = args[0];

        URL cfgUrl = U.resolveGridGainUrl(cfgPath);

        if (cfgUrl == null) {
            X.error("Spring XML file not found (is IGNITE_HOME set?): " + cfgPath);

            System.exit(1);
        }

        boolean log4jOnClasspath = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;

        IgniteBiTuple<Object, Object> log4jState = null;
        Collection<Handler> savedHnds = null;

        // A no-op logger is installed for the duration of bean loading: log4j if it can be
        // set up, the JDK logger otherwise.
        if (log4jOnClasspath) {
            try {
                log4jState = U.addLog4jNoOpLogger();
            }
            catch (Exception e) {
                log4jOnClasspath = false;
            }
        }

        if (!log4jOnClasspath)
            savedHnds = U.addJavaNoOpLogger();

        Map<Class<?>, Object> beans;

        try {
            beans = spring.loadBeans(cfgUrl, IgniteLogger.class, GridTcpRouterConfiguration.class);
        }
        finally {
            // Undo whichever no-op logger was installed above.
            if (log4jOnClasspath && log4jState != null)
                U.removeLog4jNoOpLogger(log4jState);

            if (!log4jOnClasspath)
                U.removeJavaNoOpLogger(savedHnds);
        }

        final GridRouterCommandLineStartup routerStartup = new GridRouterCommandLineStartup();

        routerStartup.start(beans);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override public void run() {
                routerStartup.stop();
            }
        });
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java index a5ebc2c,0000000..4636da8 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java @@@ -1,348 -1,0 +1,350 @@@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.router.impl; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.router.*; +import org.apache.ignite.internal.client.ssl.*; +import org.apache.ignite.lifecycle.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.nio.ssl.*; +import org.apache.ignite.internal.util.typedef.internal.*; ++import org.apache.ignite.lifecycle.*; ++import org.apache.ignite.logger.java.*; +import org.jetbrains.annotations.*; + +import javax.management.*; +import javax.net.ssl.*; +import java.lang.management.*; +import java.lang.reflect.*; +import java.net.*; +import java.nio.*; +import java.util.*; + +/** + * Wrapper class for router process. + */ +public class GridTcpRouterImpl implements GridTcpRouter, GridTcpRouterMBean, LifecycleAware { + /** */ + private static final String ENT_NIO_LSNR_CLS = "org.gridgain.client.router.impl.GridTcpRouterNioListenerEntImpl"; + + /** Id. */ + private final UUID id = UUID.randomUUID(); + + /** Configuration. 
*/ + private final GridTcpRouterConfiguration cfg; + + /** Logger. */ + private final IgniteLogger log; + + /** Server. */ + private GridNioServer<GridClientMessage> srv; + + /** Client. */ + private GridRouterClientImpl client; + + /** MBean name. */ + private ObjectName mbeanName; + + /** NIO parser. */ + private volatile GridTcpRouterNioParser parser; + + /** Host where server was actually bound. */ + private volatile String bindHost; + + /** Port where server was actually bound. */ + private volatile int bindPort; + + /** + * Creates new router instance. + * + * @param cfg Router configuration. + */ + public GridTcpRouterImpl(GridTcpRouterConfiguration cfg) { + this.cfg = cfg; + + log = cfg.getLogger() != null ? + cfg.getLogger().getLogger(getClass()) : new IgniteJavaLogger().getLogger(getClass()); + } + + /** + * Starts router. + * - * @throws IgniteCheckedException If failed. ++ * @throws IgniteException If failed. + */ - @Override public void start() throws IgniteCheckedException { ++ @Override public void start() throws IgniteException { + try { + client = createClient(cfg); + } + catch (GridClientException e) { - throw new IgniteCheckedException("Failed to initialise embedded client.", e); ++ throw new IgniteException("Failed to initialise embedded client.", e); + } + + GridNioServerListener<GridClientMessage> lsnr; + + try { + Class<?> cls = Class.forName(ENT_NIO_LSNR_CLS); + + Constructor<?> cons = cls.getDeclaredConstructor(IgniteLogger.class, GridRouterClientImpl.class); + + cons.setAccessible(true); + + lsnr = (GridNioServerListener<GridClientMessage>)cons.newInstance(log, client); + } + catch (ClassNotFoundException ignored) { + lsnr = new GridTcpRouterNioListenerOsImpl(log, client); + } + catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { - throw new IgniteCheckedException("Failed to create NIO listener.", e); ++ throw new IgniteException("Failed to create NIO listener.", e); + } + + 
parser = new GridTcpRouterNioParser(); + + final InetAddress hostAddr; + + try { + hostAddr = InetAddress.getByName(cfg.getHost()); + } + catch (UnknownHostException e) { - throw new IgniteCheckedException("Failed to resolve grid address for configured host: " + cfg.getHost(), e); ++ throw new IgniteException("Failed to resolve grid address for configured host: " + cfg.getHost(), e); + } + + SSLContext sslCtx; + + try { + GridSslContextFactory sslCtxFactory = cfg.getSslContextFactory(); + + sslCtx = sslCtxFactory == null ? null : sslCtxFactory.createSslContext(); + } + catch (SSLException e) { - throw new IgniteCheckedException("Failed to create SSL context.", e); ++ throw new IgniteException("Failed to create SSL context.", e); + } + + for (int port = cfg.getPort(), last = port + cfg.getPortRange(); port <= last; port++) { + if (startTcpServer(hostAddr, port, lsnr, parser, cfg.isNoDelay(), sslCtx, cfg.isSslClientAuth(), + cfg.isSslClientAuth())) { + if (log.isInfoEnabled()) + log.info("TCP router successfully started for endpoint: " + hostAddr.getHostAddress() + ":" + port); + + bindPort = port; + bindHost = hostAddr.getHostName(); + + break; + } + else + U.warn(log, "TCP REST router failed to start on endpoint: " + hostAddr.getHostAddress() + ":" + port + + ". 
Will try next port within allowed port range."); + } + + if (bindPort == 0) - throw new IgniteCheckedException("Failed to bind TCP router server (possibly all ports in range " + ++ throw new IgniteException("Failed to bind TCP router server (possibly all ports in range " + + "are in use) [firstPort=" + cfg.getPort() + ", lastPort=" + (cfg.getPort() + cfg.getPortRange()) + + ", addr=" + hostAddr + ']'); + + try { + ObjectName objName = U.registerMBean( + ManagementFactory.getPlatformMBeanServer(), + "Router", + "TCP Router " + id, + getClass().getSimpleName(), + this, + GridTcpRouterMBean.class); + + if (log.isDebugEnabled()) + log.debug("Registered MBean: " + objName); + + mbeanName = objName; + } + catch (JMException e) { + U.error(log, "Failed to register MBean.", e); + } + } + + /** + * Stops this router. + */ + @Override public void stop() { + if (srv != null) + srv.stop(); + + if (client != null) + client.stop(true); + + if (mbeanName != null) + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(mbeanName); + + if (log.isDebugEnabled()) + log.debug("Unregistered MBean: " + mbeanName); + } + catch (JMException e) { + U.error(log, "Failed to unregister MBean.", e); + } + + if (log.isInfoEnabled()) + log.info("TCP router successfully stopped."); + } + + /** + * Tries to start server with given parameters. + * + * @param hostAddr Host on which server should be bound. + * @param port Port on which server should be bound. + * @param lsnr Server message listener. + * @param parser Server message parser. + * @param tcpNoDelay Flag indicating whether TCP_NODELAY flag should be set for accepted connections. + * @param sslCtx SSL context in case if SSL is enabled. + * @param wantClientAuth Whether client will be requested for authentication. + * @param needClientAuth Whether client is required to be authenticated. + * @return {@code True} if server successfully started, {@code false} if port is used and + * server was unable to start. 
+ */ + private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr, + GridNioParser parser, boolean tcpNoDelay, @Nullable SSLContext sslCtx, boolean wantClientAuth, + boolean needClientAuth) { + try { + GridNioFilter codec = new GridNioCodecFilter(parser, log, false); + + // This name is required to be unique in order to avoid collisions with + // ThreadWorkerGroups running in the same JVM by other routers/nodes. + String gridName = "router-" + id; + + GridNioFilter[] filters; + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log); + + sslFilter.wantClientAuth(wantClientAuth); + + sslFilter.needClientAuth(needClientAuth); + + filters = new GridNioFilter[] { codec, sslFilter }; + } + else + filters = new GridNioFilter[] { codec }; + + srv = GridNioServer.<GridClientMessage>builder() + .address(hostAddr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(Runtime.getRuntime().availableProcessors()) + .gridName(gridName) + .tcpNoDelay(tcpNoDelay) + .directBuffer(false) + .byteOrder(ByteOrder.nativeOrder()) + .socketSendBufferSize(0) + .socketReceiveBufferSize(0) + .sendQueueLimit(0) + .filters(filters) + .idleTimeout(cfg.getIdleTimeout()) + .build(); + + srv.start(); + + return true; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to start TCP router protocol on port " + port + ": " + e.getMessage()); + + srv = null; + + return false; + } + } + + /** + * Creates a client for forwarding messages to the grid. + * + * @param routerCfg Router configuration. + * @return Client instance. + * @throws GridClientException If client creation failed. 
+ */ + private GridRouterClientImpl createClient(GridTcpRouterConfiguration routerCfg) throws GridClientException { + UUID clientId = UUID.randomUUID(); + + return new GridRouterClientImpl(clientId, routerCfg); + } + + /** {@inheritDoc} */ + @Override public String getHost() { + return bindHost; + } + + /** {@inheritDoc} */ + @Override public int getPort() { + return bindPort; + } + + /** {@inheritDoc} */ + @Override public boolean isSslEnabled() { + return cfg.getSslContextFactory() != null; + } + + /** {@inheritDoc} */ + @Override public boolean isSslClientAuth() { + return cfg.isSslClientAuth(); + } + + /** {@inheritDoc} */ + @Override public Collection<String> getServers() { + return cfg.getServers(); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return id; + } + + /** {@inheritDoc} */ + @Override public GridTcpRouterConfiguration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @Override public long getReceivedCount() { + return parser != null ? parser.getReceivedCount() : 0; + } + + /** {@inheritDoc} */ + @Override public long getSendCount() { + return parser != null ? 
parser.getSendCount() : 0; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridTcpRouterImpl that = (GridTcpRouterImpl)o; + + return id.equals(that.id); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java index 718231f,46bc955..c8d4348 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java @@@ -17,8 -17,8 +17,9 @@@ package org.apache.ignite.internal.processors.portable.os; + import org.apache.ignite.client.marshaller.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.marshaller.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.portable.*; import org.apache.ignite.portables.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java index 63b3a0f,3c99eb4..95495d7 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridLifecycleAwareSelfTest.java @@@ -18,8 -18,8 +18,9 @@@ package org.apache.ignite.internal; import org.apache.ignite.*; + import org.apache.ignite.client.ssl.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.client.ssl.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.logger.java.*; import org.apache.ignite.marshaller.optimized.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index c1dd3ff,e506271..a308ae9 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@@ -21,14 -21,13 +21,15 @@@ import junit.framework.* import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.consistenthash.*; -import org.apache.ignite.client.ssl.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.client.ssl.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; + import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/modules/hadoop/src/main/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocol.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e8d42f5d/pom.xml ----------------------------------------------------------------------