Initial IotpGateway implementation - some updates to the initial IotGateway interface - add IotpGateway and its runtime classes - add sample clients - it all seems to be working - TODO flesh out the new IotDevice methods in EchoIotDevice, MqttDevice and PubSubIotDevice
Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/3401fbf7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/3401fbf7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/3401fbf7 Branch: refs/heads/master Commit: 3401fbf74e69218634add84f895a2c7c78afe628 Parents: 26c198b Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Thu Feb 23 17:10:13 2017 -0500 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Fri Feb 24 21:53:41 2017 -0500 ---------------------------------------------------------------------- .../apache/edgent/apps/iot/PubSubIotDevice.java | 12 + .../edgent/test/apps/iot/EchoIotDevice.java | 12 + .../apache/edgent/connectors/iot/IotDevice.java | 6 +- .../edgent/connectors/iot/IotGateway.java | 32 +- .../edgent/connectors/iotp/IotpDevice.java | 26 +- .../edgent/connectors/iotp/IotpGWDevice.java | 80 +++++ .../edgent/connectors/iotp/IotpGateway.java | 316 +++++++++++++++++++ .../connectors/iotp/runtime/IotpConnector.java | 23 ++ .../iotp/runtime/IotpDeviceCommands.java | 5 +- .../connectors/iotp/runtime/IotpGWCommands.java | 51 +++ .../iotp/runtime/IotpGWConnector.java | 263 +++++++++++++++ .../iotp/runtime/IotpGWDeviceEventsFixed.java | 48 +++ .../runtime/IotpGWDeviceEventsFunction.java | 55 ++++ .../iotp/runtime/IotpGWEventsFixed.java | 46 +++ .../iotp/runtime/IotpGWEventsFunction.java | 53 ++++ .../edgent/connectors/mqtt/iot/MqttDevice.java | 12 + .../samples/connectors/iotp/IotpAppClient.java | 136 ++++++++ .../connectors/iotp/IotpDeviceSample.java | 152 +++++++++ .../connectors/iotp/IotpGWDeviceSample.java | 198 ++++++++++++ scripts/connectors/iotp/iotp-app-client.cfg | 25 ++ scripts/connectors/iotp/iotp-device-sample.cfg | 10 + .../connectors/iotp/iotp-gwdevice-sample.cfg | 17 + scripts/connectors/iotp/run-iotp-app-client.sh | 45 +++ .../connectors/iotp/run-iotp-device-sample.sh | 43 +++ 
.../connectors/iotp/run-iotp-gwdevice-sample.sh | 44 +++ 25 files changed, 1683 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java ---------------------------------------------------------------------- diff --git a/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java b/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java index 0110a01..e6a64b3 100644 --- a/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java +++ b/apps/iot/src/main/java/org/apache/edgent/apps/iot/PubSubIotDevice.java @@ -121,4 +121,16 @@ class PubSubIotDevice implements IotDevice { return commandsStream; } + @Override + public String getDeviceType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getDeviceId() { + // TODO Auto-generated method stub + return null; + } + } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java ---------------------------------------------------------------------- diff --git a/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java index 65b217f..95d62d2 100644 --- a/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java +++ b/apps/iot/src/test/java/org/apache/edgent/test/apps/iot/EchoIotDevice.java @@ -114,5 +114,17 @@ public class EchoIotDevice implements IotDevice { Set<String> cmds = new HashSet<>(Arrays.asList(commands)); return echoCmds.filter(cmd -> cmds.contains(cmd.getAsJsonPrimitive(CMD_ID).getAsString())); } + + @Override + public String getDeviceType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getDeviceId() { + // TODO 
Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java ---------------------------------------------------------------------- diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java index 16d81f4..b44269d 100644 --- a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java +++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotDevice.java @@ -55,17 +55,15 @@ public interface IotDevice extends TopologyElement { /** * Get the device's opaque device type identifier. - * TODO remove the "default" - avoids compilation errors while discussing this. * @return the device's type */ - public default String getDeviceType() { return "a-device-type-id"; } + String getDeviceType(); /** * Get the device's unique opaque device identifier. - * TODO remove the "default" - avoids compilation errors while discussing this. * @return the device's id */ - public default String getDeviceId() { return "a-device-id"; } + String getDeviceId(); /** * Publish a stream's tuples as device events. http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotGateway.java ---------------------------------------------------------------------- diff --git a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotGateway.java b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotGateway.java index 4988ba1..abaaf8a 100644 --- a/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotGateway.java +++ b/connectors/iot/src/main/java/org/apache/edgent/connectors/iot/IotGateway.java @@ -32,6 +32,8 @@ import com.google.gson.JsonObject; /** * A generic IoT gateway device IoT hub connector. 
* <p> + * <b>This interface is incubating and is subject to change.</b> + * <p> * An IoT gateway device is a conduit for a collection of IoT devices * that lack direct connection to the enterprise IoT hub. * <p> @@ -45,7 +47,7 @@ import com.google.gson.JsonObject; * The name/value pairs in the map are IotGateway implementation defined values. * Refer to the IotGateway implementation for details. * <p> - * Events can be published that are from a deviceId and commands can be + * Events can be published that are from a connected device's deviceId and commands can be * received for that are targeted for it using * {@link #eventsForDevice(String, TStream, String, JsonObject, int) eventsForDevice()} * and {@link #commandsForDevice(Set, String...) commandsForDevice()}. @@ -111,7 +113,7 @@ public interface IotGateway extends IotDevice { /** * Publish a stream's tuples as device events. * Each tuple is published as a device event with the supplied - * device identifier, event identifier, payload and QoS. + * device identifier, event identifier and QoS. * <p> * Events for a particular device can also be published via its * {@link IotDevice#events(TStream, String, int) IotDevice.event()}. @@ -122,29 +124,27 @@ public interface IotGateway extends IotDevice { * Stream to be published. * @param eventId * Event identifier. - * @param payload - * Event's payload. * @param qos * Event's delivery Quality of Service. * @return TSink sink element representing termination of this stream. */ TSink<JsonObject> eventsForDevice(String deviceId, - TStream<JsonObject> stream, String eventId, JsonObject payload, int qos) ; + TStream<JsonObject> stream, String eventId, int qos) ; /** * Create a stream of device commands as JSON objects. - * Each command sent to one of the specified devices matching {@code commands} will + * Each command sent to one of the specified {@code deviceIds} matching {@code commands} will * result in a tuple on the stream. 
The JSON object has these keys: * <UL> - * <LI>{@link IotDevice#CMD_DEVICE device} - Command's target device's opaque id String. - * <LI>{@link IotDevice#CMD_ID command} - Command identifier as a String</LI> - * <LI>{@link IotDevice#CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI> - * <LI>{@link IotDevice#CMD_FORMAT format} - Format of the command as a String</LI> - * <LI>{@link IotDevice#CMD_PAYLOAD payload} - Payload of the command - * <UL> - * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI> - * <LI>Otherwise {@code payload} is String</LI> - * </UL> + * <LI>{@link #CMD_DEVICE device} - Command's opaque target device's id String. + * <LI>{@link #CMD_ID command} - Command identifier as a String</LI> + * <LI>{@link #CMD_TS tsms} - Timestamp of the command in milliseconds since the 1970/1/1 epoch.</LI> + * <LI>{@link #CMD_FORMAT format} - Format of the command as a String</LI> + * <LI>{@link #CMD_PAYLOAD payload} - Payload of the command + * <UL> + * <LI>If {@code format} is {@code json} then {@code payload} is JSON</LI> + * <LI>Otherwise {@code payload} is String</LI> + * </UL> * </LI> * </UL> * @@ -162,7 +162,7 @@ public interface IotGateway extends IotDevice { /** * Create a stream of device commands as JSON objects. - * Each command sent to the specified device matching {@code commands} will + * Each command sent to the specified {@code deviceId} matching {@code commands} will * result in a tuple on the stream. The JSON object has these keys: * <UL> * <LI>{@link IotDevice#CMD_DEVICE device} - Command's target device's opaque id String. 
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java index f3a61fe..0ea5f06 100644 --- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java @@ -323,9 +323,10 @@ public class IotpDevice implements IotDevice { return all.map(cmd -> { JsonObject full = new JsonObject(); - full.addProperty("command", cmd.getCommand()); - full.addProperty("tsms", System.currentTimeMillis()); - full.addProperty("format", cmd.getFormat()); + full.addProperty(IotDevice.CMD_DEVICE, getDeviceId()); + full.addProperty(IotDevice.CMD_ID, cmd.getCommand()); + full.addProperty(IotDevice.CMD_TS, System.currentTimeMillis()); + full.addProperty(IotDevice.CMD_FORMAT, cmd.getFormat()); if ("json".equalsIgnoreCase(cmd.getFormat())) { JsonParser parser = new JsonParser(); // iot-java 0.2.2 bug https://github.com/ibm-watson-iot/iot-java/issues/81 @@ -346,9 +347,9 @@ public class IotpDevice implements IotDevice { } else { cmdData = jsonPayload; } - full.add("payload", cmdData); + full.add(IotDevice.CMD_PAYLOAD, cmdData); } else { - full.addProperty("payload", cmd.getData().toString()); + full.addProperty(IotDevice.CMD_PAYLOAD, cmd.getData().toString()); } return full; @@ -365,4 +366,19 @@ public class IotpDevice implements IotDevice { public Topology topology() { return topology; } + + @Override + public String getDeviceType() { + return connector.getDeviceType(); + } + + @Override + public String getDeviceId() { + return connector.getFqDeviceId(); + } + + @Override + public String toString() { + return String.format("IotpDevice %s", getDeviceId()); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGWDevice.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGWDevice.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGWDevice.java new file mode 100644 index 0000000..a0bd2e3 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGWDevice.java @@ -0,0 +1,80 @@ +package org.apache.edgent.connectors.iotp; + +import org.apache.edgent.connectors.iot.IotDevice; +import org.apache.edgent.connectors.iotp.runtime.IotpGWConnector; +import org.apache.edgent.connectors.iotp.runtime.IotpGWDeviceEventsFixed; +import org.apache.edgent.connectors.iotp.runtime.IotpGWDeviceEventsFunction; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; +import org.apache.edgent.topology.TSink; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; + +import com.google.gson.JsonObject; + +class IotpGWDevice implements IotDevice { // TODO implements IotpDevice ??? does GW allow for http? 
+ + private final IotpGateway gateway; + private final IotpGWConnector connector; + private final Topology topology; + private final String fqDeviceId; + private final String deviceType; + + IotpGWDevice(IotpGateway gw, IotpGWConnector connector, Topology topology, String fqDeviceId) { + this.gateway = gw; + this.connector = connector; + this.topology = topology; + this.fqDeviceId = fqDeviceId; + String[] devIdToks = IotpGWConnector.splitFqDeviceId(fqDeviceId); + this.deviceType = devIdToks[0]; + } + + @Override + public Topology topology() { + return topology; + } + + @Override + public String getDeviceType() { + return deviceType; + } + + @Override + public String getDeviceId() { + return fqDeviceId; + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return stream.sink( + new IotpGWDeviceEventsFunction(connector, jo -> fqDeviceId, eventId, payload, qos)); + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) { + return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos)); + } + + @Override + public TStream<JsonObject> commands(String... 
commands) { + return gateway.commandsForDevice(fqDeviceId, commands); + } + + @Override + public boolean equals(Object o2) { + return o2 == this // same instance, or another IotpGWDevice with the same fully-qualified device id + || (o2 instanceof IotpGWDevice && ((IotpGWDevice)o2).fqDeviceId.equals(fqDeviceId)); + } + + @Override + public int hashCode() { + return fqDeviceId.hashCode(); + } + + @Override + public String toString() { + return String.format("IotpGWDevice %s", fqDeviceId); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGateway.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGateway.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGateway.java new file mode 100644 index 0000000..aee3d71 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpGateway.java @@ -0,0 +1,316 @@ +package org.apache.edgent.connectors.iotp; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.edgent.connectors.iot.IotDevice; +import org.apache.edgent.connectors.iot.IotGateway; +import org.apache.edgent.connectors.iotp.runtime.IotpGWCommands; +import org.apache.edgent.connectors.iotp.runtime.IotpGWConnector; +import org.apache.edgent.connectors.iotp.runtime.IotpGWDeviceEventsFixed; +import org.apache.edgent.connectors.iotp.runtime.IotpGWDeviceEventsFunction; +import org.apache.edgent.connectors.iotp.runtime.IotpGWEventsFixed; +import org.apache.edgent.connectors.iotp.runtime.IotpGWEventsFunction; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; +import org.apache.edgent.topology.TSink; +import org.apache.edgent.topology.TStream; +import
org.apache.edgent.topology.Topology; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.ibm.iotf.client.gateway.Command; +import com.ibm.iotf.client.gateway.GatewayCallback; +import com.ibm.iotf.client.gateway.GatewayClient; + +/** + * An IoT Gateway Device Connector to IBM Watson IoT Platform (WIoTP). + * <p> + * <b>This class is incubating and is subject to change.</b> + * <p> + * See {@link IotpDevice} for common WIoTP documentation references. + * <p> + * {@code IotpGateway} establishes its own WIoTP {@link GatewayCallback} + * handler in its embedded or the passed in WIoTP {@link GatewayClient}. + * An application can use + * {@link #setExternalCallbackHandler(GatewayCallback) setExternalCallbackHandler} + * to also receive and process callbacks. + */ +public class IotpGateway implements IotGateway { + + private final IotpGWConnector connector; + private final Topology topology; + private TStream<Command> commandStream; + + /** + * Create a connector for the IoT gateway device specified by {@code options}. + * <BR> + * These properties must be set in {@code options}. + * + * <UL> + * <LI>{@code org=}<em>organization identifier</em></LI> + * <LI>{@code type=}<em>gateway device type</em></LI> + * <LI>{@code id=}<em>gateway device identifier</em></LI> + * <LI>{@code auth-method=token}</LI> + * <LI>{@code auth-token=}<em>authorization token</em></LI> + * </UL> + * For example: + * <pre> + * <code> + * Properties options = new Properties(); + * options.setProperty("org", "uguhsp"); + * options.setProperty("type", "iotsample-gateway"); + * options.setProperty("id", "00aabbccde03"); + * options.setProperty("auth-method", "token"); + * options.setProperty("auth-token", "AJfKQV@&bBo@VX6Dcg"); + * + * IotGateway iotGateway = new IotpGateway(topology, options); + * </code> + * </pre> + * <p> + * Connecting to the server occurs when the topology is submitted for + * execution.
+ * </p> + * + * @param options control options + * @param topology + * the connector's associated {@code Topology}. + * + * @see the IBM Watson IoT Platform documentation for additional properties. + */ + public IotpGateway(Topology topology, Properties options) { + this.topology = topology; + this.connector = new IotpGWConnector(options); + } + + /** + * Create a connector for the IoT gateway device specified by {@code optionsFile}. + * <BR> + * The format of the file is: + * <pre> + * <code> + * [device] + * org = <em>organization identifier</em> + * type = <em>gateway device type</em> + * id = <em>gateway device identifier</em> + * auth-method = token + * auth-token = <em>authorization token</em> + * </code> + * </pre> + * For example: + * <pre> + * <code> + * [device] + * org = uguhsp + * type = iotsample-gateway + * id = 00aabbccde03 + * auth-method = token + * auth-token = AJfKQV@&bBo@VX6Dcg + * </code> + * </pre> + * <p> + * Connecting to the server occurs when the topology is submitted for + * execution. + * </p> + * @param topology the connector's associated {@code Topology}. + * @param optionsFile File containing connection information. + * + * @see the IBM Watson IoT Platform documentation for additional properties. + */ + public IotpGateway(Topology topology, File optionsFile) { + this.topology = topology; + this.connector = new IotpGWConnector(optionsFile); + } + + /** + * Create a connector using the supplied WIoTP {@code GatewayClient}. + * @param topology the connector's associated {@code Topology}. + * @param gatewayClient a WIoTP gateway client API object.
+ */ + public IotpGateway(Topology topology, GatewayClient gatewayClient) { + this.topology = topology; + this.connector = new IotpGWConnector(gatewayClient); + } + + @Override + public String getDeviceType() { + return connector.getDeviceType(); + } + + @Override + public String getDeviceId() { + return connector.getFqDeviceId(); + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return stream.sink(new IotpGWEventsFunction(connector, eventId, payload, qos)); + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) { + return stream.sink(new IotpGWEventsFixed(connector, eventId, qos)); + } + + @Override + public TStream<JsonObject> commands(String... commands) { + return commandsForDevice(Collections.singleton(connector.getFqDeviceId()), commands); + } + + private TStream<Command> allCommands() { + if (commandStream == null) + commandStream = topology.events(new IotpGWCommands(connector)); + return commandStream; + } + + @Override + public Topology topology() { + return topology; + } + + /** + * WIoTP Device Type identifier key. + * Key is {@value}. + * + * @see #getIotDevice(Map) + */ + public static String ATTR_DEVICE_TYPE = "deviceType"; + + /** + * WIoTP Device Id identifier key. + * Key is {@value}. + * + * @see #getIotDevice(Map) + */ + public static String ATTR_DEVICE_ID = "deviceId"; + + /** + * {@inheritDoc} + * <p> + * The device's WIoTP deviceType and deviceId must be supplied + * using the {@link #ATTR_DEVICE_TYPE} and {@link #ATTR_DEVICE_ID} + * keys respectively. + */ + @Override + public String getIotDeviceId(Map<String, String> deviceIdAttrs) { + return connector.getIotDeviceId(deviceIdAttrs); + } + + /** + * {@inheritDoc} + * <p> + * See {@link #getIotDeviceId(Map)} for the required attribute keys. 
+ */ + @Override + public IotDevice getIotDevice(Map<String, String> deviceIdAttrs) { + return getIotDevice(getIotDeviceId(deviceIdAttrs)); + } + + @Override + public IotDevice getIotDevice(String fqDeviceId) { + return new IotpGWDevice(this, connector, topology, fqDeviceId); + } + + @Override + public TSink<JsonObject> eventsForDevice(Function<JsonObject, String> fqDeviceId, + TStream<JsonObject> stream, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return stream.sink(new IotpGWDeviceEventsFunction(connector, fqDeviceId, eventId, payload, qos)); + } + + @Override + public TSink<JsonObject> eventsForDevice(String fqDeviceId, TStream<JsonObject> stream, + String eventId, int qos) { + return stream.sink(new IotpGWDeviceEventsFixed(connector, fqDeviceId, eventId, qos)); + } + + @Override + public TStream<JsonObject> commandsForDevice(Set<String> fqDeviceIds, String... commands) { + TStream<Command> all = allCommands(); + + if (fqDeviceIds.size() != 0) { + // support "all devices of type T" - fqDeviceId of typeId and "*" for the simple deviceId + boolean allDevicesOfType = fqDeviceIds.size() == 1 + && IotpGWConnector.splitFqDeviceId(fqDeviceIds.iterator().next())[1].equals("*"); + + all = all.filter(cmd -> { + String fqDeviceId = IotpGWConnector.toFqDeviceId(cmd.getDeviceType(), + allDevicesOfType ? 
"*" : cmd.getDeviceId()); + return fqDeviceIds.contains(fqDeviceId); + }); + } + + if (commands.length != 0) { + Set<String> uniqueCommands = new HashSet<>(); + uniqueCommands.addAll(Arrays.asList(commands)); + all = all.filter(cmd -> uniqueCommands.contains(cmd.getCommand())); + } + + return all.map(cmd -> { + JsonObject full = new JsonObject(); + full.addProperty(IotDevice.CMD_DEVICE, + IotpGWConnector.toFqDeviceId(cmd.getDeviceType(), cmd.getDeviceId())); + full.addProperty(IotDevice.CMD_ID, cmd.getCommand()); + full.addProperty(IotDevice.CMD_TS, System.currentTimeMillis()); + full.addProperty(IotDevice.CMD_FORMAT, cmd.getFormat()); + if ("json".equalsIgnoreCase(cmd.getFormat())) { + JsonParser parser = new JsonParser(); + // iot-java 0.2.2 bug https://github.com/ibm-watson-iot/iot-java/issues/81 + // cmd.getData() returns byte[] instead of JsonObject (or String). + // Must continue to use the deprecated method until that's fixed. + // final JsonObject jsonPayload = (JsonObject) cmd.getData(); + // final JsonObject jsonPayload = (JsonObject) parser.parse((String)cmd.getData()); + @SuppressWarnings("deprecation") + final JsonObject jsonPayload = (JsonObject) parser.parse(cmd.getPayload()); + final JsonObject cmdData; + // wiotp java client API >= 0.2.1 (other clients earlier?) + // A json fmt command's msg payload may or may not have "d" wrapping of + // the actual command data. + // The wiotp client API doesn't mask that from clients + // so deal with that here. + if (jsonPayload.has("d")) { + cmdData = jsonPayload.getAsJsonObject("d"); + } else { + cmdData = jsonPayload; + } + full.add(IotDevice.CMD_PAYLOAD, cmdData); + } else { + full.addProperty(IotDevice.CMD_PAYLOAD, cmd.getData().toString()); + } + return full; + }); + } + + @Override + public TStream<JsonObject> commandsForDevice(String fqDeviceId, String... 
commands) { + return commandsForDevice(Collections.singleton(fqDeviceId), commands); + } + + @Override + public TStream<JsonObject> commandsForType(String deviceTypeId, String... commands) { + return commandsForDevice( + Collections.singleton(IotpGWConnector.toFqDeviceId(deviceTypeId, "*")), commands); + } + + /** + * Set an external WIoTP {@link GatewayCallback} handler. + * + * @param handler the handler to call. May be null. + * @return the previously set handler. May be null. + */ + public GatewayCallback setExternalCallbackHandler(GatewayCallback handler) { + return connector.setExternalCallbackHandler(handler); + } + + @Override + public String toString() { + return String.format("IotpGateway %s", getDeviceId()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java index 6639c40..c2abce7 100644 --- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java @@ -44,6 +44,8 @@ public class IotpConnector implements Serializable, AutoCloseable { private File optionsFile; private transient DeviceClient client; private boolean disconnectOnClose = true; + private String deviceType; + private String fqDeviceId; /** * Create a new connector to the specified MQTT server. 
@@ -52,15 +54,28 @@ public class IotpConnector implements Serializable, AutoCloseable { */ public IotpConnector(Properties options) { this.options = options; + init(); } public IotpConnector(File optionsFile) { this.optionsFile = optionsFile; + init(); } public IotpConnector(DeviceClient iotpDeviceClient) { this.client = iotpDeviceClient; this.disconnectOnClose = false; + init(); + } + + private void init() { + try { + DeviceClient client = getClient(); + this.deviceType = client.getDeviceType(); + this.fqDeviceId = IotpGWConnector.toFqDeviceId(deviceType, client.getDeviceId()); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create DeviceClient", e); + } } synchronized DeviceClient connect() { @@ -133,4 +148,12 @@ public class IotpConnector implements Serializable, AutoCloseable { client.disconnect(); client = null; } + + public String getDeviceType() { + return deviceType; + } + + public String getFqDeviceId() { + return fqDeviceId; + } } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceCommands.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceCommands.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceCommands.java index 0883701..ab1350d 100644 --- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceCommands.java +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceCommands.java @@ -20,14 +20,15 @@ under the License. 
package org.apache.edgent.connectors.iotp.runtime; import org.apache.edgent.function.Consumer; +import org.apache.edgent.topology.Topology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ibm.iotf.client.device.Command; /** - * Consumer that publishes stream tuples as IoTf device events. - * + * An event setup adapter for {@link Topology#events(Consumer) topology.events()} + * that submits received WIoTP device commands as stream tuples. */ public class IotpDeviceCommands implements Consumer<Consumer<Command>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWCommands.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWCommands.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWCommands.java new file mode 100644 index 0000000..60d7625 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWCommands.java @@ -0,0 +1,51 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; +import org.apache.edgent.topology.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ibm.iotf.client.gateway.Command; + +/** + * An event setup adapter for {@link Topology#events(Consumer) topology.events()} + * that submits received WIoTP device commands as stream tuples. + */ +public class IotpGWCommands implements Consumer<Consumer<Command>> { + private static final long serialVersionUID = 1L; + private final IotpGWConnector connector; + private static final Logger logger = LoggerFactory.getLogger(IotpGWCommands.class); + + public IotpGWCommands(IotpGWConnector connector) { + this.connector = connector; + } + + @Override + public void accept(Consumer<Command> commandSubmitter) { + + try { + connector.subscribeCommands(commandSubmitter); + } catch (Exception e) { + logger.error("Exception caught while subscribing commands", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWConnector.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWConnector.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWConnector.java new file mode 100644 index 0000000..84e7631 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWConnector.java @@ -0,0 +1,263 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import static org.apache.edgent.connectors.iotp.IotpGateway.ATTR_DEVICE_ID; +import static org.apache.edgent.connectors.iotp.IotpGateway.ATTR_DEVICE_TYPE; + +import java.io.File; +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +import org.apache.edgent.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonObject; +import com.ibm.iotf.client.device.DeviceClient; +import com.ibm.iotf.client.gateway.Command; +import com.ibm.iotf.client.gateway.GatewayCallback; +import com.ibm.iotf.client.gateway.GatewayClient; +import com.ibm.iotf.client.gateway.Notification; + +/** + * Gateway Device connector for IoTf. + */ +public class IotpGWConnector implements Serializable, AutoCloseable { + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(IotpGWConnector.class); + + private Properties options; + private File optionsFile; + private transient GatewayClient client; + private boolean disconnectOnClose = true; + private boolean isInitialConnect = true; + private GatewayCallback externalCallbackHandler; + private String deviceType; // for the gateway device + private String deviceId; // raw WIoTP deviceId for the gateway device + private String fqDeviceId; // for the gateway device + + /** + * Create a new gateway connector configured by the specified options. 
+ * + * @param options connector options + */ + public IotpGWConnector(Properties options) { + this.options = options; + init(); + } + + public IotpGWConnector(File optionsFile) { + this.optionsFile = optionsFile; + init(); + } + + public IotpGWConnector(GatewayClient iotpGatewayDeviceClient) { + this.client = iotpGatewayDeviceClient; + this.disconnectOnClose = false; + init(); + } + + private void init() { + try { + GatewayClient client = getClient(); + this.deviceType = client.getGWDeviceType(); + this.deviceId = client.getGWDeviceId(); + this.fqDeviceId = toFqDeviceId(deviceType, deviceId); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create GatewayClient", e); + } + } + + synchronized GatewayClient connect() { + GatewayClient client; + try { + client = getClient(); + if (!client.isConnected()) { + client.connect(); + } + if (isInitialConnect) { + // We need this for a passed in GatewayClient that was already + // connected, not just when we initiate the connect... + // + // GatewayClient pre-subscribes to cmds for the GW device + // but not for its connected devices so do that now. + // + // N.B. in the face of overlapping subscriptions, + // our GatewayCallback.processCommand(), established + // by our subscribeCommands(), gets called multiple times - once for + // each matching subscription. They are separate Command instances + // for the "duplicate" cmds so we can't filter them out. + // The net result is that the same cmd gets added to a stream + // multiple times. + // + // In combination with the GatewayClient's auto-subscription + // of the GW device's cmds, our desire to receive cmds for + // all of the GW's connected devices sets up this overlapping + // subscriptions condition. + // i.e., simply adding a "all deviceType and all deviceIDs" subscription + // results in duplicate GW device cmd callbacks/tuples. + // + // Unsubscribe the GW device auto-subscription to avoid the dups. 
+ + client.unsubscribeFromDeviceCommands(client.getGWDeviceType(), client.getGWDeviceId()); + client.subscribeToDeviceCommands("+", "+"); + + isInitialConnect = false; + } + return client; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + synchronized GatewayClient getClient() throws Exception { + if (client == null) { + if (options == null) + options = DeviceClient.parsePropertiesFile(optionsFile); + + client = new GatewayClient(options); + } + return client; + } + + public synchronized GatewayCallback setExternalCallbackHandler(GatewayCallback handler) { + GatewayCallback prev = externalCallbackHandler; + externalCallbackHandler = handler; + return prev; + } + + synchronized void subscribeCommands(Consumer<Command> tupleSubmitter) throws Exception { + GatewayClient client = getClient(); + + // N.B. See commentary in connect() above re "dup cmds". + + client.setGatewayCallback(new GatewayCallback() { + + @Override + public void processCommand(Command cmd) { + if (externalCallbackHandler != null) { + externalCallbackHandler.processCommand(cmd); + } + + tupleSubmitter.accept(cmd); + } + + @Override + public void processNotification(Notification notification) { + if (externalCallbackHandler != null) { + externalCallbackHandler.processNotification(notification); + } + + // Edgent doesn't currently handle notifications. 
+ } + + }); + + connect(); + } + + void publishGWEvent(String eventId, JsonObject event, int qos) { + GatewayClient client; + try { + client = connect(); + } catch (Exception e) { + throw new RuntimeException(e); + + } + if (!client.publishGatewayEvent(eventId, event, qos)) { + logger.error("Publish event failed for eventId {}", eventId); + } + } + + void publishDeviceEvent(String fqDeviceId, String eventId, JsonObject event, int qos) { + String[] devIdToks = splitFqDeviceId(fqDeviceId); + publishDeviceEvent(devIdToks[0], devIdToks[1], eventId, event, qos); + } + + void publishDeviceEvent(String deviceType, String deviceId, String eventId, JsonObject event, int qos) { + GatewayClient client; + try { + client = connect(); + } catch (Exception e) { + throw new RuntimeException(e); + + } + if (!client.publishDeviceEvent(deviceType, deviceId, eventId, event, qos)) { + logger.error("Publish event failed for eventId {}", eventId); + } + } + +// void publishHttpDeviceEvent(String eventId, JsonObject event) { +// try { +// APIClient api = getClient().api(); +// if (!api.publishDeviceEventOverHTTP(eventId, event, ContentType.json)) { +// logger.error("HTTP publish event failed for eventId {}", eventId); +// } +// } catch (Exception e) { +// // throw new RuntimeException(e); +// // If the publish throws, a RuntimeException will cause +// // everything to unwind and the app/topology can terminate. +// // See the commentary/impl of MqttPublisher.accept(). 
+// // See EDGENT-382 +// logger.error("Unable to publish event for eventId {}", eventId, e); +// } +// } + + @Override + public void close() throws Exception { + if (client == null) + return; + + if (disconnectOnClose) + client.disconnect(); + client = null; + } + + public String getDeviceType() { + return deviceType; + } + + public String getFqDeviceId() { + return fqDeviceId; + } + + public String getIotDeviceId(Map<String, String> deviceIdAttrs) { + Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_TYPE), ATTR_DEVICE_TYPE); + Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_ID), ATTR_DEVICE_ID); + + return toFqDeviceId(deviceIdAttrs.get(ATTR_DEVICE_TYPE), deviceIdAttrs.get(ATTR_DEVICE_ID)); + } + + public static String toFqDeviceId(String deviceType, String deviceId) { + return String.format("D/%s/%s", deviceType, deviceId); + } + + public static String[] splitFqDeviceId(String fqDeviceId) { + String[] tokens = fqDeviceId.split("/"); + if (tokens.length != 3 || !tokens[0].equals("D")) { + throw new IllegalArgumentException("bad fqDeviceId " + fqDeviceId); + } + return new String[] { tokens[1], tokens[2] }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFixed.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFixed.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFixed.java new file mode 100644 index 0000000..d647b52 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFixed.java @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; + +import com.google.gson.JsonObject; + +/** + * Consumer that publishes device stream tuples as IoTf device events. + * + */ +public class IotpGWDeviceEventsFixed implements Consumer<JsonObject> { + private static final long serialVersionUID = 1L; + private final IotpGWConnector connector; + private final String fqDeviceId; + private final String eventId; + private final int qos; + + public IotpGWDeviceEventsFixed(IotpGWConnector connector, String fqDeviceId, String eventId, int qos) { + this.connector = connector; + this.fqDeviceId = fqDeviceId; + this.eventId = eventId; + this.qos = qos; + } + + @Override + public void accept(JsonObject event) { + connector.publishDeviceEvent(fqDeviceId, eventId, event, qos); + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFunction.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFunction.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFunction.java new file mode 100644 index 0000000..00838f5 --- /dev/null +++ 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWDeviceEventsFunction.java @@ -0,0 +1,55 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; + +import com.google.gson.JsonObject; + +/** + * Consumer that publishes device stream tuples as IoTf device events with the + * device id, event identifier, payload and qos each derived from the tuple by a function. 
+ * + */ +public class IotpGWDeviceEventsFunction implements Consumer<JsonObject> { + private static final long serialVersionUID = 1L; + private final IotpGWConnector connector; + private final Function<JsonObject, String> fqDeviceId; + private final Function<JsonObject, String> eventId; + private UnaryOperator<JsonObject> payload; + private final Function<JsonObject, Integer> qos; + + public IotpGWDeviceEventsFunction(IotpGWConnector connector, Function<JsonObject, String> fqDeviceId, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload, + Function<JsonObject, Integer> qos) { + this.connector = connector; + this.fqDeviceId = fqDeviceId; + this.payload = payload; + this.eventId = eventId; + this.qos = qos; + } + + @Override + public void accept(JsonObject event) { + connector.publishDeviceEvent(fqDeviceId.apply(event), eventId.apply(event), payload.apply(event), qos.apply(event)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFixed.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFixed.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFixed.java new file mode 100644 index 0000000..c6c5112 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFixed.java @@ -0,0 +1,46 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; + +import com.google.gson.JsonObject; + +/** + * Consumer that publishes stream tuples as IoTf GW device events. + * + */ +public class IotpGWEventsFixed implements Consumer<JsonObject> { + private static final long serialVersionUID = 1L; + private final IotpGWConnector connector; + private final String eventId; + private final int qos; + + public IotpGWEventsFixed(IotpGWConnector connector, String eventId, int qos) { + this.connector = connector; + this.eventId = eventId; + this.qos = qos; + } + + @Override + public void accept(JsonObject event) { + connector.publishGWEvent(eventId, event, qos); + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFunction.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFunction.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFunction.java new file mode 100644 index 0000000..9da88e0 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpGWEventsFunction.java @@ -0,0 +1,53 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; + +import com.google.gson.JsonObject; + +/** + * Consumer that publishes stream tuples as IoTf GW device events with the + * event identifier, payload and qos each derived from the tuple by a function. + * + */ +public class IotpGWEventsFunction implements Consumer<JsonObject> { + private static final long serialVersionUID = 1L; + private final IotpGWConnector connector; + private final Function<JsonObject, String> eventId; + private UnaryOperator<JsonObject> payload; + private final Function<JsonObject, Integer> qos; + + public IotpGWEventsFunction(IotpGWConnector connector, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload, + Function<JsonObject, Integer> qos) { + this.connector = connector; + this.payload = payload; + this.eventId = eventId; + this.qos = qos; + } + + @Override + public void accept(JsonObject event) { + connector.publishGWEvent(eventId.apply(event), payload.apply(event), qos.apply(event)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/connectors/mqtt/src/main/java/org/apache/edgent/connectors/mqtt/iot/MqttDevice.java ---------------------------------------------------------------------- diff --git a/connectors/mqtt/src/main/java/org/apache/edgent/connectors/mqtt/iot/MqttDevice.java 
b/connectors/mqtt/src/main/java/org/apache/edgent/connectors/mqtt/iot/MqttDevice.java index 4804a93..6df2c5b 100644 --- a/connectors/mqtt/src/main/java/org/apache/edgent/connectors/mqtt/iot/MqttDevice.java +++ b/connectors/mqtt/src/main/java/org/apache/edgent/connectors/mqtt/iot/MqttDevice.java @@ -276,4 +276,16 @@ public class MqttDevice implements IotDevice { public Topology topology() { return topology; } + + @Override + public String getDeviceType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getDeviceId() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpAppClient.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpAppClient.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpAppClient.java new file mode 100644 index 0000000..3511dc3 --- /dev/null +++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpAppClient.java @@ -0,0 +1,136 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.samples.connectors.iotp; + +import java.io.File; +import java.io.FileReader; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import com.google.gson.JsonObject; +import com.ibm.iotf.client.app.ApplicationClient; +import com.ibm.iotf.client.app.Command; +import com.ibm.iotf.client.app.Event; +import com.ibm.iotf.client.app.EventCallback; + +/** + * A WIoTP ApplicationClient that publishes cmds + * and subscribes to events for the + * IotpDeviceSample and IotpGWDeviceSample device samples. + * <p> + * Usage: {@code [useGW] <app-cfg-path> # see scripts/connectors/iotp/iotp-app-client.cfg} + * <p> + * This connects to your IBM Watson IoT Platform service + * as the Application defined in an application config file. + * The file format is the standard one for IBM Watson IoT Platform. + * <p> + * Note, the config file also contains some additional information for this application. + * A sample iotp-app-client.cfg is in the scripts/connectors/iotp directory. 
+ */ +public class IotpAppClient { + + private static final String usage = "[useGw] <app-cfg-path> # see scripts/connectors/iotp/iotp-app-client.cfg"; + + public static void main(String[] args) throws Exception { + if (args.length == 0) + throw new Exception("Usage: " + usage); + List<String> argList = Arrays.asList(args); + boolean useGW = argList.contains("useGW"); + String deviceCfgPath = argList.get(argList.size() - 1); + + Properties cfgProps = new Properties(); + cfgProps.load(new FileReader(new File(deviceCfgPath))); + + String iotpOrg = cfgProps.getProperty("org"); + String iotpAppId = cfgProps.getProperty("id"); + String iotpAppKey = cfgProps.getProperty("auth-key"); + System.out.println("org: " + iotpOrg); + System.out.println("id: " + iotpAppId); + System.out.println("key: " + iotpAppKey); + + String iotpDevType = cfgProps.getProperty("deviceType"); + String iotpDevId = cfgProps.getProperty("deviceId"); + if (useGW) { + iotpDevType = cfgProps.getProperty("gwDeviceType"); + iotpDevId = cfgProps.getProperty("gwDeviceId"); + } + System.out.println("deviceType: " + iotpDevType); + System.out.println("deviceId: " + iotpDevId); + + ApplicationClient client = new ApplicationClient(cfgProps); + + client.connect(); + + boolean sendCmd = true; + if (sendCmd) { + sendCmd(client, iotpDevType, iotpDevId); + if (useGW) { + sendCmd(client, cfgProps.getProperty("cn-dev1-type"), cfgProps.getProperty("cn-dev1-id")); + } + } + + boolean subscribeToEvents = true; + if (subscribeToEvents) { + System.out.println("Subscribing to events..."); + client.subscribeToDeviceEvents(); + client.setEventCallback(new EventCallback() { + + @Override + public void processCommand(Command cmd) { + // TODO Auto-generated method stub + + } + + @SuppressWarnings("deprecation") + @Override + public void processEvent(Event event) { + System.out.println( + String.format("Received event: %s %s:%s %s %s", event.getEvent(), + event.getDeviceType(), event.getDeviceId(), + event.getFormat(), + 
event.getPayload())); + } + + }); + Thread.sleep(Integer.MAX_VALUE); + } + + client.disconnect(); + } + + private static int msgNum = 0; + private static void sendCmd(ApplicationClient client, String iotpDevType, String iotpDevId) throws Exception { + String command = "cmdId-1"; + JsonObject jo = new JsonObject(); + jo.addProperty("msgNum", ++msgNum); + jo.addProperty("deviceTypeAndId", iotpDevType + "/" + iotpDevId); + jo.addProperty("cmdId", command); + jo.addProperty("str", "a-string"); + jo.addProperty("num", 12345); + JsonObject data = jo; + + System.out.println("Sending "+iotpDevType+"/"+iotpDevId+" command: "+command+" data("+data.getClass().getName()+")="+data); + + boolean ok = client.publishCommand(iotpDevType, iotpDevId, command, data); + + System.out.println("Sent: " + (ok ? "OK" : "NOT-OK")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpDeviceSample.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpDeviceSample.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpDeviceSample.java new file mode 100644 index 0000000..a88dfe1 --- /dev/null +++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpDeviceSample.java @@ -0,0 +1,152 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.samples.connectors.iotp; + +import java.io.File; +import java.io.FileReader; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.edgent.connectors.iot.QoS; +import org.apache.edgent.connectors.iotp.IotpDevice; +import org.apache.edgent.providers.direct.DirectProvider; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; + +import com.google.gson.JsonObject; +import com.ibm.iotf.client.device.DeviceClient; +import com.ibm.iotf.devicemgmt.DeviceData; +import com.ibm.iotf.devicemgmt.device.ManagedDevice; + +/** + * Similar to IotpQuickstart2 but for a real/non-quickstart WIoTP account + * for a registered device + * AND it subscribes to/prints device cmds. + * <P> + * Use IotpAppClient or any other technique to generate cmds. + * e.g., mosquitto_{pub,sub} cmds are printed below. + * <P> + * This sample demonstrates: + * <UL> + * <LI>Using the IotpDevice connector</LI> + * <LI>Initializing the IotpDevice connector using the WIoTP API objects</LI> + * <LI>Publishing and subscribing to device events and commands</LI> + * </UL> + * <p> + * This connects to your IBM Watson IoT Platform service + * as the Device defined in a device config file. + * The file format is the standard one for IBM Watson IoT Platform. + * <p> + * Note, the config file also contains some additional information for this application. + * A sample iotp-device-sample.cfg is in the scripts/connectors/iotp directory. 
+ */ +public class IotpDeviceSample { + private static final String usage = "[useDeviceClient|useManagedDevice] [useHttp] <device-cfg-path>"; + + public static void main(String[] args) throws Exception { + if (args.length == 0) + throw new Exception("Usage: " + usage); + List<String> argList = Arrays.asList(args); + boolean useDeviceClient = argList.contains("useDeviceClient"); + boolean useManagedDevice = argList.contains("useManagedDevice"); + boolean useInternalDeviceClient = !(useDeviceClient || useManagedDevice); + boolean useHttp = argList.contains("useHttp"); + String deviceCfgPath = argList.get(argList.size() - 1); + + DirectProvider tp = new DirectProvider(); + Topology topology = tp.newTopology("IotpDeviceSample"); + + Properties cfgProps = new Properties(); + cfgProps.load(new FileReader(new File(deviceCfgPath))); + + String iotpOrg = getProperty(cfgProps, "Organization-ID", "org"); + String iotpDevType = getProperty(cfgProps, "Device-Type", "type"); + String iotpDevId = getProperty(cfgProps, "Device-ID", "id"); + System.out.println("org: " + iotpOrg); + System.out.println("DeviceType: " + iotpDevType); + System.out.println("DeviceId: " + iotpDevId); + + System.out.println("device clientId: " + "d:"+iotpOrg+":"+iotpDevType+":"+iotpDevId); + System.out.println("WIoTP host: " + iotpOrg+".messaging.internetofthings.ibmcloud.com"); + System.out.println("evt topic: " + "iot-2/type/"+iotpDevType+"/id/"+iotpDevId+"/evt/+/fmt/json"); + System.out.println("cmd topic: " + "iot-2/type/"+iotpDevType+"/id/"+iotpDevId+"/cmd/+/fmt/json"); + System.out.println("mosquitto_pub -u <api-auth-key> -P <api-quth-token> -h "+iotpOrg+".messaging.internetofthings.ibmcloud.com -p 1883 -i a:"+iotpOrg+":appId1 -t iot-2/type/"+iotpDevType+"/id/"+iotpDevId+"/cmd/cmd-1/fmt/json -m '{}'"); + System.out.println("mosquitto_sub -d -u <api-auth-key> -P <api-quth-token> -h "+iotpOrg+".messaging.internetofthings.ibmcloud.com -p 1883 -i a:"+iotpOrg+":appId2 -t iot-2/type/+/id/+/evt/+/fmt/+"); 
+ + IotpDevice device; + if (useInternalDeviceClient) { + System.out.println("Using internal DeviceClient"); + device = new IotpDevice(topology, cfgProps); + } + else if (useDeviceClient) { + System.out.println("Using WIoTP DeviceClient"); + device = new IotpDevice(topology, new DeviceClient(cfgProps)); + } + else if (useManagedDevice) { + System.out.println("Using WIoTP ManagedDevice"); + DeviceData deviceData = new DeviceData.Builder().build(); + device = new IotpDevice(topology, new ManagedDevice(cfgProps, deviceData)); + } + else + throw new Exception("woops"); + + Random r = new Random(); + TStream<double[]> raw = topology.poll(() -> { + double[] v = new double[3]; + + v[0] = r.nextGaussian() * 10.0 + 40.0; + v[1] = r.nextGaussian() * 10.0 + 50.0; + v[2] = r.nextGaussian() * 10.0 + 60.0; + + return v; + }, 3, TimeUnit.SECONDS); + + TStream<JsonObject> json = raw.map(v -> { + JsonObject j = new JsonObject(); + j.addProperty("temp", v[0]); + j.addProperty("humidity", v[1]); + j.addProperty("objectTemp", v[2]); + return j; + }); + + if (!useHttp) { + device.events(json, "sensors", QoS.FIRE_AND_FORGET); + } + else { + System.out.println("Publishing events using HTTP"); + device.httpEvents(json, "sensors"); + } + + device.commands().sink(jo -> System.out.println("Received cmd: " + jo)); + + tp.submit(topology); + } + + private static String getProperty(Properties props, String... 
keys) { + for (String key : keys) { + String val = props.getProperty(key); + if (val != null) + return val; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpGWDeviceSample.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpGWDeviceSample.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpGWDeviceSample.java new file mode 100644 index 0000000..da665a8 --- /dev/null +++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpGWDeviceSample.java @@ -0,0 +1,198 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.edgent.samples.connectors.iotp; + +import java.io.File; +import java.io.FileReader; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.edgent.connectors.iot.IotDevice; +import org.apache.edgent.connectors.iot.QoS; +import org.apache.edgent.connectors.iotp.IotpGateway; +import org.apache.edgent.providers.direct.DirectProvider; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; + +import com.google.gson.JsonObject; +import com.ibm.iotf.client.gateway.GatewayClient; +import com.ibm.iotf.devicemgmt.DeviceData; +import com.ibm.iotf.devicemgmt.gateway.ManagedGateway; + +/** + * Similar to IotpQuickstart2 but for a real/non-quickstart WIoTP account + * and a registered IoT Gateway device with connected devices + * AND it subscribes to/prints device cmds. + * <P> + * Use IotpAppClient or any other technique to generate cmds. + * e.g., mosquitto_{pub,sub} cmds are printed below. + * <P> + * This sample demonstrates: + * <UL> + * <LI>Using the IotpGateway connector</LI> + * <LI>Initializing the IotpGateway connector using the WIoTP API objects</LI> + * <LI>Publishing and subscribing to Gateway device events and commands</LI> + * <LI>Publishing and subscribing to connected device events and commands</LI> + * </UL> + * <p> + * This connects to your IBM Watson IoT Platform service + * as the Gateway defined in a gateway config file. + * The file format is the standard one for IBM Watson IoT Platform. + * <p> + * Note, the config file also contains some additional information for this application. + * A sample iotp-gwdevice-sample.cfg is in the scripts/connectors/iotp directory. 
+ */ +public class IotpGWDeviceSample { + + private static final String usage = "[useGatewayClient|useManagedGateway] [useHttp] <device-cfg-path>"; + + public static void main(String[] args) throws Exception { + if (args.length == 0) + throw new Exception("Usage: " + usage); + List<String> argList = Arrays.asList(args); + boolean useGatewayClient = argList.contains("useGatewayClient"); + boolean useManagedGateway = argList.contains("useManagedGateway"); + boolean useInternalGatewayClient = !(useGatewayClient || useManagedGateway); + boolean useHttp = argList.contains("useHttp"); + String deviceCfgPath = argList.get(argList.size() - 1); + + DirectProvider tp = new DirectProvider(); + Topology topology = tp.newTopology("IotpGWDeviceSample"); + + Properties cfgProps = new Properties(); + cfgProps.load(new FileReader(new File(deviceCfgPath))); + + String iotpOrg = getProperty(cfgProps, "Organization-ID", "org"); + String iotpGWDevType = getProperty(cfgProps, "Gateway-Type", "Device-Type", "type"); + String iotpGWDevId = getProperty(cfgProps, "Gateway-ID", "Device-ID", "id"); + System.out.println("orgId: " + iotpOrg); + System.out.println("GWDeviceType: " + iotpGWDevType); + System.out.println("GWDeviceId: " + iotpGWDevId); + + System.out.println("GW device clientId: " + "g:"+iotpOrg+":"+iotpGWDevType+":"+iotpGWDevId); + System.out.println("WIoTP host: " + iotpOrg+".messaging.internetofthings.ibmcloud.com"); + System.out.println("GW evt topic: " + "iot-2/type/"+iotpGWDevType+"/id/"+iotpGWDevId+"/evt/+/fmt/json"); + System.out.println("GW cmd topic: " + "iot-2/type/"+iotpGWDevType+"/id/"+iotpGWDevId+"/cmd/+/fmt/json"); + System.out.println("GW mosquitto_pub -u <api-auth-key> -P <api-auth-token> -h "+iotpOrg+".messaging.internetofthings.ibmcloud.com -p 1883 -i a:"+iotpOrg+":appId1 -t iot-2/type/"+iotpGWDevType+"/id/"+iotpGWDevId+"/cmd/cmd-1/fmt/json -m '{}'"); + System.out.println("GW mosquitto_sub -d -u <api-auth-key> -P <api-auth-token> -h 
"+iotpOrg+".messaging.internetofthings.ibmcloud.com -p 1883 -i a:"+iotpOrg+":appId2 -t iot-2/type/+/id/+/evt/+/fmt/+"); + + String iotpCnDev1Type = cfgProps.getProperty("cn-dev1-type"); + String iotpCnDev1Id = cfgProps.getProperty("cn-dev1-id"); + System.out.println("cn-dev1 clientId: " + "d:"+iotpOrg+":"+iotpCnDev1Type+":"+iotpCnDev1Id); + System.out.println("cn-dev1 evt topic: " + "iot-2/type/"+iotpCnDev1Type+"/id/"+iotpCnDev1Id+"/evt/+/fmt/json"); + System.out.println("cn-dev1 cmd topic: " + "iot-2/type/"+iotpCnDev1Type+"/id/"+iotpCnDev1Id+"/cmd/+/fmt/json"); + System.out.println("cn-dev1 mosquitto_pub -u <api-auth-key> -P <api-quth-token> -h "+iotpOrg+".messaging.internetofthings.ibmcloud.com -p 1883 -i a:"+iotpOrg+":appId1 -t iot-2/type/"+iotpCnDev1Type+"/id/"+iotpCnDev1Id+"/cmd/cmd-1/fmt/json -m '{}'"); + + IotpGateway gwDevice; + if (useInternalGatewayClient) { + System.out.println("Using internal GatewayClient"); + gwDevice = new IotpGateway(topology, cfgProps); + } + else if (useGatewayClient) { + System.out.println("Using WIoTP GatewayClient"); + gwDevice = new IotpGateway(topology, new GatewayClient(cfgProps)); + } + else if (useManagedGateway) { + System.out.println("Using WIoTP ManagedGateway"); + DeviceData deviceData = new DeviceData.Builder().build(); + gwDevice = new IotpGateway(topology, new ManagedGateway(cfgProps, deviceData)); + } + else + throw new IllegalStateException("woops"); + + + System.out.println("GW fqDeviceId: " + gwDevice.getDeviceId()); + + Map<String,String> devAttrMap = new HashMap<>(); + devAttrMap.put(IotpGateway.ATTR_DEVICE_TYPE, iotpCnDev1Type); + devAttrMap.put(IotpGateway.ATTR_DEVICE_ID, iotpCnDev1Id); + String cnDev1FqDeviceId = gwDevice.getIotDeviceId(devAttrMap); + IotDevice cnDev1Device = gwDevice.getIotDevice(cnDev1FqDeviceId); + System.out.println("cn-dev1 fqDeviceId: " + cnDev1FqDeviceId); + + Random r = new Random(); + TStream<double[]> raw = topology.poll(() -> { + double[] v = new double[3]; + + v[0] = 
r.nextGaussian() * 10.0 + 40.0; + v[1] = r.nextGaussian() * 10.0 + 50.0; + v[2] = r.nextGaussian() * 10.0 + 60.0; + + return v; + }, 3, TimeUnit.SECONDS); + + // Create a stream of Gateway device events + TStream<JsonObject> gwJson = raw.map(v -> { + JsonObject jo2 = new JsonObject(); + jo2.addProperty("gw-fqDeviceId", gwDevice.getDeviceId()); + jo2.addProperty("temp", v[0]); + return jo2; + }); + + // Create a stream of a connected device's events + TStream<JsonObject> cnDev1Json = raw.map(v -> { + JsonObject jo2 = new JsonObject(); + jo2.addProperty("cnDev1-fqDeviceId", cnDev1Device.getDeviceId()); + jo2.addProperty("humidity", v[1]); + return jo2; + }); + + if (!useHttp) { + gwDevice.events(gwJson, "gw-device", QoS.FIRE_AND_FORGET); + gwDevice.eventsForDevice(cnDev1FqDeviceId, cnDev1Json, "gw-events-for-cnDev1", QoS.FIRE_AND_FORGET); + cnDev1Device.events(cnDev1Json, "cnDev1-events", QoS.FIRE_AND_FORGET); + } + else { + System.out.println("Publishing events using HTTP"); + throw new IllegalStateException("GW httpEvents is NYI"); + // device.httpEvents(json, "sensors"); + } + + // should report cmds for ALL devices - gw+dev + gwDevice.commandsForDevice(Collections.emptySet()).sink(jo -> System.out.println("Received all-cmds cmd: " + jo)); + + // just GW device cmds + gwDevice.commands().sink(jo -> System.out.println("Received gwDevice cmd: " + jo)); + + // just cnDev1 device cmds + gwDevice.commandsForDevice(cnDev1FqDeviceId).sink(jo -> System.out.println("Received gwDevice-for-cnDev1 cmd: " + jo)); + cnDev1Device.commands().sink(jo -> System.out.println("Received cnDev1 cmd: " + jo)); + + // just cmds for a specific device type + gwDevice.commandsForType(iotpGWDevType).sink(jo -> System.out.println("Received for-type-gwDeviceType cmd: " + jo)); + gwDevice.commandsForType(iotpCnDev1Type).sink(jo -> System.out.println("Received for-type-cnDev1DeviceType cmd: " + jo)); + + tp.submit(topology); + } + + private static String getProperty(Properties props, String... 
keys) { + for (String key : keys) { + String val = props.getProperty(key); + if (val != null) + return val; + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/scripts/connectors/iotp/iotp-app-client.cfg ---------------------------------------------------------------------- diff --git a/scripts/connectors/iotp/iotp-app-client.cfg b/scripts/connectors/iotp/iotp-app-client.cfg new file mode 100644 index 0000000..177e6b5 --- /dev/null +++ b/scripts/connectors/iotp/iotp-app-client.cfg @@ -0,0 +1,25 @@ +# configuration properties for the IotpAppClient app + +[application] +# WIoTP defined application configuration properties + +Organization-ID = +id = +Authentication-Method = apikey +API-Key = +Authentication-Token = + +# -------------------------------------------------------- +# Input for the sample app, not WIoTP App props + +# Non-gateway mode target registered device +# Corresponding values from the iotp-device-sample.cfg file +deviceType = +deviceId = + +# Gateway mode target registered gateway device and the connected device +# Corresponding values from the iotp-gwdevice-sample.cfg file +gwDeviceType = +gwDeviceId = +cn-dev1-type = myCnDev1Type +cn-dev1-id = myCnDev1Id http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/scripts/connectors/iotp/iotp-device-sample.cfg ---------------------------------------------------------------------- diff --git a/scripts/connectors/iotp/iotp-device-sample.cfg b/scripts/connectors/iotp/iotp-device-sample.cfg new file mode 100644 index 0000000..3a2aefb --- /dev/null +++ b/scripts/connectors/iotp/iotp-device-sample.cfg @@ -0,0 +1,10 @@ +# configuration properties for IotpDeviceSample + +[device] +# WIoTP defined IoT device configuration properties + +Organization-ID = +Device-Type = +Device-ID = +Authentication-Method = token +Authentication-Token = http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/scripts/connectors/iotp/iotp-gwdevice-sample.cfg 
---------------------------------------------------------------------- diff --git a/scripts/connectors/iotp/iotp-gwdevice-sample.cfg b/scripts/connectors/iotp/iotp-gwdevice-sample.cfg new file mode 100644 index 0000000..f1f4009 --- /dev/null +++ b/scripts/connectors/iotp/iotp-gwdevice-sample.cfg @@ -0,0 +1,17 @@ +# configuration properties for IotpGWDeviceSample + +[device] +# WIoTP defined IoT Gateway device configuration properties + +Organization-ID = zr2b4z +Gateway-Type = +Gateway-ID = +Authentication-Method = token +Authentication-Token = + +# -------------------------------------------------------- +# Input for the sample app, not WIoTP device props + +# a type and id for a device connected to the gateway - pick anything +cn-dev1-type = myCnDev1Type +cn-dev1-id = myCnDev1Id http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/scripts/connectors/iotp/run-iotp-app-client.sh ---------------------------------------------------------------------- diff --git a/scripts/connectors/iotp/run-iotp-app-client.sh b/scripts/connectors/iotp/run-iotp-app-client.sh new file mode 100755 index 0000000..ee20908 --- /dev/null +++ b/scripts/connectors/iotp/run-iotp-app-client.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +edgent=../../.. + +# Runs IBM Watson IoT Platform IotpAppClient sample. +# +# run-iotp-app-client.sh [useGw] <app-cfg-path> # see iotp-app-client.cfg +# +# Connects to WIoTP and sends device commands to the +# IotpDeviceSample or IotpGWDeviceSample device samples. +# +# This connects to your IBM Watson IoT Platform service +# as the Application defined in an application config file. +# The file format is the standard one for IBM Watson IoT Platform. +# +# Note, the config file also contains some additional information for this application. +# A sample iotp-app-client.cfg is in the scripts/connectors/iotp directory. + + +export CLASSPATH=${edgent}/samples/lib/edgent.samples.connectors.jar + +# https://github.com/ibm-watson-iot/iot-java/tree/master#migration-from-release-015-to-021 +# Uncomment the following to use the pre-0.2.1 WIoTP client behavior. +# +#USE_OLD_EVENT_FORMAT=-Dcom.ibm.iotf.enableCustomFormat=false + +VM_OPTS=${USE_OLD_EVENT_FORMAT} + +java ${VM_OPTS} org.apache.edgent.samples.connectors.iotp.IotpAppClient $* http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/scripts/connectors/iotp/run-iotp-device-sample.sh ---------------------------------------------------------------------- diff --git a/scripts/connectors/iotp/run-iotp-device-sample.sh b/scripts/connectors/iotp/run-iotp-device-sample.sh new file mode 100755 index 0000000..35eb6fc --- /dev/null +++ b/scripts/connectors/iotp/run-iotp-device-sample.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +edgent=../../.. + +# Runs IBM Watson IoT Platform IotpDeviceSample sample. +# +# run-iotp-device-sample.sh [useDeviceClient|useManagedDevice] [useHttp] <device-cfg-path> # see iotp-device-sample.cfg +# +# Connects to WIoTP and sends device events and receives device commands. +# +# This connects to your IBM Watson IoT Platform service +# as the Device defined in a device config file. +# The file format is the standard one for IBM Watson IoT Platform. +# +# Note, the config file also contains some additional information for this application. +# A sample iotp-device-sample.cfg is in the scripts/connectors/iotp directory. + +export CLASSPATH=${edgent}/samples/lib/edgent.samples.connectors.jar + +# https://github.com/ibm-watson-iot/iot-java/tree/master#migration-from-release-015-to-021 +# Uncomment the following to use the pre-0.2.1 WIoTP client behavior. 
+# +#USE_OLD_EVENT_FORMAT=-Dcom.ibm.iotf.enableCustomFormat=false + +VM_OPTS=${USE_OLD_EVENT_FORMAT} + +java ${VM_OPTS} org.apache.edgent.samples.connectors.iotp.IotpDeviceSample $* http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/3401fbf7/scripts/connectors/iotp/run-iotp-gwdevice-sample.sh ---------------------------------------------------------------------- diff --git a/scripts/connectors/iotp/run-iotp-gwdevice-sample.sh b/scripts/connectors/iotp/run-iotp-gwdevice-sample.sh new file mode 100755 index 0000000..aeba5dc --- /dev/null +++ b/scripts/connectors/iotp/run-iotp-gwdevice-sample.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +edgent=../../.. + +# Runs IBM Watson IoT Platform IotpGWDeviceSample sample. +# +# run-iotp-gwdevice-sample.sh [useGatewayClient|useManagedGateway] [useHttp] <device-cfg-path> # see iotp-gwdevice-sample.cfg +# +# Connects to WIoTP and sends Gateway and connected device events and receives device commands. +# +# This connects to your IBM Watson IoT Platform service +# as the Gateway defined in a gateway config file. +# The file format is the standard one for IBM Watson IoT Platform. 
+# +# Note, the config file also contains some additional information for this application. +# A sample iotp-gwdevice-sample.cfg is in the scripts/connectors/iotp directory. + + +export CLASSPATH=${edgent}/samples/lib/edgent.samples.connectors.jar + +# https://github.com/ibm-watson-iot/iot-java/tree/master#migration-from-release-015-to-021 +# Uncomment the following to use the pre-0.2.1 WIoTP client behavior. +# +#USE_OLD_EVENT_FORMAT=-Dcom.ibm.iotf.enableCustomFormat=false + +VM_OPTS=${USE_OLD_EVENT_FORMAT} + +java ${VM_OPTS} org.apache.edgent.samples.connectors.iotp.IotpGWDeviceSample $*