Repository: accumulo Updated Branches: refs/heads/1.7 61bfbb22e -> 0eef354c5
ACCUMULO-3862 improved how AsyncSpanReceiver drops short spans, added test for min span length Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0eef354c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0eef354c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0eef354c Branch: refs/heads/1.7 Commit: 0eef354c56e9ef2788c72f4dbaac596653e25bbe Parents: 61bfbb2 Author: Billie Rinaldi <bil...@apache.org> Authored: Fri May 29 13:29:32 2015 -0700 Committer: Billie Rinaldi <bil...@apache.org> Committed: Fri May 29 13:29:40 2015 -0700 ---------------------------------------------------------------------- .../accumulo/tracer/AsyncSpanReceiver.java | 14 +- .../accumulo/tracer/AsyncSpanReceiverTest.java | 129 +++++++++++++++++++ 2 files changed, 134 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/0eef354c/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java index 28a9088..a35734d 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/AsyncSpanReceiver.java @@ -55,7 +55,7 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece public static final String SEND_TIMER_MILLIS = "tracer.send.timer.millis"; public static final String QUEUE_SIZE = "tracer.queue.size"; - private static final String SPAN_MIN_MS = "tracer.span.min.ms"; + public static final String SPAN_MIN_MS = "tracer.span.min.ms"; private final Map<SpanKey,Destination> clients = new HashMap<SpanKey,Destination>(); @@ -109,14 +109,6 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece while (!sendQueue.isEmpty()) { boolean sent = false; RemoteSpan s = sendQueue.peek(); - if (s.stop - s.start < minSpanSize) { - synchronized (sendQueue) { - sendQueue.remove(); - sendQueue.notifyAll(); - sendQueueSize.decrementAndGet(); - } - continue; - } SpanKey dest = getSpanKey(s.data); Destination client = clients.get(dest); if (client == null) { @@ -167,6 +159,10 @@ public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanRece @Override public void receiveSpan(Span s) { + if (s.getStopTimeMillis() - s.getStartTimeMillis() < minSpanSize) { + return; + } + Map<String,String> data = convertToStrings(s.getKVAnnotations()); SpanKey dest = getSpanKey(data); http://git-wip-us.apache.org/repos/asf/accumulo/blob/0eef354c/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java b/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java new file mode 100644 index 0000000..6744efc --- /dev/null +++ b/server/tracer/src/test/java/org/apache/accumulo/tracer/AsyncSpanReceiverTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tracer; + +import org.apache.accumulo.tracer.thrift.RemoteSpan; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.impl.MilliSpan; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +public class AsyncSpanReceiverTest { + static class TestReceiver extends AsyncSpanReceiver<String,String> { + AtomicInteger spansSent = new AtomicInteger(0); + + TestReceiver() { + super(HTraceConfiguration.EMPTY); + } + + TestReceiver(HTraceConfiguration conf) { + super(conf); + } + + @Override + protected String createDestination(String o) throws Exception { + return "DEST"; + } + + @Override + protected void send(String resource, RemoteSpan span) throws Exception { + spansSent.incrementAndGet(); + } + + @Override + protected String getSpanKey(Map data) { + return "DEST"; + } + + int getSpansSent() { + return spansSent.get(); + } + + int getQueueSize() { + return sendQueueSize.get(); + } + } + + Span createSpan(long length) { + long time = System.currentTimeMillis(); + Span span = new MilliSpan.Builder().begin(time).end(time + length).description("desc").parents(Collections.<Long> emptyList()).spanId(1).traceId(2).build(); + return span; + } + + @Test + public void test() throws InterruptedException { + TestReceiver receiver = new TestReceiver(); + + receiver.receiveSpan(createSpan(0)); + while (receiver.getQueueSize() > 0) { + Thread.sleep(500); + } + assertEquals(0, receiver.getQueueSize()); + assertEquals(0, receiver.getSpansSent()); + + receiver.receiveSpan(createSpan(1)); + while (receiver.getQueueSize() > 0) { + Thread.sleep(500); + } + assertEquals(0, receiver.getQueueSize()); + assertEquals(1, receiver.getSpansSent()); + } + + @Test + public void testKeepAll() throws InterruptedException { + TestReceiver receiver = new TestReceiver(HTraceConfiguration.fromMap(Collections.singletonMap(AsyncSpanReceiver.SPAN_MIN_MS, "0"))); + + receiver.receiveSpan(createSpan(0)); + while (receiver.getQueueSize() > 0) { + Thread.sleep(500); + } + assertEquals(0, receiver.getQueueSize()); + assertEquals(1, receiver.getSpansSent()); + } + + @Test + public void testExcludeMore() throws InterruptedException { + TestReceiver receiver = new TestReceiver(HTraceConfiguration.fromMap(Collections.singletonMap(AsyncSpanReceiver.SPAN_MIN_MS, "10"))); + + receiver.receiveSpan(createSpan(0)); + while (receiver.getQueueSize() > 0) { + Thread.sleep(500); + } + assertEquals(0, receiver.getQueueSize()); + assertEquals(0, receiver.getSpansSent()); + + receiver.receiveSpan(createSpan(9)); + while (receiver.getQueueSize() > 0) { + Thread.sleep(500); + } + assertEquals(0, receiver.getQueueSize()); + assertEquals(0, receiver.getSpansSent()); + + receiver.receiveSpan(createSpan(10)); + while (receiver.getQueueSize() > 0) { + Thread.sleep(500); + } + assertEquals(0, receiver.getQueueSize()); + assertEquals(1, receiver.getSpansSent()); + } +}