oscerd commented on code in PR #18212:
URL: https://github.com/apache/camel/pull/18212#discussion_r2115171248


##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -16,15 +16,15 @@
  */
 package org.apache.camel.component.nats;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;

Review Comment:
   Please avoid wildcard (*) imports; import each class explicitly.



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java:
##########
@@ -90,6 +90,12 @@ public class NatsConfiguration {
     private boolean traceConnection;
     @UriParam(label = "advanced")
     private HeaderFilterStrategy headerFilterStrategy = new 
DefaultHeaderFilterStrategy();
+    @UriParam(label = "advanced", defaultValue = "true")
+    private boolean jetstreamEnabled = true;

Review Comment:
   I don't think the default value should be true. It is not certain that
people will use JetStream by default.



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -108,16 +108,53 @@ public boolean process(Exchange exchange, AsyncCallback 
callback) {
         } else {
             LOG.debug("Publishing to topic: {}", config.getTopic());
 
-            final NatsMessage.Builder builder = NatsMessage.builder()
-                    .data(body)
-                    .subject(config.getTopic())
-                    .headers(this.buildHeaders(exchange));
-
-            if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
-                final String replySubject = config.getReplySubject();
-                builder.replyTo(replySubject);
+            if (config.isJetstreamEnabled() && 
this.connection.getServerInfo().isJetStreamAvailable()) {
+                LOG.info("JetStream is available");
+
+                JetStreamManagement jsm;
+                JetStream js;
+                try {
+                    jsm = this.connection.jetStreamManagement();
+                    js = jsm.jetStream();
+                    if(js == null) {
+                        jsm.addStream(StreamConfiguration.builder()
+                                .name(config.getJetstreamName())
+                                .subjects(config.getTopic())
+                                .build());
+                        js = jsm.jetStream();
+                    }
+                } catch (IOException | JetStreamApiException e) {
+                    throw new RuntimeException(e);
+                }
+                PublishAck pa;
+                if (config.isJetstreamAsync()) {
+                    CompletableFuture<PublishAck> future = 
js.publishAsync(config.getTopic(), body);
+                    try {
+                        pa = future.get(1, TimeUnit.SECONDS);
+                    } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                        throw new RuntimeException(e);
+                    }
+                    LOG.info("Publish Sequence async: {}", pa.getSeqno());
+                } else {
+                    try {
+                        pa = js.publish(config.getTopic(), body);
+                    } catch (IOException | JetStreamApiException e) {
+                        throw new RuntimeException(e);
+                    }
+                    LOG.info("Publish Sequence synchronously: {}", 
pa.getSeqno());
+                }
+            } else {

Review Comment:
   I would do it the other way around. JetStream will not be enabled by default,
so its branch should go in the else.



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -16,15 +16,15 @@
  */
 package org.apache.camel.component.nats;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
-import io.nats.client.Connection;
+import io.nats.client.*;

Review Comment:
   Please avoid wildcard (*) imports; import each class explicitly.



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java:
##########
@@ -454,4 +460,55 @@ public boolean isTraceConnection() {
     public void setTraceConnection(boolean traceConnection) {
         this.traceConnection = traceConnection;
     }
+
+    /**
+     * Whether to enable JetStream support for this endpoint.
+     */
+    public boolean isJetstreamEnabled() {
+        return jetstreamEnabled;
+    }
+
+    /**
+     * Sets whether to enable JetStream support for this endpoint.
+     *
+     * @param jetstreamEnabled {@code true} to enable JetStream, {@code false} 
otherwise.
+     */
+    public void setJetstreamEnabled(boolean jetstreamEnabled) {
+        this.jetstreamEnabled = jetstreamEnabled;
+    }
+
+    /**
+     * The name of the JetStream stream to use. If not specified, a default 
stream
+     * will be used if JetStream is enabled.
+     */
+    public String getJetstreamName() {
+        return jetstreamName;
+    }
+
+    /**
+     * Sets the name of the JetStream stream to use.
+     *
+     * @param jetstreamName The name of the JetStream stream.

Review Comment:
   Please remove the @param tag; it is not needed. We are going to generate
documentation from the Javadoc, and this would mess up the documentation page.



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java:
##########
@@ -90,6 +90,12 @@ public class NatsConfiguration {
     private boolean traceConnection;
     @UriParam(label = "advanced")
     private HeaderFilterStrategy headerFilterStrategy = new 
DefaultHeaderFilterStrategy();
+    @UriParam(label = "advanced", defaultValue = "true")
+    private boolean jetstreamEnabled = true;
+    @UriParam(label = "advanced", defaultValue = "default-stream")
+    private String jetstreamName = "default-stream";

Review Comment:
   Is this the default name of the stream or something you're assuming?



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -108,16 +108,53 @@ public boolean process(Exchange exchange, AsyncCallback 
callback) {
         } else {
             LOG.debug("Publishing to topic: {}", config.getTopic());
 
-            final NatsMessage.Builder builder = NatsMessage.builder()
-                    .data(body)
-                    .subject(config.getTopic())
-                    .headers(this.buildHeaders(exchange));
-
-            if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
-                final String replySubject = config.getReplySubject();
-                builder.replyTo(replySubject);
+            if (config.isJetstreamEnabled() && 
this.connection.getServerInfo().isJetStreamAvailable()) {
+                LOG.info("JetStream is available");

Review Comment:
   Use DEBUG.



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java:
##########
@@ -454,4 +460,55 @@ public boolean isTraceConnection() {
     public void setTraceConnection(boolean traceConnection) {
         this.traceConnection = traceConnection;
     }
+
+    /**
+     * Whether to enable JetStream support for this endpoint.
+     */
+    public boolean isJetstreamEnabled() {
+        return jetstreamEnabled;
+    }
+
+    /**
+     * Sets whether to enable JetStream support for this endpoint.
+     *
+     * @param jetstreamEnabled {@code true} to enable JetStream, {@code false} 
otherwise.

Review Comment:
   Please remove the @param tag; it is not needed. We are going to generate
documentation from the Javadoc, and this would mess up the documentation page.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to