orpiske commented on code in PR #15088:
URL: https://github.com/apache/camel/pull/15088#discussion_r1715277517


##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.Arrays;
+import java.util.List;
+
+// import java.util.Map;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.tahu.message.BdSeqManager;
+import org.eclipse.tahu.message.model.SparkplugBPayloadMap;
+
+/**
+ * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse 
Tahu
+ */
+@UriEndpoint(firstVersion = "4.0.0",
+             scheme = TahuConstants.BASE_SCHEME,
+             title = "Tahu",
+             syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX,
+             alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX,
+             category = { Category.MESSAGING, Category.IOT, 
Category.MONITORING },
+             headersClass = TahuConstants.class)
+public class TahuEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware {
+
+    @UriPath(label = "producer", description = "ID of the group")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private final String groupId;
+
+    @UriPath(label = "producer", description = "ID of the edge node")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private final String edgeNode;
+
+    @UriPath(label = "producer (device only)", description = "ID of this edge 
node device")
+    @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true)
+    private final String deviceId;
+
+    @UriParam(label = "producer (edge node only)", description = "Host ID of 
the primary host application for this edge node")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private String primaryHostId;
+
+    @UriParam(label = "producer (edge node only)",
+              description = "ID of each device connected to this edge node, as 
a comma-separated list")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true)
+    private String deviceIds;
+
+    @UriParam(label = "producer",
+              description = "Tahu SparkplugBPayloadMap to configure metric 
data types for this edge node or device  NOTE: This payload is used exclusively 
as a Sparkplug B spec-compliant configuration for all possible edge node or 
device metric names, aliases, and data types. This configuration is required to 
publish proper Sparkplug B NBIRTH and DBIRTH payloads.")

Review Comment:
   I'd use "Note that this payload" so that the phrase fits well with the UI 
tooling using the code.



##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeClientCallback.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.List;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.RuntimeCamelException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.tahu.SparkplugParsingException;
+import org.eclipse.tahu.message.PayloadDecoder;
+import org.eclipse.tahu.message.SparkplugBPayloadDecoder;
+import org.eclipse.tahu.message.model.DeviceDescriptor;
+import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
+import org.eclipse.tahu.message.model.MessageType;
+import org.eclipse.tahu.message.model.Metric;
+import org.eclipse.tahu.message.model.SparkplugBPayload;
+import org.eclipse.tahu.message.model.SparkplugMeta;
+import org.eclipse.tahu.message.model.StatePayload;
+import org.eclipse.tahu.message.model.Topic;
+import org.eclipse.tahu.mqtt.ClientCallback;
+import org.eclipse.tahu.mqtt.MqttClientId;
+import org.eclipse.tahu.mqtt.MqttServerName;
+import org.eclipse.tahu.mqtt.MqttServerUrl;
+import org.eclipse.tahu.util.SparkplugUtil;
+import org.eclipse.tahu.util.TopicUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+class TahuEdgeClientCallback implements ClientCallback {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TahuEdgeClientCallback.class);
+
+    private TahuEdgeClient client;
+
+    private final EdgeNodeDescriptor edgeNodeDescriptor;
+    private final TahuEdgeMetricHandler tahuEdgeNodeMetricHandler;
+
+    private final Marker loggingMarker;
+
+    TahuEdgeClientCallback(EdgeNodeDescriptor edgeNodeDescriptor, 
TahuEdgeMetricHandler tahuEdgeNodeMetricHandler) {
+        this.edgeNodeDescriptor = edgeNodeDescriptor;
+        this.tahuEdgeNodeMetricHandler = tahuEdgeNodeMetricHandler;
+
+        loggingMarker = 
MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString());
+    }
+
+    void setClient(TahuEdgeClient client) {
+        this.client = client;
+    }
+
+    @Override
+    public void messageArrived(
+            MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, 
MqttClientId mqttClientId, String rawTopic,
+            MqttMessage mqttMessage) {
+
+        final Topic topic;
+        try {
+            topic = TopicUtil.parseTopic(rawTopic);
+        } catch (SparkplugParsingException e) {
+            LOG.error(loggingMarker, "Exception caught parsing Sparkplug topic 
{}", rawTopic, e);

Review Comment:
   (advisory) In general we avoid logging and throwing ... This is not a 
requirement and you can still find some of our code doing that, but we have 
been trying to avoid it and clean it up.



##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuConfiguration.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.support.jsse.SSLContextParameters;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.tahu.model.MqttServerDefinition;
+import org.eclipse.tahu.mqtt.MqttClientId;
+import org.eclipse.tahu.mqtt.MqttServerName;
+import org.eclipse.tahu.mqtt.MqttServerUrl;
+
+@UriParams
+public class TahuConfiguration implements Cloneable {
+
+    private static final Pattern SERVER_DEF_PATTERN = Pattern
+            
.compile("([^:]+):(?:(?!tcp|ssl)([^:]+):)?((?:tcp|ssl):(?://)?[\\p{Alnum}.-]+(?::\\d+)?),?");
+
+    @UriParam(label = "common",
+              description = "MQTT server definition list, given with the 
following syntax in a comma-separated list: 
MqttServerName:[MqttClientId:](tcp|ssl)://hostname[:port],...")
+    private String servers;
+
+    @UriParam(label = "common",
+              description = "MQTT client ID to use for all server definitions, 
rather than specifying the same one for each",
+              defaultValueNote = "If neither the clientId parameter nor an 
MqttClientId defined for an MQTT Server, a random MQTT Client ID will be 
generated, prefaced with \"Camel\"")
+    private String clientId;
+
+    @UriParam(label = "common",
+              description = "MQTT client ID length check enabled", 
defaultValue = "true")
+    private boolean checkClientIdLength = true;
+
+    @UriParam(label = "security", description = "Username for MQTT server 
authentication", secret = true)
+    private String username;
+
+    @UriParam(label = "security", description = "Password for MQTT server 
authentication", secret = true)
+    private String password;
+
+    @UriParam(label = "common", description = "Delay before node a rebirth 
message will be sent", defaultValue = "5000")
+    private long rebirthDebounceDelay = 5000L;
+
+    @UriParam(label = "common", description = "Connection keep alive timeout 
in seconds", defaultValue = "30")
+    private int keepAliveTimeout = 30;
+
+    @UriParam(label = "security", description = "SSL configuration for MQTT 
server connections")
+    private SSLContextParameters sslContextParameters;
+
+    public String getServers() {
+        return servers;
+    }
+
+    public void setServers(String servers) {
+        this.servers = servers;
+    }
+
+    public List<MqttServerDefinition> getServerDefinitionList() {
+        List<MqttServerDefinition> serverDefinitionList;
+        if (ObjectHelper.isEmpty(servers)) {
+            serverDefinitionList = List.of();
+        } else if (!SERVER_DEF_PATTERN.matcher(servers).find()) {
+            throw new RuntimeCamelException("Server definition list has 
invalid syntax: " + servers);
+        } else {
+            Matcher serverDefMatcher = SERVER_DEF_PATTERN.matcher(servers);
+            serverDefinitionList = serverDefMatcher.results().map(matchResult 
-> {
+
+                // MatchResult does not support named groups
+                String serverName = matchResult.group(1);
+                String clientId = matchResult.group(2);
+                String serverUrl = matchResult.group(3);
+
+                return parseFromUrlString(serverName, clientId, serverUrl);
+            }).toList();
+        }
+        return serverDefinitionList;
+    }
+
+    private MqttServerDefinition parseFromUrlString(
+            String serverName, String clientId, String serverUrl) {
+        if (ObjectHelper.isEmpty(serverName) || 
ObjectHelper.isEmpty(serverUrl)) {
+            throw new RuntimeCamelException(
+                    "Both serverName and serverUrl must be specified in MQTT 
server definitions");
+        }

Review Comment:
   Here you can use `ObjectHelper.notNullOrEmpty` and it should do all of that 
for you. 



##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.Arrays;
+import java.util.List;
+
+// import java.util.Map;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.tahu.message.BdSeqManager;
+import org.eclipse.tahu.message.model.SparkplugBPayloadMap;
+
+/**
+ * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse 
Tahu
+ */
+@UriEndpoint(firstVersion = "4.0.0",
+             scheme = TahuConstants.BASE_SCHEME,
+             title = "Tahu",
+             syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX,
+             alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX,
+             category = { Category.MESSAGING, Category.IOT, 
Category.MONITORING },
+             headersClass = TahuConstants.class)
+public class TahuEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware {
+
+    @UriPath(label = "producer", description = "ID of the group")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private final String groupId;
+
+    @UriPath(label = "producer", description = "ID of the edge node")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private final String edgeNode;
+
+    @UriPath(label = "producer (device only)", description = "ID of this edge 
node device")
+    @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true)
+    private final String deviceId;
+
+    @UriParam(label = "producer (edge node only)", description = "Host ID of 
the primary host application for this edge node")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private String primaryHostId;
+
+    @UriParam(label = "producer (edge node only)",
+              description = "ID of each device connected to this edge node, as 
a comma-separated list")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true)
+    private String deviceIds;
+
+    @UriParam(label = "producer",
+              description = "Tahu SparkplugBPayloadMap to configure metric 
data types for this edge node or device  NOTE: This payload is used exclusively 
as a Sparkplug B spec-compliant configuration for all possible edge node or 
device metric names, aliases, and data types. This configuration is required to 
publish proper Sparkplug B NBIRTH and DBIRTH payloads.")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private SparkplugBPayloadMap metricDataTypePayloadMap;
+
+    @UriParam
+    private final TahuConfiguration configuration;
+
+    @UriParam(label = "producer (edge node only),advanced", description = 
"Flag enabling support for metric aliases",
+              defaultValue = "false")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private boolean useAliases = false;
+
+    @UriParam(label = "producer,advanced",
+              description = "To use a custom HeaderFilterStrategy to filter 
headers used as Sparkplug metrics",
+              defaultValueNote = "Defaults to sending all Camel Message 
headers with name prefixes of \""
+                                 + TahuConstants.METRIC_HEADER_PREFIX + "\", 
including those with null values")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME })
+    private volatile HeaderFilterStrategy headerFilterStrategy;
+
+    @UriParam(label = "producer (edge node only),advanced",
+              description = "To use a specific 
org.eclipse.tahu.message.BdSeqManager implementation to manage edge node 
birth-death sequence numbers",
+              defaultValue = 
"org.apache.camel.component.tahu.CamelBdSeqManager")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private volatile BdSeqManager bdSeqManager;
+
+    @UriParam(label = "producer (edge node only),advanced",
+              description = "Path for Sparkplug B NBIRTH/NDEATH sequence 
number persistence files. This path will contain files named as \"<Edge Node 
ID>-bdSeqNum\" and must be writable by the executing process' user",
+              defaultValue = "${sys:java.io.tmpdir}/CamelTahuTemp")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private String bdSeqNumPath;
+
+    @UriPath(label = "consumer", description = "ID for the host application")
+    @Metadata(applicableFor = TahuConstants.HOST_APP_SCHEME, required = true)
+    private final String hostId;
+
+    TahuEndpoint(String uri, TahuComponent component, TahuConfiguration 
configuration,
+                 String groupId, String edgeNode) {
+        this(uri, component, configuration, groupId, edgeNode, null);
+    }
+
+    TahuEndpoint(String uri, TahuComponent component, TahuConfiguration 
configuration,
+                 String groupId, String edgeNode, String deviceId) {
+        super(uri, component);
+
+        this.configuration = configuration;
+
+        this.groupId = groupId;
+        this.edgeNode = edgeNode;
+        this.deviceId = deviceId;
+        this.hostId = null;
+
+    }
+
+    TahuEndpoint(String uri, TahuComponent component, TahuConfiguration 
configuration, String hostId) {
+        super(uri, component);
+
+        this.configuration = configuration;
+
+        this.groupId = null;
+        this.edgeNode = null;
+        this.deviceId = null;
+        this.hostId = hostId;
+
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        if (ObjectHelper.isEmpty(getGroupId()) || 
ObjectHelper.isEmpty(getEdgeNode())) {
+            throw new FailedToCreateProducerException(
+                    this, new IllegalArgumentException("No groupId and/or 
edgeNode configured for this Endpoint"));
+        }
+
+        TahuEdgeProducer.Builder producerBuilder = new 
TahuEdgeProducer.Builder(this)
+                .groupId(groupId)
+                .edgeNode(edgeNode);
+
+        if (deviceId != null) {
+            producerBuilder.deviceId(deviceId);
+        }
+
+        TahuEdgeProducer producer = producerBuilder.build();
+
+        return producer;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        if (ObjectHelper.isEmpty(getHostId())) {

Review Comment:
   Same not as before about `ObjectHelper.isNotNullOrEmpty`.



##########
components/camel-tahu/src/test/java/org/apache/camel/component/tahu/TahuEdgeProducerTest.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Disabled

Review Comment:
   Ideally, disabled tests should have a reason (i.e; `@Disabled("requires this 
and that to run")`). If the test is manual, it should be named 
`<something>ManualTest.java` so it doesn't get flagged in the code analysis.



##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.Arrays;
+import java.util.List;
+
+// import java.util.Map;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.tahu.message.BdSeqManager;
+import org.eclipse.tahu.message.model.SparkplugBPayloadMap;
+
+/**
+ * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse 
Tahu
+ */
+@UriEndpoint(firstVersion = "4.0.0",
+             scheme = TahuConstants.BASE_SCHEME,
+             title = "Tahu",
+             syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX,
+             alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX,
+             category = { Category.MESSAGING, Category.IOT, 
Category.MONITORING },
+             headersClass = TahuConstants.class)
+public class TahuEndpoint extends DefaultEndpoint implements 
HeaderFilterStrategyAware {
+
+    @UriPath(label = "producer", description = "ID of the group")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private final String groupId;
+
+    @UriPath(label = "producer", description = "ID of the edge node")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private final String edgeNode;
+
+    @UriPath(label = "producer (device only)", description = "ID of this edge 
node device")
+    @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true)
+    private final String deviceId;
+
+    @UriParam(label = "producer (edge node only)", description = "Host ID of 
the primary host application for this edge node")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private String primaryHostId;
+
+    @UriParam(label = "producer (edge node only)",
+              description = "ID of each device connected to this edge node, as 
a comma-separated list")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true)
+    private String deviceIds;
+
+    @UriParam(label = "producer",
+              description = "Tahu SparkplugBPayloadMap to configure metric 
data types for this edge node or device  NOTE: This payload is used exclusively 
as a Sparkplug B spec-compliant configuration for all possible edge node or 
device metric names, aliases, and data types. This configuration is required to 
publish proper Sparkplug B NBIRTH and DBIRTH payloads.")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME }, required = true)
+    private SparkplugBPayloadMap metricDataTypePayloadMap;
+
+    @UriParam
+    private final TahuConfiguration configuration;
+
+    @UriParam(label = "producer (edge node only),advanced", description = 
"Flag enabling support for metric aliases",
+              defaultValue = "false")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private boolean useAliases = false;
+
+    @UriParam(label = "producer,advanced",
+              description = "To use a custom HeaderFilterStrategy to filter 
headers used as Sparkplug metrics",
+              defaultValueNote = "Defaults to sending all Camel Message 
headers with name prefixes of \""
+                                 + TahuConstants.METRIC_HEADER_PREFIX + "\", 
including those with null values")
+    @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, 
TahuConstants.DEVICE_SCHEME })
+    private volatile HeaderFilterStrategy headerFilterStrategy;
+
+    @UriParam(label = "producer (edge node only),advanced",
+              description = "To use a specific 
org.eclipse.tahu.message.BdSeqManager implementation to manage edge node 
birth-death sequence numbers",
+              defaultValue = 
"org.apache.camel.component.tahu.CamelBdSeqManager")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private volatile BdSeqManager bdSeqManager;
+
+    @UriParam(label = "producer (edge node only),advanced",
+              description = "Path for Sparkplug B NBIRTH/NDEATH sequence 
number persistence files. This path will contain files named as \"<Edge Node 
ID>-bdSeqNum\" and must be writable by the executing process' user",
+              defaultValue = "${sys:java.io.tmpdir}/CamelTahuTemp")
+    @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME)
+    private String bdSeqNumPath;
+
+    @UriPath(label = "consumer", description = "ID for the host application")
+    @Metadata(applicableFor = TahuConstants.HOST_APP_SCHEME, required = true)
+    private final String hostId;
+
+    TahuEndpoint(String uri, TahuComponent component, TahuConfiguration 
configuration,
+                 String groupId, String edgeNode) {
+        this(uri, component, configuration, groupId, edgeNode, null);
+    }
+
+    TahuEndpoint(String uri, TahuComponent component, TahuConfiguration 
configuration,
+                 String groupId, String edgeNode, String deviceId) {
+        super(uri, component);
+
+        this.configuration = configuration;
+
+        this.groupId = groupId;
+        this.edgeNode = edgeNode;
+        this.deviceId = deviceId;
+        this.hostId = null;
+
+    }
+
+    TahuEndpoint(String uri, TahuComponent component, TahuConfiguration 
configuration, String hostId) {
+        super(uri, component);
+
+        this.configuration = configuration;
+
+        this.groupId = null;
+        this.edgeNode = null;
+        this.deviceId = null;
+        this.hostId = hostId;
+
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        if (ObjectHelper.isEmpty(getGroupId()) || 
ObjectHelper.isEmpty(getEdgeNode())) {

Review Comment:
   Same not as before about `ObjectHelper.isNotNullOrEmpty`. 



##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgePayloadConverter.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.Date;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+// import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConversionException;
+import org.apache.camel.TypeConverter;
+// import org.apache.camel.spi.HeaderFilterStrategy;
+// import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.eclipse.tahu.SparkplugInvalidTypeException;
+import org.eclipse.tahu.message.model.Metric;
+import org.eclipse.tahu.message.model.MetricDataType;
+import org.eclipse.tahu.message.model.SparkplugBPayload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// WIP
+// @Converter(generateLoader = true)
+public class TahuEdgePayloadConverter { // implements 
HeaderFilterStrategyAware {

Review Comment:
   Note to myself: to-be-reviewed after it's ready.



##########
components/camel-tahu/src/test/java/org/apache/camel/component/tahu/services/HiveMQService.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu.services;
+
+import org.apache.camel.test.infra.common.services.TestService;
+
+public interface HiveMQService extends TestService {

Review Comment:
   You can move this to an other test services to a new module in the 
`test-infra` dir.



##########
components/camel-tahu/src/test/resources/hivemq-ce/extensions/sparkplug-tck/third-party-licenses/license-dependency.html:
##########


Review Comment:
   This file shouldn't be here.



##########
components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.tahu;
+
+import java.util.Arrays;
+import java.util.List;
+
+// import java.util.Map;
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.eclipse.tahu.message.BdSeqManager;
+import org.eclipse.tahu.message.model.SparkplugBPayloadMap;
+
+/**
+ * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse 
Tahu
+ */
+@UriEndpoint(firstVersion = "4.0.0",

Review Comment:
   4.8.0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to