oscerd commented on code in PR #18212: URL: https://github.com/apache/camel/pull/18212#discussion_r2115171248
########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -16,15 +16,15 @@ */ package org.apache.camel.component.nats; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; Review Comment: Please avoid * import ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java: ########## @@ -90,6 +90,12 @@ public class NatsConfiguration { private boolean traceConnection; @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy(); + @UriParam(label = "advanced", defaultValue = "true") + private boolean jetstreamEnabled = true; Review Comment: I don't think the default value should be true. It's not sure people will use jetstream by default. ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -108,16 +108,53 @@ public boolean process(Exchange exchange, AsyncCallback callback) { } else { LOG.debug("Publishing to topic: {}", config.getTopic()); - final NatsMessage.Builder builder = NatsMessage.builder() - .data(body) - .subject(config.getTopic()) - .headers(this.buildHeaders(exchange)); - - if (ObjectHelper.isNotEmpty(config.getReplySubject())) { - final String replySubject = config.getReplySubject(); - builder.replyTo(replySubject); + if (config.isJetstreamEnabled() && this.connection.getServerInfo().isJetStreamAvailable()) { + LOG.info("JetStream is available"); + + JetStreamManagement jsm; + JetStream js; + try { + jsm = this.connection.jetStreamManagement(); + js = jsm.jetStream(); + if(js == null) { + jsm.addStream(StreamConfiguration.builder() + .name(config.getJetstreamName()) + .subjects(config.getTopic()) + .build()); + js = jsm.jetStream(); + } + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException(e); + } + PublishAck pa; + if (config.isJetstreamAsync()) { + CompletableFuture<PublishAck> future = js.publishAsync(config.getTopic(), body); + try { + pa = future.get(1, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + LOG.info("Publish Sequence async: {}", pa.getSeqno()); + } else { + try { + pa = js.publish(config.getTopic(), body); + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException(e); + } + LOG.info("Publish Sequence synchronously: {}", pa.getSeqno()); + } + } else { Review Comment: I would do the other way around. Jet stream will be not enabled by default, so its part should be in the else. ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -16,15 +16,15 @@ */ package org.apache.camel.component.nats; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; -import io.nats.client.Connection; +import io.nats.client.*; Review Comment: Please avoid * import ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java: ########## @@ -454,4 +460,55 @@ public boolean isTraceConnection() { public void setTraceConnection(boolean traceConnection) { this.traceConnection = traceConnection; } + + /** + * Whether to enable JetStream support for this endpoint. + */ + public boolean isJetstreamEnabled() { + return jetstreamEnabled; + } + + /** + * Sets whether to enable JetStream support for this endpoint. + * + * @param jetstreamEnabled {@code true} to enable JetStream, {@code false} otherwise. + */ + public void setJetstreamEnabled(boolean jetstreamEnabled) { + this.jetstreamEnabled = jetstreamEnabled; + } + + /** + * The name of the JetStream stream to use. If not specified, a default stream + * will be used if JetStream is enabled. + */ + public String getJetstreamName() { + return jetstreamName; + } + + /** + * Sets the name of the JetStream stream to use. + * + * @param jetstreamName The name of the JetStream stream. Review Comment: Please remove the param, is not needed. We are going to generate documentation from the javadoc, this would mess the documentation page. ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java: ########## @@ -90,6 +90,12 @@ public class NatsConfiguration { private boolean traceConnection; @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy(); + @UriParam(label = "advanced", defaultValue = "true") + private boolean jetstreamEnabled = true; + @UriParam(label = "advanced", defaultValue = "default-stream") + private String jetstreamName = "default-stream"; Review Comment: Is this the default name of the stream or something you're assuming? ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -108,16 +108,53 @@ public boolean process(Exchange exchange, AsyncCallback callback) { } else { LOG.debug("Publishing to topic: {}", config.getTopic()); - final NatsMessage.Builder builder = NatsMessage.builder() - .data(body) - .subject(config.getTopic()) - .headers(this.buildHeaders(exchange)); - - if (ObjectHelper.isNotEmpty(config.getReplySubject())) { - final String replySubject = config.getReplySubject(); - builder.replyTo(replySubject); + if (config.isJetstreamEnabled() && this.connection.getServerInfo().isJetStreamAvailable()) { + LOG.info("JetStream is available"); Review Comment: Use DEBUG. ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java: ########## @@ -454,4 +460,55 @@ public boolean isTraceConnection() { public void setTraceConnection(boolean traceConnection) { this.traceConnection = traceConnection; } + + /** + * Whether to enable JetStream support for this endpoint. + */ + public boolean isJetstreamEnabled() { + return jetstreamEnabled; + } + + /** + * Sets whether to enable JetStream support for this endpoint. + * + * @param jetstreamEnabled {@code true} to enable JetStream, {@code false} otherwise. Review Comment: Please remove the param, is not needed. We are going to generate documentation from the javadoc, this would mess the documentation page. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org