This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-2.21.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit d36c11b35a3eb839c320897a7ba3beb5b3fe9891 Author: akhettar <aya...@zotix.co> AuthorDate: Tue Jun 12 16:31:12 2018 +0100 CAMEL-12573:Fixing class cast exception. Handling kafka.Partition as Integer and kafka.Offset as a Long. --- .../opentracing/decorators/KafkaSpanDecorator.java | 16 +++++++++-- .../decorators/KafkaSpanDecoratorTest.java | 32 ++++++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java index ecc9279..1ec0a68 100644 --- a/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java +++ b/components/camel-opentracing/src/main/java/org/apache/camel/opentracing/decorators/KafkaSpanDecorator.java @@ -58,7 +58,7 @@ public class KafkaSpanDecorator extends AbstractMessagingSpanDecorator { public void pre(Span span, Exchange exchange, Endpoint endpoint) { super.pre(span, exchange, endpoint); - String partition = (String)exchange.getIn().getHeader(PARTITION); + String partition = getValue(exchange, PARTITION, Integer.class); if (partition != null) { span.setTag(KAFKA_PARTITION_TAG, partition); } @@ -73,10 +73,22 @@ public class KafkaSpanDecorator extends AbstractMessagingSpanDecorator { span.setTag(KAFKA_KEY_TAG, key); } - String offset = (String)exchange.getIn().getHeader(OFFSET); + String offset = getValue(exchange, OFFSET, Long.class); if (offset != null) { span.setTag(KAFKA_OFFSET_TAG, offset); } } + /** + * Extracts header value from the exchange for given header + * @param exchange the {@link Exchange} + * @param header the header name + * @param type the class type of the exchange header + * @return + */ + private <T> String getValue(final Exchange exchange, final String header, Class<T> type) { + T partition = exchange.getIn().getHeader(header, type); + return partition != null ? String.valueOf(partition) : exchange.getIn().getHeader(header, String.class); + } + } diff --git a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java index b3eda9c..2c51def 100644 --- a/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java +++ b/components/camel-opentracing/src/test/java/org/apache/camel/opentracing/decorators/KafkaSpanDecoratorTest.java @@ -19,6 +19,7 @@ package org.apache.camel.opentracing.decorators; import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; +import jdk.nashorn.internal.IntDeque; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -59,7 +60,7 @@ public class KafkaSpanDecoratorTest { } @Test - public void testPre() { + public void testPreOffsetAndPartitionAsStringHeader() { String testKey = "TestKey"; String testOffset = "TestOffset"; String testPartition = "TestPartition"; @@ -72,8 +73,8 @@ public class KafkaSpanDecoratorTest { Mockito.when(endpoint.getEndpointUri()).thenReturn("test"); Mockito.when(exchange.getIn()).thenReturn(message); Mockito.when(message.getHeader(KafkaSpanDecorator.KEY)).thenReturn(testKey); - Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET)).thenReturn(testOffset); - Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION)).thenReturn(testPartition); + Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET, String.class)).thenReturn(testOffset); + Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION, String.class)).thenReturn(testPartition); Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION_KEY)).thenReturn(testPartitionKey); SpanDecorator decorator = new KafkaSpanDecorator(); @@ -89,4 +90,29 @@ public class KafkaSpanDecoratorTest { assertEquals(testPartitionKey, span.tags().get(KafkaSpanDecorator.KAFKA_PARTITION_KEY_TAG)); } + @Test + public void testPrePartitionAsIntegerHeaderAndOffsetAsLongHeader() { + Long testOffset = 4875454l; + Integer testPartition = 0; + + Endpoint endpoint = Mockito.mock(Endpoint.class); + Exchange exchange = Mockito.mock(Exchange.class); + Message message = Mockito.mock(Message.class); + + Mockito.when(endpoint.getEndpointUri()).thenReturn("test"); + Mockito.when(exchange.getIn()).thenReturn(message); + Mockito.when(message.getHeader(KafkaSpanDecorator.OFFSET, Long.class)).thenReturn(testOffset); + Mockito.when(message.getHeader(KafkaSpanDecorator.PARTITION, Integer.class)).thenReturn(testPartition); + + SpanDecorator decorator = new KafkaSpanDecorator(); + + MockTracer tracer = new MockTracer(); + MockSpan span = tracer.buildSpan("TestSpan").start(); + + decorator.pre(span, exchange, endpoint); + + assertEquals(String.valueOf(testOffset), span.tags().get(KafkaSpanDecorator.KAFKA_OFFSET_TAG)); + assertEquals(String.valueOf(testPartition), span.tags().get(KafkaSpanDecorator.KAFKA_PARTITION_TAG)); + } + } -- To stop receiving notification emails like this one, please contact acosent...@apache.org.