Ignite-65 code restores
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7acbfeda Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7acbfeda Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7acbfeda Branch: refs/heads/ignite-49 Commit: 7acbfeda3d0fcd7eb515153595463cae21d2d308 Parents: 09dd0ef Author: avinogradov <avinogra...@gridgain.com> Authored: Fri Jan 23 18:22:21 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Fri Jan 23 18:22:21 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridJobExecuteRequest.java | 75 +++++--- .../internal/GridJobExecuteRequestV2.java | 185 ------------------- .../GridCacheContinuousQueryManager.java | 10 - .../processors/job/GridJobProcessor.java | 3 - .../processors/task/GridTaskWorker.java | 78 +++----- .../GridTcpCommunicationMessageFactory.java | 5 +- 6 files changed, 75 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7acbfeda/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index 8f9cef8..1a1eea6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@ -34,7 +34,10 @@ import java.util.*; */ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter implements GridTaskMessage { /** */ - private static final long serialVersionUID = 0L; + private static final long serialVersionUID = -1470089047880101067L; + + /** Subject ID. */ + private UUID subjId; /** */ private IgniteUuid sesId; @@ -161,32 +164,34 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im * @param forceLocDep {@code True} If remote node should ignore deployment settings. * @param sesFullSup {@code True} if session attributes are disabled. * @param internal {@code True} if internal job. + * @param subjId Subject ID. */ public GridJobExecuteRequest( - IgniteUuid sesId, - IgniteUuid jobId, - String taskName, - String userVer, - String taskClsName, - byte[] jobBytes, - ComputeJob job, - long startTaskTime, - long timeout, - @Nullable Collection<UUID> top, - byte[] siblingsBytes, - Collection<ComputeJobSibling> siblings, - byte[] sesAttrsBytes, - Map<Object, Object> sesAttrs, - byte[] jobAttrsBytes, - Map<? extends Serializable, ? extends Serializable> jobAttrs, - String cpSpi, - IgniteUuid clsLdrId, - IgniteDeploymentMode depMode, - boolean dynamicSiblings, - Map<UUID, IgniteUuid> ldrParticipants, - boolean forceLocDep, - boolean sesFullSup, - boolean internal) { + IgniteUuid sesId, + IgniteUuid jobId, + String taskName, + String userVer, + String taskClsName, + byte[] jobBytes, + ComputeJob job, + long startTaskTime, + long timeout, + @Nullable Collection<UUID> top, + byte[] siblingsBytes, + Collection<ComputeJobSibling> siblings, + byte[] sesAttrsBytes, + Map<Object, Object> sesAttrs, + byte[] jobAttrsBytes, + Map<? extends Serializable, ? extends Serializable> jobAttrs, + String cpSpi, + IgniteUuid clsLdrId, + IgniteDeploymentMode depMode, + boolean dynamicSiblings, + Map<UUID, IgniteUuid> ldrParticipants, + boolean forceLocDep, + boolean sesFullSup, + boolean internal, + UUID subjId) { this.top = top; assert sesId != null; assert jobId != null; @@ -222,6 +227,7 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im this.forceLocDep = forceLocDep; this.sesFullSup = sesFullSup; this.internal = internal; + this.subjId = subjId; this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi; } @@ -406,7 +412,7 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im * @return Subject ID. */ public UUID getSubjectId() { - return null; + return subjId; } /** {@inheritDoc} */ @@ -449,6 +455,7 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im _clone.sesFullSup = sesFullSup; _clone.internal = internal; _clone.top = top; + _clone.subjId = subjId; } /** {@inheritDoc} */ @@ -643,6 +650,11 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im commState.idx++; + case 21: + if (!commState.putUuid(subjId)) + return false; + + commState.idx++; } return true; @@ -901,6 +913,15 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im commState.idx++; + case 21: + UUID subjId0 = commState.getUuid(); + + if (subjId0 == UUID_NOT_READ) + return false; + + subjId = subjId0; + + commState.idx++; } return true; @@ -908,7 +929,7 @@ public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter im /** {@inheritDoc} */ @Override public byte directType() { - return 1; + return 81; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7acbfeda/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequestV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequestV2.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequestV2.java deleted file mode 100644 index 157c7bf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequestV2.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.direct.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -/** - * Updated job execute request with subject ID. - */ -public class GridJobExecuteRequestV2 extends GridJobExecuteRequest { - /** */ - private static final long serialVersionUID = -1470089047880101067L; - - /** Subject ID. */ - private UUID subjId; - - /** - * No-op constructor to support {@link Externalizable} interface. - */ - public GridJobExecuteRequestV2() { - // No-op. - } - - /** - * @param sesId Task session ID. - * @param jobId Job ID. - * @param taskName Task name. - * @param userVer Code version. - * @param taskClsName Fully qualified task name. - * @param jobBytes Job serialized body. - * @param job Job. - * @param startTaskTime Task execution start time. - * @param timeout Task execution timeout. - * @param top Topology. - * @param siblingsBytes Serialized collection of split siblings. - * @param siblings Collection of split siblings. - * @param sesAttrsBytes Map of session attributes. - * @param sesAttrs Session attributes. - * @param jobAttrsBytes Job context attributes. - * @param jobAttrs Job attributes. - * @param cpSpi Collision SPI. - * @param clsLdrId Task local class loader id. - * @param depMode Task deployment mode. - * @param dynamicSiblings {@code True} if siblings are dynamic. - * @param ldrParticipants Other node class loader IDs that can also load classes. - * @param forceLocDep {@code True} If remote node should ignore deployment settings. - * @param sesFullSup {@code True} if session attributes are disabled. - * @param internal {@code True} if internal job. - * @param subjId Subject ID. - */ - public GridJobExecuteRequestV2( - IgniteUuid sesId, - IgniteUuid jobId, - String taskName, - String userVer, - String taskClsName, - byte[] jobBytes, - ComputeJob job, - long startTaskTime, - long timeout, - @Nullable Collection<UUID> top, - byte[] siblingsBytes, - Collection<ComputeJobSibling> siblings, - byte[] sesAttrsBytes, - Map<Object, Object> sesAttrs, - byte[] jobAttrsBytes, - Map<? extends Serializable, ? extends Serializable> jobAttrs, - String cpSpi, - IgniteUuid clsLdrId, - IgniteDeploymentMode depMode, - boolean dynamicSiblings, - Map<UUID, IgniteUuid> ldrParticipants, - boolean forceLocDep, - boolean sesFullSup, - boolean internal, - UUID subjId - ) { - super(sesId, jobId, taskName, userVer, taskClsName, jobBytes, job, startTaskTime, timeout, top, siblingsBytes, - siblings, sesAttrsBytes, sesAttrs, jobAttrsBytes, jobAttrs, cpSpi, clsLdrId, depMode, dynamicSiblings, - ldrParticipants, forceLocDep, sesFullSup, internal); - - this.subjId = subjId; - } - - /** {@inheritDoc} */ - @Override public UUID getSubjectId() { - return subjId; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridJobExecuteRequestV2 _clone = new GridJobExecuteRequestV2(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { - super.clone0(_msg); - - GridJobExecuteRequestV2 _clone = (GridJobExecuteRequestV2)_msg; - - _clone.subjId = subjId; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 21: - if (!commState.putUuid(subjId)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (commState.idx) { - case 21: - UUID subjId0 = commState.getUuid(); - - if (subjId0 == UUID_NOT_READ) - return false; - - subjId = subjId0; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 81; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7acbfeda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java index 0c2d9eb..e706979 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryManager.java @@ -75,16 +75,6 @@ public class GridCacheContinuousQueryManager<K, V> extends GridCacheManagerAdapt /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected void onKernalStart0() throws IgniteCheckedException { - if (intLsnrCnt.get() > 0 || lsnrCnt.get() > 0) { - Collection<ClusterNode> nodes = cctx.discovery().cacheNodes(cctx.name(), -1); - - for (ClusterNode n : nodes) { - if (!n.version().greaterThanEqual(6, 2, 0)) - throw new IgniteCheckedException("Rolling update is not supported for continuous queries " + - "for versions below 6.2.0"); - } - } - Iterable<CacheEntryListenerConfiguration<K, V>> lsnrCfgs = cctx.config().getCacheEntryListenerConfigurations(); if (lsnrCfgs != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7acbfeda/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index ece55f5..416d756 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -57,9 +57,6 @@ public class GridJobProcessor extends GridProcessorAdapter { /** */ private static final int FINISHED_JOBS_COUNT = Integer.getInteger(GG_JOBS_HISTORY_SIZE, 10240); - /** Version when subject ID was added. */ - public static final IgniteProductVersion SUBJECT_ID_ADDED_SINCE_VER = IgniteProductVersion.fromString("6.2.1"); - /** */ private final IgniteMarshaller marsh; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7acbfeda/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index b032994..88a6691 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1119,58 +1119,32 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { boolean forceLocDep = internal || !ctx.deploy().enabled(); - req = node.version().compareTo(GridJobProcessor.SUBJECT_ID_ADDED_SINCE_VER) < 0 ? - new GridJobExecuteRequest( - ses.getId(), - res.getJobContext().getJobId(), - ses.getTaskName(), - ses.getUserVersion(), - ses.getTaskClassName(), - loc ? null : marsh.marshal(res.getJob()), - loc ? res.getJob() : null, - ses.getStartTime(), - timeout, - ses.getTopology(), - loc ? null : marsh.marshal(ses.getJobSiblings()), - loc ? ses.getJobSiblings() : null, - loc ? null : marsh.marshal(sesAttrs), - loc ? sesAttrs : null, - loc ? null: marsh.marshal(jobAttrs), - loc ? jobAttrs : null, - ses.getCheckpointSpi(), - dep.classLoaderId(), - dep.deployMode(), - continuous, - dep.participants(), - forceLocDep, - ses.isFullSupport(), - internal) : - new GridJobExecuteRequestV2( - ses.getId(), - res.getJobContext().getJobId(), - ses.getTaskName(), - ses.getUserVersion(), - ses.getTaskClassName(), - loc ? null : marsh.marshal(res.getJob()), - loc ? res.getJob() : null, - ses.getStartTime(), - timeout, - ses.getTopology(), - loc ? null : marsh.marshal(ses.getJobSiblings()), - loc ? ses.getJobSiblings() : null, - loc ? null : marsh.marshal(sesAttrs), - loc ? sesAttrs : null, - loc ? null: marsh.marshal(jobAttrs), - loc ? jobAttrs : null, - ses.getCheckpointSpi(), - dep.classLoaderId(), - dep.deployMode(), - continuous, - dep.participants(), - forceLocDep, - ses.isFullSupport(), - internal, - subjId); + req = new GridJobExecuteRequest( + ses.getId(), + res.getJobContext().getJobId(), + ses.getTaskName(), + ses.getUserVersion(), + ses.getTaskClassName(), + loc ? null : marsh.marshal(res.getJob()), + loc ? res.getJob() : null, + ses.getStartTime(), + timeout, + ses.getTopology(), + loc ? null : marsh.marshal(ses.getJobSiblings()), + loc ? ses.getJobSiblings() : null, + loc ? null : marsh.marshal(sesAttrs), + loc ? sesAttrs : null, + loc ? null : marsh.marshal(jobAttrs), + loc ? jobAttrs : null, + ses.getCheckpointSpi(), + dep.classLoaderId(), + dep.deployMode(), + continuous, + dep.participants(), + forceLocDep, + ses.isFullSupport(), + internal, + subjId); if (loc) ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7acbfeda/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java index 1025843..9ef1b10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java @@ -62,9 +62,6 @@ public class GridTcpCommunicationMessageFactory { case 0: return new GridJobCancelRequest(); - case 1: - return new GridJobExecuteRequest(); - case 2: return new GridJobExecuteResponse(); @@ -270,7 +267,7 @@ public class GridTcpCommunicationMessageFactory { return new GridDhtAffinityAssignmentResponse(); case 81: - return new GridJobExecuteRequestV2(); + return new GridJobExecuteRequest(); case 82: return new GridCacheTtlUpdateRequest();