davsclaus commented on code in PR #16702: URL: https://github.com/apache/camel/pull/16702#discussion_r1902913693
########## components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Configuration.java: ########## @@ -88,6 +88,8 @@ public class PahoMqtt5Configuration implements Cloneable { private int executorServiceTimeout = 1; @UriParam(defaultValue = "-1") private long sessionExpiryInterval = -1; + @UriParam(defaultValue = "false") + private boolean manualAcksEnabled = false; Review Comment: a boolean is false by default, so remove ########## components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java: ########## @@ -155,6 +160,23 @@ public Exchange createExchange(MqttMessage mqttMessage, String topic) { paho.setHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES, mqttMessage.getProperties()); exchange.setIn(paho); + if (getEndpoint().getConfiguration().isManualAcksEnabled()) { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + try { + PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onFailure(Exchange exchange) { + LOG.error("Error: " + exchange.getExchangeId(), exchange.getException()); Review Comment: A better description, such as "Rollback message due to error processing ..." ########## components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Configuration.java: ########## @@ -548,6 +550,22 @@ public void setSessionExpiryInterval(long sessionExpiryInterval) { this.sessionExpiryInterval = sessionExpiryInterval; } + /** + * Sets whether to use manual acknowledgements for the client. + * + * By default, this is false and message will be automatically acknowledged. If set to true, the acknowledgement is + * added in the exchange's completion callback. + * + */ + public boolean isManualAcksEnabled() { + Review Comment: remove empty line ########## components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java: ########## @@ -155,6 +160,23 @@ public Exchange createExchange(MqttMessage mqttMessage, String topic) { paho.setHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES, mqttMessage.getProperties()); exchange.setIn(paho); + if (getEndpoint().getConfiguration().isManualAcksEnabled()) { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + try { + PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()); + } catch (MqttException e) { + throw new RuntimeException(e); Review Comment: dont rethrown but log a WARN about not able to commit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org