orpiske commented on code in PR #15088: URL: https://github.com/apache/camel/pull/15088#discussion_r1712549483
########## components/camel-tahu/pom.xml: ########## @@ -0,0 +1,305 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>4.8.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-tahu</artifactId> + <packaging>jar</packaging> + <name>Camel :: Tahu</name> + <description>Camel Eclipse Tahu support for Sparkplug B</description> + + <properties> + </properties> + + <dependencies> + + <dependency> + <groupId>org.eclipse.tahu</groupId> + <artifactId>tahu-core</artifactId> + <version>${tahu-version}</version> + <exclusions> + <exclusion> + <groupId>org.eclipse.paho</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.eclipse.tahu</groupId> + <artifactId>tahu-edge</artifactId> + <version>${tahu-version}</version> + <exclusions> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.eclipse.tahu</groupId> + <artifactId>tahu-host</artifactId> + <version>${tahu-version}</version> + <exclusions> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> + <version>${paho-version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons-io-version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + 
<version>${commons-compress-version}</version> + </dependency> + + <!-- camel --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest</artifactId> + <version>${hamcrest-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility-version}</version> + <scope>test</scope> + </dependency> + + <!-- test infra --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-common</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>hivemq</artifactId> + <version>${testcontainers-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.tahu</groupId> + <artifactId>tahu-edge-compat</artifactId> + <version>${tahu-version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + 
<artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.eclipse.tahu</groupId> + <artifactId>tahu-host-compat</artifactId> + <version>${tahu-version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <defaultGoal>install</defaultGoal> + + <plugins> + + <!-- generate camel meta-data --> + <plugin> + <groupId>org.apache.camel</groupId> + <artifactId>camel-component-maven-plugin</artifactId> + <version>${project.version}</version> + <executions> + <execution> + <id>generate</id> + <goals> + <goal>generate</goal> + </goals> + <phase>process-classes</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.4.0</version> + <executions> + <execution> + <phase>initialize</phase> + <goals> + <goal>add-source</goal> + <goal>add-resource</goal> + </goals> + <configuration> + <sources> + <source>src/generated/java</source> + </sources> + <resources> + <resource> + <directory>src/generated/resources</directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <additionalClasspathElements> + 
<additionalClasspathElement>${project.basedir}/src/test/resources/sparkplug-tck-3.0.0.jar</additionalClasspathElement> + </additionalClasspathElements> + </configuration> + </plugin> + + </plugins> Review Comment: I think this whole section is unnecessary. Our build already should include all these resources by default. Was there a reason to include it? ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeMetricHandler.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +import org.eclipse.tahu.edge.api.MetricHandler; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.SparkplugBPayloadEncoder; +import org.eclipse.tahu.message.model.DeviceDescriptor; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.eclipse.tahu.message.model.MessageType; +import org.eclipse.tahu.message.model.Metric; +import org.eclipse.tahu.message.model.Metric.MetricBuilder; +import org.eclipse.tahu.message.model.MetricDataType; +import org.eclipse.tahu.message.model.SparkplugBPayload; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; +import org.eclipse.tahu.message.model.SparkplugDescriptor; +import org.eclipse.tahu.message.model.SparkplugMeta; +import org.eclipse.tahu.message.model.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +class TahuEdgeMetricHandler implements MetricHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TahuEdgeMetricHandler.class); + + private final BdSeqManager bdSeqManager; + private volatile long currentBirthBdSeq; + private volatile long currentDeathBdSeq; + + private TahuEdgeClient client; + + private final EdgeNodeDescriptor edgeNodeDescriptor; + private final ConcurrentMap<SparkplugDescriptor, SparkplugBPayloadMap> descriptorMetricMap = new ConcurrentHashMap<>(); + + private final Marker loggingMarker; + + TahuEdgeMetricHandler(EdgeNodeDescriptor edgeNodeDescriptor, BdSeqManager bdSeqManager) { + this.edgeNodeDescriptor = edgeNodeDescriptor; + this.bdSeqManager = bdSeqManager; + + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + + 
LOG.trace(loggingMarker, "TahuEdgeMetricHandler constructor called"); + + currentBirthBdSeq = currentDeathBdSeq = bdSeqManager.getNextDeathBdSeqNum(); + + LOG.trace(loggingMarker, "TahuEdgeMetricHandler constructor complete"); + } + + void setClient(TahuEdgeClient client) { + this.client = client; + } + + @Override + public Topic getDeathTopic() { + LOG.trace(loggingMarker, "MetricHandler getDeathTopic called"); + + try { + return new Topic(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX, edgeNodeDescriptor, MessageType.NDEATH); + } finally { + LOG.trace(loggingMarker, "MetricHandler getDeathTopic complete"); + } + } + + @Override + public byte[] getDeathPayloadBytes() throws Exception { + LOG.trace(loggingMarker, "MetricHandler getDeathPayloadBytes called"); + + try { + currentDeathBdSeq &= 0xFFL; + + SparkplugBPayload deathPayload = new SparkplugBPayload.SparkplugBPayloadBuilder() + .addMetric(new MetricBuilder( + SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY, + MetricDataType.Int64, currentDeathBdSeq).createMetric()) + .createPayload(); + + LOG.debug(loggingMarker, "Created death payload with bdSeq metric {}", currentDeathBdSeq); + + currentBirthBdSeq = currentDeathBdSeq++; + bdSeqManager.storeNextDeathBdSeqNum(currentDeathBdSeq); + + SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder(); + + return encoder.getBytes(deathPayload, true); + } finally { + LOG.trace(loggingMarker, "MetricHandler getDeathPayloadBytes complete"); + } + } + + @Override + public boolean hasMetric(SparkplugDescriptor sparkplugDescriptor, String metricName) { + LOG.trace(loggingMarker, "MetricHandler hasMetric called: sparkplugDescriptor {} metricName {}", + sparkplugDescriptor, metricName); + + boolean metricFound = false; + try { + metricFound = descriptorMetricMap.containsKey(sparkplugDescriptor) + && descriptorMetricMap.get(sparkplugDescriptor).getMetric(metricName) != null; + return metricFound; + } finally { + LOG.trace(loggingMarker, "MetricHandler hasMetric complete 
(metricFound = {})", metricFound); + } + } + + @Override + public void publishBirthSequence() { + LOG.trace(loggingMarker, "MetricHandler publishBirthSequence called"); + + try { + Date timestamp = new Date(); + + // SparkplugBPayloadMap, not a SparkplugBPayload + SparkplugBPayloadMap nBirthPayload = new SparkplugBPayloadMap.SparkplugBPayloadMapBuilder() + .setTimestamp(timestamp) + .addMetrics(getCachedMetrics(edgeNodeDescriptor)) + .addMetric(new MetricBuilder( + SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY, + MetricDataType.Int64, currentBirthBdSeq).createMetric()) + .createPayload(); + + LOG.debug(loggingMarker, "Created birth payload with bdSeq metric {}", currentBirthBdSeq); + + client.publishNodeBirth(nBirthPayload); + + descriptorMetricMap.keySet().stream().filter(sd -> sd.isDeviceDescriptor()).forEach(sd -> { + DeviceDescriptor deviceDescriptor = (DeviceDescriptor) sd; + String deviceId = deviceDescriptor.getDeviceId(); + + SparkplugBPayload dBirthPayload = new SparkplugBPayload.SparkplugBPayloadBuilder() + .setTimestamp(timestamp) + .addMetrics(getCachedMetrics(deviceDescriptor)) + .createPayload(); + + client.publishDeviceBirth(deviceId, dBirthPayload); + }); + + } catch (Exception e) { + LOG.error(loggingMarker, "Exception caught publishing birth sequence", e); + throw new TahuException(edgeNodeDescriptor, e); + } finally { + LOG.trace(loggingMarker, "MetricHandler publishBirthSequence complete"); + } + } + + SparkplugBPayloadMap addDeviceMetricDataPayloadMap( + SparkplugDescriptor metricDescriptor, SparkplugBPayloadMap metricDataTypePayloadMap) { + LOG.trace(loggingMarker, "addDeviceMetricDataPayloadMap called: {}", metricDescriptor); + + try { + return descriptorMetricMap.put(metricDescriptor, metricDataTypePayloadMap); + } finally { + LOG.trace(loggingMarker, "addDeviceMetricDataPayloadMap complete"); + } + } + + List<Metric> getCachedMetrics(SparkplugDescriptor sd) { + return 
Optional.ofNullable(descriptorMetricMap.get(sd)).map(SparkplugBPayloadMap::getMetrics).orElse(List.of()); + } + + SparkplugBPayloadMap getDescriptorMetricMap(SparkplugDescriptor sd) { + return descriptorMetricMap.get(sd); + } + + void updateCachedMetrics(SparkplugDescriptor sd, SparkplugBPayload payload) { + Optional.ofNullable(descriptorMetricMap.get(sd)).ifPresent(cachedMetrics -> { + payload.getMetrics().stream().forEach(payloadMetric -> { + cachedMetrics.updateMetricValue(payloadMetric.getName(), payloadMetric, null); + }); + }); + } + + long getCurrentBirthBdSeq() { + LOG.trace(loggingMarker, "getCurrentBirthBdSeq() : {}", currentBirthBdSeq); + return currentBirthBdSeq; + } + + long getCurrentDeathBdSeq() { + LOG.trace(loggingMarker, "getCurrentDeathBdSeq() : {}", currentDeathBdSeq); + return currentDeathBdSeq; + } + + List<Metric> processCMDMetrics(SparkplugBPayload payload, SparkplugDescriptor cmdDescriptor) { + List<Metric> receivedMetrics = payload.getMetrics(); + if (receivedMetrics == null || receivedMetrics.isEmpty()) { + return List.of(); + } + + // Look for a METRIC_NODE_REBIRTH received metric with True value + if (!cmdDescriptor.isDeviceDescriptor()) { + Map<Boolean, List<Metric>> groupedMetrics = receivedMetrics.stream() + .collect(Collectors.groupingBy(m -> (Boolean) SparkplugMeta.METRIC_NODE_REBIRTH.equals(m.getName()) + && m.getDataType() == MetricDataType.Boolean && (Boolean) m.getValue())); + + if (groupedMetrics.containsKey(Boolean.TRUE) && !groupedMetrics.get(Boolean.TRUE).isEmpty()) { + client.handleRebirthRequest(true); + } + + receivedMetrics = groupedMetrics.get(Boolean.FALSE); + } + + final SparkplugBPayloadMap cachedMetrics = descriptorMetricMap.get(cmdDescriptor); + if (cachedMetrics == null) { + return List.of(); + } + + List<Metric> responseMetrics = receivedMetrics.stream().map(m -> { + Metric cachedMetric = cachedMetrics.getMetric(m.getName()); + + if (cachedMetric == null) { + LOG.warn(loggingMarker, "Received CMD request for {} 
metric {} not in configured metrics - skipping", + cmdDescriptor, m.getName()); + return null; + } + + try { + Metric responseMetric = new Metric(cachedMetric); + + responseMetric.setHistorical(true); + + return responseMetric; + } catch (Exception e) { + LOG.warn(loggingMarker, + "Exception caught copying metric handling CMD request for {} metric {} - skipping", + cmdDescriptor, m.getName()); + return null; + } Review Comment: Also consider it for other blocks like this. ########## components/camel-tahu/src/test/java/org/apache/camel/component/tahu/SparkplugTCKService.java: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import java.nio.charset.StandardCharsets; + +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.tahu.services.HiveMQService; +import org.apache.camel.component.tahu.services.HiveMQServiceFactory; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.builder.PredicateBuilder; +import org.apache.camel.test.infra.common.services.TestService; +import org.apache.camel.test.infra.core.MockUtils; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.apache.camel.test.junit5.TestSupport.bodyAs; +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers +public class SparkplugTCKService implements TestService, BeforeEachCallback, AfterEachCallback { + + private static final Logger LOG = LoggerFactory.getLogger(SparkplugTCKService.class); + + public static final String SPARKPLUG_TCK_TEST_CONTROL_TOPIC = "SPARKPLUG_TCK/TEST_CONTROL"; + public static final String SPARKPLUG_TCK_LOG_TOPIC = "SPARKPLUG_TCK/LOG"; + public static final String SPARKPLUG_TCK_RESULT_TOPIC = "SPARKPLUG_TCK/RESULT"; + + @RegisterExtension + 
HiveMQService hiveMQService = HiveMQServiceFactory.createService(); + + private final SparkplugTCKMessageListener spTckMessageListener; + + private MqttClient mqttClient; + + public SparkplugTCKService() { + this.spTckMessageListener = new SparkplugTCKMessageListener(); + } + + String getMqttHostAddress() { + if (!hiveMQService.isRunning()) { + hiveMQService.initialize(); + } + + return hiveMQService.getMqttHostAddress(); + } + + @Override + public void beforeEach(ExtensionContext extensionContext) { + LOG.trace("beforeEach called"); + + spTckResultMockEndpoint.expectedMessageCount(1); + spTckResultMockEndpoint.message(0).body(String.class).contains("OVERALL: PASS"); + + spTckResultMockNotify = new NotifyBuilder(monitorCamelContext) + .from("direct:" + SparkplugTCKService.SPARKPLUG_TCK_RESULT_TOPIC) + .whenCompleted(1) + .create(); + + spTckLogMockNotify = new NotifyBuilder(monitorCamelContext) + .from("direct:" + SparkplugTCKService.SPARKPLUG_TCK_LOG_TOPIC) + .filter(PredicateBuilder.and( + // Filter the expected log messages--anything else causes the TCK test to fail + bodyAs(String.class).not().contains("Creating simulated host application"), + bodyAs(String.class).not().contains("Waiting for the Edge and Device to come online"), + bodyAs(String.class).not().contains("Edge Send Complex Data"), + bodyAs(String.class).not().contains("Host Application is online, so using that"))) + .whenCompleted(1) + .create(); + + LOG.trace("beforeEach completed"); + } + + @Override + public void afterEach(ExtensionContext extensionContext) { + LOG.trace("afterEach called"); + + MockEndpoint.resetMocks(monitorCamelContext); + + LOG.trace("afterEach completed"); + } + + private CamelContext monitorCamelContext; + private ProducerTemplate monitorProducerTemplate; + + MockEndpoint spTckLogMockEndpoint; + MockEndpoint spTckResultMockEndpoint; + + NotifyBuilder spTckResultMockNotify; + NotifyBuilder spTckLogMockNotify; + + @Override + public void beforeAll(ExtensionContext 
extensionContext) { + LOG.trace("beforeAll called"); + + try { + startMonitorCamelContext(extensionContext); + startClient(extensionContext); + } finally { + LOG.trace("beforeAll completed"); + } + } + + private void startClient(ExtensionContext extensionContext) { + try { + mqttClient = new MqttClient( + getMqttHostAddress(), "Tahu-Test-" + MqttClient.generateClientId(), new MemoryPersistence()); + + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + if (hiveMQService.getUserName() != null) { + mqttConnectOptions.setUserName(hiveMQService.getUserName()); + mqttConnectOptions.setPassword(hiveMQService.getUserPassword()); + } + + mqttClient.connect(mqttConnectOptions); + + mqttClient.subscribe(SPARKPLUG_TCK_RESULT_TOPIC, spTckMessageListener); + mqttClient.subscribe(SPARKPLUG_TCK_LOG_TOPIC, spTckMessageListener); + + } catch (MqttException e) { + fail("Exception caught connecting MQTT test client", e); + } + } Review Comment: Do not use `fail` in start up, `setUp` or `tearDown` methods and also don't use `fail` within any code that is providing the test infrastructure. It makes it much harder for us to investigate the actual problem with a test when failures happen. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/CamelBdSeqManager.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.io.FileUtils; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +public final class CamelBdSeqManager implements BdSeqManager { + + private static final Logger LOG = LoggerFactory.getLogger(CamelBdSeqManager.class); + + private static final Charset bdSeqNumFileCharset = StandardCharsets.UTF_8; + + private final File bdSeqNumFile; + + private final Marker loggingMarker; + + CamelBdSeqManager(EdgeNodeDescriptor edgeNodeDescriptor) { + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + LOG.trace(loggingMarker, "CamelBdSeqManager constructor called"); + + String bdSeqNumFileName = FileUtils.getTempDirectoryPath() + "/CamelTahuTemp/" + + edgeNodeDescriptor.getDescriptorString() + "/bdSeqMgr"; Review Comment: IMHO, I think it would be better if we made the base directory configurable. ########## components/camel-tahu/src/test/resources/sparkplug-tck-3.0.0.jar: ########## Review Comment: This shouldn't be here. ########## components/camel-tahu/src/test/resources/hivemq-ce/extensions/sparkplug-tck/coverage/coverage-sparkplug.html: ########## @@ -0,0 +1,3105 @@ +<?xml version="1.0" encoding="UTF-8"?> Review Comment: Is this file (along with other `.png` and similar) needed? 
########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeClientCallback.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.List; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.RuntimeCamelException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.tahu.SparkplugParsingException; +import org.eclipse.tahu.message.PayloadDecoder; +import org.eclipse.tahu.message.SparkplugBPayloadDecoder; +import org.eclipse.tahu.message.model.DeviceDescriptor; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.eclipse.tahu.message.model.MessageType; +import org.eclipse.tahu.message.model.Metric; +import org.eclipse.tahu.message.model.SparkplugBPayload; +import org.eclipse.tahu.message.model.SparkplugMeta; +import org.eclipse.tahu.message.model.StatePayload; +import org.eclipse.tahu.message.model.Topic; +import org.eclipse.tahu.mqtt.ClientCallback; +import org.eclipse.tahu.mqtt.MqttClientId; +import org.eclipse.tahu.mqtt.MqttServerName; +import org.eclipse.tahu.mqtt.MqttServerUrl; +import org.eclipse.tahu.util.SparkplugUtil; +import 
org.eclipse.tahu.util.TopicUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +class TahuEdgeClientCallback implements ClientCallback { + + private static final Logger LOG = LoggerFactory.getLogger(TahuEdgeClientCallback.class); + + private TahuEdgeClient client; + + private final EdgeNodeDescriptor edgeNodeDescriptor; + private final TahuEdgeMetricHandler tahuEdgeNodeMetricHandler; + + private final Marker loggingMarker; + + TahuEdgeClientCallback(EdgeNodeDescriptor edgeNodeDescriptor, TahuEdgeMetricHandler tahuEdgeNodeMetricHandler) { + this.edgeNodeDescriptor = edgeNodeDescriptor; + this.tahuEdgeNodeMetricHandler = tahuEdgeNodeMetricHandler; + + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + } + + void setClient(TahuEdgeClient client) { + this.client = client; + } + + @Override + public void messageArrived( + MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, String rawTopic, + MqttMessage mqttMessage) { + LOG.trace(loggingMarker, "{}: ClientCallback messageArrived called: topic {}", mqttClientId, rawTopic); + + try { + + final Topic topic; + try { + topic = TopicUtil.parseTopic(rawTopic); + } catch (SparkplugParsingException e) { + LOG.error(loggingMarker, "Exception caught parsing Sparkplug topic {}", rawTopic, e); + throw new RuntimeCamelException(e); + } + + if (!SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX.equals(topic.getNamespace())) { + LOG.warn(loggingMarker, "Received message on non-Sparkplug topic: {}", topic); + return; + } + + if (topic.isType(MessageType.STATE)) { + LOG.debug(loggingMarker, "Received STATE message: {} :: {}", topic, new String(mqttMessage.getPayload())); + + try { + ObjectMapper mapper = new ObjectMapper(); + StatePayload statePayload = mapper.readValue(mqttMessage.getPayload(), StatePayload.class); + client.handleStateMessage(topic.getHostApplicationId(), statePayload); + } catch (Exception 
e) { + LOG.error(loggingMarker, "Exception caught handling STATE message with topic {} and payload {}", topic, + new String(mqttMessage.getPayload())); + throw new RuntimeCamelException(e); + } + return; + } + + final SparkplugBPayload payload; + + if (topic.isType(MessageType.NDEATH) && topic.getEdgeNodeDescriptor().equals(edgeNodeDescriptor)) { + if (!client.isDisconnectedOrDisconnecting()) { + // MQTT Server published our LWT message before client finished disconnecting + + if (!client.isConnectedToPrimaryHost()) { + LOG.debug(loggingMarker, + "Received unexpected LWT for {} but not connected to primary host - ignoring", + edgeNodeDescriptor); + return; + } + + // Find payload's bdSeq to determine how to proceed + long messageBdSeq; + try { + payload = new SparkplugBPayloadDecoder().buildFromByteArray(mqttMessage.getPayload(), null); + messageBdSeq = SparkplugUtil.getBdSequenceNumber(payload); + } catch (Exception e) { + LOG.error(loggingMarker, + "Exception caught handling DEATH message while connected to primary host on topic {}", + topic, e); + throw new RuntimeCamelException(e); + } + + long currentBirthBdSeq = tahuEdgeNodeMetricHandler.getCurrentBirthBdSeq(); + + if (currentBirthBdSeq == messageBdSeq) { + // This is our latest LWT - treat as a rebirth + LOG.warn(loggingMarker, "Received unexpected LWT for {} - publishing BIRTH sequence", + edgeNodeDescriptor); + try { + client.handleRebirthRequest(true); + } catch (Exception e) { + LOG.warn(loggingMarker, + "Received unexpected LWT but failed to publish new BIRTH sequence for {}", + edgeNodeDescriptor); + throw new RuntimeCamelException(e); + } + } else { + LOG.warn(loggingMarker, + "Received unexpected LWT for {} with different bdSeq - expected {} received {} - ignoring", + edgeNodeDescriptor, currentBirthBdSeq, messageBdSeq); + } + } else { + LOG.debug(loggingMarker, "Received expected LWT for {} - no action required", + topic.getEdgeNodeDescriptor()); + } + return; + } + + if 
(!topic.isType(MessageType.NCMD) && !topic.isType(MessageType.DCMD)) { + LOG.debug(loggingMarker, "Received unexpected Sparkplug message of type {} - ignoring", topic.getType()); + return; + } + + try { + PayloadDecoder<SparkplugBPayload> decoder = new SparkplugBPayloadDecoder(); + payload = decoder.buildFromByteArray(mqttMessage.getPayload(), null); + + if (topic.isType(MessageType.NCMD)) { + handleNCMDMessage(payload); + } else if (topic.isType(MessageType.DCMD)) { + handleDCMDMessage(payload, topic.getDeviceId()); + } + } catch (Exception e) { + LOG.error(loggingMarker, "Exception caught decoding Sparkplug message with topic {} and payload {}", topic, + new String(mqttMessage.getPayload())); + throw new RuntimeCamelException(e); + } + } finally { + LOG.trace(loggingMarker, "{}: ClientCallback messageArrived complete", mqttClientId); + } + } + + void handleNCMDMessage(SparkplugBPayload ncmdPayload) { + List<Metric> responseMetrics = tahuEdgeNodeMetricHandler.processCMDMetrics(ncmdPayload, edgeNodeDescriptor); + + if (responseMetrics.isEmpty()) { + LOG.warn(loggingMarker, "Received NCMD with no valid metrics to write for {} - ignoring", + edgeNodeDescriptor); + return; + } + + SparkplugBPayload ndataPayload = new SparkplugBPayload.SparkplugBPayloadBuilder() + .addMetrics(responseMetrics) + .createPayload(); + + LOG.debug(loggingMarker, "Publishing NDATA based on NCMD message for {}", edgeNodeDescriptor); + + client.publishNodeData(ndataPayload); + } + + void handleDCMDMessage(SparkplugBPayload dcmdPayload, String deviceId) { + DeviceDescriptor deviceDescriptor = new DeviceDescriptor(edgeNodeDescriptor, deviceId); + + List<Metric> responseMetrics = tahuEdgeNodeMetricHandler.processCMDMetrics(dcmdPayload, deviceDescriptor); + + if (responseMetrics.isEmpty()) { + LOG.warn(loggingMarker, "Received DCMD with no valid metrics to write for {} - ignoring", deviceDescriptor); + return; + } + + SparkplugBPayload ddataPayload = new 
SparkplugBPayload.SparkplugBPayloadBuilder() + .addMetrics(responseMetrics) + .createPayload(); + + LOG.debug(loggingMarker, "Publishing DDATA based on DCMD message for {}", deviceDescriptor); + + client.publishDeviceData(deviceId, ddataPayload); + } + + @Override + public void shutdown() { + LOG.trace(loggingMarker, "ClientCallback shutdown called"); + + LOG.trace(loggingMarker, "ClientCallback shutdown complete"); Review Comment: Are these needed? ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeMetricHandler.java: ########## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +import org.eclipse.tahu.edge.api.MetricHandler; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.SparkplugBPayloadEncoder; +import org.eclipse.tahu.message.model.DeviceDescriptor; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.eclipse.tahu.message.model.MessageType; +import org.eclipse.tahu.message.model.Metric; +import org.eclipse.tahu.message.model.Metric.MetricBuilder; +import org.eclipse.tahu.message.model.MetricDataType; +import org.eclipse.tahu.message.model.SparkplugBPayload; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; +import org.eclipse.tahu.message.model.SparkplugDescriptor; +import org.eclipse.tahu.message.model.SparkplugMeta; +import org.eclipse.tahu.message.model.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +class TahuEdgeMetricHandler implements MetricHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TahuEdgeMetricHandler.class); + + private final BdSeqManager bdSeqManager; + private volatile long currentBirthBdSeq; + private volatile long currentDeathBdSeq; + + private TahuEdgeClient client; + + private final EdgeNodeDescriptor edgeNodeDescriptor; + private final ConcurrentMap<SparkplugDescriptor, SparkplugBPayloadMap> descriptorMetricMap = new ConcurrentHashMap<>(); + + private final Marker loggingMarker; + + TahuEdgeMetricHandler(EdgeNodeDescriptor edgeNodeDescriptor, BdSeqManager bdSeqManager) { + this.edgeNodeDescriptor = edgeNodeDescriptor; + this.bdSeqManager = bdSeqManager; + + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + + 
LOG.trace(loggingMarker, "TahuEdgeMetricHandler constructor called"); + + currentBirthBdSeq = currentDeathBdSeq = bdSeqManager.getNextDeathBdSeqNum(); + + LOG.trace(loggingMarker, "TahuEdgeMetricHandler constructor complete"); + } + + void setClient(TahuEdgeClient client) { + this.client = client; + } + + @Override + public Topic getDeathTopic() { + LOG.trace(loggingMarker, "MetricHandler getDeathTopic called"); + + try { + return new Topic(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX, edgeNodeDescriptor, MessageType.NDEATH); + } finally { + LOG.trace(loggingMarker, "MetricHandler getDeathTopic complete"); + } + } + + @Override + public byte[] getDeathPayloadBytes() throws Exception { + LOG.trace(loggingMarker, "MetricHandler getDeathPayloadBytes called"); + + try { + currentDeathBdSeq &= 0xFFL; + + SparkplugBPayload deathPayload = new SparkplugBPayload.SparkplugBPayloadBuilder() + .addMetric(new MetricBuilder( + SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY, + MetricDataType.Int64, currentDeathBdSeq).createMetric()) + .createPayload(); + + LOG.debug(loggingMarker, "Created death payload with bdSeq metric {}", currentDeathBdSeq); + + currentBirthBdSeq = currentDeathBdSeq++; + bdSeqManager.storeNextDeathBdSeqNum(currentDeathBdSeq); + + SparkplugBPayloadEncoder encoder = new SparkplugBPayloadEncoder(); + + return encoder.getBytes(deathPayload, true); + } finally { + LOG.trace(loggingMarker, "MetricHandler getDeathPayloadBytes complete"); + } + } + + @Override + public boolean hasMetric(SparkplugDescriptor sparkplugDescriptor, String metricName) { + LOG.trace(loggingMarker, "MetricHandler hasMetric called: sparkplugDescriptor {} metricName {}", + sparkplugDescriptor, metricName); + + boolean metricFound = false; + try { + metricFound = descriptorMetricMap.containsKey(sparkplugDescriptor) + && descriptorMetricMap.get(sparkplugDescriptor).getMetric(metricName) != null; + return metricFound; + } finally { + LOG.trace(loggingMarker, "MetricHandler hasMetric complete 
(metricFound = {})", metricFound); + } + } + + @Override + public void publishBirthSequence() { + LOG.trace(loggingMarker, "MetricHandler publishBirthSequence called"); + + try { + Date timestamp = new Date(); + + // SparkplugBPayloadMap, not a SparkplugBPayload + SparkplugBPayloadMap nBirthPayload = new SparkplugBPayloadMap.SparkplugBPayloadMapBuilder() + .setTimestamp(timestamp) + .addMetrics(getCachedMetrics(edgeNodeDescriptor)) + .addMetric(new MetricBuilder( + SparkplugMeta.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY, + MetricDataType.Int64, currentBirthBdSeq).createMetric()) + .createPayload(); + + LOG.debug(loggingMarker, "Created birth payload with bdSeq metric {}", currentBirthBdSeq); + + client.publishNodeBirth(nBirthPayload); + + descriptorMetricMap.keySet().stream().filter(sd -> sd.isDeviceDescriptor()).forEach(sd -> { + DeviceDescriptor deviceDescriptor = (DeviceDescriptor) sd; + String deviceId = deviceDescriptor.getDeviceId(); + + SparkplugBPayload dBirthPayload = new SparkplugBPayload.SparkplugBPayloadBuilder() + .setTimestamp(timestamp) + .addMetrics(getCachedMetrics(deviceDescriptor)) + .createPayload(); + + client.publishDeviceBirth(deviceId, dBirthPayload); + }); + + } catch (Exception e) { + LOG.error(loggingMarker, "Exception caught publishing birth sequence", e); + throw new TahuException(edgeNodeDescriptor, e); + } finally { + LOG.trace(loggingMarker, "MetricHandler publishBirthSequence complete"); + } + } + + SparkplugBPayloadMap addDeviceMetricDataPayloadMap( + SparkplugDescriptor metricDescriptor, SparkplugBPayloadMap metricDataTypePayloadMap) { + LOG.trace(loggingMarker, "addDeviceMetricDataPayloadMap called: {}", metricDescriptor); + + try { + return descriptorMetricMap.put(metricDescriptor, metricDataTypePayloadMap); + } finally { + LOG.trace(loggingMarker, "addDeviceMetricDataPayloadMap complete"); + } + } + + List<Metric> getCachedMetrics(SparkplugDescriptor sd) { + return 
Optional.ofNullable(descriptorMetricMap.get(sd)).map(SparkplugBPayloadMap::getMetrics).orElse(List.of()); + } + + SparkplugBPayloadMap getDescriptorMetricMap(SparkplugDescriptor sd) { + return descriptorMetricMap.get(sd); + } + + void updateCachedMetrics(SparkplugDescriptor sd, SparkplugBPayload payload) { + Optional.ofNullable(descriptorMetricMap.get(sd)).ifPresent(cachedMetrics -> { + payload.getMetrics().stream().forEach(payloadMetric -> { + cachedMetrics.updateMetricValue(payloadMetric.getName(), payloadMetric, null); + }); + }); + } + + long getCurrentBirthBdSeq() { + LOG.trace(loggingMarker, "getCurrentBirthBdSeq() : {}", currentBirthBdSeq); + return currentBirthBdSeq; + } + + long getCurrentDeathBdSeq() { + LOG.trace(loggingMarker, "getCurrentDeathBdSeq() : {}", currentDeathBdSeq); + return currentDeathBdSeq; + } + + List<Metric> processCMDMetrics(SparkplugBPayload payload, SparkplugDescriptor cmdDescriptor) { + List<Metric> receivedMetrics = payload.getMetrics(); + if (receivedMetrics == null || receivedMetrics.isEmpty()) { + return List.of(); + } + + // Look for a METRIC_NODE_REBIRTH received metric with True value + if (!cmdDescriptor.isDeviceDescriptor()) { + Map<Boolean, List<Metric>> groupedMetrics = receivedMetrics.stream() + .collect(Collectors.groupingBy(m -> (Boolean) SparkplugMeta.METRIC_NODE_REBIRTH.equals(m.getName()) + && m.getDataType() == MetricDataType.Boolean && (Boolean) m.getValue())); + + if (groupedMetrics.containsKey(Boolean.TRUE) && !groupedMetrics.get(Boolean.TRUE).isEmpty()) { + client.handleRebirthRequest(true); + } + + receivedMetrics = groupedMetrics.get(Boolean.FALSE); + } + + final SparkplugBPayloadMap cachedMetrics = descriptorMetricMap.get(cmdDescriptor); + if (cachedMetrics == null) { + return List.of(); + } + + List<Metric> responseMetrics = receivedMetrics.stream().map(m -> { + Metric cachedMetric = cachedMetrics.getMetric(m.getName()); + + if (cachedMetric == null) { + LOG.warn(loggingMarker, "Received CMD request for {} 
metric {} not in configured metrics - skipping", + cmdDescriptor, m.getName()); + return null; + } + + try { + Metric responseMetric = new Metric(cachedMetric); + + responseMetric.setHistorical(true); + + return responseMetric; + } catch (Exception e) { + LOG.warn(loggingMarker, + "Exception caught copying metric handling CMD request for {} metric {} - skipping", + cmdDescriptor, m.getName()); + return null; + } Review Comment: (advisory) Maybe move this block to a separate method. It makes the code easier to read/maintain. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/CamelBdSeqManager.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.io.FileUtils; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +public final class CamelBdSeqManager implements BdSeqManager { + + private static final Logger LOG = LoggerFactory.getLogger(CamelBdSeqManager.class); + + private static final Charset bdSeqNumFileCharset = StandardCharsets.UTF_8; + + private final File bdSeqNumFile; + + private final Marker loggingMarker; + + CamelBdSeqManager(EdgeNodeDescriptor edgeNodeDescriptor) { + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + LOG.trace(loggingMarker, "CamelBdSeqManager constructor called"); + + String bdSeqNumFileName = FileUtils.getTempDirectoryPath() + "/CamelTahuTemp/" + + edgeNodeDescriptor.getDescriptorString() + "/bdSeqMgr"; + + bdSeqNumFile = new File(bdSeqNumFileName); + + LOG.trace(loggingMarker, "CamelBdSeqManager constructor complete"); + } + + // This method is NOT intended to increment the stored value, only to retrieve it + @Override + public long getNextDeathBdSeqNum() { + LOG.trace(loggingMarker, "BdSeqManager getNextDeathBdSeqNum called"); + try { + long bdSeqNum = 0L; + if (bdSeqNumFile.exists() && FileUtils.sizeOf(bdSeqNumFile) > 0L) { + String bdSeqFileContents = FileUtils.readFileToString(bdSeqNumFile, bdSeqNumFileCharset); + + bdSeqNum = normalizeBdSeq(Long.parseLong(bdSeqFileContents)); + + LOG.debug(loggingMarker, "Next Death bdSeq number: {}", bdSeqNum); + } + return bdSeqNum; + } catch (Exception e) { + LOG.debug(loggingMarker, "Failed to get the bdSeq number from the persistent directory", e); + storeNextDeathBdSeqNum(0); Review Comment: It feels like it should be logging this at a different log level, 
no? At least a warning as to why it couldn't get the number. ########## components/camel-tahu/src/test/java/org/apache/camel/component/tahu/TahuHostConsumerTest.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import org.apache.camel.CamelContext; +import org.apache.camel.test.infra.core.annotations.RouteFixture; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Disabled +public class TahuHostConsumerTest extends TahuTestSupport { + + static enum HostAppTestProfile { + SESSION_ESTABLISHMENT_TEST("host SessionEstablishmentTest", false), + SESSION_TERMINATION_TEST("host SessionTerminationTest TestHostApp", true), + // SEND_COMMAND_TEST("host SendCommandTest G2 E2 D2", false), + // EDGE_SESSION_TERMINATION_TEST("host EdgeSessionTerminationTest G2 E2 D2", false), + // MESSAGE_ORDERING_TEST("host MessageOrderingTest G2 E2 D2 5000", false), + ; + + private HostAppTestProfile(String testConfig, boolean disconnect) { + this.testConfig = testConfig; + this.disconnect = disconnect; + } + + final String testConfig; + final boolean disconnect; + } + + private static final Logger LOG = LoggerFactory.getLogger(TahuHostConsumerTest.class); + + @ParameterizedTest + @EnumSource + public void tckSessionTest(HostAppTestProfile profile) throws Exception { + + initiateTckTest(profile.testConfig); + + } + + @RouteFixture Review Comment: (advisory) IMHO, these tests are simple enough that they can use the traditional `CamelTestSupport` from `camel-test-junit-5`. Typically, we use the `camel-test-infra-core` for components that have *very* complex testing needs/patterns (e.g., `camel-jms`, `camel-kafka`) ... because its flexibility comes at the cost of an API that is much harder to use. ########## components/camel-tahu/src/test/java/org/apache/camel/component/tahu/SparkplugTCKService.java: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.nio.charset.StandardCharsets; + +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.NotifyBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.tahu.services.HiveMQService; +import org.apache.camel.component.tahu.services.HiveMQServiceFactory; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.builder.PredicateBuilder; +import org.apache.camel.test.infra.common.services.TestService; +import org.apache.camel.test.infra.core.MockUtils; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.apache.camel.test.junit5.TestSupport.bodyAs; +import static org.junit.jupiter.api.Assertions.fail; + +@Testcontainers Review Comment: A few notes: 1. We don't use those annotations. 2. Container-based test infrastructure should be in the [test-infra modules](https://camel.apache.org/manual/test-infra.html). 3. The base service should be an interface and the concrete test service an implementation of that interface. 4. Ideally, it should be a singleton service, so it doesn't overload ASF CI. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/CamelBdSeqManager.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.io.FileUtils; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +public final class CamelBdSeqManager implements BdSeqManager { + + private static final Logger LOG = LoggerFactory.getLogger(CamelBdSeqManager.class); + + private static final Charset bdSeqNumFileCharset = StandardCharsets.UTF_8; + + private final File bdSeqNumFile; + + private final Marker loggingMarker; + + CamelBdSeqManager(EdgeNodeDescriptor edgeNodeDescriptor) { + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + LOG.trace(loggingMarker, "CamelBdSeqManager constructor called"); + + String bdSeqNumFileName = FileUtils.getTempDirectoryPath() + "/CamelTahuTemp/" + + edgeNodeDescriptor.getDescriptorString() + "/bdSeqMgr"; Review Comment: Another thing here is that it assumes `/` as the path separator. While it wouldn't cause the code to fail, it's more appropriate to use `File.separator`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org