http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index 178c604..4cbd11a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -528,11 +528,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { } /** {@inheritDoc} */ - @Override public final ClusterGroup forStreamer(@Nullable String streamerName, @Nullable String... streamerNames) { - return forPredicate(new StreamersFilter(streamerName, streamerNames)); - } - - /** {@inheritDoc} */ @Override public ClusterGroup forCacheNodes(@Nullable String cacheName, Set<CacheDistributionMode> distributionModes) { return forPredicate(new CachesFilter(cacheName, distributionModes)); @@ -700,41 +695,6 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { /** */ - private static class StreamersFilter implements IgnitePredicate<ClusterNode> { - /** */ - private static final long serialVersionUID = 0L; - - /** Streamer name. */ - private final String streamerName; - - /** Streamer names. */ - private final String[] streamerNames; - - /** - * @param streamerName Streamer name. - * @param streamerNames Streamer names. - */ - private StreamersFilter(@Nullable String streamerName, @Nullable String[] streamerNames) { - this.streamerName = streamerName; - this.streamerNames = streamerNames; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode n) { - if (!U.hasStreamer(n, streamerName)) - return false; - - if (!F.isEmpty(streamerNames)) - for (String sn : streamerNames) - if (!U.hasStreamer(n, sn)) - return false; - - return true; - } - } - - /** - */ private static class AttributeFilter implements IgnitePredicate<ClusterNode> { /** */ private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java index 960bacd..75386e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -213,11 +213,6 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> } /** {@inheritDoc} */ - @Override public ClusterGroup forStreamer(String streamerName, @Nullable String... streamerNames) { - return cluster.forStreamer(streamerName, streamerNames); - } - - /** {@inheritDoc} */ @Override public ClusterGroup forRemotes() { return cluster.forRemotes(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index d69a809..f54d85f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.processors.rest.handlers.task.*; -import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -445,21 +444,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 79: - msg = new GridStreamerCancelRequest(); - - break; - - case 80: - msg = new GridStreamerExecutionRequest(); - - break; - - case 81: - msg = new GridStreamerResponse(); - - break; - case 82: msg = new JobStealingRequest(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java index e81cf38..52c8b0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerLoaderStore.java @@ -494,7 +494,6 @@ public class GridDeploymentPerLoaderStore extends GridDeploymentStoreAdapter { ClassLoader ldr = classLoader(); ctx.cache().onUndeployed(ldr); - ctx.stream().onUndeployed(ldr); // Clear optimized marshaller's cache. if (ctx.config().getMarshaller() instanceof OptimizedMarshaller) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java index 1cfe4b8..b9c9522 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentPerVersionStore.java @@ -1257,7 +1257,6 @@ public class GridDeploymentPerVersionStore extends GridDeploymentStoreAdapter { ClassLoader ldr = classLoader(); ctx.cache().onUndeployed(ldr); - ctx.stream().onUndeployed(ldr); // Clear optimized marshaller's cache. if (ctx.config().getMarshaller() instanceof OptimizedMarshaller) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java deleted file mode 100644 index fb6cb85..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamProcessor.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; -import org.apache.ignite.streamer.index.*; -import org.apache.ignite.streamer.window.*; -import org.jetbrains.annotations.*; - -import javax.management.*; -import java.util.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.internal.IgniteNodeAttributes.*; - -/** - * - */ -public class GridStreamProcessor extends GridProcessorAdapter { - /** Streamers map. */ - private Map<String, IgniteStreamerImpl> map; - - /** Registered MBeans */ - private Collection<ObjectName> mBeans; - - /** MBean server. */ - private final MBeanServer mBeanSrv; - - /** - * @param ctx Kernal context. - */ - public GridStreamProcessor(GridKernalContext ctx) { - super(ctx); - - mBeanSrv = ctx.config().getMBeanServer(); - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - super.onKernalStart(); - - if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { - for (ClusterNode n : ctx.discovery().remoteNodes()) - checkStreamer(n); - } - - for (IgniteStreamerImpl s : map.values()) { - try { - mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), "Streamer", - new StreamerMBeanAdapter(s), StreamerMBean.class)); - - if (log.isDebugEnabled()) - log.debug("Registered MBean for streamer: " + s.name()); - } - catch (JMException e) { - U.error(log, "Failed to register streamer MBean: " + s.name(), e); - } - - // Add mbeans for stages. - for (StreamerStage stage : s.configuration().getStages()) { - try { - mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), "Stage-" + stage.name(), - new StreamerStageMBeanAdapter(stage.name(), stage.getClass().getName(), s), - StreamerStageMBean.class)); - - if (log.isDebugEnabled()) - log.debug("Registered MBean for streamer stage [streamer=" + s.name() + - ", stage=" + stage.name() + ']'); - } - catch (JMException e) { - U.error(log, "Failed to register streamer stage MBean [streamer=" + s.name() + - ", stage=" + stage.name() + ']', e); - } - } - - // Add mbeans for windows. - for (StreamerWindow win : s.configuration().getWindows()) { - try { - if (hasInterface(win.getClass(), StreamerWindowMBean.class)) { - mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), - "Window-" + win.name(), - (StreamerWindowMBean)win, - StreamerWindowMBean.class)); - - if (log.isDebugEnabled()) - log.debug("Registered MBean for streamer window [streamer=" + s.name() + - ", window=" + win.name() + ']'); - } - } - catch (JMException e) { - U.error(log, "Failed to register streamer window MBean [streamer=" + s.name() + - ", window=" + win.name() + ']', e); - } - - if (win instanceof StreamerWindowAdapter) { - StreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders(); - - if (idxs != null && idxs.length > 0) { - for (StreamerIndexProvider idx : idxs) { - try { - mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), - "Window-" + win.name() + "-index-" + idx.name(), idx, - StreamerIndexProviderMBean.class)); - - if (log.isDebugEnabled()) - log.debug("Registered MBean for streamer window index [streamer=" + s.name() + - ", window=" + win.name() + ", index=" + idx.name() + ']'); - } - catch (JMException e) { - U.error(log, "Failed to register streamer index MBean [streamer=" + s.name() + - ", window=" + win.name() + ", index=" + idx.name() + ']', e); - } - } - } - } - } - } - } - - /** - * Check configuration identity on local and remote nodes. - * - * @param rmtNode Remote node to check. - * @throws IgniteCheckedException If configuration mismatch detected. - */ - private void checkStreamer(ClusterNode rmtNode) throws IgniteCheckedException { - GridStreamerAttributes[] rmtAttrs = rmtNode.attribute(ATTR_STREAMER); - GridStreamerAttributes[] locAttrs = ctx.discovery().localNode().attribute(ATTR_STREAMER); - - // If local or remote streamer is not configured, nothing to validate. - if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs)) - return; - - for (GridStreamerAttributes rmtAttr : rmtAttrs) { - for (GridStreamerAttributes locAttr : locAttrs) { - if (!F.eq(rmtAttr.name(), locAttr.name())) - continue; - - if (rmtAttr.atLeastOnce() != locAttr.atLeastOnce()) - throw new IgniteCheckedException("Streamer atLeastOnce configuration flag mismatch (fix atLeastOnce flag " + - "in streamer configuration or set " + - "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property) [streamer=" + locAttr.name() + - ", locAtLeastOnce=" + locAttr.atLeastOnce() + - ", rmtAtLeastOnce=" + rmtAttr.atLeastOnce() + - ", rmtNodeId=" + rmtNode.id() + ']'); - - if (!rmtAttr.stages().equals(locAttr.stages())) - throw new IgniteCheckedException("Streamer stages configuration mismatch (fix streamer stages " + - "configuration or set " + - "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " + - "property) [streamer=" + locAttr.name() + - ", locStages=" + locAttr.stages() + - ", rmtStages=" + rmtAttr.stages() + - ", rmtNodeId=" + rmtNode.id() + ']'); - - if (rmtAttr.atLeastOnce()) { - if (rmtAttr.maxFailoverAttempts() != locAttr.maxFailoverAttempts()) - U.warn(log, "Streamer maxFailoverAttempts configuration property differs on local and remote " + - "nodes (ignore this message if it is done on purpose) [streamer=" + locAttr.name() + - ", locMaxFailoverAttempts=" + locAttr.maxFailoverAttempts() + - ", rmtMaxFailoverAttempts=" + rmtAttr.maxFailoverAttempts() + - ", rmtNodeId=" + rmtNode.id() + ']'); - - if (rmtAttr.maxConcurrentSessions() != locAttr.maxConcurrentSessions()) - U.warn(log, "Streamer maxConcurrentSessions configuration property differs on local and " + - "remote nodes (ignore this message if it is done on purpose) [streamer=" + locAttr.name() + - ", locMaxConcurrentSessions=" + locAttr.maxConcurrentSessions() + - ", rmtMaxConcurrentSessions=" + rmtAttr.maxConcurrentSessions() + - ", rmtNodeId=" + rmtNode.id() + ']'); - } - } - } - } - - /** - * Traverses class hierarchy and checks if class implements given interface. - * - * @param cls Class to check. - * @param iface Interface to search for. - * @return {@code True} if at least one parent implements given interface. - */ - private boolean hasInterface(Class<?> cls, Class<?> iface) { - while (cls != null) { - Class<?>[] interfaces = cls.getInterfaces(); - - for (Class<?> iface0 : interfaces) { - if (iface0.equals(iface)) - return true; - } - - cls = cls.getSuperclass(); - } - - return false; - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - super.start(); - - StreamerConfiguration[] cfg = ctx.config().getStreamerConfiguration(); - - if (F.isEmpty(cfg)) { - map = Collections.emptyMap(); - - return; - } - else { - int len = cfg.length; - - map = new HashMap<>(len, 1.0f); - - mBeans = new ArrayList<>(len); - } - - for (StreamerConfiguration c : cfg) { - IgniteStreamerImpl s = new IgniteStreamerImpl(ctx, c); - - s.start(); - - IgniteStreamerImpl old = map.put(c.getName(), s); - - if (old != null) { - old.stop(true); - - throw new IgniteCheckedException("Duplicate streamer name found (check configuration and " + - "assign unique name to each streamer): " + c.getName()); - } - } - - if (F.isEmpty(cfg)) - return; - - GridStreamerAttributes[] arr = new GridStreamerAttributes[cfg.length]; - - int i = 0; - - for (StreamerConfiguration c : cfg) - arr[i++] = new GridStreamerAttributes(c); - - ctx.addNodeAttribute(ATTR_STREAMER, arr); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (ctx.config().isDaemon()) - return; - - super.onKernalStop(cancel); - - if (!F.isEmpty(mBeans)) { - for (ObjectName name : mBeans) { - try { - mBeanSrv.unregisterMBean(name); - } - catch (JMException e) { - U.error(log, "Failed to unregister streamer MBean.", e); - } - } - } - - for (IgniteStreamerImpl streamer : map.values()) - streamer.onKernalStop(cancel); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - super.stop(cancel); - - for (IgniteStreamerImpl s : map.values()) - s.stop(cancel); - } - - /** - * @return Default no-name streamer. - */ - public IgniteStreamer streamer() { - return streamer(null); - } - - /** - * @param name Streamer name. - * @return Streamer for given name. - */ - public IgniteStreamer streamer(@Nullable String name) { - IgniteStreamer streamer = map.get(name); - - if (streamer == null) - throw new IllegalArgumentException("Streamer is not configured: " + name); - - return streamer; - } - - /** - * @return All configured streamers. - */ - public Collection<IgniteStreamer> streamers() { - Collection<IgniteStreamerImpl> streamers = map.values(); - - Collection<IgniteStreamer> res = new ArrayList<>(streamers.size()); - - streamers.addAll(map.values()); - - return res; - } - - /** - * Callback for undeployed class loaders. - * - * @param ldr Class loader. - */ - public void onUndeployed(ClassLoader ldr) { - for (IgniteStreamerEx streamer : map.values()) - streamer.onUndeploy(ldr); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java deleted file mode 100644 index e4c3094..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerAttributes.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * - */ -public class GridStreamerAttributes implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private String name; - - /** Stages. */ - private Collection<IgniteBiTuple<String, String>> stages; - - /** At least once flag. */ - private boolean atLeastOnce; - - /** Max failover attempts. */ - private int maxFailoverAttempts; - - /** Max concurrent sessions. */ - private int maxConcurrentSes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridStreamerAttributes() { - // No-op. - } - - /** - * @param cfg Streamer configuration. - */ - public GridStreamerAttributes(StreamerConfiguration cfg) { - atLeastOnce = cfg.isAtLeastOnce(); - maxConcurrentSes = cfg.getMaximumConcurrentSessions(); - maxFailoverAttempts = cfg.getMaximumFailoverAttempts(); - name = cfg.getName(); - - stages = new LinkedList<>(); - - if (!F.isEmpty(cfg.getStages())) { - for (StreamerStage stage : cfg.getStages()) - stages.add(F.t(stage.name(), stage.getClass().getName())); - } - } - - /** - * @return Name. - */ - @Nullable public String name() { - return name; - } - - /** - * @return Streamer stages. - */ - public Collection<IgniteBiTuple<String, String>> stages() { - return stages; - } - - /** - * @return At least once flag. - */ - public boolean atLeastOnce() { - return atLeastOnce; - } - - /** - * @return Max failover attempts. - */ - public int maxFailoverAttempts() { - return maxFailoverAttempts; - } - - /** - * @return Max concurrent sessions. - */ - public int maxConcurrentSessions() { - return maxConcurrentSes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(atLeastOnce); - out.writeInt(maxConcurrentSes); - out.writeInt(maxFailoverAttempts); - U.writeString(out, name); - U.writeCollection(out, stages); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - atLeastOnce = in.readBoolean(); - maxConcurrentSes = in.readInt(); - maxFailoverAttempts = in.readInt(); - name = U.readString(in); - stages = U.readCollection(in); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java deleted file mode 100644 index c7669b7..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.io.*; -import java.nio.*; - -/** - * Streamer cancel request. - */ -public class GridStreamerCancelRequest implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Cancelled future ID. */ - private IgniteUuid cancelledFutId; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridStreamerCancelRequest() { - // No-op. - } - - /** - * @param cancelledFutId Cancelled future ID. - */ - public GridStreamerCancelRequest(IgniteUuid cancelledFutId) { - this.cancelledFutId = cancelledFutId; - } - - /** - * @return Cancelled future ID. - */ - public IgniteUuid cancelledFutureId() { - return cancelledFutId; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeIgniteUuid("cancelledFutId", cancelledFutId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - cancelledFutId = reader.readIgniteUuid("cancelledFutId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 79; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java deleted file mode 100644 index 4421ad6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextDelegate.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Context delegate allowing to override next stage name. - */ -public class GridStreamerContextDelegate implements StreamerContext { - /** Context delegate. */ - private StreamerContext delegate; - - /** Next stage name. */ - private String nextStageName; - - /** - * @param delegate Delegate object. - * @param nextStageName Next stage name. - */ - public GridStreamerContextDelegate(StreamerContext delegate, @Nullable String nextStageName) { - this.delegate = delegate; - this.nextStageName = nextStageName; - } - - /** {@inheritDoc} */ - @Override public ClusterGroup projection() { - return delegate.projection(); - } - - /** {@inheritDoc} */ - @Override public <K, V> ConcurrentMap<K, V> localSpace() { - return delegate.localSpace(); - } - - /** {@inheritDoc} */ - @Override public <E> StreamerWindow<E> window() { - return delegate.window(); - } - - /** {@inheritDoc} */ - @Override public <E> StreamerWindow<E> window(String winName) { - return delegate.window(winName); - } - - /** {@inheritDoc} */ - @Override public String nextStageName() { - return nextStageName; - } - - /** {@inheritDoc} */ - @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) { - return delegate.query(clo); - } - - /** {@inheritDoc} */ - @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes) { - return delegate.query(clo, nodes); - } - - /** {@inheritDoc} */ - @Override public void broadcast(IgniteInClosure<StreamerContext> clo) { - delegate.broadcast(clo); - } - - /** {@inheritDoc} */ - @Override public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) { - delegate.broadcast(clo, nodes); - } - - /** {@inheritDoc} */ - @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) { - return delegate.reduce(clo, rdc); - } - - /** {@inheritDoc} */ - @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc, - Collection<ClusterNode> nodes) { - return delegate.reduce(clo, rdc, nodes); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java deleted file mode 100644 index c2a992d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerContextImpl.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.streamer.task.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Streamer context implementation. - */ -public class GridStreamerContextImpl implements StreamerContext { - /** Kernal context. */ - private GridKernalContext ctx; - - /** Local space. */ - private final ConcurrentMap<Object, Object> locSpace = new ConcurrentHashMap8<>(); - - /** Streamer projection. */ - private AtomicReference<ClusterGroup> streamPrj = new AtomicReference<>(); - - /** Streamer. */ - private IgniteStreamerEx streamer; - - /** Next stage name. */ - private String nextStageName; - - /** - * @param ctx Kernal context. - * @param cfg Streamer configuration. - * @param streamer Streamer impl. - */ - public GridStreamerContextImpl(GridKernalContext ctx, StreamerConfiguration cfg, IgniteStreamerEx streamer) { - assert ctx != null; - assert cfg != null; - assert streamer != null; - - this.ctx = ctx; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public ClusterGroup projection() { - ctx.gateway().readLock(); - - try { - return projection0(); - } - finally { - ctx.gateway().readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public <K, V> ConcurrentMap<K, V> localSpace() { - return (ConcurrentMap<K, V>)locSpace; - } - - /** {@inheritDoc} */ - @Override public <E> StreamerWindow<E> window() { - return streamer.window(); - } - - /** {@inheritDoc} */ - @Override public <E> StreamerWindow<E> window(String winName) { - StreamerWindow<E> window = streamer.window(winName); - - if (window == null) - throw new IllegalArgumentException("Streamer window is not configured: " + winName); - - return window; - } - - /** {@inheritDoc} */ - @Override public String nextStageName() { - return nextStageName; - } - - /** - * Sets next stage name for main context. - * - * @param nextStageName Next stage name. - */ - public void nextStageName(String nextStageName) { - this.nextStageName = nextStageName; - } - - /** {@inheritDoc} */ - @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) { - return query(clo, Collections.<ClusterNode>emptyList()); - } - - /** {@inheritDoc} */ - @Override public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes) { - ctx.gateway().readLock(); - - try { - ClusterGroup prj = projection0(); - - if (!F.isEmpty(nodes)) - prj = prj.forNodes(nodes); - - long startTime = U.currentTimeMillis(); - - Collection<R> res = ctx.grid().compute(prj).execute(new GridStreamerQueryTask<>(clo, streamer.name()), null); - - streamer.onQueryCompleted(U.currentTimeMillis() - startTime, prj.nodes().size()); - - return res; - } - finally { - ctx.gateway().readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public void broadcast(IgniteInClosure<StreamerContext> clo) { - broadcast(clo, Collections.<ClusterNode>emptyList()); - } - - /** {@inheritDoc} */ - @Override public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) { - ctx.gateway().readLock(); - - try { - ClusterGroup prj = projection0(); - - if (!F.isEmpty(nodes)) - prj = prj.forNodes(nodes); - - ctx.grid().compute(prj).execute(new GridStreamerBroadcastTask(clo, streamer.name()), null); - } - finally { - ctx.gateway().readUnlock(); - } - } - - /** {@inheritDoc} */ - @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) { - return reduce(clo, rdc, Collections.<ClusterNode>emptyList()); - } - - /** {@inheritDoc} */ - @Override public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc, - Collection<ClusterNode> nodes) { - ctx.gateway().readLock(); - - try { - ClusterGroup prj = projection0(); - - if (!F.isEmpty(nodes)) - prj = prj.forNodes(nodes); - - return ctx.grid().compute(prj).execute(new GridStreamerReduceTask<>(clo, rdc, streamer.name()), null); - } - finally { - ctx.gateway().readUnlock(); - } - } - - /** - * @return Streamer projection without grabbing read lock. - */ - private ClusterGroup projection0() { - ClusterGroup prj = streamPrj.get(); - - if (prj == null) { - prj = ctx.grid().cluster().forStreamer(streamer.name()); - - streamPrj.compareAndSet(null, prj); - - prj = streamPrj.get(); - } - - return prj; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java deleted file mode 100644 index 055ed8f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionBatch.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Streamer execution batch which should be processed on one node. - */ -public class GridStreamerExecutionBatch implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Execution ID (ID of root future). */ - private IgniteUuid execId; - - /** Execution start timestamp. */ - private long execStartTs; - - /** Originating future ID. */ - private IgniteUuid futId; - - /** Nodes participated in this execution. */ - @GridToStringInclude - private Collection<UUID> execNodeIds; - - /** Stage name. */ - private String stageName; - - /** Events to process. */ - private Collection<Object> evts; - - /** Deployment. This is transient field. */ - private transient GridDeployment dep; - - /** - * Empty constructor required by {@code Externalizable}. - */ - public GridStreamerExecutionBatch() { - // No-op. - } - - /** - * Execution batch. - * - * @param execId Execution ID. - * @param futId Future ID. - * @param execStartTs Execution start timestamp. - * @param execNodeIds Nodes participated in this execution. - * @param stageName Stage name to execute. - * @param evts Events to process. - */ - public GridStreamerExecutionBatch( - IgniteUuid execId, - long execStartTs, - IgniteUuid futId, - Collection<UUID> execNodeIds, - String stageName, - Collection<Object> evts - ) { - assert stageName != null; - - this.execId = execId; - this.futId = futId; - this.execStartTs = execStartTs; - this.execNodeIds = execNodeIds; - this.stageName = stageName; - this.evts = evts; - } - - /** - * Sets deployment. Deployment is not marshalled. - * - * @param dep Deployment for batch. - */ - public void deployment(GridDeployment dep) { - this.dep = dep; - } - - /** - * @return Batch deployment, if any. - */ - @Nullable GridDeployment deployment() { - return dep; - } - - /** - * @return Execution ID. - */ - public IgniteUuid executionId() { - return execId; - } - - /** - * @return Execution start timestamp. - */ - public long executionStartTimeStamp() { - return execStartTs; - } - - /** - * @return Batch future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Execution node IDs. - */ - public Collection<UUID> executionNodeIds() { - return execNodeIds; - } - - /** - * @return Stage name. - */ - public String stageName() { - return stageName; - } - - /** - * @return Events collection. - */ - public Collection<Object> events() { - return evts; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, execId); - out.writeLong(execStartTs); - U.writeGridUuid(out, futId); - U.writeUuids(out, execNodeIds); - out.writeUTF(stageName); - U.writeCollection(out, evts); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - execId = U.readGridUuid(in); - execStartTs = in.readLong(); - futId = U.readGridUuid(in); - execNodeIds = U.readUuids(in); - stageName = in.readUTF(); - evts = U.readCollection(in); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(GridStreamerExecutionBatch.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java deleted file mode 100644 index fc86c3b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.nio.*; -import java.util.*; - -/** - * - */ -public class GridStreamerExecutionRequest implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Force local deployment flag. */ - private boolean forceLocDep; - - /** Serialized batch in case if P2P class loading is enabled. */ - @GridToStringExclude - private byte[] batchBytes; - - /** Deployment mode. */ - private DeploymentMode depMode; - - /** Deployment sample class name. */ - private String sampleClsName; - - /** Deployment user version. */ - private String userVer; - - /** Node class loader participants. */ - @GridToStringInclude - @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) - private Map<UUID, IgniteUuid> ldrParticipants; - - /** Class loader ID. */ - private IgniteUuid clsLdrId; - - /** - * - */ - public GridStreamerExecutionRequest() { - // No-op. - } - - /** - * @param forceLocDep Force local deployment flag. - * @param batchBytes Batch serialized bytes. - * @param depMode Deployment mode. - * @param sampleClsName Sample class name. - * @param userVer User version. - * @param ldrParticipants Loader participants. - * @param clsLdrId Class loader ID. - */ - public GridStreamerExecutionRequest( - boolean forceLocDep, - byte[] batchBytes, - @Nullable DeploymentMode depMode, - @Nullable String sampleClsName, - @Nullable String userVer, - @Nullable Map<UUID, IgniteUuid> ldrParticipants, - @Nullable IgniteUuid clsLdrId - ) { - assert batchBytes != null; - - this.forceLocDep = forceLocDep; - this.batchBytes = batchBytes; - this.depMode = depMode; - this.sampleClsName = sampleClsName; - this.userVer = userVer; - this.ldrParticipants = ldrParticipants; - this.clsLdrId = clsLdrId; - } - - /** - * @return Force local deployment flag. - */ - public boolean forceLocalDeployment() { - return forceLocDep; - } - - /** - * @return Deployment mode. - */ - public DeploymentMode deploymentMode() { - return depMode; - } - - /** - * @return Deployment sample class name. - */ - public String sampleClassName() { - return sampleClsName; - } - - /** - * @return Deployment user version. - */ - public String userVersion() { - return userVer; - } - - /** - * @return Node class loader participants. - */ - public Map<UUID, IgniteUuid> loaderParticipants() { - return ldrParticipants; - } - - /** - * @return Class loader ID. - */ - public IgniteUuid classLoaderId() { - return clsLdrId; - } - - /** - * @return Serialized batch in case if P2P class loading is enabled. - */ - public byte[] batchBytes() { - return batchBytes; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridStreamerExecutionRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("batchBytes", batchBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeIgniteUuid("clsLdrId", clsLdrId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 3: - if (!writer.writeBoolean("forceLocDep", forceLocDep)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeString("sampleClsName", sampleClsName)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeString("userVer", userVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - batchBytes = reader.readByteArray("batchBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - clsLdrId = reader.readIgniteUuid("clsLdrId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - byte depModeOrd; - - depModeOrd = reader.readByte("depMode"); - - if (!reader.isLastRead()) - return false; - - depMode = DeploymentMode.fromOrdinal(depModeOrd); - - reader.incrementState(); - - case 3: - forceLocDep = reader.readBoolean("forceLocDep"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - sampleClsName = reader.readString("sampleClsName"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - userVer = reader.readString("userVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 80; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 7; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java deleted file mode 100644 index 36f8822..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.nio.*; - -/** - * - */ -public class GridStreamerResponse implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteUuid futId; - - /** */ - private byte[] errBytes; - - /** - * - */ - public GridStreamerResponse() { - // No-op. - } - - /** - * @param futId Future ID. - * @param errBytes Serialized error, if any. - */ - public GridStreamerResponse(IgniteUuid futId, @Nullable byte[] errBytes) { - assert futId != null; - - this.futId = futId; - this.errBytes = errBytes; - } - - /** - * @return Future ID. - */ - public IgniteUuid futureId() { - return futId; - } - - /** - * @return Serialized error. - */ - public byte[] errorBytes() { - return errBytes; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridStreamerResponse.class, this); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeIgniteUuid("futId", futId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - errBytes = reader.readByteArray("errBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - futId = reader.readIgniteUuid("futId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 81; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java deleted file mode 100644 index f05557f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerRouteFailedException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; - -/** - * Exception thrown when router did not return route map. In this case pipeline execution is stopped - * and corresponding callback is executed on originating node. - */ -public class GridStreamerRouteFailedException extends IgniteCheckedException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param msg Error message. - */ - public GridStreamerRouteFailedException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java deleted file mode 100644 index d11ad39..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Streamer execution future. - */ -public class GridStreamerStageExecutionFuture extends GridFutureAdapter<Object> { - /** */ - private static final long serialVersionUID = 0L; - - /** Logger. */ - private IgniteLogger log; - - /** Execution ID. */ - private final IgniteUuid execId; - - /** Execution start timestamp. */ - private final long execStartTs; - - /** Future ID. */ - private final IgniteUuid futId; - - /** Parent future ID. By the contract, global ID is sender node ID. */ - private final IgniteUuid parentFutId; - - /** Stage name. */ - private final String stageName; - - /** Events. */ - private final Collection<Object> evts; - - /** Failover attempts count. */ - private int failoverAttemptCnt; - - /** Child executions. */ - private final ConcurrentMap<UUID, GridStreamerExecutionBatch> childExecs = new ConcurrentHashMap<>(); - - /** Nodes on which this pipeline is known to be executed. */ - private final Set<UUID> execNodeIds = new GridConcurrentHashSet<>(); - - /** Streamer context. */ - @GridToStringExclude - private final IgniteStreamerEx streamer; - - /** Metrics holder. */ - @GridToStringExclude - private StreamerMetricsHolder metricsHolder; - - /** - * @param streamer Streamer extended context. - * @param execId Execution ID. If parent future ID is {@code null} then this is a root future - * and execution ID must be {@code null}. - * @param failoverAttemptCnt Number of attempts this set of events was tried to failover. - * @param execStartTs Execution start timestamp. - * @param parentFutId Parent future ID. - * @param prevExecNodes Node IDs on which pipeline was already executed. - * @param stageName Stage name to run. - * @param evts Events to process. - */ - public GridStreamerStageExecutionFuture( - IgniteStreamerEx streamer, - @Nullable IgniteUuid execId, - int failoverAttemptCnt, - long execStartTs, - @Nullable IgniteUuid parentFutId, - @Nullable Collection<UUID> prevExecNodes, - String stageName, - Collection<?> evts - ) { - assert streamer != null; - assert stageName != null; - assert evts != null; - assert !evts.isEmpty(); - assert (execId == null && parentFutId == null) || (execId != null && parentFutId != null); - - this.streamer = streamer; - futId = IgniteUuid.fromUuid(streamer.kernalContext().localNodeId()); - this.parentFutId = parentFutId; - - this.execId = parentFutId == null ? futId : execId; - this.failoverAttemptCnt = failoverAttemptCnt; - this.execStartTs = execStartTs; - - this.stageName = stageName; - this.evts = (Collection<Object>)evts; - - if (prevExecNodes != null) - execNodeIds.addAll(prevExecNodes); - - log = streamer.kernalContext().log(GridStreamerStageExecutionFuture.class); - } - - /** - * @return Future ID. - */ - public IgniteUuid id() { - return futId; - } - - /** - * Sets metrics holder to update counters when future completes. Used to avoid unnecessary listener creation. - * - * @param metricsHolder Metrics holder. - */ - public void metrics(StreamerMetricsHolder metricsHolder) { - assert metricsHolder != null; - assert rootExecution(); - - this.metricsHolder = metricsHolder; - } - - /** - * @return Failover attempt count. - */ - public int failoverAttemptCount() { - return failoverAttemptCnt; - } - - /** - * @return Stage name. - */ - public String stageName() { - return stageName; - } - - /** - * @return Events collection. - */ - public Collection<Object> events() { - return evts; - } - - /** - * Sends execution requests to remote nodes or schedules local execution if events were mapped locally. - */ - public void map() { - try { - // This will be a no-op when atLeastOnce is false, so this future will be discarded right - // after map() is executed. - streamer.onFutureMapped(this); - - StreamerEventRouter evtRouter = streamer.eventRouter(); - - Map<ClusterNode, Collection<Object>> routeMap = evtRouter.route(streamer.context(), stageName, evts); - - if (log.isDebugEnabled()) - log.debug("Mapped stage to nodes [futId=" + futId + ", stageName=" + stageName + - ", nodeIds=" + (routeMap != null ? U.nodeIds(routeMap.keySet()) : null) + ']'); - - if (F.isEmpty(routeMap)) { - U.error(log, "Failed to route events to nodes (will fail pipeline execution) " + - "[streamer=" + streamer.name() + ", stageName=" + stageName + ", evts=" + evts + ']'); - - UUID locNodeId = streamer.kernalContext().localNodeId(); - - onFailed(locNodeId, new GridStreamerRouteFailedException("Failed to route " + - "events to nodes (router returned null or empty route map) [locNodeId=" + locNodeId + ", " + - "stageName=" + stageName + ']')); - } - else { - execNodeIds.addAll(U.nodeIds(routeMap.keySet())); - - for (Map.Entry<ClusterNode, Collection<Object>> entry : routeMap.entrySet()) { - ClusterNode node = entry.getKey(); - - childExecs.put(node.id(), new GridStreamerExecutionBatch( - execId, - execStartTs, - futId, - execNodeIds, - stageName, - entry.getValue())); - } - - // Send execution requests to nodes. - streamer.scheduleExecutions(this, childExecs); - } - } - catch (IgniteCheckedException e) { - onFailed(streamer.kernalContext().localNodeId(), e); - } - } - - /** - * @return {@code True} if this future is a root execution future (i.e. initiated by streamer's addEvent call). - */ - public boolean rootExecution() { - return parentFutId == null; - } - - /** - * If not root future, will return parent node ID (may be local node ID). - * - * @return Sender node ID. - */ - @Nullable public UUID senderNodeId() { - return parentFutId == null ? null : parentFutId.globalId(); - } - - /** - * If not root future, will return parent future ID. - * - * @return Parent future ID. - */ - public IgniteUuid parentFutureId() { - return parentFutId; - } - - /** - * @return Map of child executions. - */ - public Map<UUID, GridStreamerExecutionBatch> childExecutions() { - return Collections.unmodifiableMap(childExecs); - } - - /** - * @return Execution node IDs. - */ - public Collection<UUID> executionNodeIds() { - return execNodeIds; - } - - /** - * Callback invoked when child node reports execution is completed (successfully or not). - * - * @param childNodeId Child node ID. - * @param err Exception if execution failed. - */ - public void onExecutionCompleted(UUID childNodeId, @Nullable Throwable err) { - if (log.isDebugEnabled()) - log.debug("Completed child execution for node [fut=" + this + - ", childNodeId=" + childNodeId + ", err=" + err + ']'); - - if (err != null) - onFailed(childNodeId, err); - else { - childExecs.remove(childNodeId); - - if (childExecs.isEmpty()) - onDone(); - } - } - - /** - * Callback invoked when node leaves the grid. If left node is known to participate in - * pipeline execution, cancel all locally running stages. - * - * @param leftNodeId Node ID that has left the grid. - */ - public void onNodeLeft(UUID leftNodeId) { - if (execNodeIds.contains(leftNodeId)) - onFailed(leftNodeId, new ClusterTopologyCheckedException("Failed to wait for streamer pipeline future completion " + - "(execution node has left the grid). All running stages will be cancelled " + - "[fut=" + this + ", leftNodeId=" + leftNodeId + ']')); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - if (super.onDone(res, err)) { - if (log.isDebugEnabled()) - log.debug("Completed stage execution future [fut=" + this + ", err=" + err + ']'); - - if (rootExecution() && metricsHolder != null) { - if (err != null) - metricsHolder.onSessionFinished(); - else - metricsHolder.onSessionFailed(); - } - - streamer.onFutureCompleted(this); - - return true; - } - - return false; - } - - /** - * Failed callback. - * - * @param failedNodeId Failed node ID. - * @param err Error reason. - */ - private void onFailed(UUID failedNodeId, Throwable err) { - if (log.isDebugEnabled()) - log.debug("Pipeline execution failed on node [fut=" + this + ", failedNodeId=" + failedNodeId + - ", err=" + err + ']'); - - onDone(err); - } - - /** {@inheritDoc} */ - @Override public boolean cancel() throws IgniteCheckedException { - if (!onCancelled()) - return false; - - if (log.isDebugEnabled()) - log.debug("Cancelling streamer execution future: " + this); - - streamer.onFutureCompleted(this); - - return true; - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(GridStreamerStageExecutionFuture.class, this, "childNodes", childExecs.keySet()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java deleted file mode 100644 index d9cd56f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerWindowIterator.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Helper iterator extension which prevents regular element remove and adds removex() method tracking which element - * was actually removed. - */ -public abstract class GridStreamerWindowIterator<T> implements Iterator<T> { - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Remove element from the underlying collection and return removed element. - * - * @return Removed element or {@code null} in case no deletion occurred. - */ - @Nullable public abstract T removex(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java deleted file mode 100644 index 0158fab..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerEx.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Extended streamer context with methods intended for internal use. - */ -public interface IgniteStreamerEx extends IgniteStreamer { - /** - * @return Kernal context. - */ - public GridKernalContext kernalContext(); - - /** - * Gets streamer default window (the first one in configuration list). - * - * @return Streamer window. - */ - public <E> StreamerWindow<E> window(); - - /** - * Gets streamer window by window name. - * - * @param windowName Window name. - * @return Streamer window. - */ - @Nullable public <E> StreamerWindow<E> window(String windowName); - - /** - * Called before execution requests are sent to remote nodes or scheduled for local execution. - * - * @param fut Future. - */ - public void onFutureMapped(GridStreamerStageExecutionFuture fut); - - /** - * Called when future is completed and parent should be notified, if any. - * - * @param fut Future. - */ - public void onFutureCompleted(GridStreamerStageExecutionFuture fut); - - /** - * @return Streamer event router. - */ - public StreamerEventRouter eventRouter(); - - /** - * Schedules batch executions either on local or on remote nodes. - * - * @param fut Future. - * @param execs Executions grouped by node ID. - * @throws IgniteCheckedException If failed. - */ - public void scheduleExecutions(GridStreamerStageExecutionFuture fut, Map<UUID, GridStreamerExecutionBatch> execs) - throws IgniteCheckedException; - - /** - * Callback for undeployed class loaders. All deployed events will be removed from window and local storage. - * - * @param undeployedLdr Undeployed class loader. - */ - public void onUndeploy(ClassLoader undeployedLdr); - - /** - * Callback executed when streamer query completes. - * - * @param time Consumed time. - * @param nodes Participating nodes count. - */ - public void onQueryCompleted(long time, int nodes); -}