ryan-highley commented on code in PR #15088: URL: https://github.com/apache/camel/pull/15088#discussion_r1715458243
########## components/camel-tahu/src/main/java/org/apache/camel/component/tahu/TahuEndpoint.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.tahu; + +import java.util.Arrays; +import java.util.List; + +// import java.util.Map; +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.DefaultHeaderFilterStrategy; +import org.apache.camel.util.ObjectHelper; +import org.eclipse.tahu.message.BdSeqManager; +import org.eclipse.tahu.message.model.SparkplugBPayloadMap; + +/** + * Sparkplug B Edge Node and Host Application support over MQTT using Eclipse Tahu + */ +@UriEndpoint(firstVersion = "4.0.0", + scheme = TahuConstants.BASE_SCHEME, + title = "Tahu", + syntax = TahuConstants.DEVICE_ENDPOINT_URI_SYNTAX, + alternativeSyntax = TahuConstants.HOST_APP_ENDPOINT_URI_SYNTAX, + category = { Category.MESSAGING, Category.IOT, Category.MONITORING }, + headersClass = TahuConstants.class) +public class TahuEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware { + + @UriPath(label = "producer", description = "ID of the group") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String groupId; + + @UriPath(label = "producer", description = "ID of the edge node") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private final String edgeNode; + + @UriPath(label = "producer (device only)", description = "ID of this edge node device") + @Metadata(applicableFor = TahuConstants.DEVICE_SCHEME, required = true) + private final String deviceId; + + @UriParam(label = "producer (edge node only)", description = "Host ID of the primary host application for this edge node") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String primaryHostId; + + @UriParam(label = "producer (edge node only)", + description = "ID of each device connected to this edge node, as a comma-separated list") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME, required = true) + private String deviceIds; + + @UriParam(label = "producer", + description = "Tahu SparkplugBPayloadMap to configure metric data types for this edge node or device NOTE: This payload is used exclusively as a Sparkplug B spec-compliant configuration for all possible edge node or device metric names, aliases, and data types. This configuration is required to publish proper Sparkplug B NBIRTH and DBIRTH payloads.") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }, required = true) + private SparkplugBPayloadMap metricDataTypePayloadMap; + + @UriParam + private final TahuConfiguration configuration; + + @UriParam(label = "producer (edge node only),advanced", description = "Flag enabling support for metric aliases", + defaultValue = "false") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private boolean useAliases = false; + + @UriParam(label = "producer,advanced", + description = "To use a custom HeaderFilterStrategy to filter headers used as Sparkplug metrics", + defaultValueNote = "Defaults to sending all Camel Message headers with name prefixes of \"" + + TahuConstants.METRIC_HEADER_PREFIX + "\", including those with null values") + @Metadata(applicableFor = { TahuConstants.EDGE_NODE_SCHEME, TahuConstants.DEVICE_SCHEME }) + private volatile HeaderFilterStrategy headerFilterStrategy; + + @UriParam(label = "producer (edge node only),advanced", + description = "To use a specific org.eclipse.tahu.message.BdSeqManager implementation to manage edge node birth-death sequence numbers", + defaultValue = "org.apache.camel.component.tahu.CamelBdSeqManager") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private volatile BdSeqManager bdSeqManager; + + @UriParam(label = "producer (edge node only),advanced", + description = "Path for Sparkplug B NBIRTH/NDEATH sequence number persistence files. This path will contain files named as \"<Edge Node ID>-bdSeqNum\" and must be writable by the executing process' user", + defaultValue = "${sys:java.io.tmpdir}/CamelTahuTemp") + @Metadata(applicableFor = TahuConstants.EDGE_NODE_SCHEME) + private String bdSeqNumPath; + + @UriPath(label = "consumer", description = "ID for the host application") + @Metadata(applicableFor = TahuConstants.HOST_APP_SCHEME, required = true) + private final String hostId; + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, + String groupId, String edgeNode) { + this(uri, component, configuration, groupId, edgeNode, null); + } + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, + String groupId, String edgeNode, String deviceId) { + super(uri, component); + + this.configuration = configuration; + + this.groupId = groupId; + this.edgeNode = edgeNode; + this.deviceId = deviceId; + this.hostId = null; + + } + + TahuEndpoint(String uri, TahuComponent component, TahuConfiguration configuration, String hostId) { + super(uri, component); + + this.configuration = configuration; + + this.groupId = null; + this.edgeNode = null; + this.deviceId = null; + this.hostId = hostId; + + } + + @Override + public Producer createProducer() throws Exception { + if (ObjectHelper.isEmpty(getGroupId()) || ObjectHelper.isEmpty(getEdgeNode())) { + throw new FailedToCreateProducerException( + this, new IllegalArgumentException("No groupId and/or edgeNode configured for this Endpoint")); + } + + TahuEdgeProducer.Builder producerBuilder = new TahuEdgeProducer.Builder(this) + .groupId(groupId) + .edgeNode(edgeNode); + + if (deviceId != null) { + producerBuilder.deviceId(deviceId); + } + + TahuEdgeProducer producer = producerBuilder.build(); + + return producer; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + if (ObjectHelper.isEmpty(getHostId())) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org