ACCUMULO-3574 Add an IT for testing transport caching.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1213ee2b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1213ee2b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1213ee2b Branch: refs/heads/1.6 Commit: 1213ee2beaa4ac970c7bb6ba84355bae049cb4c4 Parents: 6111043 Author: Josh Elser <els...@apache.org> Authored: Thu Feb 12 19:18:36 2015 -0500 Committer: Josh Elser <els...@apache.org> Committed: Thu Feb 12 19:18:36 2015 -0500 ---------------------------------------------------------------------- .../core/client/impl/ThriftTransportKey.java | 8 +- .../core/client/impl/ThriftTransportPool.java | 4 +- .../accumulo/test/TransportCachingIT.java | 115 +++++++++++++++++++ 3 files changed, 124 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1213ee2b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java index de33941..8e3ee47 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java @@ -19,7 +19,10 @@ package org.apache.accumulo.core.client.impl; import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.accumulo.core.util.SslConnectionParams; -class ThriftTransportKey { +import com.google.common.annotations.VisibleForTesting; + +@VisibleForTesting +public class ThriftTransportKey { private final String location; private final int port; private final long timeout; @@ -27,7 +30,8 @@ class ThriftTransportKey { private int hash = -1; - ThriftTransportKey(String location, long timeout, SslConnectionParams sslParams) { + @VisibleForTesting + public ThriftTransportKey(String location, long timeout, SslConnectionParams sslParams) { ArgumentChecker.notNull(location); String[] locationAndPort = location.split(":", 2); if (locationAndPort.length == 2) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/1213ee2b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index 575a537..33997e0 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@ -41,6 +41,7 @@ import org.apache.log4j.Logger; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import com.google.common.annotations.VisibleForTesting; import com.google.common.net.HostAndPort; public class ThriftTransportPool { @@ -420,7 +421,8 @@ public class ThriftTransportPool { return createNewTransport(cacheKey); } - Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException { + @VisibleForTesting + public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException { servers = new ArrayList<ThriftTransportKey>(servers); http://git-wip-us.apache.org/repos/asf/accumulo/blob/1213ee2b/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java b/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java new file mode 100644 index 0000000..ddbd3e8 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.ServerConfigurationUtil; +import org.apache.accumulo.core.client.impl.ThriftTransportKey; +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.SslConnectionParams; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; +import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that {@link ThriftTransportPool} actually adheres to the cachedConnection argument + */ +public class TransportCachingIT extends AccumuloClusterIT { + private static final Logger log = LoggerFactory.getLogger(TransportCachingIT.class); + + @Test + public void testCachedTransport() { + Connector conn = getConnector(); + Instance instance = conn.getInstance(); + long rpcTimeout = DefaultConfiguration.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue()); + + // create list of servers + ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>(); + + // add tservers + ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS)) { + String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver; + byte[] data = ZooUtil.getLockData(zc, path); + if (data != null && !new String(data, UTF_8).equals("master")) + servers.add(new ThriftTransportKey(new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT), rpcTimeout, SslConnectionParams + .forClient(ServerConfigurationUtil.getConfiguration(instance)))); + } + + ThriftTransportPool pool = ThriftTransportPool.getInstance(); + TTransport first = null; + while (null == first) { + try { + // Get a transport (cached or not) + first = pool.getAnyTransport(servers, true).getSecond(); + } catch (TTransportException e) { + log.warn("Failed to obtain transport to " + servers); + } + } + + assertNotNull(first); + // Return it to unreserve it + pool.returnTransport(first); + + TTransport second = null; + while (null == second) { + try { + // Get a cached transport (should be the first) + second = pool.getAnyTransport(servers, true).getSecond(); + } catch (TTransportException e) { + log.warn("Failed obtain 2nd transport to " + servers); + } + } + + // We should get the same transport + assertTrue("Expected the first and second to be the same instance", first == second); + // Return the 2nd + pool.returnTransport(second); + + TTransport third = null; + while (null == third) { + try { + // Get a non-cached transport + third = pool.getAnyTransport(servers, false).getSecond(); + } catch (TTransportException e) { + log.warn("Failed obtain 2nd transport to " + servers); + } + } + + assertFalse("Expected second and third transport to be different instances", second == third); + pool.returnTransport(third); + } +}