http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java deleted file mode 100644 index 3a55d19..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopPrepareForJobRequest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Child process initialization request. 
- */ -public class GridHadoopPrepareForJobRequest implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** Job info. */ - @GridToStringInclude - private GridHadoopJobInfo jobInfo; - - /** Total amount of reducers in the job. */ - @GridToStringInclude - private int totalReducersCnt; - - /** Reducers to be executed on current node. */ - @GridToStringInclude - private int[] locReducers; - - /** - * Constructor required by {@link Externalizable}. - */ - public GridHadoopPrepareForJobRequest() { - // No-op. - } - - /** - * @param jobId Job ID. - * @param jobInfo Job info. - * @param totalReducersCnt Number of reducers in the job. - * @param locReducers Reducers to be executed on current node. - */ - public GridHadoopPrepareForJobRequest(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo, int totalReducersCnt, - int[] locReducers) { - assert jobId != null; - - this.jobId = jobId; - this.jobInfo = jobInfo; - this.totalReducersCnt = totalReducersCnt; - this.locReducers = locReducers; - } - - /** - * @return Job info. - */ - public GridHadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Reducers to be executed on current node. - */ - public int[] localReducers() { - return locReducers; - } - - /** - * @return Number of reducers in job. 
- */ - public int totalReducerCount() { - return totalReducersCnt; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobInfo); - out.writeInt(totalReducersCnt); - - U.writeIntArray(out, locReducers); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - jobId.readExternal(in); - - jobInfo = (GridHadoopJobInfo)in.readObject(); - totalReducersCnt = in.readInt(); - - locReducers = U.readIntArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopPrepareForJobRequest.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java deleted file mode 100644 index 7fc8858..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessDescriptor.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Process descriptor used to identify process for which task is running. - */ -public class GridHadoopProcessDescriptor implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Parent node ID. 
*/ - private UUID parentNodeId; - - /** Process ID. */ - private UUID procId; - - /** Address. */ - private String addr; - - /** TCP port. */ - private int tcpPort; - - /** Shared memory port. */ - private int shmemPort; - - /** - * @param parentNodeId Parent node ID. - * @param procId Process ID. - */ - public GridHadoopProcessDescriptor(UUID parentNodeId, UUID procId) { - this.parentNodeId = parentNodeId; - this.procId = procId; - } - - /** - * Gets process ID. - * - * @return Process ID. - */ - public UUID processId() { - return procId; - } - - /** - * Gets parent node ID. - * - * @return Parent node ID. - */ - public UUID parentNodeId() { - return parentNodeId; - } - - /** - * Gets host address. - * - * @return Host address. - */ - public String address() { - return addr; - } - - /** - * Sets host address. - * - * @param addr Host address. - */ - public void address(String addr) { - this.addr = addr; - } - - /** - * @return Shared memory port. - */ - public int sharedMemoryPort() { - return shmemPort; - } - - /** - * Sets shared memory port. - * - * @param shmemPort Shared memory port. - */ - public void sharedMemoryPort(int shmemPort) { - this.shmemPort = shmemPort; - } - - /** - * @return TCP port. - */ - public int tcpPort() { - return tcpPort; - } - - /** - * Sets TCP port. - * - * @param tcpPort TCP port. 
- */ - public void tcpPort(int tcpPort) { - this.tcpPort = tcpPort; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof GridHadoopProcessDescriptor)) - return false; - - GridHadoopProcessDescriptor that = (GridHadoopProcessDescriptor)o; - - return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = parentNodeId.hashCode(); - - result = 31 * result + procId.hashCode(); - - return result; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopProcessDescriptor.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java deleted file mode 100644 index 679da6c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopProcessStartedAck.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Process started message. - */ -public class GridHadoopProcessStartedAck implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopProcessStartedAck.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java deleted file mode 100644 index 9f11e0e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskExecutionRequest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Message sent from node to child process to start task(s) execution. - */ -public class GridHadoopTaskExecutionRequest implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** Job info. */ - @GridToStringInclude - private GridHadoopJobInfo jobInfo; - - /** Mappers. */ - @GridToStringInclude - private Collection<GridHadoopTaskInfo> tasks; - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @param jobId Job ID. - */ - public void jobId(GridHadoopJobId jobId) { - this.jobId = jobId; - } - - /** - * @return Jon info. - */ - public GridHadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * @param jobInfo Job info. - */ - public void jobInfo(GridHadoopJobInfo jobInfo) { - this.jobInfo = jobInfo; - } - - /** - * @return Tasks. - */ - public Collection<GridHadoopTaskInfo> tasks() { - return tasks; - } - - /** - * @param tasks Tasks. 
- */ - public void tasks(Collection<GridHadoopTaskInfo> tasks) { - this.tasks = tasks; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopTaskExecutionRequest.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobInfo); - U.writeCollection(out, tasks); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - jobId.readExternal(in); - - jobInfo = (GridHadoopJobInfo)in.readObject(); - tasks = U.readCollection(in); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java deleted file mode 100644 index f69abaf..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopTaskFinishedMessage.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Task finished message. Sent when local task finishes execution. - */ -public class GridHadoopTaskFinishedMessage implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Finished task info. */ - private GridHadoopTaskInfo taskInfo; - - /** Task finish status. */ - private GridHadoopTaskStatus status; - - /** - * Constructor required by {@link Externalizable}. - */ - public GridHadoopTaskFinishedMessage() { - // No-op. - } - - /** - * @param taskInfo Finished task info. - * @param status Task finish status. - */ - public GridHadoopTaskFinishedMessage(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status) { - assert taskInfo != null; - assert status != null; - - this.taskInfo = taskInfo; - this.status = status; - } - - /** - * @return Finished task info. - */ - public GridHadoopTaskInfo taskInfo() { - return taskInfo; - } - - /** - * @return Task finish status. 
- */ - public GridHadoopTaskStatus status() { - return status; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopTaskFinishedMessage.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - taskInfo.writeExternal(out); - status.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - taskInfo = new GridHadoopTaskInfo(); - taskInfo.readExternal(in); - - status = new GridHadoopTaskStatus(); - status.readExternal(in); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java index 616d383..10ad648 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -39,7 +39,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; -import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*; +import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*; /** * External process registry. Handles external process lifecycle. @@ -55,7 +55,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private IgniteLogger log; /** Node process descriptor. 
*/ - private GridHadoopProcessDescriptor nodeDesc; + private HadoopProcessDescriptor nodeDesc; /** Output base. */ private File outputBase; @@ -127,7 +127,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public void onJobStateChanged(final GridHadoopJobMetadata meta) { + @Override public void onJobStateChanged(final HadoopJobMetadata meta) { final HadoopProcess proc = runningProcsByJobId.get(meta.jobId()); // If we have a local process for this job. @@ -156,8 +156,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { "[jobId=" + meta.jobId() + ", meta=" + meta + ']'); } else { - proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() { - @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) { + proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { try { f.get(); @@ -223,9 +223,9 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { final HadoopProcess proc0 = proc; - proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() { + proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { @Override public void apply( - IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) { + IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { if (!busyLock.tryReadLock()) return; @@ -281,7 +281,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { return; } - GridHadoopTaskExecutionRequest req = new GridHadoopTaskExecutionRequest(); + HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest(); req.jobId(job.id()); 
req.jobInfo(job.info()); @@ -297,8 +297,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * @return External task metadata. */ - private GridHadoopExternalTaskMetadata buildTaskMeta() { - GridHadoopExternalTaskMetadata meta = new GridHadoopExternalTaskMetadata(); + private HadoopExternalTaskMetadata buildTaskMeta() { + HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata(); meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled", @@ -312,8 +312,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param state Fail state. * @param e Optional error. */ - private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) { - GridHadoopTaskStatus fail = new GridHadoopTaskStatus(state, e); + private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) { + HadoopTaskStatus fail = new HadoopTaskStatus(state, e); for (GridHadoopTaskInfo task : tasks) jobTracker.onTaskFinished(task, fail); @@ -351,7 +351,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } try { - GridHadoopExternalTaskMetadata startMeta = buildTaskMeta(); + HadoopExternalTaskMetadata startMeta = buildTaskMeta(); if (log.isDebugEnabled()) log.debug("Created hadoop child process metadata for job [job=" + job + @@ -404,8 +404,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } }, true); - fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() { - @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) { + fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override public void 
apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { try { // Make sure there were no exceptions. f.get(); @@ -493,7 +493,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job. * @return Started process. */ - private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta, + private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta, GridHadoopJob job) throws Exception { String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId; @@ -565,18 +565,18 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param proc Process to send request to. * @param meta Job metadata. */ - private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) { - Map<Integer, GridHadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses(); + private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) { + Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses(); int rdcNum = meta.mapReducePlan().reducers(); - GridHadoopProcessDescriptor[] addrs = null; + HadoopProcessDescriptor[] addrs = null; if (rdcAddrs != null && rdcAddrs.size() == rdcNum) { - addrs = new GridHadoopProcessDescriptor[rdcNum]; + addrs = new HadoopProcessDescriptor[rdcNum]; for (int i = 0; i < rdcNum; i++) { - GridHadoopProcessDescriptor desc = rdcAddrs.get(i); + HadoopProcessDescriptor desc = rdcAddrs.get(i); assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']'; @@ -585,7 +585,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } try { - comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs)); + comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs)); } catch (IgniteCheckedException e) { if (!proc.terminated()) { @@ -606,7 +606,7 @@ public class 
HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { */ private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) { try { - comm.sendMessage(proc.descriptor(), new GridHadoopPrepareForJobRequest(job.id(), job.info(), + comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(), plan.reducers(), plan.reducers(ctx.localNodeId()))); } catch (IgniteCheckedException e) { @@ -623,7 +623,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param desc Remote process descriptor. * @param taskMsg Task finished message. */ - private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) { + private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) { HadoopProcess proc = runningProcsByProcId.get(desc.processId()); if (proc != null) @@ -637,12 +637,12 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { */ private class MessageListener implements GridHadoopMessageListener { /** {@inheritDoc} */ - @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) { + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { if (!busyLock.tryReadLock()) return; try { - if (msg instanceof GridHadoopProcessStartedAck) { + if (msg instanceof HadoopProcessStartedAck) { HadoopProcess proc = runningProcsByProcId.get(desc.processId()); assert proc != null : "Missing child process for processId: " + desc; @@ -655,8 +655,8 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { else log.warning("Failed to find process start future (will ignore): " + desc); } - else if (msg instanceof GridHadoopTaskFinishedMessage) { - GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg; + else if (msg instanceof HadoopTaskFinishedMessage) { + HadoopTaskFinishedMessage 
taskMsg = (HadoopTaskFinishedMessage)msg; processTaskFinishedMessage(desc, taskMsg); } @@ -669,7 +669,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) { + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { if (!busyLock.tryReadLock()) return; @@ -689,7 +689,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { if (!F.isEmpty(tasks)) { log.warning("Lost connection with alive process (will terminate): " + desc); - GridHadoopTaskStatus status = new GridHadoopTaskStatus(CRASHED, + HadoopTaskStatus status = new HadoopTaskStatus(CRASHED, new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc)); for (GridHadoopTaskInfo info : tasks) @@ -725,7 +725,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private final GridHadoopProcessFuture initFut; /** Process descriptor. */ - private GridHadoopProcessDescriptor procDesc; + private HadoopProcessDescriptor procDesc; /** Reducers planned for this process. */ private Collection<Integer> reducers; @@ -756,7 +756,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * @return Communication process descriptor. */ - private GridHadoopProcessDescriptor descriptor() { + private HadoopProcessDescriptor descriptor() { return procDesc; } @@ -773,7 +773,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param proc Java process representation. * @param procDesc Process descriptor. 
*/ - private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) { + private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) { this.proc = proc; this.procDesc = procDesc; } @@ -789,9 +789,9 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { terminated = true; if (!initFut.isDone()) - initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() { + initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { @Override public void apply( - IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) { + IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { proc.destroy(); } }); @@ -852,7 +852,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * */ - private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> { + private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> { /** */ private static final long serialVersionUID = 0L; @@ -863,7 +863,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private GridHadoopJobId jobId; /** Process descriptor. */ - private GridHadoopProcessDescriptor desc; + private HadoopProcessDescriptor desc; /** Running process. */ private Process proc; @@ -909,7 +909,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * Reply received callback. 
*/ - public void onReplyReceived(GridHadoopProcessDescriptor desc) { + public void onReplyReceived(HadoopProcessDescriptor desc) { assert childProcId.equals(desc.processId()); this.desc = desc; @@ -921,7 +921,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable IgniteBiTuple<Process, GridHadoopProcessDescriptor> res, + @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res, @Nullable Throwable err) { if (err == null) { HadoopProcess proc = runningProcsByProcId.get(childProcId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java new file mode 100644 index 0000000..f0acc9f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * External task metadata (classpath, JVM options) needed to start external process execution. + */ +public class HadoopExternalTaskMetadata { + /** Process classpath. */ + private Collection<String> classpath; + + /** JVM options. */ + @GridToStringInclude + private Collection<String> jvmOpts; + + /** + * @return JVM Options. + */ + public Collection<String> jvmOptions() { + return jvmOpts; + } + + /** + * @param jvmOpts JVM options. + */ + public void jvmOptions(Collection<String> jvmOpts) { + this.jvmOpts = jvmOpts; + } + + /** + * @return Classpath. + */ + public Collection<String> classpath() { + return classpath; + } + + /** + * @param classpath Classpath. 
+ */ + public void classpath(Collection<String> classpath) { + this.classpath = classpath; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopExternalTaskMetadata.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java new file mode 100644 index 0000000..1258819 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Job info update request. + */ +public class HadoopJobInfoUpdateRequest implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private GridHadoopJobId jobId; + + /** Job phase. */ + @GridToStringInclude + private GridHadoopJobPhase jobPhase; + + /** Reducers addresses. */ + @GridToStringInclude + private HadoopProcessDescriptor[] reducersAddrs; + + /** + * Constructor required by {@link Externalizable}. + */ + public HadoopJobInfoUpdateRequest() { + // No-op. + } + + /** + * @param jobId Job ID. + * @param jobPhase Job phase. + * @param reducersAddrs Reducers addresses. + */ + public HadoopJobInfoUpdateRequest(GridHadoopJobId jobId, GridHadoopJobPhase jobPhase, + HadoopProcessDescriptor[] reducersAddrs) { + assert jobId != null; + + this.jobId = jobId; + this.jobPhase = jobPhase; + this.reducersAddrs = reducersAddrs; + } + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @return Job phase. + */ + public GridHadoopJobPhase jobPhase() { + return jobPhase; + } + + /** + * @return Reducers addresses. 
+ */ + public HadoopProcessDescriptor[] reducersAddresses() { + return reducersAddrs; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeObject(jobPhase); + U.writeArray(out, reducersAddrs); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new GridHadoopJobId(); + jobId.readExternal(in); + + jobPhase = (GridHadoopJobPhase)in.readObject(); + reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopJobInfoUpdateRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java new file mode 100644 index 0000000..4037b26 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Child process initialization request. + */ +public class HadoopPrepareForJobRequest implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private GridHadoopJobId jobId; + + /** Job info. */ + @GridToStringInclude + private GridHadoopJobInfo jobInfo; + + /** Total amount of reducers in the job. */ + @GridToStringInclude + private int totalReducersCnt; + + /** Reducers to be executed on current node. */ + @GridToStringInclude + private int[] locReducers; + + /** + * Constructor required by {@link Externalizable}. + */ + public HadoopPrepareForJobRequest() { + // No-op. + } + + /** + * @param jobId Job ID. + * @param jobInfo Job info. + * @param totalReducersCnt Number of reducers in the job. + * @param locReducers Reducers to be executed on current node. + */ + public HadoopPrepareForJobRequest(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo, int totalReducersCnt, + int[] locReducers) { + assert jobId != null; + + this.jobId = jobId; + this.jobInfo = jobInfo; + this.totalReducersCnt = totalReducersCnt; + this.locReducers = locReducers; + } + + /** + * @return Job info. 
+ */ + public GridHadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @return Reducers to be executed on current node. + */ + public int[] localReducers() { + return locReducers; + } + + /** + * @return Number of reducers in job. + */ + public int totalReducerCount() { + return totalReducersCnt; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeObject(jobInfo); + out.writeInt(totalReducersCnt); + + U.writeIntArray(out, locReducers); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new GridHadoopJobId(); + jobId.readExternal(in); + + jobInfo = (GridHadoopJobInfo)in.readObject(); + totalReducersCnt = in.readInt(); + + locReducers = U.readIntArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopPrepareForJobRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java new file mode 100644 index 0000000..dea73c3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Process descriptor used to identify process for which task is running. + */ +public class HadoopProcessDescriptor implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Parent node ID. */ + private UUID parentNodeId; + + /** Process ID. */ + private UUID procId; + + /** Address. */ + private String addr; + + /** TCP port. */ + private int tcpPort; + + /** Shared memory port. */ + private int shmemPort; + + /** + * @param parentNodeId Parent node ID. + * @param procId Process ID. + */ + public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) { + this.parentNodeId = parentNodeId; + this.procId = procId; + } + + /** + * Gets process ID. + * + * @return Process ID. + */ + public UUID processId() { + return procId; + } + + /** + * Gets parent node ID. + * + * @return Parent node ID. + */ + public UUID parentNodeId() { + return parentNodeId; + } + + /** + * Gets host address. + * + * @return Host address. + */ + public String address() { + return addr; + } + + /** + * Sets host address. + * + * @param addr Host address. 
+ */ + public void address(String addr) { + this.addr = addr; + } + + /** + * @return Shared memory port. + */ + public int sharedMemoryPort() { + return shmemPort; + } + + /** + * Sets shared memory port. + * + * @param shmemPort Shared memory port. + */ + public void sharedMemoryPort(int shmemPort) { + this.shmemPort = shmemPort; + } + + /** + * @return TCP port. + */ + public int tcpPort() { + return tcpPort; + } + + /** + * Sets TCP port. + * + * @param tcpPort TCP port. + */ + public void tcpPort(int tcpPort) { + this.tcpPort = tcpPort; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof HadoopProcessDescriptor)) + return false; + + HadoopProcessDescriptor that = (HadoopProcessDescriptor)o; + + return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = parentNodeId.hashCode(); + + result = 31 * result + procId.hashCode(); + + return result; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessDescriptor.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java new file mode 100644 index 0000000..49ff4bf --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Process started message. + */ +public class HadoopProcessStartedAck implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessStartedAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java new file mode 100644 index 0000000..edf1840 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Message sent from node to child process to start task(s) execution. + */ +public class HadoopTaskExecutionRequest implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private GridHadoopJobId jobId; + + /** Job info. */ + @GridToStringInclude + private GridHadoopJobInfo jobInfo; + + /** Tasks. */ + @GridToStringInclude + private Collection<GridHadoopTaskInfo> tasks; + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @param jobId Job ID. + */ + public void jobId(GridHadoopJobId jobId) { + this.jobId = jobId; + } + + /** + * @return Job info. + */ + public GridHadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * @param jobInfo Job info. + */ + public void jobInfo(GridHadoopJobInfo jobInfo) { + this.jobInfo = jobInfo; + } + + /** + * @return Tasks. + */ + public Collection<GridHadoopTaskInfo> tasks() { + return tasks; + } + + /** + * @param tasks Tasks. 
+ */ + public void tasks(Collection<GridHadoopTaskInfo> tasks) { + this.tasks = tasks; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskExecutionRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeObject(jobInfo); + U.writeCollection(out, tasks); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new GridHadoopJobId(); + jobId.readExternal(in); + + jobInfo = (GridHadoopJobInfo)in.readObject(); + tasks = U.readCollection(in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java new file mode 100644 index 0000000..a516f6b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Task finished message. Sent when local task finishes execution. + */ +public class HadoopTaskFinishedMessage implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Finished task info. */ + private GridHadoopTaskInfo taskInfo; + + /** Task finish status. */ + private HadoopTaskStatus status; + + /** + * Constructor required by {@link Externalizable}. + */ + public HadoopTaskFinishedMessage() { + // No-op. + } + + /** + * @param taskInfo Finished task info. + * @param status Task finish status. + */ + public HadoopTaskFinishedMessage(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status) { + assert taskInfo != null; + assert status != null; + + this.taskInfo = taskInfo; + this.status = status; + } + + /** + * @return Finished task info. + */ + public GridHadoopTaskInfo taskInfo() { + return taskInfo; + } + + /** + * @return Task finish status. 
+ */ + public HadoopTaskStatus status() { + return status; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskFinishedMessage.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + taskInfo.writeExternal(out); + status.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + taskInfo = new GridHadoopTaskInfo(); + taskInfo.readExternal(in); + + status = new HadoopTaskStatus(); + status.readExternal(in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java index 2d00222..21552e2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java @@ -42,13 +42,13 @@ import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public class GridHadoopChildProcessRunner { /** Node process descriptor. */ - private GridHadoopProcessDescriptor nodeDesc; + private HadoopProcessDescriptor nodeDesc; /** Message processing executor service. */ private ExecutorService msgExecSvc; /** Task executor service. 
*/ - private GridHadoopExecutorService execSvc; + private HadoopExecutorService execSvc; /** */ protected GridUnsafeMemory mem = new GridUnsafeMemory(0); @@ -75,7 +75,7 @@ public class GridHadoopChildProcessRunner { private final AtomicInteger pendingTasks = new AtomicInteger(); /** Shuffle job. */ - private GridHadoopShuffleJob<GridHadoopProcessDescriptor> shuffleJob; + private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; /** Concurrent mappers. */ private int concMappers; @@ -86,7 +86,7 @@ public class GridHadoopChildProcessRunner { /** * Starts child process runner. */ - public void start(GridHadoopExternalCommunication comm, GridHadoopProcessDescriptor nodeDesc, + public void start(GridHadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc, ExecutorService msgExecSvc, IgniteLogger parentLog) throws IgniteCheckedException { this.comm = comm; @@ -99,7 +99,7 @@ public class GridHadoopChildProcessRunner { startTime = U.currentTimeMillis(); // At this point node knows that this process has started. - comm.sendMessage(this.nodeDesc, new GridHadoopProcessStartedAck()); + comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck()); } /** @@ -107,7 +107,7 @@ public class GridHadoopChildProcessRunner { * * @param req Initialization request. */ - private void prepareProcess(GridHadoopPrepareForJobRequest req) { + private void prepareProcess(HadoopPrepareForJobRequest req) { if (initGuard.compareAndSet(false, true)) { try { if (log.isDebugEnabled()) @@ -119,7 +119,7 @@ public class GridHadoopChildProcessRunner { job.initialize(true, nodeDesc.processId()); - shuffleJob = new GridHadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, + shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, req.totalReducerCount(), req.localReducers()); initializeExecutors(req); @@ -143,7 +143,7 @@ public class GridHadoopChildProcessRunner { /** * @param req Task execution request. 
*/ - private void runTasks(final GridHadoopTaskExecutionRequest req) { + private void runTasks(final HadoopTaskExecutionRequest req) { if (!initFut.isDone() && log.isDebugEnabled()) log.debug("Will wait for process initialization future completion: " + req); @@ -175,7 +175,7 @@ public class GridHadoopChildProcessRunner { log.debug("Submitted task for external execution: " + taskInfo); execSvc.submit(new GridHadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { - @Override protected void onTaskFinished(GridHadoopTaskStatus status) { + @Override protected void onTaskFinished(HadoopTaskStatus status) { onTaskFinished0(this, status); } @@ -193,7 +193,7 @@ public class GridHadoopChildProcessRunner { } catch (IgniteCheckedException e) { for (GridHadoopTaskInfo info : req.tasks()) - notifyTaskFinished(info, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false); + notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); } } }); @@ -204,13 +204,13 @@ public class GridHadoopChildProcessRunner { * * @param req Init child process request. */ - private void initializeExecutors(GridHadoopPrepareForJobRequest req) { + private void initializeExecutors(HadoopPrepareForJobRequest req) { int cpus = Runtime.getRuntime().availableProcessors(); // // concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); // concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); - execSvc = new GridHadoopExecutorService(log, "", cpus * 2, 1024); + execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024); } /** @@ -218,7 +218,7 @@ public class GridHadoopChildProcessRunner { * * @param req Update request. 
*/ - private void updateTasks(final GridHadoopJobInfoUpdateRequest req) { + private void updateTasks(final HadoopJobInfoUpdateRequest req) { initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> gridFut) { assert initGuard.get(); @@ -228,9 +228,9 @@ public class GridHadoopChildProcessRunner { if (req.reducersAddresses() != null) { if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { shuffleJob.startSending("external", - new IgniteInClosure2X<GridHadoopProcessDescriptor, GridHadoopShuffleMessage>() { - @Override public void applyx(GridHadoopProcessDescriptor dest, - GridHadoopShuffleMessage msg) throws IgniteCheckedException { + new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { + @Override public void applyx(HadoopProcessDescriptor dest, + HadoopShuffleMessage msg) throws IgniteCheckedException { comm.sendMessage(dest, msg); } }); @@ -264,7 +264,7 @@ public class GridHadoopChildProcessRunner { * @param run Finished task runnable. * @param status Task status. */ - private void onTaskFinished0(GridHadoopRunnableTask run, GridHadoopTaskStatus status) { + private void onTaskFinished0(GridHadoopRunnableTask run, HadoopTaskStatus status) { GridHadoopTaskInfo info = run.taskInfo(); int pendingTasks0 = pendingTasks.decrementAndGet(); @@ -286,10 +286,10 @@ public class GridHadoopChildProcessRunner { * @param taskInfo Finished task info. * @param status Task status. 
*/ - private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final GridHadoopTaskStatus status, + private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status, boolean flush) { - final GridHadoopTaskState state = status.state(); + final HadoopTaskState state = status.state(); final Throwable err = status.failCause(); if (!flush) { @@ -298,7 +298,7 @@ public class GridHadoopChildProcessRunner { log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + ", err=" + err + ']'); - comm.sendMessage(nodeDesc, new GridHadoopTaskFinishedMessage(taskInfo, status)); + comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status)); } catch (IgniteCheckedException e) { log.error("Failed to send message to parent node (will terminate child process).", e); @@ -335,7 +335,7 @@ public class GridHadoopChildProcessRunner { ", state=" + state + ", err=" + err + ']', e); notifyTaskFinished(taskInfo, - new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false); + new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); } } }); @@ -344,7 +344,7 @@ public class GridHadoopChildProcessRunner { log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + ", state=" + state + ", err=" + err + ']', e); - notifyTaskFinished(taskInfo, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false); + notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); } } } @@ -356,7 +356,7 @@ public class GridHadoopChildProcessRunner { * @param msg Received message. * @return {@code True} if received from parent node. 
*/ - private boolean validateNodeMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) { + private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) { if (!nodeDesc.processId().equals(desc.processId())) { log.warning("Received process control request from unknown process (will ignore) [desc=" + desc + ", msg=" + msg + ']'); @@ -379,31 +379,31 @@ public class GridHadoopChildProcessRunner { */ private class MessageListener implements GridHadoopMessageListener { /** {@inheritDoc} */ - @Override public void onMessageReceived(final GridHadoopProcessDescriptor desc, final GridHadoopMessage msg) { - if (msg instanceof GridHadoopTaskExecutionRequest) { + @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) { + if (msg instanceof HadoopTaskExecutionRequest) { if (validateNodeMessage(desc, msg)) - runTasks((GridHadoopTaskExecutionRequest)msg); + runTasks((HadoopTaskExecutionRequest)msg); } - else if (msg instanceof GridHadoopJobInfoUpdateRequest) { + else if (msg instanceof HadoopJobInfoUpdateRequest) { if (validateNodeMessage(desc, msg)) - updateTasks((GridHadoopJobInfoUpdateRequest)msg); + updateTasks((HadoopJobInfoUpdateRequest)msg); } - else if (msg instanceof GridHadoopPrepareForJobRequest) { + else if (msg instanceof HadoopPrepareForJobRequest) { if (validateNodeMessage(desc, msg)) - prepareProcess((GridHadoopPrepareForJobRequest)msg); + prepareProcess((HadoopPrepareForJobRequest)msg); } - else if (msg instanceof GridHadoopShuffleMessage) { + else if (msg instanceof HadoopShuffleMessage) { if (log.isTraceEnabled()) log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']'); initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { try { - GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg; + HadoopShuffleMessage m = (HadoopShuffleMessage)msg; shuffleJob.onShuffleMessage(m); - comm.sendMessage(desc, new 
GridHadoopShuffleAck(m.id(), m.jobId())); + comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId())); } catch (IgniteCheckedException e) { U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e); @@ -411,18 +411,18 @@ public class GridHadoopChildProcessRunner { } }); } - else if (msg instanceof GridHadoopShuffleAck) { + else if (msg instanceof HadoopShuffleAck) { if (log.isTraceEnabled()) log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']'); - shuffleJob.onShuffleAck((GridHadoopShuffleAck)msg); + shuffleJob.onShuffleAck((HadoopShuffleAck)msg); } else log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']'); } /** {@inheritDoc} */ - @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) { + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { if (log.isDebugEnabled()) log.debug("Lost connection with remote process: " + desc); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java index 5aeeeee..1216c9a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java @@ -98,7 +98,7 @@ public class GridHadoopExternalProcessStarter { comm.start(); - GridHadoopProcessDescriptor nodeDesc = new 
GridHadoopProcessDescriptor(args.nodeId, args.parentProcId); + HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId); nodeDesc.address(args.addr); nodeDesc.tcpPort(args.tcpPort); nodeDesc.sharedMemoryPort(args.shmemPort); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java index b375b55..f4eb41a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java @@ -68,5 +68,5 @@ public interface GridHadoopCommunicationClient { * @param msg Message to send. * @throws IgniteCheckedException If failed. 
*/ - public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws IgniteCheckedException; + public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java index f5ddced..937e245 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java @@ -87,10 +87,10 @@ public class GridHadoopExternalCommunication { public static final boolean DFLT_TCP_NODELAY = true; /** Server listener. 
*/ - private final GridNioServerListener<GridHadoopMessage> srvLsnr = - new GridNioServerListenerAdapter<GridHadoopMessage>() { + private final GridNioServerListener<HadoopMessage> srvLsnr = + new GridNioServerListenerAdapter<HadoopMessage>() { @Override public void onConnected(GridNioSession ses) { - GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META); + HadoopProcessDescriptor desc = ses.meta(PROCESS_META); assert desc != null : "Received connected notification without finished handshake: " + ses; } @@ -103,7 +103,7 @@ public class GridHadoopExternalCommunication { if (e != null) U.error(log, "Session disconnected due to exception: " + ses, e); - GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META); + HadoopProcessDescriptor desc = ses.meta(PROCESS_META); if (desc != null) { GridHadoopCommunicationClient rmv = clients.remove(desc.processId()); @@ -120,8 +120,8 @@ public class GridHadoopExternalCommunication { } /** {@inheritDoc} */ - @Override public void onMessage(GridNioSession ses, GridHadoopMessage msg) { - notifyListener(ses.<GridHadoopProcessDescriptor>meta(PROCESS_META), msg); + @Override public void onMessage(GridNioSession ses, HadoopMessage msg) { + notifyListener(ses.<HadoopProcessDescriptor>meta(PROCESS_META), msg); if (msgQueueLimit > 0) { GridNioMessageTracker tracker = ses.meta(TRACKER_META); @@ -137,7 +137,7 @@ public class GridHadoopExternalCommunication { private IgniteLogger log; /** Local process descriptor. */ - private GridHadoopProcessDescriptor locProcDesc; + private HadoopProcessDescriptor locProcDesc; /** Marshaller. */ private Marshaller marsh; @@ -183,7 +183,7 @@ public class GridHadoopExternalCommunication { private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT; /** NIO server. */ - private GridNioServer<GridHadoopMessage> nioSrvr; + private GridNioServer<HadoopMessage> nioSrvr; /** Shared memory server. 
*/ private IpcSharedMemoryServerEndpoint shmemSrv; @@ -234,7 +234,7 @@ public class GridHadoopExternalCommunication { ExecutorService execSvc, String gridName ) { - locProcDesc = new GridHadoopProcessDescriptor(parentNodeId, procId); + locProcDesc = new HadoopProcessDescriptor(parentNodeId, procId); this.marsh = marsh; this.log = log.getLogger(GridHadoopExternalCommunication.class); @@ -563,7 +563,7 @@ public class GridHadoopExternalCommunication { * * @return Local process descriptor. */ - public GridHadoopProcessDescriptor localProcessDescriptor() { + public HadoopProcessDescriptor localProcessDescriptor() { return locProcDesc; } @@ -587,7 +587,7 @@ public class GridHadoopExternalCommunication { * @return Server instance. * @throws IgniteCheckedException Thrown if it's not possible to create server. */ - private GridNioServer<GridHadoopMessage> resetNioServer() throws IgniteCheckedException { + private GridNioServer<HadoopMessage> resetNioServer() throws IgniteCheckedException { if (boundTcpPort >= 0) throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort); @@ -596,8 +596,8 @@ public class GridHadoopExternalCommunication { // If configured TCP port is busy, find first available in range. 
for (int port = locPort; port < locPort + locPortRange; port++) { try { - GridNioServer<GridHadoopMessage> srvr = - GridNioServer.<GridHadoopMessage>builder() + GridNioServer<HadoopMessage> srvr = + GridNioServer.<HadoopMessage>builder() .address(locHost) .port(port) .listener(srvLsnr) @@ -722,7 +722,7 @@ public class GridHadoopExternalCommunication { * @param msg * @throws IgniteCheckedException */ - public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws + public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) throws IgniteCheckedException { assert desc != null; assert msg != null; @@ -761,7 +761,7 @@ public class GridHadoopExternalCommunication { * @return The existing or just created client. * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridHadoopCommunicationClient reserveClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException { + private GridHadoopCommunicationClient reserveClient(HadoopProcessDescriptor desc) throws IgniteCheckedException { assert desc != null; UUID procId = desc.processId(); @@ -806,7 +806,7 @@ public class GridHadoopExternalCommunication { * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridHadoopCommunicationClient createNioClient(GridHadoopProcessDescriptor desc) + @Nullable protected GridHadoopCommunicationClient createNioClient(HadoopProcessDescriptor desc) throws IgniteCheckedException { assert desc != null; @@ -837,7 +837,7 @@ public class GridHadoopExternalCommunication { * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridHadoopCommunicationClient createShmemClient(GridHadoopProcessDescriptor desc, int port) + @Nullable protected GridHadoopCommunicationClient createShmemClient(HadoopProcessDescriptor desc, int port) throws IgniteCheckedException { int attempt = 1; @@ -929,7 +929,7 @@ public class GridHadoopExternalCommunication { * @return Client. 
* @throws IgniteCheckedException If failed. */ - protected GridHadoopCommunicationClient createTcpClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException { + protected GridHadoopCommunicationClient createTcpClient(HadoopProcessDescriptor desc) throws IgniteCheckedException { String addr = desc.address(); int port = desc.tcpPort(); @@ -1066,7 +1066,7 @@ public class GridHadoopExternalCommunication { * @param desc Sender process descriptor. * @param msg Communication message. */ - protected void notifyListener(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) { + protected void notifyListener(HadoopProcessDescriptor desc, HadoopMessage msg) { GridHadoopMessageListener lsnr = this.lsnr; if (lsnr != null) @@ -1135,7 +1135,7 @@ public class GridHadoopExternalCommunication { private final IpcEndpoint endpoint; /** Adapter. */ - private GridHadoopIpcToNioAdapter<GridHadoopMessage> adapter; + private GridHadoopIpcToNioAdapter<HadoopMessage> adapter; /** * @param endpoint Endpoint. @@ -1279,7 +1279,7 @@ public class GridHadoopExternalCommunication { /** {@inheritDoc} */ @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException { - GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META); + HadoopProcessDescriptor desc = ses.meta(PROCESS_META); UUID rmtProcId = desc == null ? null : desc.processId(); @@ -1387,12 +1387,12 @@ public class GridHadoopExternalCommunication { * Process ID message. */ @SuppressWarnings("PublicInnerClass") - public static class ProcessHandshakeMessage implements GridHadoopMessage { + public static class ProcessHandshakeMessage implements HadoopMessage { /** */ private static final long serialVersionUID = 0L; /** Node ID. */ - private GridHadoopProcessDescriptor procDesc; + private HadoopProcessDescriptor procDesc; /** */ public ProcessHandshakeMessage() { @@ -1402,14 +1402,14 @@ public class GridHadoopExternalCommunication { /** * @param procDesc Process descriptor. 
*/ - private ProcessHandshakeMessage(GridHadoopProcessDescriptor procDesc) { + private ProcessHandshakeMessage(HadoopProcessDescriptor procDesc) { this.procDesc = procDesc; } /** * @return Process ID. */ - public GridHadoopProcessDescriptor processDescriptor() { + public HadoopProcessDescriptor processDescriptor() { return procDesc; } @@ -1420,7 +1420,7 @@ public class GridHadoopExternalCommunication { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - procDesc = (GridHadoopProcessDescriptor)in.readObject(); + procDesc = (HadoopProcessDescriptor)in.readObject(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java index 2a25357..e9dfc92 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMarshallerFilter.java @@ -55,7 +55,7 @@ public class GridHadoopMarshallerFilter extends GridNioFilterAdapter { /** {@inheritDoc} */ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - assert msg instanceof GridHadoopMessage : "Invalid message type: " + msg; + assert msg instanceof HadoopMessage : "Invalid message type: " + msg; return proceedSessionWrite(ses, marshaller.marshal(msg)); } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java index 219f4db..6010a8d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopMessageListener.java @@ -28,12 +28,12 @@ public interface GridHadoopMessageListener { * @param desc Process descriptor. * @param msg Hadoop message. */ - public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg); + public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg); /** * Called when connection to remote process was lost. * * @param desc Process descriptor. */ - public void onConnectionLost(GridHadoopProcessDescriptor desc); + public void onConnectionLost(HadoopProcessDescriptor desc); }