Repository: camel Updated Branches: refs/heads/master f096e2608 -> d1ee73ec8
CAMEL-11054 Create SPI for Log EIP to enable other components to intercept/enrich logged messages Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1ee73ec Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1ee73ec Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1ee73ec Branch: refs/heads/master Commit: d1ee73ec8533cb9295cd198de2f987231e4a9d91 Parents: f096e26 Author: Tomohisa Igarashi <tm.igara...@gmail.com> Authored: Thu Mar 30 22:56:21 2017 +0900 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 6 11:00:49 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 12 ++++ .../apache/camel/component/log/LogEndpoint.java | 3 +- .../apache/camel/impl/DefaultCamelContext.java | 11 ++++ .../org/apache/camel/model/LogDefinition.java | 3 +- .../camel/processor/CamelLogProcessor.java | 37 ++++++++++- .../apache/camel/processor/LogProcessor.java | 34 +++++++++- .../camel/processor/interceptor/Tracer.java | 2 +- .../java/org/apache/camel/spi/LogListener.java | 45 +++++++++++++ .../camel/component/log/LogListenerTest.java | 66 ++++++++++++++++++++ .../camel/processor/LogEipListenerTest.java | 66 ++++++++++++++++++++ 10 files changed, 274 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index 1d5be88..8aec529 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -57,6 +58,7 @@ import org.apache.camel.spi.Injector; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.Language; import org.apache.camel.spi.LifecycleStrategy; +import org.apache.camel.spi.LogListener; import org.apache.camel.spi.ManagementMBeanAssembler; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; @@ -1965,4 +1967,14 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { */ RuntimeCamelCatalog getRuntimeCamelCatalog(); + /** + * Gets a list of {@link LogListener}. + */ + Set<LogListener> getLogListeners(); + + /** + * Adds a {@link LogListener}. + */ + void addlogListener(LogListener listener); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java index 05c4e50..29d5b3a 100644 --- a/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java @@ -27,6 +27,7 @@ import org.apache.camel.processor.DefaultExchangeFormatter; import org.apache.camel.processor.DefaultMaskingFormatter; import org.apache.camel.processor.ThroughputLogger; import org.apache.camel.spi.ExchangeFormatter; +import org.apache.camel.spi.LogListener; import org.apache.camel.spi.MaskingFormatter; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; @@ -141,7 +142,7 @@ public class LogEndpoint extends ProcessorEndpoint { Long groupDelay = getGroupDelay(); answer = new ThroughputLogger(camelLogger, this.getCamelContext(), getGroupInterval(), groupDelay, groupActiveOnly); } else { - answer = new CamelLogProcessor(camelLogger, localFormatter, getMaskingFormatter()); + answer = new CamelLogProcessor(camelLogger, localFormatter, getMaskingFormatter(), getCamelContext().getLogListeners()); } // the logger is the processor setProcessor(answer); http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 1633616..f16de74 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -141,6 +142,7 @@ import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.Language; import org.apache.camel.spi.LanguageResolver; import org.apache.camel.spi.LifecycleStrategy; +import org.apache.camel.spi.LogListener; import org.apache.camel.spi.ManagementMBeanAssembler; import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; @@ -231,6 +233,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private RestRegistry restRegistry = new DefaultRestRegistry(); private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); private List<RoutePolicyFactory> routePolicyFactories = new ArrayList<RoutePolicyFactory>(); + private Set<LogListener> logListeners = new LinkedHashSet<>(); // special flags to control the first startup which can are special private volatile boolean firstStartDone; @@ -2684,6 +2687,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon getRoutePolicyFactories().add(routePolicyFactory); } + public Set<LogListener> getLogListeners() { + return logListeners; + } + + public void addlogListener(LogListener listener) { + logListeners.add(listener); + } + public void setStreamCaching(Boolean cache) { this.streamCache = cache; } http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java b/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java index 3e18d9a..66493e6 100644 --- a/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java @@ -29,6 +29,7 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.Processor; import org.apache.camel.processor.DefaultMaskingFormatter; import org.apache.camel.processor.LogProcessor; +import org.apache.camel.spi.LogListener; import org.apache.camel.spi.MaskingFormatter; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; @@ -125,7 +126,7 @@ public class LogDefinition extends NoOutputDefinition<LogDefinition> { LoggingLevel level = getLoggingLevel() != null ? getLoggingLevel() : LoggingLevel.INFO; CamelLogger camelLogger = new CamelLogger(logger, level, getMarker()); - return new LogProcessor(exp, camelLogger, getMaskingFormatter(routeContext)); + return new LogProcessor(exp, camelLogger, getMaskingFormatter(routeContext), routeContext.getCamelContext().getLogListeners()); } private MaskingFormatter getMaskingFormatter(RouteContext routeContext) { http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java index e5a3e5c..88a3a9d 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java @@ -16,6 +16,10 @@ */ package org.apache.camel.processor; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; @@ -23,9 +27,12 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.Processor; import org.apache.camel.spi.ExchangeFormatter; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.LogListener; import org.apache.camel.spi.MaskingFormatter; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link Processor} which just logs to a {@link CamelLogger} object which can be used @@ -37,10 +44,13 @@ import org.apache.camel.util.CamelLogger; * @version */ public class CamelLogProcessor implements AsyncProcessor, IdAware { + + private static final Logger LOG = LoggerFactory.getLogger(CamelLogProcessor.class); private String id; private CamelLogger log; private ExchangeFormatter formatter; private MaskingFormatter maskingFormatter; + private Set<LogListener> listeners; public CamelLogProcessor() { this(new CamelLogger(CamelLogProcessor.class.getName())); @@ -51,10 +61,11 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware { this.log = log; } - public CamelLogProcessor(CamelLogger log, ExchangeFormatter formatter, MaskingFormatter maskingFormatter) { + public CamelLogProcessor(CamelLogger log, ExchangeFormatter formatter, MaskingFormatter maskingFormatter, Set<LogListener> listeners) { this(log); this.formatter = formatter; this.maskingFormatter = maskingFormatter; + this.listeners = listeners; } @Override @@ -80,6 +91,7 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware { if (maskingFormatter != null) { output = maskingFormatter.format(output); } + output = fireListeners(exchange, output); log.log(output); } callback.done(true); @@ -92,6 +104,7 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware { if (maskingFormatter != null) { output = maskingFormatter.format(output); } + output = fireListeners(exchange, output); log.log(output, exception); } } @@ -102,10 +115,32 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware { if (maskingFormatter != null) { output = maskingFormatter.format(output); } + output = fireListeners(exchange, output); log.log(output); } } + private String fireListeners(Exchange exchange, String message) { + if (listeners == null) { + return message; + } + for (LogListener listener : listeners) { + if (listener == null) { + continue; + } + try { + String output = listener.onLog(exchange, log, message); + message = output != null ? output : message; + } catch (Throwable t) { + LOG.warn("Ignoring an exception thrown by {}: {}", listener.getClass().getName(), t.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("", t); + } + } + } + return message; + } + public CamelLogger getLogger() { return log; } http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java index 1d4884c..c8fa263 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java @@ -16,16 +16,23 @@ */ package org.apache.camel.processor; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.LogListener; import org.apache.camel.spi.MaskingFormatter; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CamelLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A processor which evaluates an {@link Expression} and logs it. @@ -34,15 +41,18 @@ import org.apache.camel.util.CamelLogger; */ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware { + private static final Logger LOG = LoggerFactory.getLogger(LogProcessor.class); private String id; private final Expression expression; private final CamelLogger logger; private final MaskingFormatter formatter; + private final Set<LogListener> listeners; - public LogProcessor(Expression expression, CamelLogger logger, MaskingFormatter formatter) { + public LogProcessor(Expression expression, CamelLogger logger, MaskingFormatter formatter, Set<LogListener> listeners) { this.expression = expression; this.logger = logger; this.formatter = formatter; + this.listeners = listeners; } public void process(Exchange exchange) throws Exception { @@ -57,6 +67,7 @@ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Trac if (formatter != null) { msg = formatter.format(msg); } + msg = fireListeners(exchange, msg); logger.doLog(msg); } } catch (Exception e) { @@ -68,6 +79,27 @@ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Trac return true; } + private String fireListeners(Exchange exchange, String message) { + if (listeners == null) { + return message; + } + for (LogListener listener : listeners) { + if (listener == null) { + continue; + } + try { + String output = listener.onLog(exchange, logger, message); + message = output != null ? output : message; + } catch (Throwable t) { + LOG.warn("Ignoring an exception thrown by {}: {}", listener.getClass().getName(), t.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("", t); + } + } + } + return message; + } + @Override public String toString() { return "Log(" + logger.getLog().getName() + ")[" + expression + "]"; http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java index 5d8747f..5c7a1cc 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java @@ -102,7 +102,7 @@ public class Tracer implements InterceptStrategy, Service { */ public synchronized CamelLogProcessor getLogger(ExchangeFormatter formatter) { if (logger == null) { - logger = new CamelLogProcessor(new CamelLogger(getLogName(), getLogLevel()), formatter, null); + logger = new CamelLogProcessor(new CamelLogger(getLogName(), getLogLevel()), formatter, null, null); } return logger; } http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/spi/LogListener.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/LogListener.java b/camel-core/src/main/java/org/apache/camel/spi/LogListener.java new file mode 100644 index 0000000..6d7151e --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/LogListener.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.util.CamelLogger; +import org.slf4j.Logger; +import org.slf4j.Marker; + +/** + * An event listener SPI for logging. Listeners are registered into {@link LogProcessor} and + * {@link CamelLogProcessor} so that the logging events are delivered for both of Log Component and Log EIP. + * + */ +public interface LogListener { + + /** + * Invoked right before Log component or Log EIP logs. + * Note that {@link CamelLogger} holds the {@link LoggingLevel} and {@link Marker}. + * The listener can check {@link CamelLogger#getLevel()} to see in which log level + * this is going to be logged. + * + * @param exchange camel exchange + * @param camelLogger {@link CamelLogger} + * @param message log message + * @return log message, possibly enriched by the listener + */ + String onLog(Exchange exchange, CamelLogger camelLogger, String message); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java b/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java new file mode 100644 index 0000000..dbdaf12 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.log; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.model.Constants; +import org.apache.camel.util.jndi.JndiTest; +import org.junit.Assert; +import org.junit.Test; + +public class LogListenerTest { + private static boolean listenerFired; + + @Test + public void testLogMask() throws Exception { + listenerFired = false; + CamelContext context = createCamelContext(); + MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class); + mock.expectedMessageCount(1); + context.addlogListener((exchange, camelLogger, message) -> { + Assert.assertEquals("Exchange[ExchangePattern: InOnly, BodyType: String, Body: hello]", message); + listenerFired = true; + return message + " - modified by listener"; + }); + context.start(); + context.createProducerTemplate().sendBody("direct:foo", "hello"); + mock.assertIsSatisfied(); + Assert.assertEquals(true, listenerFired); + context.stop(); + } + + protected CamelContext createCamelContext() throws Exception { + JndiRegistry registry = new JndiRegistry(JndiTest.createInitialContext()); + CamelContext context = new DefaultCamelContext(registry); + context.addRoutes(createRouteBuilder()); + return context; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:foo").routeId("foo").to("log:foo").to("mock:foo"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java b/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java new file mode 100644 index 0000000..0e52fde --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.spi.LogListener; +import org.apache.camel.util.jndi.JndiTest; +import org.junit.Assert; +import org.junit.Test; + +public class LogEipListenerTest { + private static boolean listenerFired; + + @Test + public void testLogListener() throws Exception { + listenerFired = false; + CamelContext context = createCamelContext(); + MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class); + mock.expectedMessageCount(1); + context.addlogListener((exchange, camelLogger, message) -> { + Assert.assertEquals("Got hello", message); + listenerFired = true; + return message + " - modified by listener"; + }); + context.start(); + context.createProducerTemplate().sendBody("direct:foo", "hello"); + mock.assertIsSatisfied(); + Assert.assertEquals(true, listenerFired); + context.stop(); + } + + protected CamelContext createCamelContext() throws Exception { + JndiRegistry registry = new JndiRegistry(JndiTest.createInitialContext()); + CamelContext context = new DefaultCamelContext(registry); + context.addRoutes(createRouteBuilder()); + return context; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:foo").routeId("foo").log("Got ${body}").to("mock:foo"); + } + }; + } + +}