davsclaus commented on code in PR #18212: URL: https://github.com/apache/camel/pull/18212#discussion_r2123145991
########## components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java: ########## @@ -123,26 +130,70 @@ class NatsConsumingTask implements Runnable { @Override public void run() { try { - NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); - if (ObjectHelper.isNotEmpty(this.configuration.getQueueName())) { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - NatsConsumer.this.getEndpoint().getConfiguration().getQueueName()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); - } - if (NatsConsumer.this.dispatcher.isActive()) { - NatsConsumer.this.setActive(true); + NatsConfiguration config = getEndpoint().getConfiguration(); + String topic = config.getTopic(); + String queueName = config.getQueueName(); + String maxMessagesStr = config.getMaxMessages(); + Integer maxMessages = null; + if (ObjectHelper.isNotEmpty(maxMessagesStr)) { + maxMessages = Integer.parseInt(maxMessagesStr); + } + + if (config.isJetstreamEnabled() && connection.getServerInfo().isJetStreamAvailable()) { + String streamName = this.configuration.getJetstreamName(); + String consumerName + = ObjectHelper.isNotEmpty(queueName) ? queueName : "consumer-" + System.currentTimeMillis(); // Generate a default consumer name if queueName is not provided + LOG.info("Setting up JetStream PUSH consumer for stream: '{}', durable: '{}', topic: {} ", streamName, + consumerName, this.configuration.getTopic()); + + JetStreamManagement jsm = connection.jetStreamManagement(); + try { + // Try to get the stream, create it if it doesn't exist + jsm.getStreamInfo(streamName); + } catch (JetStreamApiException e) { + if (e.getErrorCode() == 404) { + StreamConfiguration streamConfig = StreamConfiguration.builder() + .name(streamName) + .subjects(topic) + .build(); + jsm.addStream(streamConfig); + } else { + throw e; + } } + + ConsumerConfiguration.Builder ccBuilder = ConsumerConfiguration.builder() + .durable(consumerName); + + ccBuilder.deliverSubject(null); + ConsumerConfiguration cc = ccBuilder.build(); + + PushSubscribeOptions pushOptions = PushSubscribeOptions.builder() + .configuration(cc) + .build(); + + NatsConsumer.this.dispatcher = this.connection.createDispatcher(new CamelNatsMessageHandler()); + + NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe( + NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), + queueName, + dispatcher, + new CamelNatsMessageHandler(), + true, + pushOptions); + + NatsConsumer.this.setActive(true); } else { - NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher - .subscribe(NatsConsumer.this.getEndpoint().getConfiguration().getTopic()); - if (ObjectHelper.isNotEmpty(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())) { - NatsConsumer.this.dispatcher.unsubscribe( - NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), - Integer.parseInt(NatsConsumer.this.getEndpoint().getConfiguration().getMaxMessages())); + LOG.debug("Setting up standard NATS consumer for topic: {}", + NatsConsumer.this.getEndpoint().getConfiguration().getTopic()); + NatsConsumer.this.dispatcher = connection.createDispatcher(new CamelNatsMessageHandler()); + if (ObjectHelper.isNotEmpty(queueName)) { + NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe(topic, queueName); + } else { + NatsConsumer.this.dispatcher = NatsConsumer.this.dispatcher.subscribe(topic); + } + if (maxMessages != null) { Review Comment: Ah okay its a special feature in camel-nats only - a bit odd as its not something common in the other messaging components -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org