ACCUMULO-3696 limit span queue

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/483aee47
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/483aee47
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/483aee47

Branch: refs/heads/master
Commit: 483aee47058398af079b41a4395dac1525d2750b
Parents: 0228da0 f6ea7e6
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Wed Mar 25 18:33:48 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Wed Mar 25 18:33:48 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/core/trace/DistributedTrace.java  | 1 +
 .../java/org/apache/accumulo/tracer/AsyncSpanReceiver.java     | 6 ++++++
 .../java/org/apache/accumulo/tracer/SendSpansViaThrift.java    | 5 +++++
 3 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/483aee47/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
----------------------------------------------------------------------
diff --cc 
core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
index 484f93d,7b92676..6a3f55a
--- a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
+++ b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java
@@@ -17,190 -17,26 +17,191 @@@
  package org.apache.accumulo.core.trace;
  
  import java.io.IOException;
 -import java.net.InetAddress;
 -import java.net.UnknownHostException;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
  
 -import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
  import org.apache.accumulo.core.client.Instance;
 -import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.fate.zookeeper.ZooReader;
 -import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.util.ShutdownHookManager;
 +import org.apache.htrace.HTraceConfiguration;
 +import org.apache.htrace.SpanReceiver;
 +import org.apache.htrace.SpanReceiverBuilder;
  import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
 +/**
 + * Utility class to enable tracing for Accumulo server processes.
 + *
 + */
  public class DistributedTrace {
 +  private static final Logger log = 
LoggerFactory.getLogger(DistributedTrace.class);
 +
 +  private static final String HTRACE_CONF_PREFIX = "hadoop.";
 +
 +  public static final String TRACE_HOST_PROPERTY = "trace.host";
 +  public static final String TRACE_SERVICE_PROPERTY = "trace.service";
++  public static final String TRACE_QUEUE_SIZE_PROPERTY = "trace.queue.size";
 +
 +  public static final String TRACER_ZK_HOST = "tracer.zookeeper.host";
 +  public static final String TRACER_ZK_TIMEOUT = "tracer.zookeeper.timeout";
 +  public static final String TRACER_ZK_PATH = "tracer.zookeeper.path";
 +
 +  private static final HashSet<SpanReceiver> receivers = new 
HashSet<SpanReceiver>();
 +
 +  /**
 +   * @deprecated since 1.7, use {@link DistributedTrace#enable(String, 
String, org.apache.accumulo.core.client.ClientConfiguration)} instead
 +   */
 +  @Deprecated
    public static void enable(Instance instance, ZooReader zoo, String 
application, String address) throws IOException, KeeperException, 
InterruptedException {
 -    String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS;
 -    if (address == null) {
 +    enable(address, application);
 +  }
 +
 +  /**
 +   * Enable tracing by setting up SpanReceivers for the current process.
 +   */
 +  public static void enable() {
 +    enable(null, null);
 +  }
 +
 +  /**
 +   * Enable tracing by setting up SpanReceivers for the current process. If service name is null, the simple name of the class will be used.
 +   */
 +  public static void enable(String service) {
 +    enable(null, service);
 +  }
 +
 +  /**
 +   * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it will be determined. If service name is null, the simple name
 +   * of the class will be used.
 +   */
 +  public static void enable(String hostname, String service) {
 +    enable(hostname, service, ClientConfiguration.loadDefault());
 +  }
 +
 +  /**
 +   * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it will be determined. If service name is null, the simple name
 +   * of the class will be used. Properties required in the client configuration include
 +   * {@link org.apache.accumulo.core.client.ClientConfiguration.ClientProperty#TRACE_SPAN_RECEIVERS} and any properties specific to the span receiver.
 +   */
 +  public static void enable(String hostname, String service, ClientConfiguration conf) {
 +    final String receiverClasses = conf.get(ClientProperty.TRACE_SPAN_RECEIVERS);
 +    final String zkHosts = conf.get(ClientProperty.INSTANCE_ZK_HOST);
 +    // the client configuration stores the timeout as text; convert it to milliseconds
 +    final String zkTimeoutText = conf.get(ClientProperty.INSTANCE_ZK_TIMEOUT);
 +    final long zkTimeoutMillis = AccumuloConfiguration.getTimeInMillis(zkTimeoutText);
 +    final String tracerPath = conf.get(ClientProperty.TRACE_ZK_PATH);
 +    final Map<String,String> receiverProps = conf.getAllPropertiesWithPrefix(ClientProperty.TRACE_SPAN_RECEIVER_PREFIX);
 +    enableTracing(hostname, service, receiverClasses, zkHosts, zkTimeoutMillis, tracerPath, receiverProps);
 +  }
 +
 +  /**
 +   * Enable tracing by setting up SpanReceivers for the current process, reading the span receiver classes, ZooKeeper settings and receiver-specific
 +   * properties from a server-side configuration. If host name is null, it will be determined. If service name is null, the simple name of the class will be
 +   * used.
 +   */
 +  public static void enable(String hostname, String service, AccumuloConfiguration conf) {
 +    final String receiverClasses = conf.get(Property.TRACE_SPAN_RECEIVERS);
 +    final String zkHosts = conf.get(Property.INSTANCE_ZK_HOST);
 +    final long zkTimeoutMillis = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 +    final String tracerPath = conf.get(Property.TRACE_ZK_PATH);
 +    final Map<String,String> receiverProps = conf.getAllPropertiesWithPrefix(Property.TRACE_SPAN_RECEIVER_PREFIX);
 +    enableTracing(hostname, service, receiverClasses, zkHosts, zkTimeoutMillis, tracerPath, receiverProps);
 +  }
 +
 +  private static void enableTracing(String hostname, String service, String 
spanReceivers, String zookeepers, long timeout, String zkPath,
 +      Map<String,String> properties) {
 +    Configuration conf = new Configuration(false);
 +    conf.set(Property.TRACE_SPAN_RECEIVERS.toString(), spanReceivers);
 +
 +    // remaining properties will be parsed through an HTraceConfiguration by 
SpanReceivers
 +    setProperty(conf, TRACER_ZK_HOST, zookeepers);
 +    setProperty(conf, TRACER_ZK_TIMEOUT, (int) timeout);
 +    setProperty(conf, TRACER_ZK_PATH, zkPath);
 +    for (Entry<String,String> property : properties.entrySet()) {
 +      setProperty(conf, 
property.getKey().substring(Property.TRACE_SPAN_RECEIVER_PREFIX.getKey().length()),
 property.getValue());
 +    }
 +    if (hostname != null) {
 +      setProperty(conf, TRACE_HOST_PROPERTY, hostname);
 +    }
 +    if (service != null) {
 +      setProperty(conf, TRACE_SERVICE_PROPERTY, service);
 +    }
 +    org.apache.htrace.Trace.setProcessId(service);
 +    ShutdownHookManager.get().addShutdownHook(new Runnable() {
 +      @Override
 +      public void run() {
 +        Trace.off();
 +        closeReceivers();
 +      }
 +    }, 0);
 +    loadSpanReceivers(conf);
 +  }
 +
 +  /**
 +   * Disable tracing by closing SpanReceivers for the current process.
 +   */
 +  public static void disable() {
 +    closeReceivers();
 +  }
 +
 +  private static synchronized void loadSpanReceivers(Configuration conf) {
 +    if (!receivers.isEmpty()) {
 +      log.info("Already loaded span receivers, enable tracing does not need 
to be called again");
 +      return;
 +    }
 +    String[] receiverNames = 
conf.getTrimmedStrings(Property.TRACE_SPAN_RECEIVERS.toString());
 +    if (receiverNames == null || receiverNames.length == 0) {
 +      return;
 +    }
 +    for (String className : receiverNames) {
 +      SpanReceiverBuilder builder = new 
SpanReceiverBuilder(wrapHadoopConf(conf));
 +      SpanReceiver rcvr = builder.spanReceiverClass(className.trim()).build();
 +      if (rcvr == null) {
 +        log.warn("Failed to load SpanReceiver " + className);
 +      } else {
 +        receivers.add(rcvr);
 +        log.info("SpanReceiver " + className + " was loaded successfully.");
 +      }
 +    }
 +    for (SpanReceiver rcvr : receivers) {
 +      org.apache.htrace.Trace.addReceiver(rcvr);
 +    }
 +  }
 +
 +  private static void setProperty(Configuration conf, String key, String 
value) {
 +    conf.set(HTRACE_CONF_PREFIX + key, value);
 +  }
 +
 +  private static void setProperty(Configuration conf, String key, int value) {
 +    conf.setInt(HTRACE_CONF_PREFIX + key, value);
 +  }
 +
 +  private static HTraceConfiguration wrapHadoopConf(final Configuration conf) 
{
 +    return new HTraceConfiguration() {
 +      @Override
 +      public String get(String key) {
 +        return conf.get(HTRACE_CONF_PREFIX + key);
 +      }
 +
 +      @Override
 +      public String get(String key, String defaultValue) {
 +        return conf.get(HTRACE_CONF_PREFIX + key, defaultValue);
 +      }
 +    };
 +  }
 +
 +  private static synchronized void closeReceivers() {
 +    for (SpanReceiver rcvr : receivers) {
        try {
 -        address = InetAddress.getLocalHost().getHostAddress().toString();
 -      } catch (UnknownHostException e) {
 -        address = "unknown";
 +        rcvr.close();
 +      } catch (IOException e) {
 +        log.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), 
e);
        }
      }
 -    Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, 
application, 1000, 5000));
 +    receivers.clear();
    }
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/483aee47/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
----------------------------------------------------------------------
diff --cc 
server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
index 6df82ce,0000000..385273d
mode 100644,000000..100644
--- 
a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
+++ 
b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java
@@@ -1,174 -1,0 +1,180 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tracer;
 +
 +import java.net.InetAddress;
 +import java.net.UnknownHostException;
 +import java.nio.ByteBuffer;
 +import java.util.AbstractQueue;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Timer;
 +import java.util.TimerTask;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +
 +import org.apache.accumulo.core.trace.DistributedTrace;
 +import org.apache.accumulo.tracer.thrift.Annotation;
 +import org.apache.accumulo.tracer.thrift.RemoteSpan;
 +import org.apache.htrace.HTraceConfiguration;
 +import org.apache.htrace.Span;
 +import org.apache.htrace.SpanReceiver;
 +import org.apache.htrace.TimelineAnnotation;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Deliver Span information periodically to a destination.
 + * <ul>
 + * <li>Send host and service information with the span.
 + * <li>Cache Destination objects by some key that can be extracted from the span.
 + * <li>Can be used to queue spans up for delivery over RPC, or for saving into a file.
 + * </ul>
 + */
 +public abstract class AsyncSpanReceiver<SpanKey,Destination> implements 
SpanReceiver {
 +
 +  private static final Logger log = 
LoggerFactory.getLogger(AsyncSpanReceiver.class);
 +
 +  public static final String SEND_TIMER_MILLIS = "send.timer.millis";
 +
 +  private final Map<SpanKey,Destination> clients = new 
HashMap<SpanKey,Destination>();
 +
 +  protected String host = null;
 +  protected String service = null;
 +
 +  protected abstract Destination createDestination(SpanKey key) throws 
Exception;
 +
 +  protected abstract void send(Destination resource, RemoteSpan span) throws 
Exception;
 +
 +  protected abstract SpanKey getSpanKey(Map<ByteBuffer,ByteBuffer> data);
 +
 +  Timer timer = new Timer("SpanSender", true);
 +  protected final AbstractQueue<RemoteSpan> sendQueue = new 
ConcurrentLinkedQueue<RemoteSpan>();
++  int maxQueueSize = 5000;
 +
 +  // Visible for testing
 +  AsyncSpanReceiver() {}
 +
 +  public AsyncSpanReceiver(HTraceConfiguration conf) {
 +    host = conf.get(DistributedTrace.TRACE_HOST_PROPERTY, host);
 +    if (host == null) {
 +      try {
 +        host = InetAddress.getLocalHost().getCanonicalHostName().toString();
 +      } catch (UnknownHostException e) {
 +        host = "unknown";
 +      }
 +    }
 +    service = conf.get(DistributedTrace.TRACE_SERVICE_PROPERTY, service);
++    maxQueueSize = conf.getInt(DistributedTrace.TRACE_QUEUE_SIZE_PROPERTY, 
maxQueueSize);
 +
 +    int millis = conf.getInt(SEND_TIMER_MILLIS, 1000);
 +    timer.schedule(new TimerTask() {
 +      @Override
 +      public void run() {
 +        try {
 +          sendSpans();
 +        } catch (Exception ex) {
 +          log.warn("Exception sending spans to destination", ex);
 +        }
 +      }
 +
 +    }, millis, millis);
 +  }
 +
 +  /**
 +   * Send queued spans, in order, until the queue is empty or a span cannot be delivered.
 +   * <p>
 +   * The destination for the span at the head of the queue is looked up in the client cache and created on demand. A freshly created destination is used
 +   * immediately rather than on the next timer tick, and a destination that could not be created is not cached. When a send fails the cached destination is
 +   * discarded and the span stays at the head of the queue so that a later call retries it. Threads blocked in {@link #close()} are notified after every
 +   * successful send.
 +   */
 +  protected void sendSpans() {
 +    while (!sendQueue.isEmpty()) {
 +      RemoteSpan s = sendQueue.peek();
 +      if (s == null) {
 +        // the queue was drained between isEmpty() and peek()
 +        break;
 +      }
 +      SpanKey dest = getSpanKey(s.data);
 +      Destination client = clients.get(dest);
 +      if (client == null) {
 +        try {
 +          client = createDestination(dest);
 +          if (client != null) {
 +            clients.put(dest, client);
 +          }
 +        } catch (Exception ex) {
 +          log.warn("Exception creating connection to span receiver", ex);
 +        }
 +      }
 +      if (client == null) {
 +        // no usable destination right now; leave the span queued for the next attempt
 +        break;
 +      }
 +      try {
 +        send(client, s);
 +        synchronized (sendQueue) {
 +          sendQueue.remove();
 +          sendQueue.notifyAll();
 +        }
 +      } catch (Exception ex) {
 +        log.warn("Got error sending to " + dest + ", refreshing client", ex);
 +        clients.remove(dest);
 +        break;
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Wrap every key and value of a byte-array map in a {@link ByteBuffer}.
 +   *
 +   * @param bytesMap
 +   *          map to convert, may be null
 +   * @return a new map of wrapped keys and values, or null when {@code bytesMap} is null
 +   */
 +  public static Map<ByteBuffer,ByteBuffer> convertToByteBuffers(Map<byte[],byte[]> bytesMap) {
 +    if (bytesMap == null) {
 +      return null;
 +    }
 +    final Map<ByteBuffer,ByteBuffer> wrapped = new HashMap<ByteBuffer,ByteBuffer>();
 +    for (Entry<byte[],byte[]> entry : bytesMap.entrySet()) {
 +      final ByteBuffer key = ByteBuffer.wrap(entry.getKey());
 +      final ByteBuffer value = ByteBuffer.wrap(entry.getValue());
 +      wrapped.put(key, value);
 +    }
 +    return wrapped;
 +  }
 +
 +  /**
 +   * Convert htrace timeline annotations into thrift {@link Annotation} objects, copying the time and message of each one in order.
 +   *
 +   * @param annotations
 +   *          annotations to convert, may be null
 +   * @return a new list of converted annotations, or null when {@code annotations} is null
 +   */
 +  public static List<Annotation> convertToAnnotations(List<TimelineAnnotation> annotations) {
 +    if (annotations == null) {
 +      return null;
 +    }
 +    final List<Annotation> converted = new ArrayList<Annotation>();
 +    for (TimelineAnnotation source : annotations) {
 +      final Annotation copy = new Annotation(source.getTime(), source.getMessage());
 +      converted.add(copy);
 +    }
 +    return converted;
 +  }
 +
 +  /**
 +   * Queue a span for asynchronous delivery.
 +   * <p>
 +   * Spans for which {@link #getSpanKey(Map)} yields no destination are ignored. Spans that arrive while the queue already holds more than
 +   * {@code maxQueueSize} entries are silently dropped so that an unreachable destination cannot grow the queue without bound.
 +   */
 +  @Override
 +  public void receiveSpan(Span s) {
 +    Map<ByteBuffer,ByteBuffer> data = convertToByteBuffers(s.getKVAnnotations());
++
 +    SpanKey dest = getSpanKey(data);
 +    if (dest == null) {
 +      return;
 +    }
++    // Check the limit before converting annotations so a dropped span costs no further allocation.
++    // NOTE: sendQueue is a ConcurrentLinkedQueue, whose size() traverses the queue rather than reading a counter.
++    if (sendQueue.size() > maxQueueSize) {
++      return;
++    }
 +    List<Annotation> annotations = convertToAnnotations(s.getTimelineAnnotations());
 +    sendQueue.add(new RemoteSpan(host, service == null ? s.getProcessId() : service, s.getTraceId(), s.getSpanId(), s.getParentId(), s.getStartTimeMillis(),
 +        s.getStopTimeMillis(), s.getDescription(), data, annotations));
 +  }
 +
 +  /**
 +   * Block until the send queue has been drained by {@link #sendSpans()}, or until the calling thread is interrupted.
 +   * <p>
 +   * On interruption the wait is abandoned and the thread's interrupt status is restored so that callers can still observe it.
 +   */
 +  @Override
 +  public void close() {
 +    synchronized (sendQueue) {
 +      while (!sendQueue.isEmpty()) {
 +        try {
 +          // woken by sendSpans() after each span it removes
 +          sendQueue.wait();
 +        } catch (InterruptedException e) {
 +          log.warn("flush interrupted");
 +          // wait() cleared the flag when it threw; set it again for the caller
 +          Thread.currentThread().interrupt();
 +          break;
 +        }
 +      }
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/483aee47/server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java
----------------------------------------------------------------------
diff --cc 
server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java
index f24c7b9,0000000..7a8dc16
mode 100644,000000..100644
--- 
a/server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java
+++ 
b/server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java
@@@ -1,97 -1,0 +1,102 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tracer;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
++import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.nio.ByteBuffer;
 +import java.util.Map;
 +
 +import org.apache.accumulo.tracer.thrift.RemoteSpan;
 +import org.apache.accumulo.tracer.thrift.SpanReceiver.Client;
 +import org.apache.htrace.HTraceConfiguration;
 +import org.apache.thrift.protocol.TBinaryProtocol;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +
 +/**
 + * Send Span data to a destination using thrift.
 + */
 +public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> {
 +
 +  private static final org.slf4j.Logger log = 
org.slf4j.LoggerFactory.getLogger(SendSpansViaThrift.class);
 +
 +  private static final String THRIFT = "thrift://";
 +
 +  // Visible for testing
 +  SendSpansViaThrift() {}
 +
 +  public SendSpansViaThrift(HTraceConfiguration conf) {
 +    super(conf);
 +  }
 +
 +  /**
 +   * Open a thrift client connection to a {@code host:port} destination.
 +   * <p>
 +   * The port is taken from the text after the last {@code ':'}. Failures are logged (I/O failures at trace level, anything else at error level) and reported
 +   * by returning null; the socket is closed whenever no client is returned so that failed attempts do not leak it.
 +   *
 +   * @param destination
 +   *          destination in {@code host:port} form, may be null
 +   * @return a connected client, or null when {@code destination} is null or the connection could not be set up
 +   */
 +  @Override
 +  protected Client createDestination(String destination) throws Exception {
 +    if (destination == null)
 +      return null;
 +    Socket sock = null;
 +    Client client = null;
 +    try {
 +      int portSeparatorIndex = destination.lastIndexOf(':');
 +      String host = destination.substring(0, portSeparatorIndex);
 +      int port = Integer.parseInt(destination.substring(portSeparatorIndex + 1));
 +      log.debug("Connecting to " + host + ":" + port);
 +      InetSocketAddress addr = new InetSocketAddress(host, port);
 +      sock = new Socket();
 +      sock.connect(addr);
 +      TTransport transport = new TSocket(sock);
 +      TProtocol prot = new TBinaryProtocol(transport);
 +      client = new Client(prot);
 +      return client;
++    } catch (IOException ex) {
++      log.trace("{}", ex, ex);
++      return null;
 +    } catch (Exception ex) {
 +      log.error(ex.getMessage(), ex);
 +      return null;
 +    } finally {
 +      if (client == null && sock != null) {
 +        try {
 +          sock.close();
 +        } catch (IOException ignored) {
 +          // best effort: the connection attempt has already failed and been logged
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Send one span through the given thrift client. Does nothing when {@code client} is null. When the call fails, the client's input transport is closed
 +   * and the original exception is rethrown to the caller.
 +   */
 +  @Override
 +  protected void send(Client client, RemoteSpan s) throws Exception {
 +    if (client == null) {
 +      return;
 +    }
 +    try {
 +      client.span(s);
 +    } catch (Exception ex) {
 +      client.getInputProtocol().getTransport().close();
 +      throw ex;
 +    }
 +  }
 +
 +  private static final ByteBuffer DEST = 
ByteBuffer.wrap("dest".getBytes(UTF_8));
 +
++  @Override
 +  protected String getSpanKey(Map<ByteBuffer,ByteBuffer> data) {
 +    String dest = new String(data.get(DEST).array());
 +    if (dest != null && dest.startsWith(THRIFT)) {
 +      String hostAddress = dest.substring(THRIFT.length());
 +      String[] hostAddr = hostAddress.split(":", 2);
 +      if (hostAddr.length == 2) {
 +        return hostAddress;
 +      }
 +    }
 +    return null;
 +  }
 +
 +}

Reply via email to