This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 811d93c camel-milo: added possibility to read node values via producer. (#4909) 811d93c is described below commit 811d93c1e4199612aac81ab25a2ee51aceb3474c Author: attrobit <git...@attrobit.de> AuthorDate: Sun Jan 24 08:53:23 2021 +0100 camel-milo: added possibility to read node values via producer. (#4909) * camel-milo: added possibility to read node values via producer. * camel-milo: added possibility to read node values via producer. * Fixed Producer construction after rebase. * documentation CamelMiloNodeIds Co-authored-by: attrobit <git...@attrobit.com> --- .../camel/catalog/docs/milo-client-component.adoc | 19 +++++ .../src/main/docs/milo-client-component.adoc | 19 +++++ .../milo/client/MiloClientConnection.java | 7 ++ .../component/milo/client/MiloClientEndpoint.java | 2 +- .../component/milo/client/MiloClientProducer.java | 16 ++++- .../milo/client/internal/SubscriptionManager.java | 39 ++++++++-- .../camel/component/milo/ReadValuesClientTest.java | 83 ++++++++++++++++++++++ .../modules/ROOT/pages/milo-client-component.adoc | 19 +++++ 8 files changed, 197 insertions(+), 7 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc index 985979f..fdec769 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc @@ -185,6 +185,7 @@ might be different, and wrong from the point of view of the connecting client (e In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information, but override the host information with the value of the original URI. +[[nodeid]] === Node ID @@ -228,6 +229,24 @@ Input parameters are taken from the body: * If the body is a `Variant`, then it will be wrapped in a `Variant[]` array * Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]` +=== Read Values from Nodes + +The component provide a producer to read values from multiple opc-ua nodes. The Node-IDs will be defined in the header `CamelMiloNodeIds` as list of strings. (see <<nodeid, Node-ID>> for the ID format). + +Example: +```java +from("direct:start") + .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1"))) + .setHeader("await", constant(true)) // await: parameter "defaultAwaitWrites" + .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + return newExchange; + } + }).to("mock:test1"); +``` + === Security policies When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`) diff --git a/components/camel-milo/src/main/docs/milo-client-component.adoc b/components/camel-milo/src/main/docs/milo-client-component.adoc index 985979f..fdec769 100644 --- a/components/camel-milo/src/main/docs/milo-client-component.adoc +++ b/components/camel-milo/src/main/docs/milo-client-component.adoc @@ -185,6 +185,7 @@ might be different, and wrong from the point of view of the connecting client (e In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information, but override the host information with the value of the original URI. +[[nodeid]] === Node ID @@ -228,6 +229,24 @@ Input parameters are taken from the body: * If the body is a `Variant`, then it will be wrapped in a `Variant[]` array * Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]` +=== Read Values from Nodes + +The component provide a producer to read values from multiple opc-ua nodes. The Node-IDs will be defined in the header `CamelMiloNodeIds` as list of strings. (see <<nodeid, Node-ID>> for the ID format). + +Example: +```java +from("direct:start") + .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1"))) + .setHeader("await", constant(true)) // await: parameter "defaultAwaitWrites" + .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + return newExchange; + } + }).to("mock:test1"); +``` + === Security policies When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`) diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java index 7af45a6..b282a2d 100644 --- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java +++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.milo.client; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -107,6 +108,12 @@ public class MiloClientConnection implements AutoCloseable { return this.manager.write(nodeId, mapWriteValue(value)); } + public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds) { + checkInit(); + + return this.manager.readValues(nodeIds); + } + public CompletableFuture<CallMethodResult> call( final ExpandedNodeId nodeId, final ExpandedNodeId methodId, final Object value) { checkInit(); diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java index f689e62..75bc2c6 100644 --- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java +++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java @@ -110,7 +110,7 @@ public class MiloClientEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - return new MiloClientProducer(this, this.defaultAwaitWrites); + return new MiloClientProducer(this, this.createConnection(), this.defaultAwaitWrites); } @Override diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java index d244236..8de1e59 100644 --- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java +++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.milo.client; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; @@ -35,7 +37,9 @@ public class MiloClientProducer extends DefaultAsyncProducer { private final boolean defaultAwaitWrites; - public MiloClientProducer(final MiloClientEndpoint endpoint, + private static final String HEADER_NODE_IDS = "CamelMiloNodeIds"; + + public MiloClientProducer(final MiloClientEndpoint endpoint, final MiloClientConnection connection, final boolean defaultAwaitWrites) { super(endpoint); @@ -74,7 +78,15 @@ public class MiloClientProducer extends DefaultAsyncProducer { final CompletableFuture<?> future; - if (this.methodId == null) { + if (msg.getHeaders().containsKey(HEADER_NODE_IDS)) { + final List<String> nodeIds = msg.getHeader(HEADER_NODE_IDS, List.class); + final List<ExpandedNodeId> expandedNodeIds + = nodeIds.stream().map(String.class::cast).map(ExpandedNodeId::parse).collect(Collectors.toList()); + future = this.connection.readValues(expandedNodeIds).thenApply(nodes -> { + exchange.getIn().setBody(nodes); + return nodes; + }); + } else if (this.methodId == null) { future = this.connection.writeValue(this.nodeId, value); } else { future = this.connection.call(this.nodeId, this.methodId, value); diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java index 95410d0..2b49043 100644 --- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java +++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.base.Strings; import org.apache.camel.component.milo.client.MiloClientConfiguration; @@ -188,9 +190,8 @@ public class SubscriptionManager { final ReadValueId itemId = new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE); Double samplingInterval = s.getSamplingInterval(); - final MonitoringParameters parameters - = new MonitoringParameters( - entry.getKey(), samplingInterval, s.createMonitoringFilter(client), null, null); + final MonitoringParameters parameters = new MonitoringParameters( + entry.getKey(), samplingInterval, s.createMonitoringFilter(client), null, null); items.add(new MonitoredItemCreateRequest(itemId, MonitoringMode.Reporting, parameters)); } } @@ -355,6 +356,17 @@ public class SubscriptionManager { }); } + + public CompletableFuture<List<DataValue>> readValues(List<ExpandedNodeId> expandedNodeIds) { + + final CompletableFuture<NodeId>[] nodeIdFutures + = expandedNodeIds.stream().map(this::lookupNamespace).toArray(CompletableFuture[]::new); + + return CompletableFuture.allOf(nodeIdFutures).thenCompose(param -> { + List<NodeId> nodeIds = Stream.of(nodeIdFutures).map(CompletableFuture::join).collect(Collectors.toList()); + return this.client.readValues(0, TimestampsToReturn.Server, nodeIds); + }); + } } private final MiloClientConfiguration configuration; @@ -449,7 +461,9 @@ public class SubscriptionManager { final URI uri = URI.create(getEndpointDiscoveryUri()); - //milo library doesn't allow user info as a part of the uri, it has to be removed before sending to milo + // milo library doesn't allow user info as a part of the uri, it has to + // be + // removed before sending to milo final String user = uri.getUserInfo(); if (user != null && !user.isEmpty()) { discoveryUri = discoveryUri.replaceFirst(user + "@", ""); @@ -706,4 +720,21 @@ public class SubscriptionManager { } } + public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds) { + synchronized (this) { + if (this.connected == null) { + return newNotConnectedResult(); + } + + return this.connected.readValues(nodeIds).handleAsync((nodes, e) -> { + // handle outside the lock, running using + // handleAsync + if (e != null) { + handleConnectionFailue(e); + } + return nodes; + }, this.executor); + } + } + } diff --git a/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java b/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java new file mode 100644 index 0000000..7b082fd --- /dev/null +++ b/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.milo; + +import java.util.Arrays; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +public class ReadValuesClientTest extends AbstractMiloServerTest { + + private static final String DIRECT_START_1 = "direct:start1"; + private static final String DIRECT_START_2 = "direct:start2"; + + private static final String MILO_SERVER_ITEM_1 = "milo-server:myitem1"; + + private static final String MILO_CLIENT_ITEM_C1_1 + = "milo-client:opc.tcp://foo:bar@localhost:@@port@@?allowedSecurityPolicies=None&overrideHost=true"; + + private static final String MOCK_TEST_1 = "mock:test1"; + + @EndpointInject(MOCK_TEST_1) + protected MockEndpoint test1Endpoint; + + @Produce(DIRECT_START_1) + protected ProducerTemplate producer1; + + @Produce(DIRECT_START_2) + protected ProducerTemplate producer2; + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(DIRECT_START_1).to(MILO_SERVER_ITEM_1); + + from(DIRECT_START_2) + .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1"))) + .enrich(resolve(MILO_CLIENT_ITEM_C1_1), new AggregationStrategy() { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + return newExchange; + } + }).to(MOCK_TEST_1); + } + }; + } + + @Test + void testReadValues_Successful() throws Exception { + this.test1Endpoint.expectedMinimumMessageCount(1); + + producer1.sendBody("Foo"); + producer2.sendBody("Bar"); + + this.test1Endpoint.await(); + + testBody(this.test1Endpoint.message(0), assertGoodValue("Foo")); + } +} diff --git a/docs/components/modules/ROOT/pages/milo-client-component.adoc b/docs/components/modules/ROOT/pages/milo-client-component.adoc index a0f4c43..06b739b 100644 --- a/docs/components/modules/ROOT/pages/milo-client-component.adoc +++ b/docs/components/modules/ROOT/pages/milo-client-component.adoc @@ -187,6 +187,7 @@ might be different, and wrong from the point of view of the connecting client (e In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information, but override the host information with the value of the original URI. +[[nodeid]] === Node ID @@ -230,6 +231,24 @@ Input parameters are taken from the body: * If the body is a `Variant`, then it will be wrapped in a `Variant[]` array * Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]` +=== Read Values from Nodes + +The component provide a producer to read values from multiple opc-ua nodes. The Node-IDs will be defined in the header `CamelMiloNodeIds` as list of strings. (see <<nodeid, Node-ID>> for the ID format). + +Example: +```java +from("direct:start") + .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1"))) + .setHeader("await", constant(true)) // await: parameter "defaultAwaitWrites" + .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + return newExchange; + } + }).to("mock:test1"); +``` + === Security policies When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)