Repository: incubator-edgent Updated Branches: refs/heads/feature/iot-aws [created] b93dc40c2
- Started work on an AWS platform adapter Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/b93dc40c Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/b93dc40c Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/b93dc40c Branch: refs/heads/feature/iot-aws Commit: b93dc40c208dc99987ab098c16b53cd5d19ca388 Parents: 88a21af Author: Christofer Dutz <christofer.d...@c-ware.de> Authored: Wed Apr 4 13:27:43 2018 +0200 Committer: Christofer Dutz <christofer.d...@c-ware.de> Committed: Wed Apr 4 13:27:43 2018 +0200 ---------------------------------------------------------------------- connectors/iot-aws/pom.xml | 55 +++++ .../edgent/connectors/iot/aws/IotAwsDevice.java | 61 ++++++ .../connectors/iot/aws/IotAwsGateway.java | 179 ++++++++++++++++ .../connectors/iot/aws/IotAwsGatewayDevice.java | 60 ++++++ .../edgent/connectors/iot/aws/help/Command.java | 22 ++ .../iot/aws/runtime/IotAwsGatewayConnector.java | 209 +++++++++++++++++++ connectors/pom.xml | 1 + 7 files changed, 587 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/b93dc40c/connectors/iot-aws/pom.xml ---------------------------------------------------------------------- diff --git a/connectors/iot-aws/pom.xml b/connectors/iot-aws/pom.xml new file mode 100644 index 0000000..6324fab --- /dev/null +++ b/connectors/iot-aws/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-connectors</artifactId> + <version>1.3.0-SNAPSHOT</version> + </parent> + + <artifactId>edgent-connectors-iot-aws</artifactId> + + <name>Apache Edgent (Java 8): Connectors: Amazon IoT Platform</name> + + <dependencies> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-api-function</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-api-topology</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.edgent</groupId> + <artifactId>edgent-connectors-iot</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-iot-device-sdk-java</artifactId> + <version>1.1.1</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/b93dc40c/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsDevice.java ---------------------------------------------------------------------- diff --git a/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsDevice.java 
b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsDevice.java new file mode 100644 index 0000000..fd7645f --- /dev/null +++ b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsDevice.java @@ -0,0 +1,61 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.connectors.iot.aws; + +import com.google.gson.JsonObject; +import org.apache.edgent.connectors.iot.IotDevice; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; +import org.apache.edgent.topology.TSink; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; + +public class IotAwsDevice implements IotDevice { + + @Override + public String getDeviceType() { + return null; + } + + @Override + public String getDeviceId() { + return null; + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return null; + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) { + return null; + } + + @Override + public TStream<JsonObject> commands(String... 
commands) { + return null; + } + + @Override + public Topology topology() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/b93dc40c/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGateway.java ---------------------------------------------------------------------- diff --git a/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGateway.java b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGateway.java new file mode 100644 index 0000000..7a9929e --- /dev/null +++ b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGateway.java @@ -0,0 +1,179 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.edgent.connectors.iot.aws; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.edgent.connectors.iot.IotDevice; +import org.apache.edgent.connectors.iot.IotGateway; +import org.apache.edgent.connectors.iot.aws.help.Command; +import org.apache.edgent.connectors.iot.aws.runtime.IotAwsGatewayConnector; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; +import org.apache.edgent.topology.TSink; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; + +import java.io.File; +import java.util.*; + +public class IotAwsGateway implements IotGateway { + + private final IotAwsGatewayConnector connector; + private final Topology topology; + private TStream<Command> commandStream; + + public IotAwsGateway(Topology topology, Properties options) { + this.topology = topology; + this.connector = new IotAwsGatewayConnector(options); + } + + public IotAwsGateway(Topology topology, File optionsFile) { + this.topology = topology; + this.connector = new IotAwsGatewayConnector(optionsFile); + } + + @Override + public String getDeviceType() { + return connector.getDeviceType(); + } + + @Override + public String getDeviceId() { + return connector.getDeviceId(); + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return null; + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) { + return null; + } + + @Override + public TStream<JsonObject> commands(String... 
commands) { + return null; + } + + private TStream<Command> allCommands() { + if (commandStream == null) + commandStream = topology.events(new IotAwsGatewayCommands(connector)); + return commandStream; + } + + @Override + public Topology topology() { + return topology; + } + + @Override + public String getIotDeviceId(Map<String, String> deviceIdAttrs) { + return connector.getIotDeviceId(deviceIdAttrs); + } + + @Override + public IotDevice getIotDevice(Map<String, String> deviceIdAttrs) { + return getIotDevice(getIotDeviceId(deviceIdAttrs)); + } + + @Override + public IotDevice getIotDevice(String deviceId) { + return new IotAwsGatewayDevice(this, connector, topology, deviceId); + } + + @Override + public TSink<JsonObject> eventsForDevice(Function<JsonObject, String> deviceId, TStream<JsonObject> stream, Function<JsonObject, String> eventId, UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return stream.sink(new IotAwsGatewayDeviceEventsFunction(connector, deviceId, eventId, payload, qos)); + } + + @Override + public TSink<JsonObject> eventsForDevice(String deviceId, TStream<JsonObject> stream, String eventId, int qos) { + return stream.sink(new IotAwsGatewayDeviceEventsFixed(connector, deviceId, eventId, qos)); + } + + @Override + public TStream<JsonObject> commandsForDevice(Set<String> deviceIds, String... commands) { + TStream<Command> all = allCommands(); + + if (deviceIds.size() != 0) { + // support "all devices of type T" - fqDeviceId of typeId and "*" for the simple deviceId + boolean allDevicesOfType = deviceIds.size() == 1 + && IotAwsGatewayConnector.splitDeviceId(deviceIds.iterator().next())[1].equals("*"); + + all = all.filter(cmd -> { + String fqDeviceId = IotAwsGatewayConnector.toDeviceId(cmd.getDeviceType(), + allDevicesOfType ? 
"*" : cmd.getDeviceId()); + return deviceIds.contains(fqDeviceId); + }); + } + + if (commands.length != 0) { + Set<String> uniqueCommands = new HashSet<>(Arrays.asList(commands)); + all = all.filter(cmd -> uniqueCommands.contains(cmd.getCommand())); + } + + return all.map(cmd -> { + JsonObject full = new JsonObject(); + full.addProperty(IotDevice.CMD_DEVICE, + IotAwsGatewayConnector.toDeviceId(cmd.getDeviceType(), cmd.getDeviceId())); + full.addProperty(IotDevice.CMD_ID, cmd.getCommand()); + full.addProperty(IotDevice.CMD_TS, System.currentTimeMillis()); + full.addProperty(IotDevice.CMD_FORMAT, cmd.getFormat()); + if ("json".equalsIgnoreCase(cmd.getFormat())) { + JsonParser parser = new JsonParser(); + // iot-java 0.2.2 bug https://github.com/ibm-watson-iot/iot-java/issues/81 + // cmd.getData() returns byte[] instead of JsonObject (or String). + // Must continue to use the deprecated method until that's fixed. + // final JsonObject jsonPayload = (JsonObject) cmd.getData(); + // final JsonObject jsonPayload = (JsonObject) parser.parse((String)cmd.getData()); + @SuppressWarnings("deprecation") + final JsonObject jsonPayload = (JsonObject) parser.parse(cmd.getPayload()); + final JsonObject cmdData; + // wiotp java client API >= 0.2.1 (other clients earlier?) + // A json fmt command's msg payload may or may not have "d" wrapping of + // the actual command data. + // The wiotp client API doesn't mask that from clients + // so deal with that here. + if (jsonPayload.has("d")) { + cmdData = jsonPayload.getAsJsonObject("d"); + } else { + cmdData = jsonPayload; + } + full.add(IotDevice.CMD_PAYLOAD, cmdData); + } else { + full.addProperty(IotDevice.CMD_PAYLOAD, cmd.getData().toString()); + } + return full; + }); } + + @Override + public TStream<JsonObject> commandsForDevice(String deviceId, String... commands) { + return commandsForDevice(Collections.singleton(deviceId), commands); + } + + @Override + public TStream<JsonObject> commandsForType(String deviceTypeId, String... 
commands) { + return commandsForDevice( + Collections.singleton(IotAwsGatewayConnector.toDeviceId(deviceTypeId, "*")), commands); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/b93dc40c/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGatewayDevice.java ---------------------------------------------------------------------- diff --git a/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGatewayDevice.java b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGatewayDevice.java new file mode 100644 index 0000000..19b23e1 --- /dev/null +++ b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/IotAwsGatewayDevice.java @@ -0,0 +1,60 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.edgent.connectors.iot.aws; + +import com.google.gson.JsonObject; +import org.apache.edgent.connectors.iot.IotDevice; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; +import org.apache.edgent.topology.TSink; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; + +public class IotAwsGatewayDevice implements IotDevice { + + @Override + public String getDeviceType() { + return null; + } + + @Override + public String getDeviceId() { + return null; + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId, UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) { + return null; + } + + @Override + public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) { + return null; + } + + @Override + public TStream<JsonObject> commands(String... commands) { + return null; + } + + @Override + public Topology topology() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/b93dc40c/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/help/Command.java ---------------------------------------------------------------------- diff --git a/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/help/Command.java b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/help/Command.java new file mode 100644 index 0000000..0e89f36 --- /dev/null +++ b/connectors/iot-aws/src/main/java/org/apache/edgent/connectors/iot/aws/help/Command.java @@ -0,0 +1,22 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
/**
 * Immutable description of a command received for a device.
 *
 * <p>The accessors are the ones {@code IotAwsGateway.commandsForDevice} calls on
 * each received command: device type and id, command id, payload format, and
 * the payload itself.
 */
public class Command {

    private final String deviceType;
    private final String deviceId;
    private final String command;
    private final String format;
    private final String payload;

    /**
     * Creates a command with every attribute {@code null}. Kept so code that
     * relied on the implicit no-argument constructor continues to compile.
     */
    public Command() {
        this(null, null, null, null, null);
    }

    /**
     * Creates a fully populated command.
     *
     * @param deviceType type of the device the command is addressed to
     * @param deviceId id of the device the command is addressed to
     * @param command command identifier
     * @param format payload format, e.g. {@code "json"}
     * @param payload raw payload text
     */
    public Command(String deviceType, String deviceId, String command, String format, String payload) {
        this.deviceType = deviceType;
        this.deviceId = deviceId;
        this.command = command;
        this.format = format;
        this.payload = payload;
    }

    /** Returns the type of the addressed device. */
    public String getDeviceType() {
        return deviceType;
    }

    /** Returns the id of the addressed device. */
    public String getDeviceId() {
        return deviceId;
    }

    /** Returns the command identifier. */
    public String getCommand() {
        return command;
    }

    /** Returns the payload format. */
    public String getFormat() {
        return format;
    }

    /** Returns the raw payload text. */
    public String getPayload() {
        return payload;
    }

    /** Returns the command data; currently the same text as {@link #getPayload()}. */
    public String getData() {
        return payload;
    }

    @Override
    public String toString() {
        return "Command{deviceType=" + deviceType + ", deviceId=" + deviceId
            + ", command=" + command + ", format=" + format + "}";
    }
}
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.edgent.connectors.iot.aws.runtime; + +import com.amazonaws.services.iot.client.AWSIotConnectionStatus; +import com.amazonaws.services.iot.client.AWSIotMessage; +import com.amazonaws.services.iot.client.AWSIotMqttClient; +import com.google.gson.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.security.KeyStore; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class IotAwsGatewayConnector implements Serializable, AutoCloseable { + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(IotAwsGatewayConnector.class); + + private static final String OPTION_CLIENT_ENDPOINT = "clientEndpoint"; + private static final String OPTION_CLIENT_ID = "clientId"; + private static final String OPTION_KEYSTORE_PASSWORD = "keystorePassword"; + private static final String OPTION_KEYSTORE_FILE = "keystoreFile"; + private static final String OPTION_KEYSTORE_TYPE = "keystoreType"; + private static final String OPTION_PRIVATE_KEY_PASSWORD = "privateKeyPassword"; + private static final String OPTION_AWS_ACCESS_KEY_ID = "awsAccessKeyId"; + private static final String OPTION_AWS_SECRET_ACCESS_KEY = "awsSecretAccessKey"; + private static final String OPTION_SESSION_TOKEN = "sessionToken"; + + private Properties options; + private String deviceType; // for the gateway device + private String deviceId; // raw WIoTP deviceId for the gateway device + private String fqDeviceId; // for the gateway device + + private AWSIotMqttClient client; + + public IotAwsGatewayConnector(Properties options) { + this.options = options; + init(); + } + + public IotAwsGatewayConnector(File optionsFile) { + try { + this.options = new 
Properties(); + options.load(new InputStreamReader(new FileInputStream(optionsFile))); + init(); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create GatewayClient", e); + } + } + + private void init() { + try { + AWSIotMqttClient client = getClient(); + this.deviceType = client.getGWDeviceType(); + this.deviceId = client.getGWDeviceId(); + this.fqDeviceId = toFqDeviceId(deviceType, deviceId); + } catch (IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new IllegalArgumentException("Unable to create GatewayClient", e); + } + } + + synchronized AWSIotMqttClient getClient() throws Exception { + if (client == null) { + // Initialize a client instance depending on the properties provided. + String clientEndpoint = (String) options.get(OPTION_CLIENT_ENDPOINT); + String clientId = (String) options.get(OPTION_CLIENT_ID); + + // Configure an MQTT over TLS connection + if(options.containsKey(OPTION_PRIVATE_KEY_PASSWORD) && options.containsKey(OPTION_KEYSTORE_FILE)) { + String keystoreType = KeyStore.getDefaultType(); + if(options.containsKey(OPTION_KEYSTORE_TYPE)) { + keystoreType = options.getProperty(OPTION_KEYSTORE_TYPE); + } + String keystorePassword = (String) options.get(OPTION_KEYSTORE_PASSWORD); + FileInputStream keystoreFile = new FileInputStream(options.getProperty(OPTION_KEYSTORE_FILE)); + + // Initialize the keystore + KeyStore keystore = KeyStore.getInstance(keystoreType); + keystore.load(keystoreFile, (keystorePassword != null) ? keystorePassword.toCharArray() : null); + + String privateKeyPassword = (String) options.get(OPTION_PRIVATE_KEY_PASSWORD); + client = new AWSIotMqttClient(clientEndpoint, clientId, keystore, privateKeyPassword); + } + + // Configure an MQTT over Websocket connection. 
+ else if(options.containsKey(OPTION_AWS_ACCESS_KEY_ID) && + options.containsKey(OPTION_AWS_SECRET_ACCESS_KEY)) { + String awsAccessKeyId = (String) options.get(OPTION_AWS_ACCESS_KEY_ID); + String awsSecretAccessKey = (String) options.get(OPTION_AWS_SECRET_ACCESS_KEY); + String sessionToken = (String) options.get(OPTION_SESSION_TOKEN); + client = new AWSIotMqttClient( + clientEndpoint, clientId, awsAccessKeyId, awsSecretAccessKey, sessionToken); + } + + // Well if it's not one of these, give up. + else { + throw new IllegalArgumentException("Unable to create GatewayClient. Missing properties."); + } + } + return client; + } + + synchronized AWSIotMqttClient connect() { + AWSIotMqttClient client; + try { + client = getClient(); + if (client.getConnectionStatus() == AWSIotConnectionStatus.DISCONNECTED) { + client.connect(); + } + return client; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + if (client == null) { + return; + } + + client.disconnect(); + client = null; + } + + void publishGWEvent(String eventId, JsonObject event, int qos) { + AWSIotMqttClient client; + try { + client = connect(); + } catch (Exception e) { + throw new RuntimeException(e); + + } + AWSIotMessage + if (!client.publishGatewayEvent(eventId, event, qos)) { + logger.error("Publish event failed for eventId {}", eventId); + } + } + + void publishDeviceEvent(String fqDeviceId, String eventId, JsonObject event, int qos) { + String[] devIdToks = splitFqDeviceId(fqDeviceId); + publishDeviceEvent(devIdToks[0], devIdToks[1], eventId, event, qos); + } + + void publishDeviceEvent(String deviceType, String deviceId, String eventId, JsonObject event, int qos) { + AWSIotMqttClient client; + try { + client = connect(); + } catch (Exception e) { + throw new RuntimeException(e); + + } + if (!client.publishDeviceEvent(deviceType, deviceId, eventId, event, qos)) { + logger.error("Publish event failed for eventId {}", eventId); + } + } 
+ + public String getDeviceType() { + return deviceType; + } + + public String getFqDeviceId() { + return fqDeviceId; + } + + public String getIotDeviceId(Map<String, String> deviceIdAttrs) { + Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_TYPE), ATTR_DEVICE_TYPE); + Objects.requireNonNull(deviceIdAttrs.get(ATTR_DEVICE_ID), ATTR_DEVICE_ID); + + return toFqDeviceId(deviceIdAttrs.get(ATTR_DEVICE_TYPE), deviceIdAttrs.get(ATTR_DEVICE_ID)); + } + + public static String toFqDeviceId(String deviceType, String deviceId) { + return String.format("D/%s/%s", deviceType, deviceId); + } + + public static String[] splitFqDeviceId(String fqDeviceId) { + String[] tokens = fqDeviceId.split("/"); + if (tokens.length != 3 || !tokens[0].equals("D")) { + throw new IllegalArgumentException("bad fqDeviceId " + fqDeviceId); + } + return new String[] { tokens[1], tokens[2] }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/b93dc40c/connectors/pom.xml ---------------------------------------------------------------------- diff --git a/connectors/pom.xml b/connectors/pom.xml index 40be3b5..99da4c2 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -38,6 +38,7 @@ <module>file</module> <module>http</module> <module>iot</module> + <module>iot-aws</module> <module>iotp</module> <module>jdbc</module> <module>kafka</module>