Repository: accumulo
Updated Branches:
  refs/heads/master 0228da0a7 -> 483aee470


ACCUMULO-3696 limit the size of the queue for outgoing spans


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6ea7e6b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6ea7e6b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6ea7e6b

Branch: refs/heads/master
Commit: f6ea7e6b08033fb64cf65f2c7fc90cc8cec90531
Parents: 85942da
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Wed Mar 25 17:54:25 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Wed Mar 25 17:54:25 2015 -0400

----------------------------------------------------------------------
 .../apache/accumulo/core/trace/DistributedTrace.java |  2 +-
 .../apache/accumulo/core/trace/ZooTraceClient.java   |  4 ++--
 .../instrument/receivers/AsyncSpanReceiver.java      | 15 +++++++--------
 .../instrument/receivers/SendSpansViaThrift.java     |  9 +++++++--
 .../trace/instrument/receivers/ZooSpanClient.java    |  4 ++--
 5 files changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java 
b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
index a5a327d..7b92676 100644
--- a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
+++ b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
@@ -37,6 +37,6 @@ public class DistributedTrace {
         address = "unknown";
       }
     }
-    Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, 
application, 1000));
+    Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, 
application, 1000, 5000));
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java 
b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
index fd8db85..58108fb 100644
--- a/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java
@@ -44,8 +44,8 @@ public class ZooTraceClient extends SendSpansViaThrift 
implements Watcher {
   final Random random = new Random();
   final List<String> hosts = new ArrayList<String>();
 
-  public ZooTraceClient(ZooReader zoo, String path, String host, String 
service, long millis) throws IOException, KeeperException, InterruptedException 
{
-    super(host, service, millis);
+  public ZooTraceClient(ZooReader zoo, String path, String host, String 
service, long millis, int maxQueueSize) throws IOException, KeeperException, 
InterruptedException {
+    super(host, service, millis, maxQueueSize);
     this.path = path;
     this.zoo = zoo;
     updateHosts(path, zoo.getChildren(path, this));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java
----------------------------------------------------------------------
diff --git 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java
 
b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java
index 6ebdee1..1226e03 100644
--- 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java
+++ 
b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java
@@ -51,10 +51,12 @@ public abstract class 
AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
 
   Timer timer = new Timer("SpanSender", true);
   final AbstractQueue<RemoteSpan> sendQueue = new 
ConcurrentLinkedQueue<RemoteSpan>();
+  final int maxQueueSize;
 
-  public AsyncSpanReceiver(String host, String service, long millis) {
+  public AsyncSpanReceiver(String host, String service, long millis, int 
maxQueueSize) {
     this.host = host;
     this.service = service;
+    this.maxQueueSize = maxQueueSize;
     timer.schedule(new TimerTask() {
       @Override
       public void run() {
@@ -72,13 +74,6 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> 
implements SpanRece
     while (!sendQueue.isEmpty()) {
       boolean sent = false;
       RemoteSpan s = sendQueue.peek();
-      if (s.stop - s.start < 1) {
-        synchronized (sendQueue) {
-          sendQueue.remove();
-          sendQueue.notifyAll();
-        }
-        continue;
-      }
       SpanKey dest = getSpanKey(s.data);
       Destination client = clients.get(dest);
       if (client == null) {
@@ -108,6 +103,10 @@ public abstract class 
AsyncSpanReceiver<SpanKey,Destination> implements SpanRece
   @Override
   public void span(long traceId, long spanId, long parentId, long start, long 
stop, String description, Map<String,String> data) {
 
+    if (sendQueue.size() > maxQueueSize) {
+      return;
+    }
+
     SpanKey dest = getSpanKey(data);
     if (dest != null) {
       sendQueue.add(new RemoteSpan(host, service, traceId, spanId, parentId, 
start, stop, description, data));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java
----------------------------------------------------------------------
diff --git 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java
 
b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java
index 2fff6ca..23d943a 100644
--- 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java
+++ 
b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.trace.instrument.receivers;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Map;
@@ -36,8 +37,8 @@ public class SendSpansViaThrift extends 
AsyncSpanReceiver<String,Client> {
 
   private static final String THRIFT = "thrift://";
 
-  public SendSpansViaThrift(String host, String service, long millis) {
-    super(host, service, millis);
+  public SendSpansViaThrift(String host, String service, long millis, int 
maxQueueSize) {
+    super(host, service, millis, maxQueueSize);
   }
 
   @Override
@@ -55,6 +56,9 @@ public class SendSpansViaThrift extends 
AsyncSpanReceiver<String,Client> {
       TTransport transport = new TSocket(sock);
       TProtocol prot = new TBinaryProtocol(transport);
       return new Client(prot);
+    } catch (IOException ex) {
+      log.trace(ex, ex);
+      return null;
     } catch (Exception ex) {
       log.error(ex, ex);
       return null;
@@ -73,6 +77,7 @@ public class SendSpansViaThrift extends 
AsyncSpanReceiver<String,Client> {
     }
   }
 
+  @Override
   protected String getSpanKey(Map<String,String> data) {
     String dest = data.get("dest");
     if (dest != null && dest.startsWith(THRIFT)) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6ea7e6b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
----------------------------------------------------------------------
diff --git 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
 
b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
index 0466426..7a6e44f 100644
--- 
a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
+++ 
b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/ZooSpanClient.java
@@ -46,8 +46,8 @@ public class ZooSpanClient extends SendSpansViaThrift {
   final Random random = new Random();
   final List<String> hosts = new ArrayList<String>();
 
-  public ZooSpanClient(String keepers, final String path, String host, String 
service, long millis) throws IOException, KeeperException, InterruptedException 
{
-    super(host, service, millis);
+  public ZooSpanClient(String keepers, final String path, String host, String 
service, long millis, int maxQueueSize) throws IOException, KeeperException, 
InterruptedException {
+    super(host, service, millis, maxQueueSize);
     this.path = path;
     zoo = new ZooKeeper(keepers, 30 * 1000, new Watcher() {
       @Override

Reply via email to