This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 087b5a7 Stream caching should be able to init with camel-quarkus out of the box 087b5a7 is described below commit 087b5a7db18c8070e37b119cb9db0513e3dd0865 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Aug 9 09:15:00 2019 +0200 Stream caching should be able to init with camel-quarkus out of the box --- .../impl/engine/DefaultStreamCachingStrategy.java | 34 +++-- .../StreamCachingSpoolDirectoryQuarkusTest.java | 147 +++++++++++++++++++++ 2 files changed, 173 insertions(+), 8 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java index 9d0ccb4..8fd7663 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java @@ -22,6 +22,7 @@ import java.lang.management.MemoryMXBean; import java.util.LinkedHashSet; import java.util.Set; import java.util.UUID; +import java.util.regex.Matcher; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -222,15 +223,32 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came } protected String resolveSpoolDirectory(String path) { - String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false); - if (name != null) { - name = customResolveManagementName(name); - } - // and then check again with invalid check to ensure all ## is resolved - if (name != null) { - name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true); + if (camelContext.getManagementNameStrategy() != null) { + String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false); + if (name != null) { + name = customResolveManagementName(name); + } + // and then check again with invalid check to ensure all ## is resolved + if (name != null) { + name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true); + } + return name; + } else { + return defaultManagementName(path); } - return name; + } + + protected String defaultManagementName(String path) { + // must quote the names to have it work as literal replacement + String name = Matcher.quoteReplacement(camelContext.getName()); + + // replace tokens + String answer = path; + answer = answer.replaceFirst("#camelId#", name); + answer = answer.replaceFirst("#name#", name); + // replace custom + answer = customResolveManagementName(answer); + return answer; } protected String customResolveManagementName(String pattern) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingSpoolDirectoryQuarkusTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingSpoolDirectoryQuarkusTest.java new file mode 100644 index 0000000..7ba40f8 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/StreamCachingSpoolDirectoryQuarkusTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FilterInputStream; +import java.io.InputStream; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.spi.ManagementNameStrategy; +import org.apache.camel.spi.StreamCachingStrategy; +import org.junit.Before; +import org.junit.Test; + +public class StreamCachingSpoolDirectoryQuarkusTest extends ContextTestSupport { + + private MyCustomSpoolRule spoolRule = new MyCustomSpoolRule(); + + private class MyCamelContext extends DefaultCamelContext { + + public MyCamelContext(boolean init) { + super(init); + } + + @Override + public ManagementNameStrategy getManagementNameStrategy() { + // quarkus has no management at all + return null; + } + } + + @Override + @Before + public void setUp() throws Exception { + deleteDirectory("target/data/cachedir"); + super.setUp(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = new MyCamelContext(false); + context.disableJMX(); + context.setRegistry(createRegistry()); + context.setLoadTypeConverters(isLoadTypeConverters()); + return context; + } + + @Test + public void testByteArrayInputStream() throws Exception { + getMockEndpoint("mock:english").expectedBodiesReceived("<hello/>"); + getMockEndpoint("mock:dutch").expectedBodiesReceived("<hallo/>"); + getMockEndpoint("mock:german").expectedBodiesReceived("<hallo/>"); + getMockEndpoint("mock:french").expectedBodiesReceived("<hellos/>"); + + // need to wrap in MyInputStream as ByteArrayInputStream is optimized to just reuse in memory buffer + // and not needed to spool to disk + template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hello/>".getBytes()))); + + spoolRule.setSpool(true); + template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hallo/>".getBytes()))); + template.sendBody("direct:a", new MyInputStream(new ByteArrayInputStream("<hellos/>".getBytes()))); + + assertMockEndpointsSatisfied(); + } + + private final class MyInputStream extends FilterInputStream { + + private MyInputStream(InputStream in) { + super(in); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.getStreamCachingStrategy().setSpoolDirectory("target/cachedir"); + context.getStreamCachingStrategy().addSpoolRule(spoolRule); + context.getStreamCachingStrategy().setAnySpoolRules(true); + context.setStreamCaching(true); + + from("direct:a") + .choice() + .when(xpath("//hello")).to("mock:english") + .when(xpath("//hallo")).to("mock:dutch", "mock:german") + .otherwise().to("mock:french") + .end() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // check if spool file exists + if (spoolRule.isSpool()) { + String[] names = new File("target/cachedir").list(); + assertEquals("There should be a cached spool file", 1, names.length); + } + } + }); + + } + }; + } + + private static final class MyCustomSpoolRule implements StreamCachingStrategy.SpoolRule { + + private volatile boolean spool; + + @Override + public boolean shouldSpoolCache(long length) { + return spool; + } + + public boolean isSpool() { + return spool; + } + + public void setSpool(boolean spool) { + this.spool = spool; + } + + @Override + public String toString() { + return "MyCustomSpoolRule"; + } + } +} +