Repository: accumulo Updated Branches: refs/heads/master 0228da0a7 -> 483aee470
ACCUMULO-3696 limit the size of the queue for outgoing spans Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6ea7e6b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6ea7e6b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6ea7e6b Branch: refs/heads/master Commit: f6ea7e6b08033fb64cf65f2c7fc90cc8cec90531 Parents: 85942da Author: Eric C. Newton <eric.new...@gmail.com> Authored: Wed Mar 25 17:54:25 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Wed Mar 25 17:54:25 2015 -0400 ---------------------------------------------------------------------- .../apache/accumulo/core/trace/DistributedTrace.java | 2 +- .../apache/accumulo/core/trace/ZooTraceClient.java | 4 ++-- .../instrument/receivers/AsyncSpanReceiver.java | 15 +++++++-------- .../instrument/receivers/SendSpansViaThrift.java | 9 +++++++-- .../trace/instrument/receivers/ZooSpanClient.java | 4 ++-- 5 files changed, 19 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java index a5a327d..7b92676 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java @@ -37,6 +37,6 @@ public class DistributedTrace { address = "unknown"; } } - Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, application, 1000)); + Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, application, 1000, 5000)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java index fd8db85..58108fb 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java @@ -44,8 +44,8 @@ public class ZooTraceClient extends SendSpansViaThrift implements Watcher { final Random random = new Random(); final List<String> hosts = new ArrayList<String>(); - public ZooTraceClient(ZooReader zoo, String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { - super(host, service, millis); + public ZooTraceClient(ZooReader zoo, String path, String host, String service, long millis, int maxQueueSize) throws IOException, KeeperException, InterruptedException { + super(host, service, millis, maxQueueSize); this.path = path; this.zoo = zoo; updateHosts(path, zoo.getChildren(path, this)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java index 6ebdee1..1226e03 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java @@ -51,10 +51,12 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece Timer timer = new Timer("SpanSender", true); final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<RemoteSpan>(); + final int maxQueueSize; - public AsyncSpanReceiver(String host, String service, long millis) { + public AsyncSpanReceiver(String host, String service, long millis, int maxQueueSize) { this.host = host; this.service = service; + this.maxQueueSize = maxQueueSize; timer.schedule(new TimerTask() { @Override public void run() { @@ -72,13 +74,6 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece while (!sendQueue.isEmpty()) { boolean sent = false; RemoteSpan s = sendQueue.peek(); - if (s.stop - s.start < 1) { - synchronized (sendQueue) { - sendQueue.remove(); - sendQueue.notifyAll(); - } - continue; - } SpanKey dest = getSpanKey(s.data); Destination client = clients.get(dest); if (client == null) { @@ -108,6 +103,10 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece @Override public void span(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data) { + if (sendQueue.size() > maxQueueSize) { + return; + } + SpanKey dest = getSpanKey(data); if (dest != null) { sendQueue.add(new RemoteSpan(host, service, traceId, spanId, parentId, start, stop, description, data)); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java index 2fff6ca..23d943a 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.trace.instrument.receivers; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Map; @@ -36,8 +37,8 @@ public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> { private static final String THRIFT = "thrift://"; - public SendSpansViaThrift(String host, String service, long millis) { - super(host, service, millis); + public SendSpansViaThrift(String host, String service, long millis, int maxQueueSize) { + super(host, service, millis, maxQueueSize); } @Override @@ -55,6 +56,9 @@ public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> { TTransport transport = new TSocket(sock); TProtocol prot = new TBinaryProtocol(transport); return new Client(prot); + } catch (IOException ex) { + log.trace(ex, ex); + return null; } catch (Exception ex) { log.error(ex, ex); return null; @@ -73,6 +77,7 @@ public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> { } } + @Override protected String getSpanKey(Map<String,String> data) { String dest = data.get("dest"); if (dest != null && dest.startsWith(THRIFT)) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java index 0466426..7a6e44f 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java @@ -46,8 +46,8 @@ public class ZooSpanClient extends SendSpansViaThrift { final Random random = new Random(); final List<String> hosts = new ArrayList<String>(); - public ZooSpanClient(String keepers, final String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { - super(host, service, millis); + public ZooSpanClient(String keepers, final String path, String host, String service, long millis, int maxQueueSize) throws IOException, KeeperException, InterruptedException { + super(host, service, millis, maxQueueSize); this.path = path; zoo = new ZooKeeper(keepers, 30 * 1000, new Watcher() { @Override