# IGNITE-831 Make StartRequestData separated class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94fed657 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94fed657 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94fed657 Branch: refs/heads/ignite-836_2 Commit: 94fed6571fefeeb58cc89282be84ea8292c36389 Parents: 94fb3c5 Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Mon May 4 23:02:14 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Mon May 4 23:02:14 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 149 +---------- .../processors/continuous/StartRequestData.java | 267 +++++++++++++++++++ 2 files changed, 278 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fed657/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 0d76ad4..41f5940 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -447,14 +447,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (dep == null) throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred); - reqData.clsName = clsName; - reqData.depInfo = new GridDeploymentInfoBean(dep); + reqData.className(clsName); + reqData.deploymentInfo(new GridDeploymentInfoBean(dep)); reqData.p2pMarshal(marsh); } // Handle peer deployment for other handler-specific objects. - reqData.hnd.p2pMarshal(ctx); + reqData.handler().p2pMarshal(ctx); } } catch (IgniteCheckedException e) { @@ -557,8 +557,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (!nodes.isEmpty()) { // Do not send projection predicate (nodes already filtered). - reqData.prjPred = null; - reqData.prjPredBytes = null; + reqData.projectionPredicate(null); + reqData.projectionPredicateBytes(null); // Send start requests. try { @@ -789,16 +789,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID routineId = req.routineId(); StartRequestData data = req.data(); - GridContinuousHandler hnd = data.hnd; + GridContinuousHandler hnd = data.handler(); IgniteCheckedException err = null; try { if (ctx.config().isPeerClassLoadingEnabled()) { - String clsName = data.clsName; + String clsName = data.className(); if (clsName != null) { - GridDeploymentInfo depInfo = data.depInfo; + GridDeploymentInfo depInfo = data.deploymentInfo(); GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); @@ -822,11 +822,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (err == null) { try { - IgnitePredicate<ClusterNode> prjPred = data.prjPred; + IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { - registered = registerHandler(nodeId, routineId, hnd, data.bufSize, data.interval, - data.autoUnsubscribe, false); + registered = registerHandler(nodeId, routineId, hnd, data.bufferSize(), data.interval(), + data.autoUnsubscribe(), false); } } catch (IgniteCheckedException e) { @@ -1430,133 +1430,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * Start request data. - */ - private static class StartRequestData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Projection predicate. */ - private IgnitePredicate<ClusterNode> prjPred; - - /** Serialized projection predicate. */ - private byte[] prjPredBytes; - - /** Deployment class name. */ - private String clsName; - - /** Deployment info. */ - private GridDeploymentInfo depInfo; - - /** Handler. */ - private GridContinuousHandler hnd; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * Required by {@link Externalizable}. - */ - public StartRequestData() { - // No-op. - } - - /** - * @param prjPred Serialized projection predicate. - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, - int bufSize, long interval, boolean autoUnsubscribe) { - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - } - - /** - * @param marsh Marshaller. - * @throws IgniteCheckedException In case of error. - */ - void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { - assert marsh != null; - - prjPredBytes = marsh.marshal(prjPred); - } - - /** - * @param marsh Marshaller. - * @param ldr Class loader. - * @throws IgniteCheckedException In case of error. - */ - void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { - assert marsh != null; - - assert prjPred == null; - assert prjPredBytes != null; - - prjPred = marsh.unmarshal(prjPredBytes, ldr); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean b = prjPredBytes != null; - - out.writeBoolean(b); - - if (b) { - U.writeByteArray(out, prjPredBytes); - U.writeString(out, clsName); - out.writeObject(depInfo); - } - else - out.writeObject(prjPred); - - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - boolean b = in.readBoolean(); - - if (b) { - prjPredBytes = U.readByteArray(in); - clsName = U.readString(in); - depInfo = (GridDeploymentInfo)in.readObject(); - } - else - prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); - - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRequestData.class, this); - } - } - - /** * Discovery data. */ private static class DiscoveryData implements Externalizable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fed657/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java new file mode 100644 index 0000000..c721d44 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Start request data. + */ +class StartRequestData implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Projection predicate. */ + private IgnitePredicate<ClusterNode> prjPred; + + /** Serialized projection predicate. */ + private byte[] prjPredBytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** Handler. */ + private GridContinuousHandler hnd; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * Required by {@link java.io.Externalizable}. + */ + public StartRequestData() { + // No-op. + } + + /** + * @param prjPred Serialized projection predicate. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + StartRequestData(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, + int bufSize, long interval, boolean autoUnsubscribe) { + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + this.prjPred = prjPred; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * @param marsh Marshaller. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { + assert marsh != null; + + prjPredBytes = marsh.marshal(prjPred); + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws org.apache.ignite.IgniteCheckedException In case of error. + */ + void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + assert marsh != null; + + assert prjPred == null; + assert prjPredBytes != null; + + prjPred = marsh.unmarshal(prjPredBytes, ldr); + } + + /** + * @return Projection predicate. + */ + public IgnitePredicate<ClusterNode> projectionPredicate() { + return prjPred; + } + + /** + * @param prjPred New projection predicate. + */ + public void projectionPredicate(IgnitePredicate<ClusterNode> prjPred) { + this.prjPred = prjPred; + } + + /** + * @return Serialized projection predicate. + */ + public byte[] projectionPredicateBytes() { + return prjPredBytes; + } + + /** + * @param prjPredBytes New serialized projection predicate. + */ + public void projectionPredicateBytes(byte[] prjPredBytes) { + this.prjPredBytes = prjPredBytes; + } + + /** + * @return Deployment class name. + */ + public String className() { + return clsName; + } + + /** + * @param clsName New deployment class name. + */ + public void className(String clsName) { + this.clsName = clsName; + } + + /** + * @return Deployment info. + */ + public GridDeploymentInfo deploymentInfo() { + return depInfo; + } + + /** + * @param depInfo New deployment info. + */ + public void deploymentInfo(GridDeploymentInfo depInfo) { + this.depInfo = depInfo; + } + + /** + * @return Handler. + */ + public GridContinuousHandler handler() { + return hnd; + } + + /** + * @param hnd New handler. + */ + public void handler(GridContinuousHandler hnd) { + this.hnd = hnd; + } + + /** + * @return Buffer size. + */ + public int bufferSize() { + return bufSize; + } + + /** + * @param bufSize New buffer size. + */ + public void bufferSize(int bufSize) { + this.bufSize = bufSize; + } + + /** + * @return Time interval. + */ + public long interval() { + return interval; + } + + /** + * @param interval New time interval. + */ + public void interval(long interval) { + this.interval = interval; + } + + /** + * @return Automatic unsubscribe flag. + */ + public boolean autoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * @param autoUnsubscribe New automatic unsubscribe flag. + */ + public void autoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + boolean b = prjPredBytes != null; + + out.writeBoolean(b); + + if (b) { + U.writeByteArray(out, prjPredBytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + else + out.writeObject(prjPred); + + out.writeObject(hnd); + out.writeInt(bufSize); + out.writeLong(interval); + out.writeBoolean(autoUnsubscribe); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + boolean b = in.readBoolean(); + + if (b) { + prjPredBytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + else + prjPred = (IgnitePredicate<ClusterNode>)in.readObject(); + + hnd = (GridContinuousHandler)in.readObject(); + bufSize = in.readInt(); + interval = in.readLong(); + autoUnsubscribe = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRequestData.class, this); + } +}