http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java index 82112da,0000000..ac2fe86 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java @@@ -1,152 -1,0 +1,128 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; + +/** + * Descriptor of remote grid node. Use {@link GridClientCompute#nodes()} to + * get a full view over remote grid nodes. + */ +public interface GridClientNode { + /** + * Gets ID of a remote node. + * + * @return Node ID. + */ + public UUID nodeId(); + + /** + * Gets consistent globally unique node ID. Unlike {@link #nodeId()} method, + * this method returns consistent node ID which survives node restarts. + * + * @return Consistent globally unique node ID. + */ + public Object consistentId(); + + /** + * Gets list of REST TCP server addresses of remote node. + * + * @return REST TCP server addresses. + */ + public List<String> tcpAddresses(); + + /** + * Gets list of REST TCP server host names of remote node. + * + * @return REST TCP server host names. + */ + public List<String> tcpHostNames(); + + /** - * Gets list of REST HTTP server addresses of remote node. - * - * @return REST HTTP server addresses. - */ - @Deprecated - public List<String> jettyAddresses(); - - /** - * Gets list of REST HTTP server host names of remote node. - * - * @return REST HTTP server host names. - */ - @Deprecated - public List<String> jettyHostNames(); - - /** + * Gets client TCP port of remote node. + * + * @return Remote tcp port. + */ + public int tcpPort(); + + /** - * Gets client HTTP port of remote node. - * - * @return Remote http port. - */ - @Deprecated - public int httpPort(); - - /** + * Gets all attributes of remote node. Note that all system and + * environment properties are automatically includes in node + * attributes. User can also attach custom attributes and then + * use them to further filter remote nodes into virtual subgrids + * for task execution. + * + * @return All node attributes. + */ + public Map<String, Object> attributes(); + + /** + * Gets specific attribute of remote node. + * + * @param name Attribute name. + * @return Attribute value. + * @see #attributes() + */ + @Nullable public <T> T attribute(String name); + + /** + * Gets various dynamic metrics of remote node. + * + * @return Metrics of remote node. + */ + public GridClientNodeMetrics metrics(); + + /** + * Gets all configured caches and their types on remote node. + * + * @return Map in which key is a configured cache name on the node, + * value is mode of configured cache. + */ + public Map<String, GridClientCacheMode> caches(); + + /** + * Gets node replica count for consistent hash ring (valid only for + * {@code PARTITIONED} caches). + * + * @return Node replica count for consistent hash ring. + */ + public int replicaCount(); + + /** + * Gets collection of addresses on which REST binary protocol is bound. + * + * @param proto Protocol for which addresses are obtained. + * @param filterResolved Whether to filter resolved addresses ( {@link InetSocketAddress#isUnresolved()} + * returns {@code False} ) or not. + * @return List of addresses. + */ + public Collection<InetSocketAddress> availableAddresses(GridClientProtocol proto, boolean filterResolved); + + /** + * Indicates whether client can establish direct connection with this node. + * So it is guaranteed that that any request will take only one network + * 'hop' before it will be processed by a Grid node. + * + * @return {@code true} if node can be directly connected, + * {@code false} if request may be passed through a router. + */ + public boolean connectable(); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java index cab20d8,0000000..f64d14c mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java @@@ -1,30 -1,0 +1,26 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +/** + * Protocol that will be used when client connections are created. + */ +public enum GridClientProtocol { - /** Communication via HTTP protocol. */ - @Deprecated - HTTP, - + /** Communication via tcp binary protocol. */ + TCP +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java index e86aa42,0000000..e6e041e mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java @@@ -1,263 -1,0 +1,263 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.client.impl.connection.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.internal.client.util.GridClientUtils.*; - import static org.apache.ignite.internal.GridNodeAttributes.*; ++import static org.apache.ignite.internal.IgniteNodeAttributes.*; + +/** + * Compute projection implementation. + */ +class GridClientComputeImpl extends GridClientAbstractProjection<GridClientComputeImpl> implements GridClientCompute { + /** */ + private static final ThreadLocal<Boolean> KEEP_PORTABLES = new ThreadLocal<Boolean>() { + @Override protected Boolean initialValue() { + return false; + } + }; + + /** */ + private static final GridClientPredicate<GridClientNode> DAEMON = new GridClientPredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode e) { + return "true".equals(e.<String>attribute(ATTR_DAEMON)); + } + }; + + /** */ + private static final GridClientPredicate<GridClientNode> NOT_DAEMON = new GridClientPredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode e) { + return !"true".equals(e.<String>attribute(ATTR_DAEMON)); + } + }; + + /** Projection factory. */ + @SuppressWarnings("TypeMayBeWeakened") + private final GridClientComputeFactory prjFactory = new GridClientComputeFactory(); + + /** + * Creates a new compute projection. + * + * @param client Started client. + * @param nodes Nodes to be included in this projection. If {@code null}, + * then nodes from the current topology snapshot will be used. + * @param nodeFilter Node filter to be used for this projection. If {@code null}, + * then no filter would be applied to the node list. + * @param balancer Balancer to be used in this projection. If {@code null}, + * then no balancer will be used. + */ + GridClientComputeImpl(GridClientImpl client, Collection<GridClientNode> nodes, + GridClientPredicate<? super GridClientNode> nodeFilter, GridClientLoadBalancer balancer) { + super(client, nodes, nodeFilter, balancer); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute projection(GridClientNode node) throws GridClientException { + A.notNull(node, "node"); + + return createProjection(Collections.singletonList(node), null, null, prjFactory); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute projection(GridClientPredicate<? super GridClientNode> filter) + throws GridClientException { + return createProjection(null, filter, null, prjFactory); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute projection(Collection<GridClientNode> nodes) throws GridClientException { + return createProjection(nodes, null, null, prjFactory); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute projection(GridClientPredicate<? super GridClientNode> filter, + GridClientLoadBalancer balancer) throws GridClientException { + return createProjection(null, filter, balancer, prjFactory); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute projection(Collection<GridClientNode> nodes, GridClientLoadBalancer balancer) + throws GridClientException { + return createProjection(nodes, null, balancer, prjFactory); + } + + /** {@inheritDoc} */ + @Override public GridClientLoadBalancer balancer() { + return balancer; + } + + /** {@inheritDoc} */ + @Override public <R> R execute(String taskName, Object taskArg) throws GridClientException { + return this.<R>executeAsync(taskName, taskArg).get(); + } + + /** {@inheritDoc} */ + @Override public <R> GridClientFuture<R> executeAsync(final String taskName, final Object taskArg) { + A.notNull(taskName, "taskName"); + + final boolean keepPortables = KEEP_PORTABLES.get(); + + KEEP_PORTABLES.set(false); + + return withReconnectHandling(new ClientProjectionClosure<R>() { + @Override public GridClientFuture<R> apply(GridClientConnection conn, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + return conn.execute(taskName, taskArg, destNodeId, keepPortables); + } + }); + } + + /** {@inheritDoc} */ + @Override public <R> R affinityExecute(String taskName, String cacheName, Object affKey, Object taskArg) + throws GridClientException { + return this.<R>affinityExecuteAsync(taskName, cacheName, affKey, taskArg).get(); + } + + /** {@inheritDoc} */ + @Override public <R> GridClientFuture<R> affinityExecuteAsync(final String taskName, String cacheName, + Object affKey, final Object taskArg) { + A.notNull(taskName, "taskName"); + + final boolean keepPortables = KEEP_PORTABLES.get(); + + KEEP_PORTABLES.set(false); + + return withReconnectHandling(new ClientProjectionClosure<R>() { + @Override public GridClientFuture<R> apply(GridClientConnection conn, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + return conn.execute(taskName, taskArg, destNodeId, keepPortables); + } + }, cacheName, affKey); + } + + /** {@inheritDoc} */ + @Override public GridClientNode node(UUID id) throws GridClientException { + A.notNull(id, "id"); + + return client.topology().node(id); + } + + /** + * Gets most recently refreshed topology. If this compute instance is a projection, + * then only nodes that satisfy projection criteria will be returned. + * + * @return Most recently refreshed topology. + */ + @Override public Collection<GridClientNode> nodes() throws GridClientException { + return applyFilter(projectionNodes(), NOT_DAEMON); + } + + /** {@inheritDoc} */ + @Override public Collection<GridClientNode> nodes(Collection<UUID> ids) throws GridClientException { + A.notNull(ids, "ids"); + + return client.topology().nodes(ids); + } + + /** {@inheritDoc} */ + @Override public Collection<GridClientNode> nodes(GridClientPredicate<GridClientNode> filter) + throws GridClientException { + A.notNull(filter, "filter"); + + return applyFilter(projectionNodes(), new GridClientAndPredicate<>(filter, NOT_DAEMON)); + } + + /** {@inheritDoc} */ + @Override public Collection<GridClientNode> daemonNodes() throws GridClientException { + return applyFilter(projectionNodes(), DAEMON); + } + + /** {@inheritDoc} */ + @Override public GridClientNode refreshNode(UUID id, boolean includeAttrs, boolean includeMetrics) + throws GridClientException { + return refreshNodeAsync(id, includeAttrs, includeMetrics).get(); + } + + /** {@inheritDoc} */ + @Override public GridClientFuture<GridClientNode> refreshNodeAsync(final UUID id, final boolean includeAttrs, + final boolean includeMetrics) { + A.notNull(id, "id"); + + return withReconnectHandling(new ClientProjectionClosure<GridClientNode>() { + @Override public GridClientFuture<GridClientNode> apply(GridClientConnection conn, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + return conn.node(id, includeAttrs, includeMetrics, destNodeId); + } + }); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridClientNode refreshNode(String ip, boolean includeAttrs, boolean inclMetrics) + throws GridClientException { + return refreshNodeAsync(ip, includeAttrs, inclMetrics).get(); + } + + /** {@inheritDoc} */ + @Override public GridClientFuture<GridClientNode> refreshNodeAsync(final String ip, final boolean inclAttrs, + final boolean includeMetrics) { + A.notNull(ip, "ip"); + + return withReconnectHandling(new ClientProjectionClosure<GridClientNode>() { + @Override public GridClientFuture<GridClientNode> apply(GridClientConnection conn, UUID destNodeId) + throws GridClientConnectionResetException, GridClientClosedException { + return conn.node(ip, inclAttrs, includeMetrics, destNodeId); + } + }); + } + + /** {@inheritDoc} */ + @Override public List<GridClientNode> refreshTopology(boolean includeAttrs, boolean includeMetrics) + throws GridClientException { + return refreshTopologyAsync(includeAttrs, includeMetrics).get(); + } + + /** {@inheritDoc} */ + @Override public GridClientFuture<List<GridClientNode>> refreshTopologyAsync(final boolean inclAttrs, + final boolean includeMetrics) { + return withReconnectHandling(new ClientProjectionClosure<List<GridClientNode>>() { + @Override public GridClientFuture<List<GridClientNode>> apply(GridClientConnection conn, UUID destNodeId) + throws GridClientConnectionResetException, + GridClientClosedException { + return conn.topology(inclAttrs, includeMetrics, destNodeId); + } + }); + } + + /** {@inheritDoc} */ + @Override public GridClientCompute withKeepPortables() { + KEEP_PORTABLES.set(true); + + return this; + } + + /** {@inheritDoc} */ + private class GridClientComputeFactory implements ProjectionFactory<GridClientComputeImpl> { + /** {@inheritDoc} */ + @Override public GridClientComputeImpl create(Collection<GridClientNode> nodes, + GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer) { + return new GridClientComputeImpl(client, nodes, filter, balancer); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java index fb843dc,0000000..9b9f22a mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java @@@ -1,524 -1,0 +1,527 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.balancer.*; +import org.apache.ignite.internal.client.impl.connection.*; +import org.apache.ignite.internal.client.ssl.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.net.ssl.*; +import java.lang.reflect.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.logging.*; + - import static org.apache.ignite.internal.GridNodeAttributes.*; ++import static org.apache.ignite.internal.IgniteNodeAttributes.*; + +/** + * Client implementation. + */ +public class GridClientImpl implements GridClient { + /** Enterprise connection manager class name. */ + private static final String ENT_CONN_MGR_CLS = + "org.apache.ignite.internal.client.impl.connection.GridClientConnectionManagerEntImpl"; + + /** Null mask object. */ + private static final Object NULL_MASK = new Object(); + + /** Logger. */ + private static final Logger log = Logger.getLogger(GridClientImpl.class.getName()); + + /** */ + static { + boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null; + + try { + if (isLog4jUsed) + U.addLog4jNoOpLogger(); + else + U.addJavaNoOpLogger(); + } + catch (IgniteCheckedException ignored) { + // Our log4j warning suppression failed, leave it as is. + } + } + + /** Client ID. */ + private final UUID id; + + /** Client configuration. */ + protected final GridClientConfiguration cfg; + + /** SSL context if ssl enabled. */ + private SSLContext sslCtx; + + /** Main compute projection. */ + private final GridClientComputeImpl compute; + + /** Data projections. */ + private ConcurrentMap<Object, GridClientDataImpl> dataMap = new ConcurrentHashMap<>(); + + /** Topology. */ + protected GridClientTopology top; + + /** Topology updater thread. */ + private final Thread topUpdateThread; + + /** Closed flag. */ + private AtomicBoolean closed = new AtomicBoolean(); + + /** Connection manager. */ + protected GridClientConnectionManager connMgr; + + /** Routers. */ + private final Collection<InetSocketAddress> routers; + + /** Servers. */ + private final Collection<InetSocketAddress> srvs; + + /** + * Creates a new client based on a given configuration. + * + * @param id Client identifier. + * @param cfg0 Client configuration. - * @throws org.apache.ignite.internal.client.GridClientException If client configuration is incorrect. ++ * @param routerClient Router client flag. ++ * @throws GridClientException If client configuration is incorrect. + * @throws GridServerUnreachableException If none of the servers specified in configuration can + * be reached. + */ + @SuppressWarnings("CallToThreadStartDuringObjectConstruction") - public GridClientImpl(UUID id, GridClientConfiguration cfg0) throws GridClientException { ++ public GridClientImpl(UUID id, GridClientConfiguration cfg0, boolean routerClient) throws GridClientException { + this.id = id; + + cfg = new GridClientConfiguration(cfg0); + + boolean success = false; + + try { + top = new GridClientTopology(cfg); + + for (GridClientDataConfiguration dataCfg : cfg.getDataConfigurations()) { + GridClientDataAffinity aff = dataCfg.getAffinity(); + + if (aff instanceof GridClientTopologyListener) + addTopologyListener((GridClientTopologyListener)aff); + } + + if (cfg.getBalancer() instanceof GridClientTopologyListener) + top.addTopologyListener((GridClientTopologyListener)cfg.getBalancer()); + + GridSslContextFactory factory = cfg.getSslContextFactory(); + + if (factory != null) { + try { + sslCtx = factory.createSslContext(); + } + catch (SSLException e) { + throw new GridClientException("Failed to create client (unable to create SSL context, " + + "check ssl context factory configuration): " + e.getMessage(), e); + } + } + + if (cfg.isAutoFetchMetrics() && !cfg.isEnableMetricsCache()) + log.warning("Auto-fetch for metrics is enabled without enabling caching for them."); + + if (cfg.isAutoFetchAttributes() && !cfg.isEnableAttributesCache()) + log.warning( + "Auto-fetch for node attributes is enabled without enabling caching for them."); + + srvs = parseAddresses(cfg.getServers()); + routers = parseAddresses(cfg.getRouters()); + + if (srvs.isEmpty() && routers.isEmpty()) + throw new GridClientException("Servers addresses and routers addresses cannot both be empty " + + "for client (please fix configuration and restart): " + this); + + if (!srvs.isEmpty() && !routers.isEmpty()) + throw new GridClientException("Servers addresses and routers addresses cannot both be provided " + + "for client (please fix configuration and restart): " + this); + - connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null); ++ connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null, routerClient); + + try { + // Init connection manager, it should cause topology update. + tryInitTopology(); + } + catch (GridClientException e) { + top.fail(e); + + log.warning("Failed to initialize topology on client start. Will retry in background."); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new GridClientException("Client startup was interrupted.", e); + } + + topUpdateThread = new TopologyUpdaterThread(); + + topUpdateThread.setDaemon(true); + + topUpdateThread.start(); + + compute = new GridClientComputeImpl(this, null, null, cfg.getBalancer()); + + if (log.isLoggable(Level.INFO)) + log.info("Client started [id=" + id + ", protocol=" + cfg.getProtocol() + ']'); + + success = true; + } + finally { + if (!success) + stop(false); + } + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return id; + } + + /** + * Closes client. + * @param waitCompletion If {@code true} will wait for all pending requests to be proceeded. + */ + public void stop(boolean waitCompletion) { + if (closed.compareAndSet(false, true)) { + // Shutdown the topology refresh thread. + if (topUpdateThread != null) + topUpdateThread.interrupt(); + + // Shutdown listener notification. + if (top != null) + top.shutdown(); + + if (connMgr != null) + connMgr.stop(waitCompletion); + + for (GridClientDataConfiguration dataCfg : cfg.getDataConfigurations()) { + GridClientDataAffinity aff = dataCfg.getAffinity(); + + if (aff instanceof GridClientTopologyListener) + removeTopologyListener((GridClientTopologyListener)aff); + } + + if (log.isLoggable(Level.INFO)) + log.info("Client stopped [id=" + id + ", waitCompletion=" + waitCompletion + ']'); + } + } + + /** {@inheritDoc} */ + @Override public GridClientData data() throws GridClientException { + return data(null); + } + + /** {@inheritDoc} */ + @Override public GridClientData data(@Nullable final String cacheName) throws GridClientException { + checkClosed(); + + Object key = maskNull(cacheName); + + GridClientDataImpl data = dataMap.get(key); + + if (data == null) { + GridClientDataConfiguration dataCfg = cfg.getDataConfiguration(cacheName); + + if (dataCfg == null && cacheName != null) + throw new GridClientException("Data configuration for given cache name was not provided: " + + cacheName); + + GridClientLoadBalancer balancer = dataCfg != null ? dataCfg.getPinnedBalancer() : + new GridClientRandomBalancer(); + + GridClientPredicate<GridClientNode> cacheNodes = new GridClientPredicate<GridClientNode>() { + @Override public boolean apply(GridClientNode e) { + return e.caches().containsKey(cacheName); + } + + @Override public String toString() { + return "GridClientHasCacheFilter [cacheName=" + cacheName + "]"; + } + }; + + data = new GridClientDataImpl( + cacheName, this, null, cacheNodes, balancer, null, cfg.isEnableMetricsCache()); + + GridClientDataImpl old = dataMap.putIfAbsent(key, data); + + if (old != null) + data = old; + } + + return data; + } + + /** {@inheritDoc} */ + @Override public GridClientCompute compute() { + return compute; + } + + /** {@inheritDoc} */ + @Override public void addTopologyListener(GridClientTopologyListener lsnr) { + top.addTopologyListener(lsnr); + } + + /** {@inheritDoc} */ + @Override public void removeTopologyListener(GridClientTopologyListener lsnr) { + top.removeTopologyListener(lsnr); + } + + /** {@inheritDoc} */ + @Override public Collection<GridClientTopologyListener> topologyListeners() { + return top.topologyListeners(); + } + + /** {@inheritDoc} */ + @Override public boolean connected() { + return !top.failed(); + } + + /** {@inheritDoc} */ + @Override public void close() { + GridClientFactory.stop(id); + } + + /** + * Gets topology instance. + * + * @return Topology instance. + */ + public GridClientTopology topology() { + return top; + } + + /** + * @return Connection manager. + */ + public GridClientConnectionManager connectionManager() { + return connMgr; + } + + /** + * Gets data affinity for a given cache name. + * + * @param cacheName Name of cache for which affinity is obtained. Data configuration with this name + * must be configured at client startup. + * @return Data affinity object. + * @throws IllegalArgumentException If client data with given name was not configured. + */ + GridClientDataAffinity affinity(String cacheName) { + GridClientDataConfiguration dataCfg = cfg.getDataConfiguration(cacheName); + + return dataCfg == null ? null : dataCfg.getAffinity(); + } + + /** + * Checks and throws an exception if this client was closed. + * + * @throws GridClientClosedException If client was closed. + */ + private void checkClosed() throws GridClientClosedException { + if (closed.get()) + throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore)."); + } + + /** + * Masks null cache name with unique object. + * + * @param cacheName Name to be masked. + * @return Original name or some unique object if name is null. + */ + private Object maskNull(String cacheName) { + return cacheName == null ? NULL_MASK : cacheName; + } + + /** + * Maps Collection of strings to collection of {@code InetSocketAddress}es. + * + * @param cfgAddrs Collection fo string representations of addresses. + * @return Collection of {@code InetSocketAddress}es + * @throws GridClientException In case of error. + */ + private static Collection<InetSocketAddress> parseAddresses(Collection<String> cfgAddrs) + throws GridClientException { + Collection<InetSocketAddress> addrs = new ArrayList<>(cfgAddrs.size()); + + for (String srvStr : cfgAddrs) { + try { + String[] split = srvStr.split(":"); + + InetSocketAddress addr = new InetSocketAddress(split[0], Integer.parseInt(split[1])); + + addrs.add(addr); + } + catch (RuntimeException e) { + throw new GridClientException("Failed to create client (invalid server address specified): " + + srvStr, e); + } + } + + return Collections.unmodifiableCollection(addrs); + } + + /** + * @return New connection manager based on current client settings. + * @throws GridClientException If failed to start connection server. + */ - public GridClientConnectionManager newConnectionManager(@Nullable Byte marshId) throws GridClientException { - return createConnectionManager(id, sslCtx, cfg, routers, top, marshId); ++ public GridClientConnectionManager newConnectionManager(@Nullable Byte marshId, boolean routerClient) ++ throws GridClientException { ++ return createConnectionManager(id, sslCtx, cfg, routers, top, marshId, routerClient); + } + + /** + * @param clientId Client ID. + * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one. + * @param cfg Client configuration. + * @param routers Routers or empty collection to use endpoints from topology info. + * @param top Topology. + * @throws GridClientException In case of error. + */ + private GridClientConnectionManager createConnectionManager(UUID clientId, SSLContext sslCtx, + GridClientConfiguration cfg, Collection<InetSocketAddress> routers, GridClientTopology top, - @Nullable Byte marshId) ++ @Nullable Byte marshId, boolean routerClient) + throws GridClientException { + GridClientConnectionManager mgr; + + try { + Class<?> cls = Class.forName(ENT_CONN_MGR_CLS); + + Constructor<?> cons = cls.getConstructor(UUID.class, SSLContext.class, GridClientConfiguration.class, + Collection.class, GridClientTopology.class, Byte.class); + - mgr = (GridClientConnectionManager)cons.newInstance(clientId, sslCtx, cfg, routers, top, marshId); ++ mgr = (GridClientConnectionManager)cons.newInstance(clientId, sslCtx, cfg, routers, top, marshId, ++ routerClient); + } + catch (ClassNotFoundException ignored) { - mgr = new GridClientConnectionManagerOsImpl(clientId, sslCtx, cfg, routers, top, marshId); ++ mgr = new GridClientConnectionManagerOsImpl(clientId, sslCtx, cfg, routers, top, marshId, routerClient); + } + catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new GridClientException("Failed to create client connection manager.", e); + } + + return mgr; + } + + /** + * Tries to init client topology using configured set of servers or routers. + * + * @throws GridClientException If initialisation failed. + * @throws InterruptedException If initialisation was interrupted. + */ + private void tryInitTopology() throws GridClientException, InterruptedException { + boolean hasSrvs = routers.isEmpty(); + + final Collection<InetSocketAddress> connSrvs = (hasSrvs) ? new LinkedHashSet<>(srvs) : routers; + + if (hasSrvs) { + // Add REST endpoints for all nodes from previous topology snapshot. + try { + for (GridClientNodeImpl node : top.nodes()) { + Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true); + + List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size()); + + for (InetSocketAddress endpoint : endpoints) + if (!endpoint.isUnresolved()) + resolvedEndpoints.add(endpoint); + + boolean sameHost = node.attributes().isEmpty() || + F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", ")); + + if (sameHost) { + Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true)); + + connSrvs.addAll(resolvedEndpoints); + } + else { + for (InetSocketAddress endpoint : resolvedEndpoints) + if (!endpoint.getAddress().isLoopbackAddress()) + connSrvs.add(endpoint); + } + } + } + catch (GridClientDisconnectedException ignored) { + // Ignore if latest topology update failed. + } + } + + connMgr.init(connSrvs); + + Map<String, GridClientCacheMode> overallCaches = new HashMap<>(); + + for (GridClientNodeImpl node : top.nodes()) + overallCaches.putAll(node.caches()); + + for (Map.Entry<String, GridClientCacheMode> entry : overallCaches.entrySet()) { + GridClientDataAffinity affinity = affinity(entry.getKey()); + + if (affinity instanceof GridClientPartitionAffinity && entry.getValue() != + GridClientCacheMode.PARTITIONED) + log.warning(GridClientPartitionAffinity.class.getSimpleName() + " is used for a cache configured " + + "for non-partitioned mode [cacheName=" + entry.getKey() + ", cacheMode=" + entry.getValue() + ']'); + } + } + + /** + * Thread that updates topology according to refresh interval specified in configuration. + */ + @SuppressWarnings("BusyWait") + private class TopologyUpdaterThread extends Thread { + /** + * Creates topology refresh thread. + */ + private TopologyUpdaterThread() { + super(id + "-topology-update"); + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + while (!isInterrupted()) { + Thread.sleep(cfg.getTopologyRefreshFrequency()); + + try { + tryInitTopology(); + } + catch (GridClientException e) { + top.fail(e); + + if (log.isLoggable(Level.FINE)) + log.fine("Failed to update topology: " + e.getMessage()); + } + } + } + catch (InterruptedException ignored) { + // Client is shutting down. + Thread.currentThread().interrupt(); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "GridClientImpl [id=" + id + ", closed=" + closed + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java index c6d4e1c,0000000..6692550 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java @@@ -1,411 -1,0 +1,396 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Client node implementation. + */ +public class GridClientNodeImpl implements GridClientNode { + /** Node id. */ + private UUID nodeId; + + /** Consistent ID. */ + private Object consistentId; + + /** REST TCP server addresses. */ + private List<String> tcpAddrs = Collections.emptyList(); + + /** REST TCP server host names. */ + private List<String> tcpHostNames = Collections.emptyList(); + + /** Port for TCP rest binary protocol. */ + private int tcpPort; + + /** Node attributes. */ + private Map<String, Object> attrs = Collections.emptyMap(); + + /** Node metrics. */ + private GridClientNodeMetrics metrics; + + /** Node caches. */ + private Map<String, GridClientCacheMode> caches = Collections.emptyMap(); + + /** Replica count for partitioned cache. */ + private int replicaCnt; + + /** Connectable property. */ + private boolean connectable; + + /** Cache for REST TCP socket addresses. */ + private final AtomicReference<Collection<InetSocketAddress>> tcpSockAddrs = new AtomicReference<>(); + + /** + * Default constructor (private). + */ + private GridClientNodeImpl() { + // No-op. + } + + /** + * Creates and returns a builder for a new instance + * of this class. + * + * @return Builder for new instance. + */ + public static Builder builder() { + return new Builder(new GridClientNodeImpl()); + } + + /** + * Creates and returns a builder for a new instance + * of this class, copying data from an input instance. + * + * @param from Instance to copy data from. + * @param skipAttrs Whether to skip attributes. + * @param skipMetrics Whether to skip metrics. + * @return Builder for new instance. + */ + public static Builder builder(GridClientNode from, boolean skipAttrs, boolean skipMetrics) { + Builder b = new Builder(new GridClientNodeImpl()) + .nodeId(from.nodeId()) + .consistentId(from.consistentId()) + .tcpAddresses(from.tcpAddresses()) + .tcpPort(from.tcpPort()) + .caches(from.caches()) + .replicaCount(from.replicaCount()) + .connectable(from.connectable()); + + if (!skipAttrs) + b.attributes(from.attributes()); + + if (!skipMetrics) + b.metrics(from.metrics()); + + return b; + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public Object consistentId() { + return consistentId; + } + + /** {@inheritDoc} */ + @Override public List<String> tcpAddresses() { + return tcpAddrs; + } + + /** {@inheritDoc} */ + @Override public List<String> tcpHostNames() { + return tcpHostNames; + } + + /** {@inheritDoc} */ - @Override public List<String> jettyAddresses() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public List<String> jettyHostNames() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ + @Override public int tcpPort() { + return tcpPort; + } + + /** {@inheritDoc} */ - @Override public int httpPort() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ + @Override public Map<String, Object> attributes() { + return Collections.unmodifiableMap(attrs); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public <T> T attribute(String name) { + return (T)attrs.get(name); + } + + /** {@inheritDoc} */ + @Override public GridClientNodeMetrics metrics() { + return metrics; + } + + /** {@inheritDoc} */ + @Override public Map<String, GridClientCacheMode> caches() { + return caches; + } + + /** {@inheritDoc} */ + @Override public int replicaCount() { + return replicaCnt; + } + + /** {@inheritDoc} */ + @Override public Collection<InetSocketAddress> availableAddresses(GridClientProtocol proto, + boolean filterResolved) { + Collection<String> addrs; + Collection<String> hostNames; + AtomicReference<Collection<InetSocketAddress>> addrsCache; + final int port; + + if (proto == GridClientProtocol.TCP) { + addrsCache = tcpSockAddrs; + addrs = tcpAddrs; + hostNames = tcpHostNames; + port = tcpPort; + } + else + throw new AssertionError("Unknown protocol: " + proto); + + Collection<InetSocketAddress> addrs0 = addrsCache.get(); + + if (addrs0 != null) + return filterIfNecessary(addrs0, filterResolved); + + addrs0 = U.toSocketAddresses(addrs, hostNames, port); + + if (!addrsCache.compareAndSet(null, addrs0)) + return filterIfNecessary(addrsCache.get(), filterResolved); + + return filterIfNecessary(addrs0, filterResolved); + } + + /** + * Filters sockets with resolved addresses. + * + * @param addrs Addresses to filter. + * @param filter Flag indicating whether filter should be applied or not. + * @return Collection copy without unresolved addresses if flag is set and collection itself otherwise. + */ + private Collection<InetSocketAddress> filterIfNecessary(Collection<InetSocketAddress> addrs, boolean filter) { + if (!filter) + return addrs; + + List<InetSocketAddress> res = new ArrayList<>(addrs.size()); + + for (InetSocketAddress addr : addrs) + if (!addr.isUnresolved()) + res.add(addr); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean connectable() { + return connectable; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + + if (!(o instanceof GridClientNodeImpl)) return false; + + GridClientNodeImpl that = (GridClientNodeImpl)o; + + return nodeId.equals(that.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return nodeId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "GridClientNodeImpl [nodeId=" + nodeId + + ", consistentId=" + consistentId + + ", tcpAddrs=" + tcpAddrs + + ", tcpHostNames=" + tcpHostNames + + ", binaryPort=" + tcpPort + + ']'; + } + + /** + * Builder for instances of this class. + */ + @SuppressWarnings("PublicInnerClass") + public static final class Builder { + /** */ + private GridClientNodeImpl impl; + + /** */ + private boolean built; + + /** + * @param impl Implementation reference to build. + */ + private Builder(GridClientNodeImpl impl) { + this.impl = impl; + } + + /** + * Finishes instance construction and returns a + * newly-built instance. + * + * @return A newly-built instance. + */ + public GridClientNodeImpl build() { + if (built) + throw new AssertionError("Instance already built."); + + built = true; + + return impl; + } + + /** + * Sets node ID. + * + * @param nodeId Node ID. + * @return This for chaining. + */ + public Builder nodeId(UUID nodeId) { + impl.nodeId = nodeId; + + return this; + } + + /** + * Sets node consistent ID. + * + * @param consistentId New consistent ID. + * @return This for chaining. + */ + public Builder consistentId(Object consistentId) { + impl.consistentId = consistentId; + + return this; + } + + /** + * Sets list of REST TCP server addresses. + * + * @param tcpAddrs List of address strings. + * @return This for chaining. + */ + public Builder tcpAddresses(Collection<String> tcpAddrs) { + impl.tcpAddrs = U.sealList(tcpAddrs); + + return this; + } + + /** + * Sets list of REST TCP server host names. + * + * @param tcpHostNames List of host names. + * @return This for chaining. + */ + public Builder tcpHostNames(Collection<String> tcpHostNames) { + impl.tcpHostNames = U.sealList(tcpHostNames); + + return this; + } + + /** + * Sets remote TCP port value. + * + * @param tcpPort Sets remote port value. + * @return This for chaining. + */ + public Builder tcpPort(int tcpPort) { + impl.tcpPort = tcpPort; + + return this; + } + + /** + * Sets node attributes. + * + * @param attrs Node attributes. + * @return This for chaining. + */ + public Builder attributes(Map<String, Object> attrs) { + impl.attrs = U.sealMap(attrs); + + return this; + } + + /** + * Sets node metrics. + * + * @param metrics Metrics. + * @return This for chaining. + */ + public Builder metrics(GridClientNodeMetrics metrics) { + impl.metrics = metrics; + + return this; + } + + /** + * Sets caches available on remote node. + * + * @param caches Cache map. + * @return This for chaining. + */ + public Builder caches(Map<String, GridClientCacheMode> caches) { + impl.caches = U.sealMap(caches); + + return this; + } + + + /** + * Sets replica count for node on consistent hash ring. + * + * @param replicaCnt Replica count. + * @return This for chaining. + */ + public Builder replicaCount(int replicaCnt) { + impl.replicaCnt = replicaCnt; + + return this; + } + + /** + * Sets connectable property. + * + * @param connectable Connectable value. + * @return This for chaining. + */ + public Builder connectable(boolean connectable) { + impl.connectable = connectable; + + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java index 0d8c2f7,0000000..7fa6b4c mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java @@@ -1,62 -1,0 +1,62 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl; + +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Delegating thread factory which forces all spawned thread to be daemons. + */ +public class GridClientThreadFactory implements ThreadFactory { + /** Pool number. */ + private static final AtomicInteger poolCtr = new AtomicInteger(1); + + /** Thread number. */ + private final AtomicInteger threadCtr = new AtomicInteger(1); + + /** Prefix. */ + private final String prefix; + + /** Daemon flag. */ + private final boolean daemon; + + /** + * Constructor. + * + * @param name Name prefix. + * @param daemon Daemon flag. + */ + public GridClientThreadFactory(String name, boolean daemon) { + this.daemon = daemon; + - prefix = "gridgain-client-" + name + "-" + poolCtr.getAndIncrement() + "-"; ++ prefix = "ignite-client-" + name + "-" + poolCtr.getAndIncrement() + "-"; + } + + /** {@inheritDoc} */ + @Override public Thread newThread(@NotNull Runnable r) { + Thread thread = new Thread(r, prefix + threadCtr.incrementAndGet()); + + if (daemon) + thread.setDaemon(true); + + return thread; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index e9a3267,0000000..eca5de0 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@@ -1,763 -1,0 +1,644 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl.connection; + +import org.apache.ignite.*; +import org.apache.ignite.internal.client.*; +import org.apache.ignite.internal.client.impl.*; +import org.apache.ignite.internal.client.util.*; +import org.apache.ignite.internal.processors.rest.client.message.*; - import org.apache.ignite.internal.util.direct.*; ++import org.apache.ignite.internal.processors.rest.protocols.tcp.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.nio.ssl.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.plugin.security.*; +import org.jetbrains.annotations.*; + +import javax.net.ssl.*; +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.logging.*; + +import static java.util.logging.Level.*; - import static org.apache.ignite.internal.GridNodeAttributes.*; +import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.*; ++import static org.apache.ignite.internal.IgniteNodeAttributes.*; + +/** + * Cached connections manager. + */ +abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager { + /** Count of reconnect retries before init considered failed. */ + private static final int INIT_RETRY_CNT = 3; + + /** Initialization retry interval. */ + private static final int INIT_RETRY_INTERVAL = 1000; + + /** Class logger. */ + private final Logger log; + + /** NIO server. */ + private GridNioServer srv; + + /** Active connections. */ + private final ConcurrentMap<InetSocketAddress, GridClientConnection> conns = new ConcurrentHashMap<>(); + + /** Active connections of nodes. */ + private final ConcurrentMap<UUID, GridClientConnection> nodeConns = new ConcurrentHashMap<>(); + + /** SSL context. */ + private final SSLContext sslCtx; + + /** Client configuration. */ + protected final GridClientConfiguration cfg; + + /** Topology. */ + private final GridClientTopology top; + + /** Client id. */ + private final UUID clientId; + + /** Router endpoints to use instead of topology info. */ + private final Collection<InetSocketAddress> routers; + + /** Closed flag. */ + private volatile boolean closed; + + /** Shared executor service. */ + private final ExecutorService executor; + + /** Endpoint striped lock. */ + private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16); + + /** Service for ping requests, {@code null} if HTTP protocol is used. */ + private final ScheduledExecutorService pingExecutor; + + /** Marshaller ID. */ + private final Byte marshId; + - /** Message writer. */ - @SuppressWarnings("FieldCanBeLocal") - private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() { - @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageWriter(this, nodeId); - - return msg.writeTo(buf); - } - - @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out, - ByteBuffer buf) throws IOException { - assert msg != null; - assert out != null; - assert buf != null; - assert buf.hasArray(); - - msg.messageWriter(this, nodeId); - - boolean finished = false; - int cnt = 0; - - while (!finished) { - finished = msg.writeTo(buf); - - out.write(buf.array(), 0, buf.position()); - - cnt += buf.position(); - - buf.clear(); - } - - return cnt; - } - }; - + /** + * @param clientId Client ID. + * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one. + * @param cfg Client configuration. + * @param routers Routers or empty collection to use endpoints from topology info. + * @param top Topology. + * @param marshId Marshaller ID. + * @throws GridClientException In case of error. + */ + @SuppressWarnings("unchecked") + protected GridClientConnectionManagerAdapter(UUID clientId, + SSLContext sslCtx, + GridClientConfiguration cfg, + Collection<InetSocketAddress> routers, + GridClientTopology top, - @Nullable Byte marshId) ++ @Nullable Byte marshId, ++ boolean routerClient) + throws GridClientException { + assert clientId != null : "clientId != null"; + assert cfg != null : "cfg != null"; + assert routers != null : "routers != null"; + assert top != null : "top != null"; + + this.clientId = clientId; + this.sslCtx = sslCtx; + this.cfg = cfg; + this.routers = new ArrayList<>(routers); + this.top = top; + + log = Logger.getLogger(getClass().getName()); + + executor = cfg.getExecutorService() != null ? cfg.getExecutorService() : + Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true)); + + pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null; + + this.marshId = marshId; + + if (marshId == null && cfg.getMarshaller() == null) + throw new GridClientException("Failed to start client (marshaller is not configured)."); + + if (cfg.getProtocol() == GridClientProtocol.TCP) { + try { - IgniteLogger gridLog = new IgniteJavaLogger(false); ++ IgniteLogger gridLog = new JavaLogger(false); + + GridNioFilter[] filters; + - GridNioMessageReader msgReader = new GridNioMessageReader() { - @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, - ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageReader(this, nodeId); - - return msg.readFrom(buf); - } - - @Nullable @Override public GridTcpMessageFactory messageFactory() { - return null; - } - }; - - GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(msgReader), gridLog, true); ++ GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(routerClient), gridLog, false); + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog); + - sslFilter.directMode(true); ++ sslFilter.directMode(false); + sslFilter.clientMode(true); + + filters = new GridNioFilter[]{codecFilter, sslFilter}; + } + else + filters = new GridNioFilter[]{codecFilter}; + + srv = GridNioServer.builder().address(U.getLocalHost()) + .port(-1) + .listener(new NioListener(log)) + .filters(filters) + .logger(gridLog) + .selectorCount(Runtime.getRuntime().availableProcessors()) + .sendQueueLimit(1024) + .byteOrder(ByteOrder.nativeOrder()) + .tcpNoDelay(cfg.isTcpNoDelay()) + .directBuffer(true) - .directMode(true) ++ .directMode(false) + .socketReceiveBufferSize(0) + .socketSendBufferSize(0) + .idleTimeout(Long.MAX_VALUE) - .gridName("gridClient") - .messageWriter(msgWriter) ++ .gridName(routerClient ? "routerClient" : "gridClient") + .daemon(cfg.isDaemon()) + .build(); + + srv.start(); + } + catch (IOException | IgniteCheckedException e) { + throw new GridClientException("Failed to start connection server.", e); + } + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException { + init0(); + + GridClientException firstEx = null; + + for (int i = 0; i < INIT_RETRY_CNT; i++) { + Collection<InetSocketAddress> srvsCp = new ArrayList<>(srvs); + + while (!srvsCp.isEmpty()) { + GridClientConnection conn = null; + + try { + conn = connect(null, srvsCp); + + conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null).get(); + + return; + } + catch (GridServerUnreachableException e) { + // No connection could be opened to any of initial addresses - exit to retry loop. + assert conn == null : + "GridClientConnectionResetException was thrown from GridClientConnection#topology"; + + if (firstEx == null) + firstEx = e; + + break; + } + catch (GridClientConnectionResetException e) { + // Connection was established but topology update failed - + // trying other initial addresses if any. + assert conn != null : "GridClientConnectionResetException was thrown from connect()"; + + if (firstEx == null) + firstEx = e; + + if (!srvsCp.remove(conn.serverAddress())) + // We have misbehaving collection or equals - just exit to avoid infinite loop. + break; + } + } + + Thread.sleep(INIT_RETRY_INTERVAL); + } + + for (GridClientConnection c : conns.values()) { + conns.remove(c.serverAddress(), c); + + c.close(FAILED, false); + } + + throw firstEx; + } + + /** + * Additional initialization. + * + * @throws GridClientException In case of error. + */ + protected abstract void init0() throws GridClientException; + + /** + * Gets active communication facade. + * + * @param node Remote node to which connection should be established. + * @throws GridServerUnreachableException If none of the servers can be reached after the exception. + * @throws GridClientClosedException If client was closed manually. + * @throws InterruptedException If connection was interrupted. + */ + @Override public GridClientConnection connection(GridClientNode node) + throws GridClientClosedException, GridServerUnreachableException, InterruptedException { + assert node != null; + + // Use router's connections if defined. + if (!routers.isEmpty()) + return connection(null, routers); + + GridClientConnection conn = nodeConns.get(node.nodeId()); + + if (conn != null) { + // Ignore closed connections. + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) + closeIdle(); + else + return conn; + } + + // Use node's connection, if node is available over rest. + Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true); + + List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size()); + + for (InetSocketAddress endpoint : endpoints) + if (!endpoint.isUnresolved()) + resolvedEndpoints.add(endpoint); + + if (resolvedEndpoints.isEmpty()) { + throw new GridServerUnreachableException("No available endpoints to connect " + + "(is rest enabled for this node?): " + node); + } + + boolean sameHost = node.attributes().isEmpty() || + F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", ")); + + Collection<InetSocketAddress> srvs = new LinkedHashSet<>(); + + if (sameHost) { + Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true)); + + srvs.addAll(resolvedEndpoints); + } + else { + for (InetSocketAddress endpoint : resolvedEndpoints) + if (!endpoint.getAddress().isLoopbackAddress()) + srvs.add(endpoint); + } + + return connection(node.nodeId(), srvs); + } + + /** + * Returns connection to one of the given addresses. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param srvs Collection of addresses to connect to. + * @return Connection to use for operations, targeted for the given node. + * @throws GridServerUnreachableException If connection can't be established. + * @throws GridClientClosedException If connections manager has been closed already. + * @throws InterruptedException If connection was interrupted. + */ + public GridClientConnection connection(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs) + throws GridServerUnreachableException, GridClientClosedException, InterruptedException { + if (srvs == null || srvs.isEmpty()) + throw new GridServerUnreachableException("Failed to establish connection to the grid" + + " (address list is empty)."); + + checkClosed(); + + // Search for existent connection. + for (InetSocketAddress endPoint : srvs) { + assert endPoint != null; + + GridClientConnection conn = conns.get(endPoint); + + if (conn == null) + continue; + + // Ignore closed connections. + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { + closeIdle(); + + continue; + } + + if (nodeId != null) + nodeConns.put(nodeId, conn); + + return conn; + } + + return connect(nodeId, srvs); + } + + /** + * Creates a connected facade and returns it. Called either from constructor or inside + * a write lock. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param srvs List of server addresses that this method will try to connect to. + * @return Established connection. + * @throws GridServerUnreachableException If none of the servers can be reached. + * @throws InterruptedException If connection was interrupted. + */ + protected GridClientConnection connect(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs) + throws GridServerUnreachableException, InterruptedException { + if (srvs.isEmpty()) + throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " + + "list is empty)."); + + Exception cause = null; + + for (InetSocketAddress srv : srvs) { + try { + return connect(nodeId, srv); + } + catch (InterruptedException e) { + throw e; + } + catch (Exception e) { + if (cause == null) + cause = e; + else if (log.isLoggable(INFO)) + log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']'); + } + } + + assert cause != null; + + throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause); + } + + /** + * Create new connection to specified server. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param addr Remote socket to connect. + * @return Established connection. + * @throws IOException If connection failed. + * @throws GridClientException If protocol error happened. + * @throws InterruptedException If thread was interrupted before connection was established. + */ + protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr) + throws IOException, GridClientException, InterruptedException { + endpointStripedLock.lock(addr); + + try { + GridClientConnection old = conns.get(addr); + + if (old != null) { + if (old.isClosed()) { + conns.remove(addr, old); + + if (nodeId != null) + nodeConns.remove(nodeId, old); + } + else { + if (nodeId != null) + nodeConns.put(nodeId, old); + + return old; + } + } + + GridSecurityCredentials cred = null; + + try { + if (cfg.getSecurityCredentialsProvider() != null) + cred = cfg.getSecurityCredentialsProvider().credentials(); + } + catch (IgniteCheckedException e) { + throw new GridClientException("Failed to obtain client credentials.", e); + } + + GridClientConnection conn; + + if (cfg.getProtocol() == GridClientProtocol.TCP) { + conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor, + cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(), + cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepPortablesThreadLocal()); + } + else + throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " + + cfg.getProtocol()); + + old = conns.putIfAbsent(addr, conn); + + assert old == null; + + if (nodeId != null) + nodeConns.put(nodeId, conn); + + return conn; + } + finally { + endpointStripedLock.unlock(addr); + } + } + + /** + * @return Get thread local used to enable keep portables mode. + */ + protected ThreadLocal<Boolean> keepPortablesThreadLocal() { + return null; + } + + /** {@inheritDoc} */ + @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) { + if (log.isLoggable(Level.FINE)) + log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" + + conn.serverAddress() + ", errMsg=" + e.getMessage() + ']'); + + closeIdle(); + + conn.close(FAILED, false); + } + + /** + * Closes all opened connections. + * + * @param waitCompletion If {@code true} waits for all pending requests to be proceeded. + */ + @SuppressWarnings("TooBroadScope") + @Override public void stop(boolean waitCompletion) { + Collection<GridClientConnection> closeConns; + + if (closed) + return; + + // Mark manager as closed. + closed = true; + + // Remove all connections from cache. + closeConns = new ArrayList<>(conns.values()); + + conns.clear(); + + nodeConns.clear(); + + // Close old connection outside the writer lock. + for (GridClientConnection conn : closeConns) + conn.close(CLIENT_CLOSED, waitCompletion); + + if (pingExecutor != null) + GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log); + + GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log); + + if (srv != null) + srv.stop(); + } + + /** + * Close all connections idling for more then + * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private void closeIdle() { + for (Iterator<Map.Entry<UUID, GridClientConnection>> it = nodeConns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<UUID, GridClientConnection> entry = it.next(); + + GridClientConnection conn = entry.getValue(); + + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { + conns.remove(conn.serverAddress(), conn); + + nodeConns.remove(entry.getKey(), conn); + } + } + + for (GridClientConnection conn : conns.values()) + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) + conns.remove(conn.serverAddress(), conn); + } + + /** + * Checks and throws an exception if this client was closed. + * + * @throws GridClientClosedException If client was closed. + */ + private void checkClosed() throws GridClientClosedException { + if (closed) + throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore)."); + } + + /** + */ + private static class NioListener implements GridNioServerListener { + /** */ + private final Logger log; + + /** + * @param log Logger. + */ + private NioListener(Logger log) { + this.log = log; + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + if (log.isLoggable(Level.FINE)) + log.fine("Session connected: " + ses); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (log.isLoggable(Level.FINE)) + log.fine("Session disconnected: " + ses); + + GridClientFutureAdapter<Boolean> handshakeFut = + ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE); + + if (handshakeFut != null) + handshakeFut.onDone( + new GridClientConnectionResetException("Failed to perform handshake (connection failed).")); + else { + GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN); + + if (conn != null) + conn.close(FAILED, false); + } + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, Object msg) { + GridClientFutureAdapter<Boolean> handshakeFut = + ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE); + + if (handshakeFut != null) { + assert msg instanceof GridClientHandshakeResponse; + + handleHandshakeResponse(handshakeFut, (GridClientHandshakeResponse)msg); + } + else { + GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN); + + assert conn != null; + - if (msg instanceof GridClientMessageWrapper) { - GridClientMessageWrapper req = (GridClientMessageWrapper)msg; - - if (req.messageSize() != 0) { - assert req.message() != null; - - conn.handleResponse(req); - } - else - conn.handlePingResponse(); - } - else { - assert msg instanceof GridClientPingPacket : msg; - ++ if (msg instanceof GridClientPingPacket) + conn.handlePingResponse(); ++ else { ++ try { ++ conn.handleResponse((GridClientMessage)msg); ++ } ++ catch (IOException e) { ++ log.log(Level.SEVERE, "Failed to parse response.", e); ++ } + } + } + } + + /** + * Handles client handshake response. + * + * @param handshakeFut Future. + * @param msg A handshake response. + */ + private void handleHandshakeResponse(GridClientFutureAdapter<Boolean> handshakeFut, + GridClientHandshakeResponse msg) { + byte rc = msg.resultCode(); + + if (rc != GridClientHandshakeResponse.OK.resultCode()) { + handshakeFut.onDone(new GridClientHandshakeException(rc, + "Handshake failed due to internal error (see server log for more details).")); + } + else + handshakeFut.onDone(true); + } + + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) { + if (log.isLoggable(Level.FINE)) + log.fine("Closing NIO session because of write timeout."); + + ses.close(); + } + + /** {@inheritDoc} */ + @Override public void onSessionIdleTimeout(GridNioSession ses) { + if (log.isLoggable(Level.FINE)) + log.fine("Closing NIO session because of idle timeout."); + + ses.close(); + } + } - - /** - * - */ - private static class NioParser implements GridNioParser { - /** Message metadata key. */ - private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - - /** Message reader. */ - private final GridNioMessageReader msgReader; - - /** - * @param msgReader Message reader. - */ - NioParser(GridNioMessageReader msgReader) { - this.msgReader = msgReader; - } - - /** {@inheritDoc} */ - @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - GridClientFutureAdapter<?> handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE); - - if (handshakeFut != null) { - byte code = buf.get(); - - return new GridClientHandshakeResponse(code); - } - - GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY); - - if (msg == null && buf.hasRemaining()) { - byte type = buf.get(); - - if (type == GridClientMessageWrapper.REQ_HEADER) - msg = new GridClientMessageWrapper(); - else - throw new IOException("Invalid message type: " + type); - } - - boolean finished = false; - - if (buf.hasRemaining()) - finished = msgReader.read(null, msg, buf); - - if (finished) - return msg; - else { - ses.addMeta(MSG_META_KEY, msg); - - return null; - } - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - // No encoding needed for direct messages. - throw new UnsupportedEncodingException(); - } - } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java index d16b235,0000000..6ffd50d mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java @@@ -1,47 -1,0 +1,48 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.impl.connection; + +import org.apache.ignite.internal.client.*; + +import javax.net.ssl.*; +import java.net.*; +import java.util.*; + +/** + * Open source version of connection manager. + */ +public class GridClientConnectionManagerOsImpl extends GridClientConnectionManagerAdapter { + /** + * @param clientId Client ID. + * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one. + * @param cfg Client configuration. + * @param routers Routers or empty collection to use endpoints from topology info. + * @param top Topology. + * @throws GridClientException In case of error. + */ + public GridClientConnectionManagerOsImpl(UUID clientId, SSLContext sslCtx, GridClientConfiguration cfg, - Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId) throws GridClientException { - super(clientId, sslCtx, cfg, routers, top, marshId); ++ Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId, boolean routerClient) ++ throws GridClientException { ++ super(clientId, sslCtx, cfg, routers, top, marshId, routerClient); + } + + /** {@inheritDoc} */ + @Override protected void init0() throws GridClientException { + // No-op. + } +}