davsclaus commented on code in PR #18212:
URL: https://github.com/apache/camel/pull/18212#discussion_r2122688786


##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java:
##########
@@ -90,6 +90,12 @@ public class NatsConfiguration {
     private boolean traceConnection;
     @UriParam(label = "advanced")
     private HeaderFilterStrategy headerFilterStrategy = new 
DefaultHeaderFilterStrategy();
+    @UriParam(label = "advanced", defaultValue = "true")
+    private boolean jetstreamEnabled = true;
+    @UriParam(label = "advanced", defaultValue = "default-stream")
+    private String jetstreamName = "default-stream";

Review Comment:
   What is the meaning of the jetstream-name? Can you tell more about that?



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java:
##########
@@ -123,26 +130,70 @@ class NatsConsumingTask implements Runnable {
         @Override
         public void run() {
             try {
-                NatsConsumer.this.dispatcher = 
this.connection.createDispatcher(new CamelNatsMessageHandler());
-                if 
(ObjectHelper.isNotEmpty(this.configuration.getQueueName())) {
-                    NatsConsumer.this.dispatcher = 
NatsConsumer.this.dispatcher.subscribe(
-                            
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
-                            
NatsConsumer.this.getEndpoint().getConfiguration().getQueueName());
-                    if 
(ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages()))
 {
-                        NatsConsumer.this.dispatcher.unsubscribe(
-                                
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
-                                
Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages()));
-                    }
-                    if (NatsConsumer.this.dispatcher.isActive()) {
-                        NatsConsumer.this.setActive(true);
+                NatsConfiguration config = getEndpoint().getConfiguration();
+                String topic = config.getTopic();
+                String queueName = config.getQueueName();
+                String maxMessagesStr = config.getMaxMessages();
+                Integer maxMessages = null;
+                if (ObjectHelper.isNotEmpty(maxMessagesStr)) {
+                    maxMessages = Integer.parseInt(maxMessagesStr);
+                }
+
+                if (config.isJetstreamEnabled() && 
connection.getServerInfo().isJetStreamAvailable()) {
+                    String streamName = this.configuration.getJetstreamName();
+                    String consumerName
+                            = ObjectHelper.isNotEmpty(queueName) ? queueName : 
"consumer-" + System.currentTimeMillis(); // Generate a default consumer name 
if queueName is not provided
+                    LOG.info("Setting up JetStream PUSH consumer for stream: 
'{}', durable: '{}', topic: {} ", streamName,
+                            consumerName, this.configuration.getTopic());
+
+                    JetStreamManagement jsm = connection.jetStreamManagement();
+                    try {
+                        // Try to get the stream, create it if it doesn't exist
+                        jsm.getStreamInfo(streamName);
+                    } catch (JetStreamApiException e) {
+                        if (e.getErrorCode() == 404) {
+                            StreamConfiguration streamConfig = 
StreamConfiguration.builder()
+                                    .name(streamName)
+                                    .subjects(topic)
+                                    .build();
+                            jsm.addStream(streamConfig);
+                        } else {
+                            throw e;
+                        }
                     }
+
+                    ConsumerConfiguration.Builder ccBuilder = 
ConsumerConfiguration.builder()
+                            .durable(consumerName);
+
+                    ccBuilder.deliverSubject(null);
+                    ConsumerConfiguration cc = ccBuilder.build();
+
+                    PushSubscribeOptions pushOptions = 
PushSubscribeOptions.builder()
+                            .configuration(cc)
+                            .build();
+
+                    NatsConsumer.this.dispatcher = 
this.connection.createDispatcher(new CamelNatsMessageHandler());
+
+                    NatsConsumer.this.jetStreamSubscription = 
this.connection.jetStream().subscribe(
+                            
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
+                            queueName,
+                            dispatcher,
+                            new CamelNatsMessageHandler(),
+                            true,
+                            pushOptions);
+
+                    NatsConsumer.this.setActive(true);
                 } else {
-                    NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher
-                            
.subscribe(NatsConsumer.this.getEndpoint().getConfiguration().getTopic());
-                    if 
(ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages()))
 {
-                        NatsConsumer.this.dispatcher.unsubscribe(
-                                
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
-                                
Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages()));
+                    LOG.debug("Setting up standard NATS consumer for topic: 
{}",
+                            
NatsConsumer.this.getEndpoint().getConfiguration().getTopic());
+                    NatsConsumer.this.dispatcher = 
connection.createDispatcher(new CamelNatsMessageHandler());
+                    if (ObjectHelper.isNotEmpty(queueName)) {
+                        NatsConsumer.this.dispatcher = 
NatsConsumer.this.dispatcher.subscribe(topic, queueName);
+                    } else {
+                        NatsConsumer.this.dispatcher = 
NatsConsumer.this.dispatcher.subscribe(topic);
+                    }
+                    if (maxMessages != null) {

Review Comment:
   What is this code doing with maxMessages and unsubscribe?



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -193,4 +195,65 @@ protected void doStop() throws Exception {
         super.doStop();
     }
 
+    private void publishWithJetStream(NatsConfiguration config, final byte[] 
body, final Exchange exchange) {
+        LOG.debug("JetStream is available");
+        JetStreamManagement jsm;
+        JetStream js;
+        try {
+            jsm = this.connection.jetStreamManagement();
+            js = jsm.jetStream();
+            if (js == null) {
+                jsm.addStream(StreamConfiguration.builder()
+                        .name(config.getJetstreamName())
+                        .subjects(config.getTopic())
+                        .build());
+                js = jsm.jetStream();
+            }
+        } catch (IOException | JetStreamApiException e) {
+            throw new RuntimeException("Failed to initialize JetStream: " + 
e.getMessage(), e);
+        }
+
+        final NatsMessage.Builder builder = NatsMessage.builder()
+                .data(body)
+                .subject(config.getTopic())
+                .headers(this.buildHeaders(exchange));
+
+        if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
+            final String replySubject = config.getReplySubject();
+            builder.replyTo(replySubject);
+        }
+        final Message jetStreamMessage = builder.build();
+
+        PublishAck pa;
+        if (config.isJetstreamAsync()) {
+            CompletableFuture<PublishAck> future = 
js.publishAsync(jetStreamMessage);
+            try {
+                pa = future.get(1, TimeUnit.SECONDS);

Review Comment:
   Why is the timeout so low, at only 1 second?



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java:
##########
@@ -90,6 +90,12 @@ public class NatsConfiguration {
     private boolean traceConnection;
     @UriParam(label = "advanced")
     private HeaderFilterStrategy headerFilterStrategy = new 
DefaultHeaderFilterStrategy();
+    @UriParam(label = "advanced", defaultValue = "false")
+    private boolean jetstreamEnabled = false;

Review Comment:
   I don't think this option should be advanced, as JetStream support has been 
requested for many years



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java:
##########
@@ -123,26 +130,70 @@ class NatsConsumingTask implements Runnable {
         @Override
         public void run() {
             try {
-                NatsConsumer.this.dispatcher = 
this.connection.createDispatcher(new CamelNatsMessageHandler());
-                if 
(ObjectHelper.isNotEmpty(this.configuration.getQueueName())) {
-                    NatsConsumer.this.dispatcher = 
NatsConsumer.this.dispatcher.subscribe(
-                            
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
-                            
NatsConsumer.this.getEndpoint().getConfiguration().getQueueName());
-                    if 
(ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages()))
 {
-                        NatsConsumer.this.dispatcher.unsubscribe(
-                                
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
-                                
Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages()));
-                    }
-                    if (NatsConsumer.this.dispatcher.isActive()) {
-                        NatsConsumer.this.setActive(true);
+                NatsConfiguration config = getEndpoint().getConfiguration();

Review Comment:
   Can you split up the run method into two, so we have one with JetStream and 
another with the previous code as-is?



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -193,4 +195,65 @@ protected void doStop() throws Exception {
         super.doStop();
     }
 
+    private void publishWithJetStream(NatsConfiguration config, final byte[] 
body, final Exchange exchange) {
+        LOG.debug("JetStream is available");
+        JetStreamManagement jsm;
+        JetStream js;
+        try {
+            jsm = this.connection.jetStreamManagement();
+            js = jsm.jetStream();
+            if (js == null) {
+                jsm.addStream(StreamConfiguration.builder()
+                        .name(config.getJetstreamName())
+                        .subjects(config.getTopic())
+                        .build());
+                js = jsm.jetStream();
+            }
+        } catch (IOException | JetStreamApiException e) {
+            throw new RuntimeException("Failed to initialize JetStream: " + 
e.getMessage(), e);
+        }
+
+        final NatsMessage.Builder builder = NatsMessage.builder()
+                .data(body)
+                .subject(config.getTopic())
+                .headers(this.buildHeaders(exchange));
+
+        if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
+            final String replySubject = config.getReplySubject();
+            builder.replyTo(replySubject);
+        }
+        final Message jetStreamMessage = builder.build();
+
+        PublishAck pa;
+        if (config.isJetstreamAsync()) {
+            CompletableFuture<PublishAck> future = 
js.publishAsync(jetStreamMessage);
+            try {
+                pa = future.get(1, TimeUnit.SECONDS);
+            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                throw new RuntimeException("Failed to publish message 
asynchronously with JetStream: " + e.getMessage(), e);
+            }
+            LOG.info("Publish Sequence async: {}", pa.getSeqno());
+        } else {
+            try {
+                pa = js.publish(jetStreamMessage);
+            } catch (IOException | JetStreamApiException e) {
+                throw new RuntimeException("Failed to publish message 
synchronously with JetStream: " + e.getMessage(), e);
+            }
+            LOG.info("Publish Sequence synchronously: {}", pa.getSeqno());

Review Comment:
   Same here: do not use INFO logging, as it spams logs when sending many messages



##########
components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java:
##########
@@ -193,4 +195,65 @@ protected void doStop() throws Exception {
         super.doStop();
     }
 
+    private void publishWithJetStream(NatsConfiguration config, final byte[] 
body, final Exchange exchange) {
+        LOG.debug("JetStream is available");
+        JetStreamManagement jsm;
+        JetStream js;
+        try {
+            jsm = this.connection.jetStreamManagement();
+            js = jsm.jetStream();
+            if (js == null) {
+                jsm.addStream(StreamConfiguration.builder()
+                        .name(config.getJetstreamName())
+                        .subjects(config.getTopic())
+                        .build());
+                js = jsm.jetStream();
+            }
+        } catch (IOException | JetStreamApiException e) {
+            throw new RuntimeException("Failed to initialize JetStream: " + 
e.getMessage(), e);
+        }
+
+        final NatsMessage.Builder builder = NatsMessage.builder()
+                .data(body)
+                .subject(config.getTopic())
+                .headers(this.buildHeaders(exchange));
+
+        if (ObjectHelper.isNotEmpty(config.getReplySubject())) {
+            final String replySubject = config.getReplySubject();
+            builder.replyTo(replySubject);
+        }
+        final Message jetStreamMessage = builder.build();
+
+        PublishAck pa;
+        if (config.isJetstreamAsync()) {
+            CompletableFuture<PublishAck> future = 
js.publishAsync(jetStreamMessage);
+            try {
+                pa = future.get(1, TimeUnit.SECONDS);
+            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+                throw new RuntimeException("Failed to publish message 
asynchronously with JetStream: " + e.getMessage(), e);
+            }
+            LOG.info("Publish Sequence async: {}", pa.getSeqno());

Review Comment:
   Do not use INFO logging as it spams logs when sending many messages



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to