This is an automated email from the ASF dual-hosted git repository.

ffang pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.14.x by this push:
     new b94cc3a633e [CAMEL-22414]The response stream is not cached for camel-cxf-rest producer (#19376)
b94cc3a633e is described below

commit b94cc3a633e0d0b1c399c450fd2e2c57088ecfeb
Author: Freeman(Yue) Fang <[email protected]>
AuthorDate: Tue Sep 30 12:30:14 2025 -0400

    [CAMEL-22414]The response stream is not cached for camel-cxf-rest producer (#19376)
    
    (cherry picked from commit ce5c53d6abd1629c186df12470de130c6b53fb50)
---
 .../component/cxf/jaxrs/CxfConverterLoader.java    |   8 ++
 .../camel/component/cxf/jaxrs/CxfConverter.java    |  15 +++
 .../cxf/jaxrs/CxfRsProducerStreamCacheTest.java    | 130 +++++++++++++++++++++
 3 files changed, 153 insertions(+)

diff --git a/components/camel-cxf/camel-cxf-rest/src/generated/java/org/apache/camel/component/cxf/jaxrs/CxfConverterLoader.java b/components/camel-cxf/camel-cxf-rest/src/generated/java/org/apache/camel/component/cxf/jaxrs/CxfConverterLoader.java
index 698247d0117..8758b49dba1 100644
--- a/components/camel-cxf/camel-cxf-rest/src/generated/java/org/apache/camel/component/cxf/jaxrs/CxfConverterLoader.java
+++ b/components/camel-cxf/camel-cxf-rest/src/generated/java/org/apache/camel/component/cxf/jaxrs/CxfConverterLoader.java
@@ -85,6 +85,14 @@ public final class CxfConverterLoader implements TypeConverterLoader, CamelConte
                 }
                 return answer;
             });
+        addTypeConverter(registry, org.apache.camel.StreamCache.class, jakarta.ws.rs.core.Response.class, true,
+            (type, exchange, value) -> {
+                Object answer = org.apache.camel.component.cxf.jaxrs.CxfConverter.toStreamCache((jakarta.ws.rs.core.Response) value, exchange);
+                if (true && answer == null) {
+                    answer = Void.class;
+                }
+                return answer;
+            });
         addTypeConverter(registry, org.apache.camel.component.cxf.common.DataFormat.class, java.lang.String.class, false,
             (type, exchange, value) -> {
                 Object answer = org.apache.camel.component.cxf.jaxrs.CxfConverter.toDataFormat((java.lang.String) value);
diff --git a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfConverter.java b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfConverter.java
index 9e34680bc7e..3db5285c1cf 100644
--- a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfConverter.java
+++ b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfConverter.java
@@ -29,6 +29,7 @@ import javax.xml.namespace.QName;
 
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
+import org.apache.camel.StreamCache;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.cxf.common.DataFormat;
 import org.apache.camel.converter.stream.CachedOutputStream;
@@ -123,6 +124,20 @@ public final class CxfConverter {
         return null;
     }
 
+    @Converter(allowNull = true)
+    public static StreamCache toStreamCache(Response response, Exchange exchange) {
+        InputStream is = toInputStream(response, exchange);
+
+        TypeConverterRegistry registry = exchange.getContext().getTypeConverterRegistry();
+        TypeConverter tc = registry.lookup(StreamCache.class, is.getClass());
+
+        if (tc != null) {
+            return tc.convertTo(StreamCache.class, exchange, is);
+        }
+
+        return null;
+    }
+
     /**
      * Use a fallback type converter so we can convert the embedded list element if the value is MessageContentsList.
      * The algorithm of this converter finds the first non-null list element from the list and applies conversion to the
diff --git a/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerStreamCacheTest.java b/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerStreamCacheTest.java
new file mode 100644
index 00000000000..576acd849d3
--- /dev/null
+++ b/components/camel-cxf/camel-cxf-rest/src/test/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducerStreamCacheTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf.jaxrs;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.POST;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.core.MediaType;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CxfRsProducerStreamCacheTest extends CamelTestSupport {
+
+    private int port;
+    private Server rsServer;
+
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        port = AvailablePortFinder.getNextAvailable();
+        startRsEchoServer();
+
+    }
+
+    @AfterEach
+    public void stopServer() {
+        if (rsServer != null) {
+            rsServer.stop();
+            rsServer.destroy();
+        }
+    }
+
+    private void startRsEchoServer() {
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setAddress("http://localhost:" + port + "/rs");
+        sf.setServiceBeans(Collections.singletonList(new EchoResource()));
+        rsServer = sf.create();
+        rsServer.start();
+    }
+
+    @Path("/")
+    public static class EchoResource {
+        @POST
+        @Path("/echo")
+        @Consumes(MediaType.WILDCARD)
+        @Produces(MediaType.TEXT_PLAIN)
+        public String echo(String body) {
+            return body;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        final String cxfrsUri = "cxfrs://http://localhost:" + port + "/rs"
+                                + "?httpClientAPI=true"
+                                + "&throwExceptionOnFailure=false";
+
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Ensure stream caching is ON for the context
+                getContext().setStreamCaching(true);
+                getContext().getStreamCachingStrategy().setSpoolEnabled(true);
+                getContext().getStreamCachingStrategy().setSpoolThreshold(1024 * 1024 * 10); // 10MB
+
+                from("direct:start")
+
+                        .setHeader(Exchange.HTTP_METHOD, constant("POST"))
+                        .setHeader(Exchange.HTTP_PATH, constant("/echo"))
+                        .setExchangePattern(ExchangePattern.InOut)
+                        // 1) Call the REST endpoint via cxfrs PRODUCER
+                        .to(cxfrsUri)
+                        // 2) read response after cxfrs call multiple times
+                        .process(e -> {
+                            String body = e.getIn().getBody(String.class);
+
+                        })
+                        .log("The body is ===> ${body}");
+
+            }
+        };
+    }
+
+    @Test
+    public void testProducerStreamCacheWithCxfrs() throws InterruptedException {
+        final String payload = "hello-cxfrs-producer-stream-cache";
+        InputStream body = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
+
+        ProducerTemplate tpl = template;
+        String response = tpl.requestBody("direct:start", body, String.class);
+
+        assertEquals(payload, response, "Echo response must match original payload");
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+    }
+}

Reply via email to