CAMEL-7386: camel-openshift component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ecdd4707 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ecdd4707 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ecdd4707 Branch: refs/heads/master Commit: ecdd4707b3a3957b4002344f18a8b84e81912700 Parents: 9d53700 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Apr 23 12:41:54 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Apr 23 12:41:54 2014 +0200 ---------------------------------------------------------------------- .../component/openshift/OpenShiftConstants.java | 3 + .../component/openshift/OpenShiftConsumer.java | 185 ++++++++++++++++++- .../component/openshift/OpenShiftEndpoint.java | 19 ++ .../component/openshift/OpenShiftHelper.java | 14 ++ .../component/openshift/OpenShiftPollMode.java | 22 +++ .../component/openshift/OpenShiftProducer.java | 11 +- .../OpenShiftConsumerOnChangeTest.java | 58 ++++++ ...penShiftConsumerOnChangeWithInitialTest.java | 58 ++++++ 8 files changed, 358 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java index dcbabbf..1dd0495 100644 --- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java +++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConstants.java @@ -20,6 +20,9 @@ public final class OpenShiftConstants { public static final String OPERATION = "CamelOpenShiftOperation"; public static final String APPLICATION = "CamelOpenShiftApplication"; + public static final String EVENT_TYPE = "CamelOpenShiftEventType"; + public static final String EVENT_OLD_STATE = "CamelOpenShiftEventOldState"; + public static final String EVENT_NEW_STATE = "CamelOpenShiftEventNewState"; private OpenShiftConstants() { } http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java index 9cbefd8..0af8ba1 100644 --- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java +++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftConsumer.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.openshift; +import java.util.HashMap; import java.util.List; +import java.util.Map; import com.openshift.client.IApplication; import com.openshift.client.IDomain; @@ -27,6 +29,9 @@ import org.apache.camel.impl.ScheduledPollConsumer; public class OpenShiftConsumer extends ScheduledPollConsumer { + private final Map<ApplicationState, ApplicationState> oldState = new HashMap<ApplicationState, ApplicationState>(); + private volatile boolean initialPoll; + public OpenShiftConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -37,6 +42,12 @@ public class OpenShiftConsumer extends ScheduledPollConsumer { } @Override + protected void doStart() throws Exception { + initialPoll = true; + super.doStart(); + } + + @Override protected int poll() throws Exception { String openshiftServer = OpenShiftHelper.getOpenShiftServer(getEndpoint()); IDomain domain = OpenShiftHelper.loginAndGetDomain(getEndpoint(), openshiftServer); @@ -44,11 +55,15 @@ public class OpenShiftConsumer extends ScheduledPollConsumer { return 0; } - List<IApplication> apps = domain.getApplications(); - - // TODO grab state - // TODO: option to only emit if state changes + if (getEndpoint().getPollMode().equals(OpenShiftPollMode.all.name())) { + return doPollAll(domain); + } else { + return doPollOnChange(domain); + } + } + protected int doPollAll(IDomain domain) { + List<IApplication> apps = domain.getApplications(); for (IApplication app : apps) { Exchange exchange = getEndpoint().createExchange(app); try { @@ -60,8 +75,168 @@ public class OpenShiftConsumer extends ScheduledPollConsumer { getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException()); } } - return apps.size(); } + protected int doPollOnChange(IDomain domain) { + + // an app can either be + // - added + // - removed + // - state changed + + Map<ApplicationState, ApplicationState> newState = new HashMap<ApplicationState, ApplicationState>(); + + List<IApplication> apps = domain.getApplications(); + for (IApplication app : apps) { + ApplicationState state = new ApplicationState(app.getUUID(), app, OpenShiftHelper.getStateForApplication(app)); + newState.put(state, state); + } + + // compute what is the delta from last time + // so we split up into 3 groups, of added/removed/changed + Map<ApplicationState, ApplicationState> added = new HashMap<ApplicationState, ApplicationState>(); + Map<ApplicationState, ApplicationState> removed = new HashMap<ApplicationState, ApplicationState>(); + Map<ApplicationState, ApplicationState> changed = new HashMap<ApplicationState, ApplicationState>(); + + for (ApplicationState state : newState.keySet()) { + if (!oldState.containsKey(state)) { + // its a new app added + added.put(state, state); + } else { + ApplicationState old = oldState.get(state); + if (old != null && !old.getState().equals(state.getState())) { + // the state changed + state.setOldState(old.getState()); + changed.put(state, state); + } + } + } + for (ApplicationState state : oldState.keySet()) { + if (!newState.containsKey(state)) { + // its a app removed + removed.put(state, state); + } + } + + // only emit if needed + int processed = 0; + if (!initialPoll || initialPoll && getEndpoint().getPollMode().equals(OpenShiftPollMode.onChangeWithInitial.name())) { + + for (ApplicationState add : added.keySet()) { + Exchange exchange = getEndpoint().createExchange(add.getApplication()); + exchange.getIn().setHeader(OpenShiftConstants.EVENT_TYPE, "added"); + try { + processed++; + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException()); + } + } + for (ApplicationState remove : removed.keySet()) { + Exchange exchange = getEndpoint().createExchange(remove.getApplication()); + exchange.getIn().setHeader(OpenShiftConstants.EVENT_TYPE, "removed"); + try { + processed++; + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException()); + } + } + + for (ApplicationState change : changed.keySet()) { + Exchange exchange = getEndpoint().createExchange(change.getApplication()); + exchange.getIn().setHeader(OpenShiftConstants.EVENT_TYPE, "changed"); + exchange.getIn().setHeader(OpenShiftConstants.EVENT_OLD_STATE, change.getOldState()); + exchange.getIn().setHeader(OpenShiftConstants.EVENT_NEW_STATE, change.getState()); + try { + processed++; + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error during processing exchange.", exchange, exchange.getException()); + } + } + } + + // update old state with latest state for next poll + oldState.clear(); + oldState.putAll(newState); + + initialPoll = false; + + return processed; + } + + private static final class ApplicationState { + private final String uuid; + private final IApplication application; + private final String state; + private String oldState; + + private ApplicationState(String uuid, IApplication application, String state) { + this.uuid = uuid; + this.application = application; + this.state = state; + } + + public String getUuid() { + return uuid; + } + + public IApplication getApplication() { + return application; + } + + public String getState() { + return state; + } + + public String getOldState() { + return oldState; + } + + public void setOldState(String oldState) { + this.oldState = oldState; + } + + // only use uuid and state for equals as that is what we want to use for state change detection + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ApplicationState that = (ApplicationState) o; + + if (!state.equals(that.state)) { + return false; + } + if (!uuid.equals(that.uuid)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = uuid.hashCode(); + result = 31 * result + state.hashCode(); + return result; + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java index 0735762..2fe6aef 100644 --- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java +++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftEndpoint.java @@ -45,6 +45,8 @@ public class OpenShiftEndpoint extends ScheduledPollEndpoint { private String operation; @UriParam private String application; + @UriParam + private String pollMode = OpenShiftPollMode.all.name(); public OpenShiftEndpoint(String endpointUri, Component component) { super(endpointUri, component); @@ -55,11 +57,16 @@ public class OpenShiftEndpoint extends ScheduledPollEndpoint { ObjectHelper.notEmpty(clientId, "clientId", this); ObjectHelper.notEmpty(username, "username", this); ObjectHelper.notEmpty(password, "password", this); + return new OpenShiftProducer(this); } @Override public Consumer createConsumer(Processor processor) throws Exception { + ObjectHelper.notEmpty(clientId, "clientId", this); + ObjectHelper.notEmpty(username, "username", this); + ObjectHelper.notEmpty(password, "password", this); + Consumer consumer = new OpenShiftConsumer(this, processor); configureConsumer(consumer); return consumer; @@ -135,4 +142,16 @@ public class OpenShiftEndpoint extends ScheduledPollEndpoint { public void setApplication(String application) { this.application = application; } + + public String getPollMode() { + return pollMode; + } + + public void setPollMode(String pollMode) { + this.pollMode = pollMode; + } + + public void setPollMode(OpenShiftPollMode pollMode) { + this.pollMode = pollMode.name(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java index cb7db5e..757ce0f 100644 --- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java +++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftHelper.java @@ -17,8 +17,12 @@ package org.apache.camel.component.openshift; import java.io.IOException; +import java.util.Locale; +import com.openshift.client.IApplication; import com.openshift.client.IDomain; +import com.openshift.client.IGear; +import com.openshift.client.IGearGroup; import com.openshift.client.IOpenShiftConnection; import com.openshift.client.IUser; import com.openshift.client.OpenShiftConnectionFactory; @@ -58,4 +62,14 @@ public final class OpenShiftHelper { return domain; } + + public static String getStateForApplication(IApplication application) { + for (IGearGroup group : application.getGearGroups()) { + for (IGear gear : group.getGears()) { + String state = gear.getState().name().toLowerCase(Locale.ENGLISH); + return state; + } + } + return "unknown"; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java new file mode 100644 index 0000000..64f534a --- /dev/null +++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftPollMode.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.openshift; + +public enum OpenShiftPollMode { + + all, onChange, onChangeWithInitial +} http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java index d453be4..47714e8 100644 --- a/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java +++ b/components/camel-openshift/src/main/java/org/apache/camel/component/openshift/OpenShiftProducer.java @@ -80,6 +80,8 @@ public class OpenShiftProducer extends DefaultProducer { protected void doList(Exchange exchange, IDomain domain) { StringBuilder sb = new StringBuilder("{\n \"applications\": ["); + // TODO: option to output as pojo or json + boolean first = true; for (IApplication application : domain.getApplications()) { if (!first) { @@ -203,13 +205,8 @@ public class OpenShiftProducer extends DefaultProducer { if (app == null) { throw new CamelExchangeException("Application with id " + name + " not found.", exchange); } else { - for (IGearGroup group : app.getGearGroups()) { - for (IGear gear : group.getGears()) { - String state = gear.getState().name().toLowerCase(Locale.ENGLISH); - exchange.getIn().setBody(state); - break; - } - } + String state = OpenShiftHelper.getStateForApplication(app); + exchange.getIn().setBody(state); } } http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java new file mode 100644 index 0000000..4b7199c --- /dev/null +++ b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.openshift; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class OpenShiftConsumerOnChangeTest extends CamelTestSupport { + + private String username; + private String password; + + @Override + public void setUp() throws Exception { + // INSERT credentials here + username = null; + password = null; + super.setUp(); + } + + @Test + public void testConsumer() throws Exception { + if (username == null) { + return; + } + + getMockEndpoint("mock:result").expectedMinimumMessageCount(1); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("openshift:myApp?username=%s&password=%s&delay=5s&pollMode=onChange", username, password) + .log("Event ${header.CamelOpenShiftEventType} for app ${body.name}") + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/ecdd4707/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java ---------------------------------------------------------------------- diff --git a/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java new file mode 100644 index 0000000..7502ab9 --- /dev/null +++ b/components/camel-openshift/src/test/java/org/apache/camel/component/openshift/OpenShiftConsumerOnChangeWithInitialTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.openshift; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class OpenShiftConsumerOnChangeWithInitialTest extends CamelTestSupport { + + private String username; + private String password; + + @Override + public void setUp() throws Exception { + // INSERT credentials here + username = null; + password = null; + super.setUp(); + } + + @Test + public void testConsumer() throws Exception { + if (username == null) { + return; + } + + getMockEndpoint("mock:result").expectedMinimumMessageCount(1); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("openshift:myApp?username=%s&password=%s&delay=5s&pollMode=onChangeWithInitial", username, password) + .log("Event ${header.CamelOpenShiftEventType} for app ${body.name}") + .to("mock:result"); + } + }; + } +}