davsclaus commented on code in PR #18212: URL: https://github.com/apache/camel/pull/18212#discussion_r2122688786
########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java: ########## @@ -90,6 +90,12 @@ public class NatsConfiguration { private boolean traceConnection; @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy(); + @UriParam(label = "advanced", defaultValue = "true") + private boolean jetstreamEnabled = true; + @UriParam(label = "advanced", defaultValue = "default-stream") + private String jetstreamName = "default-stream"; Review Comment: What is the meaning of the jetstream-name? Can you tell more about that? ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java: ########## @@ -123,26 +130,70 @@ class NatsConsumingTask implements Runnable { @Override public void run() { try { - NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); - if (ObjectHelper.isNotEmpty(this.configuration.getQueueName())) { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - NatsConsumer.this.getEndpoint().getConfiguration().getQueueName()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); - } - if (NatsConsumer.this.dispatcher.isActive()) { - NatsConsumer.this.setActive(true); + NatsConfiguration config = getEndpoint().getConfiguration(); + String topic = config.getTopic(); + String queueName = config.getQueueName(); + String maxMessagesStr = config.getMaxMessages(); + Integer maxMessages = null; + if (ObjectHelper.isNotEmpty(maxMessagesStr)) { + maxMessages = Integer.parseInt(maxMessagesStr); + } + + if (config.isJetstreamEnabled() && 
connection.getServerInfo().isJetStreamAvailable()) { + String streamName = this.configuration.getJetstreamName(); + String consumerName + = ObjectHelper.isNotEmpty(queueName) ? queueName : "consumer-" + System.currentTimeMillis(); // Generate a default consumer name if queueName is not provided + LOG.info("Setting up JetStream PUSH consumer for stream: '{}', durable: '{}', topic: {} ", streamName, + consumerName, this.configuration.getTopic()); + + JetStreamManagement jsm = connection.jetStreamManagement(); + try { + // Try to get the stream, create it if it doesn't exist + jsm.getStreamInfo(streamName); + } catch (JetStreamApiException e) { + if (e.getErrorCode() == 404) { + StreamConfiguration streamConfig = StreamConfiguration.builder() + .name(streamName) + .subjects(topic) + .build(); + jsm.addStream(streamConfig); + } else { + throw e; + } } + + ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder() + .durable(consumerName); + + ccBuilder.deliverSubject(null); + ConsumerConfiguration cc = ccBuilder.build(); + + PushSubscribeOptions pushOptions = PushSubscribeOptions.builder() + .configuration(cc) + .build(); + + NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); + + NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe( + NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), + queueName, + dispatcher, + new CamelNatsMessageHandler(), + true, + pushOptions); + + NatsConsumer.this.setActive(true); } else { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher - .subscribe(NatsConsumer.this.getEndpoint().getConfiguration().getTopic()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); + LOG.debug("Setting up 
standard NATS consumer for topic: {}", + NatsConsumer.this.getEndpoint().getConfiguration().getTopic()); + NatsConsumer.this.dispatcher = connection.createDispatcher(new CamelNatsMessageHandler()); + if (ObjectHelper.isNotEmpty(queueName)) { + NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe(topic, queueName); + } else { + NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe(topic); + } + if (maxMessages != null) { Review Comment: What is this code doing with maxMessages and unsubscribe? ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -193,4 +195,65 @@ protected void doStop() throws Exception { super.doStop(); } + private void publishWithJetStream(NatsConfiguration config, final byte[] body, final Exchange exchange) { + LOG.debug("JetStream is available"); + JetStreamManagement jsm; + JetStream js; + try { + jsm = this.connection.jetStreamManagement(); + js = jsm.jetStream(); + if (js == null) { + jsm.addStream(StreamConfiguration.builder() + .name(config.getJetstreamName()) + .subjects(config.getTopic()) + .build()); + js = jsm.jetStream(); + } + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException("Failed to initialize JetStream: " + e.getMessage(), e); + } + + final NatsMessage.Builder builder = NatsMessage.builder() + .data(body) + .subject(config.getTopic()) + .headers(this.buildHeaders(exchange)); + + if (ObjectHelper.isNotEmpty(config.getReplySubject())) { + final String replySubject = config.getReplySubject(); + builder.replyTo(replySubject); + } + final Message jetStreamMessage = builder.build(); + + PublishAck pa; + if (config.isJetstreamAsync()) { + CompletableFuture<PublishAck> future = js.publishAsync(jetStreamMessage); + try { + pa = future.get(1, TimeUnit.SECONDS); Review Comment: Why is the timeout so low at only 1 second? ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java: 
########## @@ -90,6 +90,12 @@ public class NatsConfiguration { private boolean traceConnection; @UriParam(label = "advanced") private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy(); + @UriParam(label = "advanced", defaultValue = "false") + private boolean jetstreamEnabled = false; Review Comment: I don't think this option should be advanced, as JetStream support has been requested for many years ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java: ########## @@ -123,26 +130,70 @@ class NatsConsumingTask implements Runnable { @Override public void run() { try { - NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); - if (ObjectHelper.isNotEmpty(this.configuration.getQueueName())) { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - NatsConsumer.this.getEndpoint().getConfiguration().getQueueName()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); - } - if (NatsConsumer.this.dispatcher.isActive()) { - NatsConsumer.this.setActive(true); + NatsConfiguration config = getEndpoint().getConfiguration(); Review Comment: Can you split up the run method into 2 so we have 1 with jetstream and another with the previous code as-is? ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -193,4 +195,65 @@ protected void doStop() throws Exception { super.doStop(); } + private void publishWithJetStream(NatsConfiguration config, final byte[] body, final Exchange exchange) { + LOG.debug("JetStream is available"); + JetStreamManagement jsm; + JetStream js; + try { + jsm = 
this.connection.jetStreamManagement(); + js = jsm.jetStream(); + if (js == null) { + jsm.addStream(StreamConfiguration.builder() + .name(config.getJetstreamName()) + .subjects(config.getTopic()) + .build()); + js = jsm.jetStream(); + } + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException("Failed to initialize JetStream: " + e.getMessage(), e); + } + + final NatsMessage.Builder builder = NatsMessage.builder() + .data(body) + .subject(config.getTopic()) + .headers(this.buildHeaders(exchange)); + + if (ObjectHelper.isNotEmpty(config.getReplySubject())) { + final String replySubject = config.getReplySubject(); + builder.replyTo(replySubject); + } + final Message jetStreamMessage = builder.build(); + + PublishAck pa; + if (config.isJetstreamAsync()) { + CompletableFuture<PublishAck> future = js.publishAsync(jetStreamMessage); + try { + pa = future.get(1, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Failed to publish message asynchronously with JetStream: " + e.getMessage(), e); + } + LOG.info("Publish Sequence async: {}", pa.getSeqno()); + } else { + try { + pa = js.publish(jetStreamMessage); + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException("Failed to publish message synchronously with JetStream: " + e.getMessage(), e); + } + LOG.info("Publish Sequence synchronously: {}", pa.getSeqno()); Review Comment: Same here ########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java: ########## @@ -193,4 +195,65 @@ protected void doStop() throws Exception { super.doStop(); } + private void publishWithJetStream(NatsConfiguration config, final byte[] body, final Exchange exchange) { + LOG.debug("JetStream is available"); + JetStreamManagement jsm; + JetStream js; + try { + jsm = this.connection.jetStreamManagement(); + js = jsm.jetStream(); + if (js == null) { + jsm.addStream(StreamConfiguration.builder() + 
.name(config.getJetstreamName()) + .subjects(config.getTopic()) + .build()); + js = jsm.jetStream(); + } + } catch (IOException | JetStreamApiException e) { + throw new RuntimeException("Failed to initialize JetStream: " + e.getMessage(), e); + } + + final NatsMessage.Builder builder = NatsMessage.builder() + .data(body) + .subject(config.getTopic()) + .headers(this.buildHeaders(exchange)); + + if (ObjectHelper.isNotEmpty(config.getReplySubject())) { + final String replySubject = config.getReplySubject(); + builder.replyTo(replySubject); + } + final Message jetStreamMessage = builder.build(); + + PublishAck pa; + if (config.isJetstreamAsync()) { + CompletableFuture<PublishAck> future = js.publishAsync(jetStreamMessage); + try { + pa = future.get(1, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Failed to publish message asynchronously with JetStream: " + e.getMessage(), e); + } + LOG.info("Publish Sequence async: {}", pa.getSeqno()); Review Comment: Do not use INFO logging as it spams logs when sending many messages -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org