Author: davsclaus Date: Wed Nov 11 12:45:52 2009 New Revision: 834858 URL: http://svn.apache.org/viewvc?rev=834858&view=rev Log: CAMEL-2135: Fixed sending POST data. Added concurrent test.
Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java (with props) Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834858&r1=834857&r2=834858&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java Wed Nov 11 12:45:52 2009 @@ -17,6 +17,7 @@ package org.apache.camel.component.jetty; import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.util.LinkedHashMap; import java.util.Map; @@ -25,6 +26,7 @@ import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mortbay.io.Buffer; @@ -66,6 +68,15 @@ } @Override + protected void onRequestComplete() throws IOException { + // close the input stream when its not needed anymore + InputStream is = getRequestContentSource(); + if (is != null) { + ObjectHelper.close(is, "RequestContentSource", LOG); + } + } + + @Override protected void onResponseComplete() { doTaskCompleted(); } Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834858&r1=834857&r2=834858&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Wed Nov 11 12:45:52 2009 @@ -17,6 +17,7 @@ package org.apache.camel.component.jetty; import java.io.IOException; +import java.io.InputStream; import java.net.URISyntaxException; import java.util.Map; @@ -31,10 +32,13 @@ import org.apache.camel.component.http.helper.HttpProducerHelper; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.mortbay.io.Buffer; +import org.mortbay.io.ByteArrayBuffer; import org.mortbay.jetty.client.HttpClient; import org.mortbay.jetty.client.HttpExchange; @@ -111,6 +115,30 @@ httpExchange.setMethod(method); httpExchange.setURL(url); + // if we post then set data + if (HttpMethods.POST.equals(methodToUse)) { + + String contentType = ExchangeHelper.getContentType(exchange); + if (contentType != null) { + httpExchange.setRequestContentType(contentType); + } + + // try with String at first + String data = exchange.getIn().getBody(String.class); + if (data != null) { + String charset = exchange.getProperty(Exchange.CHARSET_NAME, String.class); + if (charset != null) { + httpExchange.setRequestContent(new ByteArrayBuffer(data, charset)); + } else { + httpExchange.setRequestContent(new ByteArrayBuffer(data)); + } + } else { + // then fallback to input stream + InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, exchange.getIn().getBody()); + httpExchange.setRequestContentSource(is); + } + } + doSetQueryParameters(exchange, httpExchange); // and copy headers from IN message Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java?rev=834858&view=auto ============================================================================== --- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java (added) +++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java Wed Nov 11 12:45:52 2009 @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jetty.jettyproducer; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Jetty HTTP producer concurrent test. + * + * @version $Revision$ + */ +public class JettyHttpProducerConcurrentTest extends CamelTestSupport { + + @Test + public void testNoConcurrentProducers() throws Exception { + doSendMessages(1, 1); + } + + @Test + public void testConcurrentProducers() throws Exception { + doSendMessages(10, 5); + } + + private void doSendMessages(int files, int poolSize) throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(files); + getMockEndpoint("mock:result").assertNoDuplicates(body()); + + ExecutorService executor = Executors.newFixedThreadPool(poolSize); + Map<Integer, Future> responses = new ConcurrentHashMap(); + for (int i = 0; i < files; i++) { + final int index = i; + Future out = executor.submit(new Callable<Object>() { + public Object call() throws Exception { + return template.requestBody("jetty://http://localhost:9080/echo", "" + index, String.class); + } + }); + responses.put(index, out); + } + + assertMockEndpointsSatisfied(); + + assertEquals(files, responses.size()); + + // get all responses + Set unique = new HashSet(); + for (Future future : responses.values()) { + unique.add(future.get()); + } + + // should be 10 unique responses + assertEquals("Should be " + files + " unique responses", files, unique.size()); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // expose a echo service + from("jetty:http://localhost:9080/echo") + .convertBodyTo(String.class) + .to("log:input") + .transform(body().append(body())).to("mock:result"); + } + }; + } + +} Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerConcurrentTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date