ryan-highley commented on code in PR #15088:
URL: https://github.com/apache/camel/pull/15088#discussion_r1714258548


##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeMetricHandler.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+import org.eclipse.tahu.edge.api.MetricHandler;
+import org.eclipse.tahu.message.BdSeqManager;
+import org.eclipse.tahu.message.SparkplugBPayloadEncoder;
+import org.eclipse.tahu.message.model.DeviceDescriptor;
+import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
+import org.eclipse.tahu.message.model.MessageType;
+import org.eclipse.tahu.message.model.Metric;
+import org.eclipse.tahu.message.model.Metric.MetricBuilder;
+import org.eclipse.tahu.message.model.MetricDataType;
+import org.eclipse.tahu.message.model.SparkplugBPayload;
+import org.eclipse.tahu.message.model.SparkplugBPayloadMap;
+import org.eclipse.tahu.message.model.SparkplugDescriptor;
+import org.eclipse.tahu.message.model.SparkplugMeta;
+import org.eclipse.tahu.message.model.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+class TahuEdgeMetricHandler implements MetricHandler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TahuEdgeMetricHandler.class);
+
+    private final BdSeqManager bdSeqManager;
+    private volatile long currentBirthBdSeq;
+    private volatile long currentDeathBdSeq;
+
+    private TahuEdgeClient client;
+
+    private final EdgeNodeDescriptor edgeNodeDescriptor;
+    private final ConcurrentMap<SparkplugDescriptor, SparkplugBPayloadMap> 
descriptorMetricMap = new ConcurrentHashMap<>();
+
+    private final Marker loggingMarker;
+
+    TahuEdgeMetricHandler(EdgeNodeDescriptor edgeNodeDescriptor, BdSeqManager 
bdSeqManager) {
+        this.edgeNodeDescriptor = edgeNodeDescriptor;
+        this.bdSeqManager = bdSeqManager;
+
+        loggingMarker = 
MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString());
+
+        LOG.trace(loggingMarker, "TahuEdgeMetricHandler constructor called");
+
+        currentBirthBdSeq = currentDeathBdSeq = 
bdSeqManager.getNextDeathBdSeqNum();
+
+        LOG.trace(loggingMarker, "TahuEdgeMetricHandler constructor complete");
+    }
+
    /**
     * Injects the client used to publish birth payloads. Set after construction —
     * presumably because the client and this handler are created in mutual
     * dependence; confirm the wiring order in TahuEdgeClient.
     */
    void setClient(TahuEdgeClient client) {
        this.client = client;
    }
+
+    @Override
+    public Topic getDeathTopic() {
+        LOG.trace(loggingMarker, "MetricHandler getDeathTopic called");
+
+        try {
+            return new Topic(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX, 
edgeNodeDescriptor, MessageType.NDEATH);
+        } finally {
+            LOG.trace(loggingMarker, "MetricHandler getDeathTopic complete");
+        }
+    }
+
    /**
     * Builds and encodes the NDEATH payload — presumably registered as the MQTT
     * last-will message on connect; confirm against TahuEdgeClient usage.
     *
     * Side effects: records the bdSeq used here as the next birth bdSeq, advances
     * the death bdSeq, and persists the advanced value via the BdSeqManager.
     *
     * @return the encoded death payload bytes
     * @throws Exception if payload encoding fails
     */
    @Override
    public byte[] getDeathPayloadBytes() throws Exception {
        LOG.trace(loggingMarker, "MetricHandler getDeathPayloadBytes called");

        try {
            // Wrap the counter into the 0-255 range before publishing it
            currentDeathBdSeq &= 0xFFL;

            SparkplugBPayload deathPayload = new SparkplugBPayload.SparkplugBPayloadBuilder()
                    .addMetric(new MetricBuilder(
                            SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY,
                            MetricDataType.Int64, currentDeathBdSeq).createMetric())
                    .createPayload();

            LOG.debug(loggingMarker, "Created death payload with bdSeq metric {}", currentDeathBdSeq);

            // The next NBIRTH must reuse this death's bdSeq (see publishBirthSequence);
            // the incremented value becomes the following death's bdSeq and is persisted
            // so it survives restarts.
            currentBirthBdSeq = currentDeathBdSeq++;
            bdSeqManager.storeNextDeathBdSeqNum(currentDeathBdSeq);

            SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder();

            return encoder.getBytes(deathPayload, true);
        } finally {
            LOG.trace(loggingMarker, "MetricHandler getDeathPayloadBytes complete");
        }
    }
+
+    @Override
+    public boolean hasMetric(SparkplugDescriptor sparkplugDescriptor, String 
metricName) {
+        LOG.trace(loggingMarker, "MetricHandler hasMetric called: 
sparkplugDescriptor {} metricName {}",
+                sparkplugDescriptor, metricName);
+
+        boolean metricFound = false;
+        try {
+            metricFound = descriptorMetricMap.containsKey(sparkplugDescriptor)
+                    && 
descriptorMetricMap.get(sparkplugDescriptor).getMetric(metricName) != null;
+            return metricFound;
+        } finally {
+            LOG.trace(loggingMarker, "MetricHandler hasMetric complete 
(metricFound = {})", metricFound);
+        }
+    }
+
    /**
     * Publishes the birth sequence: one NBIRTH for the edge node followed by a
     * DBIRTH for every cached device descriptor, all sharing one timestamp.
     *
     * @throws TahuException wrapping any failure during payload creation or publish
     */
    @Override
    public void publishBirthSequence() {
        LOG.trace(loggingMarker, "MetricHandler publishBirthSequence called");

        try {
            // One timestamp shared by the node birth and all device births
            Date timestamp = new Date();

            // SparkplugBPayloadMap, not a SparkplugBPayload
            SparkplugBPayloadMap nBirthPayload = new SparkplugBPayloadMap.SparkplugBPayloadMapBuilder()
                    .setTimestamp(timestamp)
                    .addMetrics(getCachedMetrics(edgeNodeDescriptor))
                    // Carries the bdSeq recorded when the death payload was built
                    // (see getDeathPayloadBytes)
                    .addMetric(new MetricBuilder(
                            SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY,
                            MetricDataType.Int64, currentBirthBdSeq).createMetric())
                    .createPayload();

            LOG.debug(loggingMarker, "Created birth payload with bdSeq metric {}", currentBirthBdSeq);

            // Node birth must go out before any device births
            client.publishNodeBirth(nBirthPayload);

            descriptorMetricMap.keySet().stream().filter(sd -> sd.isDeviceDescriptor()).forEach(sd -> {
                DeviceDescriptor deviceDescriptor = (DeviceDescriptor) sd;
                String deviceId = deviceDescriptor.getDeviceId();

                SparkplugBPayload dBirthPayload = new SparkplugBPayload.SparkplugBPayloadBuilder()
                        .setTimestamp(timestamp)
                        .addMetrics(getCachedMetrics(deviceDescriptor))
                        .createPayload();

                client.publishDeviceBirth(deviceId, dBirthPayload);
            });

        } catch (Exception e) {
            LOG.error(loggingMarker, "Exception caught publishing birth sequence", e);
            throw new TahuException(edgeNodeDescriptor, e);
        } finally {
            LOG.trace(loggingMarker, "MetricHandler publishBirthSequence complete");
        }
    }
+
+    SparkplugBPayloadMap addDeviceMetricDataPayloadMap(
+            SparkplugDescriptor metricDescriptor, SparkplugBPayloadMap 
metricDataTypePayloadMap) {
+        LOG.trace(loggingMarker, "addDeviceMetricDataPayloadMap called: {}", 
metricDescriptor);
+
+        try {
+            return descriptorMetricMap.put(metricDescriptor, 
metricDataTypePayloadMap);
+        } finally {
+            LOG.trace(loggingMarker, "addDeviceMetricDataPayloadMap complete");
+        }
+    }
+
+    List<Metric> getCachedMetrics(SparkplugDescriptor sd) {
+        return 
Optional.ofNullable(descriptorMetricMap.get(sd)).map(SparkplugBPayloadMap::getMetrics).orElse(List.of());
+    }
+
    /**
     * Returns the cached payload map for the given descriptor, or null if none
     * has been registered via addDeviceMetricDataPayloadMap.
     */
    SparkplugBPayloadMap getDescriptorMetricMap(SparkplugDescriptor sd) {
        return descriptorMetricMap.get(sd);
    }
+
+    void updateCachedMetrics(SparkplugDescriptor sd, SparkplugBPayload 
payload) {
+        
Optional.ofNullable(descriptorMetricMap.get(sd)).ifPresent(cachedMetrics -> {
+            payload.getMetrics().stream().forEach(payloadMetric -> {
+                cachedMetrics.updateMetricValue(payloadMetric.getName(), 
payloadMetric, null);
+            });
+        });
+    }
+
    // Returns the bdSeq value the next NBIRTH payload will carry; trace-logged for diagnostics.
    long getCurrentBirthBdSeq() {
        LOG.trace(loggingMarker, "getCurrentBirthBdSeq() : {}", currentBirthBdSeq);
        return currentBirthBdSeq;
    }
+
    // Returns the bdSeq value the next NDEATH payload will carry; trace-logged for diagnostics.
    long getCurrentDeathBdSeq() {
        LOG.trace(loggingMarker, "getCurrentDeathBdSeq() : {}", currentDeathBdSeq);
        return currentDeathBdSeq;
    }
+
+    List<Metric> processCMDMetrics(SparkplugBPayload payload, 
SparkplugDescriptor cmdDescriptor) {
+        List<Metric> receivedMetrics = payload.getMetrics();
+        if (receivedMetrics == null || receivedMetrics.isEmpty()) {
+            return List.of();
+        }
+
+        // Look for a METRIC_NODE_REBIRTH received metric with True value
+        if (!cmdDescriptor.isDeviceDescriptor()) {
+            Map<Boolean, List<Metric>> groupedMetrics = 
receivedMetrics.stream()
+                    .collect(Collectors.groupingBy(m -> (Boolean) 
SparkplugMeta.METRIC_NODE_REBIRTH.equals(m.getName())
+                            && m.getDataType() == MetricDataType.Boolean && 
(Boolean) m.getValue()));
+
+            if (groupedMetrics.containsKey(Boolean.TRUE) && 
!groupedMetrics.get(Boolean.TRUE).isEmpty()) {
+                client.handleRebirthRequest(true);
+            }
+
+            receivedMetrics = groupedMetrics.get(Boolean.FALSE);
+        }
+
+        final SparkplugBPayloadMap cachedMetrics = 
descriptorMetricMap.get(cmdDescriptor);
+        if (cachedMetrics == null) {
+            return List.of();
+        }
+
+        List<Metric> responseMetrics = receivedMetrics.stream().map(m -> {
+            Metric cachedMetric = cachedMetrics.getMetric(m.getName());
+
+            if (cachedMetric == null) {
+                LOG.warn(loggingMarker, "Received CMD request for {} metric {} 
not in configured metrics - skipping",
+                        cmdDescriptor, m.getName());
+                return null;
+            }
+
+            try {
+                Metric responseMetric = new Metric(cachedMetric);
+
+                responseMetric.setHistorical(true);
+
+                return responseMetric;
+            } catch (Exception e) {
+                LOG.warn(loggingMarker,
+                        "Exception caught copying metric handling CMD request 
for {} metric {} - skipping",
+                        cmdDescriptor, m.getName());
+                return null;
+            }

Review Comment:
   Definitely needs some code cleanup. I split this out into separate methods and
simplified the `messageArrived` callback method to call each one in turn.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to