http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java deleted file mode 100644 index 2739ad1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsAdapter.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; - -/** - * Streamer window metrics adapter. - */ -public class StreamerWindowMetricsAdapter implements StreamerWindowMetrics { - /** Window name. */ - private String name; - - /** Window size. */ - private int size; - - /** Window eviction queue size. */ - private int evictionQueueSize; - - /** - * @param m Metrics to copy. 
- */ - public StreamerWindowMetricsAdapter(StreamerWindowMetrics m) { - // Preserve alphabetic order for maintenance. - evictionQueueSize = m.evictionQueueSize(); - name = m.name(); - size = m.size(); - } - - /** {@inheritDoc} */ - @Override public String name() { - return name; - } - - /** {@inheritDoc} */ - @Override public int size() { - return size; - } - - /** {@inheritDoc} */ - @Override public int evictionQueueSize() { - return evictionQueueSize; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StreamerWindowMetricsAdapter.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java deleted file mode 100644 index 44e7e90..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/StreamerWindowMetricsHolder.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer; - -import org.apache.ignite.streamer.*; - -/** - * Streamer window metrics holder. - */ -public class StreamerWindowMetricsHolder implements StreamerWindowMetrics { - /** Window instance. */ - private StreamerWindow window; - - /** - * @param window Streamer window. 
- */ - public StreamerWindowMetricsHolder(StreamerWindow window) { - this.window = window; - } - - /** {@inheritDoc} */ - @Override public String name() { - return window.name(); - } - - /** {@inheritDoc} */ - @Override public int size() { - return window.size(); - } - - /** {@inheritDoc} */ - @Override public int evictionQueueSize() { - return window.evictionQueueSize(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java deleted file mode 100644 index c537a3b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <img alt="icon" class="javadocimg" src="{@docRoot}/img/cube.gif"/> - * TODO. 
- */ -package org.apache.ignite.internal.processors.streamer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java deleted file mode 100644 index 8cb7133..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.streamer.task; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.closure.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Streamer broadcast task. - */ -public class GridStreamerBroadcastTask extends GridPeerDeployAwareTaskAdapter<Void, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** Closure. */ - private IgniteInClosure<StreamerContext> clo; - - /** Streamer. */ - private String streamer; - - /** - * @param clo Closure. - * @param streamer Streamer. - */ - public GridStreamerBroadcastTask(IgniteInClosure<StreamerContext> clo, @Nullable String streamer) { - super(U.peerDeployAware(clo)); - - this.clo = clo; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - Map<ComputeJob, ClusterNode> res = U.newHashMap(subgrid.size()); - - for (ClusterNode node : subgrid) - res.put(new StreamerBroadcastJob(clo, streamer), node); - - return res; - } - - /** {@inheritDoc} */ - @Override public Void reduce(List<ComputeJobResult> results) { - return null; - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - // No failover. - if (res.getException() != null) - throw res.getException(); - - return ComputeJobResultPolicy.WAIT; - } - - /** - * Streamer broadcast job. - */ - private static class StreamerBroadcastJob extends ComputeJobAdapter implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Injected grid. 
*/ - @IgniteInstanceResource - private Ignite g; - - /** Closure. */ - private IgniteInClosure<StreamerContext> clo; - - /** Streamer. */ - private String streamer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public StreamerBroadcastJob() { - // No-op. - } - - /** - * @param clo Closure. - * @param streamer Streamer. - */ - private StreamerBroadcastJob(IgniteInClosure<StreamerContext> clo, String streamer) { - this.clo = clo; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public Object execute() { - IgniteStreamer s = g.streamer(streamer); - - assert s != null; - - clo.apply(s.context()); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(clo); - U.writeString(out, streamer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - clo = (IgniteInClosure<StreamerContext>)in.readObject(); - streamer = U.readString(in); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java deleted file mode 100644 index 38ed703..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer.task; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.closure.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Streamer query task. - */ -public class GridStreamerQueryTask<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { - /** */ - private static final long serialVersionUID = 0L; - - /** Query closure. */ - private IgniteClosure<StreamerContext, R> qryClos; - - /** Streamer. */ - private String streamer; - - /** - * @param qryClos Query closure. - * @param streamer Streamer. - */ - public GridStreamerQueryTask(IgniteClosure<StreamerContext, R> qryClos, @Nullable String streamer) { - super(U.peerDeployAware(qryClos)); - - this.qryClos = qryClos; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - Map<ComputeJob, ClusterNode> res = U.newHashMap(subgrid.size()); - - for (ClusterNode node : subgrid) - res.put(new QueryJob<>(qryClos, streamer), node); - - return res; - } - - /** {@inheritDoc} */ - @Override public Collection<R> reduce(List<ComputeJobResult> results) { - Collection<R> res = new ArrayList<>(results.size()); - - for (ComputeJobResult jobRes : results) - res.add(jobRes.<R>getData()); - - return res; - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - // No failover for this task. - if (res.getException() != null) - throw res.getException(); - - return ComputeJobResultPolicy.WAIT; - } - - /** - * Query job. - */ - private static class QueryJob<R> extends ComputeJobAdapter implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite g; - - /** Query closure. */ - private IgniteClosure<StreamerContext, R> qryClos; - - /** Streamer. */ - private String streamer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public QueryJob() { - // No-op. - } - - /** - * @param qryClos Query closure. - * @param streamer Streamer. 
- */ - private QueryJob(IgniteClosure<StreamerContext, R> qryClos, String streamer) { - this.qryClos = qryClos; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public Object execute() { - IgniteStreamer s = g.streamer(streamer); - - assert s != null; - - return qryClos.apply(s.context()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(qryClos); - U.writeString(out, streamer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - qryClos = (IgniteClosure<StreamerContext, R>)in.readObject(); - streamer = U.readString(in); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java deleted file mode 100644 index e9bd436..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.streamer.task; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.closure.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Streamer query task. - */ -@ComputeTaskNoResultCache -public class GridStreamerReduceTask<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> { - /** */ - private static final long serialVersionUID = 0L; - - /** Query closure. */ - private IgniteClosure<StreamerContext, R1> clos; - - /** Reducer. */ - private IgniteReducer<R1, R2> rdc; - - /** Streamer. */ - private String streamer; - - /** - * @param clos Query closure. - * @param rdc Query reducer. - * @param streamer Streamer. - */ - public GridStreamerReduceTask(IgniteClosure<StreamerContext, R1> clos, IgniteReducer<R1, R2> rdc, - @Nullable String streamer) { - super(U.peerDeployAware(clos)); - - this.clos = clos; - this.rdc = rdc; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) { - Map<ComputeJob, ClusterNode> res = U.newHashMap(subgrid.size()); - - for (ClusterNode node : subgrid) - res.put(new ReduceJob<>(clos, streamer), node); - - return res; - } - - /** {@inheritDoc} */ - @Override public R2 reduce(List<ComputeJobResult> results) { - return rdc.reduce(); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - // No failover for this task. - if (res.getException() != null) - throw res.getException(); - - rdc.collect(res.<R1>getData()); - - return ComputeJobResultPolicy.WAIT; - } - - /** - * Query job. - */ - private static class ReduceJob<R> extends ComputeJobAdapter implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite g; - - /** Query closure. */ - private IgniteClosure<StreamerContext, R> qryClos; - - /** Streamer. */ - private String streamer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public ReduceJob() { - // No-op. - } - - /** - * @param qryClos Query closure. - * @param streamer Streamer. 
- */ - private ReduceJob(IgniteClosure<StreamerContext, R> qryClos, String streamer) { - this.qryClos = qryClos; - this.streamer = streamer; - } - - /** {@inheritDoc} */ - @Override public Object execute() { - IgniteStreamer s = g.streamer(streamer); - - assert s != null; - - return qryClos.apply(s.context()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(qryClos); - U.writeString(out, streamer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - qryClos = (IgniteClosure<StreamerContext, R>)in.readObject(); - streamer = U.readString(in); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 7804c9d..f095438 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.mxbean.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.ipc.shmem.*; @@ -7056,28 +7055,6 @@ public abstract class IgniteUtils { } /** - * Checks if given node has specified streamer started. - * - * @param n Node to check. - * @param streamerName Streamer name to check. 
- * @return {@code True} if given node has specified streamer started. - */ - public static boolean hasStreamer(ClusterNode n, @Nullable String streamerName) { - assert n != null; - - GridStreamerAttributes[] attrs = n.attribute(ATTR_STREAMER); - - if (attrs != null) { - for (GridStreamerAttributes attr : attrs) { - if (F.eq(streamerName, attr.name())) - return true; - } - } - - return false; - } - - /** * Gets cache mode or a cache on given node or {@code null} if cache is not * present on given node. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java index 9a7458e..f6243c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.cache.*; -import org.apache.ignite.internal.visor.streamer.*; import java.io.*; import java.util.*; @@ -75,9 +74,6 @@ public class VisorGridConfiguration implements Serializable { /** Igfss. */ private Iterable<VisorIgfsConfiguration> igfss; - /** Streamers. */ - private Iterable<VisorStreamerConfiguration> streamers; - /** Environment. 
*/ private Map<String, String> env; @@ -112,7 +108,6 @@ public class VisorGridConfiguration implements Serializable { userAttrs = c.getUserAttributes(); caches = VisorCacheConfiguration.list(ignite, c.getCacheConfiguration()); igfss = VisorIgfsConfiguration.list(c.getFileSystemConfiguration()); - streamers = VisorStreamerConfiguration.list(c.getStreamerConfiguration()); env = new HashMap<>(System.getenv()); sysProps = IgniteSystemProperties.snapshot(); atomic = VisorAtomicConfiguration.from(c.getAtomicConfiguration()); @@ -213,13 +208,6 @@ public class VisorGridConfiguration implements Serializable { } /** - * @return Streamers. - */ - public Iterable<VisorStreamerConfiguration> streamers() { - return streamers; - } - - /** * @return Environment. */ public Map<String, String> env() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index 6dc27f1..a01bb1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -27,8 +27,6 @@ import org.apache.ignite.internal.visor.*; import org.apache.ignite.internal.visor.cache.*; import org.apache.ignite.internal.visor.compute.*; import org.apache.ignite.internal.visor.igfs.*; -import org.apache.ignite.internal.visor.streamer.*; -import org.apache.ignite.streamer.*; import java.util.*; import java.util.concurrent.*; @@ -176,34 +174,6 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa } } - /** - * Collect streamers. - * - * @param res Job result. 
- */ - protected void streamers(VisorNodeDataCollectorJobResult res) { - try { - StreamerConfiguration[] cfgs = ignite.configuration().getStreamerConfiguration(); - - if (cfgs != null) { - for (StreamerConfiguration cfg : cfgs) { - long start0 = U.currentTimeMillis(); - - try { - res.streamers().add(VisorStreamer.from(ignite.streamer(cfg.getName()))); - } - finally { - if (debug) - log(ignite.log(), "Collected streamer: " + cfg.getName(), getClass(), start0); - } - } - } - } - catch (Throwable streamersEx) { - res.streamersEx(streamersEx); - } - } - /** {@inheritDoc} */ @Override protected VisorNodeDataCollectorJobResult run(VisorNodeDataCollectorTaskArg arg) { return run(new VisorNodeDataCollectorJobResult(), arg); @@ -239,11 +209,6 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa if (debug) start0 = log(ignite.log(), "Collected igfs", getClass(), start0); - streamers(res); - - if (debug) - log(ignite.log(), "Collected streamers", getClass(), start0); - res.errorCount(ignite.context().exceptionRegistry().errorCount()); return res; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java index d711e06..b1bc44e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.node; import org.apache.ignite.internal.visor.cache.*; import org.apache.ignite.internal.visor.event.*; import 
org.apache.ignite.internal.visor.igfs.*; -import org.apache.ignite.internal.visor.streamer.*; import java.io.*; import java.util.*; @@ -62,9 +61,6 @@ public class VisorNodeDataCollectorJobResult implements Serializable { /** Exception while collecting node IGFSs. */ private Throwable igfssEx; - /** Node streamers. */ - private final Collection<VisorStreamer> streamers = new ArrayList<>(); - /** Exception while collecting node streamers. */ private Throwable streamersEx; @@ -184,13 +180,6 @@ public class VisorNodeDataCollectorJobResult implements Serializable { } /** - * @return Collection of streamers metrics. - */ - public Collection<VisorStreamer> streamers() { - return streamers; - } - - /** * @return Exception caught during collecting streamers metrics. */ public Throwable streamersEx() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java index 26f5be3..723b1a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTask.java @@ -123,9 +123,6 @@ public class VisorNodeDataCollectorTask extends VisorMultiNodeTask<VisorNodeData if (jobRes.cachesEx() != null) taskRes.cachesEx().put(nid, jobRes.cachesEx()); - if (!jobRes.streamers().isEmpty()) - taskRes.streamers().put(nid, jobRes.streamers()); - if (jobRes.streamersEx() != null) taskRes.streamersEx().put(nid, jobRes.streamersEx()); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java index 7826a9c..bacfbc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.node; import org.apache.ignite.internal.visor.cache.*; import org.apache.ignite.internal.visor.event.*; import org.apache.ignite.internal.visor.igfs.*; -import org.apache.ignite.internal.visor.streamer.*; import java.io.*; import java.util.*; @@ -68,9 +67,6 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { /** Exceptions caught during collecting IGFS from nodes. */ private final Map<UUID, Throwable> igfssEx = new HashMap<>(); - /** All streamers collected from nodes. */ - private final Map<UUID, Collection<VisorStreamer>> streamers = new HashMap<>(); - /** Exceptions caught during collecting streamers from nodes. */ private final Map<UUID, Throwable> streamersEx = new HashMap<>(); @@ -90,7 +86,6 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { igfss.isEmpty() && igfsEndpoints.isEmpty() && igfssEx.isEmpty() && - streamers.isEmpty() && streamersEx.isEmpty(); } @@ -172,13 +167,6 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { } /** - * @return All streamers collected from nodes. 
- */ - public Map<UUID, Collection<VisorStreamer>> streamers() { - return streamers; - } - - /** * @return Exceptions caught during collecting streamers from nodes. */ public Map<UUID, Throwable> streamersEx() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java deleted file mode 100644 index bd878a8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamer.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Data transfer object for {@link org.apache.ignite.IgniteStreamer}. 
- */ -public class VisorStreamer implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Streamer name. */ - private String name; - - /** Metrics. */ - private VisorStreamerMetrics metrics; - - /** Stages. */ - private Collection<VisorStreamerStageMetrics> stages; - - /** - * @param s Streamer. - * @return Data transfer object for given streamer. - */ - public static VisorStreamer from(IgniteStreamer s) { - assert s != null; - - VisorStreamer streamer = new VisorStreamer(); - - streamer.name(s.name()); - streamer.metrics(VisorStreamerMetrics.from(s)); - streamer.stages(VisorStreamerStageMetrics.stages(s)); - - return streamer; - } - - /** - * @return Streamer name. - */ - public String name() { - return name; - } - - /** - * @param name New streamer name. - */ - public void name(String name) { - this.name = name; - } - - /** - * @return Metrics. - */ - public VisorStreamerMetrics metrics() { - return metrics; - } - - /** - * @param metrics New metrics. - */ - public void metrics(VisorStreamerMetrics metrics) { - this.metrics = metrics; - } - - /** - * @return Stages. - */ - public Collection<VisorStreamerStageMetrics> stages() { - return stages; - } - - /** - * @param stages New stages. 
- */ - public void stages(Collection<VisorStreamerStageMetrics> stages) { - this.stages = stages; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorStreamer.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java deleted file mode 100644 index ab236a2..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerConfiguration.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.visor.streamer; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; - -/** - * Data transfer object for streamer configuration properties. - */ -public class VisorStreamerConfiguration implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Streamer name. */ - private String name; - - /** Events router. */ - private String router; - - /** Flag indicating whether event should be processed at least once. */ - private boolean atLeastOnce; - - /** Maximum number of failover attempts to try. */ - private int maxFailoverAttempts; - - /** Maximum number of concurrent events to be processed. */ - private int maxConcurrentSessions; - - /** Streamer thread pool size. */ - private int poolSize; - - /** - * @param scfg Streamer configuration. - * @return Data transfer object for streamer configuration properties. - */ - public static VisorStreamerConfiguration from(StreamerConfiguration scfg) { - VisorStreamerConfiguration cfg = new VisorStreamerConfiguration(); - - cfg.name(scfg.getName()); - cfg.router(compactClass(scfg.getRouter())); - cfg.atLeastOnce(scfg.isAtLeastOnce()); - cfg.maximumFailoverAttempts(scfg.getMaximumFailoverAttempts()); - cfg.maximumConcurrentSessions(scfg.getMaximumConcurrentSessions()); - cfg.threadPoolSize(scfg.getThreadPoolSize()); - - return cfg; - } - - /** - * Construct data transfer object for streamer configurations properties. - * - * @param streamers streamer configurations. - * @return streamer configurations properties. 
- */ - public static Iterable<VisorStreamerConfiguration> list(StreamerConfiguration[] streamers) { - if (streamers == null) - return Collections.emptyList(); - - final Collection<VisorStreamerConfiguration> cfgs = new ArrayList<>(streamers.length); - - for (StreamerConfiguration streamer : streamers) - cfgs.add(from(streamer)); - - return cfgs; - } - - /** - * @return Streamer name. - */ - @Nullable public String name() { - return name; - } - - /** - * @param name New streamer name. - */ - public void name(@Nullable String name) { - this.name = name; - } - - /** - * @return Events router. - */ - @Nullable public String router() { - return router; - } - - /** - * @param router New events router. - */ - public void router(@Nullable String router) { - this.router = router; - } - - /** - * @return Flag indicating whether event should be processed at least once. - */ - public boolean atLeastOnce() { - return atLeastOnce; - } - - /** - * @param atLeastOnce New flag indicating whether event should be processed at least once. - */ - public void atLeastOnce(boolean atLeastOnce) { - this.atLeastOnce = atLeastOnce; - } - - /** - * @return Maximum number of failover attempts to try. - */ - public int maximumFailoverAttempts() { - return maxFailoverAttempts; - } - - /** - * @param maxFailoverAttempts New maximum number of failover attempts to try. - */ - public void maximumFailoverAttempts(int maxFailoverAttempts) { - this.maxFailoverAttempts = maxFailoverAttempts; - } - - /** - * @return Maximum number of concurrent events to be processed. - */ - public int maximumConcurrentSessions() { - return maxConcurrentSessions; - } - - /** - * @param maxConcurrentSessions New maximum number of concurrent events to be processed. - */ - public void maximumConcurrentSessions(int maxConcurrentSessions) { - this.maxConcurrentSessions = maxConcurrentSessions; - } - - /** - * @return Streamer thread pool size. 
- */ - public int threadPoolSize() { - return poolSize; - } - - /** - * @param poolSize New streamer thread pool size. - */ - public void threadPoolSize(int poolSize) { - this.poolSize = poolSize; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorStreamerConfiguration.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java deleted file mode 100644 index bdf6e0b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetrics.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.visor.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; - -import java.io.*; - -/** - * Data transfer object for {@link org.apache.ignite.streamer.StreamerMetrics}. - */ -public class VisorStreamerMetrics implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Active stages. */ - private int active; - - /** Waiting stages. */ - private int waiting; - - /** Stages execution capacity. */ - private int cap; - - /** Pipeline minimum execution time. */ - private long pipelineMinExecTm; - - /** Pipeline average execution time. */ - private long pipelineAvgExecTm; - - /** Pipeline maximum execution time. */ - private long pipelineMaxExecTm; - - /** Minimum number of unique nodes in pipeline execution. */ - private int pipelineMinExecNodes; - - /** Average number of unique nodes in pipeline execution. */ - private int pipelineAvgExecNodes; - - /** Maximum number of unique nodes in pipeline execution. */ - private int pipelineMaxExecNodes; - - /** Query minimum execution time. */ - private long qryMinExecTm; - - /** Query average execution time. */ - private long qryAvgExecTm; - - /** Query maximum execution time. */ - private long qryMaxExecTm; - - /** Minimum number of unique nodes in query execution. */ - private int qryMinExecNodes; - - /** Average number of unique nodes in query execution. */ - private int qryAvgExecNodes; - - /** Maximum number of unique nodes in query execution. */ - private int qryMaxExecNodes; - - /** Current window size. */ - private int windowSize; - - /** - * @param streamer Source streamer. - * @return Data transfer streamer for given streamer. 
- */ - public static VisorStreamerMetrics from(IgniteStreamer streamer) { - assert streamer != null; - - StreamerMetrics m = streamer.metrics(); - - int windowSz = 0; - - for (StreamerWindowMetrics wm : m.windowMetrics()) - windowSz += wm.size(); - - VisorStreamerMetrics metrics = new VisorStreamerMetrics(); - - metrics.active(m.stageActiveExecutionCount()); - metrics.waiting(m.stageWaitingExecutionCount()); - metrics.capacity(m.executorServiceCapacity()); - - metrics.pipelineMinExecutionTime(m.pipelineMinimumExecutionTime()); - metrics.pipelineAvgExecutionTime(m.pipelineAverageExecutionTime()); - metrics.pipelineMaxExecutionTime(m.pipelineMaximumExecutionTime()); - - metrics.pipelineMinExecutionNodes(m.pipelineMinimumExecutionNodes()); - metrics.pipelineAvgExecutionNodes(m.pipelineAverageExecutionNodes()); - metrics.pipelineMaxExecutionNodes(m.pipelineMaximumExecutionNodes()); - - metrics.queryMinExecutionTime(m.queryMinimumExecutionTime()); - metrics.queryAvgExecutionTime(m.queryAverageExecutionTime()); - metrics.queryMaxExecutionTime(m.queryMaximumExecutionTime()); - - metrics.queryMinExecutionNodes(m.queryMinimumExecutionNodes()); - metrics.queryAvgExecutionNodes(m.queryAverageExecutionNodes()); - metrics.queryMaxExecutionNodes(m.queryMaximumExecutionNodes()); - - metrics.windowSize(windowSz); - - return metrics; - } - - /** - * @return Active stages. - */ - public int active() { - return active; - } - - /** - * @param active New active stages. - */ - public void active(int active) { - this.active = active; - } - - /** - * @return Waiting stages. - */ - public int waiting() { - return waiting; - } - - /** - * @param waiting New waiting stages. - */ - public void waiting(int waiting) { - this.waiting = waiting; - } - - /** - * @return Stages execution capacity. - */ - public int capacity() { - return cap; - } - - /** - * @param cap New stages execution capacity. 
- */ - public void capacity(int cap) { - this.cap = cap; - } - - /** - * @return Pipeline minimum execution time. - */ - public long pipelineMinExecutionTime() { - return pipelineMinExecTm; - } - - /** - * @param pipelineMinExecTm New pipeline minimum execution time. - */ - public void pipelineMinExecutionTime(long pipelineMinExecTm) { - this.pipelineMinExecTm = pipelineMinExecTm; - } - - /** - * @return Pipeline average execution time. - */ - public long pipelineAvgExecutionTime() { - return pipelineAvgExecTm; - } - - /** - * @param pipelineAvgExecTm New pipeline average execution time. - */ - public void pipelineAvgExecutionTime(long pipelineAvgExecTm) { - this.pipelineAvgExecTm = pipelineAvgExecTm; - } - - /** - * @return Pipeline maximum execution time. - */ - public long pipelineMaxExecutionTime() { - return pipelineMaxExecTm; - } - - /** - * @param pipelineMaxExecTm New pipeline maximum execution time. - */ - public void pipelineMaxExecutionTime(long pipelineMaxExecTm) { - this.pipelineMaxExecTm = pipelineMaxExecTm; - } - - /** - * @return Minimum number of unique nodes in pipeline execution. - */ - public int pipelineMinExecutionNodes() { - return pipelineMinExecNodes; - } - - /** - * @param pipelineMinExecNodes New minimum number of unique nodes in pipeline execution. - */ - public void pipelineMinExecutionNodes(int pipelineMinExecNodes) { - this.pipelineMinExecNodes = pipelineMinExecNodes; - } - - /** - * @return Average number of unique nodes in pipeline execution. - */ - public int pipelineAvgExecutionNodes() { - return pipelineAvgExecNodes; - } - - /** - * @param pipelineAvgExecNodes New average number of unique nodes in pipeline execution. - */ - public void pipelineAvgExecutionNodes(int pipelineAvgExecNodes) { - this.pipelineAvgExecNodes = pipelineAvgExecNodes; - } - - /** - * @return Maximum number of unique nodes in pipeline execution. 
- */ - public int pipelineMaxExecutionNodes() { - return pipelineMaxExecNodes; - } - - /** - * @param pipelineMaxExecNodes New maximum number of unique nodes in pipeline execution. - */ - public void pipelineMaxExecutionNodes(int pipelineMaxExecNodes) { - this.pipelineMaxExecNodes = pipelineMaxExecNodes; - } - - /** - * @return Query minimum execution time. - */ - public long queryMinExecutionTime() { - return qryMinExecTm; - } - - /** - * @param qryMinExecTime New query minimum execution time. - */ - public void queryMinExecutionTime(long qryMinExecTime) { - qryMinExecTm = qryMinExecTime; - } - - /** - * @return Query average execution time. - */ - public long queryAvgExecutionTime() { - return qryAvgExecTm; - } - - /** - * @param qryAvgExecTime New query average execution time. - */ - public void queryAvgExecutionTime(long qryAvgExecTime) { - qryAvgExecTm = qryAvgExecTime; - } - - /** - * @return Query maximum execution time. - */ - public long queryMaxExecutionTime() { - return qryMaxExecTm; - } - - /** - * @param qryMaxExecTime New query maximum execution time. - */ - public void queryMaxExecutionTime(long qryMaxExecTime) { - qryMaxExecTm = qryMaxExecTime; - } - - /** - * @return Minimum number of unique nodes in query execution. - */ - public int queryMinExecutionNodes() { - return qryMinExecNodes; - } - - /** - * @param qryMinExecNodes New minimum number of unique nodes in query execution. - */ - public void queryMinExecutionNodes(int qryMinExecNodes) { - this.qryMinExecNodes = qryMinExecNodes; - } - - /** - * @return Average number of unique nodes in query execution. - */ - public int queryAvgExecutionNodes() { - return qryAvgExecNodes; - } - - /** - * @param qryAvgExecNodes New average number of unique nodes in query execution. - */ - public void queryAvgExecutionNodes(int qryAvgExecNodes) { - this.qryAvgExecNodes = qryAvgExecNodes; - } - - /** - * @return Maximum number of unique nodes in query execution. 
- */ - public int queryMaxExecutionNodes() { - return qryMaxExecNodes; - } - - /** - * @param qryMaxExecNodes New maximum number of unique nodes in query execution. - */ - public void queryMaxExecutionNodes(int qryMaxExecNodes) { - this.qryMaxExecNodes = qryMaxExecNodes; - } - - /** - * @return Current window size. - */ - public int windowSize() { - return windowSize; - } - - /** - * @param windowSize New current window size. - */ - public void windowSize(int windowSize) { - this.windowSize = windowSize; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorStreamerMetrics.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java deleted file mode 100644 index c0ee2b1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerMetricsResetTask.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.visor.*; - -import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; - -/** - * Task for reset metrics for specified streamer. - */ -@GridInternal -public class VisorStreamerMetricsResetTask extends VisorOneNodeTask<String, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override protected VisorStreamerMetricsResetJob job(String arg) { - return new VisorStreamerMetricsResetJob(arg, debug); - } - - /** - * Job that reset streamer metrics. - */ - private static class VisorStreamerMetricsResetJob extends VisorJob<String, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param arg Streamer name. - * @param debug Debug flag. 
- */ - private VisorStreamerMetricsResetJob(String arg, boolean debug) { - super(arg, debug); - } - - /** {@inheritDoc} */ - @Override protected Void run(String streamerName) { - try { - IgniteStreamer streamer = ignite.streamer(streamerName); - - streamer.resetMetrics(); - - return null; - } - catch (IllegalArgumentException iae) { - throw new IgniteException("Failed to reset metrics for streamer: " + escapeName(streamerName) + - " on node: " + ignite.localNode().id(), iae); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorStreamerMetricsResetJob.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java deleted file mode 100644 index 60ce94d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerResetTask.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.task.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.visor.*; - -import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*; - -/** - * Task for reset specified streamer. - */ -@GridInternal -public class VisorStreamerResetTask extends VisorOneNodeTask<String, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override protected VisorStreamerResetJob job(String arg) { - return new VisorStreamerResetJob(arg, debug); - } - - /** - * Job that reset streamer. - */ - private static class VisorStreamerResetJob extends VisorJob<String, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param arg Streamer name. - * @param debug Debug flag. 
- */ - private VisorStreamerResetJob(String arg, boolean debug) { - super(arg, debug); - } - - /** {@inheritDoc} */ - @Override protected Void run(String streamerName) { - try { - IgniteStreamer streamer = ignite.streamer(streamerName); - - streamer.reset(); - - return null; - } - catch (IllegalArgumentException iae) { - throw new IgniteException("Failed to reset streamer: " + escapeName(streamerName) - + " on node: " + ignite.localNode().id(), iae); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorStreamerResetJob.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java deleted file mode 100644 index 2de597f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/streamer/VisorStreamerStageMetrics.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.streamer.*; - -import java.io.*; -import java.util.*; - -/** - * Data transfer object for {@link org.apache.ignite.streamer.StreamerStageMetrics}. - */ -public class VisorStreamerStageMetrics implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Stage name. */ - private String name; - - /** Minimum execution time. */ - private long minExecTm; - - /** Average execution time. */ - private long avgExecTm; - - /** Maximum execution time. */ - private long maxExecTm; - - /** Minimum waiting time. */ - private long minWaitingTm; - - /** Average waiting time. */ - private long avgWaitingTm; - - /** Maximum waiting time. */ - private long maxWaitingTm; - - /** Executed count. */ - private long executed; - - /** Failures count. */ - private int failures; - - /** If executing. */ - private boolean executing; - - /** Throughput. */ - private long throughput = -1; - - /** Failures frequency. */ - private int failuresFreq = -1; - - /** Create data transfer object for given metrics. 
*/ - public static VisorStreamerStageMetrics from(StreamerStageMetrics m) { - assert m != null; - - VisorStreamerStageMetrics metrics = new VisorStreamerStageMetrics(); - - metrics.name(m.name()); - - metrics.minExecutionTime(m.minimumExecutionTime()); - metrics.avgExecutionTime(m.averageExecutionTime()); - metrics.maxExecutionTime(m.maximumExecutionTime()); - - metrics.minWaitingTime(m.minimumWaitingTime()); - metrics.avgWaitingTime(m.averageWaitingTime()); - metrics.maxWaitingTime(m.maximumWaitingTime()); - - metrics.executed(m.totalExecutionCount()); - metrics.failures(m.failuresCount()); - metrics.executing(m.executing()); - - return metrics; - } - - /** Create data transfer objects for all stages. */ - public static Collection<VisorStreamerStageMetrics> stages(IgniteStreamer streamer) { - assert streamer != null; - - Collection<VisorStreamerStageMetrics> res = new ArrayList<>(); - - for (StreamerStageMetrics m : streamer.metrics().stageMetrics()) - res.add(from(m)); - - return res; - } - - /** - * @return Stage name. - */ - public String name() { - return name; - } - - /** - * @param name New stage name. - */ - public void name(String name) { - this.name = name; - } - - /** - * @return Minimum execution time. - */ - public long minExecutionTime() { - return minExecTm; - } - - /** - * @param minExecTm New minimum execution time. - */ - public void minExecutionTime(long minExecTm) { - this.minExecTm = minExecTm; - } - - /** - * @return Average execution time. - */ - public long avgExecutionTime() { - return avgExecTm; - } - - /** - * @param avgExecTm New average execution time. - */ - public void avgExecutionTime(long avgExecTm) { - this.avgExecTm = avgExecTm; - } - - /** - * @return Maximum execution time. - */ - public long maxExecutionTime() { - return maxExecTm; - } - - /** - * @param maxExecTm New maximum execution time. - */ - public void maxExecutionTime(long maxExecTm) { - this.maxExecTm = maxExecTm; - } - - /** - * @return Minimum waiting time. 
- */ - public long minWaitingTime() { - return minWaitingTm; - } - - /** - * @param minWaitingTm New minimum waiting time. - */ - public void minWaitingTime(long minWaitingTm) { - this.minWaitingTm = minWaitingTm; - } - - /** - * @return Average waiting time. - */ - public long avgWaitingTime() { - return avgWaitingTm; - } - - /** - * @param avgWaitingTm New average waiting time. - */ - public void avgWaitingTime(long avgWaitingTm) { - this.avgWaitingTm = avgWaitingTm; - } - - /** - * @return Maximum waiting time. - */ - public long maxWaitingTime() { - return maxWaitingTm; - } - - /** - * @param maxWaitingTm New maximum waiting time. - */ - public void maxWaitingTime(long maxWaitingTm) { - this.maxWaitingTm = maxWaitingTm; - } - - /** - * @return Executed count. - */ - public long executed() { - return executed; - } - - /** - * @param executed New executed count. - */ - public void executed(long executed) { - this.executed = executed; - } - - /** - * @return Failures count. - */ - public int failures() { - return failures; - } - - /** - * @param failures New failures count. - */ - public void failures(int failures) { - this.failures = failures; - } - - /** - * @return If executing. - */ - public boolean executing() { - return executing; - } - - /** - * @param executing New if executing. - */ - public void executing(boolean executing) { - this.executing = executing; - } - - /** - * @return Throughput. - */ - public long throughput() { - return throughput; - } - - /** - * @param throughput New throughput. - */ - public void throughput(long throughput) { - this.throughput = throughput; - } - - /** - * @return Failures frequency. - */ - public int failuresFrequency() { - return failuresFreq; - } - - /** - * @param failuresFreq New failures frequency. 
- */ - public void failuresFrequency(int failuresFreq) { - this.failuresFreq = failuresFreq; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(VisorStreamerStageMetrics.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java deleted file mode 100644 index d0e895f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerConfiguration.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer; - -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Streamer configuration. - */ -public class StreamerConfiguration { - /** By default maximum number of concurrent sessions is unlimited. 
*/ - public static final int DFLT_MAX_CONCURRENT_SESSIONS = -1; - - /** Default value for maximum failover attempts. */ - public static final int DFLT_MAX_FAILOVER_ATTEMPTS = 3; - - /** Name. */ - private String name; - - /** Window. */ - private Collection<StreamerWindow> win; - - /** Router. */ - private StreamerEventRouter router; - - /** Stages. */ - @GridToStringInclude - private Collection<StreamerStage> stages; - - /** At least once flag. */ - private boolean atLeastOnce; - - /** Maximum number of failover attempts. */ - private int maxFailoverAttempts = DFLT_MAX_FAILOVER_ATTEMPTS; - - /** Maximum number of concurrent sessions to be processed. */ - private int maxConcurrentSessions = DFLT_MAX_CONCURRENT_SESSIONS; - - /** Streamer thread pool size. */ - private int poolSize = Runtime.getRuntime().availableProcessors(); - - /** - * - */ - public StreamerConfiguration() { - // No-op. - } - - /** - * @param c Configuration to copy. - */ - public StreamerConfiguration(StreamerConfiguration c) { - atLeastOnce = c.isAtLeastOnce(); - poolSize = c.getThreadPoolSize(); - maxConcurrentSessions = c.getMaximumConcurrentSessions(); - maxFailoverAttempts = c.getMaximumFailoverAttempts(); - name = c.getName(); - router = c.getRouter(); - stages = c.getStages(); - win = c.getWindows(); - } - - /** - * Gets streamer name. Must be unique within grid. - * - * @return Streamer name, if {@code null} then default streamer is returned. - */ - @Nullable public String getName() { - return name; - } - - /** - * Sets the name of the streamer. - * - * @param name Name. - */ - public void setName(String name) { - this.name = name; - } - - /** - * Gets streamer event router. - * - * @return Event router, if {@code null} then events will be executed locally. - */ - @SuppressWarnings("unchecked") - @Nullable public StreamerEventRouter getRouter() { - return router; - } - - /** - * Sets router for streamer. - * - * @param router Router. 
- */ - public void setRouter(StreamerEventRouter router) { - this.router = router; - } - - /** - * Gets collection of streamer event windows. At least one window should be configured. Each window - * must have unique name. - * - * @return Streamer windows. - */ - public Collection<StreamerWindow> getWindows() { - return win; - } - - /** - * Sets collection of streamer windows. - * - * @param win Window. - */ - public void setWindows(Collection<StreamerWindow> win) { - this.win = win; - } - - /** - * Gets collection of streamer stages. Streamer must have at least one stage to execute. Each stage - * must have unique name. - * - * @return Collection of streamer stages. - */ - public Collection<StreamerStage> getStages() { - return stages; - } - - /** - * Sets stages. - * - * @param stages Stages. - */ - public void setStages(Collection<StreamerStage> stages) { - this.stages = stages; - } - - /** - * Gets flag indicating whether streamer should track event execution sessions and failover event execution - * if any failure is detected or any node on which execution happened has left the grid before successful response - * is received. - * <p> - * Setting this flag to {@code true} will guarantee that all pipeline stages will be executed at least once for - * each group of events submitted to streamer (or failure listener will be notified if failover cannot succeed). - * However, it does not guarantee that each stage will be executed at most once. - * - * @return {@code True} if event should be processed at least once, - * or {@code false} if failures can be safely ignored. - */ - public boolean isAtLeastOnce() { - return atLeastOnce; - } - - /** - * @param atLeastOnce {@code True} to guarantee that event will be processed at least once. - */ - public void setAtLeastOnce(boolean atLeastOnce) { - this.atLeastOnce = atLeastOnce; - } - - /** - * Gets maximum number of failover attempts to try when pipeline execution has failed.
This parameter - * is ignored if {@link #isAtLeastOnce()} is set to {@code false}. - * <p> - * If not set, default value is {@link #DFLT_MAX_FAILOVER_ATTEMPTS}. - * - * @return Maximum number of failover attempts to try. - */ - public int getMaximumFailoverAttempts() { - return maxFailoverAttempts; - } - - /** - * Sets maximum number of failover attempts. - * - * @param maxFailoverAttempts Maximum number of failover attempts. - * @see #getMaximumFailoverAttempts() - */ - public void setMaximumFailoverAttempts(int maxFailoverAttempts) { - this.maxFailoverAttempts = maxFailoverAttempts; - } - - /** - * Gets maximum number of concurrent events to be processed by streamer. This property is taken into - * account when {@link #isAtLeastOnce()} is set to {@code true}. If not positive, number of sessions - * will not be limited by any value. - * - * @return Maximum number of concurrent events to be processed. If number of concurrent events is greater - * than this value, caller will be blocked until enough responses are received. - */ - public int getMaximumConcurrentSessions() { - return maxConcurrentSessions; - } - - /** - * Sets maximum number of concurrent sessions. - * - * @param maxConcurrentSessions Maximum number of concurrent sessions. - * @see #getMaximumConcurrentSessions() - */ - public void setMaximumConcurrentSessions(int maxConcurrentSessions) { - this.maxConcurrentSessions = maxConcurrentSessions; - } - - /** - * Gets streamer pool size. Defines a thread pool size in which streamer stages will be executed. - * <p> - * If not specified, thread pool executor with max pool size equal to number of cores will be created. - * - * @return Streamer thread pool size. - */ - public int getThreadPoolSize() { - return poolSize; - } - - /** - * Sets streamer pool size. - * - * @param poolSize Pool size.
- * @see #getThreadPoolSize() - */ - public void setThreadPoolSize(int poolSize) { - this.poolSize = poolSize; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StreamerConfiguration.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java deleted file mode 100644 index 7ce4cce..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerContext.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Streamer context. Provides access to streamer local store, configured windows and various methods - * to run streamer queries. 
- */ -public interface StreamerContext { - /** - * Gets instance of dynamic grid projection including all nodes on which this streamer is running. - * - * @return Projection with all nodes on which streamer is configured. - */ - public ClusterGroup projection(); - - /** - * Gets streamer local space. Note that all updates to this space will be local. - * - * @return Streamer local space. - */ - public <K, V> ConcurrentMap<K, V> localSpace(); - - /** - * Gets default event window, i.e. window that is on the first place in streamer configuration. - * - * @return Default window. - */ - public <E> StreamerWindow<E> window(); - - /** - * Gets streamer event window by window name, if no window with such - * name was configured {@link IllegalArgumentException} will be thrown. - * - * @param winName Window name. - * @return Window instance. - */ - public <E> StreamerWindow<E> window(String winName); - - /** - * For context passed to {@link StreamerStage#run(StreamerContext, Collection)} this method will - * return next stage name in execution pipeline. For context obtained from streamer object, this method will - * return first stage name. - * - * @return Next stage name depending on invocation context. - */ - public String nextStageName(); - - /** - * Queries all streamer nodes deployed within grid. Given closure will be executed on each node on which streamer - * is configured. Streamer context local for that node will be passed to closure during execution. All results - * returned by closure will be added to result collection. - * - * @param clo Function to be executed on individual nodes. - * @return Result received from all streamers. - * @throws IgniteException If query execution failed. - */ - public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo) throws IgniteException; - - /** - * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes - * on which streamer is configured. 
Streamer context local for that node will be passed to closure during - * execution. All results returned by closure will be added to result collection. - * - * @param clo Function to be executed on individual nodes. - * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on - * which this streamer is running will be queried. - * @return Result received from all streamers. - * @throws IgniteException If query execution failed. - */ - public <R> Collection<R> query(IgniteClosure<StreamerContext, R> clo, Collection<ClusterNode> nodes) - throws IgniteException; - - /** - * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node - * in the grid. No result is collected. - * - * @param clo Function to be executed on individual nodes. - * @throws IgniteException If closure execution failed. - */ - public void broadcast(IgniteInClosure<StreamerContext> clo) throws IgniteException; - - /** - * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on - * which streamer is configured. No result is collected. - * - * @param clo Function to be executed on individual nodes. - * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on - * which this streamer is running will be queried. - * @throws IgniteException If closure execution failed. - */ - public void broadcast(IgniteInClosure<StreamerContext> clo, Collection<ClusterNode> nodes) throws IgniteException; - - /** - * Queries all streamer nodes deployed within grid. Given closure will be executed on each streamer node in - * the grid. Streamer context local for that node will be passed to closure during execution. Results returned - * by closure will be passed to given reducer. - * - * @param clo Function to be executed on individual nodes. - * @param rdc Reducer to reduce results received from remote nodes. - * @return Reducer result. 
- * @throws IgniteException If query execution failed. - */ - public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc) throws IgniteException; - - /** - * Queries streamer nodes deployed within grid. Given closure will be executed on those of passed nodes on which - * streamer is configured. Streamer context local for that node will be passed to closure during execution. - * Results returned by closure will be passed to given reducer. - * - * @param clo Function to be executed on individual nodes. - * @param rdc Reducer to reduce results received from remote nodes. - * @param nodes Optional list of nodes to execute query on, if empty, then all nodes on - * which this streamer is running will be queried. - * @return Reducer result. - * @throws IgniteException If query execution failed. - */ - public <R1, R2> R2 reduce(IgniteClosure<StreamerContext, R1> clo, IgniteReducer<R1, R2> rdc, - Collection<ClusterNode> nodes) throws IgniteException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java deleted file mode 100644 index 1994a6a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouter.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer; - -import org.apache.ignite.cluster.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Streamer event router. Pluggable component that determines event execution flow across the grid. - * Each time a group of events is submitted to streamer or returned to streamer by a stage, event - * router will be used to select execution node for next stage. - */ -public interface StreamerEventRouter { - /** - * Selects a node for given event that should be processed by a stage with given name. - * - * @param ctx Streamer context. - * @param stageName Stage name. - * @param evt Event to route. - * @return Node to route to. If this method returns {@code null} then the whole pipeline execution - * will be terminated. All running and ongoing stages for pipeline execution will be - * cancelled. - */ - @Nullable public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt); - - /** - * Selects a node for given events that should be processed by a stage with given name. - * - * @param ctx Streamer context. - * @param stageName Stage name to route events. - * @param evts Events. - * @return Events to node mapping. If this method returns {@code null} then the whole pipeline execution - * will be terminated. All running and ongoing stages for pipeline execution will be - * cancelled. 
- */ - @Nullable public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName, - Collection<T> evts); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1cc3d528/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java deleted file mode 100644 index 6ab4eda..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerEventRouterAdapter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streamer; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.util.*; - -/** - * Streamer adapter for event routers. 
- */ -public abstract class StreamerEventRouterAdapter implements StreamerEventRouter { - /** {@inheritDoc} */ - @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName, - Collection<T> evts) { - if (evts.size() == 1) { - ClusterNode route = route(ctx, stageName, F.first(evts)); - - if (route == null) - return null; - - return Collections.singletonMap(route, evts); - } - - Map<ClusterNode, Collection<T>> map = new GridLeanMap<>(); - - for (T e : evts) { - ClusterNode n = route(ctx, stageName, e); - - if (n == null) - return null; - - Collection<T> mapped = map.get(n); - - if (mapped == null) - map.put(n, mapped = new ArrayList<>()); - - mapped.add(e); - } - - return map; - } -}