Repository: accumulo Updated Branches: refs/heads/master 64b9ad34a -> 826091460
ACCUMULO-3428 Switch over to using HostAndPort more. Not a complete switch yet because the use of String to track addresses is deeply engrained in lots of core code still (ZooKeeperInstance, *TabletLocator and more). This at least consolidates ThriftUtil and TServerUtil to using HostAndPort consistently to match what the server processes are using to start themselves (Master, TServer). Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/82609146 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/82609146 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/82609146 Branch: refs/heads/master Commit: 8260914600b7e0d1e6ba1a8de0e1489b9074b198 Parents: 64b9ad3 Author: Josh Elser <els...@apache.org> Authored: Tue Dec 16 23:37:24 2014 -0500 Committer: Josh Elser <els...@apache.org> Committed: Wed Dec 17 11:30:03 2014 -0500 ---------------------------------------------------------------------- .../core/client/impl/ConditionalWriterImpl.java | 84 ++++++++++---------- .../client/impl/InstanceOperationsImpl.java | 8 +- .../accumulo/core/client/impl/MasterClient.java | 6 +- .../core/client/impl/ReplicationClient.java | 8 +- .../accumulo/core/client/impl/ServerClient.java | 7 +- .../core/client/impl/TableOperationsImpl.java | 11 ++- .../impl/TabletServerBatchReaderIterator.java | 15 ++-- .../client/impl/TabletServerBatchWriter.java | 13 +-- .../core/client/impl/ThriftScanner.java | 16 ++-- .../core/client/impl/ThriftTransportKey.java | 45 +++++------ .../core/client/impl/ThriftTransportPool.java | 25 +++--- .../accumulo/core/client/impl/Writer.java | 17 ++-- .../apache/accumulo/core/rpc/ThriftUtil.java | 28 ++----- .../accumulo/server/client/BulkImporter.java | 10 ++- .../server/util/VerifyTabletAssignments.java | 22 ++--- .../CloseWriteAheadLogReferences.java | 6 +- .../org/apache/accumulo/monitor/Monitor.java | 16 ++-- .../accumulo/monitor/servlets/ScanServlet.java | 9 ++- 
.../apache/accumulo/tserver/TabletServer.java | 8 +- .../replication/AccumuloReplicaSystem.java | 17 ++-- .../apache/accumulo/test/WrongTabletTest.java | 10 ++- .../org/apache/accumulo/test/TotalQueuedIT.java | 4 +- ...bageCollectorCommunicatesWithTServersIT.java | 4 +- 23 files changed, 203 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 7b84e45..8440c1c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -88,6 +88,8 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.net.HostAndPort; + class ConditionalWriterImpl implements ConditionalWriter { private static ThreadPoolExecutor cleanupThreadPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); @@ -242,9 +244,9 @@ class ConditionalWriterImpl implements ConditionalWriter { } } } - - private void queueRetry(List<QCMutation> mutations, String server) { - + + private void queueRetry(List<QCMutation> mutations, HostAndPort server) { + if (timeout < Long.MAX_VALUE) { long time = System.currentTimeMillis(); @@ -256,11 +258,11 @@ class ConditionalWriterImpl implements ConditionalWriter { if (time + qcm.getDelay(TimeUnit.MILLISECONDS) > qcm.entryTime + timeout) { TimedOutException toe; if (server != null) - toe = new TimedOutException(Collections.singleton(server)); + toe = new 
TimedOutException(Collections.singleton(server.toString())); else toe = new TimedOutException("Conditional mutation timed out"); - - qcm.queueResult(new Result(toe, qcm, server)); + + qcm.queueResult(new Result(toe, qcm, (null == server ? null : server.toString()))); } else { mutations2.add(qcm); } @@ -446,7 +448,7 @@ class ConditionalWriterImpl implements ConditionalWriter { try { TabletServerMutations<QCMutation> mutations = dequeue(location); if (mutations != null) - sendToServer(location, mutations); + sendToServer(HostAndPort.fromString(location), mutations); } finally { reschedule(this); } @@ -465,7 +467,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } private static class SessionID { - String location; + HostAndPort location; String lockId; long sessionID; boolean reserved; @@ -476,10 +478,10 @@ class ConditionalWriterImpl implements ConditionalWriter { return System.currentTimeMillis() - lastAccessTime < ttl * .95; } } - - private HashMap<String,SessionID> cachedSessionIDs = new HashMap<String,SessionID>(); - - private SessionID reserveSessionID(String location, TabletClientService.Iface client, TInfo tinfo) throws ThriftSecurityException, TException { + + private HashMap<HostAndPort,SessionID> cachedSessionIDs = new HashMap<HostAndPort,SessionID>(); + + private SessionID reserveSessionID(HostAndPort location, TabletClientService.Iface client, TInfo tinfo) throws ThriftSecurityException, TException { // avoid cost of repeatedly making RPC to create sessions, reuse sessions synchronized (cachedSessionIDs) { SessionID sid = cachedSessionIDs.get(location); @@ -513,15 +515,15 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - - private void invalidateSessionID(String location) { + + private void invalidateSessionID(HostAndPort location) { synchronized (cachedSessionIDs) { cachedSessionIDs.remove(location); } } - - private void unreserveSessionID(String location) { + + private void unreserveSessionID(HostAndPort location) { 
synchronized (cachedSessionIDs) { SessionID sid = cachedSessionIDs.get(location); if (sid != null) { @@ -540,8 +542,8 @@ class ConditionalWriterImpl implements ConditionalWriter { activeSessions.add(sid); return activeSessions; } - - private TabletClientService.Iface getClient(String location) throws TTransportException { + + private TabletClientService.Iface getClient(HostAndPort location) throws TTransportException { TabletClientService.Iface client; if (timeout < context.getClientTimeoutInMillis()) client = ThriftUtil.getTServerClient(location, context, timeout); @@ -549,8 +551,8 @@ class ConditionalWriterImpl implements ConditionalWriter { client = ThriftUtil.getTServerClient(location, context); return client; } - - private void sendToServer(String location, TabletServerMutations<QCMutation> mutations) { + + private void sendToServer(HostAndPort location, TabletServerMutations<QCMutation> mutations) { TabletClientService.Iface client = null; TInfo tinfo = Tracer.traceInfo(); @@ -591,7 +593,7 @@ class ConditionalWriterImpl implements ConditionalWriter { extentsToInvalidate.add(cmk.ke); } else { QCMutation qcm = cmidToCm.get(tcmResult.cmid).cm; - qcm.queueResult(new Result(fromThrift(tcmResult.status), qcm, location)); + qcm.queueResult(new Result(fromThrift(tcmResult.status), qcm, location.toString())); } } @@ -606,12 +608,12 @@ class ConditionalWriterImpl implements ConditionalWriter { context.getInstance(), tableId), tse); queueException(location, cmidToCm, ase); } catch (TTransportException e) { - locator.invalidateCache(context.getInstance(), location); + locator.invalidateCache(context.getInstance(), location.toString()); invalidateSession(location, mutations, cmidToCm, sessionId); } catch (TApplicationException tae) { - queueException(location, cmidToCm, new AccumuloServerException(location, tae)); + queueException(location, cmidToCm, new AccumuloServerException(location.toString(), tae)); } catch (TException e) { - 
locator.invalidateCache(context.getInstance(), location); + locator.invalidateCache(context.getInstance(), location.toString()); invalidateSession(location, mutations, cmidToCm, sessionId); } catch (Exception e) { queueException(location, cmidToCm, e); @@ -621,27 +623,27 @@ class ConditionalWriterImpl implements ConditionalWriter { ThriftUtil.returnClient((TServiceClient) client); } } - - private void queueRetry(Map<Long,CMK> cmidToCm, String location) { + + private void queueRetry(Map<Long,CMK> cmidToCm, HostAndPort location) { ArrayList<QCMutation> ignored = new ArrayList<QCMutation>(); for (CMK cmk : cmidToCm.values()) ignored.add(cmk.cm); queueRetry(ignored, location); } - - private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) { + + private void queueException(HostAndPort location, Map<Long,CMK> cmidToCm, Exception e) { for (CMK cmk : cmidToCm.values()) - cmk.cm.queueResult(new Result(e, cmk.cm, location)); + cmk.cm.queueResult(new Result(e, cmk.cm, location.toString())); } - - private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, SessionID sessionId) { + + private void invalidateSession(HostAndPort location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, SessionID sessionId) { if (sessionId == null) { queueRetry(cmidToCm, location); } else { try { invalidateSession(sessionId, location); for (CMK cmk : cmidToCm.values()) - cmk.cm.queueResult(new Result(Status.UNKNOWN, cmk.cm, location)); + cmk.cm.queueResult(new Result(Status.UNKNOWN, cmk.cm, location.toString())); } catch (Exception e2) { queueException(location, cmidToCm, e2); } @@ -654,8 +656,8 @@ class ConditionalWriterImpl implements ConditionalWriter { * * If a conditional mutation is taking a long time to process, then this method will wait for it to finish... unless this exceeds timeout. 
*/ - private void invalidateSession(SessionID sessionId, String location) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + private void invalidateSession(SessionID sessionId, HostAndPort location) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + long sleepTime = 50; long startTime = System.currentTimeMillis(); @@ -668,7 +670,7 @@ class ConditionalWriterImpl implements ConditionalWriter { if (!ZooLock.isLockHeld(zcf.getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), lid)) { // ACCUMULO-1152 added a tserver lock check to the tablet location cache, so this invalidation prevents future attempts to contact the // tserver even its gone zombie and is still running w/o a lock - locator.invalidateCache(context.getInstance(), location); + locator.invalidateCache(context.getInstance(), location.toString()); return; } @@ -678,22 +680,22 @@ class ConditionalWriterImpl implements ConditionalWriter { return; } catch (TApplicationException tae) { - throw new AccumuloServerException(location, tae); + throw new AccumuloServerException(location.toString(), tae); } catch (TException e) { - locator.invalidateCache(context.getInstance(), location); + locator.invalidateCache(context.getInstance(), location.toString()); } if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) - throw new TimedOutException(Collections.singleton(location)); - + throw new TimedOutException(Collections.singleton(location.toString())); + UtilWaitThread.sleep(sleepTime); sleepTime = Math.min(2 * sleepTime, MAX_SLEEP); } } - - private void invalidateSession(long sessionId, String location) throws TException { + + private void invalidateSession(long sessionId, HostAndPort location) throws TException { TabletClientService.Iface client = null; TInfo tinfo = Tracer.traceInfo(); 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java index a62496b..b95d8b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java @@ -48,6 +48,8 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import com.google.common.net.HostAndPort; + /** * Provides a class for administering the accumulo instance */ @@ -124,9 +126,10 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException { + final HostAndPort parsedTserver = HostAndPort.fromString(tserver); Client client = null; try { - client = ThriftUtil.getTServerClient(tserver, context); + client = ThriftUtil.getTServerClient(parsedTserver, context); List<ActiveScan> as = new ArrayList<ActiveScan>(); for (org.apache.accumulo.core.tabletserver.thrift.ActiveScan activeScan : client.getActiveScans(Tracer.traceInfo(), context.rpcCreds())) { @@ -161,9 +164,10 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public List<ActiveCompaction> getActiveCompactions(String tserver) throws AccumuloException, AccumuloSecurityException { + final HostAndPort parsedTserver = HostAndPort.fromString(tserver); Client client = null; try { - client = ThriftUtil.getTServerClient(tserver, context); + client = ThriftUtil.getTServerClient(parsedTserver, context); List<ActiveCompaction> as = new ArrayList<ActiveCompaction>(); 
for (org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction activeCompaction : client.getActiveCompactions(Tracer.traceInfo(), context.rpcCreds())) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java index 74b8ea9..a9ad8a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java @@ -34,6 +34,8 @@ import org.apache.log4j.Logger; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TTransportException; +import com.google.common.net.HostAndPort; + public class MasterClient { private static final Logger log = Logger.getLogger(MasterClient.class); @@ -57,8 +59,8 @@ public class MasterClient { return null; } - String master = locations.get(0); - if (master.endsWith(":0")) + HostAndPort master = HostAndPort.fromString(locations.get(0)); + if (0 == master.getPort()) return null; try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java index edfba50..95b71ee 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java @@ -105,7 +105,7 @@ public class ReplicationClient { try { // Master requests can take a long time: don't ever time out - ReplicationCoordinator.Client client = 
ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(), context); + ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr, context); return client; } catch (TTransportException tte) { log.debug("Failed to connect to master coordinator service ({})", coordinatorAddr, tte); @@ -122,7 +122,7 @@ public class ReplicationClient { * Server to connect to * @return A ReplicationServicer client to the given host in the given instance */ - public static ReplicationServicer.Client getServicerConnection(ClientContext context, String server) throws TTransportException { + public static ReplicationServicer.Client getServicerConnection(ClientContext context, HostAndPort server) throws TTransportException { checkNotNull(context); checkNotNull(server); @@ -202,7 +202,7 @@ public class ReplicationClient { } } - public static <T> T executeServicerWithReturn(ClientContext context, String tserver, ClientExecReturn<T,ReplicationServicer.Client> exec) + public static <T> T executeServicerWithReturn(ClientContext context, HostAndPort tserver, ClientExecReturn<T,ReplicationServicer.Client> exec) throws AccumuloException, AccumuloSecurityException, TTransportException { ReplicationServicer.Client client = null; while (true) { @@ -222,7 +222,7 @@ public class ReplicationClient { } } - public static void executeServicer(ClientContext context, String tserver, ClientExec<ReplicationServicer.Client> exec) throws AccumuloException, + public static void executeServicer(ClientContext context, HostAndPort tserver, ClientExec<ReplicationServicer.Client> exec) throws AccumuloException, AccumuloSecurityException, TTransportException { ReplicationServicer.Client client = null; try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java index 1e44727..84124ca 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java @@ -128,8 +128,11 @@ public class ServerClient { for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS)) { String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver; byte[] data = ZooUtil.getLockData(zc, path); - if (data != null && !new String(data, UTF_8).equals("master")) - servers.add(new ThriftTransportKey(new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT), rpcTimeout, context)); + if (data != null) { + String strData = new String(data, UTF_8); + if (!strData.equals("master")) + servers.add(new ThriftTransportKey(new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context)); + } } boolean opened = false; http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java index 6d9ab46..dca40da 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java @@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.Constants; import 
org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -63,6 +62,7 @@ import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.DiskUsage; import org.apache.accumulo.core.client.admin.FindMax; import org.apache.accumulo.core.client.admin.TableOperations; @@ -118,6 +118,7 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import com.google.common.base.Joiner; +import com.google.common.net.HostAndPort; public class TableOperationsImpl extends TableOperationsHelper { @@ -494,12 +495,14 @@ public class TableOperationsImpl extends TableOperationsHelper { continue; } + HostAndPort address = HostAndPort.fromString(tl.tablet_location); + try { - TabletClientService.Client client = ThriftUtil.getTServerClient(tl.tablet_location, context); + TabletClientService.Client client = ThriftUtil.getTServerClient(address, context); try { OpTimer opTimer = null; if (log.isTraceEnabled()) - opTimer = new OpTimer(log, Level.TRACE).start("Splitting tablet " + tl.tablet_extent + " on " + tl.tablet_location + " at " + split); + opTimer = new OpTimer(log, Level.TRACE).start("Splitting tablet " + tl.tablet_extent + " on " + address + " at " + split); client.splitTablet(Tracer.traceInfo(), context.rpcCreds(), tl.tablet_extent.toThrift(), TextUtil.getByteBuffer(split)); @@ -513,7 +516,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } } catch (TApplicationException tae) { - throw new AccumuloServerException(tl.tablet_location, tae); + throw new AccumuloServerException(address.toString(), tae); } catch (TTransportException e) { 
tabLocator.invalidateCache(context.getInstance(), tl.tablet_location); continue; http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java index fb5c20b..eb80f8b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java @@ -70,6 +70,8 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import com.google.common.net.HostAndPort; + public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> { private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class); @@ -608,12 +610,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value timeoutTracker.startingScan(); TTransport transport = null; try { - TabletClientService.Client client; + final HostAndPort parsedServer = HostAndPort.fromString(server); + final TabletClientService.Client client; if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) - client = ThriftUtil.getTServerClient(server, context, timeoutTracker.getTimeOut()); + client = ThriftUtil.getTServerClient(parsedServer, context, timeoutTracker.getTimeOut()); else - client = ThriftUtil.getTServerClient(server, context); - + client = ThriftUtil.getTServerClient(parsedServer, context); + try { OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + " #tablets=" + requested.size() + " #ranges=" @@ -628,8 +631,8 @@ public class 
TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value Translator.translate(columns, Translators.CT), options.serverSideIteratorList, options.serverSideIteratorOptions, ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites); if (waitForWrites) - ThriftScanner.serversWaitedForWrites.get(ttype).add(server); - + ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString()); + MultiScanResult scanResult = imsr.result; opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "") http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index 30a707e..c54c2f1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@ -71,6 +71,8 @@ import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TTransportException; +import com.google.common.net.HostAndPort; + /* * Differences from previous TabletServerBatchWriter * + As background threads finish sending mutations to tablet servers they decrement memory usage @@ -841,13 +843,14 @@ public class TabletServerBatchWriter { timeoutTracker.startingWrite(); try { - TabletClientService.Iface client; - + final HostAndPort parsedServer = HostAndPort.fromString(location); + final TabletClientService.Iface client; + if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) - client = ThriftUtil.getTServerClient(location, context, timeoutTracker.getTimeOut()); + client = 
ThriftUtil.getTServerClient(parsedServer, context, timeoutTracker.getTimeOut()); else - client = ThriftUtil.getTServerClient(location, context); - + client = ThriftUtil.getTServerClient(parsedServer, context); + try { MutationSet allFailures = new MutationSet(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index 90e4421..7b6284d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@ -65,6 +65,8 @@ import org.apache.log4j.Logger; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; +import com.google.common.net.HostAndPort; + public class ThriftScanner { private static final Logger log = Logger.getLogger(ThriftScanner.class); @@ -81,10 +83,11 @@ public class ThriftScanner { Authorizations authorizations, boolean retry) throws AccumuloException, AccumuloSecurityException, NotServingTabletException { if (server == null) throw new AccumuloException(new IOException()); - + + final HostAndPort parsedServer = HostAndPort.fromString(server); try { TInfo tinfo = Tracer.traceInfo(); - TabletClientService.Client client = ThriftUtil.getTServerClient(server, context); + TabletClientService.Client client = ThriftUtil.getTServerClient(parsedServer, context); try { // not reading whole rows (or stopping on row boundries) so there is no need to enable isolation below ScanState scanState = new ScanState(context, extent.getTableId(), authorizations, range, fetchedColumns, size, serverSideIteratorList, @@ -375,10 +378,11 @@ public class ThriftScanner { return null; OpTimer opTimer = new OpTimer(log, Level.TRACE); - - 
TInfo tinfo = Tracer.traceInfo(); - TabletClientService.Client client = ThriftUtil.getTServerClient(loc.tablet_location, context); - + + final TInfo tinfo = Tracer.traceInfo(); + final HostAndPort parsedLocation = HostAndPort.fromString(loc.tablet_location); + TabletClientService.Client client = ThriftUtil.getTServerClient(parsedLocation, context); + String old = Thread.currentThread().getName(); try { ScanResult sr; http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java index 176e947..6dc846f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java @@ -16,43 +16,34 @@ */ package org.apache.accumulo.core.client.impl; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import org.apache.accumulo.core.rpc.SslConnectionParams; +import com.google.common.net.HostAndPort; + class ThriftTransportKey { - private final String location; - private final int port; + private final HostAndPort server; private final long timeout; private final SslConnectionParams sslParams; - + private int hash = -1; - - ThriftTransportKey(String location, long timeout, ClientContext context) { - checkArgument(location != null, "location is null"); - String[] locationAndPort = location.split(":", 2); - if (locationAndPort.length == 2) { - this.location = locationAndPort[0]; - this.port = Integer.parseInt(locationAndPort[1]); - } else - throw new IllegalArgumentException("Location was expected to contain port but did not. 
location=" + location); - + + ThriftTransportKey(HostAndPort server, long timeout, ClientContext context) { + checkNotNull(server, "location is null"); + this.server = server; this.timeout = timeout; this.sslParams = context.getClientSslParams(); } - - String getLocation() { - return location; - } - - int getPort() { - return port; + + HostAndPort getServer() { + return server; } - + long getTimeout() { return timeout; } - + public boolean isSsl() { return sslParams != null; } @@ -62,19 +53,19 @@ class ThriftTransportKey { if (!(o instanceof ThriftTransportKey)) return false; ThriftTransportKey ttk = (ThriftTransportKey) o; - return location.equals(ttk.location) && port == ttk.port && timeout == ttk.timeout && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams))); + return server.equals(ttk.server) && timeout == ttk.timeout && (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams))); } - + @Override public int hashCode() { if (hash == -1) hash = toString().hashCode(); return hash; } - + @Override public String toString() { - return (isSsl()?"ssl:":"") + location + ":" + Integer.toString(port) + " (" + Long.toString(timeout) + ")"; + return (isSsl() ? 
"ssl:" : "") + server + " (" + Long.toString(timeout) + ")"; } public SslConnectionParams getSslParams() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index 1220850..5da803b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@ -385,11 +385,7 @@ public class ThriftTransportPool { private ThriftTransportPool() {} - public TTransport getTransportWithDefaultTimeout(String addr, ClientContext context) throws TTransportException { - return getTransport(addr, context.getClientTimeoutInMillis(), context); - } - - public TTransport getTransport(String location, long milliseconds, ClientContext context) throws TTransportException { + public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context) throws TTransportException { return getTransport(new ThriftTransportKey(location, milliseconds, context)); } @@ -406,7 +402,7 @@ public class ThriftTransportPool { for (CachedConnection cachedConnection : ccl) { if (!cachedConnection.isReserved()) { cachedConnection.setReserved(true); - log.trace("Using existing connection to {}:{}", cacheKey.getLocation(), cacheKey.getPort()); + log.trace("Using existing connection to {}", cacheKey.getServer()); return cachedConnection.transport; } } @@ -435,8 +431,9 @@ public class ThriftTransportPool { for (CachedConnection cachedConnection : getCache().get(ttk)) { if (!cachedConnection.isReserved()) { cachedConnection.setReserved(true); - log.trace("Using existing connection to {}:{}", ttk.getLocation(), ttk.getPort()); - return new 
Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + final String serverAddr = ttk.getServer().toString(); + log.trace("Using existing connection to {}", serverAddr); + return new Pair<String,TTransport>(serverAddr, cachedConnection.transport); } } } @@ -456,8 +453,9 @@ public class ThriftTransportPool { for (CachedConnection cachedConnection : cachedConnList) { if (!cachedConnection.isReserved()) { cachedConnection.setReserved(true); - log.trace("Using existing connection to {}:{} timeout {}", ttk.getLocation(), ttk.getPort(), ttk.getTimeout()); - return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + final String serverAddr = ttk.getServer().toString(); + log.trace("Using existing connection to {} timeout {}", serverAddr, ttk.getTimeout()); + return new Pair<String,TTransport>(serverAddr, cachedConnection.transport); } } } @@ -465,7 +463,7 @@ public class ThriftTransportPool { } try { - return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk)); + return new Pair<String,TTransport>(ttk.getServer().toString(), createNewTransport(ttk)); } catch (TTransportException tte) { log.debug("Failed to connect to " + servers.get(index), tte); servers.remove(index); @@ -477,10 +475,9 @@ public class ThriftTransportPool { } private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException { - TTransport transport = ThriftUtil.createClientTransport(HostAndPort.fromParts(cacheKey.getLocation(), cacheKey.getPort()), (int) cacheKey.getTimeout(), - cacheKey.getSslParams()); + TTransport transport = ThriftUtil.createClientTransport(cacheKey.getServer(), (int) cacheKey.getTimeout(), cacheKey.getSslParams()); - log.trace("Creating new connection to connection to {}:{}", cacheKey.getLocation(), cacheKey.getPort()); + log.trace("Creating new connection to connection to {}", cacheKey.getServer()); CachedTTransport tsc = new 
CachedTTransport(transport, cacheKey); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java index d7761e9..552ddae 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java @@ -38,6 +38,8 @@ import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; +import com.google.common.net.HostAndPort; + public class Writer { private static final Logger log = Logger.getLogger(Writer.class); @@ -55,8 +57,8 @@ public class Writer { public Writer(ClientContext context, String table) { this(context, new Text(table)); } - - private static void updateServer(ClientContext context, Mutation m, KeyExtent extent, String server) throws TException, NotServingTabletException, + + private static void updateServer(ClientContext context, Mutation m, KeyExtent extent, HostAndPort server) throws TException, NotServingTabletException, ConstraintViolationException, AccumuloSecurityException { checkArgument(m != null, "m is null"); checkArgument(extent != null, "extent is null"); @@ -89,20 +91,21 @@ public class Writer { UtilWaitThread.sleep(500); continue; } - + + final HostAndPort parsedLocation = HostAndPort.fromString(tabLoc.tablet_location); try { - updateServer(context, m, tabLoc.tablet_extent, tabLoc.tablet_location); + updateServer(context, m, tabLoc.tablet_extent, parsedLocation); return; } catch (NotServingTabletException e) { - log.trace("Not serving tablet, server = " + tabLoc.tablet_location); + log.trace("Not serving tablet, server = " + parsedLocation); TabletLocator.getLocator(context, table).invalidateCache(tabLoc.tablet_extent); } catch 
(ConstraintViolationException cve) { - log.error("error sending update to " + tabLoc.tablet_location + ": " + cve); + log.error("error sending update to " + parsedLocation + ": " + cve); // probably do not need to invalidate cache, but it does not hurt TabletLocator.getLocator(context, table).invalidateCache(tabLoc.tablet_extent); throw cve; } catch (TException e) { - log.error("error sending update to " + tabLoc.tablet_location + ": " + e); + log.error("error sending update to " + parsedLocation + ": " + e); TabletLocator.getLocator(context, table).invalidateCache(tabLoc.tablet_extent); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index c95a62b..8b8304c 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -87,20 +87,6 @@ public class ThriftUtil { } /** - * Create a Thrift client using the given factory with a pooled transport (if available), the address and client context - * - * @param factory - * Thrift client factory - * @param address - * Server address for client to connect to - * @param context - * RPC options - */ - public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException { - return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address.toString(), context)); - } - - /** * Create a Thrift client using the given factory with a pooled transport (if available), the address, and client context with no timeout. 
* * @param factory @@ -110,7 +96,7 @@ public class ThriftUtil { * @param context * RPC options */ - public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, String address, ClientContext context) + public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException { return getClient(factory, address, context, 0); } @@ -126,7 +112,7 @@ public class ThriftUtil { * @param context * RPC options */ - public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context) throws TTransportException { + public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context) throws TTransportException { TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context); return createClient(factory, transport); } @@ -143,7 +129,7 @@ public class ThriftUtil { * @param timeout * Socket timeout which overrides the ClientContext timeout */ - private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context, long timeout) + private static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context, long timeout) throws TTransportException { TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context); return createClient(factory, transport); @@ -169,7 +155,7 @@ public class ThriftUtil { * @param context * RPC options */ - public static TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException { + public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context) throws TTransportException { return getClient(new 
TabletClientService.Client.Factory(), address, context); } @@ -183,7 +169,7 @@ public class ThriftUtil { * @param timeout * Socket timeout which overrides the ClientContext timeout */ - public static TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException { + public static TabletClientService.Client getTServerClient(HostAndPort address, ClientContext context, long timeout) throws TTransportException { return getClient(new TabletClientService.Client.Factory(), address, context, timeout); } @@ -198,7 +184,7 @@ public class ThriftUtil { * @param exec * The closure to execute */ - public static void execute(String address, ClientContext context, ClientExec<TabletClientService.Client> exec) throws AccumuloException, + public static void execute(HostAndPort address, ClientContext context, ClientExec<TabletClientService.Client> exec) throws AccumuloException, AccumuloSecurityException { while (true) { TabletClientService.Client client = null; @@ -231,7 +217,7 @@ public class ThriftUtil { * Closure with a return value to execute * @return The result from the closure */ - public static <T> T execute(String address, ClientContext context, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException, + public static <T> T execute(HostAndPort address, ClientContext context, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException, AccumuloSecurityException { while (true) { TabletClientService.Client client = null; http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index 4f21a15..8171555 100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -72,6 +72,8 @@ import org.apache.thrift.TServiceClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.net.HostAndPort; + public class BulkImporter { private static final Logger log = LoggerFactory.getLogger(BulkImporter.class); @@ -436,12 +438,12 @@ public class BulkImporter { private class AssignmentTask implements Runnable { final Map<Path,List<KeyExtent>> assignmentFailures; - String location; + HostAndPort location; private Map<KeyExtent,List<PathSize>> assignmentsPerTablet; public AssignmentTask(Map<Path,List<KeyExtent>> assignmentFailures, String tableName, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) { this.assignmentFailures = assignmentFailures; - this.location = location; + this.location = HostAndPort.fromString(location); this.assignmentsPerTablet = assignmentsPerTablet; } @@ -576,8 +578,8 @@ public class BulkImporter { return assignmentFailures; } - - private List<KeyExtent> assignMapFiles(ClientContext context, String location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) + + private List<KeyExtent> assignMapFiles(ClientContext context, HostAndPort location, Map<KeyExtent,List<PathSize>> assignmentsPerTablet) throws AccumuloException, AccumuloSecurityException { try { long timeInMillis = context.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java index 1e25e44..8e6b339 100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java @@ -57,6 +57,7 @@ import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; import com.beust.jcommander.Parameter; +import com.google.common.net.HostAndPort; public class VerifyTabletAssignments { private static final Logger log = Logger.getLogger(VerifyTabletAssignments.class); @@ -91,9 +92,9 @@ public class VerifyTabletAssignments { MetadataServicer.forTableId(context, tableId).getTabletLocations(tabletLocations); final HashSet<KeyExtent> failures = new HashSet<KeyExtent>(); - - Map<String,List<KeyExtent>> extentsPerServer = new TreeMap<String,List<KeyExtent>>(); - + + Map<HostAndPort,List<KeyExtent>> extentsPerServer = new TreeMap<HostAndPort,List<KeyExtent>>(); + for (Entry<KeyExtent,String> entry : tabletLocations.entrySet()) { KeyExtent keyExtent = entry.getKey(); String loc = entry.getValue(); @@ -103,10 +104,11 @@ public class VerifyTabletAssignments { System.out.println(" Tablet " + keyExtent + " is located at " + loc); if (loc != null) { - List<KeyExtent> extentList = extentsPerServer.get(loc); + final HostAndPort parsedLoc = HostAndPort.fromString(loc); + List<KeyExtent> extentList = extentsPerServer.get(parsedLoc); if (extentList == null) { extentList = new ArrayList<KeyExtent>(); - extentsPerServer.put(loc, extentList); + extentsPerServer.put(parsedLoc, extentList); } if (check == null || check.contains(keyExtent)) @@ -115,7 +117,7 @@ public class VerifyTabletAssignments { } ExecutorService tp = Executors.newFixedThreadPool(20); - for (final Entry<String,List<KeyExtent>> entry : extentsPerServer.entrySet()) { + for (final Entry<HostAndPort,List<KeyExtent>> entry : extentsPerServer.entrySet()) { Runnable r = new Runnable() { @Override @@ -140,16 +142,16 @@ public class VerifyTabletAssignments { if (failures.size() > 0) checkTable(context, opts, tableName, 
failures); } - - private static void checkFailures(String server, HashSet<KeyExtent> failures, MultiScanResult scanResult) { + + private static void checkFailures(HostAndPort server, HashSet<KeyExtent> failures, MultiScanResult scanResult) { for (TKeyExtent tke : scanResult.failures.keySet()) { KeyExtent ke = new KeyExtent(tke); System.out.println(" Tablet " + ke + " failed at " + server); failures.add(ke); } } - - private static void checkTabletServer(ClientContext context, Entry<String,List<KeyExtent>> entry, HashSet<KeyExtent> failures) + + private static void checkTabletServer(ClientContext context, Entry<HostAndPort,List<KeyExtent>> entry, HashSet<KeyExtent> failures) throws ThriftSecurityException, TException, NoSuchScanIDException { TabletClientService.Iface client = ThriftUtil.getTServerClient(entry.getKey(), context); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index 83b1d54..9b60c88 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -295,12 +295,12 @@ public class CloseWriteAheadLogReferences implements Runnable { bw.addMutation(m); } - private String getMasterAddress() { + private HostAndPort getMasterAddress() { try { List<String> locations = context.getInstance().getMasterLocations(); if (locations.size() == 0) return null; - return locations.get(0); + return HostAndPort.fromString(locations.get(0)); } catch (Exception e) { log.warn("Failed to obtain master host " + e); } @@ -309,7 +309,7 @@ public class 
CloseWriteAheadLogReferences implements Runnable { } private MasterClientService.Client getMasterConnection() { - final String address = getMasterAddress(); + final HostAndPort address = getMasterAddress(); try { if (address == null) { log.warn("Could not fetch Master address"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index a207ea9..7ec0191 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -533,10 +533,11 @@ public class Monitor { this.fetched = System.currentTimeMillis(); } } - static final Map<String, ScanStats> allScans = new HashMap<String, ScanStats>(); - public static Map<String, ScanStats> getScans() { + + static final Map<HostAndPort,ScanStats> allScans = new HashMap<HostAndPort,ScanStats>(); + public static Map<HostAndPort, ScanStats> getScans() { synchronized (allScans) { - return new TreeMap<String, ScanStats>(allScans); + return new TreeMap<HostAndPort, ScanStats>(allScans); } } @@ -545,11 +546,12 @@ public class Monitor { return; Connector c = context.getConnector(); for (String server : c.instanceOperations().getTabletServers()) { - Client tserver = ThriftUtil.getTServerClient(server, context); + final HostAndPort parsedServer = HostAndPort.fromString(server); + Client tserver = ThriftUtil.getTServerClient(parsedServer, context); try { List<ActiveScan> scans = tserver.getActiveScans(null, context.rpcCreds()); synchronized (allScans) { - allScans.put(server, new ScanStats(scans)); + allScans.put(parsedServer, new ScanStats(scans)); } } catch (Exception ex) { log.debug("Failed to get active scans from {}", server, ex); @@ -558,10 
+560,10 @@ public class Monitor { } } // Age off old scan information - Iterator<Entry<String,ScanStats>> entryIter = allScans.entrySet().iterator(); + Iterator<Entry<HostAndPort,ScanStats>> entryIter = allScans.entrySet().iterator(); long now = System.currentTimeMillis(); while (entryIter.hasNext()) { - Entry<String,ScanStats> entry = entryIter.next(); + Entry<HostAndPort,ScanStats> entry = entryIter.next(); if (now - entry.getValue().fetched > 5 * 60 * 1000) { entryIter.remove(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ScanServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ScanServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ScanServlet.java index 87c1755..e0fab03 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ScanServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ScanServlet.java @@ -22,16 +22,17 @@ import java.util.Map; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.accumulo.monitor.util.celltypes.PreciseNumberType; - import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.Monitor.ScanStats; import org.apache.accumulo.monitor.util.Table; import org.apache.accumulo.monitor.util.TableRow; import org.apache.accumulo.monitor.util.celltypes.DurationType; +import org.apache.accumulo.monitor.util.celltypes.PreciseNumberType; import org.apache.accumulo.monitor.util.celltypes.TServerLinkType; +import com.google.common.net.HostAndPort; + public class ScanServlet extends BasicServlet { private static final long serialVersionUID = 1L; @@ -43,13 +44,13 @@ public class ScanServlet extends BasicServlet { @Override protected void 
pageBody(HttpServletRequest req, HttpServletResponse response, StringBuilder sb) throws IOException { - Map<String,ScanStats> scans = Monitor.getScans(); + Map<HostAndPort,ScanStats> scans = Monitor.getScans(); Table scanTable = new Table("scanStatus", "Scan Status"); scanTable.addSortableColumn("Server", new TServerLinkType(), null); scanTable.addSortableColumn("#", new PreciseNumberType(0, 20, 0, 100), "Number of scans presently running"); scanTable.addSortableColumn("Oldest Age", new DurationType(0l, 5 * 60 * 1000l), "The age of the oldest scan on this server."); for (TabletServerStatus tserverInfo : Monitor.getMmi().getTServerInfo()) { - ScanStats stats = scans.get(tserverInfo.name); + ScanStats stats = scans.get(HostAndPort.fromString(tserverInfo.name)); if (stats != null) { TableRow row = scanTable.prepareRow(); row.add(tserverInfo); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3f709b0..e5067b1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -2238,12 +2238,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable { return sp.address; } - private String getMasterAddress() { + private HostAndPort getMasterAddress() { try { List<String> locations = getInstance().getMasterLocations(); if (locations.size() == 0) return null; - return locations.get(0); + return HostAndPort.fromString(locations.get(0)); } catch (Exception e) { log.warn("Failed to obtain master host " + e); } @@ -2252,7 +2252,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } // Connect to the 
master for posting asynchronous results - private MasterClientService.Client masterConnection(String address) { + private MasterClientService.Client masterConnection(HostAndPort address) { try { if (address == null) { return null; @@ -2438,7 +2438,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { }; SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); - String masterHost; + HostAndPort masterHost; while (!serverStopRequested) { // send all of the pending messages try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 9fc7fa4..7d6c59e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -72,6 +72,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; /** * @@ -167,11 +168,11 @@ public class AccumuloReplicaSystem implements ReplicaSystem { // trying to replicate it again later some other time. 
int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS); for (int i = 0; i < numAttempts; i++) { - String peerTserver; + String peerTserverStr; Span span = Trace.start("Fetch peer tserver"); try { // Ask the master on the remote what TServer we should talk with to replicate the data - peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerContext, new ClientExecReturn<String,ReplicationCoordinator.Client>() { + peerTserverStr = ReplicationClient.executeCoordinatorWithReturn(peerContext, new ClientExecReturn<String,ReplicationCoordinator.Client>() { @Override public String execute(ReplicationCoordinator.Client client) throws Exception { @@ -187,12 +188,14 @@ public class AccumuloReplicaSystem implements ReplicaSystem { span.stop(); } - if (null == peerTserver) { + if (null == peerTserverStr) { // Something went wrong, and we didn't get a valid tserver from the remote for some reason log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target); continue; } + final HostAndPort peerTserver = HostAndPort.fromString(peerTserverStr); + // We have a tserver on the remote -- send the data its way. 
Status finalStatus; final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE); @@ -217,7 +220,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { return finalStatus; } catch (TTransportException | AccumuloException | AccumuloSecurityException e) { - log.warn("Could not connect to remote server {}, will retry", peerTserver, e); + log.warn("Could not connect to remote server {}, will retry", peerTserverStr, e); UtilWaitThread.sleep(1000); } } @@ -231,7 +234,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } } - protected Status replicateRFiles(ClientContext peerContext, final String peerTserver, final ReplicationTarget target, + protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException, AccumuloSecurityException { DataInputStream input; @@ -277,7 +280,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { } } - protected Status replicateLogs(ClientContext peerContext, final String peerTserver, final ReplicationTarget target, + protected Status replicateLogs(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException, AccumuloSecurityException { @@ -334,7 +337,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { span.data("Batch size (bytes)", Long.toString(sizeLimit)); span.data("File", p.toString()); span.data("Peer instance name", peerContext.getInstance().getInstanceName()); - span.data("Peer tserver", peerTserver); + span.data("Peer tserver", peerTserver.toString()); span.data("Remote table ID", remoteTableId); 
ReplicationStats replResult; http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java index f6f2571..e4abed3 100644 --- a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java +++ b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java @@ -31,6 +31,7 @@ import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.hadoop.io.Text; import com.beust.jcommander.Parameter; +import com.google.common.net.HostAndPort; public class WrongTabletTest { @@ -43,16 +44,17 @@ public class WrongTabletTest { final Opts opts = new Opts(); opts.parseArgs(WrongTabletTest.class.getName(), args); - Instance inst = opts.getInstance(); - ServerConfigurationFactory conf = new ServerConfigurationFactory(inst); - ClientContext context = new AccumuloServerContext(conf) { + final HostAndPort location = HostAndPort.fromString(opts.location); + final Instance inst = opts.getInstance(); + final ServerConfigurationFactory conf = new ServerConfigurationFactory(inst); + final ClientContext context = new AccumuloServerContext(conf) { @Override public synchronized Credentials getCredentials() { return new Credentials(opts.principal, opts.getToken()); } }; try { - TabletClientService.Iface client = ThriftUtil.getTServerClient(opts.location, context); + TabletClientService.Iface client = ThriftUtil.getTServerClient(location, context); Mutation mutation = new Mutation(new Text("row_0003750001")); mutation.putDelete(new Text("colf"), new Text("colq")); http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java ---------------------------------------------------------------------- diff --git 
a/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java index 586ab5e..ad9d43c 100644 --- a/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java +++ b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java @@ -38,6 +38,8 @@ import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import com.google.common.net.HostAndPort; + // see ACCUMULO-1950 public class TotalQueuedIT extends ConfigurableMacIT { @@ -119,7 +121,7 @@ public class TotalQueuedIT extends ConfigurableMacIT { ServerConfigurationFactory confFactory = new ServerConfigurationFactory(c.getInstance()); AccumuloServerContext context = new AccumuloServerContext(confFactory); for (String address : c.instanceOperations().getTabletServers()) { - TabletClientService.Client client = ThriftUtil.getTServerClient(address, context); + TabletClientService.Client client = ThriftUtil.getTServerClient(HostAndPort.fromString(address), context); TabletServerStatus status = client.getTabletServerStatus(null, context.rpcCreds()); return status.syncs; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/82609146/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java index 544fb36..9d39325 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@ -59,6 +59,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
com.google.common.net.HostAndPort; + /** * ACCUMULO-3302 series of tests which ensure that a WAL is prematurely closed when a TServer may still continue to use it. Checking that no tablet references a * WAL is insufficient to determine if a WAL will never be used in the future. @@ -403,7 +405,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Expected only one active tservers", 1, tservers.size()); - String tserver = tservers.get(0); + HostAndPort tserver = HostAndPort.fromString(tservers.get(0)); // Get the active WALs from that server log.info("Fetching active WALs from {}", tserver);