orpiske commented on code in PR #15088: URL: https://github.com/apache/camel/pull/15088#discussion_r1715277517
########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.Arrays; +import java.util.List; + +// import java.util.Map; +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.DefaultHeaderFilterStrategy; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; + +/** + * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse Tahu + */ +@UriEndpoint(firstVersion = "4.0.0", + scheme = 
TahuConstants.BASE_SCHEME, + title = "Tahu", + syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX, + alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX, + category = { Category.MESSAGING, Category.IOT, Category.MONITORING }, + headersClass = TahuConstants.class) +public class TahuEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { + + @UriPath(label = "producer", description = "ID of the group") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String groupId; + + @UriPath(label = "producer", description = "ID of the edge node") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String edgeNode; + + @UriPath(label = "producer (device only)", description = "ID of this edge node device") + @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true) + private final String deviceId; + + @UriParam(label = "producer (edge node only)", description = "Host ID of the primary host application for this edge node") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String primaryHostId; + + @UriParam(label = "producer (edge node only)", + description = "ID of each device connected to this edge node, as a comma-separated list") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true) + private String deviceIds; + + @UriParam(label = "producer", + description = "Tahu SparkplugBPayloadMap to configure metric data types for this edge node or device NOTE: This payload is used exclusively as a Sparkplug B spec-compliant configuration for all possible edge node or device metric names, aliases, and data types. This configuration is required to publish proper Sparkplug B NBIRTH and DBIRTH payloads.") Review Comment: I'd use "Note that this payload" so that the phrase fits well with the UI tooling using the code. 
########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgeClientCallback.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.List; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.RuntimeCamelException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.tahu.SparkplugParsingException; +import org.eclipse.tahu.message.PayloadDecoder; +import org.eclipse.tahu.message.SparkplugBPayloadDecoder; +import org.eclipse.tahu.message.model.DeviceDescriptor; +import org.eclipse.tahu.message.model.EdgeNodeDescriptor; +import org.eclipse.tahu.message.model.MessageType; +import org.eclipse.tahu.message.model.Metric; +import org.eclipse.tahu.message.model.SparkplugBPayload; +import org.eclipse.tahu.message.model.SparkplugMeta; +import org.eclipse.tahu.message.model.StatePayload; +import org.eclipse.tahu.message.model.Topic; +import org.eclipse.tahu.mqtt.ClientCallback; +import org.eclipse.tahu.mqtt.MqttClientId; +import org.eclipse.tahu.mqtt.MqttServerName; +import org.eclipse.tahu.mqtt.MqttServerUrl; +import org.eclipse.tahu.util.SparkplugUtil; +import 
org.eclipse.tahu.util.TopicUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +class TahuEdgeClientCallback implements ClientCallback { + + private static final Logger LOG = LoggerFactory.getLogger(TahuEdgeClientCallback.class); + + private TahuEdgeClient client; + + private final EdgeNodeDescriptor edgeNodeDescriptor; + private final TahuEdgeMetricHandler tahuEdgeNodeMetricHandler; + + private final Marker loggingMarker; + + TahuEdgeClientCallback(EdgeNodeDescriptor edgeNodeDescriptor, TahuEdgeMetricHandler tahuEdgeNodeMetricHandler) { + this.edgeNodeDescriptor = edgeNodeDescriptor; + this.tahuEdgeNodeMetricHandler = tahuEdgeNodeMetricHandler; + + loggingMarker = MarkerFactory.getMarker(edgeNodeDescriptor.getDescriptorString()); + } + + void setClient(TahuEdgeClient client) { + this.client = client; + } + + @Override + public void messageArrived( + MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, String rawTopic, + MqttMessage mqttMessage) { + + final Topic topic; + try { + topic = TopicUtil.parseTopic(rawTopic); + } catch (SparkplugParsingException e) { + LOG.error(loggingMarker, "Exception caught parsing Sparkplug topic {}", rawTopic, e); Review Comment: (advisory) In general we avoid logging and throwing ... This is not a requirement and you can still find some of our code doing that, but we have been trying to avoid it and clean it up. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuConfiguration.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +import org.apache.camel.support.jsse.SSLContextParameters; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.tahu.model.MqttServerDefinition; +import org.eclipse.tahu.mqtt.MqttClientId; +import org.eclipse.tahu.mqtt.MqttServerName; +import org.eclipse.tahu.mqtt.MqttServerUrl; + +@UriParams +public class TahuConfiguration implements Cloneable { + + private static final Pattern SERVER_DEF_PATTERN = Pattern + .compile("([^:]+):(?:(?!tcp|ssl)([^:]+):)?((?:tcp|ssl):(?://)?[\\p{Alnum}.-]+(?::\\d+)?),?"); + + @UriParam(label = "common", + description = "MQTT server definition list, given with the following syntax in a comma-separated list: MqttServerName:[MqttClientId:](tcp|ssl)://hostname[:port],...") + private String servers; + + @UriParam(label = "common", + description = "MQTT client ID to use for all server definitions, rather than specifying the same one for each", + defaultValueNote = "If neither the clientId parameter nor an MqttClientId defined for an MQTT Server, a random MQTT Client ID will be generated, prefaced with \"Camel\"") + private String clientId; + + @UriParam(label = "common", + description = "MQTT client ID length check enabled", defaultValue = "true") + private boolean checkClientIdLength = true; + + @UriParam(label = 
"security", description = "Username for MQTT server authentication", secret = true) + private String username; + + @UriParam(label = "security", description = "Password for MQTT server authentication", secret = true) + private String password; + + @UriParam(label = "common", description = "Delay before node a rebirth message will be sent", defaultValue = "5000") + private long rebirthDebounceDelay = 5000L; + + @UriParam(label = "common", description = "Connection keep alive timeout in seconds", defaultValue = "30") + private int keepAliveTimeout = 30; + + @UriParam(label = "security", description = "SSL configuration for MQTT server connections") + private SSLContextParameters sslContextParameters; + + public String getServers() { + return servers; + } + + public void setServers(String servers) { + this.servers = servers; + } + + public List<MqttServerDefinition> getServerDefinitionList() { + List<MqttServerDefinition> serverDefinitionList; + if (ObjectHelper.isEmpty(servers)) { + serverDefinitionList = List.of(); + } else if (!SERVER_DEF_PATTERN.matcher(servers).find()) { + throw new RuntimeCamelException("Server definition list has invalid syntax: " + servers); + } else { + Matcher serverDefMatcher = SERVER_DEF_PATTERN.matcher(servers); + serverDefinitionList = serverDefMatcher.results().map(matchResult -> { + + // MatchResult does not support named groups + String serverName = matchResult.group(1); + String clientId = matchResult.group(2); + String serverUrl = matchResult.group(3); + + return parseFromUrlString(serverName, clientId, serverUrl); + }).toList(); + } + return serverDefinitionList; + } + + private MqttServerDefinition parseFromUrlString( + String serverName, String clientId, String serverUrl) { + if (ObjectHelper.isEmpty(serverName) || ObjectHelper.isEmpty(serverUrl)) { + throw new RuntimeCamelException( + "Both serverName and serverUrl must be specified in MQTT server definitions"); + } Review Comment: Here you can use `ObjectHelper.notNullOrEmpty` 
and it should do all of that for you. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.Arrays; +import java.util.List; + +// import java.util.Map; +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.DefaultHeaderFilterStrategy; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; + +/** + * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse Tahu + */ 
+@UriEndpoint(firstVersion = "4.0.0", + scheme = TahuConstants.BASE_SCHEME, + title = "Tahu", + syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX, + alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX, + category = { Category.MESSAGING, Category.IOT, Category.MONITORING }, + headersClass = TahuConstants.class) +public class TahuEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { + + @UriPath(label = "producer", description = "ID of the group") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String groupId; + + @UriPath(label = "producer", description = "ID of the edge node") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String edgeNode; + + @UriPath(label = "producer (device only)", description = "ID of this edge node device") + @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true) + private final String deviceId; + + @UriParam(label = "producer (edge node only)", description = "Host ID of the primary host application for this edge node") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String primaryHostId; + + @UriParam(label = "producer (edge node only)", + description = "ID of each device connected to this edge node, as a comma-separated list") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true) + private String deviceIds; + + @UriParam(label = "producer", + description = "Tahu SparkplugBPayloadMap to configure metric data types for this edge node or device NOTE: This payload is used exclusively as a Sparkplug B spec-compliant configuration for all possible edge node or device metric names, aliases, and data types. 
This configuration is required to publish proper Sparkplug B NBIRTH and DBIRTH payloads.") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private SparkplugBPayloadMap metricDataTypePayloadMap; + + @UriParam + private final TahuConfiguration configuration; + + @UriParam(label = "producer (edge node only),advanced", description = "Flag enabling support for metric aliases", + defaultValue = "false") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private boolean useAliases = false; + + @UriParam(label = "producer,advanced", + description = "To use a custom HeaderFilterStrategy to filter headers used as Sparkplug metrics", + defaultValueNote = "Defaults to sending all Camel Message headers with name prefixes of \"" + + TahuConstants.METRIC_HEADER_PREFIX + "\", including those with null values") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }) + private volatile HeaderFilterStrategy headerFilterStrategy; + + @UriParam(label = "producer (edge node only),advanced", + description = "To use a specific org.eclipse.tahu.message.BdSeqManager implementation to manage edge node birth-death sequence numbers", + defaultValue = "org.apache.camel.component.tahu.CamelBdSeqManager") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private volatile BdSeqManager bdSeqManager; + + @UriParam(label = "producer (edge node only),advanced", + description = "Path for Sparkplug B NBIRTH/NDEATH sequence number persistence files. 
This path will contain files named as \"<Edge Node ID>-bdSeqNum\" and must be writable by the executing process' user", + defaultValue = "${sys:java.io.tmpdir}/CamelTahuTemp") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String bdSeqNumPath; + + @UriPath(label = "consumer", description = "ID for the host application") + @Metadata(applicableFor = TahuConstants.HOST_APP_SCHEME, required = true) + private final String hostId; + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, + String groupId, String edgeNode) { + this(uri, component, configuration, groupId, edgeNode, null); + } + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, + String groupId, String edgeNode, String deviceId) { + super(uri, component); + + this.configuration = configuration; + + this.groupId = groupId; + this.edgeNode = edgeNode; + this.deviceId = deviceId; + this.hostId = null; + + } + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, String hostId) { + super(uri, component); + + this.configuration = configuration; + + this.groupId = null; + this.edgeNode = null; + this.deviceId = null; + this.hostId = hostId; + + } + + @Override + public Producer createProducer() throws Exception { + if (ObjectHelper.isEmpty(getGroupId()) || ObjectHelper.isEmpty(getEdgeNode())) { + throw new FailedToCreateProducerException( + this, new IllegalArgumentException("No groupId and/or edgeNode configured for this Endpoint")); + } + + TahuEdgeProducer.Builder producerBuilder = new TahuEdgeProducer.Builder(this) + .groupId(groupId) + .edgeNode(edgeNode); + + if (deviceId != null) { + producerBuilder.deviceId(deviceId); + } + + TahuEdgeProducer producer = producerBuilder.build(); + + return producer; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + if (ObjectHelper.isEmpty(getHostId())) { Review Comment: Same note as before about 
`ObjectHelper.isNotNullOrEmpty`. ########## components/camel-tahu/src/test/java/org/apache/camel/component/tahu/TahuEdgeProducerTest.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Disabled Review Comment: Ideally, disabled tests should have a reason (e.g., `@Disabled("requires this and that to run")`). If the test is manual, it should be named `<something>ManualTest.java` so it doesn't get flagged in the code analysis. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.Arrays; +import java.util.List; + +// import java.util.Map; +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.DefaultHeaderFilterStrategy; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; + +/** + * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse Tahu + */ +@UriEndpoint(firstVersion = "4.0.0", + scheme = TahuConstants.BASE_SCHEME, + title = "Tahu", + syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX, + alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX, + category = { Category.MESSAGING, Category.IOT, Category.MONITORING }, + 
headersClass = TahuConstants.class) +public class TahuEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { + + @UriPath(label = "producer", description = "ID of the group") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String groupId; + + @UriPath(label = "producer", description = "ID of the edge node") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String edgeNode; + + @UriPath(label = "producer (device only)", description = "ID of this edge node device") + @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true) + private final String deviceId; + + @UriParam(label = "producer (edge node only)", description = "Host ID of the primary host application for this edge node") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String primaryHostId; + + @UriParam(label = "producer (edge node only)", + description = "ID of each device connected to this edge node, as a comma-separated list") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true) + private String deviceIds; + + @UriParam(label = "producer", + description = "Tahu SparkplugBPayloadMap to configure metric data types for this edge node or device NOTE: This payload is used exclusively as a Sparkplug B spec-compliant configuration for all possible edge node or device metric names, aliases, and data types. 
This configuration is required to publish proper Sparkplug B NBIRTH and DBIRTH payloads.") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private SparkplugBPayloadMap metricDataTypePayloadMap; + + @UriParam + private final TahuConfiguration configuration; + + @UriParam(label = "producer (edge node only),advanced", description = "Flag enabling support for metric aliases", + defaultValue = "false") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private boolean useAliases = false; + + @UriParam(label = "producer,advanced", + description = "To use a custom HeaderFilterStrategy to filter headers used as Sparkplug metrics", + defaultValueNote = "Defaults to sending all Camel Message headers with name prefixes of \"" + + TahuConstants.METRIC_HEADER_PREFIX + "\", including those with null values") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }) + private volatile HeaderFilterStrategy headerFilterStrategy; + + @UriParam(label = "producer (edge node only),advanced", + description = "To use a specific org.eclipse.tahu.message.BdSeqManager implementation to manage edge node birth-death sequence numbers", + defaultValue = "org.apache.camel.component.tahu.CamelBdSeqManager") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private volatile BdSeqManager bdSeqManager; + + @UriParam(label = "producer (edge node only),advanced", + description = "Path for Sparkplug B NBIRTH/NDEATH sequence number persistence files. 
This path will contain files named as \"<Edge Node ID>-bdSeqNum\" and must be writable by the executing process' user", + defaultValue = "${sys:java.io.tmpdir}/CamelTahuTemp") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String bdSeqNumPath; + + @UriPath(label = "consumer", description = "ID for the host application") + @Metadata(applicableFor = TahuConstants.HOST_APP_SCHEME, required = true) + private final String hostId; + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, + String groupId, String edgeNode) { + this(uri, component, configuration, groupId, edgeNode, null); + } + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, + String groupId, String edgeNode, String deviceId) { + super(uri, component); + + this.configuration = configuration; + + this.groupId = groupId; + this.edgeNode = edgeNode; + this.deviceId = deviceId; + this.hostId = null; + + } + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, String hostId) { + super(uri, component); + + this.configuration = configuration; + + this.groupId = null; + this.edgeNode = null; + this.deviceId = null; + this.hostId = hostId; + + } + + @Override + public Producer createProducer() throws Exception { + if (ObjectHelper.isEmpty(getGroupId()) || ObjectHelper.isEmpty(getEdgeNode())) { Review Comment: Same note as before about `ObjectHelper.isNotNullOrEmpty`. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEdgePayloadConverter.java: ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.Date; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +// import org.apache.camel.Converter; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.TypeConversionException; +import org.apache.camel.TypeConverter; +// import org.apache.camel.spi.HeaderFilterStrategy; +// import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.TypeConverterRegistry; +import org.eclipse.tahu.SparkplugInvalidTypeException; +import org.eclipse.tahu.message.model.Metric; +import org.eclipse.tahu.message.model.MetricDataType; +import org.eclipse.tahu.message.model.SparkplugBPayload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// WIP +// @Converter(generateLoader = true) +public class TahuEdgePayloadConverter { // implements HeaderFilterStrategyAware { Review Comment: Note to myself: to-be-reviewed after it's ready. ########## components/camel-tahu/src/test/java/org/apache/camel/component/tahu/services/HiveMQService.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu.services; + +import org.apache.camel.test.infra.common.services.TestService; + +public interface HiveMQService extends TestService { Review Comment: You can move this and other test services to a new module in the `test-infra` dir. ########## components/camel-tahu/src/test/resources/hivemq-ce/extensions/sparkplug-tck/third-party-licenses/license-dependency.html: ########## Review Comment: This file shouldn't be here. ########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.tahu; + +import java.util.Arrays; +import java.util.List; + +// import java.util.Map; +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.DefaultHeaderFilterStrategy; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; + +/** + * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse Tahu + */ +@UriEndpoint(firstVersion = "4.0.0", Review Comment: 4.8.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org