This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 479f5b2f619a chore(components): make TraceProcessor async
479f5b2f619a is described below
commit 479f5b2f619a8906908deb874119543fc1c41263
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Fri Dec 19 11:19:19 2025 +0100
chore(components): make TraceProcessor async
---
.../TraceProcessorsInterceptStrategy.java | 57 ++++++++++++++++++----
1 file changed, 47 insertions(+), 10 deletions(-)
diff --git
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
index e10c6d1a22e0..5b4ea9c53ba2 100644
---
a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
+++
b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/TraceProcessorsInterceptStrategy.java
@@ -16,11 +16,15 @@
*/
package org.apache.camel.telemetry;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.NamedNode;
import org.apache.camel.Processor;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
/**
@@ -39,32 +43,65 @@ public class TraceProcessorsInterceptStrategy implements
InterceptStrategy {
CamelContext camelContext,
NamedNode processorDefinition, Processor target, Processor
nextTarget)
throws Exception {
- return new DelegateAsyncProcessor(new TraceProcessor(target,
processorDefinition));
+ return new TraceProcessor(target, processorDefinition);
}
- private class TraceProcessor implements Processor {
+ private class TraceProcessor extends DelegateAsyncProcessor {
private final NamedNode processorDefinition;
- private final Processor target;
public TraceProcessor(Processor target, NamedNode processorDefinition)
{
- this.target = target;
+ super(target);
this.processorDefinition = processorDefinition;
}
@Override
public void process(Exchange exchange) throws Exception {
- String processor = processorDefinition.getId() + "-" +
processorDefinition.getShortName();
- if (tracer.isTraceProcessors() && !tracer.exclude(processor,
exchange.getContext())) {
- tracer.beginProcessorSpan(exchange, processor);
+ String processorName = processorDefinition.getId() + "-" +
processorDefinition.getShortName();
+ if (tracer.isTraceProcessors() && !tracer.exclude(processorName,
exchange.getContext())) {
+ tracer.beginProcessorSpan(exchange, processorName);
try {
- target.process(exchange);
+ processor.process(exchange);
} finally {
- tracer.endProcessorSpan(exchange, processor);
+ tracer.endProcessorSpan(exchange, processorName);
}
} else {
// We must always execute this
- target.process(exchange);
+ processor.process(exchange);
+ }
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ String processorName = processorDefinition.getId() + "-" +
processorDefinition.getShortName();
+ boolean isTraceProcessor = tracer.isTraceProcessors() &&
!tracer.exclude(processorName, exchange.getContext());
+ if (isTraceProcessor) {
+ try {
+ tracer.beginProcessorSpan(exchange, processorName);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
}
+ return processor.process(exchange, doneSync -> {
+ try {
+ callback.done(doneSync);
+ } finally {
+ if (isTraceProcessor) {
+ try {
+ tracer.endProcessorSpan(exchange, processorName);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
+ AsyncCallbackToCompletableFutureAdapter<Exchange> callback
+ = new AsyncCallbackToCompletableFutureAdapter<>(exchange);
+ process(exchange, callback);
+ return callback.getFuture();
}
}