This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 479f5b2f619a chore(componets): make TraceProcessor async
479f5b2f619a is described below

commit 479f5b2f619a8906908deb874119543fc1c41263
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Fri Dec 19 11:19:19 2025 +0100

    chore(componets): make TraceProcessor async
---
 .../TraceProcessorsInterceptStrategy.java          | 57 ++++++++++++++++++----
 1 file changed, 47 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
 
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
index e10c6d1a22e0..5b4ea9c53ba2 100644
--- 
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
+++ 
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
@@ -16,11 +16,15 @@
  */
 package org.apache.camel.telemetry;
 
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 
 /**
@@ -39,32 +43,65 @@ public class TraceProcessorsInterceptStrategy implements 
InterceptStrategy {
             CamelContext camelContext,
             NamedNode processorDefinition, Processor target, Processor 
nextTarget)
             throws Exception {
-        return new DelegateAsyncProcessor(new TraceProcessor(target, 
processorDefinition));
+        return new TraceProcessor(target, processorDefinition);
     }
 
-    private class TraceProcessor implements Processor {
+    private class TraceProcessor extends DelegateAsyncProcessor {
         private final NamedNode processorDefinition;
-        private final Processor target;
 
         public TraceProcessor(Processor target, NamedNode processorDefinition) 
{
-            this.target = target;
+            super(target);
             this.processorDefinition = processorDefinition;
         }
 
         @Override
         public void process(Exchange exchange) throws Exception {
-            String processor = processorDefinition.getId() + "-" + 
processorDefinition.getShortName();
-            if (tracer.isTraceProcessors() && !tracer.exclude(processor, 
exchange.getContext())) {
-                tracer.beginProcessorSpan(exchange, processor);
+            String processorName = processorDefinition.getId() + "-" + 
processorDefinition.getShortName();
+            if (tracer.isTraceProcessors() && !tracer.exclude(processorName, 
exchange.getContext())) {
+                tracer.beginProcessorSpan(exchange, processorName);
                 try {
-                    target.process(exchange);
+                    processor.process(exchange);
                 } finally {
-                    tracer.endProcessorSpan(exchange, processor);
+                    tracer.endProcessorSpan(exchange, processorName);
                 }
             } else {
                 // We must always execute this
-                target.process(exchange);
+                processor.process(exchange);
+            }
+        }
+
+        @Override
+        public boolean process(Exchange exchange, AsyncCallback callback) {
+            String processorName = processorDefinition.getId() + "-" + 
processorDefinition.getShortName();
+            boolean isTraceProcessor = tracer.isTraceProcessors() && 
!tracer.exclude(processorName, exchange.getContext());
+            if (isTraceProcessor) {
+                try {
+                    tracer.beginProcessorSpan(exchange, processorName);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
             }
+            return processor.process(exchange, doneSync -> {
+                try {
+                    callback.done(doneSync);
+                } finally {
+                    if (isTraceProcessor) {
+                        try {
+                            tracer.endProcessorSpan(exchange, processorName);
+                        } catch (Exception e) {
+                            exchange.setException(e);
+                        }
+                    }
+                }
+            });
+        }
+
+        @Override
+        public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+            AsyncCallbackToCompletableFutureAdapter<Exchange> callback
+                    = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+            process(exchange, callback);
+            return callback.getFuture();
         }
     }
 

Reply via email to