ryan-highley commented on code in PR #15088:
URL: https://github.com/apache/camel/pull/15088#discussion_r1714163318


##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeClientCallback.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.List;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.RuntimeCamelException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.tahu.SparkplugParsingException;
+import org.eclipse.tahu.message.PayloadDecoder;
+import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
+import org.eclipse.tahu.message.model.DeviceDescriptor;
+import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
+import org.eclipse.tahu.message.model.MessageType;
+import org.eclipse.tahu.message.model.Metric;
+import org.eclipse.tahu.message.model.SparkplugBPayload;
+import org.eclipse.tahu.message.model.SparkplugMeta;
+import org.eclipse.tahu.message.model.StatePayload;
+import org.eclipse.tahu.message.model.Topic;
+import org.eclipse.tahu.mqtt.ClientCallback;
+import org.eclipse.tahu.mqtt.MqttClientId;
+import org.eclipse.tahu.mqtt.MqttServerName;
+import org.eclipse.tahu.mqtt.MqttServerUrl;
+import org.eclipse.tahu.util.SparkplugUtil;
+import org.eclipse.tahu.util.TopicUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+class TahuEdgeClientCallback implements ClientCallback {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TahuEdgeClientCallback.class);
+
+    private TahuEdgeClient client;
+
+    private final EdgeNodeDescriptor edgeNodeDescriptor;
+    private final TahuEdgeMetricHandler tahuEdgeNodeMetricHandler;
+
+    private final Marker loggingMarker;
+
+    TahuEdgeClientCallback(EdgeNodeDescriptor edgeNodeDescriptor, 
TahuEdgeMetricHandler tahuEdgeNodeMetricHandler) {
+        this.edgeNodeDescriptor = edgeNodeDescriptor;
+        this.tahuEdgeNodeMetricHandler = tahuEdgeNodeMetricHandler;
+
+        loggingMarker = 
MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString());
+    }
+
+    void setClient(TahuEdgeClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void messageArrived(
+            MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, 
MqttClientId mqttClientId, String rawTopic,
+            MqttMessage mqttMessage) {
+        LOG.trace(loggingMarker, "{}: ClientCallback messageArrived called: 
topic {}", mqttClientId, rawTopic);
+
+        try {
+
+            final Topic topic;
+            try {
+                topic = TopicUtil.parseTopic(rawTopic);
+            } catch (SparkplugParsingException e) {
+                LOG.error(loggingMarker, "Exception caught parsing Sparkplug 
topic {}", rawTopic, e);
+                throw new RuntimeCamelException(e);
+            }
+
+            if 
(!SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX.equals(topic.getNamespace())) {
+                LOG.warn(loggingMarker, "Received message on non-Sparkplug 
topic: {}", topic);
+                return;
+            }
+
+            if (topic.isType(MessageType.STATE)) {
+                LOG.debug(loggingMarker, "Received STATE message: {} :: {}", 
topic, new String(mqttMessage.getPayload()));
+
+                try {
+                    ObjectMapper mapper = new ObjectMapper();
+                    StatePayload statePayload = 
mapper.readValue(mqttMessage.getPayload(), StatePayload.class);
+                    client.handleStateMessage(topic.getHostApplicationId(), 
statePayload);
+                } catch (Exception e) {
+                    LOG.error(loggingMarker, "Exception caught handling STATE 
message with topic {} and payload {}", topic,
+                            new String(mqttMessage.getPayload()));
+                    throw new RuntimeCamelException(e);
+                }
+                return;
+            }
+
+            final SparkplugBPayload payload;
+
+            if (topic.isType(MessageType.NDEATH) && 
topic.getEdgeNodeDescriptor().equals(edgeNodeDescriptor)) {
+                if (!client.isDisconnectedOrDisconnecting()) {
+                    // MQTT Server published our LWT message before client 
finished disconnecting
+
+                    if (!client.isConnectedToPrimaryHost()) {
+                        LOG.debug(loggingMarker,
+                                "Received unexpected LWT for {} but not 
connected to primary host - ignoring",
+                                edgeNodeDescriptor);
+                        return;
+                    }
+
+                    // Find payload's bdSeq to determine how to proceed
+                    long messageBdSeq;
+                    try {
+                        payload = new 
SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), null);
+                        messageBdSeq = 
SparkplugUtil.getBdSequenceNumber(payload);
+                    } catch (Exception e) {
+                        LOG.error(loggingMarker,
+                                "Exception caught handling DEATH message while 
connected to primary host on topic {}",
+                                topic, e);
+                        throw new RuntimeCamelException(e);
+                    }
+
+                    long currentBirthBdSeq = 
tahuEdgeNodeMetricHandler.getCurrentBirthBdSeq();
+
+                    if (currentBirthBdSeq == messageBdSeq) {
+                        // This is our latest LWT - treat as a rebirth
+                        LOG.warn(loggingMarker, "Received unexpected LWT for 
{} - publishing BIRTH sequence",
+                                edgeNodeDescriptor);
+                        try {
+                            client.handleRebirthRequest(true);
+                        } catch (Exception e) {
+                            LOG.warn(loggingMarker,
+                                    "Received unexpected LWT but failed to 
publish new BIRTH sequence for {}",
+                                    edgeNodeDescriptor);
+                            throw new RuntimeCamelException(e);
+                        }
+                    } else {
+                        LOG.warn(loggingMarker,
+                                "Received unexpected LWT for {} with different 
bdSeq - expected {} received {} - ignoring",
+                                edgeNodeDescriptor, currentBirthBdSeq, 
messageBdSeq);
+                    }
+                } else {
+                    LOG.debug(loggingMarker, "Received expected LWT for {} - 
no action required",
+                            topic.getEdgeNodeDescriptor());
+                }
+                return;
+            }
+
+            if (!topic.isType(MessageType.NCMD) && 
!topic.isType(MessageType.DCMD)) {
+                LOG.debug(loggingMarker, "Received unexpected Sparkplug 
message of type {} - ignoring", topic.getType());
+                return;
+            }
+
+            try {
+                PayloadDecoder<SparkplugBPayload> decoder = new 
SparkplugBPayloadDecoder();
+                payload = decoder.buildFromByteArray(mqttMessage.getPayload(), 
null);
+
+                if (topic.isType(MessageType.NCMD)) {
+                    handleNCMDMessage(payload);
+                } else if (topic.isType(MessageType.DCMD)) {
+                    handleDCMDMessage(payload, topic.getDeviceId());
+                }
+            } catch (Exception e) {
+                LOG.error(loggingMarker, "Exception caught decoding Sparkplug 
message with topic {} and payload {}", topic,
+                        new String(mqttMessage.getPayload()));
+                throw new RuntimeCamelException(e);
+            }
+        } finally {
+            LOG.trace(loggingMarker, "{}: ClientCallback messageArrived 
complete", mqttClientId);
+        }
+    }
+
+    void handleNCMDMessage(SparkplugBPayload ncmdPayload) {
+        List<Metric> responseMetrics = 
tahuEdgeNodeMetricHandler.processCMDMetrics(ncmdPayload, edgeNodeDescriptor);
+
+        if (responseMetrics.isEmpty()) {
+            LOG.warn(loggingMarker, "Received NCMD with no valid metrics to 
write for {} - ignoring",
+                    edgeNodeDescriptor);
+            return;
+        }
+
+        SparkplugBPayload ndataPayload = new 
SparkplugBPayload.SparkplugBPayloadBuilder()
+                .addMetrics(responseMetrics)
+                .createPayload();
+
+        LOG.debug(loggingMarker, "Publishing NDATA based on NCMD message for 
{}", edgeNodeDescriptor);
+
+        client.publishNodeData(ndataPayload);
+    }
+
+    void handleDCMDMessage(SparkplugBPayload dcmdPayload, String deviceId) {
+        DeviceDescriptor deviceDescriptor = new 
DeviceDescriptor(edgeNodeDescriptor, deviceId);
+
+        List<Metric> responseMetrics = 
tahuEdgeNodeMetricHandler.processCMDMetrics(dcmdPayload, deviceDescriptor);
+
+        if (responseMetrics.isEmpty()) {
+            LOG.warn(loggingMarker, "Received DCMD with no valid metrics to 
write for {} - ignoring", deviceDescriptor);
+            return;
+        }
+
+        SparkplugBPayload ddataPayload = new 
SparkplugBPayload.SparkplugBPayloadBuilder()
+                .addMetrics(responseMetrics)
+                .createPayload();
+
+        LOG.debug(loggingMarker, "Publishing DDATA based on DCMD message for 
{}", deviceDescriptor);
+
+        client.publishDeviceData(deviceId, ddataPayload);
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.trace(loggingMarker, "ClientCallback shutdown called");
+
+        LOG.trace(loggingMarker, "ClientCallback shutdown complete");

Review Comment:
   No, not really. I'll remove them throughout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to