ACCUMULO-3696 limit span queue
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/483aee47 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/483aee47 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/483aee47 Branch: refs/heads/master Commit: 483aee47058398af079b41a4395dac1525d2750b Parents: 0228da0 f6ea7e6 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Wed Mar 25 18:33:48 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Wed Mar 25 18:33:48 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/core/trace/DistributedTrace.java | 1 + .../java/org/apache/accumulo/tracer/AsyncSpanReceiver.java | 6 ++++++ .../java/org/apache/accumulo/tracer/SendSpansViaThrift.java | 5 +++++ 3 files changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/483aee47/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java index 484f93d,7b92676..6a3f55a --- a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java @@@ -17,190 -17,26 +17,191 @@@ package org.apache.accumulo.core.trace; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; -import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.fate.zookeeper.ZooReader; -import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.SpanReceiverBuilder; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Utility class to enable tracing for Accumulo server processes. + * + */ public class DistributedTrace { + private static final Logger log = LoggerFactory.getLogger(DistributedTrace.class); + + private static final String HTRACE_CONF_PREFIX = "hadoop."; + + public static final String TRACE_HOST_PROPERTY = "trace.host"; + public static final String TRACE_SERVICE_PROPERTY = "trace.service"; ++ public static final String TRACE_QUEUE_SIZE_PROPERTY = "trace.queue.size"; + + public static final String TRACER_ZK_HOST = "tracer.zookeeper.host"; + public static final String TRACER_ZK_TIMEOUT = "tracer.zookeeper.timeout"; + public static final String TRACER_ZK_PATH = "tracer.zookeeper.path"; + + private static final HashSet<SpanReceiver> receivers = new HashSet<SpanReceiver>(); + + /** + * @deprecated since 1.7, use {@link DistributedTrace#enable(String, String, org.apache.accumulo.core.client.ClientConfiguration)} instead + */ + @Deprecated public static void enable(Instance instance, ZooReader zoo, String application, String address) throws IOException, KeeperException, InterruptedException { - String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS; - if (address == null) { + enable(address, application); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. + */ + public static void enable() { + enable(null, null); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. If service name is null, the simple name of the class will be used. + */ + public static void enable(String service) { + enable(null, service); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it will be determined. If service name is null, the simple name + * of the class will be used. + */ + public static void enable(String hostname, String service) { + enable(hostname, service, ClientConfiguration.loadDefault()); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it will be determined. If service name is null, the simple name + * of the class will be used. Properties required in the client configuration include + * {@link org.apache.accumulo.core.client.ClientConfiguration.ClientProperty#TRACE_SPAN_RECEIVERS} and any properties specific to the span receiver. + */ + public static void enable(String hostname, String service, ClientConfiguration conf) { + String spanReceivers = conf.get(ClientProperty.TRACE_SPAN_RECEIVERS); + String zookeepers = conf.get(ClientProperty.INSTANCE_ZK_HOST); + long timeout = AccumuloConfiguration.getTimeInMillis(conf.get(ClientProperty.INSTANCE_ZK_TIMEOUT)); + String zkPath = conf.get(ClientProperty.TRACE_ZK_PATH); + Map<String,String> properties = conf.getAllPropertiesWithPrefix(ClientProperty.TRACE_SPAN_RECEIVER_PREFIX); + enableTracing(hostname, service, spanReceivers, zookeepers, timeout, zkPath, properties); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. If host name is null, it will be determined. If service name is null, the simple name + * of the class will be used. + */ + public static void enable(String hostname, String service, AccumuloConfiguration conf) { + String spanReceivers = conf.get(Property.TRACE_SPAN_RECEIVERS); + String zookeepers = conf.get(Property.INSTANCE_ZK_HOST); + long timeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + String zkPath = conf.get(Property.TRACE_ZK_PATH); + Map<String,String> properties = conf.getAllPropertiesWithPrefix(Property.TRACE_SPAN_RECEIVER_PREFIX); + enableTracing(hostname, service, spanReceivers, zookeepers, timeout, zkPath, properties); + } + + private static void enableTracing(String hostname, String service, String spanReceivers, String zookeepers, long timeout, String zkPath, + Map<String,String> properties) { + Configuration conf = new Configuration(false); + conf.set(Property.TRACE_SPAN_RECEIVERS.toString(), spanReceivers); + + // remaining properties will be parsed through an HTraceConfiguration by SpanReceivers + setProperty(conf, TRACER_ZK_HOST, zookeepers); + setProperty(conf, TRACER_ZK_TIMEOUT, (int) timeout); + setProperty(conf, TRACER_ZK_PATH, zkPath); + for (Entry<String,String> property : properties.entrySet()) { + setProperty(conf, property.getKey().substring(Property.TRACE_SPAN_RECEIVER_PREFIX.getKey().length()), property.getValue()); + } + if (hostname != null) { + setProperty(conf, TRACE_HOST_PROPERTY, hostname); + } + if (service != null) { + setProperty(conf, TRACE_SERVICE_PROPERTY, service); + } + org.apache.htrace.Trace.setProcessId(service); + ShutdownHookManager.get().addShutdownHook(new Runnable() { + @Override + public void run() { + Trace.off(); + closeReceivers(); + } + }, 0); + loadSpanReceivers(conf); + } + + /** + * Disable tracing by closing SpanReceivers for the current process. + */ + public static void disable() { + closeReceivers(); + } + + private static synchronized void loadSpanReceivers(Configuration conf) { + if (!receivers.isEmpty()) { + log.info("Already loaded span receivers, enable tracing does not need to be called again"); + return; + } + String[] receiverNames = conf.getTrimmedStrings(Property.TRACE_SPAN_RECEIVERS.toString()); + if (receiverNames == null || receiverNames.length == 0) { + return; + } + for (String className : receiverNames) { + SpanReceiverBuilder builder = new SpanReceiverBuilder(wrapHadoopConf(conf)); + SpanReceiver rcvr = builder.spanReceiverClass(className.trim()).build(); + if (rcvr == null) { + log.warn("Failed to load SpanReceiver " + className); + } else { + receivers.add(rcvr); + log.info("SpanReceiver " + className + " was loaded successfully."); + } + } + for (SpanReceiver rcvr : receivers) { + org.apache.htrace.Trace.addReceiver(rcvr); + } + } + + private static void setProperty(Configuration conf, String key, String value) { + conf.set(HTRACE_CONF_PREFIX + key, value); + } + + private static void setProperty(Configuration conf, String key, int value) { + conf.setInt(HTRACE_CONF_PREFIX + key, value); + } + + private static HTraceConfiguration wrapHadoopConf(final Configuration conf) { + return new HTraceConfiguration() { + @Override + public String get(String key) { + return conf.get(HTRACE_CONF_PREFIX + key); + } + + @Override + public String get(String key, String defaultValue) { + return conf.get(HTRACE_CONF_PREFIX + key, defaultValue); + } + }; + } + + private static synchronized void closeReceivers() { + for (SpanReceiver rcvr : receivers) { try { - address = InetAddress.getLocalHost().getHostAddress().toString(); - } catch (UnknownHostException e) { - address = "unknown"; + rcvr.close(); + } catch (IOException e) { + log.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e); } } - Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, application, 1000, 5000)); + receivers.clear(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/483aee47/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java ---------------------------------------------------------------------- diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java index 6df82ce,0000000..385273d mode 100644,000000..100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java @@@ -1,174 -1,0 +1,180 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tracer; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.accumulo.core.trace.DistributedTrace; +import org.apache.accumulo.tracer.thrift.Annotation; +import org.apache.accumulo.tracer.thrift.RemoteSpan; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.TimelineAnnotation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Deliver Span information periodically to a destination. + * <ul> + * <li>Send host and service information with the span. + * <li>Cache Destination objects by some key that can be extracted from the span. + * <li>Can be used to queue spans up for delivery over RPC, or for saving into a file. + * </ul> + */ +public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanReceiver { + + private static final Logger log = LoggerFactory.getLogger(AsyncSpanReceiver.class); + + public static final String SEND_TIMER_MILLIS = "send.timer.millis"; + + private final Map<SpanKey,Destination> clients = new HashMap<SpanKey,Destination>(); + + protected String host = null; + protected String service = null; + + protected abstract Destination createDestination(SpanKey key) throws Exception; + + protected abstract void send(Destination resource, RemoteSpan span) throws Exception; + + protected abstract SpanKey getSpanKey(Map<ByteBuffer,ByteBuffer> data); + + Timer timer = new Timer("SpanSender", true); + protected final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<RemoteSpan>(); ++ int maxQueueSize = 5000; + + // Visible for testing + AsyncSpanReceiver() {} + + public AsyncSpanReceiver(HTraceConfiguration conf) { + host = conf.get(DistributedTrace.TRACE_HOST_PROPERTY, host); + if (host == null) { + try { + host = InetAddress.getLocalHost().getCanonicalHostName().toString(); + } catch (UnknownHostException e) { + host = "unknown"; + } + } + service = conf.get(DistributedTrace.TRACE_SERVICE_PROPERTY, service); ++ maxQueueSize = conf.getInt(DistributedTrace.TRACE_QUEUE_SIZE_PROPERTY, maxQueueSize); + + int millis = conf.getInt(SEND_TIMER_MILLIS, 1000); + timer.schedule(new TimerTask() { + @Override + public void run() { + try { + sendSpans(); + } catch (Exception ex) { + log.warn("Exception sending spans to destination", ex); + } + } + + }, millis, millis); + } + + protected void sendSpans() { + while (!sendQueue.isEmpty()) { + boolean sent = false; + RemoteSpan s = sendQueue.peek(); + SpanKey dest = getSpanKey(s.data); + Destination client = clients.get(dest); + if (client == null) { + try { + clients.put(dest, createDestination(dest)); + } catch (Exception ex) { + log.warn("Exception creating connection to span receiver", ex); + } + } + if (client != null) { + try { + send(client, s); + synchronized (sendQueue) { + sendQueue.remove(); + sendQueue.notifyAll(); + } + sent = true; + } catch (Exception ex) { + log.warn("Got error sending to " + dest + ", refreshing client", ex); + clients.remove(dest); + } + } + if (!sent) + break; + } + } + + public static Map<ByteBuffer,ByteBuffer> convertToByteBuffers(Map<byte[],byte[]> bytesMap) { + if (bytesMap == null) + return null; + Map<ByteBuffer,ByteBuffer> result = new HashMap<ByteBuffer,ByteBuffer>(); + for (Entry<byte[],byte[]> bytes : bytesMap.entrySet()) { + result.put(ByteBuffer.wrap(bytes.getKey()), ByteBuffer.wrap(bytes.getValue())); + } + return result; + } + + public static List<Annotation> convertToAnnotations(List<TimelineAnnotation> annotations) { + if (annotations == null) + return null; + List<Annotation> result = new ArrayList<Annotation>(); + for (TimelineAnnotation annotation : annotations) { + result.add(new Annotation(annotation.getTime(), annotation.getMessage())); + } + return result; + } + + @Override + public void receiveSpan(Span s) { + Map<ByteBuffer,ByteBuffer> data = convertToByteBuffers(s.getKVAnnotations()); ++ + SpanKey dest = getSpanKey(data); + if (dest != null) { + List<Annotation> annotations = convertToAnnotations(s.getTimelineAnnotations()); ++ if (sendQueue.size() > maxQueueSize) { ++ return; ++ } + sendQueue.add(new RemoteSpan(host, service == null ? s.getProcessId() : service, s.getTraceId(), s.getSpanId(), s.getParentId(), s.getStartTimeMillis(), + s.getStopTimeMillis(), s.getDescription(), data, annotations)); + } + } + + @Override + public void close() { + synchronized (sendQueue) { + while (!sendQueue.isEmpty()) { + try { + sendQueue.wait(); + } catch (InterruptedException e) { + log.warn("flush interrupted"); + break; + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/483aee47/server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java ---------------------------------------------------------------------- diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java index f24c7b9,0000000..7a8dc16 mode 100644,000000..100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/SendSpansViaThrift.java @@@ -1,97 -1,0 +1,102 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tracer; + +import static java.nio.charset.StandardCharsets.UTF_8; + ++import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.accumulo.tracer.thrift.RemoteSpan; +import org.apache.accumulo.tracer.thrift.SpanReceiver.Client; +import org.apache.htrace.HTraceConfiguration; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; + +/** + * Send Span data to a destination using thrift. + */ +public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> { + + private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SendSpansViaThrift.class); + + private static final String THRIFT = "thrift://"; + + // Visible for testing + SendSpansViaThrift() {} + + public SendSpansViaThrift(HTraceConfiguration conf) { + super(conf); + } + + @Override + protected Client createDestination(String destination) throws Exception { + if (destination == null) + return null; + try { + int portSeparatorIndex = destination.lastIndexOf(':'); + String host = destination.substring(0, portSeparatorIndex); + int port = Integer.parseInt(destination.substring(portSeparatorIndex + 1)); + log.debug("Connecting to " + host + ":" + port); + InetSocketAddress addr = new InetSocketAddress(host, port); + Socket sock = new Socket(); + sock.connect(addr); + TTransport transport = new TSocket(sock); + TProtocol prot = new TBinaryProtocol(transport); + return new Client(prot); ++ } catch (IOException ex) { ++ log.trace("{}", ex, ex); ++ return null; + } catch (Exception ex) { + log.error(ex.getMessage(), ex); + return null; + } + } + + @Override + protected void send(Client client, RemoteSpan s) throws Exception { + if (client != null) { + try { + client.span(s); + } catch (Exception ex) { + client.getInputProtocol().getTransport().close(); + throw ex; + } + } + } + + private static final ByteBuffer DEST = ByteBuffer.wrap("dest".getBytes(UTF_8)); + ++ @Override + protected String getSpanKey(Map<ByteBuffer,ByteBuffer> data) { + String dest = new String(data.get(DEST).array()); + if (dest != null && dest.startsWith(THRIFT)) { + String hostAddress = dest.substring(THRIFT.length()); + String[] hostAddr = hostAddress.split(":", 2); + if (hostAddr.length == 2) { + return hostAddress; + } + } + return null; + } + +}